From ad1bbeae6a5d15bb213fa6ef56a311ad4b161d9a Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Thu, 28 Feb 2019 17:06:21 +0200 Subject: [PATCH 1/9] keep things DRY, move identical methods to base classes --- algorithms/fsm/base.py | 11 ++++++++--- algorithms/fsm/incremental/exact_counting.py | 8 -------- algorithms/fsm/incremental/naive_reservoir.py | 14 -------------- algorithms/fsm/incremental/optimized_reservoir.py | 14 -------------- algorithms/fsm/reservoir.py | 11 +++++++++++ 5 files changed, 19 insertions(+), 39 deletions(-) diff --git a/algorithms/fsm/base.py b/algorithms/fsm/base.py index aaeaa0f..609685f 100644 --- a/algorithms/fsm/base.py +++ b/algorithms/fsm/base.py @@ -4,6 +4,8 @@ from graph.simple_graph import SimpleGraph +from subgraph.pattern import canonical_label + from algorithms.exploration.util import ( new_subgraphs_func, all_subgraphs_func) @@ -29,10 +31,13 @@ def add_edge(self, edge): @abstractmethod - def add_subgraph(self, subgraph): + def remove_edge(self, edge): pass - @abstractmethod + def add_subgraph(self, subgraph): + self.patterns[canonical_label(subgraph)] += 1 + + def remove_subgraph(self, subgraph): - pass + self.patterns[canonical_label(subgraph)] -= 1 diff --git a/algorithms/fsm/incremental/exact_counting.py b/algorithms/fsm/incremental/exact_counting.py index a26abcb..4e14e9d 100644 --- a/algorithms/fsm/incremental/exact_counting.py +++ b/algorithms/fsm/incremental/exact_counting.py @@ -52,11 +52,3 @@ def add_edge(self, edge): self.metrics['new_subgraph_count'].append(len(additions)) return True - - - def add_subgraph(self, subgraph): - self.patterns.update([canonical_label(subgraph)]) - - - def remove_subgraph(self, subgraph): - self.patterns.subtract([canonical_label(subgraph)]) diff --git a/algorithms/fsm/incremental/naive_reservoir.py b/algorithms/fsm/incremental/naive_reservoir.py index 786c11f..a700dbe 100644 --- a/algorithms/fsm/incremental/naive_reservoir.py +++ b/algorithms/fsm/incremental/naive_reservoir.py @@ -67,17 +67,3 @@ def process_new_subgraph(self, subgraph): if old_subgraph: self.remove_subgraph(old_subgraph) return success - - - def process_existing_subgraph(self, old_subgraph, new_subgraph): - self.reservoir.replace(old_subgraph, new_subgraph) - self.remove_subgraph(old_subgraph) - self.add_subgraph(new_subgraph) - - - def add_subgraph(self, subgraph): - self.patterns[canonical_label(subgraph)] += 1 - - - def remove_subgraph(self, subgraph): - self.patterns[canonical_label(subgraph)] -= 1 diff --git a/algorithms/fsm/incremental/optimized_reservoir.py b/algorithms/fsm/incremental/optimized_reservoir.py index fc4214f..df7a3fe 100644 --- a/algorithms/fsm/incremental/optimized_reservoir.py +++ b/algorithms/fsm/incremental/optimized_reservoir.py @@ -95,17 +95,3 @@ def process_new_subgraph(self, subgraph): if old_subgraph: self.remove_subgraph(old_subgraph) return success - - - def process_existing_subgraph(self, old_subgraph, new_subgraph): - self.reservoir.replace(old_subgraph, new_subgraph) - self.remove_subgraph(old_subgraph) - self.add_subgraph(new_subgraph) - - - def add_subgraph(self, subgraph): - self.patterns[canonical_label(subgraph)] += 1 - - - def remove_subgraph(self, subgraph): - self.patterns[canonical_label(subgraph)] -= 1 diff --git a/algorithms/fsm/reservoir.py b/algorithms/fsm/reservoir.py index 86eb7fa..d2b87c7 100644 --- a/algorithms/fsm/reservoir.py +++ b/algorithms/fsm/reservoir.py @@ -17,3 +17,14 @@ def __init__(self, M=None, **kwargs): @abstractmethod def process_new_subgraph(self, subgraph): pass + + + @abstractmethod + def process_old_subgraph(self, subgraph): + pass + + + def process_existing_subgraph(self, old_subgraph, new_subgraph): + self.reservoir.replace(old_subgraph, new_subgraph) + self.remove_subgraph(old_subgraph) + self.add_subgraph(new_subgraph) From dad89c28b3659e1f69f3d5185316e3d41e977029 Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Fri, 1 Mar 2019 16:07:25 +0200 Subject: [PATCH 2/9] add dynamic exact counting --- algorithms/fsm/dynamic/exact_counting.py | 54 ++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 algorithms/fsm/dynamic/exact_counting.py diff --git a/algorithms/fsm/dynamic/exact_counting.py b/algorithms/fsm/dynamic/exact_counting.py new file mode 100644 index 0000000..b5edbfe --- /dev/null +++ b/algorithms/fsm/dynamic/exact_counting.py @@ -0,0 +1,54 @@ +from datetime import datetime, timedelta + +from ..incremental.exact_counting import IncrementalExactCountingAlgorithm + +from subgraph.util import make_subgraph +from subgraph.pattern import canonical_label + + +class DynamicExactCountingAlgorithm(IncrementalExactCountingAlgorithm): + + + def __init__(self, k=3, **kwargs): + super().__init__(k=k) + + + def remove_edge(self, edge): + if edge not in self.graph: + return False + + self.graph.add_edge(edge) + + e_add_start = datetime.now() + + u = edge.get_u() + v = edge.get_v() + + removals, replacements = self.get_all_subgraphs(u, v) + + for nodes in removals: + # collect the induced subgraph after removal of edge + # remove the subgraph with the removed edge included + edges = self.graph.get_induced_edges(nodes) + subgraph = make_subgraph(nodes, edges + [edge]) + self.remove_subgraph(subgraph) + + for nodes in replacements: + # collect the induced subgraph before removal of edge + # remove that subgraph + # update the subgraph by not including the removed edge + # add the updated subgraph + edges = self.graph.get_induced_edges(nodes) + + existing_subgraph = make_subgraph(nodes, edges + [edge]) + self.remove_subgraph(existing_subgraph) + + updated_subgraph = make_subgraph(nodes, edges) + self.add_subgraph(updated_subgraph) + + e_add_end = datetime.now() + ms = timedelta(microseconds=1) + self.metrics['edge_remove_ms'].append((e_add_end - e_add_start) / ms) + self.metrics['removed_subgraph_count'].append(len(removals)) + + return True From baf14a95dc3b89956a84ad5c33c73d8156484d80 Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Tue, 12 Mar 2019 15:28:30 +0200 Subject: [PATCH 3/9] implement removal of subgraphs from the reservoir --- sampling/subgraph_reservoir.py | 45 +++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/sampling/subgraph_reservoir.py b/sampling/subgraph_reservoir.py index 995abb6..3f00103 100644 --- a/sampling/subgraph_reservoir.py +++ b/sampling/subgraph_reservoir.py @@ -6,16 +6,18 @@ class SubgraphReservoir: subgraphs = None vertex_subgraphs = None - def __init__(self, size): + def __init__(self, max_size): """ Initialize a new subgraph reservoir. :param size: The maximum size of the reservoir. :type size: int """ - self.max_size = size + self.size = 0 + self.max_size = max_size self.subgraphs = [] self.subgraph_indices = {} + self.vacant_indices = [] self.vertex_subgraphs = defaultdict(set) @@ -24,12 +26,12 @@ def __contains__(self, subgraph): def __len__(self): - return len(self.subgraphs) + return self.size def is_full(self): """Checks if the reservoir has reached max_size.""" - return len(self) >= self.max_size + return self.size >= self.max_size def add(self, subgraph, N=float('-inf')): @@ -50,8 +52,16 @@ def add(self, subgraph, N=float('-inf')): else: # the reservoir is not full, so we add the new subgraph - idx = len(self) - self.subgraphs.append(subgraph) + + # determine where in the list the subgraph is added + # if there are vacant indices in the list, use them first + # otherwise append the subgraph to the end + if len(self.vacant_indices) > 0: + idx = self.vacant_indices.pop() + self.subgraphs[idx] = subgraph + else: + idx = len(self) + self.subgraphs.append(subgraph) self.subgraph_indices[subgraph] = idx @@ -60,6 +70,9 @@ def add(self, subgraph, N=float('-inf')): success = True + if success and old_subgraph == None: + self.size += 1 + return success, old_subgraph @@ -86,6 +99,26 @@ def replace(self, old_subgraph, new_subgraph): self.vertex_subgraphs[v].remove(idx) + def remove(self, subgraph): + """Removes subgraph from reservoir if possible.""" + + if subgraph in self: + idx = self.subgraph_indices[subgraph] + + self.subgraphs[idx] = None + del self.subgraph_indices[subgraph] + + for u in subgraph.nodes: + self.vertex_subgraphs[u].remove(idx) + + self.vacant_indices.append(idx) + self.size -= 1 + + return True + else: + return False + + def get_common_subgraphs(self, u, v): """Get all subgraphs from the reservoir that contain the edge (u, v).""" common_indices = self.vertex_subgraphs[u] & self.vertex_subgraphs[v] From 1236e15daa5bd1a3882296a7a933f7d7ede19f26 Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Tue, 12 Mar 2019 15:30:35 +0200 Subject: [PATCH 4/9] add naive reservoir sampling in the dynamic setting --- algorithms/fsm/dynamic/exact_counting.py | 2 +- algorithms/fsm/dynamic/naive_reservoir.py | 141 ++++++++++++++++++ algorithms/fsm/incremental/exact_counting.py | 4 + algorithms/fsm/incremental/naive_reservoir.py | 8 + algorithms/fsm/reservoir.py | 2 +- 5 files changed, 155 insertions(+), 2 deletions(-) create mode 100644 algorithms/fsm/dynamic/naive_reservoir.py diff --git a/algorithms/fsm/dynamic/exact_counting.py b/algorithms/fsm/dynamic/exact_counting.py index b5edbfe..8c85379 100644 --- a/algorithms/fsm/dynamic/exact_counting.py +++ b/algorithms/fsm/dynamic/exact_counting.py @@ -17,7 +17,7 @@ def remove_edge(self, edge): if edge not in self.graph: return False - self.graph.add_edge(edge) + self.graph.remove_edge(edge) e_add_start = datetime.now() diff --git a/algorithms/fsm/dynamic/naive_reservoir.py b/algorithms/fsm/dynamic/naive_reservoir.py new file mode 100644 index 0000000..02d9812 --- /dev/null +++ b/algorithms/fsm/dynamic/naive_reservoir.py @@ -0,0 +1,141 @@ +import random + +from datetime import datetime, timedelta + +from ..reservoir import ReservoirAlgorithm + +from subgraph.util import make_subgraph, make_subgraph_edge +from subgraph.pattern import canonical_label + + +class DynamicNaiveReservoirAlgorithm(ReservoirAlgorithm): + + def __init__(self, k=3, M=1000): + super().__init__(k=k, M=M) + + # the count of uncompensated deletions, where + self.c1 = 0 # i) the deleted element was in the sample, and + self.c2 = 0 # ii) the deleted element was not in the sample + + + def add_edge(self, edge): + if edge in self.graph: + return False + + e_add_start = datetime.now() + + u = edge.get_u() + v = edge.get_v() + + # replace update all existing subgraphs with u and v in the reservoir + s_rep_start = datetime.now() + + for old_subg in self.reservoir.get_common_subgraphs(u, v): + new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,)) + self.process_existing_subgraph(old_subg, new_subg) + + s_rep_end = datetime.now() + + # find new subgraph candidates for the reservoir + s_add_start = datetime.now() + additions = self.get_new_subgraphs(u, v) + + # perform reservoir sampling for each new subgraph candidate + I = 0 + for nodes in additions: + self.N += 1 + edges = self.graph.get_induced_edges(nodes) + subgraph = make_subgraph(nodes, edges+[edge]) + I += int(self.process_new_subgraph(subgraph)) + s_add_end = datetime.now() + + self.graph.add_edge(edge) + + e_add_end = datetime.now() + + ms = timedelta(microseconds=1) + self.metrics['edge_op'].append('add') + self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms) + self.metrics['num_candidate_subgraphs'].append(len(additions)) + self.metrics['num_processed_subgraphs'].append(I) + self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full())) + + return True + + + def remove_edge(self, edge): + if edge not in self.graph: + return False + + e_del_start = datetime.now() + + self.graph.remove_edge(edge) + + u = edge.get_u() + v = edge.get_v() + + # find all nodesets representing subgraphs that will + # no longer be connected after the removal of this edge + removals = self.get_new_subgraphs(u, v) + D = len(removals) + + old_c1 = self.c1 + + # find all subgraphs in the reservoir containing nodes u and v + for old_subg in self.reservoir.get_common_subgraphs(u, v): + + if frozenset(old_subg.nodes) in removals: + # subgraph is no longer connected after edge removal, remove it + self.process_old_subgraph(old_subg) + else: + # subgraph stays connected after edge removal, replace it + old_edges = old_subg.edges + edges = [e for e in old_edges if e != make_subgraph_edge(edge)] + new_subg = make_subgraph(old_subg.nodes, edges) + self.process_existing_subgraph(old_subg, new_subg) + + # update the count of uncompensated deletions from outside the sample + removals_from_sample = self.c1 - old_c1 + self.c2 += D - removals_from_sample + + self.N -= D + + e_del_end = datetime.now() + + ms = timedelta(microseconds=1) + self.metrics['edge_op'].append('del') + self.metrics['edge_op_ms'].append((e_del_end - e_del_start) / ms) + self.metrics['num_candidate_subgraphs'].append(D) + self.metrics['num_processed_subgraphs'].append(removals_from_sample) + self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full())) + + return True + + + def process_new_subgraph(self, subgraph): + success = False + do_sampling = False + + if self.c1 + self.c2 == 0: + # there are no uncompensated deletions, + # so we can do normal reservoir sampling + do_sampling = True + else: + # there are uncompensated deletions, + # add subgraph to reservoir with probability c1 / (c1 + c2) + if random.random() < self.c1 / float(self.c1 + self.c2): + self.c1 -= 1 + do_sampling = True + else: + self.c2 -= 1 + + if do_sampling: + success, old_subgraph = self.reservoir.add(subgraph, N=self.N) + if success: self.add_subgraph(subgraph) + if old_subgraph: self.remove_subgraph(old_subgraph) + + return success + + + def process_old_subgraph(self, subgraph): + if self.reservoir.remove(subgraph): self.c1 += 1 diff --git a/algorithms/fsm/incremental/exact_counting.py b/algorithms/fsm/incremental/exact_counting.py index 4e14e9d..221592f 100644 --- a/algorithms/fsm/incremental/exact_counting.py +++ b/algorithms/fsm/incremental/exact_counting.py @@ -52,3 +52,7 @@ def add_edge(self, edge): self.metrics['new_subgraph_count'].append(len(additions)) return True + + + def remove_edge(self, edge): + pass diff --git a/algorithms/fsm/incremental/naive_reservoir.py b/algorithms/fsm/incremental/naive_reservoir.py index a700dbe..de45fc9 100644 --- a/algorithms/fsm/incremental/naive_reservoir.py +++ b/algorithms/fsm/incremental/naive_reservoir.py @@ -67,3 +67,11 @@ def process_new_subgraph(self, subgraph): if old_subgraph: self.remove_subgraph(old_subgraph) return success + + + def process_old_subgraph(self, subgraph): + raise NotImplementedError() + + + def remove_edge(self, edge): + raise NotImplementedError() diff --git a/algorithms/fsm/reservoir.py b/algorithms/fsm/reservoir.py index d2b87c7..cc316de 100644 --- a/algorithms/fsm/reservoir.py +++ b/algorithms/fsm/reservoir.py @@ -9,7 +9,7 @@ class ReservoirAlgorithm(BaseAlgorithm, metaclass=ABCMeta): def __init__(self, M=None, **kwargs): self.M = M # reservoir size self.N = 0 # number of subgraphs encountered - self.reservoir = SubgraphReservoir(size=M) + self.reservoir = SubgraphReservoir(M) super().__init__(M=M, **kwargs) From 067d0623595c898e77a24122356de77831a904e2 Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Tue, 12 Mar 2019 15:31:23 +0200 Subject: [PATCH 5/9] move accuracy utilities to util module --- accuracy.py => util/accuracy.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename accuracy.py => util/accuracy.py (100%) diff --git a/accuracy.py b/util/accuracy.py similarity index 100% rename from accuracy.py rename to util/accuracy.py From f806b2c5755be280f98cb6d82fce0c31d3314a91 Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Tue, 12 Mar 2019 15:32:18 +0200 Subject: [PATCH 6/9] create program for continuous accuracy experiments --- continuous_accuracy.py | 233 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 continuous_accuracy.py diff --git a/continuous_accuracy.py b/continuous_accuracy.py new file mode 100644 index 0000000..5cb83a9 --- /dev/null +++ b/continuous_accuracy.py @@ -0,0 +1,233 @@ +import os +import csv +import time +import uuid + +import numpy as np +import matplotlib.pyplot as plt + +from collections import deque, defaultdict +from argparse import ArgumentParser, FileType + +from graph.util import make_edge + +from util.accuracy import ( + pattern_frequencies, + threshold_frequencies, + precision, recall, avg_relative_error +) + +from algorithms.fsm.incremental.exact_counting import IncrementalExactCountingAlgorithm +from algorithms.fsm.incremental.naive_reservoir import IncrementalNaiveReservoirAlgorithm +from algorithms.fsm.incremental.optimized_reservoir import IncerementalOptimizedReservoirAlgorithm + +from algorithms.fsm.dynamic.exact_counting import DynamicExactCountingAlgorithm +from algorithms.fsm.dynamic.naive_reservoir import DynamicNaiveReservoirAlgorithm + +ACCURACY_SAMPLE_INTERVAL = 10 + +ALGORITHMS = { + + 'incremental': { + 'exact': IncrementalExactCountingAlgorithm, + 'naive': IncrementalNaiveReservoirAlgorithm, + 'optimal': IncerementalOptimizedReservoirAlgorithm + }, + + 'dynamic': { + 'exact': DynamicExactCountingAlgorithm, + 'naive': DynamicNaiveReservoirAlgorithm, + 'optimal': None + } + +} + + +def run_simulation(simulators, edges, window_size=0): + np.random.shuffle(edges) + + accuracy_metrics = defaultdict(list) + + if window_size > 0: + sliding_window = deque() + + start_time = time.time() + + for idx, edge_to_add in enumerate(edges): + simulators['exact'].add_edge(edge_to_add) + simulators['naive'].add_edge(edge_to_add) + + if window_size > 0: + sliding_window.append(edge_to_add) + + if len(sliding_window) > window_size: + edge_to_remove = sliding_window.popleft() + simulators['exact'].remove_edge(edge_to_add) + simulators['naive'].remove_edge(edge_to_add) + + if idx % ACCURACY_SAMPLE_INTERVAL == 0: + exact_patterns = +simulators['exact'].patterns + naive_patterns = +simulators['naive'].patterns + + exact_freqs = pattern_frequencies(exact_patterns) + naive_freqs = pattern_frequencies(naive_patterns) + + exact_freqs = threshold_frequencies(exact_freqs, 0.005) + naive_freqs = threshold_frequencies(naive_freqs, 0.005) + + prec = precision(exact_freqs, naive_freqs) + rec = recall(exact_freqs, naive_freqs) + are = avg_relative_error(exact_freqs, naive_freqs, 40) + + accuracy_metrics['precision'].append(prec) + accuracy_metrics['recall'].append(rec) + accuracy_metrics['avg_relative_error'].append(are) + + end_time = time.time() + + return end_time - start_time, accuracy_metrics + + +def main(): + parser = ArgumentParser(description="Run a FSM continuous accuracy experiment.") + + parser.add_argument("k", + type=int, + help="size of subgraphs (k-nodes) being mined") + + parser.add_argument('stream_setting', + choices=['incremental', 'dynamic'], + help="choose between incremental or fully dynamic stream setting") + + ''' + parser.add_argument('algorithm', + choices=['exact', 'naive', 'optimal'], + help="choose exact counting, or naive or optimised reservoir sampling") + ''' + + parser.add_argument('edge_file', + type=FileType('r'), + help="path to the input graph edge file") + + ''' + parser.add_argument('output_dir', + help="path to the directory for output files") + ''' + + parser.add_argument('reservoir_size', + type=int, + help="reservoir size required for naive and optimal algorithms") + + parser.add_argument('-t', '--times', + type=int, + default=10, + help="number of times the simulation is run in this instance") + + parser.add_argument('-w', '--windowsize', + dest="window_size", + type=int, + help="size of the sliding window (requires dynamic stream setting)") + + args = vars(parser.parse_args()) + + k = args['k'] + #algo = args['algorithm'] + stream = args['stream_setting'] + M = args['reservoir_size'] + times = args['times'] + window_size = args['window_size'] + + in_file = args['edge_file'] + #output_dir = args['output_dir'] + + print("Running the Continuous Accuracy Experiment", "\n") + + print("PARAMETERS") + print("stream setting:", stream) + #print("algorithm: ", algo) + print("k: ", k) + print("M: ", M) + print("times: ", times) + print("window size: ", window_size) + print("input graph: ", in_file.name, "\n") + + + ExactAlgorithm = ALGORITHMS[stream]['exact'] + NaiveAlgorithm = ALGORITHMS[stream]['naive'] + + if ExactAlgorithm == None: + msg = "exact algorithm is not available for %s stream setting" % (stream) + raise NotImplementedError(msg) + + if NaiveAlgorithm == None: + msg = "naive algorithm is not available for %s stream setting" % (stream) + raise NotImplementedError(msg) + + if window_size and stream != 'dynamic': + msg = "sliding window is only used with %s stream setting" % (stream) + raise ValueError(msg) + + + # read the input graph from the edge file + with in_file as edge_file: + edge_reader = csv.reader(edge_file, delimiter=' ') + edges = [make_edge(*tuple(int(x) for x in row)) for row in edge_reader] + + + # run simulations and collect the duration and metrics from each run + durations = [] + run_metrics = defaultdict(list) + run_accuracy_metrics = [] + + print("SIMULATIONS", "\n") + + for i in range(times): + print("Running simulation", i + 1, "...") + + simulators = { + 'exact': ExactAlgorithm(k=k), + 'naive': NaiveAlgorithm(k=k, M=M) + } + + duration, accuracy_metrics = run_simulation(simulators, edges, window_size) + + print("Done, run took", duration, "seconds.", "\n") + + durations.append(duration) + run_accuracy_metrics.append(accuracy_metrics) + + + avg_duration = np.mean(durations) + + print("Average duration of a run was", avg_duration, "seconds.") + + + print("Plots of accuracy") + + avg_precision = np.mean(np.asarray([d['precision'] for d in run_accuracy_metrics]), axis=0) + avg_recall = np.mean(np.asarray([d['recall'] for d in run_accuracy_metrics]), axis=0) + avg_are = np.mean(np.asarray([d['avg_relative_error'] for d in run_accuracy_metrics]), axis=0) + + edge_add_points = [ACCURACY_SAMPLE_INTERVAL * i for i in range(1, len(avg_precision) + 1)] + + fig = plt.figure() + ax = fig.add_subplot(311) + + ax.plot(edge_add_points, avg_precision) + plt.title("precision") + + ax = fig.add_subplot(312) + + ax.plot(edge_add_points, avg_recall) + plt.title("recall") + + ax = fig.add_subplot(313) + + ax.plot(edge_add_points, avg_are) + plt.title("average relative error") + + plt.show() + + +if __name__ == '__main__': + main() From 21cf7c57ab4f2c02aae6766695e8aa0dfb388a98 Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Tue, 26 Mar 2019 20:32:04 +0200 Subject: [PATCH 7/9] add random pairing skip optimization code and fsm algorithm --- algorithms/fsm/dynamic/optimized_reservoir.py | 191 ++++++++++++++++++ sampling/skip_rp.py | 107 ++++++++++ 2 files changed, 298 insertions(+) create mode 100644 algorithms/fsm/dynamic/optimized_reservoir.py create mode 100644 sampling/skip_rp.py diff --git a/algorithms/fsm/dynamic/optimized_reservoir.py b/algorithms/fsm/dynamic/optimized_reservoir.py new file mode 100644 index 0000000..684b483 --- /dev/null +++ b/algorithms/fsm/dynamic/optimized_reservoir.py @@ -0,0 +1,191 @@ +import random + +from datetime import datetime, timedelta + +from ..reservoir import ReservoirAlgorithm + +from graph.simple_graph import SimpleGraph + +from subgraph.util import make_subgraph +from subgraph.pattern import canonical_label + +from sampling import skip_rp +from sampling.skip_rs import SkipRS + + +class IncerementalOptimizedReservoirAlgorithm(ReservoirAlgorithm): + + + def __init__(self, k=3, M=1000): + super().__init__(k=k, M=M) + + self.skip_rs = SkipRS(M) + + # number of overflowing subgraphs skipped + self.skip_sum_rs = 0 + self.skip_sum_rp = 0 + + # the count of uncompensated deletions, where + self.c1 = 0 # i) the deleted element was in the sample, and + self.c2 = 0 # ii) the deleted element was not in the sample + + + @property + def d(self): + return self.c1 + self.c2 + + + def add_edge(self, edge): + if edge in self.graph: + return False + + e_add_start = datetime.now() + + u = edge.get_u() + v = edge.get_v() + + # replace update all existing subgraphs with u and v in the reservoir + for old_subg in self.reservoir.get_common_subgraphs(u, v): + new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,)) + self.process_existing_subgraph(old_subg, new_subg) + + # find new subgraph candidates for the reservoir + subgraph_candidates = self.get_new_subgraphs(u, v) + + W = len(subgraph_candidates) + I = 0 # number of subgraph candidates to include in sample + + if not self.reservoir.is_full(): + # the reservoir is not full, which means we either + # haven't encountered enough subgraphs to fill it, or + # there are uncompensated deletions from the reservoir + + # in the former case, we simply add subgraphs into + # the reservoir until it is full + + if self.d == 0: + I = min(W, self.M - len(self.reservoir)) + self.s = I + self.N += I + + # RANDOM PAIRING STEP + + if self.d > 0: + sum_rp = 0 + + while (self.d > 0) and (sum_rp < W): + num_picked_subgraphs = 0 + Z_rp = skip_rp.skip_records(self.c1, self.d) + + if sum_rp + Z_rp < W: + num_picked_subgraphs = int(self.c1 > 0) + else: + Z_rp = W - sum_rp + + I += num_picked_subgraphs + self.c1 -= num_picked_subgraphs + self.c2 -= Z_rp + + sum_rp += Z_rp + num_picked_subgraphs + + W -= sum_rp + + # RANDOM SAMPLING STEP + + # determine the number of candidates I to include in the sample + while self.s < W: + I += 1 + Z_rs = self.skip_rs.apply(self.N) + self.N += Z_rs + 1 + self.s += Z_rs + 1 + + # sample I subgraphs from the W candidates + if I < W: + additions = random.sample(subgraph_candidates, I) + else: + additions = subgraph_candidates + + # add all sampled subgraphs + for nodes in additions: + edges = self.graph.get_induced_edges(nodes) + subgraph = make_subgraph(nodes, edges + [edge]) + self.process_new_subgraph(subgraph) + + self.graph.add_edge(edge) + self.s -= W + + e_add_end = datetime.now() + + ms = timedelta(microseconds=1) + self.metrics['edge_op'].append('add') + self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms) + self.metrics['num_candidate_subgraphs'].append(len(subgraph_candidates)) + self.metrics['num_processed_subgraphs'].append(I) + self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full())) + + return True + + + def remove_edge(self, edge): + if edge not in self.graph: + return False + + e_del_start = datetime.now() + + self.graph.remove_edge(edge) + + u = edge.get_u() + v = edge.get_v() + + # find all nodesets representing subgraphs that will + # no longer be connected after the removal of this edge + removals = self.get_new_subgraphs(u, v) + D = len(removals) + + # we start compensating for subgraph deletions with variables c1 and c2 + # after the reservoir has filled up once + compensate_for_removals = self.reservoir.is_full() or self.d > 0 + removals_from_sample = 0 + + # find all subgraphs in the reservoir containing nodes u and v + for old_subg in self.reservoir.get_common_subgraphs(u, v): + + if frozenset(old_subg.nodes) in removals: + # subgraph is no longer connected after edge removal, remove it + removals_from_sample += int(self.process_old_subgraph(old_subg)) + else: + # subgraph stays connected after edge removal, replace it + old_edges = old_subg.edges + edges = [e for e in old_edges if e != make_subgraph_edge(edge)] + new_subg = make_subgraph(old_subg.nodes, edges) + self.process_existing_subgraph(old_subg, new_subg) + + if compensate_for_removals: + # update the counts of uncompensated deletions + self.c1 += removals_from_sample + self.c2 += D - removals_from_sample + + self.N -= D + + e_del_end = datetime.now() + + ms = timedelta(microseconds=1) + self.metrics['edge_op'].append('del') + self.metrics['edge_op_ms'].append((e_del_end - e_del_start) / ms) + self.metrics['num_candidate_subgraphs'].append(D) + self.metrics['num_processed_subgraphs'].append(removals_from_sample) + self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full())) + + return True + + + def process_new_subgraph(self, subgraph): + success, old_subgraph = self.reservoir.add(subgraph) + + if success: self.add_subgraph(subgraph) + if old_subgraph: self.remove_subgraph(old_subgraph) + + return success + + def process_old_subgraph(self, subgraph): + return self.reservoir.remove(subgraph) diff --git a/sampling/skip_rp.py b/sampling/skip_rp.py new file mode 100644 index 0000000..07621aa --- /dev/null +++ b/sampling/skip_rp.py @@ -0,0 +1,107 @@ +import numpy as np + +# The alpha value used in Vitter's "An efficient algorithm for sequential +# random sampling" was 1/13 +ALPHA_INV = 13 + +def draw_V_prime(coefficient): + return np.exp(np.log(np.random.rand()) * coefficient) + + +def skip_records(n, N): + """ + Returns the number of records S to skip before selecting the next record. + + :param n: The number of records to be selected out of the pool. + :param N: The total number of records to choose from. + :type n: int + :type N: int + :returns: S, the number of records to skip + :rtype: int + """ + + V_prime = draw_V_prime(1 / float(n)) + + threshold = n * ALPHA_INV + + S = N + + if (n > 1): + if (threshold < N): + S, = algorithm_D(n, N, V_prime) + else: + S = algorithm_A(n, N) + elif (n == 1): + S = int(N * V_prime) + + return S + + + +def algorithm_A(n, N): + top = N - n + N_real = float(N) + + V = np.random.rand() + S = 0 + + quot = (N - n) / N_real + + while quot > V: + S += 1 + top -= 1 + N_real -= 1 + + quot *= top / N_real + + return S + + + +def algorithm_D(n, N, V_prime) + n_inv = 1 / float(n) + n_min1_inv = 1 / float(n - 1) + + qu1 = N - n + 1 + + while True: + while True: + X = N * (1 - V_prime) + S = int(X) + + if S < qu1: + break + + V_prime = draw_V_prime(n_inv) + + U = np.random.rand() + + y1 = np.exp(np.log(U * N / float(qu1)) * n_min1_inv) + + V_prime = y1 * (- X / float(N + 1)) * (qu1 / float(qu1 - S)) + + if V_prime <= 1: + break + + y2 = 1 + top = N - 1 + + if n - 1 > S: + bottom = float(N - n) + limit = N - S + else: + bottom = float(N - S - 1) + limit = qu1 + + for t in range(N - 1, limit - 1, -1): + y2 *= top / bottom + top -= 1 + bottom -= 1 + + if N / float(N - X) >= y1 * np.exp(np.log(y2) * n_min1_inv): + V_prime = draw_V_prime(n_min1_inv) + break + + V_prime = draw_V_prime(n_inv) + + return S, V_prime From 59679282c3c7d0c5ef13f3dd945f76539e3d9d8d Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Mon, 22 Apr 2019 23:34:37 +0300 Subject: [PATCH 8/9] finalize fully dynamic setting --- algorithms/fsm/dynamic/exact_counting.py | 5 +-- algorithms/fsm/dynamic/naive_reservoir.py | 14 +++++--- algorithms/fsm/dynamic/optimized_reservoir.py | 35 +++++++++---------- algorithms/fsm/incremental/exact_counting.py | 5 +-- algorithms/fsm/incremental/naive_reservoir.py | 15 +++----- .../fsm/incremental/optimized_reservoir.py | 14 +++----- sampling/skip_rp.py | 24 +++++++------ 7 files changed, 53 insertions(+), 59 deletions(-) diff --git a/algorithms/fsm/dynamic/exact_counting.py b/algorithms/fsm/dynamic/exact_counting.py index 8c85379..2cf10e6 100644 --- a/algorithms/fsm/dynamic/exact_counting.py +++ b/algorithms/fsm/dynamic/exact_counting.py @@ -48,7 +48,8 @@ def remove_edge(self, edge): e_add_end = datetime.now() ms = timedelta(microseconds=1) - self.metrics['edge_remove_ms'].append((e_add_end - e_add_start) / ms) - self.metrics['removed_subgraph_count'].append(len(removals)) + self.metrics['edge_op'].append('del') + self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms) + self.metrics['num_processed_subgraphs'].append(len(removals)) return True diff --git a/algorithms/fsm/dynamic/naive_reservoir.py b/algorithms/fsm/dynamic/naive_reservoir.py index 02d9812..e1f20dd 100644 --- a/algorithms/fsm/dynamic/naive_reservoir.py +++ b/algorithms/fsm/dynamic/naive_reservoir.py @@ -79,14 +79,17 @@ def remove_edge(self, edge): removals = self.get_new_subgraphs(u, v) D = len(removals) - old_c1 = self.c1 + # we start compensating for subgraph deletions with variables c1 and c2 + # after the reservoir has filled up once + compensate_for_removals = self.reservoir.is_full() or (self.c1 + self.c2) > 0 + removals_from_sample = 0 # find all subgraphs in the reservoir containing nodes u and v for old_subg in self.reservoir.get_common_subgraphs(u, v): if frozenset(old_subg.nodes) in removals: # subgraph is no longer connected after edge removal, remove it - self.process_old_subgraph(old_subg) + removals_from_sample += int(self.process_old_subgraph(old_subg)) else: # subgraph stays connected after edge removal, replace it old_edges = old_subg.edges @@ -95,8 +98,9 @@ def remove_edge(self, edge): self.process_existing_subgraph(old_subg, new_subg) # update the count of uncompensated deletions from outside the sample - removals_from_sample = self.c1 - old_c1 - self.c2 += D - removals_from_sample + if compensate_for_removals: + self.c1 += removals_from_sample + self.c2 += D - removals_from_sample self.N -= D @@ -138,4 +142,4 @@ def process_new_subgraph(self, subgraph): def process_old_subgraph(self, subgraph): - if self.reservoir.remove(subgraph): self.c1 += 1 + return self.reservoir.remove(subgraph) diff --git a/algorithms/fsm/dynamic/optimized_reservoir.py b/algorithms/fsm/dynamic/optimized_reservoir.py index 684b483..9fc0d12 100644 --- a/algorithms/fsm/dynamic/optimized_reservoir.py +++ b/algorithms/fsm/dynamic/optimized_reservoir.py @@ -6,14 +6,14 @@ from graph.simple_graph import SimpleGraph -from subgraph.util import make_subgraph +from subgraph.util import make_subgraph, make_subgraph_edge from subgraph.pattern import canonical_label from sampling import skip_rp from sampling.skip_rs import SkipRS -class IncerementalOptimizedReservoirAlgorithm(ReservoirAlgorithm): +class DynamicOptimizedReservoirAlgorithm(ReservoirAlgorithm): def __init__(self, k=3, M=1000): @@ -70,25 +70,24 @@ def add_edge(self, edge): # RANDOM PAIRING STEP - if self.d > 0: - sum_rp = 0 + sum_rp = 0 - while (self.d > 0) and (sum_rp < W): - num_picked_subgraphs = 0 - Z_rp = skip_rp.skip_records(self.c1, self.d) + while (self.d > 0) and (sum_rp < W): + num_picked_subgraphs = 0 + Z_rp = skip_rp.skip_records(self.c1, self.d) - if sum_rp + Z_rp < W: - num_picked_subgraphs = int(self.c1 > 0) - else: - Z_rp = W - sum_rp + if sum_rp + Z_rp < W: + num_picked_subgraphs = int(self.c1 > 0) + else: + Z_rp = W - sum_rp - I += num_picked_subgraphs - self.c1 -= num_picked_subgraphs - self.c2 -= Z_rp + I += num_picked_subgraphs + self.c1 -= num_picked_subgraphs + self.c2 -= Z_rp - sum_rp += Z_rp + num_picked_subgraphs + sum_rp += Z_rp + num_picked_subgraphs - W -= sum_rp + W -= sum_rp # RANDOM SAMPLING STEP @@ -99,8 +98,8 @@ def add_edge(self, edge): self.N += Z_rs + 1 self.s += Z_rs + 1 - # sample I subgraphs from the W candidates - if I < W: + # sample I subgraphs from among the candidates + if I < len(subgraph_candidates): additions = random.sample(subgraph_candidates, I) else: additions = subgraph_candidates diff --git a/algorithms/fsm/incremental/exact_counting.py b/algorithms/fsm/incremental/exact_counting.py index 221592f..d3b1fb9 100644 --- a/algorithms/fsm/incremental/exact_counting.py +++ b/algorithms/fsm/incremental/exact_counting.py @@ -48,8 +48,9 @@ def add_edge(self, edge): e_add_end = datetime.now() ms = timedelta(microseconds=1) - self.metrics['edge_add_ms'].append((e_add_end - e_add_start) / ms) - self.metrics['new_subgraph_count'].append(len(additions)) + self.metrics['edge_op'].append('add') + self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms) + self.metrics['num_processed_subgraphs'].append(len(additions)) return True diff --git a/algorithms/fsm/incremental/naive_reservoir.py b/algorithms/fsm/incremental/naive_reservoir.py index de45fc9..f7390d3 100644 --- a/algorithms/fsm/incremental/naive_reservoir.py +++ b/algorithms/fsm/incremental/naive_reservoir.py @@ -24,16 +24,11 @@ def add_edge(self, edge): v = edge.get_v() # replace update all existing subgraphs with u and v in the reservoir - s_rep_start = datetime.now() - for old_subg in self.reservoir.get_common_subgraphs(u, v): new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,)) self.process_existing_subgraph(old_subg, new_subg) - s_rep_end = datetime.now() - # find new subgraph candidates for the reservoir - s_add_start = datetime.now() additions = self.get_new_subgraphs(u, v) # perform reservoir sampling for each new subgraph candidate @@ -43,18 +38,16 @@ def add_edge(self, edge): edges = self.graph.get_induced_edges(nodes) subgraph = make_subgraph(nodes, edges+[edge]) I += int(self.process_new_subgraph(subgraph)) - s_add_end = datetime.now() self.graph.add_edge(edge) e_add_end = datetime.now() ms = timedelta(microseconds=1) - self.metrics['edge_add_ms'].append((e_add_end - e_add_start) / ms) - self.metrics['subgraph_add_ms'].append((s_add_end - s_add_start) / ms) - self.metrics['subgraph_replace_ms'].append((s_rep_end - s_rep_start) / ms) - self.metrics['new_subgraph_count'].append(len(additions)) - self.metrics['included_subgraph_count'].append(I) + self.metrics['edge_op'].append('add') + self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms) + self.metrics['num_candidate_subgraphs'].append(len(additions)) + self.metrics['num_processed_subgraphs'].append(I) self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full())) return True diff --git a/algorithms/fsm/incremental/optimized_reservoir.py b/algorithms/fsm/incremental/optimized_reservoir.py index df7a3fe..21d7df9 100644 --- a/algorithms/fsm/incremental/optimized_reservoir.py +++ b/algorithms/fsm/incremental/optimized_reservoir.py @@ -30,14 +30,11 @@ def add_edge(self, edge): v = edge.get_v() # replace update all existing subgraphs with u and v in the reservoir - s_rep_start = datetime.now() for old_subg in self.reservoir.get_common_subgraphs(u, v): new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,)) self.process_existing_subgraph(old_subg, new_subg) - s_rep_end = datetime.now() # find new subgraph candidates for the reservoir - s_add_start = datetime.now() subgraph_candidates = self.get_new_subgraphs(u, v) W = len(subgraph_candidates) @@ -69,19 +66,16 @@ def add_edge(self, edge): subgraph = make_subgraph(nodes, edges+[edge]) self.process_new_subgraph(subgraph) - s_add_end = datetime.now() - self.graph.add_edge(edge) self.s -= W e_add_end = datetime.now() ms = timedelta(microseconds=1) - self.metrics['edge_add_ms'].append((e_add_end - e_add_start) / ms) - self.metrics['subgraph_add_ms'].append((s_add_end - s_add_start) / ms) - self.metrics['subgraph_replace_ms'].append((s_rep_end - s_rep_start) / ms) - self.metrics['new_subgraph_count'].append(W) - self.metrics['included_subgraph_count'].append(I) + self.metrics['edge_op'].append('add') + self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms) + self.metrics['num_candidate_subgraphs'].append(W) + self.metrics['num_processed_subgraphs'].append(I) self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full())) self.metrics['skiprs_treshold_bool'].append(int(self.skip_rs.is_threshold_reached(self.N))) diff --git a/sampling/skip_rp.py b/sampling/skip_rp.py index 07621aa..1bc2906 100644 --- a/sampling/skip_rp.py +++ b/sampling/skip_rp.py @@ -20,19 +20,21 @@ def skip_records(n, N): :rtype: int """ - V_prime = draw_V_prime(1 / float(n)) - threshold = n * ALPHA_INV S = N - if (n > 1): - if (threshold < N): - S, = algorithm_D(n, N, V_prime) + if n > 0: + V_prime = draw_V_prime(1 / float(n)) + + if (n > 1): + if (threshold < N): + S = algorithm_D(n, N, V_prime) + else: + S = algorithm_A(n, N) else: - S = algorithm_A(n, N) - elif (n == 1): - S = int(N * V_prime) + # the n == 1 special case + S = int(N * V_prime) return S @@ -58,7 +60,7 @@ def algorithm_A(n, N): -def algorithm_D(n, N, V_prime) +def algorithm_D(n, N, V_prime): n_inv = 1 / float(n) n_min1_inv = 1 / float(n - 1) @@ -99,9 +101,9 @@ def algorithm_D(n, N, V_prime) bottom -= 1 if N / float(N - X) >= y1 * np.exp(np.log(y2) * n_min1_inv): - V_prime = draw_V_prime(n_min1_inv) + # V_prime = draw_V_prime(n_min1_inv) break V_prime = draw_V_prime(n_inv) - return S, V_prime + return S From acdcad46fe32cc7d91ee7ecbf1279172647c7c6d Mon Sep 17 00:00:00 2001 From: Jaakko Hynynen Date: Mon, 22 Apr 2019 23:37:30 +0300 Subject: [PATCH 9/9] grok the continuous accuracy experiment to run tests on triton --- continuous_accuracy.py | 166 ++++++++++++++++++++++++++++------------- 1 file changed, 115 insertions(+), 51 deletions(-) diff --git a/continuous_accuracy.py b/continuous_accuracy.py index 5cb83a9..f5ddb79 100644 --- a/continuous_accuracy.py +++ b/continuous_accuracy.py @@ -6,6 +6,7 @@ import numpy as np import matplotlib.pyplot as plt +from datetime import datetime, timedelta from collections import deque, defaultdict from argparse import ArgumentParser, FileType @@ -23,6 +24,7 @@ from algorithms.fsm.dynamic.exact_counting import DynamicExactCountingAlgorithm from algorithms.fsm.dynamic.naive_reservoir import DynamicNaiveReservoirAlgorithm +from algorithms.fsm.dynamic.optimized_reservoir import DynamicOptimizedReservoirAlgorithm ACCURACY_SAMPLE_INTERVAL = 10 @@ -37,16 +39,19 @@ 'dynamic': { 'exact': DynamicExactCountingAlgorithm, 'naive': DynamicNaiveReservoirAlgorithm, - 'optimal': None + 'optimal': DynamicOptimizedReservoirAlgorithm } } -def run_simulation(simulators, edges, window_size=0): +def run_simulation(simulators, edges, T_k, window_size=0): + ms = timedelta(microseconds=1) + np.random.shuffle(edges) - accuracy_metrics = defaultdict(list) + accuracy_metrics = defaultdict(lambda: defaultdict(list)) + performance_metrics = defaultdict(list) if window_size > 0: sliding_window = deque() @@ -54,38 +59,52 @@ def run_simulation(simulators, edges, window_size=0): start_time = time.time() for idx, edge_to_add in enumerate(edges): - simulators['exact'].add_edge(edge_to_add) - simulators['naive'].add_edge(edge_to_add) + + edge_to_remove = None if window_size > 0: sliding_window.append(edge_to_add) if len(sliding_window) > window_size: - edge_to_remove = sliding_window.popleft() - simulators['exact'].remove_edge(edge_to_add) - simulators['naive'].remove_edge(edge_to_add) + edge_to_remove = sliding_window.popleft() + + + for algorithm in ['exact', 'naive', 'optimal']: + + edge_op_start = datetime.now() + + simulators[algorithm].add_edge(edge_to_add) + + if edge_to_remove != None: + simulators[algorithm].remove_edge(edge_to_remove) + + edge_op_end = datetime.now() + + performance_metrics[algorithm].append((edge_op_end - edge_op_start) / ms) + if idx % ACCURACY_SAMPLE_INTERVAL == 0: exact_patterns = +simulators['exact'].patterns - naive_patterns = +simulators['naive'].patterns - exact_freqs = pattern_frequencies(exact_patterns) - naive_freqs = pattern_frequencies(naive_patterns) + exact_t_freqs = threshold_frequencies(exact_freqs, 0.005) + + for algo_type in ['naive', 'optimal']: - exact_freqs = threshold_frequencies(exact_freqs, 0.005) - naive_freqs = threshold_frequencies(naive_freqs, 0.005) + algo_patterns = +simulators[algo_type].patterns + algo_freqs = pattern_frequencies(algo_patterns) + algo_t_freqs = threshold_frequencies(algo_freqs, 0.005) - prec = precision(exact_freqs, naive_freqs) - rec = recall(exact_freqs, naive_freqs) - are = avg_relative_error(exact_freqs, naive_freqs, 40) + prec = precision(exact_t_freqs, algo_t_freqs) + rec = recall(exact_t_freqs, algo_t_freqs) + are = avg_relative_error(exact_t_freqs, algo_t_freqs, T_k) - accuracy_metrics['precision'].append(prec) - accuracy_metrics['recall'].append(rec) - accuracy_metrics['avg_relative_error'].append(are) + accuracy_metrics[algo_type]['precision'].append(prec) + accuracy_metrics[algo_type]['recall'].append(rec) + accuracy_metrics[algo_type]['avg_relative_error'].append(are) end_time = time.time() - return end_time - start_time, accuracy_metrics + return end_time - start_time, accuracy_metrics, performance_metrics def main(): @@ -109,15 +128,17 @@ def main(): type=FileType('r'), help="path to the input graph edge file") - ''' parser.add_argument('output_dir', help="path to the directory for output files") - ''' parser.add_argument('reservoir_size', type=int, help="reservoir size required for naive and optimal algorithms") + parser.add_argument('T_k', + type=int, + help="number of possible k-node subgraph patterns") + parser.add_argument('-t', '--times', type=int, default=10, @@ -133,7 +154,9 @@ def main(): k = args['k'] #algo = args['algorithm'] stream = args['stream_setting'] + output_dir = args['output_dir'] M = args['reservoir_size'] + T_k = args['T_k'] times = args['times'] window_size = args['window_size'] @@ -147,21 +170,17 @@ def main(): #print("algorithm: ", algo) print("k: ", k) print("M: ", M) + print("T_k: ", T_k) print("times: ", times) print("window size: ", window_size) - print("input graph: ", in_file.name, "\n") + print("input graph: ", in_file.name) + print("output dir: ", output_dir, "\n") ExactAlgorithm = ALGORITHMS[stream]['exact'] NaiveAlgorithm = ALGORITHMS[stream]['naive'] + OptimizedAlgorithm = ALGORITHMS[stream]['optimal'] - if ExactAlgorithm == None: - msg = "exact algorithm is not available for %s stream setting" % (stream) - raise NotImplementedError(msg) - - if NaiveAlgorithm == None: - msg = "naive algorithm is not available for %s stream setting" % (stream) - raise NotImplementedError(msg) if window_size and stream != 'dynamic': msg = "sliding window is only used with %s stream setting" % (stream) @@ -176,8 +195,8 @@ def main(): # run simulations and collect the duration and metrics from each run durations = [] - run_metrics = defaultdict(list) run_accuracy_metrics = [] + run_performance_metrics = [] print("SIMULATIONS", "\n") @@ -186,47 +205,92 @@ def main(): simulators = { 'exact': ExactAlgorithm(k=k), - 'naive': NaiveAlgorithm(k=k, M=M) + 'naive': NaiveAlgorithm(k=k, M=M), + 'optimal': OptimizedAlgorithm(k=k, M=M) } - duration, accuracy_metrics = run_simulation(simulators, edges, window_size) + duration, acc_metrics, perf_metrics = run_simulation(simulators, edges, T_k, window_size) print("Done, run took", duration, "seconds.", "\n") - durations.append(duration) - run_accuracy_metrics.append(accuracy_metrics) + durations.append(duration) + run_accuracy_metrics.append(acc_metrics) + run_performance_metrics.append(perf_metrics) avg_duration = np.mean(durations) - print("Average duration of a run was", avg_duration, "seconds.") + print("Average duration of a run was", avg_duration, "seconds.", "\n") + + ec_edge_op_durations = [x['exact'] for x in run_performance_metrics] + nrs_edge_op_durations = [x['naive'] for x in run_performance_metrics] + ors_edge_op_durations = [x['optimal'] for x in run_performance_metrics] + + print("Average edge operation durations") + print("EC: %d ms" % (np.mean(ec_edge_op_durations))) + print("NRS: %d ms" % (np.mean(nrs_edge_op_durations))) + print("ORS: %d ms" % (np.mean(ors_edge_op_durations))) + + identifier = uuid.uuid4() # unique identifier for files + print("Plots of performance") - print("Plots of accuracy") + perf_plot_path = os.path.join(output_dir, "%s_performance_plot.pdf" % (identifier)) - avg_precision = np.mean(np.asarray([d['precision'] for d in run_accuracy_metrics]), axis=0) - avg_recall = np.mean(np.asarray([d['recall'] for d in run_accuracy_metrics]), axis=0) - avg_are = np.mean(np.asarray([d['avg_relative_error'] for d in run_accuracy_metrics]), axis=0) + x_values = np.arange(len(ec_edge_op_durations[0])) - edge_add_points = [ACCURACY_SAMPLE_INTERVAL * i for i in range(1, len(avg_precision) + 1)] + ec_avg_edge_op_durations = np.mean(ec_edge_op_durations, axis=0) + nrs_avg_edge_op_durations = np.mean(nrs_edge_op_durations, axis=0) + ors_avg_edge_op_durations = np.mean(ors_edge_op_durations, axis=0) fig = plt.figure() - ax = fig.add_subplot(311) + ax = fig.add_subplot(111) + ax.plot(x_values, ec_avg_edge_op_durations, label="EC") + ax.plot(x_values, nrs_avg_edge_op_durations, label="NRS") + ax.plot(x_values, ors_avg_edge_op_durations, label="ORS") + ax.legend() + plt.title("edge op durations, k="+str(k)) + plt.savefig(perf_plot_path) + + print("Saved figure to", perf_plot_path) + + + print("Plots of accuracy", "\n") + + for algo_type in ['naive', 'optimal']: + algo_acc_metrics = [d[algo_type] for d in run_accuracy_metrics] + + avg_precision = np.mean([c['precision'] for c in algo_acc_metrics], axis=0) + avg_recall = np.mean([c['recall'] for c in algo_acc_metrics], axis=0) + avg_are = np.mean([c['avg_relative_error'] for c in algo_acc_metrics], axis=0) + + x_values = [ACCURACY_SAMPLE_INTERVAL * i for i in range(1, len(avg_precision) + 1)] + + fig = plt.figure() + ax = fig.add_subplot(311) + + ax.plot(x_values, avg_precision) + plt.title("precision") + + ax = fig.add_subplot(312) + + ax.plot(x_values, avg_recall) + plt.title("recall") + + ax = fig.add_subplot(313) - ax.plot(edge_add_points, avg_precision) - plt.title("precision") + ax.plot(x_values, avg_are) + plt.title("average relative error") - ax = fig.add_subplot(312) + fig.suptitle('%s fully dynamic algorithm performance' % (algo_type), fontsize=16) - ax.plot(edge_add_points, avg_recall) - plt.title("recall") + plt.tight_layout() - ax = fig.add_subplot(313) + acc_plot_path = os.path.join(output_dir, "%s_%s_vs_ec_accuracy_plot.pdf" % (identifier, algo_type)) - ax.plot(edge_add_points, avg_are) - plt.title("average relative error") + plt.savefig(acc_plot_path) - plt.show() + print("Saved figure to", acc_plot_path) if __name__ == '__main__':