Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## todo
- Improve error message to include agent and module id in validation errors

## 0.8.2
- Realtime Environment always has a clock to prevent agents that define callbacks only from terminating
- Environment time in real time is now based on system time, decoupling it from the simpy process
Expand Down
61 changes: 41 additions & 20 deletions agentlib/core/agent.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""
Module containing only the Agent class.
"""

import json
import threading
from copy import deepcopy
from pathlib import Path
from typing import Union, List, Dict, TypeVar, Optional

from pathlib import Path
from pydantic import field_validator, BaseModel, FilePath, Field

import agentlib
Expand Down Expand Up @@ -36,7 +38,12 @@ class AgentConfig(BaseModel):
description="The ID of the Agent, should be unique in "
"the multi-agent-system the agent is living in.",
)
modules: List[Union[Dict, FilePath]] = None
modules: Union[List[Union[Dict, FilePath]], Dict[str, Union[Dict, FilePath]]] = (
Field(
default_factory=list,
description="A list or dictionary of modules. If a dictionary is provided, keys are treated as module_ids.",
)
)
check_alive_interval: float = Field(
title="check_alive_interval",
default=1,
Expand All @@ -51,22 +58,37 @@ class AgentConfig(BaseModel):
default=1000,
ge=-1,
description="Maximal number of waiting items in data-broker queues. "
"Set to -1 for infinity"
"Set to -1 for infinity",
)

@field_validator("modules")
@classmethod
def check_modules(cls, modules: List):
"""Validator to ensure all modules are in dict-format."""
def check_modules(cls, modules: Union[List, Dict]):
"""Validator to ensure all modules are in dict-format and include 'module_id'."""
modules_loaded = []
for module in modules:
if isinstance(module, (str, Path)):
if Path(module).exists():
with open(module, "r") as f:
module = json.load(f)
else:
module = json.loads(module)
modules_loaded.append(module)
if isinstance(modules, dict):
for module_id, module in modules.items():
if isinstance(module, (str, Path)):
Comment on lines +69 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we just support both and don't do deprication warnings? If so, you can ignore my comments below.

if Path(module).exists():
with open(module, "r") as f:
module = json.load(f)
else:
module = json.loads(module)
if isinstance(module, dict):
module = deepcopy(module)
module["module_id"] = module_id
modules_loaded.append(module)
elif isinstance(modules, list):
for module in modules:
if isinstance(module, (str, Path)):
if Path(module).exists():
with open(module, "r") as f:
module = json.load(f)
else:
module = json.loads(module)
modules_loaded.append(module)
else:
raise TypeError("modules must be a list or a dict")
return modules_loaded


Expand Down Expand Up @@ -94,21 +116,18 @@ def __init__(self, *, config, env: Environment):
)
if env.config.rt:
self._data_broker = RTDataBroker(
env=env, logger=data_broker_logger,
max_queue_size=config.max_queue_size
env=env, logger=data_broker_logger, max_queue_size=config.max_queue_size
)
self.register_thread(thread=self._data_broker.thread)
else:
self._data_broker = LocalDataBroker(
env=env, logger=data_broker_logger,
max_queue_size=config.max_queue_size
env=env, logger=data_broker_logger, max_queue_size=config.max_queue_size
)
# Update modules
self.config = config
# Setup logger
self.logger = agentlib_logging.create_logger(env=self.env, name=self.id)


# Register the thread monitoring if configured
if env.config.rt:
self.env.process(self._monitor_threads())
Expand Down Expand Up @@ -231,8 +250,10 @@ def _monitor_threads(self):
while True:
for name, thread in self._threads.items():
if not thread.is_alive():
msg = (f"The thread {name} is not alive anymore. Exiting agent. "
f"Check errors above for possible reasons")
msg = (
f"The thread {name} is not alive anymore. Exiting agent. "
f"Check errors above for possible reasons"
)
self.logger.critical(msg)
self.is_alive = False
raise RuntimeError(msg)
Expand Down
25 changes: 18 additions & 7 deletions agentlib/core/data_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def _execute_callbacks(self):
logger=self.logger,
queue_name="Callback-Distribution",
queue_object=self._variable_queue,
max_queue_size=self._max_queue_size
max_queue_size=self._max_queue_size,
)
_map_tuple = (variable.alias, variable.source)
# First the unmapped cbs
Expand Down Expand Up @@ -343,7 +343,9 @@ class LocalDataBroker(DataBroker):
"""Local variation of the DataBroker written for fast-as-possible
simulation within a single non-realtime Environment."""

def __init__(self, env: Environment, logger: CustomLogger, max_queue_size: int = 1000):
def __init__(
self, env: Environment, logger: CustomLogger, max_queue_size: int = 1000
):
"""
Initialize env
"""
Expand Down Expand Up @@ -380,7 +382,9 @@ def _run_callbacks(self, callbacks: List[BrokerCallback], variable: AgentVariabl
class RTDataBroker(DataBroker):
"""DataBroker written for Realtime operation regardless of Environment."""

def __init__(self, env: Environment, logger: CustomLogger, max_queue_size: int = 1000):
def __init__(
self, env: Environment, logger: CustomLogger, max_queue_size: int = 1000
):
"""
Initialize env.
Adds the function to start callback execution to the environment as a process.
Expand Down Expand Up @@ -412,7 +416,9 @@ def _callback_thread(self):
while True:
if not self._stop_queue.empty():
err, module_id = self._stop_queue.get()
raise RuntimeError(f"A callback failed in the module {module_id}.") from err
raise RuntimeError(
f"A callback failed in the module {module_id}."
) from err
self._execute_callbacks()

def register_callback(
Expand Down Expand Up @@ -464,11 +470,16 @@ def _run_callbacks(self, callbacks: List[BrokerCallback], variable: AgentVariabl
logger=self.logger,
queue_name=cb.module_id,
queue_object=self._module_queues[cb.module_id],
max_queue_size=self._max_queue_size
max_queue_size=self._max_queue_size,
)


def log_queue_status(logger: logging.Logger, queue_object: queue.Queue, max_queue_size: int, queue_name: str):
def log_queue_status(
logger: logging.Logger,
queue_object: queue.Queue,
max_queue_size: int,
queue_name: str,
):
"""
Log the current load of the given queue in percent.

Expand All @@ -492,5 +503,5 @@ def log_queue_status(logger: logging.Logger, queue_object: queue.Queue, max_queu
"Queue '%s' fullness is %s percent (%s items).",
queue_name,
percent_full,
number_of_items
number_of_items,
)
1 change: 1 addition & 0 deletions agentlib/core/datamodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
The datamodels module contains all classes
defining basic models to handle data.
"""

import abc
import functools
import json
Expand Down
4 changes: 2 additions & 2 deletions agentlib/core/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __new__(


def make_env_config(
config: Union[dict, EnvironmentConfig, str, None]
config: Union[dict, EnvironmentConfig, str, None],
) -> EnvironmentConfig:
"""Creates the environment config from different sources."""
if config is None:
Expand Down Expand Up @@ -166,7 +166,7 @@ def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
@property
def time(self) -> float:
"""Get the current system time as unix timestamp, with the enivronement
offset. """
offset."""
return time.time() + self.config.offset

def pretty_time(self) -> str:
Expand Down
8 changes: 4 additions & 4 deletions agentlib/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ class OptionalDependencyError(Exception):
"""

def __init__(
self,
used_object: str,
dependency_install: str,
dependency_name: str = None,
self,
used_object: str,
dependency_install: str,
dependency_name: str = None,
):
message = (
f"{used_object} is an optional dependency which you did not "
Expand Down
1 change: 1 addition & 0 deletions agentlib/core/model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""This module contains just the basic Model."""

import abc
import os
import json
Expand Down
19 changes: 13 additions & 6 deletions agentlib/core/module.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""This module contains the base AgentModule."""

from __future__ import annotations

import abc
import json
import logging
Expand All @@ -18,19 +20,19 @@

import pydantic
from pydantic import field_validator, ConfigDict, BaseModel, Field, PrivateAttr
from pydantic_core import core_schema
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import core_schema

from agentlib.core.environment import CustomSimpyEnvironment
from agentlib.core.errors import ConfigurationError
import agentlib.core.logging_ as agentlib_logging
from agentlib.core import datamodels
from agentlib.core.datamodels import (
AgentVariable,
Source,
AgentVariables,
AttrsToPydanticAdaptor,
)
from agentlib.core import datamodels
import agentlib.core.logging_ as agentlib_logging
from agentlib.core.environment import CustomSimpyEnvironment
from agentlib.core.errors import ConfigurationError
from agentlib.utils.fuzzy_matching import fuzzy_match, RAPIDFUZZ_IS_INSTALLED
from agentlib.utils.validators import (
include_defaults_in_root,
Expand Down Expand Up @@ -290,7 +292,12 @@ def __init__(self, _agent_id, *args, **kwargs):
super().__init__(*args, **kwargs)
except pydantic.ValidationError as e:
better_error = self._improve_extra_field_error_messages(e)
raise better_error
module_id = _user_config.get("module_id")
module_id_text = f" / module '{module_id}" if module_id is not None else ""
raise ConfigurationError(
f"Error in Configuration of agent '{_agent_id}{module_id_text}': \n {better_error}"
)

# Enable mutation
self.model_config["frozen"] = False
self._variables = self.__class__.merge_variables(
Expand Down
1 change: 1 addition & 0 deletions agentlib/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Package with available models for the agentlib"""

from agentlib.utils import plugin_import
from agentlib.utils.fuzzy_matching import fuzzy_match
from agentlib.utils.plugin_import import ModuleImport
Expand Down
41 changes: 24 additions & 17 deletions agentlib/models/fmu_model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""This module contains the FMUModel class."""

import queue
import shutil
import os
Expand Down Expand Up @@ -234,23 +235,29 @@ def __init_fmu(self) -> fmpy.fmi2.FMU2Slave:
dict(
name=_model_var.name,
type=_model_var.type,
value=self._converter(_model_var.type, _model_var.start)
if (
_model_var.causality
in [
Causality.parameter,
Causality.calculatedParameter,
Causality.input,
]
and _model_var.start is not None
)
else None,
unit=_model_var.unit
if _model_var.unit is not None
else attrs.fields(ModelVariable).unit.default,
description=_model_var.description
if _model_var.description is not None
else attrs.fields(ModelVariable).description.default,
value=(
self._converter(_model_var.type, _model_var.start)
if (
_model_var.causality
in [
Causality.parameter,
Causality.calculatedParameter,
Causality.input,
]
and _model_var.start is not None
)
else None
),
unit=(
_model_var.unit
if _model_var.unit is not None
else attrs.fields(ModelVariable).unit.default
),
description=(
_model_var.description
if _model_var.description is not None
else attrs.fields(ModelVariable).description.default
),
causality=_model_var.causality,
variability=_model_var.variability,
)
Expand Down
1 change: 1 addition & 0 deletions agentlib/models/scipy_model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""This module contains the ScipyStateSpaceModel class."""

import logging
from typing import Union
from pydantic import ValidationError, model_validator
Expand Down
1 change: 1 addition & 0 deletions agentlib/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Use the helper functions get_module_type
to load module classes from this package.
"""

from typing import Union, Dict, Iterable, List

from agentlib.utils.fuzzy_matching import fuzzy_match
Expand Down
1 change: 1 addition & 0 deletions agentlib/modules/communicator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Package contains all modules to communicate messages with
"""

from agentlib.utils.plugin_import import ModuleImport


Expand Down
Loading