# Lab 10: Scalable neural networks

## Study schedule

- [Section 1](#1-shallow-neural-networks-in-pyspark): To finish by Friday. **Essential**
- [Section 2](#2-keras-on-pyspark): To finish before next Tuesday. **Essential**
- [Section 3](#3-exercises): To finish before next Tuesday. ***Exercise***
- [Section 4](#4-additional-exercises-optional): To explore further. *Optional*

## Introduction

In this notebook we will explore the use of the neural network model that comes implemented in Spark ML. We will also look at pandas UDFs, which allow us to efficiently use models trained on a single machine with frameworks other than Spark ML, e.g. TensorFlow or Scikit-learn, while keeping the benefits of Spark when applying such models to large datasets.

**Dependencies.** For this lab, we need to install the packages ``pyarrow``, ``matplotlib``, ``pandas``, ``numpy``, ``tensorflow``, ``keras``, ``temp``, and `scikit-learn`. You may have already installed some of these packages before. Make sure you install these packages in the environment **myspark**.

Before we continue, open a new terminal and activate the environment:

`module load apps/java/jdk1.8.0_102/binary`

`module load apps/python/conda`

`source activate myspark`

You can now use pip to install the packages we need:

`pip install pyarrow tensorflow keras temp scikit-learn`

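As a quick, optional sanity check, you can confirm from a Python session that the main packages import correctly inside **myspark** (a minimal snippet, not required for the rest of the lab):

```python
# Optional sanity check: confirm the main packages are importable in myspark
import pyarrow, pandas, numpy, tensorflow, sklearn
print(pyarrow.__version__, tensorflow.__version__, sklearn.__version__)
```
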
## 1. Shallow neural networks in PySpark

To illustrate the use of the neural network model that comes in Spark ML, we use the [Spambase Dataset](http://archive.ics.uci.edu/ml/datasets/Spambase) that we already used in Lab 3.

We need to enable Arrow in Spark. More on this later in the lab.

```python
# Enable Arrow-based columnar data transfers. This line of code will be explained later
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
```

The following code loads the dataset and sets up the corresponding dataframe.

```python
# We load the dataset and the names of the features
import numpy as np
rawdata = spark.read.csv('./Data/spambase.data')
rawdata.cache()
ncolumns = len(rawdata.columns)
spam_names = [spam_names.rstrip('\n') for spam_names in open('./Data/spambase.data.names')]
number_names = np.shape(spam_names)[0]
for i in range(number_names):
    local = spam_names[i]
    colon_pos = local.find(':')
    spam_names[i] = local[:colon_pos]

# We rename the columns in the dataframe with the names of the features in spambase.data.names
schemaNames = rawdata.schema.names
spam_names[ncolumns-1] = 'labels'
for i in range(ncolumns):
    rawdata = rawdata.withColumnRenamed(schemaNames[i], spam_names[i])

# We cast the columns from string to double
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

StringColumns = [x.name for x in rawdata.schema.fields if x.dataType == StringType()]
for c in StringColumns:
    rawdata = rawdata.withColumn(c, col(c).cast("double"))
```

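Optionally, you can check the result of these transformations: every column should now be of type double, the last one renamed to `labels`. A small snippet along these lines (not needed for the rest of the lab) is:

```python
# Optional check: all columns should now be doubles, with the last one named 'labels'
rawdata.printSchema()

# Class distribution of spam (1.0) versus non-spam (0.0)
rawdata.groupBy('labels').count().show()
```
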
We now create the training and test sets.

```python
trainingData, testData = rawdata.randomSplit([0.7, 0.3], 42)
```

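As an optional sanity check, you can count the rows in each split; roughly 70% and 30% of the rows should land in the training and test sets, respectively:

```python
# Optional: check the sizes of the two splits
print(trainingData.count(), testData.count())
```
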
We create instances for the vector assembler and the neural network.

```python
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols = spam_names[0:ncolumns-1], outputCol = 'features')
```

The class that implements the neural network model is called [MultilayerPerceptronClassifier](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.MultilayerPerceptronClassifier.html). The multilayer perceptron implemented in Spark ML only allows sigmoid activation functions in the intermediate layers and the softmax function in the output layer. We can then use the model for binary and multi-class classification.

The architecture of the network is specified through the argument ``layers``, which is a list. Its length is the number of hidden layers plus two: the first element of the list is the number of inputs and the last element is the number of outputs.

For example, if ``layers=[10, 5, 4, 3]``, then this neural network assumes a first layer of 10 nodes (the features), followed by two hidden layers of 5 and 4 nodes, and a last layer of 3 outputs (classes).

```python
from pyspark.ml.classification import MultilayerPerceptronClassifier
# The first element HAS to be equal to the number of input features
layers = [len(trainingData.columns)-1, 20, 5, 2]
mpc = MultilayerPerceptronClassifier(labelCol="labels", featuresCol="features", maxIter=100, layers=layers, seed=1500)
```

We now create the pipeline, fit it to the data and compute the performance over the test set.

```python
# Create the pipeline
from pyspark.ml import Pipeline
stages = [vecAssembler, mpc]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(trainingData)

# We now make predictions
predictions = pipelineModel.transform(testData)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator\
    (labelCol="labels", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)
```

Accuracy = 0.865194

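Accuracy is not the only metric available. Optionally, the same `MulticlassClassificationEvaluator` can be reused with a different `metricName`, for example the F1 score over the same predictions:

```python
# Optional: report the F1 score on the same predictions
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="labels", predictionCol="prediction", metricName="f1")
print("F1 = %g " % evaluator_f1.evaluate(predictions))
```
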
## 2. Keras on PySpark

The alternatives for neural networks in Spark ML are rather limited. We can only do classification through the class MultilayerPerceptronClassifier, and even for this model there are important restrictions, e.g. it can only use logistic sigmoid activation functions.

It is possible to use more advanced neural network models, including deep learning models, in PySpark. A way to do it is to make use of a very powerful API in Spark SQL known as **pandas user defined functions**, or [**pandas UDFs**](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#pandas-udfs-a-k-a-vectorized-udfs) for short (they are also known as vectorized UDFs), or equivalently through the different [**pandas function APIs**](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#pandas-function-apis), like **mapInPandas()**. PySpark uses [Apache Arrow](https://en.wikipedia.org/wiki/Apache_Arrow) to efficiently transfer data between the JVM (Java Virtual Machine) and Python processes, allowing the efficient use of pandas for data analytic jobs in the cluster. A comprehensive user guide on pandas with Arrow can be found [here](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html).

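To get a feel for the API before we use it for model predictions, here is a minimal, illustrative Series-to-Series pandas UDF (the function `times_two` is just a toy example, not something we need later): the decorated function runs on the executors and receives pandas Series backed by Arrow batches.

```python
from pyspark.sql.functions import pandas_udf, col
import pandas as pd

# A toy pandas UDF: doubles the values of a numeric column, one Arrow batch at a time
@pandas_udf("double")
def times_two(v: pd.Series) -> pd.Series:
    return v * 2

# Example usage on the first spambase feature column
rawdata.select(col(spam_names[0]), times_two(col(spam_names[0]))).show(3)
```
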
A typical use case for pandas UDFs or pandas function APIs in scalable machine learning consists of training a machine learning model on a single machine using a subset of the data and then using that model to provide predictions at scale: the trained model is distributed to the executors, which then compute the predictions.

In this section of the lab, we will train a Keras model using a subset of the spambase dataset and then we will use [mapInPandas()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.mapInPandas.html?highlight=mapinpandas) to ask the executors to compute the predictions over the test set.

### Using Keras to train a neural network model over the Spambase dataset

We want to use the same training data that we used in Section 1 of the lab. However, we first need to transform the Spark dataframe into a pandas dataframe.

If you go back to the beginning of the lab, we used the instruction

`spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")`

This instruction enabled Arrow so that the transformation between Spark dataframes and pandas dataframes can be done efficiently.

```python
# Convert the Spark DataFrame to a Pandas DataFrame using Arrow
trainingDataPandas = trainingData.select("*").toPandas()
```

We prepare the data for Keras.

```python
nfeatures = ncolumns-1
Xtrain = trainingDataPandas.iloc[:, 0:nfeatures]
ytrain = trainingDataPandas.iloc[:, -1]
```

We configure the neural network model.

*Note.* If you are running on HPC, you may get several warnings when working with this installation of TensorFlow. The warnings are related to TensorFlow not being properly configured to work with GPUs. You can safely ignore them for this lab.

```python
from keras import models
from keras import layers

model = models.Sequential()
model.add(layers.Dense(20, activation='relu', input_shape=(np.shape(Xtrain)[1],)))
model.add(layers.Dense(5, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))
```

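Optionally, you can inspect the resulting architecture and parameter counts before training:

```python
# Optional: print a summary of the layers and the number of trainable parameters
model.summary()
```
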
We now compile the model.

```python
model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
```

Fit the model to the data, using 20% of the training data for validation.

```python
history = model.fit(Xtrain, ytrain, epochs=100, batch_size=100, validation_split=0.2, verbose=False)
```

Let us plot the progress of the training.

```python
import matplotlib.pyplot as plt

history_dict = history.history
acc_values = history_dict['accuracy']
val_acc_values = history_dict['val_accuracy']
epochs = range(1, len(acc_values)+1)

plt.plot(epochs, acc_values, 'bo', label='Training acc')
plt.plot(epochs, val_acc_values, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig("./Output/keras_nn_train_validation_history.png")
```

We will now use mapInPandas to distribute the computation efficiently. To be able to broadcast the trained model to the executors, the model needs to be picklable, that is, it must be possible to serialize the model with the [pickle module](https://docs.python.org/3/library/pickle.html). Unfortunately, Keras does not support pickle to serialize its objects (models). Zach Moshe proposed a [patch to serialize Keras models](http://zachmoshe.com/2017/04/03/pickling-keras-models.html) that was further elaborated in [this StackOverFlow entry](https://stackoverflow.com/questions/61096573/using-tensorflow-keras-model-in-pyspark-udf-generates-a-pickle-error). We use the `ModelWrapperPickable` class suggested in the StackOverFlow entry.

```python
import tempfile
import tensorflow
import pandas as pd

class ModelWrapperPickable:

    def __init__(self, model):
        self.model = model

    def __getstate__(self):
        model_str = ''
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            tensorflow.keras.models.save_model(self.model, fd.name, overwrite=True)
            model_str = fd.read()
        d = { 'model_str': model_str }
        return d

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            self.model = tensorflow.keras.models.load_model(fd.name)
```

We now wrap the trained model using the `ModelWrapperPickable` class.

```python
model_wrapper = ModelWrapperPickable(model)
```

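As an optional check, you can verify that the wrapper survives a pickle round trip, which is exactly what Spark needs in order to ship it to the executors:

```python
import pickle

# Optional: serialize and deserialize the wrapper to confirm it is picklable
restored = pickle.loads(pickle.dumps(model_wrapper))
print(type(restored.model))
```
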
The mapInPandas() function that we use here will return a dataframe that has the same feature columns as `testData` plus an additional column for the predictions. We first create a schema containing this structure. Before creating the schema, though, we extract from `testData` a dataframe that only contains the features.

```python
Xtest = testData.select(spam_names[0:ncolumns-1])
```

Now, the new schema for the output dataframes:

```python
from pyspark.sql.types import StructField, StructType, DoubleType
pred_field = [StructField("prediction", DoubleType(), True)]
new_schema = StructType(Xtest.schema.fields + pred_field)
```

We create a `predict` function which will be applied by the executors to compute the predictions. This routine iterates over batches of pandas dataframes, uses `model_wrapper` to compute the predictions for each batch, and yields the corresponding batches with the additional prediction column.

```python
def predict(iterator):
    for features in iterator:
        yield pd.concat([features, pd.Series(model_wrapper.model.predict(features).flatten(), name="prediction")], axis=1)
```

We now apply `predict` to batches of the `Xtest` dataframe using mapInPandas.

```python
prediction_keras_df = Xtest.mapInPandas(predict, new_schema)
```

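Note that mapInPandas is lazy: nothing is computed until an action is triggered. If you want to peek at a few raw predictions before computing the accuracy, this optional line forces a small part of the computation:

```python
# Optional: trigger the computation for a few rows and inspect the raw predictions
prediction_keras_df.select('prediction').show(5)
```
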
The resulting dataframe is a Spark dataframe. We select the column of predictions and transform it to pandas to later compute the accuracy on the test data.

```python
ypred_keras = prediction_keras_df.select('prediction').toPandas().values
```

We use a threshold of 0.5 to assign the predictions to class 0 and class 1.

```python
ypred_keras[ypred_keras < 0.5] = 0
ypred_keras[ypred_keras >= 0.5] = 1
```

We now extract the target test data from the `testData` dataframe.

```python
testDataPandas = testData.select("*").toPandas()
ytest = testDataPandas.iloc[:, -1].values
```

We finally use the accuracy_score method from scikit-learn to compute the accuracy.

```python
from sklearn.metrics import accuracy_score
print("Accuracy = %g " % accuracy_score(ytest, ypred_keras))
```

Accuracy = 0.926885

The accuracy obtained by the Keras model is different from the one obtained with the neural network model in Spark ML, even though both use the same training data. Why is that?

## 3. Exercises

**Note**: A *reference* solution will be provided on Blackboard for this part by the following Wednesday (at the latest).

### Exercise 1

Include a cross-validation step for the pipeline of the neural network applied to the spambase dataset in [section 1](#1-shallow-neural-networks-in-pyspark). An example of a cross-validator can be found [here](http://spark.apache.org/docs/3.0.1/ml-tuning.html#cross-validation). Make sure ``paramGrid`` contains different values for the parameter ``layers``, and find the best parameters and associated accuracy on the test data.

### Exercise 2

Repeat [section 2](#2-keras-on-pyspark) of this lab, but now experiment with a Scikit-learn model. Choose [a classifier from the ones available](https://scikit-learn.org/stable/supervised_learning.html#supervised-learning), train the classifier over the same training data, and use the pandas function APIs and Arrow to send the model to the executors, which will provide the predictions. Do you need to use the class `ModelWrapperPickable`?

## 4. Additional exercises (optional)

**Note**: NO solutions will be provided for this part.

Repeat [section 2](#2-keras-on-pyspark) using a PyTorch model.
