Functional Data Science for Titanic Dataset #2: using Spark with Scala for ETL, data imputation, and feature engineering

Angelica Tiara
12 min read · Jul 26, 2023


This piece is part 2 of a multi-part series of articles.

In part #1 of this series, I wrote about the basic understanding needed for functional programming and functional data science using languages such as Scala: a different way to build logic by composing functions instead of the usual OOP methodology. We met monads as a way to chain computations while preserving their mathematical structure, and functors as structure-preserving mappings between categories and the objects inside them.

Now we’re delving into applying that understanding by processing the Titanic dataset with the FDS methodology. There’s no better way to learn than getting our hands dirty, right?

Downloading the Datasets

If you’re unfamiliar with the Titanic datasets or you haven’t had the chance to play with them before, please feel free to download and examine them first here on Kaggle. The download button should be on the bottom right side of the page, and it will give you three .csv files:

  1. train.csv → This will be the main dataset we will work on. The training set is used to build our model; each row contains a passenger’s variables/features along with the assigned ground truth: Survived = 0 (no) or 1 (yes).
  2. test.csv → This will be used later after we have our model, as a tester on how it performs. This set doesn’t have the ground truth or the answer for each passenger’s survival, so it’s the model’s job to predict the answer.
  3. gender_submission.csv → This is an example prediction that only uses one feature: gender. It assumes that only and all female passengers survived, which is a very rudimentary form of model prediction. We can ignore this file for this task.

Extract these files and store them in an easily accessible folder. Note down the path of this folder, since we’ll need it later when loading the data.

A short guide on the Titanic datasets on YouTube. Feel free to use this to study the data landscape.

Installing Apache Toree and Jupyter Notebook

One of the tools that we’re going to use next is Apache Spark — and by extension, Apache Toree.

Imagine you have a lot of data, like mountains of information that you need to analyze and make sense of. Apache Spark is like a super-smart data processing engine that helps you tackle this huge task. It’s like having a team of many computers working together, each handling a part of the data, and they all talk to each other to get the job done faster.

You can tell Spark what you want to do with the data, like counting things, finding patterns, or making predictions. Spark figures out the most efficient way to divide the work among the computers so that everything gets done quickly. It’s like having a super-efficient manager that assigns tasks to the team members based on their strengths.

Spark can do this so well because it keeps important information in its memory (RAM) instead of reading and writing from the hard disk all the time. This way, it can access the data much faster, like looking at a note on your desk instead of searching for it in a file cabinet. This makes Spark lightning-fast, especially when compared to traditional methods like Excel or regular databases.

It’s also very flexible, which means you can use it for different kinds of jobs. Whether you want to analyze data that you already have or process data as it’s coming in (like from sensors or social media), Spark can handle it all. It’s like having a tool that adapts to any kind of data analysis task you throw at it.
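To make that concrete, here is a tiny, hypothetical sketch of what “telling Spark what to do” looks like in Scala. The file name and columns are made up, and it assumes a SparkSession named spark is available (spark-shell and the notebook setup below provide one):

// hypothetical sketch: count passengers per class from a CSV file
val passengers = spark.read.option("header", "true").csv("passengers.csv")
passengers.groupBy("Pclass").count().show()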

Now, let’s talk about Apache Toree, which is related to Apache Spark. Toree is like a language interpreter or translator that helps you communicate with Spark using different programming languages.

You see, Spark has its own native language (like a secret code), which makes it very efficient for certain tasks. This native language is called Scala, and Spark understands it best. However, not everyone knows Scala, and that’s where Toree comes in.

Toree acts like a bridge between Spark and other popular programming languages, such as Python and R. It translates the code you write in these languages into Spark’s native language (Scala) so that Spark can understand and execute it. It’s like having a multilingual assistant who can translate your messages to different team members so that everyone understands what needs to be done.

So, if you’re comfortable using Python or R in Jupyter Notebook or Google Colab environments, you can still work with Spark thanks to Toree. It makes Spark more accessible and user-friendly, as you can use the language you’re most familiar with and still take advantage of Spark’s powerful data processing capabilities.

For this project, we will construct our environment by installing Apache Spark, Jupyter Notebook, and Apache Toree as the connector.

Step 1: Install Java

Apache Spark runs on the Java Virtual Machine (JVM), so you need to have Java installed on your system. Download a supported version of Java from the official Oracle website or use an OpenJDK build (Java 8 is supported by every Spark release; check the Spark documentation for the exact versions your release supports). Follow the installation wizard and make sure Java is set up correctly.
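To double-check that Java is installed and on your PATH, you can run this in a terminal; it should print the installed version:

java -version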

Step 2: Download Apache Spark

  1. Go to the Apache Spark website and download the latest version of Spark (pick one of the pre-built for Apache Hadoop packages; the exact Hadoop version offered depends on the Spark release).
  2. Extract the downloaded archive to a directory of your choice. This will be your Spark installation directory.

Step 3: Set up Environment Variables

To use Spark conveniently, you need to set up some environment variables:

  1. Open a terminal or command prompt.
  2. Add the following lines to your shell configuration file (e.g., .bashrc for Linux/Mac or Environment Variables for Windows):

For Linux/Mac:

export SPARK_HOME=/path/to/your/spark/directory
export PATH=$SPARK_HOME/bin:$PATH

For Windows:

setx SPARK_HOME "C:\path\to\your\spark\directory"
setx PATH "%SPARK_HOME%\bin;%PATH%"

Replace /path/to/your/spark/directory (or C:\path\to\your\spark\directory on Windows) with the actual path where you extracted Spark in Step 2. On Windows, note that setx only affects new sessions, so open a fresh command prompt before running the second setx line; otherwise %SPARK_HOME% won’t expand to anything yet.

  3. Close and reopen your terminal or command prompt to apply the changes.
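To verify the variables took effect (on Linux/Mac; Windows users can check with echo %SPARK_HOME%), something like this should print your Spark directory and then the Spark version banner:

echo $SPARK_HOME
spark-shell --version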

Step 4: Install Jupyter Notebook

  1. Install Jupyter Notebook using pip (Python’s package manager). Open your terminal or command prompt and run:
pip install jupyter

If you don’t have pip installed, you can install it by following the instructions on the official Python website (https://www.python.org/downloads/). Trust me, pip makes every installation — and your life! — so much easier~

Step 5: Install Apache Toree

  1. Install Apache Toree using pip:
pip install toree

Step 6: Configure Apache Toree

  1. After installing Toree, you need to configure it to work with Jupyter Notebook. In the terminal or command prompt, run the following command:
jupyter toree install --user --spark_home=$SPARK_HOME

This command configures Toree to use the Spark installation you set up in Step 3.
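You can confirm that the kernel was registered by listing the kernels Jupyter knows about; an Apache Toree Scala entry (typically named something like apache_toree_scala) should appear:

jupyter kernelspec list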

Step 7: Start Jupyter Notebook

  1. In the terminal or command prompt, navigate to the directory where you want to create and store your Jupyter Notebook files.
  2. Start Jupyter Notebook by running the following command:
jupyter notebook

This will open Jupyter Notebook in your default web browser. You can create a new notebook by clicking the “New” button and selecting “Toree — Scala” or “Toree — PySpark” (depending on your preferred language).

You now have a Jupyter Notebook running with Apache Spark and Apache Toree, ready to perform big data processing and analytics!
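As a quick sanity check, open a new Toree — Scala notebook and run a one-line cell like the one below. Toree typically pre-binds the Spark entry points (sc and sqlContext, plus spark on recent versions), so this should print your Spark version:

sc.version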

Remember to keep your Spark and Jupyter Notebook instances running while working on your notebooks. When you’re done, you can stop the Jupyter Notebook server by pressing Ctrl+C in the terminal or command prompt where it's running.

That’s it! You’ve successfully installed Apache Spark, Apache Toree, and Jupyter Notebook, and you’re all set to work with big data and perform data analysis using the power of Spark in the familiar Jupyter environment.

Apache Toree. Really loving the torii gate as a huge appreciator of Japanese culture ^.^

Data ETL, Imputation, and Feature Engineering

Let’s take a look at the data landscape and structure before continuing. Whenever we do data ETL (Extract, Transform, and Load), we need to carefully examine all the features and characteristics of the dataset at hand.

For this section, we will mainly use the train.csv data.

Some notes:

pclass: A proxy for socio-economic status (SES)
1st = Upper
2nd = Middle
3rd = Lower

age: Age is fractional if less than 1. If the age is estimated, it is in the form of xx.5

sibsp: The dataset defines family relations in this way…
Sibling = brother, sister, stepbrother, stepsister
Spouse = husband, wife (mistresses and fiancés were ignored)

parch: The dataset defines family relations in this way…
Parent = mother, father
Child = daughter, son, stepdaughter, stepson
Some children travelled only with a nanny, therefore parch=0 for them.

Let’s import some of the files and libraries necessary into the notebook.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row // used when pattern matching the results of collect() below
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

Now we will define the data schema as a Scala object, giving each feature its respective data type.

val dataScheme = (new StructType)
  .add("PassengerId", IntegerType)
  .add("Survived", IntegerType)
  .add("Pclass", IntegerType)
  .add("Name", StringType)
  .add("Sex", StringType)
  .add("Age", FloatType)
  .add("SibSp", IntegerType)
  .add("Parch", IntegerType)
  .add("Ticket", StringType)
  .add("Fare", FloatType)
  .add("Cabin", StringType)
  .add("Embarked", StringType)

dataScheme: org.apache.spark.sql.types.StructType = StructType(StructField(PassengerId,IntegerType,true), StructField(Survived,IntegerType,true), StructField(Pclass,IntegerType,true), StructField(Name,StringType,true), StructField(Sex,StringType,true), StructField(Age,FloatType,true), StructField(SibSp,IntegerType,true), StructField(Parch,IntegerType,true), StructField(Ticket,StringType,true), StructField(Fare,FloatType,true), StructField(Cabin,StringType,true), StructField(Embarked,StringType,true))

The code below reads the train.csv file that we downloaded before. Change the “/FileStore/tables/i31jf15l1496724300776/train.csv” part to the exact path where you saved your train.csv file, and make sure that Jupyter and Spark can access it.

val datasetDF = sqlContext.read
  .schema(dataScheme)
  .option("header", "true")
  .csv("/FileStore/tables/i31jf15l1496724300776/train.csv")
datasetDF.createOrReplaceTempView("train")
datasetDF: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 10 more fields]

Our data has now been loaded! To check, let’s query the temporary view we just registered (the %sql magic below is Databricks-style; if your notebook kernel doesn’t support it, the plain-Scala alternative shown a little further below does the same thing):

%sql select * from train

It should show a table detailing the data loaded in the notebook.
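If the %sql magic isn’t available in your kernel, the same check can be done in plain Scala using the DataFrame or the temporary view we just registered:

// equivalent checks in Scala
datasetDF.show(5)
sqlContext.sql("SELECT * FROM train LIMIT 5").show()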

Next, we’re going to do a little, very simple feature engineering using a Scala function: manipulating the ‘Embarked’ column using udf().

udf() is short for User-Defined Function in Spark’s Scala API.

It is a custom function that you create to perform a specific task or computation based on your requirements. It’s like creating your own mini-program that can be reused whenever you need it.

Just like how you might use a calculator to perform specific math operations like addition or multiplication, a User-Defined Function is a way to define your own “operation” or “function” in programming. You give it a name and specify what input it needs and what output it will produce.

For example, imagine you want to write a function that takes two numbers as input and returns their sum. You can create a UDF called “add” that does this. Once defined, you can use the “add” function anytime you want to add two numbers together without having to write the addition code over and over again.

So, a User-Defined Function in Scala is like having your own custom tool that you design to perform a specific task, making your code more organized, efficient, and easier to understand.
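As a tiny, hypothetical illustration of that “add” example (we don’t need it for the Titanic data; it just shows the shape of a UDF and how it would be applied):

// hypothetical example: a UDF that adds two integer columns
val add: (Int, Int) => Int = (a, b) => a + b
val addUDF = udf(add)
// usage on some DataFrame df: df.withColumn("Sum", addUDF(col("A"), col("B")))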

val embarked: (String => String) = {
  case "" => "S"
  case null => "S"
  case a => a
}
val embarkedUDF = udf(embarked)
embarked: String => String = <function1> 
embarkedUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

This maps any null or empty value in the ‘Embarked’ column to “S”, while leaving all other values unchanged. That’s a simple demonstration of how to create a function and wrap it as a UDF. Note that we’ve only defined the UDF here; we’ll apply it to the DataFrame a bit further down with withColumn().

Next, let’s perform some data imputation. There are gaps in the “Age” and “Fare” columns/features that could be filled.
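Before filling anything, it’s worth seeing how big those gaps actually are. A quick count of missing values (in the standard Kaggle train.csv, Age is the main offender with 177 missing values):

// how many values are actually missing?
datasetDF.select(
  count(when(col("Age").isNull, 1)).alias("missing_age"),
  count(when(col("Fare").isNull, 1)).alias("missing_fare"),
  count(when(col("Embarked").isNull || col("Embarked") === "", 1)).alias("missing_embarked")
).show()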

We will write code that fills in the gaps using average values. Let’s compute those averages first.

// Calculate average age for filling gaps in dataset
val averageAge = datasetDF.select("Age")
  .agg(avg("Age"))
  .collect() match {
    case Array(Row(avg: Double)) => avg
    case _ => 0
  }

// Calculate average fare for filling gaps in dataset
val averageFare = datasetDF.select("Fare")
  .agg(avg("Fare"))
  .collect() match {
    case Array(Row(avg: Double)) => avg
    case _ => 0
  }
averageAge: Double = 29.69911764704046 
averageFare: Double = 32.20420804114722

Then we input those average values into the dataset:

val filledDF = datasetDF.na.fill(Map("Fare" -> averageFare, "Age" -> averageAge))
val filledDF2 = filledDF.withColumn("Embarked", embarkedUDF(filledDF.col("Embarked")))
val Array(trainingData, testData) = filledDF2.randomSplit(Array(0.7, 0.3))
filledDF: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 10 more fields]
filledDF2: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 10 more fields]
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [PassengerId: int, Survived: int ... 10 more fields]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [PassengerId: int, Survived: int ... 10 more fields]

Voila!
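If you want to double-check that the imputation and the Embarked UDF did their job, the same counting trick should now return zeros on filledDF2:

// sanity check: no missing values should remain
filledDF2.select(
  count(when(col("Age").isNull, 1)).alias("missing_age"),
  count(when(col("Fare").isNull, 1)).alias("missing_fare"),
  count(when(col("Embarked").isNull, 1)).alias("missing_embarked")
).show()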

Let’s do another feature engineering trick here. Now we’re going to use a function called StringIndexer().

Imagine you have a dataset that contains categorical values like “red,” “blue,” and “green.” However, many machine learning algorithms work best with numerical data, not text. So, you need to convert these categorical values into numbers.

StringIndexer() is like a translator that helps you achieve this conversion. It takes the categorical values as input and assigns a unique number to each category. For example, it might assign "red" as 0, "blue" as 1, and "green" as 2.

The purpose of this transformation is to make the data suitable for machine learning algorithms, which often require numerical inputs. With the categorical values converted to numbers, you can now use them in mathematical calculations and statistical models.

In essence, StringIndexer() helps you "index" or "encode" categorical values into numerical representations so that you can use them effectively in machine learning tasks, allowing you to make sense of and draw insights from your data.
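Here’s a small, self-contained sketch of that color example (hypothetical data, just to see the indexer in action; which number each color gets depends on how often it appears):

// toy example: index a categorical "Color" column
import sqlContext.implicits._
val colors = Seq("red", "blue", "green", "blue").toDF("Color")
val colorIndexer = new StringIndexer()
  .setInputCol("Color")
  .setOutputCol("ColorIndexed")
colorIndexer.fit(colors).transform(colors).show()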

Let’s code the indexer into a program:

// Indexing categorical features
val featuresCatColNames = Seq("Pclass", "Sex", "Embarked")
val stringIndexers = featuresCatColNames.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed")
    .fit(trainingData)
}

// Indexing label
val labelIndexer = new StringIndexer()
  .setInputCol("Survived")
  .setOutputCol("SurvivedIndexed")
  .fit(trainingData)

val featuresNumColNames = Seq("Age", "SibSp", "Parch", "Fare")
val indexedfeaturesCatColNames = featuresCatColNames.map(_ + "Indexed")
val allIndexedFeaturesColNames = featuresNumColNames ++ indexedfeaturesCatColNames
val assembler = new VectorAssembler()
  .setInputCols(Array(allIndexedFeaturesColNames: _*))
  .setOutputCol("Features")
featuresCatColNames: Seq[String] = List(Pclass, Sex, Embarked) 
stringIndexers: Seq[org.apache.spark.ml.feature.StringIndexerModel] = List(strIdx_2bed7aaffeeb, strIdx_5129cc0f59d3, strIdx_e8e4e3704a90)
labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_8dea39324b65
featuresNumColNames: Seq[String] = List(Age, SibSp, Parch, Fare)
indexedfeaturesCatColNames: Seq[String] = List(PclassIndexed, SexIndexed, EmbarkedIndexed)
allIndexedFeaturesColNames: Seq[String] = List(Age, SibSp, Parch, Fare, PclassIndexed, SexIndexed, EmbarkedIndexed)
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_be3be4fcf211

After we’re done with all this data processing, let’s move on to the last and juiciest part: the model!

Pipelines, Classifier, and Prediction Test

For this part, I will only write briefly about building a classifier on the data we have processed and feeding it into a pipeline. We will need the next part of the series to truly dive deep into the model and results.

First, let’s create a classifier for the data. I will use the RandomForestClassifier() to simplify the process.

val randomForest = new RandomForestClassifier()
  .setLabelCol("SurvivedIndexed")
  .setFeaturesCol("Features")

// Retrieve original labels
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)
randomForest: org.apache.spark.ml.classification.RandomForestClassifier = rfc_91fdaea4c4c0 
labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_b3fdbcc36a4b

Now let’s put the processing steps and the classifier into a (virtual) pipeline. This is starting to sound like a cooking manual, haha!

// define the order of the operations to be performed
val pipeline = new Pipeline().setStages(
(stringIndexers :+ labelIndexer :+ assembler :+ randomForest :+ labelConverter).toArray)
pipeline: org.apache.spark.ml.Pipeline = pipeline_840ad0334e35
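As an aside, the pipeline on its own is already a trainable estimator; if you skipped hyperparameter tuning, you could simply fit and transform it directly, roughly like this (we’ll use the cross-validated version below instead):

// untuned alternative: fit the pipeline with default hyperparameters
val plainModel = pipeline.fit(trainingData)
plainModel.transform(testData).select("predictedLabel", "Survived").show(5)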

From here, it’s very easy to just run the program and let it build a model from the training split we prepared, tuning the random forest with cross-validation over a small grid of hyperparameters.

// grid of values to perform cross validation on
val paramGrid = new ParamGridBuilder()
  .addGrid(randomForest.maxBins, Array(25, 28, 31))
  .addGrid(randomForest.maxDepth, Array(4, 6, 8))
  .addGrid(randomForest.impurity, Array("entropy", "gini"))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("SurvivedIndexed")

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)

// train the model
val crossValidatorModel = cv.fit(trainingData)

paramGrid: Array[org.apache.spark.ml.param.ParamMap] = 
Array({
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 25,
rfc_91fdaea4c4c0-maxDepth: 4
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 25,
rfc_91fdaea4c4c0-maxDepth: 6
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 25,
rfc_91fdaea4c4c0-maxDepth: 8
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 25,
rfc_91fdaea4c4c0-maxDepth: 4
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 25,
rfc_91fdaea4c4c0-maxDepth: 6
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 25,
rfc_91fdaea4c4c0-maxDepth: 8
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 28,
rfc_91fdaea4c4c0-maxDepth: 4
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 28,
rfc_91fdaea4c4c0-maxDepth: 6
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 28,
rfc_91fdaea4c4c0-maxDepth: 8
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 28,
rfc_91fdaea4c4c0-maxDepth: 4
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 28,
rfc_91fdaea4c4c0-maxDepth: 6
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 28,
rfc_91fdaea4c4c0-maxDepth: 8
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 31,
rfc_91fdaea4c4c0-maxDepth: 4
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 31,
rfc_91fdaea4c4c0-maxDepth: 6
}, {
rfc_91fdaea4c4c0-impurity: entropy,
rfc_91fdaea4c4c0-maxBins: 31,
rfc_91fdaea4c4c0-maxDepth: 8
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 31,
rfc_91fdaea4c4c0-maxDepth: 4
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 31,
rfc_91fdaea4c4c0-maxDepth: 6
}, {
rfc_91fdaea4c4c0-impurity: gini,
rfc_91fdaea4c4c0-maxBins: 31,
rfc_91fdaea4c4c0-maxDepth: 8
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_9fc7b5a3e8df
cv: org.apache.spark.ml.tuning.CrossValidator = cv_96e448bc4a2f
crossValidatorModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_96e448bc4a2f

Finally, we evaluate the model on the held-out testData split (the 30% of train.csv we set aside earlier). Remember that Kaggle’s test.csv has no Survived labels, so we can’t score against it directly:

// make predictions
val predictions = crossValidatorModel.transform(testData)

//Accuracy
val accuracy = evaluator.evaluate(predictions)
println("Test Error DT= " + (1.0 - accuracy))
Test Error DT= 0.13585164835164742
predictions: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 19 more fields]
accuracy: Double = 0.8641483516483526

The score comes out to roughly 86.4% for the model we have created. Strictly speaking, BinaryClassificationEvaluator reports the area under the ROC curve by default, so this number isn’t plain classification accuracy.
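If you want accuracy in the everyday sense (the fraction of passengers classified correctly), Spark’s MulticlassClassificationEvaluator can compute it. A minimal sketch using the predictions DataFrame from above (the exact number will differ slightly from the ROC figure):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// fraction of rows where the prediction matches the indexed label
val accuracyEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("SurvivedIndexed")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println("Accuracy = " + accuracyEvaluator.evaluate(predictions))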

Let’s talk about the model in depth next.

Next:

Functional Data Science for Titanic Dataset #3: mathematical modeling and data exploration using functions
