Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 88 additions & 8 deletions packages/valory/connections/abci/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,79 @@ async def send(self, envelope: Envelope) -> None:
writer.write(data)


class MockServerChannel: # pylint: disable=too-many-instance-attributes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add the HTTP part

"""Mock server channel to handle local round-trips, mocking Tendermint behaviour."""

def __init__(
self,
target_skill_id: PublicId = PUBLIC_ID,
address: str = "local",
port: int = -1,
logger: Optional[Logger] = None,
):
"""
Initialize the mock server.

It's effectively a queue wrapped in the same interface as the other servers.

:param target_skill_id: the public id of the target skill.
:param address: the listen address.
:param port: the port to listen from.
:param logger: the logger.
"""
self.target_skill_id = target_skill_id
self.address = address
self.port = port
self.logger = logger or logging.getLogger()

# channel state
self._dialogues = AbciDialogues(connection_id=PUBLIC_ID)
self._is_stopped: bool = True
self.queue: Optional[asyncio.Queue] = None

@property
def is_stopped(self) -> bool:
"""Check that the channel is stopped."""
return self._is_stopped

async def connect(self, loop: AbstractEventLoop) -> None:
"""
Connect.

Upon TCP Channel connection, start the TCP Server asynchronously.

:param loop: asyncio event loop
"""
if not self._is_stopped: # pragma: nocover
return
self._is_stopped = False
self.queue = asyncio.Queue()

async def disconnect(self) -> None:
"""Disconnect the channel"""
if self.is_stopped: # pragma: nocover
return
self._is_stopped = True
self.queue = None

async def get_message(self) -> Envelope:
"""Get a message from the queue."""
# TOCHECK might have to swap to/from?
# TOCHECK do we need to update some dialogue?
Comment on lines +1095 to +1096
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole "tendermint mocking" is also missing here

return await cast(asyncio.Queue, self.queue).get()

async def send(self, envelope: Envelope) -> None:
"""Send a message."""
self.logger = cast(Logger, self.logger)
message = cast(AbciMessage, envelope.message)
dialogue = self._dialogues.update(message)
if dialogue is None: # pragma: nocover
self.logger.warning(f"Could not create dialogue for message={message}")
return
self.logger.debug(f"Adding dialogue to queue: {dialogue.incomplete_dialogue_label}")
await cast(asyncio.Queue, self.queue).put(envelope)


class StoppableThread(
Thread,
): # pragma: no cover (covered via deployments/Dockerfiles/tendermint/tendermint.py)
Expand Down Expand Up @@ -1301,7 +1374,7 @@ class ABCIServerConnection(Connection): # pylint: disable=too-many-instance-att
connection_id = PUBLIC_ID
params: Optional[TendermintParams] = None
node: Optional[TendermintNode] = None
channel: Optional[Union[TcpServerChannel, GrpcServerChannel]] = None
channel: Optional[Union[TcpServerChannel, GrpcServerChannel, MockServerChannel]] = None

def __init__(self, **kwargs: Any) -> None:
"""
Expand All @@ -1314,20 +1387,24 @@ def __init__(self, **kwargs: Any) -> None:
self._process_connection_params()
self._process_tendermint_params()

if self.use_grpc:
if self.use_grpc and not self.use_mock:
self.channel = GrpcServerChannel(
self.target_skill_id,
address=self.host,
port=self.port,
logger=self.logger,
)
else:
elif not self.use_grpc and not self.use_mock:
self.channel = TcpServerChannel(
self.target_skill_id,
address=self.host,
port=self.port,
logger=self.logger,
)
else:
self.channel = MockServerChannel(
logger=self.logger,
)

def _process_connection_params(self) -> None:
"""
Expand Down Expand Up @@ -1366,6 +1443,9 @@ def _process_tendermint_params(self) -> None:
self.use_tendermint = cast(
bool, self.configuration.config.get("use_tendermint")
)
self.use_mock = cast(
bool, self.configuration.config.get("use_mock")
)
self.use_grpc = cast(bool, self.configuration.config.get("use_grpc", False))

if not self.use_tendermint:
Expand Down Expand Up @@ -1399,7 +1479,7 @@ def _ensure_connected(self) -> None:
"""Ensure that the connection and the channel are ready."""
super()._ensure_connected()

self.channel = cast(Union[TcpServerChannel, GrpcServerChannel], self.channel)
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel, MockServerChannel], self.channel)
if self.channel.is_stopped:
raise ConnectionError("The channel is stopped.")

Expand All @@ -1413,7 +1493,7 @@ async def connect(self) -> None:
return

self.state = ConnectionStates.connecting
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel], self.channel)
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel, MockServerChannel], self.channel)
if self.use_tendermint:
self.node = cast(TendermintNode, self.node)
self.node.init()
Expand All @@ -1435,7 +1515,7 @@ async def disconnect(self) -> None:
return

self.state = ConnectionStates.disconnecting
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel], self.channel)
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel, MockServerChannel], self.channel)
await self.channel.disconnect()
if self.use_tendermint:
self.node = cast(TendermintNode, self.node)
Expand All @@ -1449,7 +1529,7 @@ async def send(self, envelope: Envelope) -> None:
:param envelope: the envelope to send.
"""
self._ensure_connected()
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel], self.channel)
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel, MockServerChannel], self.channel)
await self.channel.send(envelope)

async def receive(self, *args: Any, **kwargs: Any) -> Optional[Envelope]:
Expand All @@ -1461,7 +1541,7 @@ async def receive(self, *args: Any, **kwargs: Any) -> Optional[Envelope]:
:return: the envelope received, if present. # noqa: DAR202
"""
self._ensure_connected()
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel], self.channel)
self.channel = cast(Union[TcpServerChannel, GrpcServerChannel, MockServerChannel], self.channel)
try:
message = await self.channel.get_message()
return message
Expand Down
1 change: 1 addition & 0 deletions packages/valory/connections/abci/connection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ config:
home: null
consensus_create_empty_blocks: true
use_grpc: false
use_mock: false
use_tendermint: true
excluded_protocols: []
restricted_to_protocols: []
Expand Down