Skip to content

Commit 36a8922

Browse files
authored
Add support for valkey in the Redis client managers (#1488)
1 parent 5dc2aea commit 36a8922

File tree

3 files changed

+96
-30
lines changed

3 files changed

+96
-30
lines changed

src/socketio/async_redis_manager.py

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import pickle
3+
from urllib.parse import urlparse
34

45
try: # pragma: no cover
56
from redis import asyncio as aioredis
@@ -12,6 +13,13 @@
1213
aioredis = None
1314
RedisError = None
1415

16+
try: # pragma: no cover
17+
from valkey import asyncio as valkey
18+
from valkey.exceptions import ValkeyError
19+
except ImportError: # pragma: no cover
20+
valkey = None
21+
ValkeyError = None
22+
1523
from .async_pubsub_manager import AsyncPubSubManager
1624
from .redis_manager import parse_redis_sentinel_url
1725

@@ -47,38 +55,61 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
4755

4856
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
4957
write_only=False, logger=None, redis_options=None):
50-
if aioredis is None:
58+
if aioredis is None and valkey is None:
5159
raise RuntimeError('Redis package is not installed '
52-
'(Run "pip install redis" in your virtualenv).')
53-
if not hasattr(aioredis.Redis, 'from_url'):
60+
'(Run "pip install redis" or '
61+
'"pip install valkey" '
62+
'in your virtualenv).')
63+
if aioredis and not hasattr(aioredis.Redis, 'from_url'):
5464
raise RuntimeError('Version 2 of aioredis package is required.')
5565
super().__init__(channel=channel, write_only=write_only, logger=logger)
5666
self.redis_url = url
5767
self.redis_options = redis_options or {}
5868
self._redis_connect()
5969

70+
def _get_redis_module_and_error(self):
71+
parsed_url = urlparse(self.redis_url)
72+
schema = parsed_url.scheme.split('+', 1)[0].lower()
73+
if schema == 'redis':
74+
if aioredis is None or RedisError is None:
75+
raise RuntimeError('Redis package is not installed '
76+
'(Run "pip install redis" '
77+
'in your virtualenv).')
78+
return aioredis, RedisError
79+
if schema == 'valkey':
80+
if valkey is None or ValkeyError is None:
81+
raise RuntimeError('Valkey package is not installed '
82+
'(Run "pip install valkey" '
83+
'in your virtualenv).')
84+
return valkey, ValkeyError
85+
error_msg = f'Unsupported Redis URL schema: {schema}'
86+
raise ValueError(error_msg)
87+
6088
def _redis_connect(self):
61-
if not self.redis_url.startswith('redis+sentinel://'):
62-
self.redis = aioredis.Redis.from_url(self.redis_url,
63-
**self.redis_options)
64-
else:
89+
module, _ = self._get_redis_module_and_error()
90+
parsed_url = urlparse(self.redis_url)
91+
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
6592
sentinels, service_name, connection_kwargs = \
6693
parse_redis_sentinel_url(self.redis_url)
6794
kwargs = self.redis_options
6895
kwargs.update(connection_kwargs)
69-
sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
96+
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
7097
self.redis = sentinel.master_for(service_name or self.channel)
98+
else:
99+
self.redis = module.Redis.from_url(self.redis_url,
100+
**self.redis_options)
71101
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
72102

73103
async def _publish(self, data):
74104
retry = True
105+
_, error = self._get_redis_module_and_error()
75106
while True:
76107
try:
77108
if not retry:
78109
self._redis_connect()
79110
return await self.redis.publish(
80111
self.channel, pickle.dumps(data))
81-
except RedisError as exc:
112+
except error as exc:
82113
if retry:
83114
self._get_logger().error(
84115
'Cannot publish to redis... '
@@ -96,6 +127,7 @@ async def _publish(self, data):
96127
async def _redis_listen_with_retries(self):
97128
retry_sleep = 1
98129
connect = False
130+
_, error = self._get_redis_module_and_error()
99131
while True:
100132
try:
101133
if connect:
@@ -104,10 +136,10 @@ async def _redis_listen_with_retries(self):
104136
retry_sleep = 1
105137
async for message in self.pubsub.listen():
106138
yield message
107-
except RedisError as exc:
139+
except error as exc:
108140
self._get_logger().error('Cannot receive from redis... '
109141
'retrying in '
110-
'{} secs'.format(retry_sleep),
142+
f'{retry_sleep} secs',
111143
extra={"redis_exception": str(exc)})
112144
connect = True
113145
await asyncio.sleep(retry_sleep)

src/socketio/redis_manager.py

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,19 @@
33
import time
44
from urllib.parse import urlparse
55

6-
try:
6+
try: # pragma: no cover
77
import redis
8+
from redis.exceptions import RedisError
89
except ImportError:
910
redis = None
11+
RedisError = None
12+
13+
try: # pragma: no cover
14+
import valkey
15+
from valkey.exceptions import ValkeyError
16+
except ImportError:
17+
valkey = None
18+
ValkeyError = None
1019

1120
from .pubsub_manager import PubSubManager
1221

@@ -18,7 +27,7 @@ def parse_redis_sentinel_url(url):
1827
redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name
1928
"""
2029
parsed_url = urlparse(url)
21-
if parsed_url.scheme != 'redis+sentinel':
30+
if parsed_url.scheme not in {'redis+sentinel', 'valkey+sentinel'}:
2231
raise ValueError('Invalid Redis Sentinel URL')
2332
sentinels = []
2433
for host_port in parsed_url.netloc.split('@')[-1].split(','):
@@ -71,10 +80,11 @@ class RedisManager(PubSubManager): # pragma: no cover
7180

7281
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
7382
write_only=False, logger=None, redis_options=None):
74-
if redis is None:
83+
if redis is None and valkey is None:
7584
raise RuntimeError('Redis package is not installed '
76-
'(Run "pip install redis" in your '
77-
'virtualenv).')
85+
'(Run "pip install redis" '
86+
'or "pip install valkey" '
87+
'in your virtualenv).')
7888
super().__init__(channel=channel, write_only=write_only, logger=logger)
7989
self.redis_url = url
8090
self.redis_options = redis_options or {}
@@ -95,27 +105,48 @@ def initialize(self):
95105
'Redis requires a monkey patched socket library to work '
96106
'with ' + self.server.async_mode)
97107

108+
def _get_redis_module_and_error(self):
109+
parsed_url = urlparse(self.redis_url)
110+
schema = parsed_url.scheme.split('+', 1)[0].lower()
111+
if schema == 'redis':
112+
if redis is None or RedisError is None:
113+
raise RuntimeError('Redis package is not installed '
114+
'(Run "pip install redis" '
115+
'in your virtualenv).')
116+
return redis, RedisError
117+
if schema == 'valkey':
118+
if valkey is None or ValkeyError is None:
119+
raise RuntimeError('Valkey package is not installed '
120+
'(Run "pip install valkey" '
121+
'in your virtualenv).')
122+
return valkey, ValkeyError
123+
error_msg = f'Unsupported Redis URL schema: {schema}'
124+
raise ValueError(error_msg)
125+
98126
def _redis_connect(self):
99-
if not self.redis_url.startswith('redis+sentinel://'):
100-
self.redis = redis.Redis.from_url(self.redis_url,
101-
**self.redis_options)
102-
else:
127+
module, _ = self._get_redis_module_and_error()
128+
parsed_url = urlparse(self.redis_url)
129+
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
103130
sentinels, service_name, connection_kwargs = \
104131
parse_redis_sentinel_url(self.redis_url)
105132
kwargs = self.redis_options
106133
kwargs.update(connection_kwargs)
107-
sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)
134+
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
108135
self.redis = sentinel.master_for(service_name or self.channel)
136+
else:
137+
self.redis = module.Redis.from_url(self.redis_url,
138+
**self.redis_options)
109139
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
110140

111141
def _publish(self, data):
112142
retry = True
143+
_, error = self._get_redis_module_and_error()
113144
while True:
114145
try:
115146
if not retry:
116147
self._redis_connect()
117148
return self.redis.publish(self.channel, pickle.dumps(data))
118-
except redis.exceptions.RedisError as exc:
149+
except error as exc:
119150
if retry:
120151
logger.error(
121152
'Cannot publish to redis... retrying',
@@ -132,16 +163,17 @@ def _publish(self, data):
132163
def _redis_listen_with_retries(self):
133164
retry_sleep = 1
134165
connect = False
166+
_, error = self._get_redis_module_and_error()
135167
while True:
136168
try:
137169
if connect:
138170
self._redis_connect()
139171
self.pubsub.subscribe(self.channel)
140172
retry_sleep = 1
141173
yield from self.pubsub.listen()
142-
except redis.exceptions.RedisError as exc:
174+
except error as exc:
143175
logger.error('Cannot receive from redis... '
144-
'retrying in {} secs'.format(retry_sleep),
176+
f'retrying in {retry_sleep} secs',
145177
extra={"redis_exception": str(exc)})
146178
connect = True
147179
time.sleep(retry_sleep)

tests/common/test_redis_manager.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,35 @@
44

55

66
class TestPubSubManager:
7-
def test_sentinel_url_parser(self):
7+
@pytest.mark.parametrize('rtype', ['redis', 'valkey'])
8+
def test_sentinel_url_parser(self, rtype):
89
with pytest.raises(ValueError):
9-
parse_redis_sentinel_url('redis://localhost:6379/0')
10+
parse_redis_sentinel_url(f'{rtype}://localhost:6379/0')
1011

1112
assert parse_redis_sentinel_url(
12-
'redis+sentinel://localhost:6379'
13+
f'{rtype}+sentinel://localhost:6379'
1314
) == (
1415
[('localhost', 6379)],
1516
None,
1617
{}
1718
)
1819
assert parse_redis_sentinel_url(
19-
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
20+
f'{rtype}+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
2021
) == (
2122
[('192.168.0.1', 6379), ('192.168.0.2', 6379)],
2223
None,
2324
{}
2425
)
2526
assert parse_redis_sentinel_url(
26-
'redis+sentinel://h1:6379,h2:6379/0'
27+
f'{rtype}+sentinel://h1:6379,h2:6379/0'
2728
) == (
2829
[('h1', 6379), ('h2', 6379)],
2930
None,
3031
{'db': 0}
3132
)
3233
assert parse_redis_sentinel_url(
33-
'redis+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis'
34+
f'{rtype}+sentinel://'
35+
'user:password@h1:6379,h2:6379,h1:6380/0/myredis'
3436
) == (
3537
[('h1', 6379), ('h2', 6379), ('h1', 6380)],
3638
'myredis',

0 commit comments

Comments
 (0)