diff --git a/acapy_agent/anoncreds/constants.py b/acapy_agent/anoncreds/constants.py new file mode 100644 index 0000000000..dbc0a89181 --- /dev/null +++ b/acapy_agent/anoncreds/constants.py @@ -0,0 +1,21 @@ +"""Constants for AnonCreds.""" + +DEFAULT_CRED_DEF_TAG = "default" +DEFAULT_MAX_CRED_NUM = 1000 +DEFAULT_SIGNATURE_TYPE = "CL" + +CATEGORY_SCHEMA = "schema" + +CATEGORY_CRED_DEF = "credential_def" +CATEGORY_CRED_DEF_PRIVATE = "credential_def_private" +CATEGORY_CRED_DEF_KEY_PROOF = "credential_def_key_proof" + +CATEGORY_REV_LIST = "revocation_list" +CATEGORY_REV_REG_DEF = "revocation_reg_def" +CATEGORY_REV_REG_DEF_PRIVATE = "revocation_reg_def_private" + +STATE_FINISHED = "finished" +STATE_REVOCATION_PENDING = "pending" +STATE_REVOCATION_POSTED = "posted" + +REV_REG_DEF_STATE_ACTIVE = "active" diff --git a/acapy_agent/anoncreds/default/legacy_indy/registry.py b/acapy_agent/anoncreds/default/legacy_indy/registry.py index e66569eea7..4fd2810dab 100644 --- a/acapy_agent/anoncreds/default/legacy_indy/registry.py +++ b/acapy_agent/anoncreds/default/legacy_indy/registry.py @@ -53,6 +53,11 @@ BaseAnonCredsRegistrar, BaseAnonCredsResolver, ) +from ...constants import ( + CATEGORY_REV_LIST, + CATEGORY_REV_REG_DEF, + CATEGORY_REV_REG_DEF_PRIVATE, +) from ...events import RevListFinishedEvent from ...issuer import CATEGORY_CRED_DEF, AnonCredsIssuer, AnonCredsIssuerError from ...models.credential_definition import ( @@ -76,11 +81,6 @@ ) from ...models.schema import AnonCredsSchema, GetSchemaResult, SchemaResult, SchemaState from ...models.schema_info import AnonCredsSchemaInfo -from ...revocation import ( - CATEGORY_REV_LIST, - CATEGORY_REV_REG_DEF, - CATEGORY_REV_REG_DEF_PRIVATE, -) from .recover import generate_ledger_rrrecovery_txn LOGGER = logging.getLogger(__name__) diff --git a/acapy_agent/anoncreds/issuer.py b/acapy_agent/anoncreds/issuer.py index c1fbc72808..78f1e1a16c 100644 --- a/acapy_agent/anoncreds/issuer.py +++ b/acapy_agent/anoncreds/issuer.py @@ -22,6 +22,16 @@ from ..database_manager.db_errors import DBError from ..protocols.endorse_transaction.v1_0.util import is_author_role from .base import AnonCredsSchemaAlreadyExists, BaseAnonCredsError +from .constants import ( + CATEGORY_CRED_DEF, + CATEGORY_CRED_DEF_KEY_PROOF, + CATEGORY_CRED_DEF_PRIVATE, + CATEGORY_SCHEMA, + DEFAULT_CRED_DEF_TAG, + DEFAULT_MAX_CRED_NUM, + DEFAULT_SIGNATURE_TYPE, + STATE_FINISHED, +) from .error_messages import ANONCREDS_PROFILE_REQUIRED_MSG from .events import CredDefFinishedEvent from .models.credential_definition import CredDef, CredDefResult @@ -30,15 +40,6 @@ LOGGER = logging.getLogger(__name__) -DEFAULT_CRED_DEF_TAG = "default" -DEFAULT_SIGNATURE_TYPE = "CL" -DEFAULT_MAX_CRED_NUM = 1000 -CATEGORY_SCHEMA = "schema" -CATEGORY_CRED_DEF = "credential_def" -CATEGORY_CRED_DEF_PRIVATE = "credential_def_private" -CATEGORY_CRED_DEF_KEY_PROOF = "credential_def_key_proof" -STATE_FINISHED = "finished" - EVENT_PREFIX = "acapy::anoncreds::" EVENT_SCHEMA = EVENT_PREFIX + CATEGORY_SCHEMA EVENT_CRED_DEF = EVENT_PREFIX + CATEGORY_CRED_DEF @@ -406,6 +407,7 @@ async def store_credential_definition( ) -> None: """Store the cred def and it's components in the wallet.""" options = options or {} + identifier = ( cred_def_result.job_id or cred_def_result.credential_definition_state.credential_definition_id @@ -444,6 +446,7 @@ async def store_credential_definition( CATEGORY_CRED_DEF_KEY_PROOF, identifier, key_proof.to_json_buffer() ) await txn.commit() + if cred_def_result.credential_definition_state.state == STATE_FINISHED: await self.notify( CredDefFinishedEvent.with_payload( @@ -463,6 +466,8 @@ async def finish_cred_def( self, job_id: str, cred_def_id: str, options: Optional[dict] = None ) -> None: """Finish a cred def.""" + options = options or {} + async with self.profile.transaction() as txn: entry = await self._finish_registration( txn, CATEGORY_CRED_DEF, job_id, cred_def_id diff --git a/acapy_agent/anoncreds/revocation/__init__.py b/acapy_agent/anoncreds/revocation/__init__.py index 4e23c7abc0..65b3ab7043 100644 --- a/acapy_agent/anoncreds/revocation/__init__.py +++ b/acapy_agent/anoncreds/revocation/__init__.py @@ -7,9 +7,6 @@ from .manager import RevocationManager, RevocationManagerError from .recover import RevocRecoveryException, fetch_txns, generate_ledger_rrrecovery_txn from .revocation import ( - CATEGORY_REV_LIST, - CATEGORY_REV_REG_DEF, - CATEGORY_REV_REG_DEF_PRIVATE, AnonCredsRevocation, AnonCredsRevocationError, AnonCredsRevocationRegistryFullError, @@ -17,9 +14,6 @@ from .revocation_setup import DefaultRevocationSetup __all__ = [ - "CATEGORY_REV_LIST", - "CATEGORY_REV_REG_DEF", - "CATEGORY_REV_REG_DEF_PRIVATE", "AnonCredsRevocation", "AnonCredsRevocationError", "AnonCredsRevocationRegistryFullError", diff --git a/acapy_agent/anoncreds/revocation/revocation.py b/acapy_agent/anoncreds/revocation/revocation.py index b84fb221d2..747c980e20 100644 --- a/acapy_agent/anoncreds/revocation/revocation.py +++ b/acapy_agent/anoncreds/revocation/revocation.py @@ -32,14 +32,17 @@ from ...database_manager.db_errors import DBError from ...kanon.profile_anon_kanon import KanonAnonCredsProfileSession # type: ignore from ...tails.anoncreds_tails_server import AnonCredsTailsServer -from ..error_messages import ANONCREDS_PROFILE_REQUIRED_MSG -from ..events import RevListFinishedEvent, RevRegDefFinishedEvent -from ..issuer import ( +from ..constants import ( CATEGORY_CRED_DEF, CATEGORY_CRED_DEF_PRIVATE, + CATEGORY_REV_LIST, + CATEGORY_REV_REG_DEF, + CATEGORY_REV_REG_DEF_PRIVATE, STATE_FINISHED, - AnonCredsIssuer, ) +from ..error_messages import ANONCREDS_PROFILE_REQUIRED_MSG +from ..events import RevListFinishedEvent, RevRegDefFinishedEvent +from ..issuer import AnonCredsIssuer from ..models.credential_definition import CredDef from ..models.revocation import ( RevList, @@ -54,12 +57,9 @@ LOGGER = logging.getLogger(__name__) -CATEGORY_REV_LIST = "revocation_list" -CATEGORY_REV_REG_DEF = "revocation_reg_def" -CATEGORY_REV_REG_DEF_PRIVATE = "revocation_reg_def_private" -STATE_REVOCATION_POSTED = "posted" -STATE_REVOCATION_PENDING = "pending" -REV_REG_DEF_STATE_ACTIVE = "active" +REVOCATION_REGISTRY_CREATION_TIMEOUT = float( + os.getenv("REVOCATION_REGISTRY_CREATION_TIMEOUT", "60.0") +) class AnonCredsRevocationError(BaseError): @@ -389,6 +389,73 @@ async def set_active_registry(self, rev_reg_def_id: str) -> None: ) await txn.commit() + async def wait_for_active_revocation_registry(self, cred_def_id: str) -> None: + """Wait for revocation registry setup to complete. + + Polls for the creation of revocation registry definitions until we have + the 1 active registry or timeout occurs. + + Args: + cred_def_id: The credential definition ID + + Raises: + TimeoutError: If timeout occurs before completion + + """ + LOGGER.debug( + "Waiting for revocation setup completion for cred_def_id: %s", cred_def_id + ) + + expected_count = 1 # Active registry + poll_interval = 0.5 # Poll every 500ms + max_iterations = int(REVOCATION_REGISTRY_CREATION_TIMEOUT / poll_interval) + registries = [] + + for _iteration in range(max_iterations): + try: + # Check for finished revocation registry definitions + async with self.profile.session() as session: + registries = await session.handle.fetch_all( + CATEGORY_REV_REG_DEF, + {"cred_def_id": cred_def_id, "active": "true"}, + ) + + current_count = len(registries) + LOGGER.debug( + "Revocation setup progress for %s: %d/%d registries active", + cred_def_id, + current_count, + expected_count, + ) + + if current_count >= expected_count: + LOGGER.info( + "Revocation setup completed for cred_def_id: %s " + "(%d registries active)", + cred_def_id, + current_count, + ) + return + + except Exception as e: + LOGGER.warning( + "Error checking revocation setup progress for %s: %s", cred_def_id, e + ) + # Continue polling despite errors - they might be transient + + await asyncio.sleep(poll_interval) # Wait before next poll + + # Timeout occurred + current_count = len(registries) + + raise TimeoutError( + "Timeout waiting for revocation setup completion for credential definition " + f"{cred_def_id}. Expected {expected_count} revocation registries, but " + f"{current_count} were active within {REVOCATION_REGISTRY_CREATION_TIMEOUT} " + "seconds. Note: Revocation registry creation may still be in progress in the " + "background. You can check status using the revocation registry endpoints." + ) + async def create_and_register_revocation_list( self, rev_reg_def_id: str, options: Optional[dict] = None ) -> RevListResult: @@ -1151,7 +1218,14 @@ async def _create_credential_helper( rev_reg_def_result = None if revocable: - rev_reg_def_result = await self.get_or_create_active_registry(cred_def_id) + try: + rev_reg_def_result = await self.get_or_create_active_registry( + cred_def_id + ) + except AnonCredsRevocationError: + # No active registry, try again + continue + if ( rev_reg_def_result.revocation_registry_definition_state.state != STATE_FINISHED diff --git a/acapy_agent/anoncreds/revocation/revocation_setup.py b/acapy_agent/anoncreds/revocation/revocation_setup.py index 60d74efcf9..ceb91393cf 100644 --- a/acapy_agent/anoncreds/revocation/revocation_setup.py +++ b/acapy_agent/anoncreds/revocation/revocation_setup.py @@ -79,6 +79,10 @@ async def on_cred_def(self, profile: Profile, event: CredDefFinishedEvent) -> No options=payload.options, ) + if event.payload.options.get("wait_for_revocation_setup"): + # Wait for registry activation, if configured to do so + await revoc.wait_for_active_revocation_registry(payload.cred_def_id) + async def on_rev_reg_def( self, profile: Profile, event: RevRegDefFinishedEvent ) -> None: diff --git a/acapy_agent/anoncreds/revocation/tests/test_wait_for_revocation_setup.py b/acapy_agent/anoncreds/revocation/tests/test_wait_for_revocation_setup.py new file mode 100644 index 0000000000..ecaced5899 --- /dev/null +++ b/acapy_agent/anoncreds/revocation/tests/test_wait_for_revocation_setup.py @@ -0,0 +1,437 @@ +"""Tests for credential definition creation with wait_for_revocation_setup options.""" + +import json +from unittest import IsolatedAsyncioTestCase + +import pytest + +from ....tests import mock +from ....utils.testing import create_test_profile +from ...issuer import AnonCredsIssuer +from ...models.credential_definition import CredDef +from ..revocation import AnonCredsRevocation +from ..revocation_setup import DefaultRevocationSetup + + +@pytest.mark.anoncreds +class TestAnonCredsIssuerWaitForRevocation(IsolatedAsyncioTestCase): + """Tests for wait_for_revocation_setup functionality.""" + + async def asyncSetUp(self) -> None: + """Set up test environment.""" + self.profile = await create_test_profile( + settings={"wallet.type": "askar-anoncreds"}, + ) + self.issuer = AnonCredsIssuer(self.profile) + + @mock.patch.object(AnonCredsIssuer, "notify") + async def test_finish_cred_def_passes_options(self, mock_notify): + """Test finish_cred_def method passes options correctly to the event.""" + # Mock transaction and entry data + mock_entry = mock.MagicMock() + mock_entry.value = json.dumps( + {"issuer_id": "issuer-id", "schema_id": "schema-id"} + ) + mock_entry.tags = {"support_revocation": "True", "max_cred_num": "1000"} + + self.profile.transaction = mock.Mock( + return_value=mock.MagicMock( + commit=mock.CoroutineMock(), + ) + ) + + with mock.patch.object( + self.issuer, "_finish_registration", return_value=mock_entry + ): + with mock.patch.object(CredDef, "from_json") as mock_from_json: + mock_cred_def = mock.MagicMock() + mock_cred_def.schema_id = "schema-id" + mock_cred_def.issuer_id = "issuer-id" + mock_from_json.return_value = mock_cred_def + + await self.issuer.finish_cred_def( + job_id="job-id", + cred_def_id="cred-def-id", + options={"wait_for_revocation_setup": True}, + ) + + # Should notify with correct parameters including options + mock_notify.assert_called_once() + call_args = mock_notify.call_args[0][0] # Get the event passed to notify + assert call_args.payload.cred_def_id == "cred-def-id" + assert call_args.payload.support_revocation is True + assert call_args.payload.options["wait_for_revocation_setup"] is True + + async def test_event_handler_respects_wait_option(self): + """Test that the event handler respects the wait_for_revocation_setup option. + + This is a basic integration test to verify the event handler behavior. + More comprehensive tests should be added to the revocation setup module. + """ + # Create event handler + setup_manager = DefaultRevocationSetup() + + # Create mock event with wait_for_revocation_setup=False + mock_payload = mock.MagicMock() + mock_payload.support_revocation = True + mock_payload.cred_def_id = "test-cred-def-id" + mock_payload.issuer_id = "test-issuer-id" + mock_payload.max_cred_num = 1000 + mock_payload.options = {"wait_for_revocation_setup": False} + + event = mock.MagicMock() + event.payload = mock_payload + + # Mock the AnonCredsRevocation class + with mock.patch( + "acapy_agent.anoncreds.revocation.revocation_setup.AnonCredsRevocation" + ) as mock_revocation_class: + mock_revocation = mock_revocation_class.return_value + mock_revocation.create_and_register_revocation_registry_definition = ( + mock.CoroutineMock() + ) + mock_revocation.wait_for_active_revocation_registry = mock.CoroutineMock() + + # Call the event handler + await setup_manager.on_cred_def(self.profile, event) + + # Should create registries but not wait + assert ( + mock_revocation.create_and_register_revocation_registry_definition.call_count + == 2 + ) + mock_revocation.wait_for_active_revocation_registry.assert_not_called() + + async def test_event_handler_waits_when_configured(self): + """Test that the event handler waits when wait_for_revocation_setup=True.""" + # Create event handler + setup_manager = DefaultRevocationSetup() + + # Create mock event with wait_for_revocation_setup=True + mock_payload = mock.MagicMock() + mock_payload.support_revocation = True + mock_payload.cred_def_id = "test-cred-def-id" + mock_payload.issuer_id = "test-issuer-id" + mock_payload.max_cred_num = 1000 + mock_payload.options = {"wait_for_revocation_setup": True} + + event = mock.MagicMock() + event.payload = mock_payload + + # Mock the AnonCredsRevocation class + with mock.patch( + "acapy_agent.anoncreds.revocation.revocation_setup.AnonCredsRevocation" + ) as mock_revocation_class: + mock_revocation = mock_revocation_class.return_value + mock_revocation.create_and_register_revocation_registry_definition = ( + mock.CoroutineMock() + ) + mock_revocation.wait_for_active_revocation_registry = mock.CoroutineMock() + + # Call the event handler + await setup_manager.on_cred_def(self.profile, event) + + # Should create registries AND wait + assert ( + mock_revocation.create_and_register_revocation_registry_definition.call_count + == 2 + ) + mock_revocation.wait_for_active_revocation_registry.assert_called_once_with( + "test-cred-def-id" + ) + + +class TestAnonCredsRevocationWaitMethod(IsolatedAsyncioTestCase): + """Test AnonCredsRevocation.wait_for_active_revocation_registry method.""" + + async def asyncSetUp(self): + """Set up test environment.""" + self.profile = await create_test_profile( + settings={"wallet.type": "askar-anoncreds"} + ) + self.revocation = AnonCredsRevocation(self.profile) + self.cred_def_id = "test-cred-def-id" + + async def test_immediate_success_registry_already_active(self): + """Test immediate success when registry is already active.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None # Make sleep instant + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + mock_session.handle.fetch_all = mock.CoroutineMock( + return_value=[{"id": "reg1"}] # 1 active registry + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + # Should complete immediately without timeout + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Should only query once + mock_session.handle.fetch_all.assert_called_once_with( + "revocation_reg_def", + {"cred_def_id": self.cred_def_id, "active": "true"}, + ) + # Should not need to sleep + mock_sleep.assert_not_called() + + async def test_success_after_polling(self): + """Test successful completion after some polling iterations.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None # Make sleep instant + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + # First 2 calls return empty, 3rd call returns 1 active registry + mock_session.handle.fetch_all = mock.CoroutineMock( + side_effect=[[], [], [{"id": "reg1"}]] + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Should have queried 3 times + assert mock_session.handle.fetch_all.call_count == 3 + # Should have slept twice (after 1st and 2nd empty results) + assert mock_sleep.call_count == 2 + mock_sleep.assert_called_with(0.5) + + async def test_timeout_no_active_registries(self): + """Test timeout when no registries become active.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None # Make sleep instant + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + mock_session.handle.fetch_all = mock.CoroutineMock( + return_value=[] # No active registries + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + # Set a very short timeout for testing + with mock.patch( + "acapy_agent.anoncreds.revocation.revocation.REVOCATION_REGISTRY_CREATION_TIMEOUT", + 1.0, + ): + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + with self.assertRaises(TimeoutError) as exc_context: + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Check error message content + error_message = str(exc_context.exception) + assert ( + "Timeout waiting for revocation setup completion" in error_message + ) + assert self.cred_def_id in error_message + assert "Expected 1 revocation registries" in error_message + assert "still be in progress in the background" in error_message + + # Should have polled multiple times (1.0s timeout / 0.5s interval = 2 iterations) + assert mock_session.handle.fetch_all.call_count == 2 + + async def test_polling_with_transient_errors_then_success(self): + """Test that polling continues despite transient database errors.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None # Make sleep instant + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + # Simulate: error, error, success + mock_session.handle.fetch_all = mock.CoroutineMock( + side_effect=[ + Exception("Database connection error"), + Exception("Temporary network issue"), + [{"id": "reg1"}], # Success on 3rd attempt + ] + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Should have retried despite errors + assert mock_session.handle.fetch_all.call_count == 3 + # Should have slept after each error + assert mock_sleep.call_count == 2 + + async def test_multiple_active_registries(self): + """Test success when multiple registries are active (more than expected).""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None # Make sleep instant + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + mock_session.handle.fetch_all = mock.CoroutineMock( + return_value=[ + {"id": "reg1"}, + {"id": "reg2"}, + {"id": "reg3"}, + ] # 3 active registries + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Should complete immediately since we have >= 1 + mock_session.handle.fetch_all.assert_called_once() + mock_sleep.assert_not_called() + + async def test_custom_timeout_value(self): + """Test behavior with custom timeout configuration.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None # Make sleep instant + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + mock_session.handle.fetch_all = mock.CoroutineMock( + return_value=[] # No active registries + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + # Set a custom timeout + custom_timeout = 5.0 + with mock.patch( + "acapy_agent.anoncreds.revocation.revocation.REVOCATION_REGISTRY_CREATION_TIMEOUT", + custom_timeout, + ): + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + with self.assertRaises(TimeoutError): + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Should have polled based on custom timeout (5.0s / 0.5s = 10 iterations) + expected_iterations = int(custom_timeout / 0.5) + assert mock_session.handle.fetch_all.call_count == expected_iterations + + async def test_logging_behavior(self): + """Test that appropriate log messages are generated.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + # First call empty, second call has 1 registry + mock_session.handle.fetch_all = mock.CoroutineMock( + side_effect=[[], [{"id": "reg1"}]] + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + with mock.patch( + "acapy_agent.anoncreds.revocation.revocation.LOGGER" + ) as mock_logger: + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Should log debug message at start + mock_logger.debug.assert_any_call( + "Waiting for revocation setup completion for cred_def_id: %s", + self.cred_def_id, + ) + + # Should log progress updates + mock_logger.debug.assert_any_call( + "Revocation setup progress for %s: %d/%d registries active", + self.cred_def_id, + 0, # First iteration + 1, # Expected count + ) + + # Should log completion + mock_logger.info.assert_called_once_with( + "Revocation setup completed for cred_def_id: %s " + "(%d registries active)", + self.cred_def_id, + 1, + ) + + async def test_session_context_manager_usage(self): + """Test that database session context manager is properly used.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None + + # Mock the session and database query + mock_session_context = mock.MagicMock() + mock_session = mock.MagicMock() + mock_session.handle.fetch_all = mock.CoroutineMock( + return_value=[{"id": "reg1"}] # Success immediately + ) + mock_session_context.__aenter__ = mock.CoroutineMock( + return_value=mock_session + ) + mock_session_context.__aexit__ = mock.CoroutineMock(return_value=None) + + with mock.patch.object( + self.profile, "session", return_value=mock_session_context + ): + await self.revocation.wait_for_active_revocation_registry( + self.cred_def_id + ) + + # Should have used the session context manager + self.profile.session.assert_called_once() + mock_session_context.__aenter__.assert_called_once() + mock_session_context.__aexit__.assert_called_once() + + # Query should have been called with correct parameters + mock_session.handle.fetch_all.assert_called_once_with( + "revocation_reg_def", + {"cred_def_id": self.cred_def_id, "active": "true"}, + ) diff --git a/acapy_agent/anoncreds/routes/cred_defs/models.py b/acapy_agent/anoncreds/routes/cred_defs/models.py index 1dcb36e507..ab02354f80 100644 --- a/acapy_agent/anoncreds/routes/cred_defs/models.py +++ b/acapy_agent/anoncreds/routes/cred_defs/models.py @@ -74,6 +74,13 @@ class CredDefPostRequestSchema(OpenAPISchema): credential_definition = fields.Nested(InnerCredDefSchema()) options = fields.Nested(CredDefPostOptionsSchema()) + wait_for_revocation_setup = fields.Boolean( + required=False, + load_default=True, + metadata={ + "description": "Wait for revocation registry setup to complete before returning" # noqa: E501 + }, + ) class CredDefsQueryStringSchema(SchemaQueryFieldsMixin): diff --git a/acapy_agent/anoncreds/routes/cred_defs/routes.py b/acapy_agent/anoncreds/routes/cred_defs/routes.py index d654d663a4..789732663f 100644 --- a/acapy_agent/anoncreds/routes/cred_defs/routes.py +++ b/acapy_agent/anoncreds/routes/cred_defs/routes.py @@ -11,6 +11,7 @@ from ....admin.decorators.auth import tenant_authentication from ....admin.request_context import AdminRequestContext +from ....protocols.endorse_transaction.v1_0.util import is_author_role from ....utils.profiles import is_not_anoncreds_profile_raise_web_exception from ...base import AnonCredsObjectNotFound, AnonCredsResolutionError from ...issuer import AnonCredsIssuer, AnonCredsIssuerError @@ -50,8 +51,17 @@ async def cred_def_post(request: web.BaseRequest): is_not_anoncreds_profile_raise_web_exception(profile) body = await request.json() - options = body.get("options", {}) cred_def = body.get("credential_definition") + options = body.get("options") or {} + wait_for_revocation_setup = body.get("wait_for_revocation_setup", True) + + if wait_for_revocation_setup and is_author_role(profile): + # Override setting; for authors it should only be True if auto-create flag is True + wait_for_revocation_setup = profile.settings.get( + "endorser.auto_create_rev_reg", False + ) + + options["wait_for_revocation_setup"] = wait_for_revocation_setup if cred_def is None: raise web.HTTPBadRequest(reason="cred_def object is required") diff --git a/acapy_agent/anoncreds/routes/revocation/credentials/tests/test_routes.py b/acapy_agent/anoncreds/routes/revocation/credentials/tests/test_routes.py index ab0fc7e13a..dc820ec8a7 100644 --- a/acapy_agent/anoncreds/routes/revocation/credentials/tests/test_routes.py +++ b/acapy_agent/anoncreds/routes/revocation/credentials/tests/test_routes.py @@ -2,7 +2,8 @@ import pytest from aiohttp import web -from aiohttp.web import HTTPNotFound +from aiohttp.web import HTTPForbidden, HTTPNotFound +from marshmallow import ValidationError from ......admin.request_context import AdminRequestContext from ......storage.error import StorageNotFoundError @@ -41,9 +42,9 @@ def test_validate_cred_rev_rec_qs_and_revoke_req(self): } ) req.validate_fields({"cred_ex_id": "12345678-1234-5678-9abc-def012345678"}) - with self.assertRaises(Exception): # ValidationError + with self.assertRaises(ValidationError): req.validate_fields({}) - with self.assertRaises(Exception): # ValidationError + with self.assertRaises(ValidationError): req.validate_fields( { "rev_reg_id": ( @@ -52,9 +53,9 @@ def test_validate_cred_rev_rec_qs_and_revoke_req(self): ) } ) - with self.assertRaises(Exception): # ValidationError + with self.assertRaises(ValidationError): req.validate_fields({"cred_rev_id": "1"}) - with self.assertRaises(Exception): # ValidationError + with self.assertRaises(ValidationError): req.validate_fields( { "rev_reg_id": ( @@ -64,14 +65,14 @@ def test_validate_cred_rev_rec_qs_and_revoke_req(self): "cred_ex_id": "12345678-1234-5678-9abc-def012345678", } ) - with self.assertRaises(Exception): # ValidationError + with self.assertRaises(ValidationError): req.validate_fields( { "cred_rev_id": "1", "cred_ex_id": "12345678-1234-5678-9abc-def012345678", } ) - with self.assertRaises(Exception): # ValidationError + with self.assertRaises(ValidationError): req.validate_fields( { "rev_reg_id": ( @@ -257,5 +258,5 @@ async def test_credential_revocation_wrong_profile_403(self): ) # Test revoke endpoint - with self.assertRaises(Exception): # Should raise HTTPForbidden + with self.assertRaises(HTTPForbidden): await revoke(self.request) diff --git a/acapy_agent/anoncreds/routes/revocation/tails/tests/test_routes.py b/acapy_agent/anoncreds/routes/revocation/tails/tests/test_routes.py index e6f3f1f4a6..303c558a9e 100644 --- a/acapy_agent/anoncreds/routes/revocation/tails/tests/test_routes.py +++ b/acapy_agent/anoncreds/routes/revocation/tails/tests/test_routes.py @@ -88,7 +88,7 @@ async def test_tails_wrong_profile_403(self): ) self.request.match_info = {"rev_reg_id": "rev_reg_id"} - with self.assertRaises(Exception): # Should raise HTTPForbidden + with self.assertRaises(HTTPForbidden): await get_tails_file(self.request) @mock.patch.object( diff --git a/acapy_agent/core/event_bus.py b/acapy_agent/core/event_bus.py index af49a29064..0e3eb6a50c 100644 --- a/acapy_agent/core/event_bus.py +++ b/acapy_agent/core/event_bus.py @@ -2,6 +2,7 @@ import asyncio import logging +import os from contextlib import contextmanager from functools import partial from typing import ( @@ -19,11 +20,15 @@ Tuple, ) +from ..utils.task_queue import CompletedTask, TaskQueue + if TYPE_CHECKING: # To avoid circular import error from .profile import Profile LOGGER = logging.getLogger(__name__) +MAX_ACTIVE_EVENT_BUS_TASKS = int(os.getenv("MAX_ACTIVE_EVENT_BUS_TASKS", "50")) + class Event: """A simple event object.""" @@ -34,7 +39,7 @@ def __init__(self, topic: str, payload: Optional[Any] = None): self._payload = payload @property - def topic(self): + def topic(self) -> str: """Return this event's topic.""" return self._topic @@ -86,6 +91,9 @@ def __init__(self): """Initialize Event Bus.""" self.topic_patterns_to_subscribers: Dict[Pattern, List[Callable]] = {} + # TaskQueue for non-blocking event processing + self.task_queue = TaskQueue(max_active=MAX_ACTIVE_EVENT_BUS_TASKS) + async def notify(self, profile: "Profile", event: Event): """Notify subscribers of event. @@ -94,33 +102,54 @@ async def notify(self, profile: "Profile", event: Event): event (Event): event to emit """ - # TODO don't block notifier until subscribers have all been called? - # TODO trigger each processor but don't await? - # TODO log errors but otherwise ignore? + # TODO: This method can now be made synchronous (would be breaking change) - LOGGER.debug("Notifying subscribers: %s", event) + LOGGER.debug("Notifying subscribers for event: %s", event) + # Define partial functions for each subscriber that matches the event topic + partials = [ + partial( + subscriber, + profile, + event.with_metadata(EventMetadata(pattern, match)), + ) + for pattern, subscribers in self.topic_patterns_to_subscribers.items() + if (match := pattern.match(event.topic)) + for subscriber in subscribers + ] - partials = [] - for pattern, subscribers in self.topic_patterns_to_subscribers.items(): - match = pattern.match(event.topic) + if not partials: + LOGGER.debug("No subscribers for %s event", event.topic) + return - if not match: - continue + LOGGER.debug("Notifying %d subscribers for %s event", len(partials), event.topic) + for processor in partials: + LOGGER.debug("Putting %s event for processor %s", event.topic, processor) + # Run each processor as a background task (fire and forget) with error handler + self.task_queue.put( + processor(), + task_complete=self._make_error_handler(processor, event), + ident=f"event_processor_{event.topic}", + ) - for subscriber in subscribers: - partials.append( - partial( - subscriber, - profile, - event.with_metadata(EventMetadata(pattern, match)), + def _make_error_handler( + self, processor: partial[Any], event: Event + ) -> Callable[[CompletedTask], None]: + """Create an error handler that captures the processor and event context.""" + + def error_handler(completed_task: CompletedTask): + """Handle errors from event processor tasks.""" + if completed_task.exc_info: + _, exc_val, _ = completed_task.exc_info + # Don't log CancelledError as an error - it's normal task cancellation + if not isinstance(exc_val, asyncio.CancelledError): + LOGGER.exception( + "Error occurred while processing %s for event: %s", + str(processor), + event, + exc_info=completed_task.exc_info, ) - ) - for processor in partials: - try: - await processor() - except Exception: - LOGGER.exception("Error occurred while processing event") + return error_handler def subscribe(self, pattern: Pattern, processor: Callable): """Subscribe to an event. @@ -130,10 +159,10 @@ def subscribe(self, pattern: Pattern, processor: Callable): processor (Callable): async callable accepting profile and event """ - LOGGER.debug("Subscribed: topic %s, processor %s", pattern, processor) if pattern not in self.topic_patterns_to_subscribers: self.topic_patterns_to_subscribers[pattern] = [] self.topic_patterns_to_subscribers[pattern].append(processor) + LOGGER.debug("Subscribed: topic %s, processor %s", pattern, processor) def unsubscribe(self, pattern: Pattern, processor: Callable): """Unsubscribe from an event. @@ -187,6 +216,42 @@ async def _handle_single_event(profile, event): if not future.done(): future.cancel() + async def shutdown(self): + """Shutdown the event bus and clean up background tasks.""" + active_before = self.task_queue.current_active + pending_before = self.task_queue.current_pending + LOGGER.debug( + "Shutting down EventBus, cancelling %d active tasks and %d pending tasks", + active_before, + pending_before, + ) + # Get references to active tasks before cancelling them + tasks_to_cancel = [ + task for task in self.task_queue.active_tasks if not task.done() + ] + try: + # Use TaskQueue's complete() to cancel tasks + await self.task_queue.complete(timeout=2.0, cleanup=True) + + # Explicitly wait for the cancelled tasks to actually finish cancelling + if tasks_to_cancel: + # Wait for all the tasks we just cancelled to actually complete + await asyncio.wait(tasks_to_cancel, timeout=2.0) + except Exception as e: + LOGGER.debug("Exception while waiting for task cancellation: %s", e) + + active_after = self.task_queue.current_active + pending_after = self.task_queue.current_pending + LOGGER.debug( + "EventBus shutdown complete. Tasks: %d active (%d->%d), %d pending (%d->%d)", + active_after, + active_before, + active_after, + pending_after, + pending_before, + pending_after, + ) + class MockEventBus(EventBus): """A mock EventBus for testing.""" @@ -199,3 +264,9 @@ def __init__(self): async def notify(self, profile: "Profile", event: Event): """Append the event to MockEventBus.events.""" self.events.append((profile, event)) + await super().notify(profile, event) + + async def shutdown(self): + """Mock shutdown method for testing.""" + # For MockEventBus, we still want to clean up the TaskQueue + await super().shutdown() diff --git a/acapy_agent/core/profile.py b/acapy_agent/core/profile.py index e84a9c1855..1b13d3baff 100644 --- a/acapy_agent/core/profile.py +++ b/acapy_agent/core/profile.py @@ -114,6 +114,10 @@ def inject_or( async def close(self): """Close the profile instance.""" + # Shutdown the EventBus to clean up background tasks + event_bus = self.inject_or(EventBus) + if event_bus: + await event_bus.shutdown() async def remove(self): """Remove the profile.""" @@ -123,6 +127,8 @@ async def notify(self, topic: str, payload: Any): event_bus = self.inject_or(EventBus) if event_bus: await event_bus.notify(self, Event(topic, payload)) + else: + LOGGER.warning("No event bus found for profile %s", self.name) def __repr__(self) -> str: """Get a human readable string.""" diff --git a/acapy_agent/core/tests/conftest.py b/acapy_agent/core/tests/conftest.py new file mode 100644 index 0000000000..ca593f09b5 --- /dev/null +++ b/acapy_agent/core/tests/conftest.py @@ -0,0 +1,15 @@ +import asyncio + +import pytest + + +@pytest.fixture(scope="function") +def event_loop(): + """ + Custom function-scoped event loop. + """ + policy = asyncio.get_event_loop_policy() + loop = policy.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() diff --git a/acapy_agent/core/tests/test_event_bus.py b/acapy_agent/core/tests/test_event_bus.py index c1cf2b43ef..510bcd24ca 100644 --- a/acapy_agent/core/tests/test_event_bus.py +++ b/acapy_agent/core/tests/test_event_bus.py @@ -1,7 +1,8 @@ """Test Event Bus.""" +import asyncio import re -from unittest import mock +from unittest.mock import MagicMock, patch import pytest @@ -12,34 +13,34 @@ @pytest.fixture -def event_bus(): - yield EventBus() +def event_bus() -> EventBus: + return EventBus() @pytest.fixture -def profile(): - yield mock.MagicMock() +def profile() -> MagicMock: + return MagicMock() @pytest.fixture -def event(): +def event() -> Event: event = Event(topic="anything", payload="payload") - yield event + return event class MockProcessor: - def __init__(self): + def __init__(self) -> None: self.profile = None self.event = None - async def __call__(self, profile, event): + async def __call__(self, profile, event) -> None: self.profile = profile self.event = event @pytest.fixture -def processor(): - yield MockProcessor() +def processor() -> MockProcessor: + return MockProcessor() def test_event(event): @@ -55,7 +56,7 @@ def test_event(event): assert repr(event) -def test_sub_unsub(event_bus: EventBus, processor): +def test_sub_unsub(event_bus: EventBus, processor: MockProcessor): """Test subscribe and unsubscribe.""" event_bus.subscribe(re.compile(".*"), processor) assert event_bus.topic_patterns_to_subscribers @@ -64,7 +65,7 @@ def test_sub_unsub(event_bus: EventBus, processor): assert not event_bus.topic_patterns_to_subscribers -def test_unsub_idempotency(event_bus: EventBus, processor): +def test_unsub_idempotency(event_bus: EventBus, processor: MockProcessor): """Test unsubscribe idempotency.""" event_bus.subscribe(re.compile(".*"), processor) event_bus.unsubscribe(re.compile(".*"), processor) @@ -73,7 +74,7 @@ def test_unsub_idempotency(event_bus: EventBus, processor): assert not event_bus.topic_patterns_to_subscribers -def test_unsub_unsubbed_processor(event_bus: EventBus, processor): +def test_unsub_unsubbed_processor(event_bus: EventBus, processor: MockProcessor): """Test unsubscribing an unsubscribed processor does not error.""" event_bus.unsubscribe(re.compile(".*"), processor) event_bus.subscribe(re.compile(".*"), processor) @@ -82,10 +83,13 @@ def test_unsub_unsubbed_processor(event_bus: EventBus, processor): @pytest.mark.asyncio -async def test_sub_notify(event_bus: EventBus, profile, event, processor): +async def test_sub_notify( + event_bus: EventBus, profile: MagicMock, event: Event, processor: MockProcessor +): """Test subscriber receives event.""" event_bus.subscribe(re.compile(".*"), processor) await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() assert processor.profile == profile assert processor.event == event @@ -93,24 +97,24 @@ async def test_sub_notify(event_bus: EventBus, profile, event, processor): @pytest.mark.asyncio async def test_sub_notify_error_logged_and_exec_continues( event_bus: EventBus, - profile, - event, + profile: MagicMock, + event: Event, ): """Test subscriber errors are logged but do not halt execution.""" - def _raise_exception(profile, event): - raise Exception() + async def _raise_exception(profile, event): + raise Exception("Test exception") processor = MockProcessor() bad_processor = _raise_exception event_bus.subscribe(re.compile(".*"), bad_processor) event_bus.subscribe(re.compile(".*"), processor) - with mock.patch.object( - test_module.LOGGER, "exception", mock.MagicMock() - ) as mock_log_exc: + with patch.object(test_module.LOGGER, "exception", MagicMock()) as mock_log_exc: await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() - mock_log_exc.assert_called_once_with("Error occurred while processing event") + # The error handler should log the exception + mock_log_exc.assert_called() assert processor.profile == profile assert processor.event == event @@ -125,18 +129,25 @@ def _raise_exception(profile, event): ) @pytest.mark.asyncio async def test_sub_notify_regex_filtering( - event_bus: EventBus, profile, processor, pattern, topic + event_bus: EventBus, + profile: MagicMock, + processor: MockProcessor, + pattern: str, + topic: str, ): """Test events are filtered correctly.""" event = Event(topic) event_bus.subscribe(re.compile(pattern), processor) await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() assert processor.profile == profile assert processor.event == event @pytest.mark.asyncio -async def test_sub_notify_no_match(event_bus: EventBus, profile, event, processor): +async def test_sub_notify_no_match( + event_bus: EventBus, profile: MagicMock, event: Event, processor: MockProcessor +): """Test event not given to processor when pattern doesn't match.""" event_bus.subscribe(re.compile("^$"), processor) await event_bus.notify(profile, event) @@ -145,12 +156,15 @@ async def test_sub_notify_no_match(event_bus: EventBus, profile, event, processo @pytest.mark.asyncio -async def test_sub_notify_only_one(event_bus: EventBus, profile, event, processor): +async def test_sub_notify_only_one( + event_bus: EventBus, profile: MagicMock, event: Event, processor: MockProcessor +): """Test only one subscriber is called when pattern matches only one.""" processor1 = MockProcessor() event_bus.subscribe(re.compile(".*"), processor) event_bus.subscribe(re.compile("^$"), processor1) await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() assert processor.profile == profile assert processor.event == event assert processor1.profile is None @@ -158,12 +172,15 @@ async def test_sub_notify_only_one(event_bus: EventBus, profile, event, processo @pytest.mark.asyncio -async def test_sub_notify_both(event_bus: EventBus, profile, event, processor): +async def test_sub_notify_both( + event_bus: EventBus, profile: MagicMock, event: Event, processor: MockProcessor +): """Test both subscribers are called when pattern matches both.""" processor1 = MockProcessor() event_bus.subscribe(re.compile(".*"), processor) event_bus.subscribe(re.compile("anything"), processor1) await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() assert processor.profile == profile assert processor.event == event assert processor1.profile == profile @@ -171,7 +188,9 @@ async def test_sub_notify_both(event_bus: EventBus, profile, event, processor): @pytest.mark.asyncio -async def test_wait_for_event_multiple_do_not_collide(event_bus: EventBus, profile): +async def test_wait_for_event_multiple_do_not_collide( + event_bus: EventBus, profile: MagicMock +): """Test multiple wait_for_event calls don't collide.""" pattern = re.compile(".*") with event_bus.wait_for_event(profile, pattern) as event1: @@ -185,23 +204,156 @@ async def test_wait_for_event_multiple_do_not_collide(event_bus: EventBus, profi @pytest.mark.asyncio -async def test_wait_for_event(event_bus: EventBus, profile, event): +async def test_wait_for_event(event_bus: EventBus, profile: MagicMock, event: Event): with event_bus.wait_for_event(profile, re.compile(".*")) as returned_event: await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() assert await returned_event == event @pytest.mark.asyncio -async def test_wait_for_event_condition(event_bus: EventBus, profile, event): +async def test_wait_for_event_condition( + event_bus: EventBus, profile: MagicMock, event: Event +): with event_bus.wait_for_event( profile, re.compile(".*"), lambda e: e.payload == "asdf" ) as returned_event: # This shouldn't trigger our condition because payload == "payload" await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() assert not returned_event.done() # This should trigger event = Event("asdF", "asdf") await event_bus.notify(profile, event) + await event_bus.task_queue.wait_for_completion() assert returned_event.done() assert await returned_event == event + + +@pytest.mark.asyncio +async def test_shutdown_no_active_tasks(event_bus: EventBus): + """Test shutdown with no active tasks completes cleanly.""" + with patch.object(test_module.LOGGER, "debug") as mock_debug: + await event_bus.shutdown() + + # Should log start and completion messages + assert mock_debug.call_count >= 2 + # Verify the shutdown completion message + completion_call = mock_debug.call_args_list[-1] + assert "EventBus shutdown complete" in completion_call[0][0] + + +@pytest.mark.asyncio +async def test_shutdown_exception_handling( + event_bus: EventBus, profile: MagicMock, event: Event +): + """Test shutdown handles exceptions during task cancellation.""" + + async def normal_processor(profile, event): + await asyncio.sleep(0.1) + + event_bus.subscribe(re.compile(".*"), normal_processor) + + # Mock asyncio.wait to raise an exception + test_exception = Exception("Test exception during shutdown") + with ( + patch("asyncio.wait", side_effect=test_exception), + patch.object(test_module.LOGGER, "debug") as mock_debug, + ): + await event_bus.notify(profile, event) + await asyncio.sleep(0.01) # Let task start + + # Should handle the exception gracefully + await event_bus.shutdown() + + # Should log the exception + exception_logged = any( + "Exception while waiting for task cancellation" in str(call) + for call in mock_debug.call_args_list + ) + assert exception_logged + + +@pytest.mark.asyncio +async def test_shutdown_idempotency(event_bus: EventBus): + """Test shutdown can be called multiple times safely.""" + with patch.object(test_module.LOGGER, "debug") as mock_debug: + # First shutdown + await event_bus.shutdown() + first_call_count = mock_debug.call_count + + # Second shutdown should also work + await event_bus.shutdown() + + # Should have logged both shutdowns + assert mock_debug.call_count >= first_call_count + + +@pytest.mark.asyncio +async def test_shutdown_logging_details( + event_bus: EventBus, profile: MagicMock, event: Event +): + """Test shutdown logs detailed task count information.""" + + async def quick_processor(profile, event): + await asyncio.sleep(0.01) + + event_bus.subscribe(re.compile(".*"), quick_processor) + + with patch.object(test_module.LOGGER, "debug") as mock_debug: + # Create some tasks + await event_bus.notify(profile, event) + await event_bus.notify(profile, event) + + await event_bus.shutdown() + + # Find the shutdown start message + start_message = None + completion_message = None + for call in mock_debug.call_args_list: + message = call[0][0] + if "Shutting down EventBus" in message: + start_message = message + elif "EventBus shutdown complete" in message: + completion_message = message + + assert start_message is not None + assert completion_message is not None + assert "active tasks" in start_message + assert "pending tasks" in start_message + + +@pytest.mark.asyncio +async def test_shutdown_with_mixed_task_states( + event_bus: EventBus, profile: MagicMock, event: Event +): + """Test shutdown handles tasks in various states (running, done, cancelled).""" + + task_states = [] + + async def state_tracking_processor(profile, event): + """Track when this processor runs.""" + task_states.append("started") + try: + await asyncio.sleep(0.1) + task_states.append("completed") + except asyncio.CancelledError: + task_states.append("cancelled") + raise + + event_bus.subscribe(re.compile(".*"), state_tracking_processor) + + # Create multiple tasks + await event_bus.notify(profile, event) + await event_bus.notify(profile, event) + + # Let some tasks start + await asyncio.sleep(0.01) + + with patch.object(test_module.LOGGER, "debug"): + await event_bus.shutdown() + + # Tasks should have been cancelled + assert "started" in task_states + assert event_bus.task_queue.current_active == 0 diff --git a/acapy_agent/indy/constants.py b/acapy_agent/indy/constants.py new file mode 100644 index 0000000000..c2ebf8aa2e --- /dev/null +++ b/acapy_agent/indy/constants.py @@ -0,0 +1,13 @@ +"""Constants for Indy.""" + +CATEGORY_CRED_DEF = "credential_def" +CATEGORY_CRED_DEF_PRIVATE = "credential_def_private" +CATEGORY_CRED_DEF_KEY_PROOF = "credential_def_key_proof" + +CATEGORY_SCHEMA = "schema" + +CATEGORY_REV_REG = "revocation_reg" +CATEGORY_REV_REG_DEF = "revocation_reg_def" +CATEGORY_REV_REG_DEF_PRIVATE = "revocation_reg_def_private" +CATEGORY_REV_REG_INFO = "revocation_reg_info" +CATEGORY_REV_REG_ISSUER = "revocation_reg_def_issuer" diff --git a/acapy_agent/indy/credx/issuer.py b/acapy_agent/indy/credx/issuer.py index c18bf85f64..ca1e7d8136 100644 --- a/acapy_agent/indy/credx/issuer.py +++ b/acapy_agent/indy/credx/issuer.py @@ -2,7 +2,7 @@ import asyncio import logging -from typing import Optional, Sequence, Tuple +from typing import TYPE_CHECKING, Optional, Sequence, Tuple from aries_askar import AskarError from indy_credx import ( @@ -18,8 +18,17 @@ Schema, ) -from ...askar.profile import AskarProfile from ...utils.general import strip_did_prefix +from ..constants import ( + CATEGORY_CRED_DEF, + CATEGORY_CRED_DEF_KEY_PROOF, + CATEGORY_CRED_DEF_PRIVATE, + CATEGORY_REV_REG, + CATEGORY_REV_REG_DEF, + CATEGORY_REV_REG_DEF_PRIVATE, + CATEGORY_REV_REG_INFO, + CATEGORY_SCHEMA, +) from ..issuer import ( DEFAULT_CRED_DEF_TAG, DEFAULT_SIGNATURE_TYPE, @@ -28,23 +37,16 @@ IndyIssuerRevocationRegistryFullError, ) -LOGGER = logging.getLogger(__name__) +if TYPE_CHECKING: + from ...askar.profile import AskarProfile -CATEGORY_CRED_DEF = "credential_def" -CATEGORY_CRED_DEF_PRIVATE = "credential_def_private" -CATEGORY_CRED_DEF_KEY_PROOF = "credential_def_key_proof" -CATEGORY_SCHEMA = "schema" -CATEGORY_REV_REG = "revocation_reg" -CATEGORY_REV_REG_INFO = "revocation_reg_info" -CATEGORY_REV_REG_DEF = "revocation_reg_def" -CATEGORY_REV_REG_DEF_PRIVATE = "revocation_reg_def_private" -CATEGORY_REV_REG_ISSUER = "revocation_reg_def_issuer" +LOGGER = logging.getLogger(__name__) class IndyCredxIssuer(IndyIssuer): """Indy-Credx issuer class.""" - def __init__(self, profile: AskarProfile): + def __init__(self, profile: "AskarProfile"): """Initialize an IndyCredxIssuer instance. Args: @@ -54,7 +56,7 @@ def __init__(self, profile: AskarProfile): self._profile = profile @property - def profile(self) -> AskarProfile: + def profile(self) -> "AskarProfile": """Accessor for the profile instance.""" return self._profile diff --git a/acapy_agent/indy/util.py b/acapy_agent/indy/util.py index 63c4432946..d95d7943dd 100644 --- a/acapy_agent/indy/util.py +++ b/acapy_agent/indy/util.py @@ -1,11 +1,19 @@ """Utilities for dealing with Indy conventions.""" +import logging +import os from os import getenv, makedirs, urandom from os.path import isdir, join from pathlib import Path from platform import system from typing import Optional +LOGGER = logging.getLogger(__name__) + +REVOCATION_REGISTRY_CREATION_TIMEOUT = float( + os.getenv("REVOCATION_REGISTRY_CREATION_TIMEOUT", "60.0") +) + async def generate_pr_nonce() -> str: """Generate a nonce for a proof request.""" diff --git a/acapy_agent/messaging/credential_definitions/routes.py b/acapy_agent/messaging/credential_definitions/routes.py index c54da063fd..0314feb4e7 100644 --- a/acapy_agent/messaging/credential_definitions/routes.py +++ b/acapy_agent/messaging/credential_definitions/routes.py @@ -45,6 +45,7 @@ from ...storage.base import BaseStorage, StorageRecord from ...storage.error import StorageError, StorageNotFoundError from ...utils.profiles import is_anoncreds_profile_raise_web_exception +from ...utils.wait_for_active_registry import wait_for_active_revocation_registry from ..models.base import BaseModelError from ..models.openapi import OpenAPISchema from ..valid import ( @@ -95,6 +96,13 @@ class CredentialDefinitionSendRequestSchema(OpenAPISchema): "example": "default", }, ) + wait_for_revocation_setup = fields.Boolean( + required=False, + load_default=True, + metadata={ + "description": "Wait for revocation registry setup to complete before returning" # noqa: E501 + }, + ) class CredentialDefinitionSendResultSchema(OpenAPISchema): @@ -215,6 +223,7 @@ async def credential_definitions_send_credential_definition(request: web.BaseReq support_revocation = bool(body.get("support_revocation")) tag = body.get("tag") rev_reg_size = body.get("revocation_registry_size") + wait_for_revocation_setup = body.get("wait_for_revocation_setup", True) # Don't allow revocable cred def to be created without tails server base url if not profile.settings.get("tails_server_base_url") and support_revocation: @@ -325,6 +334,12 @@ async def credential_definitions_send_credential_definition(request: web.BaseReq meta_data["processing"]["auto_create_rev_reg"] = True await notify_cred_def_event(profile, cred_def_id, meta_data) + if support_revocation and wait_for_revocation_setup: + try: + await wait_for_active_revocation_registry(profile, cred_def_id) + except TimeoutError as err: + raise web.HTTPGatewayTimeout(reason=str(err)) from err + return web.json_response( { "sent": {"credential_definition_id": cred_def_id}, diff --git a/acapy_agent/messaging/credential_definitions/tests/test_routes.py b/acapy_agent/messaging/credential_definitions/tests/test_routes.py index 1d81ee188f..da6df84702 100644 --- a/acapy_agent/messaging/credential_definitions/tests/test_routes.py +++ b/acapy_agent/messaging/credential_definitions/tests/test_routes.py @@ -87,6 +87,119 @@ async def test_send_credential_definition(self): } ) + async def test_send_credential_definition_with_revocation_wait_success(self): + """Test credential definition creation with revocation and waiting enabled.""" + self.request.json = mock.CoroutineMock( + return_value={ + "schema_id": "WgWxqztrNooG92RXvxSTWv:2:schema_name:1.0", + "support_revocation": True, + "tag": "tag", + "wait_for_revocation_setup": True, + } + ) + self.request.query = {"create_transaction_for_endorser": "false"} + + # Mock tails server setting to allow revocable cred def + self.profile.settings["tails_server_base_url"] = ( + "https://tails-server.example.com" + ) + + with mock.patch.object(test_module.web, "json_response") as mock_response: + with mock.patch.object( + test_module, "wait_for_active_revocation_registry" + ) as mock_wait: + mock_wait.return_value = None # Successful completion + + result = ( + await test_module.credential_definitions_send_credential_definition( + self.request + ) + ) + + # Should have called the wait utility + mock_wait.assert_called_once_with(self.profile, CRED_DEF_ID) + + # Should return success response + assert result == mock_response.return_value + mock_response.assert_called_once_with( + { + "sent": {"credential_definition_id": CRED_DEF_ID}, + "credential_definition_id": CRED_DEF_ID, + } + ) + + async def test_send_credential_definition_with_revocation_wait_timeout(self): + """Test credential definition creation with revocation wait timeout.""" + self.request.json = mock.CoroutineMock( + return_value={ + "schema_id": "WgWxqztrNooG92RXvxSTWv:2:schema_name:1.0", + "support_revocation": True, + "tag": "tag", + "wait_for_revocation_setup": True, + } + ) + self.request.query = {"create_transaction_for_endorser": "false"} + + # Mock tails server setting to allow revocable cred def + self.profile.settings["tails_server_base_url"] = ( + "https://tails-server.example.com" + ) + + with mock.patch.object( + test_module, "wait_for_active_revocation_registry" + ) as mock_wait: + mock_wait.side_effect = TimeoutError("Timeout waiting for revocation setup") + + with self.assertRaises(test_module.web.HTTPGatewayTimeout) as exc_context: + await test_module.credential_definitions_send_credential_definition( + self.request + ) + + # Should have called the wait utility + mock_wait.assert_called_once_with(self.profile, CRED_DEF_ID) + + # Should raise HTTPGatewayTimeout with timeout message + assert "Timeout waiting for revocation setup" in str(exc_context.exception) + + async def test_send_credential_definition_with_revocation_wait_false(self): + """Test credential definition creation with revocation but waiting disabled.""" + self.request.json = mock.CoroutineMock( + return_value={ + "schema_id": "WgWxqztrNooG92RXvxSTWv:2:schema_name:1.0", + "support_revocation": True, + "tag": "tag", + "wait_for_revocation_setup": False, + } + ) + self.request.query = {"create_transaction_for_endorser": "false"} + + # Mock tails server setting to allow revocable cred def + self.profile.settings["tails_server_base_url"] = ( + "https://tails-server.example.com" + ) + + with mock.patch.object(test_module.web, "json_response") as mock_response: + with mock.patch.object( + test_module, "wait_for_active_revocation_registry" + ) as mock_wait: + result = ( + await test_module.credential_definitions_send_credential_definition( + self.request + ) + ) + + # Should NOT have called the wait utility + mock_wait.assert_not_called() + + # Should return success response immediately + assert result == mock_response.return_value + mock_response.assert_called_once_with( + { + "sent": {"credential_definition_id": CRED_DEF_ID}, + "credential_definition_id": CRED_DEF_ID, + } + ) + async def test_send_credential_definition_create_transaction_for_endorser(self): self.request.json = mock.CoroutineMock( return_value={ diff --git a/acapy_agent/multitenant/admin/tests/test_routes.py b/acapy_agent/multitenant/admin/tests/test_routes.py index cd68a890fe..381c168a31 100644 --- a/acapy_agent/multitenant/admin/tests/test_routes.py +++ b/acapy_agent/multitenant/admin/tests/test_routes.py @@ -309,6 +309,7 @@ async def test_wallet_create_optional_default_fields(self): "image_url": "https://image.com", } self.request.json = mock.CoroutineMock(return_value=body) + test_module.attempt_auto_author_with_endorser_setup = mock.CoroutineMock() with mock.patch.object(test_module.web, "json_response"): mock_multitenant_mgr = mock.AsyncMock(BaseMultitenantManager, autospec=True) @@ -340,6 +341,7 @@ async def test_wallet_create_optional_default_fields(self): WalletRecord.MODE_MANAGED, ) assert mock_multitenant_mgr.get_wallet_profile.called + assert test_module.attempt_auto_author_with_endorser_setup.called async def test_wallet_create_raw_key_derivation(self): body = { @@ -348,6 +350,7 @@ async def test_wallet_create_raw_key_derivation(self): "wallet_key_derivation": "RAW", } self.request.json = mock.CoroutineMock(return_value=body) + test_module.attempt_auto_author_with_endorser_setup = mock.CoroutineMock() with mock.patch.object(test_module.web, "json_response"): mock_multitenant_mgr = mock.AsyncMock(BaseMultitenantManager, autospec=True) @@ -377,6 +380,7 @@ async def test_wallet_create_raw_key_derivation(self): WalletRecord.MODE_MANAGED, ) assert mock_multitenant_mgr.get_wallet_profile.called + assert test_module.attempt_auto_author_with_endorser_setup.called async def test_wallet_update_tenant_settings(self): self.request.match_info = {"wallet_id": "test-wallet-id"} diff --git a/acapy_agent/protocols/issue_credential/v2_0/routes.py b/acapy_agent/protocols/issue_credential/v2_0/routes.py index cc9cd26e51..c43da88662 100644 --- a/acapy_agent/protocols/issue_credential/v2_0/routes.py +++ b/acapy_agent/protocols/issue_credential/v2_0/routes.py @@ -18,6 +18,7 @@ from ....admin.request_context import AdminRequestContext from ....anoncreds.holder import AnonCredsHolderError from ....anoncreds.issuer import AnonCredsIssuerError +from ....anoncreds.revocation.revocation import AnonCredsRevocationError from ....connections.models.conn_record import ConnRecord from ....core.profile import Profile from ....indy.holder import IndyHolderError @@ -1585,6 +1586,7 @@ async def credential_exchange_issue(request: web.BaseRequest): except ( BaseModelError, AnonCredsIssuerError, + AnonCredsRevocationError, IndyIssuerError, LedgerError, StorageError, diff --git a/acapy_agent/resolver/default/tests/test_peer3.py b/acapy_agent/resolver/default/tests/test_peer3.py index a6e030e9fc..0be21a39b3 100644 --- a/acapy_agent/resolver/default/tests/test_peer3.py +++ b/acapy_agent/resolver/default/tests/test_peer3.py @@ -5,7 +5,7 @@ from did_peer_2 import peer2to3 from ....connections.models.conn_record import ConnRecord -from ....core.event_bus import EventBus +from ....core.event_bus import EventBus, MockEventBus from ....core.profile import Profile from ....utils.testing import create_test_profile from .. import peer3 as test_module @@ -18,12 +18,12 @@ @pytest.fixture -def event_bus(): - yield EventBus() +def event_bus() -> MockEventBus: + return MockEventBus() @pytest_asyncio.fixture -async def profile(event_bus: EventBus): +async def profile(event_bus: MockEventBus): """Profile fixture.""" profile = await create_test_profile() profile.context.injector.bind_instance(EventBus, event_bus) @@ -31,7 +31,7 @@ async def profile(event_bus: EventBus): @pytest_asyncio.fixture -async def resolver(profile): +async def resolver(profile: Profile): """Resolver fixture.""" instance = PeerDID3Resolver() await instance.setup(profile.context) @@ -71,11 +71,12 @@ async def test_resolve_x_no_2(profile: Profile, resolver: PeerDID3Resolver): @pytest.mark.asyncio async def test_record_removal( + event_bus: MockEventBus, profile: Profile, resolver: PeerDID3Resolver, peer2_resolver: PeerDID2Resolver, ): - """Test resolver setup.""" + """Test that record removal works correctly.""" await peer2_resolver.resolve(profile, TEST_DP2) assert await resolver.resolve(profile, TEST_DP3) record = ConnRecord( @@ -87,5 +88,7 @@ async def test_record_removal( async with profile.session() as session: await record.emit_event(session, record.serialize()) + await event_bus.task_queue.wait_for_completion() + with pytest.raises(test_module.DIDNotFound): await resolver.resolve(profile, TEST_DP3) diff --git a/acapy_agent/resolver/default/tests/test_peer4.py b/acapy_agent/resolver/default/tests/test_peer4.py index a6c1a3d80e..2fb386f175 100644 --- a/acapy_agent/resolver/default/tests/test_peer4.py +++ b/acapy_agent/resolver/default/tests/test_peer4.py @@ -3,7 +3,7 @@ import pytest import pytest_asyncio -from ....core.event_bus import EventBus +from ....core.event_bus import EventBus, MockEventBus from ....core.profile import Profile from ....utils.testing import create_test_profile from .. import peer4 as test_module @@ -15,12 +15,12 @@ @pytest.fixture -def event_bus(): - yield EventBus() +def event_bus() -> MockEventBus: + return MockEventBus() @pytest_asyncio.fixture -async def profile(event_bus: EventBus): +async def profile(event_bus: MockEventBus): """Profile fixture.""" profile = await create_test_profile() profile.context.injector.bind_instance(EventBus, event_bus) diff --git a/acapy_agent/revocation/models/issuer_rev_reg_record.py b/acapy_agent/revocation/models/issuer_rev_reg_record.py index 49c756a652..5eb796d8dd 100644 --- a/acapy_agent/revocation/models/issuer_rev_reg_record.py +++ b/acapy_agent/revocation/models/issuer_rev_reg_record.py @@ -14,7 +14,7 @@ from uuid_utils import uuid4 from ...core.profile import Profile, ProfileSession -from ...indy.credx.issuer import ( +from ...indy.constants import ( CATEGORY_CRED_DEF, CATEGORY_REV_REG, CATEGORY_REV_REG_DEF_PRIVATE, diff --git a/acapy_agent/utils/task_queue.py b/acapy_agent/utils/task_queue.py index 9e9016ecf1..4a42676052 100644 --- a/acapy_agent/utils/task_queue.py +++ b/acapy_agent/utils/task_queue.py @@ -138,19 +138,36 @@ def __init__( trace_fn: A callback for all completed tasks """ - self.loop = asyncio.get_event_loop() - self.active_tasks = [] - self.pending_tasks = [] + self.loop = None # Lazy initialization + self.active_tasks: list[asyncio.Task] = [] + self.pending_tasks: list[PendingTask] = [] self.timed = timed self.total_done = 0 self.total_failed = 0 self.total_started = 0 self._trace_fn = trace_fn self._cancelled = False - self._drain_evt = asyncio.Event() + self._drain_evt = None # Lazy initialization self._drain_task: asyncio.Task = None self._max_active = max_active + def _ensure_loop(self): + """Ensure the event loop is initialized.""" + if self.loop is None: + try: + self.loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop, try to get the event loop policy loop + try: + self.loop = asyncio.get_event_loop() + except RuntimeError: + # Create a new event loop if none exists + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + if self._drain_evt is None: + self._drain_evt = asyncio.Event() + @property def cancelled(self) -> bool: """Accessor for the cancelled property of the queue.""" @@ -200,6 +217,7 @@ def __len__(self) -> int: def drain(self) -> asyncio.Task: """Start the process to run queued tasks.""" + self._ensure_loop() # Ensure loop is initialized if self._drain_task and not self._drain_task.done(): self._drain_evt.set() elif self.pending_tasks: @@ -308,6 +326,7 @@ def run( if not timing: timing = {} coro = coro_timed(coro, timing) + self._ensure_loop() # Ensure loop is initialized task = self.loop.create_task(coro) return self.add_active(task, task_complete, ident, timing) @@ -418,3 +437,21 @@ def __await__(self): async def wait_for(self, timeout: float): """Wait for all queued tasks to complete with a timeout.""" return await asyncio.wait_for(self.flush(), timeout) + + async def wait_for_completion(self): + """Wait for all active tasks to complete with timeout. + + This is safer than flush() for testing as it doesn't try to + manage the drain loop, just waits for existing tasks. + """ + if not self.active_tasks: + return + + try: + await asyncio.wait_for( + asyncio.gather(*self.active_tasks, return_exceptions=True), + timeout=5.0, + ) + except asyncio.TimeoutError: + # Tasks didn't complete in time, but that's okay for testing + pass diff --git a/acapy_agent/utils/tests/test_task_queue.py b/acapy_agent/utils/tests/test_task_queue.py index b079bf53bb..da024db555 100644 --- a/acapy_agent/utils/tests/test_task_queue.py +++ b/acapy_agent/utils/tests/test_task_queue.py @@ -73,7 +73,9 @@ async def test_pending(self): coro = retval(1, delay=1) pend = PendingTask(coro, None) assert str(pend).startswith("= 2 + mock_query.assert_called_once() + mock_sleep.assert_not_called() + + async def test_custom_timeout_value(self): + """Test behavior with custom timeout configuration.""" + with mock.patch("asyncio.sleep") as mock_sleep: + mock_sleep.return_value = None # Make sleep instant + + with mock.patch.object( + IssuerRevRegRecord, "query_by_cred_def_id" + ) as mock_query: + mock_query.return_value = [] # No active registries + + # Set a custom timeout + custom_timeout = 5.0 + with mock.patch.object( + test_module, "REVOCATION_REGISTRY_CREATION_TIMEOUT", custom_timeout + ): + with self.assertRaises(TimeoutError): + await test_module.wait_for_active_revocation_registry( + self.profile, self.cred_def_id + ) + + # Should have polled based on custom timeout (5.0s / 0.5s = 10 iterations) + expected_iterations = int(custom_timeout / 0.5) + assert mock_query.call_count == expected_iterations diff --git a/acapy_agent/utils/wait_for_active_registry.py b/acapy_agent/utils/wait_for_active_registry.py new file mode 100644 index 0000000000..aafcb42c86 --- /dev/null +++ b/acapy_agent/utils/wait_for_active_registry.py @@ -0,0 +1,77 @@ +"""Utility method for waiting for active revocation registry.""" + +import asyncio +import logging + +from ..core.profile import Profile +from ..indy.util import REVOCATION_REGISTRY_CREATION_TIMEOUT +from ..revocation.models.issuer_rev_reg_record import IssuerRevRegRecord + +LOGGER = logging.getLogger(__name__) + + +async def wait_for_active_revocation_registry(profile: Profile, cred_def_id: str) -> None: + """Wait for revocation registry setup to complete. + + Polls for the creation of revocation registry definitions until we have + the 2 active registries or timeout occurs. + + Args: + profile: The profile + cred_def_id: The credential definition ID + + Raises: + TimeoutError: If timeout occurs before completion + + """ + LOGGER.debug( + "Waiting for revocation setup completion for cred_def_id: %s", cred_def_id + ) + + expected_count = 2 # Active registry + poll_interval = 0.5 # Poll every 500ms + max_iterations = int(REVOCATION_REGISTRY_CREATION_TIMEOUT / poll_interval) + registries = [] + + for _iteration in range(max_iterations): + try: + # Check for finished revocation registry definitions + async with profile.session() as session: + registries = await IssuerRevRegRecord.query_by_cred_def_id( + session, cred_def_id, IssuerRevRegRecord.STATE_ACTIVE + ) + + current_count = len(registries) + LOGGER.debug( + "Revocation setup progress for %s: %d registries active", + cred_def_id, + current_count, + ) + + if current_count >= expected_count: + LOGGER.info( + "Revocation setup completed for cred_def_id: %s " + "(%d registries created)", + cred_def_id, + current_count, + ) + return + + except Exception as e: + LOGGER.warning( + "Error checking revocation setup progress for %s: %s", cred_def_id, e + ) + # Continue polling despite errors - they might be transient + + await asyncio.sleep(poll_interval) # Wait before next poll + + # Timeout occurred + current_count = len(registries) + + raise TimeoutError( + "Timeout waiting for revocation setup completion for credential definition " + f"{cred_def_id}. Expected {expected_count} active revocation registries, but " + f"{current_count} were active within {REVOCATION_REGISTRY_CREATION_TIMEOUT} " + "seconds. Note: Revocation registry creation may still be in progress in the " + "background. You can check status using the revocation registry endpoints." + ) diff --git a/acapy_agent/wallet/anoncreds_upgrade.py b/acapy_agent/wallet/anoncreds_upgrade.py index 71be59a5d9..79fda097f8 100644 --- a/acapy_agent/wallet/anoncreds_upgrade.py +++ b/acapy_agent/wallet/anoncreds_upgrade.py @@ -15,10 +15,13 @@ from aries_askar import AskarError from indy_credx import LinkSecret -from ..anoncreds.issuer import ( +from ..anoncreds.constants import ( CATEGORY_CRED_DEF, CATEGORY_CRED_DEF_KEY_PROOF, CATEGORY_CRED_DEF_PRIVATE, + CATEGORY_REV_LIST, + CATEGORY_REV_REG_DEF, + CATEGORY_REV_REG_DEF_PRIVATE, CATEGORY_SCHEMA, ) from ..anoncreds.models.credential_definition import CredDef, CredDefState @@ -30,11 +33,6 @@ RevRegDefValue, ) from ..anoncreds.models.schema import SchemaState -from ..anoncreds.revocation import ( - CATEGORY_REV_LIST, - CATEGORY_REV_REG_DEF, - CATEGORY_REV_REG_DEF_PRIVATE, -) from ..cache.base import BaseCache from ..core.profile import Profile from ..indy.credx.holder import CATEGORY_LINK_SECRET, IndyCredxHolder