Apache Spark: Machine Learning – predicting diabetes in patients


Today I will show you how to use the Machine Learning (ML) algorithms available in Spark through its library, Spark MLlib.

MLlib provides a wide range of Machine Learning algorithms, plus additional tools for standardization, tokenization and many others (for more information, visit the official Apache Spark MLlib website).

1. Data preparation

The first step is to prepare the data. I used an existing dataset, available here.

Each record in this dataset consists of the following fields, in order:

  • Number of pregnancies
  • Plasma glucose concentration 2 hours into an oral glucose tolerance test
  • Diastolic blood pressure (mm Hg)
  • Triceps skin fold thickness (mm)
  • 2-hour serum insulin (mu U/ml)
  • Body mass index (BMI)
  • Diabetes pedigree function
  • Age
  • Classification (label)

Libsvm format

To use this file, you must first convert it to the libsvm format. Each record in this format consists of the classification (label) followed by a number of features, each with an assigned index. The general libsvm pattern can be described by the following formula:

<classification> <index1>:<feature_value> <index2>:<feature_value> ... <indexN>:<feature_value> 

As a result of the transformation to the libsvm format, I obtained a new dataset, which is presented below.

When preparing data in the libsvm format, you can omit features whose value is equal to 0. With very large datasets, this reduces the size of the input data without any adverse effect on the algorithm's results.
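As a sketch of this conversion, the snippet below (plain Scala; the helper name and the assumption that the label sits in the last CSV column are mine) turns a comma-separated record into a libsvm line, optionally skipping zero-valued features:

```scala
object CsvToLibsvm {
  // Convert one CSV record (feature columns first, label in the last column)
  // into a libsvm line; zero-valued features can be skipped to save space.
  def toLibsvm(csvLine: String, keepZeros: Boolean = false): String = {
    val cols = csvLine.split(",").map(_.trim.toDouble)
    val features = cols.init
    val label = cols.last.toInt
    val pairs = features.zipWithIndex
      .collect { case (v, i) if keepZeros || v != 0.0 => s"${i + 1}:$v" } // libsvm indices are 1-based
    (label.toString +: pairs).mkString(" ")
  }

  def main(args: Array[String]): Unit = {
    // Second record of the dataset: the zero-valued insulin feature (index 5) is omitted
    println(toLibsvm("1,85,66,29,0,26.6,0.351,31,0"))
    // → 0 1:1.0 2:85.0 3:66.0 4:29.0 6:26.6 7:0.351 8:31.0
  }
}
```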

To make the result clearer, I kept the features whose value was 0. The full dataset, along with the source code, is available on my GitHub.

1 1:6 2:148 3:72 4:35 5:0 6:33.6 7:0.627 8:50
0 1:1 2:85 3:66 4:29 5:0 6:26.6 7:0.351 8:31
1 1:8 2:183 3:64 4:0 5:0 6:23.3 7:0.672 8:32
0 1:1 2:89 3:66 4:23 5:94 6:28.1 7:0.167 8:21
1 1:0 2:137 3:40 4:35 5:168 6:43.1 7:2.288 8:33
0 1:5 2:116 3:74 4:0 5:0 6:25.6 7:0.201 8:30
1 1:3 2:78 3:50 4:32 5:88 6:31.0 7:0.248 8:26
0 1:10 2:115 3:0 4:0 5:0 6:35.3 7:0.134 8:29
1 1:2 2:197 3:70 4:45 5:543 6:30.5 7:0.158 8:53
1 1:8 2:125 3:96 4:0 5:0 6:0.0 7:0.232 8:54

2. Creating a new project

I use IntelliJ IDEA, but you can choose whichever IDE you find most comfortable.

Create a new Scala sbt project.

If you do not know how to do it, I refer you to the article: Configuration of Apache Spark and Scala and IntelliJ IDEA

Add the Spark MLlib library to the build.sbt file.

name := "Diabetic Classification"

version := "0.1"

scalaVersion := "2.11.8"

resolvers ++= Seq(
  "All Spark Repository -> bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.1",
  "org.apache.spark" %% "spark-sql" % "2.3.1",
  "org.apache.spark" %% "spark-mllib" % "2.3.1"
)

Then create the com.bigdataetl package and, in it, the Scala object Train.scala.

3. Code

Now we will move on to the most important part. The Train.scala file will contain the entire logic for classifying diabetes in patients.

Train.scala file

package com.bigdataetl

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.SparkSession

object Train extends App {

  System.setProperty("hadoop.home.dir", "C:\\hadoop")

  val spark = SparkSession.builder
    .master("local[*]") // run locally; adjust for your cluster
    .appName("BigDataETL - Diabetic Classification")
    .getOrCreate()

  // Load the dataset in libsvm format (the path is an example; adjust to your location).
  val data = spark.read.format("libsvm")
    .option("numFeatures", "8")
    .load("data/diabetic_data.libsvm")

  // Index labels, adding metadata to the label column.
  // Fit on whole dataset to include all labels in index.
  val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data)
  // Automatically identify categorical features, and index them.
  // Set maxCategories so features with > 4 distinct values are treated as continuous.
  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  // Split the data into training and test sets (30% held out for testing).
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

  // Train a RandomForest model.
  val rf = new RandomForestClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")
    .setNumTrees(12)

  // Convert indexed labels back to original labels.
  val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)

  // Chain indexers and forest in a Pipeline.
  val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

  val paramGrid = new ParamGridBuilder().build()

  // Select (prediction, true label) and compute test error.
  val evaluator = new BinaryClassificationEvaluator()
    .setLabelCol("indexedLabel")

  val cv = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(evaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(5) // the number of folds is a choice; adjust as needed

  val model = cv.fit(trainingData) // trainingData: DataFrame

  val predictions = model.transform(testData)

  // Get the best selected pipeline model
  val bestPipeline = model.bestModel.asInstanceOf[PipelineModel]
  val bestRfModel = bestPipeline.stages(2) // stage 2 is the RandomForestClassifier
  val bestParams = bestRfModel.extractParamMap()

  val accuracy = evaluator.evaluate(predictions)
  println(s"bestParams: $bestParams")
  println(s"accuracy: $accuracy")

  spark.stop()
}

4. Tests

I have run several tests, and the best result I achieved was ~0.825. This is not a thrilling result, but it gives grounds to believe that with more data the result would be better.

bestParams: {
  rfc_79801fc36c1f-cacheNodeIds: false,
  rfc_79801fc36c1f-checkpointInterval: 10,
  rfc_79801fc36c1f-featureSubsetStrategy: auto,
  rfc_79801fc36c1f-featuresCol: indexedFeatures,
  rfc_79801fc36c1f-impurity: gini,
  rfc_79801fc36c1f-labelCol: indexedLabel,
  rfc_79801fc36c1f-maxBins: 32,
  rfc_79801fc36c1f-maxDepth: 5,
  rfc_79801fc36c1f-maxMemoryInMB: 256,
  rfc_79801fc36c1f-minInfoGain: 0.0,
  rfc_79801fc36c1f-minInstancesPerNode: 1,
  rfc_79801fc36c1f-numTrees: 12,
  rfc_79801fc36c1f-predictionCol: prediction,
  rfc_79801fc36c1f-probabilityCol: probability,
  rfc_79801fc36c1f-rawPredictionCol: rawPrediction,
  rfc_79801fc36c1f-seed: 207336481,
  rfc_79801fc36c1f-subsamplingRate: 1.0
}
accuracy: 0.8246463142218914


In the above script, the BinaryClassificationEvaluator was used because our labels take only the values 0 or 1.

If, for example, you wanted to specify not only whether someone is diabetic but also the level of risk, you could introduce a multi-level classification, for example a scale from 1 (very low) to 5 (very high).

In this situation, MulticlassClassificationEvaluator comes to our aid. Below is an example of the implementation of this algorithm:

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()

All you have to do is replace the evaluator implementation in the Train.scala file with this code and rerun the program. Ideally, these tests would use a new dataset containing a multi-level classification.
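As a sketch, a fuller configuration of the multiclass variant might look like this (the column names follow the pipeline above; "accuracy" is one of the metrics this evaluator supports):

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Multiclass variant of the evaluator; column names match the pipeline above.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy") // other options: "f1", "weightedPrecision", "weightedRecall"
```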

If you enjoyed this post please add the comment below or share this post on your Facebook, Twitter, LinkedIn or another social media webpage.
Thanks in advance!
