diff --git a/.appveyor.yml b/.appveyor.yml index d485ffdb..002cbf3c 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -2,7 +2,6 @@ version: 0.1.0.{build} environment: matrix: - - PYTHON: "C:\\Python27-x64" - PYTHON: "C:\\Python35-x64" - PYTHON: "C:\\Python36-x64" - PYTHON: "C:\\Python37-x64" diff --git a/.green b/.green index 934c39ac..a032b985 100644 --- a/.green +++ b/.green @@ -1,3 +1,4 @@ verbose = 1 logging = True run-coverage = True +omit-patterns = .eggs/* diff --git a/.travis.yml b/.travis.yml index 68db83b7..462af891 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,6 @@ os: # - osx python: - - "2.7" - "3.5" - "3.6" - "3.7" diff --git a/examples/Tests.ipynb b/examples/Tests.ipynb new file mode 100644 index 00000000..81e2ebbd --- /dev/null +++ b/examples/Tests.ipynb @@ -0,0 +1,187 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.insert(0,\"../\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "from neet.boolean import WTNetwork\n", + "from os.path import join" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "from neet.boolean.random import constraints, dynamics, randomizer, topology\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "import test.tests" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "tn = test.tests.TestNetwork()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "test passed!\n" + ] + } + ], + "source": [ + "tn.test_uniform_bias()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "#tn.test_external_exclusion()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "test passed!\n" + ] + } + ], + "source": [ + "tn.test_correct_bias()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "[((1, 2, 3), {'001', '100'}), ((1, 2, 3), {'011', '101'}), ((1, 2, 3), {'111'}), ((1, 2, 3), {'000', '010', '111'})]\n", + "bias_1: 8.0\n", + "row_count_1: 4.0\n", + "mean_1: 2.0\n", + "[((1, 2, 3), {'100', '000'}), ((1, 2, 3), {'011', '001'}), ((1, 2, 3), {'100', '010'}), ((1, 2, 3), {'110', '101'})]\n", + "bias_2: 8.0\n", + "row_count_2: 4.0\n", + "mean_2: 2.0\n" + ] + } + ], + "source": [ + "tn.test_correct_mean(tn.net4, printq=False, debug=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "test passed!\n" + ] + } + ], + "source": [ + "tn.test_correct_local()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "test passed!\n" + ] + } + ], + "source": [ + "tn.test_correct_errors_thrown()" + ] + } + ], + "metadata": { + "kernelspec": { + 
"display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/Untitled.ipynb b/examples/Untitled.ipynb new file mode 100644 index 00000000..2a0ab542 --- /dev/null +++ b/examples/Untitled.ipynb @@ -0,0 +1,145 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.insert(0,\"../\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "from neet.boolean import WTNetwork\n", + "from os.path import join" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "from neet.boolean.random import constraints, dynamics, randomizer, topology\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "import test.tests" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "tn = test.tests.TestNetwork()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "test passed!\n" + ] + } + ], + "source": [ + "tn.test_uniform_bias()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "#tn.test_external_exclusion()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "test passed!\n" + ] + } + ], + "source": [ + "tn.test_correct_bias()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "[((1, 2, 3), {'001', '100'}), ((1, 2, 3), {'101', '011'}), ((1, 2, 3), {'111'}), ((1, 2, 3), {'111', '010', '000'})]\n", + "[((1, 2, 3), {'010', '000'}), ((1, 2, 3), {'111', '000'}), ((1, 2, 3), {'110', '010'}), ((1, 2, 3), {'011', '110'})]\n" + ] + } + ], + "source": [ + "tn.test_correct_mean(tn.net4, printq=False, debug=True)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/neet/boolean/__init__.py b/neet/boolean/__init__.py index ba14dcf3..077c0643 100644 --- a/neet/boolean/__init__.py +++ b/neet/boolean/__init__.py @@ -19,3 +19,4 @@ from .wtnetwork import WTNetwork # noqa from .logicnetwork import LogicNetwork # noqa from .sensitivity import SensitivityMixin # noqa +from . 
import random # noqa diff --git a/neet/boolean/random/__init__.py b/neet/boolean/random/__init__.py new file mode 100644 index 00000000..2469db3a --- /dev/null +++ b/neet/boolean/random/__init__.py @@ -0,0 +1,4 @@ +from . import constraints # noqa +from . import randomizer # noqa +from . import topology # noqa +from . import dynamics # noqa diff --git a/neet/boolean/random/constraints.py b/neet/boolean/random/constraints.py new file mode 100644 index 00000000..ef40e7fc --- /dev/null +++ b/neet/boolean/random/constraints.py @@ -0,0 +1,287 @@ +import neet +import networkx as nx +import numpy as np +from abc import ABCMeta, abstractmethod + + +class ConstraintError(Exception): + """ + A constraint was applied to an invalid object. + """ + pass + + +class AbstractConstraint(object, metaclass=ABCMeta): + """ + An abstract class representing a constraint used for rejection testing. + """ + @abstractmethod + def satisfies(self, net): + """ + Test a provided network against the constraint. + + :param net: a network to test + :returns: ``True`` if the constraint is satisfied + """ + return True + + +class TopologicalConstraint(AbstractConstraint): + """ + An abstract class representing a constraint on the topology of a network. + """ + @abstractmethod + def satisfies(self, graph): + """ + Test a provided graph against the constraint. + + :param graph: a graph to test + :type graph: nx.DiGraph + :returns: ``True`` if the constraint is satisfied + :raises TypeError: if the graph is not a networkx DiGraph + """ + if not isinstance(graph, nx.DiGraph): + raise TypeError('only directed graphs are testable with topological constraints') + return super().satisfies(graph) + + +class DynamicalConstraint(AbstractConstraint): + """ + An abstract class representing a constraint on the dynamics of a network. + """ + @abstractmethod + def satisfies(self, net): + """ + Test a provided net against the constraint. + + :param net: a network to test + :type net: neet.Network + :returns: ``True`` if the constraint is satisfied + :raises TypeError: if the network is not a neet.Network + """ + if not isinstance(net, neet.Network): + raise TypeError('only neet networks are testable with dynamical constraints') + return super().satisfies(net) + + +class HasExternalNodes(TopologicalConstraint): + def __init__(self, target): + """ + A topological constraint requiring a specific number of external + nodes, i.e. a specific number of nodes with no incoming edges. + + If ``target`` is a directed graph, this constraint will require + networks to have the same number of external nodes as the ``target``. + + Alternatively, ``target`` can be a non-negative integer. + + :param target: the target number of external nodes + :type target: nx.DiGraph or integer + """ + if isinstance(target, int): + if target < 0: + raise ValueError('the target number of external nodes must be non-negative') + num_external = target + elif isinstance(target, nx.DiGraph): + num_external = self.__count_external(target) + else: + raise TypeError('target must be either an integer or nx.DiGraph') + + self.num_external = num_external + + def __count_external(self, g): + """ + Count the number of external nodes in a directed graph. + """ + return np.count_nonzero([d == 0 for _, d in g.in_degree()]) + + def satisfies(self, graph): + """ + This constraint is only satisfied if the provided graph has + ``self.num_external``-many external nodes.
+ + :param graph: a graph to test + :type graph: nx.DiGraph + :returns: ``True`` if the digraph has the desired number of external + nodes + """ + if super().satisfies(graph): + return self.__count_external(graph) == self.num_external + + +class IsConnected(TopologicalConstraint): + """ + Ensure that the resulting graph is (weakly) connected. + """ + def satisfies(self, graph): + """ + This constraint is only satisfied if the provided graph is weakly + connected. + + :param graph: a graph to test + :type graph: nx.DiGraph + :returns: ``True`` if the digraph is weakly connected + """ + if super().satisfies(graph): + try: + return nx.is_weakly_connected(graph) + except nx.exception.NetworkXException as err: + raise ConstraintError() from err + + +class IsIrreducible(DynamicalConstraint): + """ + Ensure that all dynamical nodes have irreducible functions. + """ + def satisfies(self, network): + """ + This constraint is only satisfied if every node's function logically + depends on each of its incoming neighbors. + + :param network: a network to test + :type network: neet.boolean.LogicNetwork + :returns: ``True`` if every node's function is irreducible + :raises TypeError: if the network is not a + neet.boolean.LogicNetwork + """ + if super().satisfies(network): + if not isinstance(network, neet.boolean.LogicNetwork): + raise TypeError('network must be a neet.boolean.LogicNetwork') + + for idx in range(network.size): + for neighbor_in in network.neighbors_in(idx): + if not network.is_dependent(idx, neighbor_in): + return False + return True + + +class HasCanalizingNodes(DynamicalConstraint): + def __init__(self, target): + """ + A dynamical constraint requiring that a specific number of nodes be + canalizing, i.e. a specific number of nodes are canalizing on at least + one input. + + If ``target`` is a Neet network, this constraint will require + networks to have the same number of canalizing nodes as ``target``. + + Alternatively, ``target`` can be a non-negative integer. + + :param target: the target number of canalizing nodes + :type target: neet.boolean.LogicNetwork or integer + :raises TypeError: if the provided target is neither an integer nor a neet.Network + """ + if isinstance(target, int): + if target < 0: + raise ValueError('the target number of canalizing nodes must be non-negative') + num_canalizing = target + elif isinstance(target, neet.Network): + num_canalizing = self.__count_canalizing_nodes(target) + else: + raise TypeError('target must be either an integer or a neet.Network') + + self.num_canalizing = num_canalizing + + def __count_canalizing_nodes(self, network): + """ + Count the number of canalizing nodes in a network. + """ + return len(network.canalizing_nodes()) + + def satisfies(self, network): + """ + This constraint is only satisfied if the provided network has + ``self.num_canalizing``-many canalizing nodes. + """ + if super().satisfies(network): + return self.__count_canalizing_nodes(network) == self.num_canalizing + + +class GenericTopologicalConstraint(TopologicalConstraint): + def __init__(self, test): + """ + A generic constraint defined in terms of a callable. + + :param test: a user-specified test + :type test: callable + """ + if not callable(test): + raise TypeError('test must be callable') + self.test = test + + def satisfies(self, net): + """ + Test a provided network against a generic constraint.
+ + :param net: a network to test + :returns: ``True`` if the constraint is satisified + """ + if super().satisfies(net): + return self.test(net) + + +class GenericDynamicalConstraint(DynamicalConstraint): + def __init__(self, test): + """ + A generic constraint defined in terms of a callable. + + :param test: a user-specified test + :type test: callable + """ + if not callable(test): + raise TypeError('test must be callable') + self.test = test + + def satisfies(self, net): + """ + Test a provided network against a generic constraint. + + :param net: a network to test + :returns: ``True`` if the constraint is satisified + """ + if super().satisfies(net): + return self.test(net) + + +class NodeConstraint(DynamicalConstraint): + """ + Constraints which operate on nodes (functions) of dynamical networks. + """ + pass + + +class GenericNodeConstraint(NodeConstraint): + def __init__(self, test): + if not callable(test): + raise TypeError('test must be callable') + self.test = test + + def satisfies(self, conditions): + if not isinstance(conditions, set): + raise TypeError('only a set of activation conditions is testable with a node constraint') + return self.test(conditions) + + +class IrreducibleNode(NodeConstraint): + def satisfies(self, conditions): + if not isinstance(conditions, set): + raise TypeError('only a set of activation conditiosn is testable with IrreducibleNode') + + if len(conditions) == 0: + return True + + k = list(map(len, conditions))[0] + for i in range(k): + counter = {} # type: ignore + for state in conditions: + state_sans_source = state[:i] + state[i + 1:] + if int(state[i]) == 1: + counter[state_sans_source] = counter.get(state_sans_source, 0) + 1 + else: + counter[state_sans_source] = counter.get(state_sans_source, 0) - 1 + + if not any(counter.values()): + return False + + return True diff --git a/neet/boolean/random/dynamics.py b/neet/boolean/random/dynamics.py new file mode 100644 index 00000000..cd748bd9 --- /dev/null +++ b/neet/boolean/random/dynamics.py @@ -0,0 +1,263 @@ +import neet +import numpy as np + +from abc import abstractmethod +from .randomizer import AbstractRandomizer +from .topology import TopologyRandomizer, FixedTopology, InDegree +from .constraints import DynamicalConstraint, TopologicalConstraint, GenericDynamicalConstraint, \ + NodeConstraint, GenericNodeConstraint, ConstraintError +from inspect import isclass + + +class NetworkRandomizer(AbstractRandomizer): + def __init__(self, network, trand=None, constraints=None, timeout=1000, **kwargs): + """ + An abstract base class for all randomizers which implement dynamical + randomization. + + :param network: a base network or graph + :type network: neet.Network or networkx.DiGraph + :param trand: how to randomize the topology (default: FixedTopology) + :type trand: instance or subclass of TopologyRandomizer, or None + :param constraints: constraints used for rejection testing + :type constraints: a sequence of AbstractConstraint instances + :param timeout: the number of attempts before rejection testing times + out. If less than 1, the rejection testing will never + time out. 
+ """ + if trand is None: + trand = FixedTopology(network, timeout=timeout, **kwargs) + elif isclass(trand) and issubclass(trand, TopologyRandomizer): + trand = trand(network, timeout=timeout, **kwargs) + elif isinstance(trand, TopologyRandomizer): + pass + else: + raise TypeError('trand must be an instance or subclass of TopologyRandomizer') + self.trand = trand + super().__init__(network, constraints, timeout, **kwargs) + + @property + def constraints(self): + return super().constraints + + @constraints.setter + def constraints(self, constraints): + """ + Set the randomizer's constraints. + + :param constraints: the new constraints + :type constraints: a seq of AbstractConstraint instances + :raises TypeError: if any of the contraints are not an AbstractConstraint + """ + if constraints is None: + constraints = [] + elif not isinstance(constraints, list): + constraints = list(constraints) + + tconstraints, nconstraints, dconstraints = [], [], [] + + for i, constraint in enumerate(constraints): + if isinstance(constraint, NodeConstraint): + nconstraints.append(constraint) + elif isinstance(constraint, DynamicalConstraint): + dconstraints.append(constraint) + elif isinstance(constraint, TopologicalConstraint): + tconstraints.append(constraint) + elif callable(constraint): + dconstraints.append(GenericDynamicalConstraint(constraint)) + else: + msg = 'constraints must be callable, a DynamicalConstraint or TopologicalConstraint' + raise TypeError(msg) + + self.trand.constraints = tconstraints + self.node_constraints = nconstraints + AbstractRandomizer.constraints.__set__(self, dconstraints) # type: ignore + + @property + def node_constraints(self): + return self.__node_constraints + + @node_constraints.setter + def node_constraints(self, constraints): + if constraints is None: + constraints = [] + elif not isinstance(constraints, list): + constraints = list(constraints) + + for i, constraint in enumerate(constraints): + if isinstance(constraint, NodeConstraint): + pass + elif callable(constraint): + constraint[i] = GenericNodeConstraint(constraint) + else: + msg = 'constraints must be callable, a DynamicalConstraint or TopologicalConstraint' + raise TypeError(msg) + + self.__node_constraints = constraints + + def add_constraint(self, constraint): + """ + Append a constraint to the randomizer's list of constraints. 
+ + :param constraint: the new constraint + :type constraint: AbstractConstraint + :raises TypeError: if the constraint is not an AbstractConstraint + """ + if isinstance(constraint, NodeConstraint): + self.__node_constraints.append(constraint) + elif isinstance(constraint, DynamicalConstraint): + super().add_constraint(constraint) + elif callable(constraint): + super().add_constraint(GenericDynamicalConstraint(constraint)) + elif isinstance(constraint, TopologicalConstraint): + self.trand.add_constraint(constraint) + else: + msg = 'constraints must be callable, a DynamicalConstraint or TopologicalConstraint' + raise TypeError(msg) + + def random(self): + """ + Returns a random network + """ + topology = self.trand.random() + + loop = 0 + while self.timeout <= 0 or loop < self.timeout: + net = self._randomize(topology) + if self._check_constraints(net): + return net + loop += 1 + raise ConstraintError('failed to generate a network that statisfies all constraints') + + def _randomize(self, topology): + table = [] + for node in sorted(topology.nodes): + predecessors = tuple(topology.predecessors(node)) + params = self._function_class_parameters(topology, node) + table.append((predecessors, self._random_function(**params))) + return neet.boolean.LogicNetwork(table) + + def _check_node_constraints(self, f): + for constraint in self.node_constraints: + if not constraint.satisfies(f): + return False + return True + + def _random_function(self, k, p, **kwargs): + volume = 2**k + integer, decimal = divmod(p * volume, 1) + + loop = 0 + while self.timeout <= 0 or loop < self.timeout: + num_states = int(integer + np.random.choice(2, p=[1 - decimal, decimal])) + indices = np.random.choice(volume, num_states, replace=False) + f = set('{0:0{1}b}'.format(index, k) for index in indices) + if self._check_node_constraints(f): + return f + raise ConstraintError('failed to generate a function that statisfies all constraints') + + @abstractmethod + def _function_class_parameters(self, topology, node, **kwargs): + return {'topology': topology, 'node': node, 'k': topology.in_degree(node)} + + +class UniformBias(NetworkRandomizer): + def __init__(self, network, p=0.5, **kwargs): + """ + Generate random Boolean networks with the same bias on each non-external + node. + "Bias" = prior probability based on (# of node input combinations which will + activate the node) / (the # of possible node input combinations) + """ + super().__init__(network, **kwargs) + self.p = p + + def _function_class_parameters(self, topology, node, **kwargs): + params = super()._function_class_parameters(topology, node) + params.update({'p': self.p}) + return params + + +class MeanBias(UniformBias): + def __init__(self, network, **kwargs): + """ + Generate random Boolean networks with the same mean bias (on average) + as the original network. + """ + if not isinstance(network, neet.boolean.LogicNetwork): + raise NotImplementedError() + super().__init__(network, self._mean_bias(network), **kwargs) + + def _mean_bias(self, network): + """ + Get the mean bias of a network + """ + return np.mean([float(len(row[1])) / float(2**len(row[0])) for row in network.table]) + + +class LocalBias(NetworkRandomizer): + def __init__(self, network, trand=None, **kwargs): + """ + Generate networks with the same bias on each node. This scheme can only + be applied in conjunction with the ``FixedTopology`` and ``InDegree` + topological randomizers. 
+ """ + if not isinstance(network, neet.boolean.LogicNetwork): + raise NotImplementedError(type(network)) + elif trand is not None: + if isclass(trand) and not issubclass(trand, (FixedTopology, InDegree)): + raise NotImplementedError(trand) + elif not isclass(trand) and not isinstance(trand, (FixedTopology, InDegree)): + raise NotImplementedError(type(trand)) + + super().__init__(network, trand, **kwargs) + self.local_bias = [float(len(row[1]) / 2**len(row[0])) for row in network.table] + + def _function_class_parameters(self, topology, node, **kwargs): + params = super()._function_class_parameters(topology, node) + params.update({'p': self.local_bias[node]}) + return params + + +class FixCanalizingMixin(NetworkRandomizer): + def _randomize(self, topology): + table = [] + if self.network is None: # type: ignore + raise NotImplementedError('Randomizer is based on a graph, cannot infer canalization') + canalizing = self.network.canalizing_nodes() + for node in sorted(topology.nodes): + predecessors = tuple(topology.predecessors(node)) + params = self._function_class_parameters(topology, node) + if node in canalizing: + table.append((predecessors, self._random_canalizing_function(**params))) + else: + table.append((predecessors, self._random_function(**params))) + return neet.boolean.LogicNetwork(table) + + def _random_canalizing_function(self, k, p, **kwargs): + integer, decimal = divmod(2**k * p, 1) + num_states = int(integer + np.random.choice(2, p=[1 - decimal, decimal])) + + canalizing_input = np.random.choice(k) + canalizing_value = np.random.choice(2) + if num_states > 2**(k - 1): + canalized_value = 1 + elif num_states < 2**(k - 1): + canalized_value = 0 + else: + canalized_value = np.random.choice(2) + + fixed_states = self._all_states_with_one_node_fixed(k, canalizing_input, canalizing_value) + other_states = np.lib.arraysetops.setxor1d(np.arange(2**k), fixed_states, assume_unique=True) + + if canalized_value == 1: + state_idxs = np.random.choice(other_states, num_states - len(fixed_states), replace=False) + state_idxs = np.concatenate((state_idxs, np.array(fixed_states))) + elif canalized_value == 0: + state_idxs = np.random.choice(other_states, num_states, replace=False) + + return set('{0:0{1}b}'.format(idx, k) for idx in state_idxs) + + def _all_states_with_one_node_fixed(self, k, fixed_index, fixed_value): + return [idx for idx in range(2**k) + if '{0:0{1}b}'.format(idx, k)[fixed_index] == str(fixed_value)] diff --git a/neet/boolean/random/randomizer.py b/neet/boolean/random/randomizer.py new file mode 100644 index 00000000..ca04fad9 --- /dev/null +++ b/neet/boolean/random/randomizer.py @@ -0,0 +1,168 @@ +import neet +import networkx as nx +from abc import ABCMeta, abstractmethod +from .constraints import AbstractConstraint, ConstraintError + + +class AbstractRandomizer(object, metaclass=ABCMeta): + def __init__(self, network, constraints=None, timeout=1000, **kwargs): + """ + An abstract interface for all randomizers based on randomly modifying a + base network or graph. Rejection testing is used to enforce + user-specified constraints. Networks/graphs will be repeatedly randomized + until a instance satisfying all constraints is found. If a + ```timeout``` is provided, the rejection testing will stop after that + many attempts and raise ``ConstraintError`` if no valid network was found. + If ``timeout <= 0``, then the rejection testing will never time out. 
+ + :param network: a base network or graph + :type network: neet.Network or networkx.DiGraph + :param constraints: constraints used for rejection testing + :type constraints: a sequence of AbstractConstraint instances + :param timeout: the number of attempts before rejection testing times + out. If less than 1, the rejection testing will never + time out. + """ + if isinstance(network, neet.Network): + self.network = network + elif isinstance(network, nx.DiGraph): + self.__network = None + self.__graph = network + else: + raise TypeError('network must be a neet.Network or a networkx.DiGraph') + + self.timeout = timeout + self.constraints = constraints + + @property + def network(self): + """ + Get the randomizer's network + + :returns: neet.Network or None + """ + return self.__network + + @network.setter + def network(self, network): + """ + Set the randomizer's network and replace the graph with the network's + graph. + + :param network: the new network + :type network: neet.Network + :raises TypeError: if the argument is not a neet.Network + """ + if not isinstance(network, neet.Network): + raise TypeError('network must be an instance of neet.Network') + self.__network = network + self.__graph = self.__network.network_graph() + + @property + def graph(self): + """ + Get the randomizer's graph + + :returns: networkx.DiGraph + """ + return self.__graph + + @graph.setter + def graph(self, graph): + """ + Set the randomizer's graph and replace the network with ``None``. + + :param graph: the new graph + :type graph: networkx.DiGraph + :raises TypeError: if the argument is not a networkx.DiGraph + """ + if not isinstance(graph, nx.DiGraph): + raise TypeError('graph must be an instance of networkx.DiGraph') + self.__network = None + self.__graph = graph + + @property + def constraints(self): + """ + Get the randomizer's constraints. + + :returns: a list of AbstractConstraint instances + """ + return self.__constraints + + @constraints.setter + def constraints(self, constraints): + """ + Set the randomizer's constraints. + + :param constraints: the new constraints + :type constraints: a seq of AbstractConstraint instances + :raises TypeError: if any of the contraints are not an AbstractConstraint + """ + if constraints is None: + constraints = [] + elif not isinstance(constraints, list): + constraints = list(constraints) + + for i, constraint in enumerate(constraints): + if not isinstance(constraint, AbstractConstraint): + raise TypeError('constraints must be instances of AbstractConstraint') + + self.__constraints = constraints + + def add_constraint(self, constraint): + """ + Append a constraint to the randomizer's list of constraints. + + :param constraint: the new constraint + :type constraint: AbstractConstraint + :raises TypeError: if the constraint is not an AbstractConstraint + """ + if not isinstance(constraint, AbstractConstraint): + raise TypeError('constraints must be instances of AbstractConstraint') + self.__constraints.append(constraint) + + def _check_constraints(self, net): + """ + Check a network or graph against the randomizer's constraints. + + :param net: the network or directed graph + :type net: neet.Network or networkx.DiGraph + :returns: ``True`` if the network/graph satisfies all constraints + """ + for constraint in self.constraints: + if not constraint.satisfies(net): + return False + return True + + def __iter__(self): + """ + Generate an infinite sequence of random networks or graphs. 
+ """ + while True: + yield self.random() + + def random(self): + """ + Create a random network variant. + + :returns: a random network or graph + :raises ConstraintError: if a constraint could not be satisfied before + the randomizer's timeout. + """ + loop = 0 + while self.timeout <= 0 or loop < self.timeout: + net = self._randomize() + if self._check_constraints(net): + return net + loop += 1 + raise ConstraintError('failed to generate a network that statisfies all constraints') + + @abstractmethod + def _randomize(self): + """ + Create and *unconstrained* network variant. + + :returns: a random network or graph + """ + pass diff --git a/neet/boolean/random/topology.py b/neet/boolean/random/topology.py new file mode 100644 index 00000000..56daf925 --- /dev/null +++ b/neet/boolean/random/topology.py @@ -0,0 +1,193 @@ +import networkx as nx +import numpy as np + +from .randomizer import AbstractRandomizer +from .constraints import TopologicalConstraint, GenericTopologicalConstraint, ConstraintError + + +class TopologyRandomizer(AbstractRandomizer): + """ + An abstract base class for all randomizers which implement topological + randomization. + """ + @property + def constraints(self): + return super().constraints + + @constraints.setter + def constraints(self, constraints): + """ + Set the randomizer's constraints. + + :param constraints: the new constraints + :type constraints: a seq of AbstractConstraint or callable instances + :raises TypeError: if any of the contraints are neither an AbstractConstraint nor callable + """ + if constraints is None: + constraints = [] + elif not isinstance(constraints, list): + constraints = list(constraints) + + for i, constraint in enumerate(constraints): + if isinstance(constraint, TopologicalConstraint): + pass + elif callable(constraint): + constraints[i] = GenericTopologicalConstraint(constraint) + else: + raise TypeError('constraints must be callable or type TopologicalConstraint') + + AbstractRandomizer.constraints.__set__(self, constraints) # type: ignore + + def add_constraint(self, constraint): + """ + Append a constraint to the randomizer's list of constraints. + + :param constraint: the new constraint + :type constraint: TopologicalConstraint + :raises TypeError: if the constraint is not an TopologicalConstraint + """ + if isinstance(constraint, TopologicalConstraint): + pass + elif callable(constraint): + constraint = GenericTopologicalConstraint(constraint) + else: + raise TypeError('constraints must be callable or type TopologicalConstraint') + + super().add_constraint(constraint) + + +class FixedTopology(TopologyRandomizer): + @property + def constraints(self): + return super().constraints + + @constraints.setter + def constraints(self, constraints): + """ + Set the randomizer's constraints. 
+ + :param constraints: the new constraints + :type constraints: a seq of AbstractConstraint instances + :raises TypeError: if any of the contraints are not an AbstractConstraint + """ + if constraints is None: + constraints = [] + elif not isinstance(constraints, list): + constraints = list(constraints) + + for i, constraint in enumerate(constraints): + if isinstance(constraint, TopologicalConstraint): + pass + elif callable(constraint): + constraints[i] = GenericTopologicalConstraint(constraint) + else: + raise TypeError('constraints must be callable or type TopologicalConstraint') + if not constraints[i].satisfies(self.graph): + msg = 'the provided network is inconsistent with the provided constraints' + raise ConstraintError(msg) + + TopologyRandomizer.constraints.__set__(self, constraints) # type: ignore + + def add_constraint(self, constraint): + """ + Append a constraint to the randomizer's list of constraints. + + :param constraint: the new constraint + :type constraint: TopologicalConstraint + :raises TypeError: if the constraint is not an TopologicalConstraint + """ + if isinstance(constraint, TopologicalConstraint): + pass + elif callable(constraint): + constraint = GenericTopologicalConstraint(constraint) + else: + raise TypeError('constraints must be callable or type TopologicalConstraint') + + if not constraint.satisfies(self.graph): + msg = 'the provided network is inconsistent with the provided constraints' + raise ConstraintError(msg) + + super().add_constraint(constraint) + + def random(self): + """ + Create a random network variant. Because we check the constraints + against the randomizer's graph when they are added, and we are just + returning the graph, we can be certain that this will always succeed. + That is, this method **will not** raise a ``ConstraintError``. + + :returns: a random network or graph + """ + return self._randomize() + + def _randomize(self): + """ + Return a graph that is isomorphic to the desired graph. + + :returns: networkx.DiGraph + """ + return self.graph + + +class MeanDegree(TopologyRandomizer): + """ + Generate a topology with the same mean degree as the initial network. This + amounts to randomly constructing a graph with the same number of edges as + the original graph. + + :returns: networkx.DiGraph + """ + def _randomize(self): + n = len(self.graph) + edgeindices = np.random.choice(n * n, self.graph.size(), replace=False) + + G = nx.DiGraph() + G.add_nodes_from(range(n)) + G.add_edges_from(map(lambda i: divmod(i, n), edgeindices)) + return G + + +class InDegree(TopologyRandomizer): + """ + Generate a topology with the same in-degree distribution as the initial + network. This amounts iterating over all nodes and selecting :math:`k` + nodes from which to draw an edge, where :math:`k` is the in-degree of the + node in the original graph. + + :returns: networkx.DiGraph + """ + def _randomize(self): + n = len(self.graph) + edges = [] + for j in range(n): + for i in np.random.choice(n, self.graph.in_degree(j), replace=False): + edges.append((i, j)) + + G = nx.DiGraph() + G.add_nodes_from(range(n)) + G.add_edges_from(edges) + + return G + + +class OutDegree(TopologyRandomizer): + """ + Generate a topology with the same out-degree distribution as the initial + network. This amounts iterating over all nodes and selecting :math:`k` + nodes to which to draw an edge, where :math:`k` is the in-degree of the + node in the original graph. 
+ + :returns: networkx.DiGraph + """ + def _randomize(self): + n = len(self.graph) + edges = [] + for i in range(n): + for j in np.random.choice(n, self.graph.out_degree(i), replace=False): + edges.append((i, j)) + + G = nx.DiGraph() + G.add_nodes_from(range(n)) + G.add_edges_from(edges) + + return G diff --git a/neet/boolean/reca.py b/neet/boolean/reca.py index a05f7a49..61e3b119 100644 --- a/neet/boolean/reca.py +++ b/neet/boolean/reca.py @@ -124,7 +124,7 @@ def __init__(self, code, boundary=None, size=None, wiring=None, names=None, meta elif wiring is not None: if not isinstance(wiring, (list, np.ndarray)): raise TypeError("wiring must be a list or an array") - wiring_array = np.copy(wiring) + wiring_array = np.array(wiring, dtype=object) shape = wiring_array.shape if wiring_array.ndim != 2: raise ValueError("wiring must be a matrix") @@ -138,7 +138,7 @@ def __init__(self, code, boundary=None, size=None, wiring=None, names=None, meta super(RewiredECA, self).__init__(int(shape[1]), names=names, metadata=metadata) self.code = code self.boundary = boundary - self.__wiring = wiring_array + self.__wiring = np.array(wiring_array, dtype=int) else: raise ValueError("either size or wiring must be provided") diff --git a/neet/boolean/sensitivity.py b/neet/boolean/sensitivity.py index f82268a0..04ca33b6 100644 --- a/neet/boolean/sensitivity.py +++ b/neet/boolean/sensitivity.py @@ -8,6 +8,8 @@ import copy import numpy as np import numpy.linalg as linalg +import math +import itertools as itt class SensitivityMixin(object): @@ -84,17 +86,28 @@ def sensitivity(self, state, transitions=None): encoder = self._unsafe_encode distance = self.distance neighbors = self.hamming_neighbors(state) + #neighbors_copy = [neighbor.copy() for neighbor in neighbors] nextState = self.update(state) # count sum of differences found in neighbors of the original s = 0. + #debugging_index = 0 for neighbor in neighbors: if transitions is not None: newState = transitions[encoder(neighbor)] else: newState = self._unsafe_update(neighbor) s += distance(newState, nextState) + """print("testing whether the hamming neighbors are correct") + print("1. neighbor: ", neighbors_copy[debugging_index]) + print("2. state: ", state,"\n") + print("1. newState: ", newState) + print("2. nextState: ", nextState,"\n\n") + debugging_index += 1""" + #s += distance(nextState, nextState)#DEBUGGING CODE! DO NOT LEAVE IN + # DO NOT LEAVE THIS LINE UNCOMMENTED WHILE THE ABOVE LINE IS COMMENTED + #print("testing if the distance between the same input is anything other than 0") return s / self.size diff --git a/neet/landscape.py b/neet/landscape.py index 0f5deeac..3bea317f 100644 --- a/neet/landscape.py +++ b/neet/landscape.py @@ -142,7 +142,7 @@ def landscape(self, index=None, pin=None, values=None): This function implicitly calls :attr:`clear_landscape`, so make sure to create a reference to :attr:`landscape_data` if landscape information - has previously been compute and you wish to keep it around. + has previously been computed and you wish to keep it around. .. 
rubric:: Basic Usage @@ -1045,7 +1045,7 @@ def expound(self): data.basins = basins data.basin_sizes = np.asarray(basin_sizes) - data.attractors = np.asarray(attractors) + data.attractors = np.asarray(attractors, dtype=list) data.attractor_lengths = np.asarray(attractor_lengths) data.in_degrees = in_degrees data.heights = heights diff --git a/neet/network.py b/neet/network.py index 0ca0b50f..5b28dcb6 100644 --- a/neet/network.py +++ b/neet/network.py @@ -358,6 +358,7 @@ def network_graph(self, labels='indices', **kwargs): def draw_network_graph(self, graphkwargs={}, pygraphkwargs={}): """ + Draw the network's networkx graph using PyGraphviz. .. Note:: diff --git a/setup.py b/setup.py index a266b72d..4894ad44 100644 --- a/setup.py +++ b/setup.py @@ -36,11 +36,11 @@ url='https://github.com/elife-asu/neet', license=LICENSE, install_requires=['six', 'numpy', 'networkx', 'pyinform', 'deprecated'], - extra_requires={ + extras_require={ "draw": ['pygraphviz'] }, setup_requires=['green'], - packages=['neet', 'neet.boolean'], + packages=['neet', 'neet.boolean', 'neet.boolean.random'], package_data={'neet.boolean': ['data/*.txt', 'data/*.dat']}, test_suite='test', platforms=['Windows', 'OS X', 'Linux'] diff --git a/test/boolean/random/__init__.py b/test/boolean/random/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/boolean/random/test_constraints.py b/test/boolean/random/test_constraints.py new file mode 100644 index 00000000..d4b1ca7f --- /dev/null +++ b/test/boolean/random/test_constraints.py @@ -0,0 +1,224 @@ +import unittest +import networkx as nx +import numpy as np +from math import isclose +from neet.boolean import LogicNetwork +from neet.boolean.examples import mouse_cortical_7B, myeloid, s_pombe +from neet.boolean.random import constraints + +class TestConstraints(unittest.TestCase): + def test_has_external_nodes_invalid_type(self): + with self.assertRaises(TypeError): + constraints.HasExternalNodes(1.4) + with self.assertRaises(TypeError): + constraints.HasExternalNodes('abc') + with self.assertRaises(TypeError): + constraints.HasExternalNodes(LogicNetwork([(0,), ['0']])) + + def test_has_external_nodes_integer(self): + with self.assertRaises(ValueError): + constraints.HasExternalNodes(-1) + + constraint = constraints.HasExternalNodes(5) + self.assertEquals(5, constraint.num_external) + + net = nx.DiGraph() + net.add_nodes_from(range(5)) + self.assertTrue(constraint.satisfies(net)) + + net = nx.DiGraph() + net.add_nodes_from(range(6)) + self.assertFalse(constraint.satisfies(net)) + + net = nx.DiGraph() + net.add_edge(0,1) + self.assertFalse(constraint.satisfies(net)) + + def test_has_external_nodes_graph(self): + net5 = nx.DiGraph() + net5.add_nodes_from(range(5)) + + net4_01 = nx.DiGraph() + net4_01.add_nodes_from(range(5)) + net4_01.add_edge(0,1) + + net4_31 = nx.DiGraph() + net4_31.add_nodes_from(range(5)) + net4_31.add_edge(3,1) + + constraint = constraints.HasExternalNodes(net5) + self.assertEquals(5, constraint.num_external) + self.assertTrue(constraint.satisfies(net5)) + self.assertFalse(constraint.satisfies(net4_01)) + self.assertFalse(constraint.satisfies(net4_31)) + + constraint = constraints.HasExternalNodes(net4_01) + self.assertEquals(4, constraint.num_external) + self.assertTrue(constraint.satisfies(net4_01)) + self.assertTrue(constraint.satisfies(net4_31)) + self.assertFalse(constraint.satisfies(net5)) + + def test_is_connected_null_graph(self): + constraint = constraints.IsConnected() + net = nx.DiGraph() + with
self.assertRaises(constraints.ConstraintError): + constraint.satisfies(net) + + def test_is_connected(self): + constraint = constraints.IsConnected() + + net = nx.DiGraph() + net.add_node(1) + self.assertTrue(constraint.satisfies(net)) + + net.add_node(2) + self.assertFalse(constraint.satisfies(net)) + + net.add_edge(1,2) + self.assertTrue(constraint.satisfies(net)) + + def test_is_irreducible_invalid_type(self): + constraint = constraints.IsIrreducible() + with self.assertRaises(TypeError): + constraint.satisfies(s_pombe) + + def test_is_irreducible(self): + constraint = constraints.IsIrreducible() + + net = LogicNetwork([((), ())]) + self.assertTrue(constraint.satisfies(net)) + + net = LogicNetwork([((0,), ('0', '1'))]) + self.assertFalse(constraint.satisfies(net)) + + net = LogicNetwork([((1,), ('0',)), + ((0,), ('1',))]) + self.assertTrue(constraint.satisfies(net)) + + net = LogicNetwork([((1,), ('0',)), + ((0,), ('0', '1',))]) + self.assertFalse(constraint.satisfies(net)) + + def test_has_canalizing_nodes_invalid_type(self): + with self.assertRaises(TypeError): + constraints.HasCanalizingNodes(1.5) + with self.assertRaises(TypeError): + constraints.HasCanalizingNodes('abc') + with self.assertRaises(TypeError): + constraints.HasCanalizingNodes(nx.DiGraph()) + + def test_has_canalizing_nodes(self): + with self.assertRaises(ValueError): + constraints.HasCanalizingNodes(-1) + + constraint = constraints.HasCanalizingNodes(11) + self.assertTrue(constraint.satisfies(myeloid)) + + constraint = constraints.HasCanalizingNodes(8) + self.assertFalse(constraint.satisfies(myeloid)) + + constraint = constraints.HasCanalizingNodes(13) + self.assertFalse(constraint.satisfies(myeloid)) + + constraint = constraints.HasCanalizingNodes(myeloid) + self.assertTrue(constraint.satisfies(myeloid)) + self.assertFalse(constraint.satisfies(s_pombe)) + + constraint = constraints.HasCanalizingNodes(5) + self.assertTrue(constraint.satisfies(s_pombe)) + + constraint = constraints.HasCanalizingNodes(3) + self.assertFalse(constraint.satisfies(s_pombe)) + + constraint = constraints.HasCanalizingNodes(7) + self.assertFalse(constraint.satisfies(s_pombe)) + + constraint = constraints.HasCanalizingNodes(s_pombe) + self.assertFalse(constraint.satisfies(myeloid)) + self.assertTrue(constraint.satisfies(s_pombe)) + + def test_generic_topological_constraint(self): + with self.assertRaises(TypeError): + constraints.GenericTopologicalConstraint(None) + with self.assertRaises(TypeError): + constraints.GenericTopologicalConstraint(1) + with self.assertRaises(TypeError): + constraints.GenericTopologicalConstraint(1.5) + + constraint = constraints.GenericTopologicalConstraint(lambda g: True) + + with self.assertRaises(TypeError): + constraint.satisfies(myeloid) + + net = nx.DiGraph() + self.assertTrue(constraint.satisfies(net)) + net.add_nodes_from(range(5)) + self.assertTrue(constraint.satisfies(net)) + + constraint = constraints.GenericTopologicalConstraint(lambda g: len(g) == 5) + net = nx.DiGraph() + self.assertFalse(constraint.satisfies(net)) + net.add_nodes_from(range(5)) + self.assertTrue(constraint.satisfies(net)) + + def test_generic_dynamical_constraint(self): + with self.assertRaises(TypeError): + constraints.GenericDynamicalConstraint(None) + with self.assertRaises(TypeError): + constraints.GenericDynamicalConstraint(1) + with self.assertRaises(TypeError): + constraints.GenericDynamicalConstraint(1.5) + + constraint = constraints.GenericDynamicalConstraint(lambda g: True) + + with self.assertRaises(TypeError): + 
constraint.satisfies(nx.DiGraph()) + + self.assertTrue(constraint.satisfies(myeloid)) + self.assertTrue(constraint.satisfies(s_pombe)) + + def expect_mean_bias(bias): + def mean_bias(net): + if not isinstance(net, LogicNetwork): + raise constraints.ConstraintError() + return np.mean([float(len(row[1])) / float(2**len(row[0])) + for row in net.table]) + return lambda net: isclose(mean_bias(net), bias) + + constraint = constraints.GenericDynamicalConstraint(expect_mean_bias(0.2840909090909091)) + self.assertTrue(constraint.satisfies(myeloid)) + self.assertFalse(constraint.satisfies(mouse_cortical_7B)) + with self.assertRaises(constraints.ConstraintError): + constraint.satisfies(s_pombe) + + def test_generic_node_constraint(self): + with self.assertRaises(TypeError): + constraints.GenericNodeConstraint(None) + with self.assertRaises(TypeError): + constraints.GenericNodeConstraint(1) + with self.assertRaises(TypeError): + constraints.GenericNodeConstraint(1.5) + + constraint = constraints.GenericNodeConstraint(lambda cond: True) + + with self.assertRaises(TypeError): + constraint.satisfies(myeloid) + + self.assertTrue(constraint.satisfies(set())) + self.assertTrue(constraint.satisfies(set(['111', '110']))) + + constraint = constraints.GenericNodeConstraint(lambda cond: '111' in cond) + self.assertTrue(constraint.satisfies(set(['111']))) + self.assertFalse(constraint.satisfies(set(['110', '000']))) + self.assertTrue(constraint.satisfies(set(['110', '000', '111']))) + + def test_irreducible_node(self): + constraint = constraints.IrreducibleNode() + self.assertTrue(constraint.satisfies(set())) + self.assertTrue(constraint.satisfies(set(['1']))) + self.assertFalse(constraint.satisfies(set(['0', '1']))) + self.assertTrue(constraint.satisfies(set(['00', '11']))) + self.assertFalse(constraint.satisfies(set(['00', '01']))) + + with self.assertRaises(TypeError): + constraint.satisfies(myeloid) diff --git a/test/boolean/random/test_topology.py b/test/boolean/random/test_topology.py new file mode 100644 index 00000000..72b3df86 --- /dev/null +++ b/test/boolean/random/test_topology.py @@ -0,0 +1,112 @@ +import unittest +import networkx as nx +import numpy as np +from itertools import islice +from neet.boolean.random import topology +from neet.boolean.random.constraints import ConstraintError, HasExternalNodes + + +class TestTopology(unittest.TestCase): + def test_fixed_topology_without_constraints(self): + for net in self.erdos_renyi(10, 10, 0.5, directed=True): + gen = topology.FixedTopology(net) + for r in islice(gen, 10): + self.assertTrue(nx.is_isomorphic(net, r)) + + def test_fixed_topology_with_init_constraints(self): + for net in self.erdos_renyi(10, 10, 0.5, directed=True): + num_external = np.count_nonzero([d == 0 for _, d in net.in_degree()]) + gen = topology.FixedTopology(net, constraints=[HasExternalNodes(num_external)]) + for r in islice(gen, 10): + self.assertTrue(nx.is_isomorphic(net, r)) + + with self.assertRaises(ConstraintError): + topology.FixedTopology(net, constraints=[HasExternalNodes(num_external + 1)]) + + gen = topology.FixedTopology(net, constraints=[lambda net: len(net) == 10]) + for r in islice(gen, 10): + self.assertTrue(nx.is_isomorphic(net, r)) + + with self.assertRaises(ConstraintError): + topology.FixedTopology(net, constraints=[lambda net: len(net) != 10]) + + def test_fixed_topology_with_added_constraints(self): + for net in self.erdos_renyi(10, 10, 0.5, directed=True): + num_external = np.count_nonzero([d == 0 for _, d in net.in_degree()]) + gen = 
topology.FixedTopology(net) + gen.add_constraint(HasExternalNodes(num_external)) + self.assertEquals(1, len(gen.constraints)) + for r in islice(gen, 10): + self.assertTrue(nx.is_isomorphic(net, r)) + + with self.assertRaises(ConstraintError): + gen.add_constraint(HasExternalNodes(num_external + 1)) + self.assertEquals(1, len(gen.constraints)) + + gen.add_constraint(lambda net: len(net) == 10) + self.assertEquals(2, len(gen.constraints)) + for r in islice(gen, 10): + self.assertTrue(nx.is_isomorphic(net, r)) + + with self.assertRaises(ConstraintError): + gen.add_constraint(lambda net: len(net) != 10) + self.assertEquals(2, len(gen.constraints)) + + with self.assertRaises(TypeError): + gen.add_constraint(None) + with self.assertRaises(TypeError): + gen.add_constraint(1) + with self.assertRaises(TypeError): + gen.add_constraint(1.5) + with self.assertRaises(TypeError): + gen.add_constraint(True) + + def test_fixed_topology_with_set_constraints(self): + for net in self.erdos_renyi(10, 10, 0.5, directed=True): + num_external = np.count_nonzero([d == 0 for _, d in net.in_degree()]) + gen = topology.FixedTopology(net) + gen.constraints = [HasExternalNodes(num_external)] + self.assertEquals(1, len(gen.constraints)) + for r in islice(gen, 10): + self.assertTrue(nx.is_isomorphic(net, r)) + + with self.assertRaises(ConstraintError): + gen.constraints = [HasExternalNodes(num_external + 1)] + self.assertEquals(1, len(gen.constraints)) + + gen.constraints = [HasExternalNodes(num_external), lambda net: len(net) == 10] + self.assertEquals(2, len(gen.constraints)) + for r in islice(gen, 10): + self.assertTrue(nx.is_isomorphic(net, r)) + + gen.constraints = [] + self.assertEquals(0, len(gen.constraints)) + with self.assertRaises(ConstraintError): + gen.constraints = [HasExternalNodes(num_external + 1), lambda net: len(net) != 10] + self.assertEquals(0, len(gen.constraints)) + + with self.assertRaises(TypeError): + gen.constraints = [None] + with self.assertRaises(TypeError): + gen.constraints = [1] + with self.assertRaises(TypeError): + gen.constraints = [1.5] + with self.assertRaises(TypeError): + gen.constraints = [True] + + with self.assertRaises(TypeError): + gen.constraints = HasExternalNodes(num_external) + + with self.assertRaises(TypeError): + gen.constraints = lambda net: len(net) == 10 + + # def test_mean_degree_without_constraints(self): + # for net in self.erdos_renyi(10, 10, 0.5, directed=True): + # mean_degree = mean + # gen = topology.MeanDegree(net) + # for r in islice(gen, 10): + # neg.degree + # self.assertTrue(nx.is_isomorphic(net, r)) + + def erdos_renyi(self, num, *args, **kwargs): + return [nx.generators.random_graphs.erdos_renyi_graph(*args, **kwargs) for _ in range(num)]
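
A minimal usage sketch of the randomizers introduced by this patch (the sketch itself is not part of the diff). It assumes the package layout added here (neet.boolean.random) and the bundled myeloid example network that the new tests already import; the variable names are illustrative only.

from itertools import islice

from neet.boolean.examples import myeloid
from neet.boolean.random.dynamics import MeanBias
from neet.boolean.random.topology import InDegree
from neet.boolean.random.constraints import IsConnected, IsIrreducible

# Preserve the mean bias of myeloid (on average), redraw the topology with the
# same in-degree sequence, and reject any variant that is not weakly connected
# or that contains a reducible node function.
randomizer = MeanBias(myeloid, trand=InDegree,
                      constraints=[IsConnected(), IsIrreducible()])

one_variant = randomizer.random()     # a single neet.boolean.LogicNetwork
sample = list(islice(randomizer, 5))  # iterating the randomizer yields an endless stream
print([net.size for net in sample])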