From a14c05e925281eac36ae4893900e8023e97d8f4d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 8 Apr 2022 18:00:22 +0200 Subject: [PATCH] Support multiple sources --- daqingest/src/daqingest.rs | 4 +- netfetch/src/zmtp.rs | 143 ++++++++++++++++++++++++------------- 2 files changed, 97 insertions(+), 50 deletions(-) diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index f9f8c1d..a020557 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -25,7 +25,7 @@ pub struct Bsread { #[clap(long)] pub scylla: Vec, #[clap(long)] - pub source: String, + pub source: Vec, #[clap(long)] pub rcvbuf: Option, #[clap(long)] @@ -38,7 +38,7 @@ impl From for ZmtpClientOpts { fn from(k: Bsread) -> Self { Self { scylla: k.scylla, - addr: k.source, + sources: k.source, rcvbuf: k.rcvbuf, array_truncate: k.array_truncate, do_pulse_id: k.do_pulse_id, diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 6ebc3b8..4e1db4f 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -6,8 +6,8 @@ use async_channel::{Receiver, Sender}; #[allow(unused)] use bytes::BufMut; use err::Error; -use futures_core::Stream; -use futures_util::{pin_mut, StreamExt}; +use futures_core::{Future, Stream}; +use futures_util::{pin_mut, FutureExt, StreamExt}; use log::*; use netpod::timeunits::*; use scylla::batch::{Batch, BatchType, Consistency}; @@ -86,17 +86,42 @@ pub struct CommonQueries { pub qu_insert_array_f64: PreparedStatement, } +#[derive(Clone)] pub struct ZmtpClientOpts { pub scylla: Vec, - pub addr: String, + pub sources: Vec, pub do_pulse_id: bool, pub rcvbuf: Option, pub array_truncate: Option, } +struct ClientRun { + #[allow(unused)] + client: Pin>, + fut: Pin>>>, +} + +impl ClientRun { + fn new(client: BsreadClient) -> Self { + let mut client = Box::pin(client); + let client2 = unsafe { &mut *(&mut client as &mut _ as *mut _) } as &mut BsreadClient; + let fut = client2.run(); + let fut = Box::pin(fut) as _; + Self { client, fut } + } +} + +impl Future for ClientRun { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.fut.poll_unpin(cx) + } +} + struct BsreadClient { opts: ZmtpClientOpts, - addr: String, + source_addr: String, do_pulse_id: bool, rcvbuf: Option, tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>, @@ -106,59 +131,27 @@ struct BsreadClient { } impl BsreadClient { - pub async fn new(opts: ZmtpClientOpts) -> Result { - let scy = SessionBuilder::new().default_consistency(Consistency::Quorum); - let mut scy = scy; - for a in &opts.scylla { - scy = scy.known_node(a); - } - let scy = scy - .use_keyspace("ks1", false) - .build() - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu1 = scy - .prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu2 = scy - .prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_ts_msp = scy - .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_scalar_f64 = scy - .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_array_f64 = scy - .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let common_queries = CommonQueries { - qu1, - qu2, - qu_insert_ts_msp, - qu_insert_scalar_f64, - qu_insert_array_f64, - }; + pub async fn new( + opts: ZmtpClientOpts, + source_addr: String, + scy: Arc, + common_queries: Arc, + ) -> Result { let ret = Self { - addr: opts.addr.clone(), + source_addr, do_pulse_id: opts.do_pulse_id, rcvbuf: opts.rcvbuf, opts, tmp_vals_pulse_map: vec![], - scy: Arc::new(scy), + scy, channel_writers: Default::default(), - common_queries: Arc::new(common_queries), + common_queries, }; Ok(ret) } pub async fn run(&mut self) -> Result<(), Error> { - let mut conn = tokio::net::TcpStream::connect(&self.addr).await?; + let mut conn = tokio::net::TcpStream::connect(&self.source_addr).await?; if let Some(v) = self.rcvbuf { set_rcv_sock_opts(&mut conn, v as u32)?; } @@ -293,6 +286,14 @@ impl BsreadClient { fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> { let series = get_series_id(chn); + let has_comp = match &chn.compression { + Some(s) => s != "none", + None => false, + }; + if has_comp { + warn!("Compression not yet supported [{}]", chn.name); + return Ok(()); + } match chn.ty.as_str() { "float64" => match &chn.shape { JsVal::Array(a) => { @@ -383,8 +384,54 @@ impl BsreadClient { } pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { - let mut client = BsreadClient::new(opts).await?; - client.run().await + let scy = SessionBuilder::new().default_consistency(Consistency::Quorum); + let mut scy = scy; + for a in &opts.scylla { + scy = scy.known_node(a); + } + let scy = scy + .use_keyspace("ks1", false) + .build() + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let scy = Arc::new(scy); + let qu1 = scy + .prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu2 = scy + .prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_ts_msp = scy + .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_f64 = scy + .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_f64 = scy + .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let common_queries = CommonQueries { + qu1, + qu2, + qu_insert_ts_msp, + qu_insert_scalar_f64, + qu_insert_array_f64, + }; + let common_queries = Arc::new(common_queries); + let mut clients = vec![]; + for source_addr in &opts.sources { + let client = BsreadClient::new(opts.clone(), source_addr.into(), scy.clone(), common_queries.clone()).await?; + let fut = ClientRun::new(client); + //let client = Box::pin(client) as Pin>>>; + clients.push(fut); + } + futures_util::future::join_all(clients).await; + Ok(()) } fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {