Skip to content

Commit afd14d9

Browse files
author
don
committed
commit pre-zmq
1 parent bb469ad commit afd14d9

File tree

5 files changed

+11
-209
lines changed

5 files changed

+11
-209
lines changed

Cargo.lock

Lines changed: 1 addition & 117 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ warp = "0.2"
3636
reqwest = "0.10.8"
3737
resiter = "0.4.0"
3838
im = "15.0.0"
39-
libzmq = "0.2.5"
4039

4140
[dev-dependencies]
42-
pretty_assertions = "0.6.1"
41+
pretty_assertions = "0.6.1"

src/aws/cloudwatch_logs/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,9 @@ async fn get_logs(
149149
timestamp: event.timestamp,
150150
token: None,
151151
});
152-
if let Err(err) = result {
153-
error!("Some error sending event over channel {}", err)
152+
match result {
153+
Ok(_) => {}
154+
Err(err) => error!("Some error sending event over channel {}", err),
154155
}
155156
});
156157
}

src/main.rs

Lines changed: 4 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,80 +5,29 @@ extern crate log;
55

66

77
use std::collections::HashMap;
8-
use std::str::Utf8Error;
98
use std::sync::Arc;
109
use std::sync::Mutex;
1110

12-
use anyhow::Context as AnyhowContext;
13-
use futures::SinkExt;
14-
use tokio::stream::StreamExt;
15-
use tokio::time::{delay_for, Duration};
16-
use warp::Filter;
11+
use warp::{Filter};
1712
use warp::hyper::Method;
18-
use libzmq::{prelude::*, *};
1913

2014
use aws::ecs::get_ecs_filter;
21-
use error::handle_rejection;
2215

2316
use crate::aws::cloudwatch_logs::{get_logs_events_filter, get_logs_filter};
2417
use crate::aws::cloudwatch_logs::dto::LogsOptions;
2518
use crate::aws::dto::AwsRequest;
2619
use crate::aws::manager::setup_default_manager;
27-
use crate::notifications::{build_fan_notifications, NotUtf8, subscriber_connected};
28-
use libzmq::InprocAddr;
29-
use std::thread;
20+
use error::handle_rejection;
21+
use crate::notifications::{subscriber_connected, build_fan_notifications, NotUtf8};
3022

31-
mod test;
3223
mod aws;
3324
mod error;
3425
mod notifications;
3526

3627
#[tokio::main]
37-
async fn main() -> Result<(), anyhow::Error> {
28+
async fn main() {
3829
pretty_env_logger::init();
3930

40-
let inproc_addr: InprocAddr = InprocAddr::new_unique();
41-
let inproc_socket = ServerBuilder::new().bind(&inproc_addr).build()?;
42-
let subscriber_addr: TcpAddr = "eth0;192.168.1.1:5555".try_into()?;
43-
let publish_socket = ClientBuilder::new().connect(subscriber_addr).build()?;
44-
45-
// Spawn the server thread.
46-
let handle = thread::spawn(move || -> Result<(), Error> {
47-
loop {
48-
let request = inproc_socket.recv_msg()?;
49-
let msg = request.to_str().unwrap_or_default();
50-
info!("Received a message on the inproc socket, msg: {}", msg);
51-
publish_socket.send(msg).unwrap_or_default();
52-
}
53-
});
54-
55-
let client = ClientBuilder::new().connect(inproc_addr).build()?;
56-
57-
58-
59-
// let publish_notification = context.socket(zmq::PUB)
60-
// .with_context(|| "Failed to instantiate notification socket!").unwrap();
61-
// publish_notification
62-
// .bind("tcp://127.0.0.1:3031").unwrap();
63-
//
64-
// let inproc_socket = context.socket(zmq::SUB)
65-
// .with_context(|| "Failed to instantiate inproc socket!").unwrap();
66-
// inproc_socket
67-
// .connect("inproc://internal_proxy").unwrap();
68-
// inproc_socket
69-
// .set_subscribe(b"")
70-
// .expect("failed setting subscription");
71-
//
72-
// std::thread::spawn(move || {
73-
// while let Ok(msg) = inproc_socket.recv_msg(0) {
74-
// info!("Received a message on the inproc socket");
75-
// let result = publish_notification.send(msg, 0);
76-
// if let Err(err) = result {
77-
// error!("Error sending message to notification sender!, err {}", err)
78-
// }
79-
// }
80-
// });
81-
8231

8332
let subscribers = Arc::new(Mutex::new(HashMap::new()));
8433
let subscribers = warp::any().map(move || subscribers.clone());
@@ -138,12 +87,6 @@ async fn main() -> Result<(), anyhow::Error> {
13887
warp::reply()
13988
});
14089

141-
let socket_test = warp::path("sockettest")
142-
.and(warp::post())
143-
.and(warp::body::content_length_limit(500))
144-
.and(warp::body::bytes())
145-
.and_then(move r|msg| notifications::handle_message(msg, client.clone()));
146-
14790
let notifications = warp::path("notifications")
14891
.and(warp::get())
14992
.and(subscribers)
@@ -158,19 +101,9 @@ async fn main() -> Result<(), anyhow::Error> {
158101
.or(log_stream)
159102
.or(bootstrap_config)
160103
.or(notify)
161-
.or(socket_test)
162104
.or(notifications)
163105
.with(cors)
164106
.recover(handle_rejection)
165107
)
166108
.run(([127, 0, 0, 1], 3030)).await;
167-
168-
// This will cause the server to fail with `InvalidCtx`.
169-
Ctx::global().shutdown();
170-
171-
// Join with the thread.
172-
let err = handle.join().unwrap().unwrap_err();
173-
assert_eq!(err.kind(), ErrorKind::InvalidCtx);
174-
175-
Ok(())
176109
}

src/notifications.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@ use std::collections::HashMap;
33
use std::sync::Arc;
44
use std::sync::atomic::{AtomicUsize, Ordering};
55
use std::sync::Mutex;
6-
use libzmq::{prelude::*, *};
76

8-
use futures::{Stream, StreamExt, SinkExt};
7+
use futures::{Stream, StreamExt};
98
use tokio::sync::mpsc;
10-
use warp::{sse::ServerSentEvent, Rejection};
11-
use anyhow::Context as AnyhowContext;
9+
use warp::{sse::ServerSentEvent};
1210

1311
// Our global unique user id counter.
1412
pub static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
@@ -94,16 +92,3 @@ pub fn build_fan_notifications(msg: String, users: &Subscribers) {
9492
}
9593
});
9694
}
97-
98-
pub async fn handle_message(bytes: bytes::Bytes, mut client: Client) -> Result<impl warp::Reply, Rejection> {
99-
let msg = std::str::from_utf8(&bytes);
100-
info!("sending");
101-
if let Ok(msg) = msg {
102-
info!("msg was ok");
103-
if let Err(err) = client.send(msg) {
104-
error!("failed to send to internal proxy!, {}", err)
105-
}
106-
}
107-
108-
Ok(warp::reply())
109-
}

0 commit comments

Comments
 (0)