Skip to content

Implement k-medoids algorithm and tests #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions src/k_medoids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import numpy as np

def _manhattan_distance(x, y):
'''
x: numpy array
y: numpy array
RETURNS: int/float
Compute the manhattan distance between two points x and y.
'''
return np.sum(np.abs(y - x))

def _find_closest_center(x, cluster_centers):
'''
x: 1d numpy array (shape is (p,))
y: 2d numpy array (shape is (n, p))
RETURNS: int
Using manhattan distance, find the index of the center from cluster_centers
that x is closest to.
'''
distances = np.sum(np.abs(cluster_centers - x), axis=1)
return np.argmin(distances)

def _find_cost(cluster, center):
'''
cluster: list of 1d numpy arrays
center: 1d numpy array
Using manhattan distance, find the total cost of using center as the center
'''
return sum(_manhattan_distance(x, center) for x in cluster)

def _find_total_cost(clusters, cluster_centers):
'''
clusters: list of list of 1d numpy arrays
cluster_centers: 2d numpy array
RETURNS: int/float
Calculate the total cost of the chosen clusters and cluster_centers.
'''
return sum(
_find_cost(clusters[i], cluster_centers[i]) \
for i in xrange(len(clusters))
)

def _find_medoid(cluster):
'''
cluster: list of 1d numpy arrays
RETURNS: 1d numpy array
Using manhattan distance, find the datapoint from the cluster that is the
medoid.
'''
min_cost = None
medoid = None
for y in cluster:
cost = _find_cost(cluster, y)
if min_cost is None or cost < min_cost:
min_cost = cost
medoid = y
return medoid

class kMedoids(object):
    '''
    An implementation of k-medoids algorithm using manhattan distance.

    After fit() has run:
    cluster_centers is a 2d numpy array with one medoid per row, and
    clusters is a list holding, for every cluster, the list of points that
    were assigned to it.
    '''
    def __init__(self, n_clusters, max_iter=300, initial_centers=None):
        '''
        n_clusters: int
        max_iter: int
        initial_centers: list of ints
        RETURNS: None
        Initialize kMedoids algorithm with n_clusters as the number of clusters
        and max_iter the maximum number of iterations.
        initial_centers is a list of the indices of the initial cluster centers.
        If value is None, initial cluster centers are chosen randomly.
        '''
        self.cluster_centers = None
        # Set by fit(); stays None until at least one iteration has run.
        self.clusters = None
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.initial_centers = initial_centers

    def fit(self, X):
        '''
        X: 2d numpy array
        RETURNS: None
        RAISES: ValueError if initial_centers was given and its length differs
        from n_clusters.
        Build the clusters for k-medoids with data X.
        Each iteration assigns every point to its closest center, then moves
        each center to the medoid of its cluster. Iteration stops after
        max_iter rounds or as soon as the total cost stops decreasing.
        '''
        if self.initial_centers is None:
            center_indices = np.random.choice(
                X.shape[0],
                self.n_clusters,
                replace=False
            )
        else:
            # asarray guarantees row selection below even for a tuple, which
            # numpy would otherwise treat as a multi-dimensional index.
            center_indices = np.asarray(self.initial_centers)
            if len(center_indices) != self.n_clusters:
                raise ValueError(
                    'initial_centers has {} entries but n_clusters is {}'
                    .format(len(center_indices), self.n_clusters)
                )
        # Indexing with an index array copies the rows, so updating the
        # centers below never writes into X.
        self.cluster_centers = X[center_indices]
        cost = None
        for _iteration in range(self.max_iter):
            self.clusters = [[] for _ in range(self.n_clusters)]
            for x in X:
                center = _find_closest_center(x, self.cluster_centers)
                self.clusters[center].append(x)
            for index, cluster in enumerate(self.clusters):
                # A cluster can end up empty when two centers coincide (the
                # lower index wins every tie). It has no medoid, so its
                # previous center is kept instead of being overwritten.
                if cluster:
                    self.cluster_centers[index] = _find_medoid(cluster)
            new_cost = _find_total_cost(self.clusters, self.cluster_centers)
            if cost is None or new_cost < cost:
                cost = new_cost
            else:
                # No improvement over the previous round: converged.
                break

    def predict(self, X):
        '''
        X: 2d numpy array
        RETURNS: 1d numpy array of ints
        Give the predicted cluster for each datapoint in X, as the index of
        the closest row of cluster_centers. fit() must have been called first.
        '''
        # Integer dtype so the labels can be used directly as indices.
        return np.array(
            [_find_closest_center(x, self.cluster_centers) for x in X],
            dtype=int
        )
179 changes: 179 additions & 0 deletions test/test_k_medoids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import unittest
import numpy as np
from src.k_medoids import _manhattan_distance, _find_closest_center, \
_find_cost, _find_total_cost, _find_medoid, kMedoids

def unordered_array_equal(x, y):
    '''
    x: 2d numpy array
    y: 2d numpy array
    RETURNS: boolean
    Return True iff x and y contain the same rows, regardless of the order in
    which the rows appear.
    '''
    # Sorting the nested-list form of each array gives a canonical row order.
    return sorted(x.tolist()) == sorted(y.tolist())

class TestKMedoids(unittest.TestCase):
    '''
    Unit tests for the k-medoids helper functions and the kMedoids model.
    '''

    @staticmethod
    def _five_point_cluster():
        '''
        RETURNS: list of 1d numpy arrays
        Build a fresh copy of the five-point cluster used by several tests.
        '''
        points = [
            (1, 1, 0),
            (2, 3, 5),
            (5, 4, 4),
            (6, 1, 1),
            (4, 3, 2),
        ]
        return [np.array(point) for point in points]

    @staticmethod
    def _two_group_data():
        '''
        RETURNS: 2d numpy array
        Build a fresh copy of the six-point dataset made of two well separated
        groups of three points.
        '''
        return np.array([
            [2, 1],
            [1, 2],
            [1, 3],
            [6, 9],
            [5, 10],
            [4, 11],
        ])

    def _check_equal(self, expected, actual):
        '''
        Assert that expected == actual, reporting both values on failure.
        '''
        self.assertEqual(
            expected,
            actual,
            msg='expected {}, actual {}'.format(expected, actual)
        )

    def _check_fitted_model(self, model, expected_cost, expected_centers):
        '''
        Assert that a fitted model has the expected total cost and the
        expected cluster centers (in any order).
        '''
        actual_cost = _find_total_cost(model.clusters, model.cluster_centers)
        self.assertTrue(
            np.array_equal(expected_cost, actual_cost),
            msg='expected {}, actual {}'.format(expected_cost, actual_cost)
        )
        actual_centers = model.cluster_centers
        self.assertTrue(
            unordered_array_equal(expected_centers, actual_centers),
            msg='expected centers {},\nactual centers {}'.
            format(expected_centers, actual_centers)
        )

    def test_manhattan_distance(self):
        x = np.array((1, 2, 3))
        y = np.array((5, 4, 3))
        self._check_equal(6, _manhattan_distance(x, y))

    def test_find_closest_center(self):
        x = np.array((1, 4, 5))
        cluster_centers = np.array([
            (1, 1, 0),
            (2, 3, 5),
            (5, 4, 4),
        ])
        self._check_equal(1, _find_closest_center(x, cluster_centers))

    def test_find_cost(self):
        center = np.array((2, 3, 5))
        self._check_equal(28, _find_cost(self._five_point_cluster(), center))

    def test_find_total_cost(self):
        clusters = [
            self._five_point_cluster(),
            [
                np.array((3, 4, 2)),
                np.array((1, 1, 1)),
                np.array((0, 1, 2)),
            ],
        ]
        cluster_centers = np.array([
            [2, 3, 5],
            [1, 1, 1],
        ])
        self._check_equal(36, _find_total_cost(clusters, cluster_centers))

    def test_find_medoid(self):
        expected = np.array((4, 3, 2))
        actual = _find_medoid(self._five_point_cluster())
        self.assertTrue(
            np.array_equal(expected, actual),
            msg='expected {}, actual {}'.format(expected, actual)
        )

    def test_k_medoids_fit1(self):
        model = kMedoids(2, initial_centers=[0, 3])
        model.fit(self._two_group_data())
        self._check_fitted_model(
            model,
            expected_cost=7,
            expected_centers=np.array([[1, 2], [5, 10]])
        )

    def test_k_medoids_fit2(self):
        X = np.array([
            [2, 6],
            [3, 4],
            [3, 8],
            [4, 7],
            [6, 2],
            [6, 4],
            [7, 3],
            [7, 4],
            [8, 5],
            [7, 6],
        ])
        model = kMedoids(2, initial_centers=[0, 1])
        model.fit(X)
        self._check_fitted_model(
            model,
            expected_cost=18,
            expected_centers=np.array([[7, 4], [2, 6]])
        )

    def test_predict(self):
        X = self._two_group_data()
        model = kMedoids(2, initial_centers=[0, 3])
        model.fit(X)
        # Only used in the failure message; the assertion itself checks that
        # each group of three points shares a single label.
        expected_predictions = np.array([0, 0, 0, 1, 1, 1])
        y = model.predict(X)
        first_group_agrees = np.all(y[:3] == y[0])
        second_group_agrees = np.all(y[3:] == y[3])
        self.assertTrue(
            first_group_agrees and second_group_agrees,
            msg='expected {}, actual: {}'.format(expected_predictions, y)
        )

# Run the whole test suite when this file is executed as a script.
if '__main__' == __name__:
    unittest.main()