diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..f4ce391 --- /dev/null +++ b/README.txt @@ -0,0 +1,6 @@ +#LOT +This is the intern version of the project, feel free to play around with this repo + +Language of Thought Modelling of Cognitive Algorithms + +Hello World \ No newline at end of file diff --git a/cognitive_functions.py b/cognitive_functions.py index 816d459..ca8e656 100644 --- a/cognitive_functions.py +++ b/cognitive_functions.py @@ -1,129 +1,169 @@ import primitive_fucntions as pf from collections import defaultdict -from utils import Stopwatch, Element, ElementSet, Associations +from utils import Stopwatch, Element, ElementSet, Associations, SpaceComplexity # 1-D def iterate(S: ElementSet): # 112233 + items = _items_from_S(S) + sc_obj = SpaceComplexity() # preprocessing (not part of measured cognitive process) - n = len(S.elements) # number of elements in the set - chunks = defaultdict(list) + n = len(items) # number of elements in the set + + # persistent container: chunks + sc_obj.startcounttemp() + chunks = defaultdict(list) stopwatch = Stopwatch() - # --- # - # select attribute which chunking is based on - ### ADD CODE HERE - """ graph algo here""" - bias = find_bias(S.elements, stopwatch) + bias = find_bias(items, stopwatch) for _ in range(n//2): - element = pf.sample(S) - result = None - time_elapsed = None + _ = pf.sample(items) - - # """ non graph algo here""" - bias = find_bias(S, stopwatch) + bias = find_bias(items, stopwatch) stopwatch.start() for _ in range(n): - element = pf.sample(S) # select an element in the set + element = pf.sample(items) # select an element in the set sorter = getattr(element, bias) chunks[sorter].append(element) - pf.setminus(S, element) + pf.setminus(items, element) stopwatch.stop() - n = len(chunks) # reassign n - chunks = {tuple(v) for k, v in chunks.items()} - stopwatch.start() + sc_obj.startcounttemp() + n_chunks = len(chunks) # reassign n + chunks_set = {tuple(v) for k, v in chunks.items()} + + sc_obj.startcounttemp() result = [] - for _ in range(n): - chunk = pf.sample(chunks) - stopwatch.stop() + stopwatch.start() + for _ in range(n_chunks): + chunk = pf.sample(chunks_set) + sc_obj.startcounttemp() temp = list(chunk) - stopwatch.start() - pf.append(result, temp) - pf.setminus(chunks, chunk) + sc_obj.subtemp() + result = pf.append(result, temp) + pf.setminus(chunks_set, chunk) stopwatch.stop() time_elapsed = stopwatch.get_elapsed_time() + space_used = sc_obj.get_max() - return (result, time_elapsed) + return (result, time_elapsed, space_used) def palindrome(S): - # 123321 + items = _items_from_S(S) + sc_obj = SpaceComplexity() + # preprocessing (not part of measured cognitive process) - n = len(S) // 2 # number of elements in basis + n = len(items) // 2 # number of elements in basis stopwatch = Stopwatch() + # select attribute which chunking is based on - bias = find_bias(S, stopwatch) + bias = find_bias(items, stopwatch) stopwatch.start() - basis, rev = [], [] - # write_random() based implementation - ### WRITE CODE HERE - while (len(S) > n): - element = pf.sample(S) - if len(basis)==0 or not (any(pf.check_if_same_type(element, chosen, bias) for chosen in basis)): + + sc_obj.startcounttemp() + basis = [] + sc_obj.startcounttemp() + rev = [] + + # write_random() fallback implementation: + def _pick_matching_element(pool, attr, value): + sc_obj.startcounttemp() + candidates = [e for e in pool if getattr(e, attr) == value] + sc_obj.subtemp() + if not candidates: + return None + return pf.sample(set(candidates)) + + while len(items) > n: + element = pf.sample(items) + if len(basis)==0 or not any(getattr(element, bias) == getattr(chosen, bias) for chosen in basis): pf.pair(basis, element) - pf.setminus(S, element) + pf.setminus(items, element) + for _ in range(n): - element = pf.write_random(S, bias, getattr(basis[n-1-_], bias)) - pf.pair(rev,element) - pf.setminus(S, element) - result = pf.append(basis,rev) + target_type = getattr(basis[n-1-_], bias) + element = None + try: + candidate = pf.write_random(items, bias, target_type) + except Exception: + candidate = None + if candidate: + element = candidate + else: + element = _pick_matching_element(items, bias, target_type) + if element is None: + if len(items) == 0: + break + element = pf.sample(items) + pf.pair(rev, element) + pf.setminus(items, element) + + result = pf.append(basis, rev) stopwatch.stop() time_elapsed = stopwatch.get_elapsed_time() - - return (result, time_elapsed) + space_used = sc_obj.get_max() + + return (result, time_elapsed, space_used) def alternate(S): - # 121212 - # preprocessing (not part of measured cognitive process) - n = len(S) # number of elements in the set + items = _items_from_S(S) + sc_obj = SpaceComplexity() stopwatch = Stopwatch() - # --- # - # select attribute which chunking is based on - ### ADD CODE HERE - ### subject knows what types of attributes are there - ### and what type of attribute to select - bias = find_bias(S,stopwatch,two_flag=True) + sc_obj.startcounttemp() result = [] - while (len(S) > 0): - element = pf.sample(S) - if len(result) == 0 or not pf.check_if_same_type(element, result[-1], bias): + + # select attribute which chunking is based on + bias = find_bias(items, stopwatch, two_flag=True) + stopwatch.start() + while len(items) > 0: + element = pf.sample(items) + if len(result) == 0 or not (getattr(element, bias) == getattr(result[-1], bias)): pf.pair(result, element) - pf.setminus(S, element) + pf.setminus(items, element) + else: + # skip and try again + continue + stopwatch.stop() time_elapsed = stopwatch.get_elapsed_time() + space_used = sc_obj.get_max() - return (result, time_elapsed) + return (result, time_elapsed, space_used) def chaining(S, associations: dict): - # preprocessing (not part of measured cognitive process) - n = len(S) # number of elements in the set + items = _items_from_S(S) + sc_obj = SpaceComplexity() stopwatch = Stopwatch() - # --- # - stopwatch.start() + + sc_obj.startcounttemp() chunks = [] - result = [] - while (len(S) > 0): - element = pf.sample(S) + stopwatch.start() + while len(items) > 0: + element = pf.sample(items) if element in associations.keys(): + sc_obj.startcounttemp() # chunk persisted inside chunks chunk = [] pf.pair(chunk, element) - pf.setminus(S, element) + pf.setminus(items, element) while True: - next_element = pf.sample(S) + if len(items) == 0: + break + next_element = pf.sample(items) if next_element == associations[element]: pf.pair(chunk, next_element) - pf.setminus(S, next_element) + pf.setminus(items, next_element) pf.pair(chunks, chunk) break else: continue else: + # element had no association; skip it continue stopwatch.stop() time_elapsed = stopwatch.get_elapsed_time() + space_used = sc_obj.get_max() - return (result, time_elapsed) + return (chunks, time_elapsed, space_used) def seriate(S): # 123123 @@ -132,55 +172,125 @@ def seriate(S): # -------- 2-D -------- # def serial_crossed(S): - n = len(S) // 2 # number of elements in the basis + items = _items_from_S(S) + sc_obj = SpaceComplexity() stopwatch = Stopwatch() - bias = find_bias(S, stopwatch, higher_dim=True) + + n = len(items) // 2 # number of elements in the basis + + bias = find_bias(items, stopwatch, higher_dim=True) + + sc_obj.startcounttemp() result = [] - while len(S) > n: - element = pf.sample(S) - if len(result) == 0 or pf.check_if_same_type(element, result[-1], bias[0]): + + stopwatch.start() + while len(items) > n: + element = pf.sample(items) + if len(result) == 0 or getattr(element, bias[0]) == getattr(result[-1], bias[0]): pf.pair(result, element) - pf.setminus(S, element) + pf.setminus(items, element) + for _ in range(n): - element = pf.write_random(S, bias[1], getattr(result[_], bias[1])) - pf.pair(result, element) - pf.setminus(S, element) + target_type = getattr(result[_], bias[1]) + chosen = None + try: + chosen = pf.write_random(items, bias[1], target_type) + except Exception: + chosen = None + if not chosen: + sc_obj.startcounttemp() + candidates = [e for e in items if getattr(e, bias[1]) == target_type] + sc_obj.subtemp() + chosen = pf.sample(set(candidates)) if candidates else None + if chosen is None: + if len(items) == 0: + break + chosen = pf.sample(items) + pf.pair(result, chosen) + pf.setminus(items, chosen) + stopwatch.stop() time_elapsed = stopwatch.get_elapsed_time() - return (result, time_elapsed) + space_used = sc_obj.get_max() + return (result, time_elapsed, space_used) def center_embedded(S): - n = len(S) // 2 # number of elements in the basis - # + items = _items_from_S(S) + sc_obj = SpaceComplexity() stopwatch = Stopwatch() - bias = find_bias(S, stopwatch, higher_dim=True) + + n = len(items) // 2 + bias = find_bias(items, stopwatch, higher_dim=True) + + sc_obj.startcounttemp() result = [] - while len(S) > 0: - element = pf.sample(S) - if len(result) == 0 or pf.check_if_same_type(element, result[-1], bias[0]): + + stopwatch.start() + while len(items) > 0: + element = pf.sample(items) + if len(result) == 0 or getattr(element, bias[0]) == getattr(result[-1], bias[0]): pf.pair(result, element) - pf.setminus(S, element) + pf.setminus(items, element) + for _ in range(n): - element = pf.write_random(S, bias[1], getattr(result[n - 1 - _], bias[1])) - pf.pair(result, element) - pf.setminus(S, element) + target_type = getattr(result[n - 1 - _], bias[1]) + # try write_random + chosen = None + try: + chosen = pf.write_random(items, bias[1], target_type) + except Exception: + chosen = None + if not chosen: + sc_obj.startcounttemp() + candidates = [e for e in items if getattr(e, bias[1]) == target_type] + sc_obj.subtemp() + chosen = pf.sample(set(candidates)) if candidates else None + if chosen is None: + if len(items) == 0: + break + chosen = pf.sample(items) + pf.pair(result, chosen) + pf.setminus(items, chosen) stopwatch.stop() time_elapsed = stopwatch.get_elapsed_time() - return (result, time_elapsed) + space_used = sc_obj.get_max() + return (result, time_elapsed, space_used) def tail_recursive(S): + items = _items_from_S(S) + sc_obj = SpaceComplexity() stopwatch = Stopwatch() - bias = find_bias(S, stopwatch, two_flag=True, higher_dim=True) + + bias = find_bias(items, stopwatch, two_flag=True, higher_dim=True) stopwatch.start() + + sc_obj.startcounttemp() result = [] - while len(S) > 0: - element = pf.sample(S) - pf.setminus(S, element) - paired_element = pf.write_random(S, bias[0], getattr(element, bias[0])) + + while len(items) > 0: + element = pf.sample(items) + pf.setminus(items, element) + target_type = getattr(element, bias[0]) + # try write_random / fallback + paired_element = None + try: + paired_element = pf.write_random(items, bias[0], target_type) + except Exception: + paired_element = None + if not paired_element: + sc_obj.startcounttemp() + candidates = [e for e in items if getattr(e, bias[0]) == target_type] + sc_obj.subtemp() + paired_element = pf.sample(set(candidates)) if candidates else None + if paired_element is None: + if len(items) == 0: + break + paired_element = pf.sample(items) result = pf.append(result, pf.merge(element, paired_element)) - pf.setminus(S, paired_element) + pf.setminus(items, paired_element) stopwatch.stop() time_elapsed = stopwatch.get_elapsed_time() - return (result, time_elapsed) + space_used = sc_obj.get_max() + return (result, time_elapsed, space_used) # ---------------------------------------------------------------------# diff --git a/experiments.ipynb b/experiments.ipynb index 0562298..c1cdedc 100644 --- a/experiments.ipynb +++ b/experiments.ipynb @@ -2,11 +2,33 @@ "cells": [ { "cell_type": "code", - "execution_count": 11, + "execution_count": 1, "id": "08bf7d5a", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Requirement already satisfied: networkx in c:\\users\\mikea\\appdata\\local\\programs\\python\\python312\\lib\\site-packages (3.3)\n", + "Note: you may need to restart the kernel to use updated packages.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING: Ignoring invalid distribution ~ontourpy (c:\\Users\\Mikea\\AppData\\Local\\Programs\\Python\\Python312\\Lib\\site-packages)\n", + "WARNING: Ignoring invalid distribution ~ontourpy (c:\\Users\\Mikea\\AppData\\Local\\Programs\\Python\\Python312\\Lib\\site-packages)\n", + "WARNING: Ignoring invalid distribution ~ontourpy (c:\\Users\\Mikea\\AppData\\Local\\Programs\\Python\\Python312\\Lib\\site-packages)\n", + "c:\\Users\\Mikea\\AppData\\Local\\Programs\\Python\\Python312\\Lib\\site-packages\\seaborn\\_statistics.py:32: UserWarning: A NumPy version >=1.23.5 and <2.3.0 is required for this version of SciPy (detected version 2.3.0)\n", + " from scipy.stats import gaussian_kde\n" + ] + } + ], "source": [ + "%pip install networkx\n", + "\n", "import numpy as np\n", "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", @@ -281,7 +303,7 @@ ], "metadata": { "kernelspec": { - "display_name": ".venv", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -295,7 +317,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.2" + "version": "3.12.4" } }, "nbformat": 4, diff --git a/playground.ipynb b/playground.ipynb index ffece1a..4fd0240 100644 --- a/playground.ipynb +++ b/playground.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -26,33 +26,22 @@ "metadata": {}, "outputs": [], "source": [ - "obj1 = Element(\"ridhi\",\"yellow\", \"square\")\n", - "obj2 = Element(\"2\",\"blue\", \"square\")\n", - "obj3 = Element(\"3\",\"yellow\", \"circle\")\n", - "obj4 = Element(\"4\",\"blue\", \"circle\")\n" + "obj1 = (\"ridhi\",\"yellow\", \"square\")\n", + "obj2 = (\"2\",\"blue\", \"square\")\n", + "obj3 = (\"3\",\"yellow\", \"circle\")\n", + "obj4 = (\"4\",\"blue\", \"circle\")\n" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 5, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 3, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "S = {obj1, obj2, obj3, obj4}\n", - "assoc = Associations({obj1: obj2, obj3: obj4})\n", - "S = ElementSet(S, assoc)\n", - "S.graph\n" + "# assoc = Associations({obj1: obj2, obj3: obj4})\n", + "# S = ElementSet(S, assoc)\n", + "# S.graph\n" ] }, { @@ -206,11 +195,33 @@ "for u, v, d in G.edges(data=True):\n", " print(f\"{u} --({d['label']})--> {v}\")" ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "ename": "UnboundLocalError", + "evalue": "cannot access local variable 'sc' where it is not associated with a value", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mUnboundLocalError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[6]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m \u001b[43mcf\u001b[49m\u001b[43m.\u001b[49m\u001b[43miterate\u001b[49m\u001b[43m(\u001b[49m\u001b[43mS\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32mc:\\localpython\\lot_25\\LoT-modeling\\cognitive_functions.py:9\u001b[39m, in \u001b[36miterate\u001b[39m\u001b[34m(S)\u001b[39m\n\u001b[32m 8\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34miterate\u001b[39m(S): \u001b[38;5;66;03m# 112233 \u001b[39;00m\n\u001b[32m----> \u001b[39m\u001b[32m9\u001b[39m sc = \u001b[43msc\u001b[49m()\n\u001b[32m 10\u001b[39m n = \u001b[38;5;28mlen\u001b[39m(S.elements) \u001b[38;5;66;03m# number of elements in the set\u001b[39;00m\n\u001b[32m 11\u001b[39m sc.startcounttemp(\u001b[32m1\u001b[39m) \u001b[38;5;66;03m# n: 1 unit\u001b[39;00m\n", + "\u001b[31mUnboundLocalError\u001b[39m: cannot access local variable 'sc' where it is not associated with a value" + ] + } + ], + "source": [ + "cf.iterate(S)" + ] } ], "metadata": { "kernelspec": { - "display_name": ".venv", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -224,7 +235,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.2" + "version": "3.12.4" } }, "nbformat": 4, diff --git a/primitive_fucntions.py b/primitive_fucntions.py index 55fd25c..d5338b6 100644 --- a/primitive_fucntions.py +++ b/primitive_fucntions.py @@ -28,16 +28,16 @@ def flip(p): # Time complexity: O(len(set1) + len(set2))""" # return set1 | set2 + def setminus(set1, s): - """Remove a string from a set - Time complexity: O(1) for single item, O(len(s)) for set s""" + #Remove a string from a set + #Time complexity: O(1) for single item, O(len(s)) for set s set1.remove(s) def sample(collection): - """Sample from a set or list of strings. - Time complexity: O(1) for non-empty sets if using random.choice, - O(n) for lists using random.sample. - """ + #Sample from a set or list of strings. + #Time complexity: O(1) for non-empty sets if using random.choice, + #O(n) for lists using random.sample. if not collection: return None @@ -46,6 +46,7 @@ def sample(collection): return random.sample(collection, 1)[0] + # # Function calls with memoization # memoization_cache = {} @@ -88,13 +89,15 @@ def check_if_same_type(e1, e2, bias): Time complexity: O(1)""" return getattr(e1,bias) == getattr(e2,bias) -def write_random(G, bias, type): - """Returns one unused member of particular type - Time complexity: O(n) to filter tokens""" - # for element in S: - # if getattr(element, bias) == type: - # return element - type_elements = [u for u, v, d in G.edges(data=True) if v == type and d["label"] == bias] + + +def write_random(S, bias, type): + #Returns one unused member of particular type + #Time complexity: O(n) to filter tokens + for element in S: + if getattr(element, bias) == type: + return element + #type_elements = [u for u, v, d in G.edges(data=True) if v == type and d["label"] == bias] ### WRITE CODE ### maybe add a random list shuffling thing here @@ -102,6 +105,8 @@ def write_random(G, bias, type): pass + + def implement(FUN, N): """Keeps implementing a function N times Time complexity: O(N * T) where T is time of FUN""" diff --git a/sc_experiment.ipynb b/sc_experiment.ipynb new file mode 100644 index 0000000..b9a783d --- /dev/null +++ b/sc_experiment.ipynb @@ -0,0 +1,86 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "e786240c", + "metadata": {}, + "outputs": [], + "source": [ + "import cognitive_functions as cf\n", + "from utils import Stopwatch, Element, Associations, ElementSet, pretty_view\n", + "from utils import SpaceComplexity as sc\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "ceb06be9", + "metadata": {}, + "outputs": [], + "source": [ + "obj1 = Element(\"ridhi\",\"yellow\", \"square\")\n", + "obj2 = Element(\"2\",\"blue\", \"square\")\n", + "obj3 = Element(\"3\",\"yellow\", \"circle\")\n", + "obj4 = Element(\"4\",\"blue\", \"circle\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "2868266c", + "metadata": {}, + "outputs": [], + "source": [ + "S = {obj1, obj2, obj3, obj4}\n", + "S = ElementSet(S)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "094c0d9b", + "metadata": {}, + "outputs": [ + { + "ename": "TypeError", + "evalue": "Population must be a sequence. For dicts or sets, use sorted(d).", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mTypeError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[6]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m \u001b[43mcf\u001b[49m\u001b[43m.\u001b[49m\u001b[43miterate\u001b[49m\u001b[43m(\u001b[49m\u001b[43mS\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32mc:\\localpython\\lot_25\\LoT-modeling\\cognitive_functions.py:20\u001b[39m, in \u001b[36miterate\u001b[39m\u001b[34m(S)\u001b[39m\n\u001b[32m 18\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m _ \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mrange\u001b[39m(n//\u001b[32m2\u001b[39m):\n\u001b[32m 19\u001b[39m sp.startcounttemp() \u001b[38;5;66;03m# element: 2 units (1-D)\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m20\u001b[39m element = \u001b[43mpf\u001b[49m\u001b[43m.\u001b[49m\u001b[43msample\u001b[49m\u001b[43m(\u001b[49m\u001b[43mS\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 21\u001b[39m sp.subtemp()\n\u001b[32m 22\u001b[39m result = \u001b[38;5;28;01mNone\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32mc:\\localpython\\lot_25\\LoT-modeling\\primitive_fucntions.py:48\u001b[39m, in \u001b[36msample\u001b[39m\u001b[34m(collection)\u001b[39m\n\u001b[32m 45\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(collection, \u001b[38;5;28mset\u001b[39m):\n\u001b[32m 46\u001b[39m collection = \u001b[38;5;28mtuple\u001b[39m(collection) \u001b[38;5;66;03m# Convert to tuple for sampling\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m48\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mrandom\u001b[49m\u001b[43m.\u001b[49m\u001b[43msample\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcollection\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[32;43m1\u001b[39;49m\u001b[43m)\u001b[49m[\u001b[32m0\u001b[39m]\n", + "\u001b[36mFile \u001b[39m\u001b[32mc:\\Users\\Mikea\\AppData\\Local\\Programs\\Python\\Python312\\Lib\\random.py:413\u001b[39m, in \u001b[36mRandom.sample\u001b[39m\u001b[34m(self, population, k, counts)\u001b[39m\n\u001b[32m 389\u001b[39m \u001b[38;5;66;03m# Sampling without replacement entails tracking either potential\u001b[39;00m\n\u001b[32m 390\u001b[39m \u001b[38;5;66;03m# selections (the pool) in a list or previous selections in a set.\u001b[39;00m\n\u001b[32m 391\u001b[39m \n\u001b[32m (...)\u001b[39m\u001b[32m 409\u001b[39m \u001b[38;5;66;03m# too many calls to _randbelow(), making them slower and\u001b[39;00m\n\u001b[32m 410\u001b[39m \u001b[38;5;66;03m# causing them to eat more entropy than necessary.\u001b[39;00m\n\u001b[32m 412\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(population, _Sequence):\n\u001b[32m--> \u001b[39m\u001b[32m413\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m(\u001b[33m\"\u001b[39m\u001b[33mPopulation must be a sequence. \u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 414\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mFor dicts or sets, use sorted(d).\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 415\u001b[39m n = \u001b[38;5;28mlen\u001b[39m(population)\n\u001b[32m 416\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m counts \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n", + "\u001b[31mTypeError\u001b[39m: Population must be a sequence. For dicts or sets, use sorted(d)." + ] + } + ], + "source": [ + "cf.iterate(S)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/simulation.py b/simulation.py index ced6c35..587e98d 100644 --- a/simulation.py +++ b/simulation.py @@ -123,4 +123,4 @@ def plot_simulation(metrics): ha='center', va='center', fontsize=10, color='black', xytext=(0, 5), textcoords='offset points') plt.tight_layout(rect=[0, 0, 1, 0.96]) - plt.show() + plt.show() \ No newline at end of file diff --git a/spaceUsedprim.py b/spaceUsedprim.py new file mode 100644 index 0000000..c59782c --- /dev/null +++ b/spaceUsedprim.py @@ -0,0 +1,232 @@ +import random +from utils import SpaceComplexity as sc + +SEED = 42 +random.seed(SEED) + +# Functions on lists (strings) + +# if the boolean return_space is True, the function returns a tuple (result, space_used) +# otherwise it returns just the result (as before) + +def pair(L, C, return_space: bool = False): + """Concatenate item C onto list L (in-place).""" + sc_obj = sc() + # pairing mutates L in-place (persistent) + sc_obj.startcounttemp() # account for the persistent container L + L.append(C) + # we keep the persistent allocation counted; no subtemp + space_used = sc_obj.get_max() + if return_space: + return (None, space_used) + return None + +def append(X, Y, return_space: bool = False): + """Append lists X and Y (returns a new list).""" + sc_obj = sc() + # allocating result list + sc_obj.startcounttemp() + result = X + Y + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + +# Random functions + +def flip(p, return_space: bool = False): + """Returns True with probability p.""" + sc_obj = sc() + # trivial constant-space operation; count one slot to be explicit + sc_obj.startcounttemp() + val = random.random() < p + sc_obj.subtemp() + space_used = sc_obj.get_max() + if return_space: + return (val, space_used) + return val + +# Set functions + +def setminus(set1, s, return_space: bool = False): + """Remove an item from a set in-place.""" + sc_obj = sc() + sc_obj.startcounttemp() # persistent container set1 counted + set1.remove(s) + space_used = sc_obj.get_max() + if return_space: + return (None, space_used) + return None + +def sample(collection, return_space: bool = False): + """ + Sample one element from set or list. + Returns element (or None if empty). Default behavior unchanged. + """ + sc_obj = sc() + sc_obj.startcounttemp() # sampling overhead + if not collection: + sc_obj.subtemp() + space_used = sc_obj.get_max() + if return_space: + return (None, space_used) + return None + + if isinstance(collection, set): + # convert to tuple for sampling (temporary) + sc_obj.startcounttemp() + collection = tuple(collection) + sc_obj.subtemp() + + val = random.sample(collection, 1)[0] + sc_obj.subtemp() + space_used = sc_obj.get_max() + if return_space: + return (val, space_used) + return val + +# Token-related / utility functions + +def add(T, lst, return_space: bool = False): + """Return a new list with T appended (tracks space).""" + sc_obj = sc() + sc_obj.startcounttemp() # persistent result + result = lst.copy() + result.append(T) + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + +def remove(T, lst, return_space: bool = False): + """Return a copy of list with T removed (tracks space).""" + sc_obj = sc() + sc_obj.startcounttemp() # persistent result + result = lst.copy() + if T in result: + result.remove(T) + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + +def write_random(G, bias, type, return_space: bool = False): + """ + Given a NetworkX graph G, find nodes 'u' such that there is an edge (u -> v) + with v == type and edge data label == bias. Return a randomly-selected u or None. + """ + sc_obj = sc() + sc_obj.startcounttemp() # temporary buffer for matches + type_elements = [] + + # Support simple edges (u, v, d) and other edge tuple shapes + for edge in G.edges(data=True): + if len(edge) == 3: + u, v, d = edge + else: + u, v, *rest = edge + d = rest[-1] if rest else {} + label = d.get("label") if isinstance(d, dict) else None + if v == type and label == bias: + type_elements.append(u) + + sc_obj.subtemp() # done with temporary buffer + if not type_elements: + space_used = sc_obj.get_max() + if return_space: + return (None, space_used) + return None + + val = random.choice(type_elements) + space_used = sc_obj.get_max() + if return_space: + return (val, space_used) + return val + +def implement(FUN, N, return_space: bool = False): + """Call FUN N times and return a list of results (tracks space).""" + sc_obj = sc() + sc_obj.startcounttemp() # persistent results + results = [] + for _ in range(N): + sc_obj.startcounttemp() # iteration temporary + results.append(FUN()) + sc_obj.subtemp() + space_used = sc_obj.get_max() + if return_space: + return (results, space_used) + return results + +def write_all(S, bias, type, return_space: bool = False): + """Collect all elements from S matching getattr(element, bias) == type.""" + sc_obj = sc() + sc_obj.startcounttemp() # persistent result + result = [] + for element in S: + sc_obj.startcounttemp() # element temporary + if getattr(element, bias) == type: + pair(result, element) # pair is in-place + sc_obj.subtemp() + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + +# Additional functions + +def list_create(M, return_space: bool = False): + """Create a blank list with slots for M items""" + sc_obj = sc() + sc_obj.startcounttemp() # persistent list + result = [None] * M + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + +def merge(I, J, return_space: bool = False): + """Merge two items I and J to create a list""" + sc_obj = sc() + sc_obj.startcounttemp() # persistent merged list + result = [I, J] + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + +def remove_item(I, L, return_space: bool = False): + sc_obj = sc() + sc_obj.startcounttemp() # persistent result + result = L.copy() + if I in result: + result.remove(I) + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + +def dim_set(D): + def classify(items, dimension_func=D, return_space: bool = False): + sc_obj = sc() + sc_obj.startcounttemp() # persistent result set + result = set() + for item in items: + sc_obj.startcounttemp() # small temporary + result.add(dimension_func(item)) + sc_obj.subtemp() + space_used = sc_obj.get_max() + if return_space: + return (result, space_used) + return result + return classify + +def write_all_set(S, return_space: bool = False): + """Write all items belonging to a particular set S as a sorted string.""" + sc_obj = sc() + sc_obj.startcounttemp() # temporary sorting buffer + out = " - ".join(sorted(S)) + sc_obj.subtemp() + space_used = sc_obj.get_max() + if return_space: + return (out, space_used) + return out diff --git a/test_cognitive_primitives.ipynb b/test_cognitive_primitives.ipynb new file mode 100644 index 0000000..5e5e241 --- /dev/null +++ b/test_cognitive_primitives.ipynb @@ -0,0 +1,266 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "sys.path appended /mnt/data\n" + ] + } + ], + "source": [ + "# Ensure local project path is on sys.path\n", + "import sys\n", + "sys.path.append('/mnt/data')\n", + "print('sys.path appended /mnt/data')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "imports ok\n" + ] + } + ], + "source": [ + "# Imports\n", + "import spaceUsedprim as pf\n", + "import cognitive_functions as cf\n", + "from utils import Element, ElementSet, Associations\n", + "import networkx as nx\n", + "from pprint import pprint\n", + "print('imports ok')" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "ElementSet size: 6\n", + "attribute1 types: {'red', 'green', 'blue'}\n", + "attribute2 types: {'circle', 'square', 'triangle'}\n" + ] + } + ], + "source": [ + "# Build a small set of Elements for tests\n", + "e1 = Element('A1','red','circle')\n", + "e2 = Element('A2','blue','square')\n", + "e3 = Element('B1','red','square')\n", + "e4 = Element('B2','blue','circle')\n", + "e5 = Element('C1','green','triangle')\n", + "e6 = Element('C2','green','triangle')\n", + "elements = {e1,e2,e3,e4,e5,e6}\n", + "ES = ElementSet(elements)\n", + "print('ElementSet size:', len(ES.elements))\n", + "print('attribute1 types:', ES.attribute1_types())\n", + "print('attribute2 types:', ES.attribute2_types())" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "---- primitives ----\n", + "pair -> lst: [1, 2, 3] return: (None, 1)\n", + "append -> [1, 2, 3, 4] space 1\n", + "flip -> False space 1\n", + "sample -> 10 space 2\n", + "add -> [1, 2, 9] space 1\n", + "remove -> [1, 3] space 1\n", + "list_create -> len 5 space 1\n", + "merge -> ['x', 'y'] space 1\n", + "remove_item -> ['b'] space 1\n", + "dim_set -> {'red', 'blue'} space 2\n" + ] + } + ], + "source": [ + "# Test primitive functions (with space reporting where available)\n", + "print('---- primitives ----')\n", + "# pair (in-place)\n", + "lst = [1,2]\n", + "res = pf.pair(lst, 3, return_space=True)\n", + "print('pair -> lst:', lst, 'return:', res)\n", + "# append\n", + "ap, space = pf.append([1,2],[3,4], return_space=True)\n", + "print('append ->', ap, 'space', space)\n", + "# flip\n", + "val, space = pf.flip(0.5, return_space=True)\n", + "print('flip ->', val, 'space', space)\n", + "# sample\n", + "sval, space = pf.sample(set([10,20,30]), return_space=True)\n", + "print('sample ->', sval, 'space', space)\n", + "# add / remove\n", + "r, sp = pf.add(9, [1,2], return_space=True)\n", + "print('add ->', r, 'space', sp)\n", + "r2, sp2 = pf.remove(2, [1,2,3], return_space=True)\n", + "print('remove ->', r2, 'space', sp2)\n", + "# list_create/merge\n", + "lc, sp = pf.list_create(5, return_space=True)\n", + "print('list_create -> len', len(lc), 'space', sp)\n", + "m, sp = pf.merge('x','y', return_space=True)\n", + "print('merge ->', m, 'space', sp)\n", + "# remove_item\n", + "ri, sp = pf.remove_item('a', ['a','b'], return_space=True)\n", + "print('remove_item ->', ri, 'space', sp)\n", + "# dim_set\n", + "cls = pf.dim_set(lambda e: getattr(e,'attribute1'))\n", + "ds, sp = cls([e1,e2,e3], return_space=True)\n", + "print('dim_set ->', ds, 'space', sp)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "graph nodes, edges [Element(object=A1, attribute 1=red, attribute 2=circle), Element(object=A2, attribute 1=blue, attribute 2=square), Element(object=B1, attribute 1=red, attribute 2=square)] [(Element(object=A1, attribute 1=red, attribute 2=circle), Element(object=B1, attribute 1=red, attribute 2=square), {'label': 'precedes'}), (Element(object=A2, attribute 1=blue, attribute 2=square), Element(object=B1, attribute 1=red, attribute 2=square), {'label': 'precedes'})]\n", + "write_random -> A2, blue, square) space 1\n", + "write_all -> ['A1', 'B1'] space 2\n", + "implement -> [2, 2, 2] space 2\n", + "write_all_set -> a - b - c space 1\n" + ] + } + ], + "source": [ + "# Test write_random using a small graph\n", + "G = nx.DiGraph()\n", + "# nodes will be Element objects\n", + "G.add_node(e1)\n", + "G.add_node(e2)\n", + "G.add_node(e3)\n", + "# add edges from u -> v with label\n", + "G.add_edge(e1, e3, label='precedes')\n", + "G.add_edge(e2, e3, label='precedes')\n", + "print('graph nodes, edges', G.nodes(), list(G.edges(data=True)))\n", + "wr, sp = pf.write_random(G, 'precedes', e3, return_space=True)\n", + "print('write_random ->', wr, 'space', sp)\n", + "\n", + "# write_all\n", + "wa, sp = pf.write_all([e1,e2,e3,e4], 'attribute1', 'red', return_space=True)\n", + "print('write_all ->', [getattr(x,'name') for x in wa], 'space', sp)\n", + "\n", + "# implement (call simple lambda)\n", + "impl_res, sp = pf.implement(lambda: 1+1, 3, return_space=True)\n", + "print('implement ->', impl_res, 'space', sp)\n", + "\n", + "# write_all_set\n", + "was, sp = pf.write_all_set({'b','a','c'}, return_space=True)\n", + "print('write_all_set ->', was, 'space', sp)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "---- cognitive functions ----\n" + ] + }, + { + "ename": "NameError", + "evalue": "name '_items_from_S' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[1;31mNameError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[1;32mIn[6], line 7\u001b[0m\n\u001b[0;32m 4\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m ElementSet({Element(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mA1\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mred\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mcircle\u001b[39m\u001b[38;5;124m'\u001b[39m), Element(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mA2\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mblue\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124msquare\u001b[39m\u001b[38;5;124m'\u001b[39m), Element(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mB1\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mred\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124msquare\u001b[39m\u001b[38;5;124m'\u001b[39m), Element(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mB2\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mblue\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mcircle\u001b[39m\u001b[38;5;124m'\u001b[39m), Element(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mC1\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mgreen\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mtriangle\u001b[39m\u001b[38;5;124m'\u001b[39m), Element(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mC2\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mgreen\u001b[39m\u001b[38;5;124m'\u001b[39m,\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mtriangle\u001b[39m\u001b[38;5;124m'\u001b[39m)})\n\u001b[0;32m 6\u001b[0m ES1 \u001b[38;5;241m=\u001b[39m fresh_set()\n\u001b[1;32m----> 7\u001b[0m it_res \u001b[38;5;241m=\u001b[39m \u001b[43mcf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43miterate\u001b[49m\u001b[43m(\u001b[49m\u001b[43mES1\u001b[49m\u001b[43m)\u001b[49m\n\u001b[0;32m 8\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124miterate -> result len\u001b[39m\u001b[38;5;124m'\u001b[39m, \u001b[38;5;28mlen\u001b[39m(it_res[\u001b[38;5;241m0\u001b[39m]) \u001b[38;5;28;01mif\u001b[39;00m it_res[\u001b[38;5;241m0\u001b[39m] \u001b[38;5;28;01melse\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mtime\u001b[39m\u001b[38;5;124m'\u001b[39m, it_res[\u001b[38;5;241m1\u001b[39m])\n\u001b[0;32m 10\u001b[0m ES2 \u001b[38;5;241m=\u001b[39m fresh_set()\n", + "File \u001b[1;32mc:\\localpython\\lot_25\\LoT-modeling\\cognitive_functions.py:8\u001b[0m, in \u001b[0;36miterate\u001b[1;34m(S)\u001b[0m\n\u001b[0;32m 7\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;21miterate\u001b[39m(S: ElementSet): \u001b[38;5;66;03m# 112233 \u001b[39;00m\n\u001b[1;32m----> 8\u001b[0m items \u001b[38;5;241m=\u001b[39m \u001b[43m_items_from_S\u001b[49m(S)\n\u001b[0;32m 9\u001b[0m sc_obj \u001b[38;5;241m=\u001b[39m SpaceComplexity()\n\u001b[0;32m 11\u001b[0m \u001b[38;5;66;03m# preprocessing (not part of measured cognitive process)\u001b[39;00m\n", + "\u001b[1;31mNameError\u001b[0m: name '_items_from_S' is not defined" + ] + } + ], + "source": [ + "print('\\n---- cognitive functions ----')\n", + "# Make a fresh copy of elements for each test because functions remove from sets\n", + "def fresh_set():\n", + " return ElementSet({Element('A1','red','circle'), Element('A2','blue','square'), Element('B1','red','square'), Element('B2','blue','circle'), Element('C1','green','triangle'), Element('C2','green','triangle')})\n", + "\n", + "ES1 = fresh_set()\n", + "it_res = cf.iterate(ES1)\n", + "print('iterate -> result len', len(it_res[0]) if it_res[0] else None, 'time', it_res[1])\n", + "\n", + "ES2 = fresh_set()\n", + "pal_res = cf.palindrome(ES2)\n", + "print('palindrome -> result len', len(pal_res[0]) if pal_res[0] else None, 'time', pal_res[1])\n", + "\n", + "ES3 = fresh_set()\n", + "alt_res = cf.alternate(ES3)\n", + "print('alternate -> result len', len(alt_res[0]) if alt_res[0] else None, 'time', alt_res[1])\n", + "\n", + "# chaining needs explicit associations mapping (Element -> Element)\n", + "ES4 = fresh_set()\n", + "elems_list = list(ES4.elements)\n", + "assoc = {elems_list[0]: elems_list[1], elems_list[2]: elems_list[3]}\n", + "chain_res = cf.chaining(ES4, assoc)\n", + "print('chaining -> chunks count', len(chain_res[0]) if chain_res[0] else 0, 'time', chain_res[1])\n", + "\n", + "ES5 = fresh_set()\n", + "sc_res = cf.serial_crossed(ES5)\n", + "print('serial_crossed -> result len', len(sc_res[0]) if sc_res[0] else None, 'time', sc_res[1])\n", + "\n", + "ES6 = fresh_set()\n", + "ce_res = cf.center_embedded(ES6)\n", + "print('center_embedded -> result len', len(ce_res[0]) if ce_res[0] else None, 'time', ce_res[1])\n", + "\n", + "ES7 = fresh_set()\n", + "tr_res = cf.tail_recursive(ES7)\n", + "print('tail_recursive -> result len', len(tr_res[0]) if tr_res[0] else None, 'time', tr_res[1])\n", + "\n", + "print('\\nAll tests finished.')\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/utils.py b/utils.py index be59325..32b2030 100644 --- a/utils.py +++ b/utils.py @@ -10,11 +10,11 @@ # ---------------------------------------------------------------------# -@dataclass class Element: # n-dimensional element - name : str - attribute1 : int | float | str - attribute2 : int | float | str | None = None + def __init__(self, name, attribute1, attribute2=None): + self.name = name + self.attribute1 = attribute1 + self.attribute2 = attribute2 def __repr__(self): return f"Element(object={self.name}, attribute 1={self.attribute1}, attribute 2={self.attribute2})" def __str__(self): @@ -34,6 +34,8 @@ def build_updates(self, graph): for key, value in self.associations.items(): graph.add_edge(key.name, value.name, label="precedes", directed=True) + +''' class ElementSet: # n-dimensional element set def __init__(self, elements: set, associations: Associations = None): self.elements = elements @@ -87,8 +89,9 @@ def get_type(type_): return getattr(self, f"{attribute}_types", set()) - - + + +''' def pretty_view(sequence): @@ -133,3 +136,41 @@ def get_elapsed_time(self): if self._running: return time.perf_counter() - self._start_time return self._elapsed_time + +class SpaceComplexity: + """ + Tracks space complexity for temporary variables in cognitive functions. + Use startcounttemp(x) to add x units, subtemp(x) to subtract x units. + get_max() returns the maximum count reached. + """ + def __init__(self): + self.count = 0 + self.max = 0 + def startcounttemp(self): + self.count += 1 + if self.count > self.max: + self.max = self.count + def subtemp(self): + self.count -= 1 + def get_max(self): + return self.max + def reset(self): + self.count = 0 + self.max = 0 + +class ElementSet: # n-dimensional element set + def __init__(self, elements: set, associations: Associations = None): + self.elements = elements + self.associations = associations + def __repr__(self): + raise NotImplementedError + def attribute1_types(self): + """Returns a set of unique attribute1 types.""" + return set(obj.attribute1 for obj in self.elements) + def attribute2_types(self): + """Returns a set of unique attribute2 types.""" + return set(obj.attribute2 for obj in self.elements if obj.attribute2 is not None) + def attribute_items(self, attribute): + def get_type(type_): + + return getattr(self, f"{attribute}_types", set())