W jaki sposób efektywnie wykorzystywać API DataFrame w Sparku podczas wczytywania danych?

W jaki sposób efektywnie wykorzystywać API DataFrame w Sparku podczas wczytywania danych?

W tym krótkim samouczku pokażę, jak korzystać z API DataFrame w celu zwiększenia wydajności aplikacji Spark’owej, podczas wczytywania dużych,  pół-strukturalnych zbiorów danych, takich jak CSV, XML i JSON.

[Założenie ogólne] definiuj statyczny schemat danych

W Spark DataFrame API można zdefiniować statyczny schemat danych. Aby to osiągnąć, musisz podać obiekt klasy StructType, który zawiera listę StructField. Poniżej przedstawiłem dwa sposoby w jaki można zdefiniować schemat danych.

// -- Pierwszy sposób
val schema = new StructType()
.add("id", IntegerType, false)
.add("name", StringType, true)
.add("surname", StringType, true)
.add("age", IntegerType, true)
// -- lub drugi sposób
val schema = StructType.fromDDL("id integer, name string, surname string, age integer")

Wskazówka #1: wyłącz opcję inferSchema i użyj zdefiniowanego schematu

W poniższym przykładzie możesz znaleźć fragment kodu, gdzie wykorzystano wcześniej utworzony schemat danych oraz pokazano jak wyłączyć opcję inferSchema. Dzięki wyłączeniu opcji inferSchema, Spark nie będzie analizował zbioru danych, aby wywnioskować jaki jest schemat danych tylko wykorzysta ten, który mu podaliśmy.

// Plik CSV
val data = spark.sqlContext.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.schema(schema)
.load("/path/to/csv_data")
// Plik JSON
val data = spark.read.schema(schema).json("/path/to/json_data")
// Plik XML
val data = spark.read.format("com.databricks.spark.xml")
.options(rowTag="book").load("/path/to/xml_data")

Wskazówka #2: przechowuj schematy poza kodem aplikacji

Dobrym podejściem jest nie przechowywać schematów bezpośrednio w kodzie. W przypadku zmiany schematu danych musisz zmodyfikować swój kod, zbudować nowy pakiet i wdrożyć go. W tym samouczku pokażę, jak możesz przechowywać schematy w plikach w HDFS i tabeli Hive.

Wczytywanie schematu z pliku na HDFS

// -- Wczytywanie schematu z pliku na HDFS
val schemaAsTextFromFile = spark.sparkContext.textFile("/data/schemas/schema_from_hdfs.txt").toLocalIterator.toList(0)
val schema = StructType.fromDDL(schemaAsTextFromFile)

Wczytywanie schematu z tabeli w Hive

Najpierw utworzymy nowa bazę danych oraz tabele, w której będziemy przechowywać swoje schematy.

CREATE DATABASE bigdata_etl; ​
CREATE TABLE `bigdata_etl.schemas`(
  `id` int,
  `dataset_name` string,
  `schema` string
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde';

Wstaw wiersz do tabeli Hive z określonym schematem danych.

-- Insert do tabeli
INSERT INTO bigdata_etl.schemas (id, dataset_name, schema) VALUES (1,'Test_dataset','id integer, name string, surname string, age integer');

Teraz w kodzie aplikacji Spark’owej możemy wczytać dane z tabeli z Hive, następnie wyfiltrować interesujący nas wiersz zawierajacy odpowiedni schemat danych i na jego podstawie utworzyć obiekt klasy StrucType, który następnie możemy użyć podczas wczytywania danych.

val schemasFromHive = sqlContext.table("bigdata_etl.schemas")
val dataSetSchema = schemasFromHive.filter(schemasFromHive("dataset_name") === "Test_dataset").select("schema").head.mkString
val schema = StructType.fromDDL(dataSetSchema)

Hint #3: wykorzystaj zewnętrzne tabele w Hive (Hive External tables)

Możesz także użyć tabeli zewnętrzych w Hive, aby poprawić czas wykonania aplikacji Spark’oewj podczas odczytu danych z plików. W poniższym przykładzie została utworzona zewnętrzna tabela w Hive. Parametr lokalizacji (location) jest kluczowy i określa, gdzie na HDFS przechowujesz dane w formacie CSV (w tym przykładzie). Ponieważ schemat jest zdefiniowany w tabeli w Hive, Spark nie będzie starał się wnioskować schematu  (infer schema)z plików przechowywanych w tej lokalizacji.

CREATE EXTERNAL TABLE `bigdata_etl.test_dataset`(
  `id` int,
  `name` string,
  `surname` string,
  `age` int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location '/path/to/csv_data';

Następnie w kodzie w Spark’u można po prostu odczytać dane z tej tabeli.

val testDataSetDF = spark.sqlContext.table("bigdata_etl.test_dataset")

Podsumowanie

W tym samouczku przedstawiłem Ci, jak możesz ulepszyć swój kod, gdy korzystasz z API DataFrame. W tym samouczku wziąłem kilka wzorów z bloga należącego do mojego znajomego, gdzie podobna koncepcja została przedstawiona dla osób, które wykorzystują API Python’a. Tutaj znajdziesz odniesienie do tego wpisu (https://szczeles.github.io/Reading-JSON-CSV-and-XML-files-efficiently-in-Apache-Spark/).

Jeśli spodobał Ci się ten post to napisz proszę komentarz poniżej oraz udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Z góry dzięki!

Please follow and like us:
error

Dodaj komentarz

Close Menu
Social media & sharing icons powered by UltimatelySocial
error

Enjoy this blog? Please spread the word :)