Dziś przedstawię Ci w jaki sposób możesz wykorzystać biblioteki uczenia maszynowego (ML – Machine Learning), które są dostępne w Spark’u jako biblioteka pod nazwą Spark MLib.
Biblioteka MLlib daje nam bardzo szeroki wachlarz dostępnych algorytmów uczenia maszynowego oraz dodatkowe narzędzia do normalizacji, tokenizacji oraz wielu wielu innych (aby uzyskać więcej informacji wejdź na oficjalną stronę Apache Spark MLlib).
1. Przygotowanie danych
Pierwszym krokiem jaki wykonamy będzie przygotowanie danych. Ja skorzystałem z istniejącego zbioru dostępnego tutaj.
Ten zbiór danych składa się z informacji kolejno t.j:
- Liczba przypadków ciąży
- Stężenie glukozy w osoczu 2 godziny w doustnym teście tolerancji glukozy
- Rozkurczowe ciśnienie krwi (mm Hg)
- Grubość fałdu skóry na tricepsie (mm)
- 2-godzinna insulina w surowicy (mu U / ml )
- Wskaźnik masy ciała BMI
- Funkcja rodowodu cukrzycy
- Wiek
- Klasyfikacja (etykieta) z ang: label
Format libsvm
Aby wykorzystać dany zbiór należy przekształcić go do formatu libsvm. Format ten składa się kolejno z klasyfikacji (etykiety) oraz szeregu cech wraz z przypisanym indeksem. Ogólny wzór formatu libvsm może być określony poniższym wzorem:
<klasyfikacja> <indeks1>:<wartość_cechy> <indeks2>:<wartość_cechy> ... <indeksN>:<wartość_cechy>
W wyniku przekształcenia do formatu libsvm otrzymałem nowy zbiór, który został przedstawiony poniżej.
Przygotowując dane w formacie libsvm można pominąć cechy, których wartość jest równa 0. Przy bardzo dużych zbiorach danych pozwoli to na zmniejszenie rozmiaru danych wejściowych, a tym samym nie wpłynie w żaden sposób negatywnie na wynik algorytmu.
Aby wynik był bardziej dla Ciebie oczywisty zostawiłem cechy których wartość wynosiła 0. Pełny zbiór danych w raz z kodem źródłowym dostępny jest tutaj na moim GitHubie.
1 1:6 2:148 3:72 4:35 5:0 6:33.6 7:0.627 8:50 0 1:1 2:85 3:66 4:29 5:0 6:43642 7:0.351 8:31 1 1:8 2:183 3:64 4:0 5:0 6:43547 7:0.672 8:32 0 1:1 2:89 3:66 4:23 5:94 6:43493 7:0.167 8:21 1 1:0 2:137 3:40 4:35 5:168 6:43.1 7:2.288 8:33 0 1:5 2:116 3:74 4:0 5:0 6:43641 7:0.201 8:30 1 1:3 2:78 3:50 4:32 5:88 6:31.0 7:0.248 8:26 0 1:10 2:115 3:0 4:0 5:0 6:35.3 7:0.134 8:29 1 1:2 2:197 3:70 4:45 5:543 6:43615 7:0.158 8:53 1 1:8 2:125 3:96 4:0 5:0 6:0.0 7:0.232 8:54 ...
2. Utworzenie nowego projektu
Ja używam programu IntelliJ IDEA, ale Ty możesz wybrać dowolny, który dla Ciebie jest najbardziej przyjazny i którego to Ty używasz.
Utwórzmy nowy projekt Scala sbt.
Jeśli nie wiesz jak to zrobić to odsyłam Cie do artykułu: Konfiguracja Apache Spark oraz Scala i IntelliJ IDEA
Dodaj bilbiotekę Spark MLlib to pliku build.sbt.
name := "Diabetic Classification" version := "0.1" scalaVersion := "2.11.8" resolvers ++= Seq( "All Spark Repository -> bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/" ) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.3.1", "org.apache.spark" %% "spark-sql" % "2.3.1", "org.apache.spark" %% "spark-mllib" % "2.3.1" )
Następnie utwórz pakiet com.bigdataetl, a w nim Scala Object Train.scala.
3. Kod
Teraz przejdziemy do najważniejszej części. W pliku Train.scala zawrzemy naszą całą logikę wyznaczania klasyfikacji wystąpienia cukrzycy u pacjentów,
Plik Train,scala
package com.bigdataetl import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator} import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer} import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.sql.SparkSession object Train extends App { System.setProperty("hadoop.home.dir", "C:\\hadoop") val spark = SparkSession.builder .master("local[*]") .appName("BigDataETL - Diabetic Classification") .getOrCreate() val data = spark.read.format("libsvm").option("numFeatures", "8") .load("file:///C:\\Users\\pciesla\\Dropbox\\Blog\\BlogSparkML\\data.txt") // Index labels, adding metadata to the label column. // Fit on whole dataset to include all labels in index. val labelIndexer = new StringIndexer() .setInputCol("label") .setOutputCol("indexedLabel") .fit(data) // Automatically identify categorical features, and index them. // Set maxCategories so features with > 4 distinct values are treated as continuous. val featureIndexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexedFeatures") .setMaxCategories(4) .setHandleInvalid("keep") .fit(data) // Split the data into training and test sets (30% held out for testing). val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3)) // Train a RandomForest model. val rf = new RandomForestClassifier() .setLabelCol("indexedLabel") .setFeaturesCol("indexedFeatures") .setNumTrees(12) // Convert indexed labels back to original labels. val labelConverter = new IndexToString() .setInputCol("prediction") .setOutputCol("predictedLabel") .setLabels(labelIndexer.labels) // Chain indexers and forest in a Pipeline. val pipeline = new Pipeline() .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter)) val paramGrid = new ParamGridBuilder().build() // Select (prediction, true label) and compute test error. val evaluator = new BinaryClassificationEvaluator() .setMetricName("areaUnderROC") .setRawPredictionCol("rawPrediction") .setLabelCol("label") val cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(5) val model = cv.fit(trainingData) // trainingData: DataFrame val predictions = model.transform(testData) // Get the best selected pipeline model val bestPipeline = model.bestModel.asInstanceOf[PipelineModel] val bestLRModel = bestPipeline.stages(2) val bestParams = bestLRModel.extractParamMap() val accuracy = evaluator.evaluate(predictions) println(s"bestParams: $bestParams") println(s"accuracy: $accuracy") }
4. Testy
Uruchomiłem kilkukrotnie testy i najlepszy wynik jaki osiągnąłem wyniósł ~ 0.825. Nie jest to porywający wynik, ale daje podstawę sądzić, że jeśli byśmy posiadali większą ilość danych to rezultat byłby lepszy.
bestParams: { rfc_79801fc36c1f-cacheNodeIds: false, rfc_79801fc36c1f-checkpointInterval: 10, rfc_79801fc36c1f-featureSubsetStrategy: auto, rfc_79801fc36c1f-featuresCol: indexedFeatures, rfc_79801fc36c1f-impurity: gini, rfc_79801fc36c1f-labelCol: indexedLabel, rfc_79801fc36c1f-maxBins: 32, rfc_79801fc36c1f-maxDepth: 5, rfc_79801fc36c1f-maxMemoryInMB: 256, rfc_79801fc36c1f-minInfoGain: 0.0, rfc_79801fc36c1f-minInstancesPerNode: 1, rfc_79801fc36c1f-numTrees: 12, rfc_79801fc36c1f-predictionCol: prediction, rfc_79801fc36c1f-probabilityCol: probability, rfc_79801fc36c1f-rawPredictionCol: rawPrediction, rfc_79801fc36c1f-seed: 207336481, rfc_79801fc36c1f-subsamplingRate: 1.0 } accuracy: 0.8246463142218914
Podsumowanie
W powyższym skrypcie wykorzystany został BinaryClassificationEvaluator, ponieważ nasze etykiety przyjmowały tylko wartości 0 lub 1.
W przypadku, gdy np. chciałbyś określić nie tylko czy ktoś zachoruję na cukrzycę, ale również to jaki jest poziom ryzyka, mógłbyś wprowadzić klasyfikację, która mogła by się np. składać:
1 | Bardzo niska |
2 | Niska |
3 | Średnia |
4 | Wysoka |
5 | Bardzo wysoka |
W takiej sytuacji z pomocą przychodzi nam MulticlassClassificationEvaluator. Poniżej przedstawiam przykładową implementację tego algorytmu:
// Select (prediction, true label) and compute test error. val evaluator = new MulticlassClassificationEvaluator() .setLabelCol("indexedLabel") .setPredictionCol("prediction") .setMetricName("accuracy")
Wystarczy, że w kodzie w pliku Train.scala zamienisz implementacje zmiennej evaluator i uruchomisz ponownie program. Idealnie by było jakby podczas tych testów dostępny był nowy zbiór danych zawierający klasyfikację wielopoziomową.