Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,270 changes: 648 additions & 622 deletions api/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ ignore = [
"./tests/v0/mockers/*.py" = ["C901"] # Allow high code complexity in mockers
"./src/v0/database/client.py" = ["B024"] # Allow Abstract class without abstractmethod
"./src/v0/services/base.py" = ["B024"] # Allow Abstract class without abstractmethod
"./src/services/classes/directed_graph.py" = ["B024"] # Allow Abstract class without abstractmethod
"./src/v0/services/classes/abstract_directed_graph.py" = ["B024"] # Allow Abstract class without abstractmethod

[tool.codespell]
skip = "*.lock"
Expand Down
2 changes: 1 addition & 1 deletion api/src/v0/models/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class DecisionTreeResponse(DOTModel):
"name": "Joe can test the car",
"shortname": "Test",
"uuid": "ad651f50-22de-4f85-a560-bf5fb2d9f706",
"alternatives": ['"Test"', '" no Test"'],
"alternatives": ['"Test"', '"no Test"'],
},
"children": [
{
Expand Down
168 changes: 168 additions & 0 deletions api/src/v0/services/analysis/id_to_dt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""
Conversion of influence diagram to decision tree.
The decision tree format is used for display in the frontend.
"""

from src.v0.services.classes.arc import Arc
from src.v0.services.classes.decision_tree import DecisionTree
from src.v0.services.classes.influence_diagram import InfluenceDiagram
from src.v0.services.classes.node import NodeABC, UtilityNode
from src.v0.services.errors import PartialOrderOutputModeError


class InfluenceDiagramToDecisionTree:
def decision_elimination_order(
self, influence_diagram: InfluenceDiagram
) -> list[NodeABC]:
"""Decision Elimination Order algorithm

Args:
influence_diagram (InfluenceDiagram): the influence diagram object
to convert

Returns:
list[NodeABC] : the decision elimination order graph associated to
the influence diagram. Nodes in the list are copies of the
nodes of the influence diagram ones.

TODO: add description of what is the algorithm about
"""
cid_copy = influence_diagram.copy()
decisions = []
decisions_count = cid_copy.decision_count
while decisions_count > 0:
nodes = list(cid_copy.graph.nodes())
for node in nodes:
if not cid_copy.has_children(node):
if node.is_decision_node:
decisions.append(node)
decisions_count -= 1
cid_copy.graph.remove_node(node)
return decisions

def calculate_partial_order(
self, influence_diagram: InfluenceDiagram, *, mode="view"
) -> list[NodeABC]:
"""Partial order algorithm


Args:
influence_diagram (InfluenceDiagram): the influence diagram object
to convert
mode (str): ["view"(default)|"copy"]
returns a view or a copy of the nodes

Returns
List[NodeABC]: list of nodes (copies or vioews) sorted in decision order

TODO: add description of what the algorithm is about
TODO: handle utility nodes
"""
if mode not in ["view", "copy"]:
raise PartialOrderOutputModeError(mode)

# get all chance nodes
uncertainty_node = influence_diagram.get_uncertainty_nodes()
elimination_order = self.decision_elimination_order(influence_diagram)
# TODO: Add utility nodes
partial_order = []

while elimination_order:
decision = elimination_order.pop()
parent_decision_nodes = []
for parent in influence_diagram.get_parents(decision):
if not parent.is_decision_node:
if parent in uncertainty_node:
parent_decision_nodes.append(parent)
uncertainty_node.remove(parent)

if len(parent_decision_nodes) > 0:
partial_order += parent_decision_nodes
partial_order.append(decision)

partial_order += uncertainty_node

if mode == "copy":
partial_order = [node.copy() for node in partial_order]

return partial_order

def _output_branches_from_node(
self, node: NodeABC, node_in_partial_order: NodeABC, flip=True
) -> list[tuple[Arc, NodeABC]]:
"""Make a list of output branches from a node

This method actually returns the states of the nodes.

Args
node (NodeABC): node to find the output branch from
node_in_partial_order (NodeABC): associated node in the partial order - to
keep reference too
flip (bool): if True (default), flip the list of branches so a generated
decision tree is in the same order as the entered states.
If False, the tree will be flipped horizontally.

Returns
List: the list of tuples (Arc, Node in partial order)
The edges have the input node as start endpoint and name given by the
state
"""
if node.is_utility_node:
tree_stack = [
Arc(tail=node, head=None, label=utility) for utility in node.utility
]
if node.is_decision_node:
tree_stack = [
Arc(tail=node, head=None, label=alternative)
for alternative in node.alternatives
]
if node.is_uncertainty_node:
# This needs to be re-written according to the way we deal with probabilities
tree_stack = [
Arc(tail=node, head=None, label=outcome) for outcome in node.outcomes
]
if flip:
tree_stack.reverse()

return zip(tree_stack, [node_in_partial_order] * len(tree_stack), strict=False)

def conversion(self, influence_diagram: InfluenceDiagram) -> DecisionTree:
"""Convert the influence diagram into a DecisionTree object

Returns:
DecisionTree: The symmetric decision tree equivalent to the influence diagram

TODO: Update ID2DT according to way we deal with probabilities
"""
partial_order = self.calculate_partial_order(influence_diagram)
root_node = partial_order[0]
# decision_tree = DecisionTree.initialize_with_root(root_node)
decision_tree = DecisionTree(root=root_node)
# tree_stack contains views of the partial order nodes
# decision_tree contains copy of the nodes (as they appear several times)
tree_stack = [(root_node, root_node)]

while tree_stack:
element = tree_stack.pop()

if isinstance(element[0], NodeABC):
tree_stack += self._output_branches_from_node(*element)

else: # element is a branch
tail_index = partial_order.index(element[1])

if tail_index < len(partial_order) - 1:
head = partial_order[tail_index + 1].copy()
tree_stack.append((head, partial_order[tail_index + 1]))
else:
# head = UtilityNode(
# name=element[0].name, tag=element[0].name.lower()
# )
head = UtilityNode(shortname="ut", description="Utility")

element[0].head = head
decision_tree.add_arc(
element[0]
) # node is added when the branch is added

return decision_tree
137 changes: 137 additions & 0 deletions api/src/v0/services/analysis/id_to_pyagrum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""
Conversion of influence diagram to pyAgrum format.

.. seealso:
pyAgrum documentation https://pyagrum.readthedocs.io/
"""

from itertools import product

import pyAgrum as gum

from src.v0.services.classes.discrete_conditional_probability import (
DiscreteConditionalProbability,
)
from src.v0.services.classes.discrete_unconditional_probability import (
DiscreteUnconditionalProbability,
)
from src.v0.services.classes.influence_diagram import InfluenceDiagram
from src.v0.services.classes.node import (
DecisionNode,
UncertaintyNode,
UtilityNode,
)
from src.v0.services.errors import (
ArcPyAgrumFormatError,
InfluenceDiagramNotAcyclicError,
ProbabilityPyAgrumFormatError,
)


class InfluenceDiagramToPyAgrum:
def probabilities_conversion(self, probability):
if isinstance(probability, DiscreteConditionalProbability):
return self.conditional_probabilities_conversion(probability)
if isinstance(probability, DiscreteUnconditionalProbability):
return self.unconditional_probabilities_conversion(probability)

def unconditional_probabilities_conversion(self, probability):
variables = probability.variables
if len(variables) != 1:
raise ProbabilityPyAgrumFormatError(variables)
return [
(
{},
[
probability._cpt.sel(**{variables[0]: state})
for state in probability.outcomes
],
)
]

def conditional_probabilities_conversion(self, probability):
# agrum = list()
coords = probability._cpt.coords
variables = {
key: coords[key].data.tolist() for key in coords if key is not coords.dims[0]
}
agrum_dict = {k: list(range(len(v))) for k, v in variables.items()}
agrum_dict = [
dict(zip(agrum_dict.keys(), values, strict=False))
for values in product(*agrum_dict.values())
]
agrum_prob = [
probability.get_distribution(
**{key: coords[key][val] for key, val in item.items()}
).data.tolist()
for item in agrum_dict
]
agrum = list(zip(agrum_dict, agrum_prob, strict=False))
return agrum

def nodes_conversion(self, nodes, gum_id):
# create an uuid for gum as 8 bytes integer and keep relation to uuid
uuid_dot_to_gum = {}
uuid_gum_to_dot = {}
node_uuid = {}

for node in nodes:
labelized_variables = [node.shortname, node.description]
if isinstance(node, UncertaintyNode):
try:
labelized_variables.append(node.outcomes)
variable_id = gum_id.addChanceNode(
gum.LabelizedVariable(*labelized_variables)
)
except Exception as e:
raise ProbabilityPyAgrumFormatError(e)
elif isinstance(node, DecisionNode):
# This works even when alternatives are [""] or None
labelized_variables.append(node.alternatives)
variable_id = gum_id.addDecisionNode(
gum.LabelizedVariable(*labelized_variables)
)
elif isinstance(node, UtilityNode):
# Utility not yet implemented
labelized_variables.append(1)
variable_id = gum_id.addUtilityNode(
gum.LabelizedVariable(*labelized_variables)
)

uuid_dot_to_gum[node.uuid] = variable_id
uuid_gum_to_dot[variable_id] = node.uuid
node_uuid[variable_id] = node

return uuid_dot_to_gum, uuid_gum_to_dot, node_uuid

def arcs_conversion(self, arcs, gum_id, uuid_dot_to_gum):
for arc in arcs:
tail = uuid_dot_to_gum[arc.tail.uuid]
head = uuid_dot_to_gum[arc.head.uuid]
try:
gum_id.addArc(tail, head)
except Exception as e:
raise ArcPyAgrumFormatError(e)
return None

def conversion(self, influence_diagram: InfluenceDiagram):
if not influence_diagram.is_acyclic:
raise InfluenceDiagramNotAcyclicError(False)

gum_id = gum.InfluenceDiagram()
variable_id = []

uuid_dot_to_gum, uuid_gum_to_dot, node_uuid = self.nodes_conversion(
influence_diagram.nodes, gum_id
)
# Add head and tail in gum_id
self.arcs_conversion(influence_diagram.arcs, gum_id, uuid_dot_to_gum)

for variable_id in uuid_gum_to_dot:
if isinstance(node_uuid[variable_id], UncertaintyNode):
for agrum_prob in self.probabilities_conversion(
node_uuid[variable_id].probability
):
gum_id.cpt(variable_id)[agrum_prob[0]] = agrum_prob[1]

return gum_id
37 changes: 37 additions & 0 deletions api/src/v0/services/class_validations/validate_and_set_arc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Any
from uuid import UUID, uuid4

from src.v0.services.classes.node import NodeABC
from src.v0.services.errors import (
ArcLabelValidationError,
EndPointValidationError,
UUIDValidationError,
)


def label(arg: Any) -> str:
if not (isinstance(arg, str) or arg is None):
raise ArcLabelValidationError(arg)
return arg


def edge(arg: Any) -> NodeABC:
if not (isinstance(arg, NodeABC) or arg is None):
raise EndPointValidationError(arg)
return arg


def uuid(arg: Any) -> str:
if not (isinstance(arg, str | UUID) or arg is None):
raise UUIDValidationError(arg)
if arg is None:
return str(uuid4())
if isinstance(arg, UUID) and arg.version == 4:
return str(arg)
try: # if arg is str
uuid_obj = UUID(arg)
if uuid_obj.version == 4:
return arg
except Exception as e:
raise UUIDValidationError(e)
raise UUIDValidationError(f"version {uuid_obj.version}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Any

from src.v0.services.classes.arc import Arc
from src.v0.services.classes.node import (
DecisionNode,
NodeABC,
UncertaintyNode,
UtilityNode,
)
from src.v0.services.errors import (
ArcTypeValidationError,
DTNodeTypeValidationError,
IDNodeTypeValidationError,
)


def id_node(arg: Any) -> DecisionNode | UncertaintyNode | UtilityNode:
if not isinstance(arg, DecisionNode | UncertaintyNode | UtilityNode):
raise IDNodeTypeValidationError(arg)
return arg


def dt_node(arg: Any) -> DecisionNode | UncertaintyNode | UtilityNode:
if not isinstance(arg, DecisionNode | UncertaintyNode | UtilityNode):
raise DTNodeTypeValidationError(arg)
return arg


def arc_to_graph(arg: Any) -> tuple[tuple[NodeABC, NodeABC], dict]:
if not isinstance(arg, Arc):
raise ArcTypeValidationError(arg)
return (arg.tail, arg.head), {
"dtype": arg.dtype,
"label": arg.label,
"uuid": arg.uuid,
}
Loading
Loading