Skip to content

Commit 548d5e0

Browse files
committed
feat: also support python-socketio v4
Configurable line protocol by cargo feature flag Added tests to cover the new server implementation No breaking changes to public API
1 parent daae14e commit 548d5e0

File tree

7 files changed

+602
-201
lines changed

7 files changed

+602
-201
lines changed

.github/workflows/ci.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ jobs:
1212
- stable
1313
- beta
1414
- nightly
15+
features:
16+
- js-v7
17+
- python-v4
1518

1619
steps:
1720
- uses: actions/checkout@v2
@@ -26,16 +29,20 @@ jobs:
2629
- uses: actions-rs/cargo@v1
2730
with:
2831
command: build
32+
args: --no-default-features --features ${{ matrix.features }}
2933

3034
- uses: actions-rs/cargo@v1
3135
with:
3236
command: test
37+
args: --no-default-features --features ${{ matrix.features }}
3338

3439
- uses: actions-rs/cargo@v1
3540
with:
3641
command: fmt
3742
args: --all -- --check
43+
args: --no-default-features --features ${{ matrix.features }}
3844

3945
- uses: actions-rs/cargo@v1
4046
with:
4147
command: clippy
48+
args: --no-default-features --features ${{ matrix.features }}

Cargo.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,20 @@ license = "MIT"
1515
redis = "0.27"
1616
rmp = "0.8"
1717
serde = "1.0"
18+
serde_json = "1.0"
1819
serde_derive = "1.0"
19-
rmp-serde = "1.3"
20+
rmp-serde = { version = "1.3", optional = true }
2021

2122
[dev-dependencies]
2223
testcontainers = { version = "0.22.0", features = ["blocking"] }
24+
tempfile = "3.12"
25+
26+
[features]
27+
default = ["js-v7"]
28+
# Feature structure: {language}-{version of adapter}
29+
# Unfortunatelly, the version of the adapter is not directly related to the version of the protocol, and differs per implementation.
30+
31+
# Compatible with socket.io-redis-adapter v7 analogous with socket.io-redis-emitter v4 (socket.io protocol version 4)
32+
js-v7 = ["rmp-serde"]
33+
# Compatible with python-socketio v4 (socket.io protocol version 3 or 4)
34+
python-v4 = []

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# socketio-rust-emitter
22

3+
Emits messages to a Redis instance for consumption by either:
4+
- (default) socket.io servers (protocol version 5, socket.io-redis-adapter v7) (under the feature flag `js_v7`)
5+
- python-socketio (version 4) (under the feature flag `py_v4`)
6+
37
[![build status](https://github.com/epli2/socketio-rust-emitter/actions/workflows/ci.yaml/badge.svg?branch=master&event=push)](https://github.com/epli2/socketio-rust-emitter/actions)
48
[![socketio-rust-emitter at crates.io](https://img.shields.io/crates/v/socketio-rust-emitter.svg)](https://crates.io/crates/socketio-rust-emitter)
59
[![socketio-rust-emitter at docs.rs](https://docs.rs/socketio-rust-emitter/badge.svg)](https://docs.rs/socketio-rust-emitter)

src/implementations/javascript.rs

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
use std::collections::HashMap;
2+
3+
use crate::Emitter;
4+
use redis::Commands;
5+
use rmp_serde::Serializer;
6+
use serde::Serialize;
7+
8+
#[derive(Debug, PartialEq, Serialize, Deserialize)]
9+
struct Packet {
10+
#[serde(rename = "type")]
11+
_type: i32,
12+
data: Vec<String>,
13+
nsp: String,
14+
}
15+
16+
#[derive(Debug, PartialEq, Serialize, Deserialize)]
17+
struct Opts {
18+
rooms: Vec<String>,
19+
flags: HashMap<String, bool>,
20+
}
21+
22+
impl Emitter {
23+
pub fn json(mut self) -> Emitter {
24+
let mut flags = HashMap::new();
25+
flags.insert("json".to_string(), true);
26+
self.flags = flags;
27+
self
28+
}
29+
30+
pub fn volatile(mut self) -> Emitter {
31+
let mut flags = HashMap::new();
32+
flags.insert("volatile".to_string(), true);
33+
self.flags = flags;
34+
self
35+
}
36+
37+
pub fn broadcast(mut self) -> Emitter {
38+
let mut flags = HashMap::new();
39+
flags.insert("broadcast".to_string(), true);
40+
self.flags = flags;
41+
self
42+
}
43+
44+
pub fn emit(mut self, message: Vec<&str>) -> Emitter {
45+
let packet = Packet {
46+
_type: 2,
47+
data: message.iter().map(|s| s.to_string()).collect(),
48+
nsp: self.nsp.clone(),
49+
};
50+
let opts = Opts {
51+
rooms: self.rooms.clone(),
52+
flags: self.flags.clone(),
53+
};
54+
let mut msg = Vec::new();
55+
let val = (self.uid.clone(), packet, opts);
56+
val.serialize(&mut Serializer::new(&mut msg).with_struct_map())
57+
.unwrap();
58+
59+
let channel = if self.rooms.len() == 1 {
60+
format!("{}{}#", self.channel, self.rooms.join("#"))
61+
} else {
62+
self.channel.clone()
63+
};
64+
65+
let _: () = self.redis.publish(channel, msg).unwrap();
66+
self.rooms = vec![];
67+
self.flags = HashMap::new();
68+
self
69+
}
70+
}
71+
72+
#[cfg(test)]
73+
mod tests {
74+
use super::{Emitter, Opts, Packet};
75+
use crate::tests::create_redis;
76+
use redis::Msg;
77+
use rmp_serde::Deserializer;
78+
use serde::Deserialize;
79+
80+
fn decode_msg(msg: Msg) -> (String, Packet, Opts) {
81+
let payload: Vec<u8> = msg.get_payload().unwrap();
82+
let mut de = Deserializer::new(&payload[..]);
83+
Deserialize::deserialize(&mut de).unwrap()
84+
}
85+
86+
#[test]
87+
fn emit() {
88+
create_redis!(redis);
89+
let mut con = redis.get_connection().unwrap();
90+
let mut pubsub = con.as_pubsub();
91+
pubsub.subscribe("socket.io#/#").unwrap();
92+
93+
// act
94+
let io = Emitter::new(redis);
95+
io.emit(vec!["test1", "test2"]);
96+
97+
// assert
98+
let actual = decode_msg(pubsub.get_message().unwrap());
99+
assert_eq!("emitter", actual.0);
100+
assert_eq!(
101+
Packet {
102+
_type: 2,
103+
data: vec!["test1".to_string(), "test2".to_string()],
104+
nsp: "/".to_string(),
105+
},
106+
actual.1
107+
);
108+
assert_eq!(
109+
Opts {
110+
rooms: vec![],
111+
flags: Default::default()
112+
},
113+
actual.2
114+
);
115+
}
116+
117+
#[test]
118+
fn emit_in_namespaces() {
119+
create_redis!(redis);
120+
let mut con = redis.get_connection().unwrap();
121+
let mut pubsub = con.as_pubsub();
122+
pubsub.subscribe("socket.io#/custom#").unwrap();
123+
124+
// act
125+
let io = Emitter::new(redis);
126+
io.of("/custom").emit(vec!["test"]);
127+
128+
// assert
129+
let actual = decode_msg(pubsub.get_message().unwrap());
130+
assert_eq!("emitter", actual.0);
131+
assert_eq!(
132+
Packet {
133+
_type: 2,
134+
data: vec!["test".to_string()],
135+
nsp: "/custom".to_string(),
136+
},
137+
actual.1
138+
);
139+
assert_eq!(
140+
Opts {
141+
rooms: vec![],
142+
flags: Default::default()
143+
},
144+
actual.2
145+
);
146+
}
147+
148+
#[test]
149+
fn emit_to_namespaces() {
150+
create_redis!(redis);
151+
let mut con = redis.get_connection().unwrap();
152+
let mut pubsub = con.as_pubsub();
153+
pubsub.subscribe("socket.io#/custom#").unwrap();
154+
155+
// act
156+
let io = Emitter::new(redis);
157+
io.of("/custom").emit(vec!["test"]);
158+
159+
// assert
160+
let actual = decode_msg(pubsub.get_message().unwrap());
161+
assert_eq!("emitter", actual.0);
162+
assert_eq!(
163+
Packet {
164+
_type: 2,
165+
data: vec!["test".to_string()],
166+
nsp: "/custom".to_string(),
167+
},
168+
actual.1
169+
);
170+
assert_eq!(
171+
Opts {
172+
rooms: vec![],
173+
flags: Default::default()
174+
},
175+
actual.2
176+
);
177+
}
178+
179+
#[test]
180+
fn emit_to_room() {
181+
create_redis!(redis);
182+
let mut con = redis.get_connection().unwrap();
183+
let mut pubsub = con.as_pubsub();
184+
pubsub.subscribe("socket.io#/#room1#").unwrap();
185+
186+
// act
187+
let io = Emitter::new(redis);
188+
io.to("room1").emit(vec!["test"]);
189+
190+
// assert
191+
let actual = decode_msg(pubsub.get_message().unwrap());
192+
assert_eq!("emitter", actual.0);
193+
assert_eq!(
194+
Packet {
195+
_type: 2,
196+
data: vec!["test".to_string()],
197+
nsp: "/".to_string(),
198+
},
199+
actual.1
200+
);
201+
assert_eq!(
202+
Opts {
203+
rooms: vec!["room1".to_string()],
204+
flags: Default::default()
205+
},
206+
actual.2
207+
);
208+
}
209+
}

src/implementations/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
//! A specific protocol used by the redis adapter.
2+
//!
3+
//! Unfortunately, the line protocol differs between the Python and JS implementations.
4+
//! This module provides a way to abstract over the differences.
5+
#[cfg(feature = "python-v4")]
6+
mod python_socketio;
7+
#[cfg(feature = "js-v7")]
8+
mod javascript;

0 commit comments

Comments
 (0)