Apache Spark Efektywne wykorzystanie DataFrame wczytywanie danych? – 3 proste wskazówki

You are currently viewing Apache Spark Efektywne wykorzystanie DataFrame  wczytywanie danych? – 3 proste wskazówki
Share This Post, Help Others, And Earn My Heartfelt Appreciation! :)
4.9
(161)

W tym krótkim samouczku pokażę, jak korzystać z API DataFrame [ Apache Spark Efektywne wykorzystanie DataFrame ] w celu zwiększenia wydajności aplikacji Spark’owej, podczas wczytywania dużych,  pół-strukturalnych zbiorów danych, takich jak CSV, XML i JSON.

Apache Spark Efektywne wykorzystanie DataFrame wczytywanie danych

Definiuj statyczny schemat danych

W Spark DataFrame API można zdefiniować statyczny schemat danych. Aby to osiągnąć, musisz podać obiekt klasy StructType, który zawiera listę StructField. Poniżej przedstawiłem dwa sposoby w jaki można zdefiniować schemat danych.

// -- Pierwszy sposób
val schema = new StructType()
.add("id", IntegerType, false)
.add("name", StringType, true)
.add("surname", StringType, true)
.add("age", IntegerType, true)
// -- lub drugi sposób
val schema = StructType.fromDDL("id integer, name string, surname string, age integer")

Wskazówka #1: wyłącz opcję inferSchema i użyj zdefiniowanego schematu

W poniższym przykładzie możesz znaleźć fragment kodu, gdzie wykorzystano wcześniej utworzony schemat danych oraz pokazano jak wyłączyć opcję inferSchema. Dzięki wyłączeniu opcji inferSchema, Spark nie będzie analizował zbioru danych, aby wywnioskować jaki jest schemat danych tylko wykorzysta ten, który mu podaliśmy.

// Plik CSV
val data = spark.sqlContext.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.schema(schema)
.load("/path/to/csv_data")
// Plik JSON
val data = spark.read.schema(schema).json("/path/to/json_data")
// Plik XML
val data = spark.read.format("com.databricks.spark.xml")
.options(rowTag="book").load("/path/to/xml_data")

Wskazówka #2: przechowuj schematy poza kodem aplikacji

Dobrym podejściem jest nie przechowywać schematów bezpośrednio w kodzie. W przypadku zmiany schematu danych musisz zmodyfikować swój kod, zbudować nowy pakiet i wdrożyć go. W tym samouczku pokażę, jak możesz przechowywać schematy w plikach w HDFS i tabeli Hive.

Wczytywanie schematu z pliku na HDFS

// -- Wczytywanie schematu z pliku na HDFS
val schemaAsTextFromFile = spark.sparkContext.textFile("/data/schemas/schema_from_hdfs.txt").toLocalIterator.toList(0)
val schema = StructType.fromDDL(schemaAsTextFromFile)

Wczytywanie schematu z tabeli w Hive -> Apache Spark

Najpierw utworzymy nowa bazę danych oraz tabele, w której będziemy przechowywać swoje schematy.

CREATE DATABASE bigdata_etl; ​
CREATE TABLE `bigdata_etl.schemas`(
  `id` int,
  `dataset_name` string,
  `schema` string
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde';

Wstaw wiersz do tabeli Hive z określonym schematem danych.

-- Insert do tabeli
INSERT INTO bigdata_etl.schemas (id, dataset_name, schema) VALUES (1,'Test_dataset','id integer, name string, surname string, age integer');

Teraz w kodzie aplikacji Spark’owej możemy wczytać dane z tabeli z Hive, następnie wyfiltrować interesujący nas wiersz zawierajacy odpowiedni schemat danych i na jego podstawie utworzyć obiekt klasy StrucType, który następnie możemy użyć podczas wczytywania danych. (Apache Spark Efektywne wykorzystanie DataFrame wczytywanie danych)

val schemasFromHive = sqlContext.table("bigdata_etl.schemas")
val dataSetSchema = schemasFromHive.filter(schemasFromHive("dataset_name") === "Test_dataset").select("schema").head.mkString
val schema = StructType.fromDDL(dataSetSchema)

Hint #3: wykorzystaj zewnętrzne tabele w Hive (Hive External tables)

Możesz także użyć tabeli zewnętrzych w Hive, aby poprawić czas wykonania aplikacji Spark’oewj podczas odczytu danych z plików. W poniższym przykładzie została utworzona zewnętrzna tabela w Hive. Parametr lokalizacji (location) jest kluczowy i określa, gdzie na HDFS przechowujesz dane w formacie CSV (w tym przykładzie). Ponieważ schemat jest zdefiniowany w tabeli w Hive, Spark nie będzie starał się wnioskować schematu  (infer schema)z plików przechowywanych w tej lokalizacji.

CREATE EXTERNAL TABLE `bigdata_etl.test_dataset`(
  `id` int,
  `name` string,
  `surname` string,
  `age` int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location '/path/to/csv_data';

Następnie w kodzie w Spark’u można po prostu odczytać dane z tej tabeli.

val testDataSetDF = spark.sqlContext.table("bigdata_etl.test_dataset")

Podsumowanie

W tym samouczku przedstawiłem Ci, jak możesz ulepszyć swój kod, gdy korzystasz z API DataFrame. W tym samouczku wziąłem kilka wzorów z bloga należącego do mojego znajomego, gdzie podobna koncepcja została przedstawiona dla osób, które wykorzystują API Python’a. Tutaj znajdziesz odniesienie do tego wpisu (https://szczeles.github.io/Reading-JSON-CSV-and-XML-files-efficiently-in-Apache-Spark/). (Apache Spark Efektywne wykorzystanie DataFrame wczytywanie danych)

Jeśli spodobał Ci się ten post to zostaw proszę komentarz poniżej lub udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Dzięki!

How useful was this post?

Click on a star to rate it!

Average rating 4.9 / 5. Vote count: 161

No votes so far! Be the first to rate this post.

Subscribe
Powiadom o
guest
0 Comments
Inline Feedbacks
View all comments