Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/multi_database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ Key concepts
- Health checks:
A set of checks determines whether a database is healthy in proactive manner.
By default, an "PING" check runs against the database (all cluster nodes must
pass for a cluster). You can add custom checks. A Redis Enterprise specific
pass for a cluster). You can provide your own set of health checks or add an
additional health check on top of the default one. A Redis Enterprise specific
"lag-aware" health check is also available.

- Failure detector:
A detector observes command failures over a moving window (reactive monitoring).
You can specify an exact number of failures and failures rate to have more
fine-grain tuned configuration of triggering fail over based on organic traffic.
You can provide your own set of custom failure detectors or add an additional
detector on top of the default one.

- Failover strategy:
The default strategy is based on statically configured weights. It prefers the highest weighted healthy database.
Expand Down
18 changes: 10 additions & 8 deletions redis/asyncio/multidb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ class MultiDBClient(AsyncRedisModuleCommands, AsyncCoreCommands):

def __init__(self, config: MultiDbConfig):
self._databases = config.databases()
self._health_checks = config.default_health_checks()

if config.health_checks is not None:
self._health_checks.extend(config.health_checks)
self._health_checks = (
config.default_health_checks()
if not config.health_checks
else config.health_checks
)
Comment on lines +31 to +35
Copy link

Copilot AI Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change breaks backward compatibility. Previously, custom health_checks were added to the defaults via extend(), allowing users to augment default behavior. Now they completely replace defaults. Users who expected to add additional checks alongside the default PingHealthCheck will lose the default check. Consider providing a clear migration path or maintaining the extending behavior.

Suggested change
self._health_checks = (
config.default_health_checks()
if not config.health_checks
else config.health_checks
)
if config.health_checks:
self._health_checks = config.default_health_checks() + config.health_checks
else:
self._health_checks = config.default_health_checks()

Copilot uses AI. Check for mistakes.

self._health_check_interval = config.health_check_interval
self._health_check_policy: HealthCheckPolicy = config.health_check_policy.value(
config.health_check_probes, config.health_check_delay
)
self._failure_detectors = config.default_failure_detectors()

if config.failure_detectors is not None:
self._failure_detectors.extend(config.failure_detectors)
self._failure_detectors = (
config.default_failure_detectors()
if not config.failure_detectors
else config.failure_detectors
)
Comment on lines +41 to +45
Copy link

Copilot AI Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change breaks backward compatibility. Previously, custom failure_detectors were added to the defaults via extend(), allowing users to augment default behavior. Now they completely replace defaults. Users who expected to add additional detectors alongside the default CommandFailureDetector will lose the default detector. Consider providing a clear migration path or maintaining the extending behavior.

Suggested change
self._failure_detectors = (
config.default_failure_detectors()
if not config.failure_detectors
else config.failure_detectors
)
self._failure_detectors = config.default_failure_detectors()
if config.failure_detectors:
self._failure_detectors.extend(config.failure_detectors)

Copilot uses AI. Check for mistakes.

self._failover_strategy = (
config.default_failover_strategy()
Expand Down
2 changes: 1 addition & 1 deletion redis/asyncio/multidb/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ async def _check_active_database(self):
self._active_database is None
or self._active_database.circuit.state != CBState.CLOSED
or (
self._auto_fallback_interval != DEFAULT_AUTO_FALLBACK_INTERVAL
self._auto_fallback_interval > 0
and self._next_fallback_attempt <= datetime.now()
)
):
Expand Down
19 changes: 10 additions & 9 deletions redis/multidb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@ class MultiDBClient(RedisModuleCommands, CoreCommands):

def __init__(self, config: MultiDbConfig):
self._databases = config.databases()
self._health_checks = config.default_health_checks()

if config.health_checks is not None:
self._health_checks.extend(config.health_checks)

self._health_checks = (
config.default_health_checks()
if not config.health_checks
else config.health_checks
)
Comment on lines +32 to +36
Copy link

Copilot AI Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change breaks backward compatibility. Previously, custom health_checks were added to the defaults via extend(), allowing users to augment default behavior. Now they completely replace defaults. Users who expected to add additional checks alongside the default PingHealthCheck will lose the default check. Consider providing a clear migration path or maintaining the extending behavior.

Suggested change
self._health_checks = (
config.default_health_checks()
if not config.health_checks
else config.health_checks
)
if config.health_checks:
self._health_checks = config.default_health_checks() + config.health_checks
else:
self._health_checks = config.default_health_checks()

Copilot uses AI. Check for mistakes.
self._health_check_interval = config.health_check_interval
self._health_check_policy: HealthCheckPolicy = config.health_check_policy.value(
config.health_check_probes, config.health_check_probes_delay
)
self._failure_detectors = config.default_failure_detectors()

if config.failure_detectors is not None:
self._failure_detectors.extend(config.failure_detectors)
self._failure_detectors = (
config.default_failure_detectors()
if not config.failure_detectors
else config.failure_detectors
)
Comment on lines +41 to +45
Copy link

Copilot AI Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change breaks backward compatibility. Previously, custom failure_detectors were added to the defaults via extend(), allowing users to augment default behavior. Now they completely replace defaults. Users who expected to add additional detectors alongside the default CommandFailureDetector will lose the default detector. Consider providing a clear migration path or maintaining the extending behavior.

Suggested change
self._failure_detectors = (
config.default_failure_detectors()
if not config.failure_detectors
else config.failure_detectors
)
self._failure_detectors = config.default_failure_detectors()
if config.failure_detectors:
self._failure_detectors.extend(config.failure_detectors)

Copilot uses AI. Check for mistakes.

self._failover_strategy = (
config.default_failover_strategy()
Expand Down
4 changes: 2 additions & 2 deletions redis/multidb/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def auto_fallback_interval(self, auto_fallback_interval: int) -> None:
self._auto_fallback_interval = auto_fallback_interval

def _schedule_next_fallback(self) -> None:
if self._auto_fallback_interval == DEFAULT_AUTO_FALLBACK_INTERVAL:
if self._auto_fallback_interval < 0:
return

self._next_fallback_attempt = datetime.now() + timedelta(
Expand Down Expand Up @@ -321,7 +321,7 @@ def _check_active_database(self):
self._active_database is None
or self._active_database.circuit.state != CBState.CLOSED
or (
self._auto_fallback_interval != DEFAULT_AUTO_FALLBACK_INTERVAL
self._auto_fallback_interval > 0
and self._next_fallback_attempt <= datetime.now()
)
):
Expand Down
106 changes: 66 additions & 40 deletions tests/test_asyncio/test_multidb/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@ async def test_execute_command_against_correct_db_on_successful_initialization(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command = AsyncMock(return_value="OK1")

Expand Down Expand Up @@ -71,12 +69,10 @@ async def test_execute_command_against_correct_db_and_closed_circuit(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command = AsyncMock(return_value="OK1")

Expand Down Expand Up @@ -187,14 +183,10 @@ async def mock_check_health(database):
return True

mock_hc.check_health.side_effect = mock_check_health
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config,
"default_health_checks",
return_value=[mock_hc],
),
):
mock_db.client.execute_command.return_value = "OK"
mock_db1.client.execute_command.return_value = "OK1"
Expand Down Expand Up @@ -264,14 +256,10 @@ async def mock_check_health(database):
return True

mock_hc.check_health.side_effect = mock_check_health
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config,
"default_health_checks",
return_value=[mock_hc],
),
):
mock_db.client.execute_command.return_value = "OK"
mock_db1.client.execute_command.return_value = "OK1"
Expand All @@ -287,6 +275,60 @@ async def mock_check_health(database):
await asyncio.sleep(0.5)
assert await client.set("key", "value") == "OK1"

@pytest.mark.asyncio
@pytest.mark.parametrize(
"mock_multi_db_config,mock_db, mock_db1, mock_db2",
[
(
{"health_check_probes": 1},
{"weight": 0.2, "circuit": {"state": CBState.CLOSED}},
{"weight": 0.7, "circuit": {"state": CBState.CLOSED}},
{"weight": 0.5, "circuit": {"state": CBState.CLOSED}},
),
],
indirect=True,
)
async def test_execute_command_do_not_auto_fallback_to_highest_weight_db(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
db1_counter = 0
error_event = asyncio.Event()
check = False

async def mock_check_health(database):
nonlocal db1_counter, check

if database == mock_db1 and not check:
db1_counter += 1

if db1_counter > 1:
error_event.set()
check = True
return False

return True

mock_hc.check_health.side_effect = mock_check_health
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
mock_db.client.execute_command.return_value = "OK"
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"
mock_multi_db_config.health_check_interval = 0.1
mock_multi_db_config.auto_fallback_interval = -1
mock_multi_db_config.failover_strategy = WeightBasedFailoverStrategy()

async with MultiDBClient(mock_multi_db_config) as client:
assert await client.set("key", "value") == "OK1"
await error_event.wait()
assert await client.set("key", "value") == "OK2"
await asyncio.sleep(0.5)
assert await client.set("key", "value") == "OK2"

@pytest.mark.asyncio
@pytest.mark.parametrize(
"mock_multi_db_config,mock_db, mock_db1, mock_db2",
Expand All @@ -304,12 +346,10 @@ async def test_execute_command_throws_exception_on_failed_initialization(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_hc.check_health.return_value = False

Expand Down Expand Up @@ -340,12 +380,10 @@ async def test_add_database_throws_exception_on_same_database(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_hc.check_health.return_value = False

Expand Down Expand Up @@ -373,12 +411,10 @@ async def test_add_database_makes_new_database_active(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"
Expand Down Expand Up @@ -413,12 +449,10 @@ async def test_remove_highest_weighted_database(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"
Expand Down Expand Up @@ -451,12 +485,10 @@ async def test_update_database_weight_to_be_highest(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"
Expand Down Expand Up @@ -491,12 +523,10 @@ async def test_add_new_failure_detector(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command.return_value = "OK1"
mock_multi_db_config.event_dispatcher = EventDispatcher()
Expand Down Expand Up @@ -552,12 +582,10 @@ async def test_add_new_health_check(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command.return_value = "OK1"

Expand Down Expand Up @@ -594,12 +622,10 @@ async def test_set_active_database(
self, mock_multi_db_config, mock_db, mock_db1, mock_db2, mock_hc
):
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
patch.object(
mock_multi_db_config, "default_health_checks", return_value=[mock_hc]
),
):
mock_db1.client.execute_command.return_value = "OK1"
mock_db.client.execute_command.return_value = "OK"
Expand Down
Loading