Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ jobs:
- stable
- beta
- nightly
features:
- js-v7
- python-v4

steps:
- uses: actions/checkout@v2
Expand All @@ -26,16 +29,20 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: build
args: --no-default-features --features ${{ matrix.features }}

- uses: actions-rs/cargo@v1
with:
command: test
args: --no-default-features --features ${{ matrix.features }}

- uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
args: --no-default-features --features ${{ matrix.features }}

- uses: actions-rs/cargo@v1
with:
command: clippy
args: --no-default-features --features ${{ matrix.features }}
29 changes: 23 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,28 @@ keywords = ["socketio"]
license = "MIT"

[dependencies]
redis = "0.21.2"
rmp = "0.8.10"
serde = "1.0.130"
serde_derive = "1.0.130"
rmp-serde = "0.15.5"
redis = "0.27"
rmp = "0.8"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
rmp-serde = { version = "1.3", optional = true }
thiserror = "1"

[dev-dependencies]
testcontainers = "0.12.0"
testcontainers = { version = "0.22.0", features = ["blocking"] }
tempfile = "3.12"

[features]
default = ["js-v7"]
# Feature structure: {language}-{version of adapter}
# Unfortunatelly, the version of the adapter is not directly related to the version of the protocol, and differs per implementation.

# Compatible with socket.io-redis-adapter v7 analogous with socket.io-redis-emitter v4 (socket.io protocol version 4)
js-v7 = ["rmp-serde"]
# Compatible with python-socketio v4 (socket.io protocol version 3 or 4)
python-v4 = []

[lints.clippy]
unwrap_in_result = "deny"
unwrap_used = "deny"
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# socketio-rust-emitter

Emits messages to a Redis instance for consumption by either:
- (default) socket.io servers (protocol version 5, socket.io-redis-adapter v7) (under the feature flag `js_v7`)
- python-socketio (version 4) (under the feature flag `py_v4`)

[![build status](https://github.com/epli2/socketio-rust-emitter/actions/workflows/ci.yaml/badge.svg?branch=master&event=push)](https://github.com/epli2/socketio-rust-emitter/actions)
[![socketio-rust-emitter at crates.io](https://img.shields.io/crates/v/socketio-rust-emitter.svg)](https://crates.io/crates/socketio-rust-emitter)
[![socketio-rust-emitter at docs.rs](https://docs.rs/socketio-rust-emitter/badge.svg)](https://docs.rs/socketio-rust-emitter)
Expand Down
208 changes: 208 additions & 0 deletions src/implementations/javascript.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use std::collections::HashMap;

use crate::{Emitter, Result};
use redis::Commands;
use rmp_serde::Serializer;
use serde::Serialize;

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Packet {
#[serde(rename = "type")]
_type: i32,
data: Vec<String>,
nsp: String,
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Opts {
rooms: Vec<String>,
flags: HashMap<String, bool>,
}

impl Emitter {
pub fn json(mut self) -> Emitter {
let mut flags = HashMap::new();
flags.insert("json".to_string(), true);
self.flags = flags;
self
}

pub fn volatile(mut self) -> Emitter {
let mut flags = HashMap::new();
flags.insert("volatile".to_string(), true);
self.flags = flags;
self
}

pub fn broadcast(mut self) -> Emitter {
let mut flags = HashMap::new();
flags.insert("broadcast".to_string(), true);
self.flags = flags;
self
}

pub fn emit(&mut self, message: Vec<&str>) -> Result<()> {
let packet = Packet {
_type: 2,
data: message.iter().map(|s| s.to_string()).collect(),
nsp: self.nsp.clone(),
};
let opts = Opts {
rooms: self.rooms.clone(),
flags: self.flags.clone(),
};
let mut msg = Vec::new();
let val = (self.uid.clone(), packet, opts);
val.serialize(&mut Serializer::new(&mut msg).with_struct_map())?;

let channel = if self.rooms.len() == 1 {
format!("{}{}#", self.channel, self.rooms.join("#"))
} else {
self.channel.clone()
};

let _: () = self.redis.publish(channel, msg)?;
self.rooms = vec![];
self.flags = HashMap::new();
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::{Emitter, Opts, Packet};
use crate::tests::create_redis;
use redis::Msg;
use rmp_serde::Deserializer;
use serde::Deserialize;

fn decode_msg(msg: Msg) -> (String, Packet, Opts) {
let payload: Vec<u8> = msg.get_payload().unwrap();
let mut de = Deserializer::new(&payload[..]);
Deserialize::deserialize(&mut de).unwrap()
}

#[test]
fn emit() {
create_redis!(redis);
let mut con = redis.get_connection().unwrap();
let mut pubsub = con.as_pubsub();
pubsub.subscribe("socket.io#/#").unwrap();

// act
let mut io = Emitter::new(redis).unwrap();
io.emit(vec!["test1", "test2"]).unwrap();

// assert
let actual = decode_msg(pubsub.get_message().unwrap());
assert_eq!("emitter", actual.0);
assert_eq!(
Packet {
_type: 2,
data: vec!["test1".to_string(), "test2".to_string()],
nsp: "/".to_string(),
},
actual.1
);
assert_eq!(
Opts {
rooms: vec![],
flags: Default::default()
},
actual.2
);
}

#[test]
fn emit_in_namespaces() {
create_redis!(redis);
let mut con = redis.get_connection().unwrap();
let mut pubsub = con.as_pubsub();
pubsub.subscribe("socket.io#/custom#").unwrap();

// act
let io = Emitter::new(redis).unwrap();
io.of("/custom").emit(vec!["test"]).unwrap();

// assert
let actual = decode_msg(pubsub.get_message().unwrap());
assert_eq!("emitter", actual.0);
assert_eq!(
Packet {
_type: 2,
data: vec!["test".to_string()],
nsp: "/custom".to_string(),
},
actual.1
);
assert_eq!(
Opts {
rooms: vec![],
flags: Default::default()
},
actual.2
);
}

#[test]
fn emit_to_namespaces() {
create_redis!(redis);
let mut con = redis.get_connection().unwrap();
let mut pubsub = con.as_pubsub();
pubsub.subscribe("socket.io#/custom#").unwrap();

// act
let io = Emitter::new(redis).unwrap();
io.of("/custom").emit(vec!["test"]).unwrap();

// assert
let actual = decode_msg(pubsub.get_message().unwrap());
assert_eq!("emitter", actual.0);
assert_eq!(
Packet {
_type: 2,
data: vec!["test".to_string()],
nsp: "/custom".to_string(),
},
actual.1
);
assert_eq!(
Opts {
rooms: vec![],
flags: Default::default()
},
actual.2
);
}

#[test]
fn emit_to_room() {
create_redis!(redis);
let mut con = redis.get_connection().unwrap();
let mut pubsub = con.as_pubsub();
pubsub.subscribe("socket.io#/#room1#").unwrap();

// act
let io = Emitter::new(redis).unwrap();
io.to("room1").emit(vec!["test"]).unwrap();

// assert
let actual = decode_msg(pubsub.get_message().unwrap());
assert_eq!("emitter", actual.0);
assert_eq!(
Packet {
_type: 2,
data: vec!["test".to_string()],
nsp: "/".to_string(),
},
actual.1
);
assert_eq!(
Opts {
rooms: vec!["room1".to_string()],
flags: Default::default()
},
actual.2
);
}
}
8 changes: 8 additions & 0 deletions src/implementations/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! A specific protocol used by the redis adapter.
//!
//! Unfortunately, the line protocol differs between the Python and JS implementations.
//! This module provides a way to abstract over the differences.
#[cfg(feature = "python-v4")]
mod python_socketio;
#[cfg(feature = "js-v7")]
mod javascript;
Loading