This repository was archived by the owner on Mar 24, 2022. It is now read-only.
1 change: 1 addition & 0 deletions .gitignore
@@ -33,3 +33,4 @@ pyspark-docker
IoT-Data-Science/PythonModel/metastore_db
lib
/data
IoT-DataScience/PythonModel/redis_push.py
1 change: 1 addition & 0 deletions IoT-DataScience/.profile.d/setenv.sh
@@ -0,0 +1 @@
export LANG=en_US.UTF-8
IoT-DataScience/PythonModel/BatchTrain.py → IoT-DataScience/BatchTrain.py
@@ -19,22 +19,24 @@
# ipython BatchTrain.py Configuration/default.conf

import ConfigParser
import sys
import datetime
import json
from operator import add
import cPickle as pickle
import sys

import Models
import Data
import Models
import redis

from operator import add
from pyspark import SparkContext, SparkConf
from sklearn.externals import joblib


conf = SparkConf().setAppName("BatchTraining")
sc = SparkContext(conf=conf)


def main(args=None):
# Setting up configuration TODO: use docopt
if(args):
configuration_string = args[0]
else:
@@ -43,24 +45,26 @@ def main(args=None):
config = ConfigParser.ConfigParser()
config.read(configuration_string)
rawdata_directory = config.get("Directories", "dir_rawdata")
storedmodel_directory = config.get("Directories", "dir_storedmodel")
cluster_json_directory = config.get("Directories", "dir_clusters")

# Set up Redis
redis_host = config.get("Redis", "host")
redis_port = int(config.get("Redis", "port"))
redis_password = config.get("Redis", "password")
# Connect to Redis
r = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password)

# Parameters TODO: set these in config file
# Clustering
cluster_feature_names = ["StartLat", "StartLong", "EndLat", "EndLong"] # Which features to cluster over
clustering_alg = "KMeans" # Which Clustering Algorithm to use
cluster_model_file = config.get("Batch", "cluster_class_file")
cluster_params = {"max_clusters": 10, "n_init": 10} # Parameters for KMeans

# Initial Classification
init_class_alg = "RandomForest"
init_class_model_file = config.get("Batch", "init_class_file")
init_class_feature_names = ["StartLat", "StartLong"] # Which features to classify over

# Online Classification
online_class_alg = "RandomForest"
online_class_model_file = config.get("Batch", "online_class_file")
online_class_feature_names = ["Latitude", "Longitude", "StartLat", "StartLong"]

# Read in batch data
@@ -79,22 +83,26 @@ def main(args=None):
journeys_with_id.persist()
journey_clusters = journeys_with_id.mapValues(lambda journeys: Data.create_journey_clusters(journeys)).persist()
journey_clusters_local = journey_clusters.collectAsMap()
joblib.dump(journey_clusters_local, storedmodel_directory + cluster_model_file + "_JourneyClusters")
cluster_json = journey_clusters.map(Data.extract_journey_json).collect()
with open(cluster_json_directory + "clusters.json", "w") as f:
for cluster in cluster_json:
f.write(cluster + "\n")
r.set("journey_clusters", pickle.dumps(journey_clusters_local))

cluster_json = journey_clusters.map(Data.extract_journey_json).collectAsMap()
for vin, clusters in cluster_json.iteritems():
r.set(vin, json.dumps(clusters))
journey_clusters.unpersist()

# Build initial classification models
init_class_models = journeys_with_id.mapValues(lambda data: Models.train_init_class_model(init_class_alg, init_class_feature_names, data)).collectAsMap()
joblib.dump(init_class_models, storedmodel_directory + init_class_model_file)
r.set("init_models", pickle.dumps(init_class_models))

# Build online classification models
online_class_models = journeys_with_id.mapValues(lambda data: Models.train_online_class_model(online_class_alg, online_class_feature_names, data)).collectAsMap()
joblib.dump(online_class_models, storedmodel_directory + online_class_model_file)
online_class_models = journeys_with_id.mapValues(lambda data: Models.train_online_class_model(online_class_alg,
online_class_feature_names,
data)).collectAsMap()
r.set("online_models", pickle.dumps(online_class_models))

sc.stop()

r.set("models_last_trained", datetime.datetime.utcnow().strftime("%y-%m-%d %H:%M:%S"))

if __name__ == "__main__":
main(sys.argv[1:])
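
(Note, not part of the diff: the trained models are now persisted to Redis as pickles instead of joblib files. A minimal sketch of how a downstream consumer, such as the streaming scorer, could load them back, assuming the same [Redis] settings from Configuration/default.conf; the key names are the ones BatchTrain.py sets above.)

import cPickle as pickle

import redis

# Same credentials BatchTrain.py reads from Configuration/default.conf.
r = redis.StrictRedis(host="<host>", port=6379, password="<password>")  # 6379 is the Redis default port

init_models = pickle.loads(r.get("init_models"))            # {vin: InitClass_RF}
online_models = pickle.loads(r.get("online_models"))        # {vin: OnlineClass_RF}
journey_clusters = pickle.loads(r.get("journey_clusters"))  # {vin: journey clusters}
print "Models last trained:", r.get("models_last_trained")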
9 changes: 9 additions & 0 deletions IoT-DataScience/Configuration/default.conf
@@ -0,0 +1,9 @@
[Directories]
dir_rawdata: hdfs://10.68.52.89:8020/user/gpadmin/data/pydata_london_simulated/*.out

[Redis]
host: <host>
port: <port>
password: <password>


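(Note, not part of the diff: a sketch of how this [Redis] block is consumed. ConfigParser treats "key: value" and "key = value" the same, so the colon style above parses as expected; the ping() sanity check is an illustrative addition, not something BatchTrain.py does.)

import ConfigParser

import redis

config = ConfigParser.ConfigParser()
config.read("IoT-DataScience/Configuration/default.conf")

r = redis.StrictRedis(host=config.get("Redis", "host"),
                      port=int(config.get("Redis", "port")),
                      password=config.get("Redis", "password"))
r.ping()  # raises redis.ConnectionError if host/port/password are wrong
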
IoT-DataScience/PythonModel/Data.py → IoT-DataScience/Data.py
@@ -23,7 +23,6 @@

import numpy as np
import pandas as pd
from IPython import embed
from pandas import parser


@@ -321,8 +320,7 @@ def isfloat(value):
self.transform_data()

# Read in data and assign variables
def load_streaming_data(self, input):
input_json = json.loads(input)
def load_streaming_data(self, input_json):
for k in input_json:
if input_json[k] == "":
input_json[k] = np.nan
@@ -385,7 +383,6 @@ def create_journey_stats(self, journeys):

def extract_journey_json(tup):
vin, clusters = tup
result = OrderedDict()
clusters_dict = OrderedDict()
for cluster in clusters:
cluster_dict = OrderedDict()
@@ -394,6 +391,4 @@ def extract_journey_json(tup):
cluster_dict["long"] = cluster.averages["EndLong"]
cluster_dict["address"] = ""
clusters_dict[str(cluster.clusterID)] = cluster_dict
result["vin"] = str(vin)
result["clusters"] = clusters_dict
return json.dumps(result)
return (vin, clusters_dict)
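
(Note, not part of the diff: extract_journey_json now returns a plain (vin, clusters_dict) tuple instead of a serialized JSON string, which is what lets BatchTrain.py collectAsMap() the results and store one JSON document per vehicle. A toy illustration of the new shape; the namedtuple and the VIN are made-up stand-ins for the real cluster class and data.)

from collections import namedtuple

from Data import extract_journey_json

Cluster = namedtuple("Cluster", ["clusterID", "averages"])  # stand-in for the real cluster class
clusters = [Cluster(0, {"EndLat": 51.5074, "EndLong": -0.1278})]

vin, clusters_dict = extract_journey_json(("vin123", clusters))
# vin == "vin123"; clusters_dict maps "0" to its lat/long/address fields.
# BatchTrain.py then stores it per vehicle: r.set(vin, json.dumps(clusters_dict))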
94 changes: 44 additions & 50 deletions IoT-DataScience/PythonModel/Models.py → IoT-DataScience/Models.py
@@ -17,7 +17,6 @@

import numpy as np
import pandas as pd
from IPython import embed
from sklearn import cross_validation
from sklearn import metrics
from sklearn import neighbors
@@ -29,19 +28,15 @@

class Model:
def __init__(self):
self.data = None
self.model = None
self.feature_names = None
self.input_data = None
self.output_data = None

# go through batch data object and extract relevant features
def create_train_features(self, data):
self.data = data
self.input_data = pd.DataFrame()

for journey in self.data:
self.input_data = self.input_data.append(self.create_journey_features(journey))
features = pd.DataFrame()
for journey in data:
features = features.append(self.create_journey_features(journey))
return features

def create_journey_features(self, journey):
return pd.DataFrame(dict([(k, journey.attributes.get(k)) for k in self.feature_names]),
@@ -66,27 +61,29 @@ def __init__(self):
Model.__init__(self)

def predict(self, journey):
input = self.create_journey_features(journey)
return self.model.predict_proba(input.values)
new_data = self.create_journey_features(journey)
return self.model.predict_proba(new_data.values)

def train(self, data, feature_names):
raise Exception("NotImplementedException")

def create_train_label(self):
self.output_data = []
for journey in self.data:
self.output_data.append(journey.journey_cluster_id)

def score(self):
cnt = Counter(self.output_data)
indexer = [cnt[x] > 2 for x in self.output_data]
features = self.input_data[indexer].values
labels = [x[0] for x in zip(self.output_data, indexer) if x[1]]
def create_train_labels(self, data):
labels = []
for journey in data:
labels.append(journey.journey_cluster_id)
return labels

def score(self, data, labels):
cnt = Counter(labels)
indexer = [cnt[x] > 2 for x in labels]
features = self.create_train_features(data)[indexer].values
labels = [x[0] for x in zip(labels, indexer) if x[1]]
loo = cross_validation.KFold(len(labels), len(labels))
self.xval = cross_validation.cross_val_score(self.model,
features,
np.array(labels),
cv=loo)
xval = cross_validation.cross_val_score(self.model,
features,
np.array(labels),
cv=loo)
return xval


class InitClass_RF(InitClass):
@@ -96,13 +93,13 @@ def __init__(self):
def train(self, data, feature_names):
print "Creating InitClass_RF"
self.feature_names = feature_names
self.create_train_features(data)
self.create_train_label()
features = self.create_train_features(data)
labels = self.create_train_labels(data)

self.model = RandomForestClassifier(n_estimators=100,
max_depth=None,
random_state=0)
self.model.fit(self.input_data.values, self.output_data)
self.model.fit(features, labels)


########################################################
@@ -126,36 +123,33 @@ def create_journey_features(self, journey):
journey.NArows = pd.isnull(journey.data[self.feature_names]).any(1).nonzero()[0]
return journey.data.drop(journey.data.index[journey.NArows])[self.feature_names]

def create_train_label(self):
self.output_data = pd.Series()
for journey in self.data:
self.output_data = self.output_data.append(journey.data.drop(journey.data.index[journey.NArows]).journey_cluster_id)
def create_train_labels(self, data):
labels = pd.Series()
for journey in data:
labels = labels.append(journey.data.drop(journey.data.index[journey.NArows]).journey_cluster_id)
return labels

def train(self, data, feature_names, params):
raise Exception("NotImplementedException")

def predict(self, journey):
input = self.create_journey_features(journey)
return self.model.predict_proba(input.tail(1))
new_data = self.create_journey_features(journey)
return self.model.predict_proba(new_data.tail(1))


# interface to be implemented by every online classification class
class OnlineClass_RF(OnlineClass):
def __init__(self):
OnlineClass.__init__(self)

# def __init__(self):
# OnlineClass.__init__(self)
# print "creating online rf class"

def train(self, data, feature_names):
print "Creating OnlineClass_RF"
self.feature_names = feature_names
self.create_train_features(data)
self.create_train_label()
train_data = self.create_train_features(data)
train_labels = self.create_train_labels(data)

self.model = RandomForestClassifier(n_estimators=100, max_depth=None, random_state=0, max_features=None)
self.model.fit(self.input_data.values, self.output_data)
self.model.fit(train_data, train_labels)


################################################
@@ -185,38 +179,38 @@ def train(self, data, feature_names, params):
n_init = params["n_init"]

self.feature_names = feature_names
self.create_train_features(data)
features = self.create_train_features(data)

kmeans_objects = []

# try different numbers of clusters
for i in range(2, max_clusters + 1):
kmeans = KMeans(init="k-means++", n_clusters=i, n_init=n_init, random_state=0)
kmeans.fit(self.input_data)
kmeans.fit(features)
kmeans_objects.append(kmeans)

# get silhouette coefficients of all different cluster models
silhouette_score = [metrics.silhouette_score(self.input_data, model.labels_, metric="euclidean") for model in
silhouette_score = [metrics.silhouette_score(features, model.labels_, metric="euclidean") for model in
kmeans_objects]

# save kmeans model with highest silhouette
self.model = kmeans_objects[silhouette_score.index(max(silhouette_score))]

def assign(self, journeys):
predictions = self.model.predict(self.input_data)
for i, journey in enumerate(journeys):
def assign(self, data):
features = self.create_train_features(data)
predictions = self.model.predict(features)
for i, journey in enumerate(data):
journey.journey_cluster_id = predictions[i]
journey.data["journey_cluster_id"] = predictions[i]
journeys[i] = journey
return journeys
data[i] = journey
return data


def cluster(model_type, feature_names, params, processed_journeys):
if model_type == "KMeans":
cluster_model = Clust_KMeans()
else:
raise Exception("NotImplementedException")

cluster_model.train(processed_journeys, feature_names, params)
assigned_journeys = cluster_model.assign(processed_journeys)
return assigned_journeys
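
(Note, not part of the diff: Clust_KMeans.train selects the number of clusters by fitting KMeans for every k from 2 to max_clusters and keeping the fit with the highest silhouette coefficient. A self-contained sketch of that selection loop, with random points standing in for the journey start/end coordinates.)

import numpy as np
from sklearn import metrics
from sklearn.cluster import KMeans

X = np.random.RandomState(0).rand(100, 4)  # stand-in for the clustering features

candidates = []
for k in range(2, 11):  # mirrors max_clusters = 10, n_init = 10 in BatchTrain.py
    km = KMeans(init="k-means++", n_clusters=k, n_init=10, random_state=0)
    km.fit(X)
    candidates.append(km)

scores = [metrics.silhouette_score(X, m.labels_, metric="euclidean") for m in candidates]
best = candidates[scores.index(max(scores))]  # the fit Clust_KMeans would keep as self.model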
13 changes: 0 additions & 13 deletions IoT-DataScience/PythonModel/Configuration/cluster.conf

This file was deleted.

22 changes: 0 additions & 22 deletions IoT-DataScience/PythonModel/Configuration/default.conf

This file was deleted.

19 changes: 0 additions & 19 deletions IoT-DataScience/PythonModel/Configuration/default.conf.bak

This file was deleted.
