From e965845ba611db491e66539d43883311d4ed8057 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 22 Apr 2022 22:45:14 +0200 Subject: [PATCH] Support more data types --- daqingest/Cargo.toml | 2 +- daqingest/src/bin/daqingest.rs | 5 + daqingest/src/daqingest.rs | 19 +- daqingest/src/query.rs | 41 +++- netfetch/Cargo.toml | 1 + netfetch/src/bsread.rs | 170 ++++++++++----- netfetch/src/channelwriter.rs | 384 ++++++++++++++++++++++++++------- netfetch/src/netbuf.rs | 8 +- netfetch/src/zmtp.rs | 187 ++++++++++++++-- 9 files changed, 672 insertions(+), 145 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 2dce341..eacad57 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.1.0" +version = "0.1.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 1b14d9b..5b91be9 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -14,6 +14,11 @@ pub fn main() -> Result<(), Error> { SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?, SubCmd::ListPkey => daqingest::query::list_pkey().await?, SubCmd::ListPulses => daqingest::query::list_pulses().await?, + SubCmd::FetchEvents(k) => daqingest::query::fetch_events(k).await?, + SubCmd::BsreadDump(k) => { + let mut f = netfetch::zmtp::BsreadDumper::new(k.source); + f.run().await? 
+ } } Ok(()) }) diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index a020557..2d24195 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -5,10 +5,12 @@ use netfetch::zmtp::ZmtpClientOpts; #[derive(Debug, Parser)] //#[clap(name = "daqingest", version)] -//#[clap(version)] +#[clap(version)] pub struct DaqIngestOpts { #[clap(long, parse(from_occurrences))] pub verbose: u32, + #[clap(long)] + pub tag: String, #[clap(subcommand)] pub subcmd: SubCmd, } @@ -18,6 +20,8 @@ pub enum SubCmd { Bsread(Bsread), ListPkey, ListPulses, + FetchEvents(FetchEvents), + BsreadDump(BsreadDump), } #[derive(Debug, Parser)] @@ -45,3 +49,16 @@ impl From for ZmtpClientOpts { } } } + +#[derive(Debug, Parser)] +pub struct FetchEvents { + #[clap(long, min_values(1))] + pub scylla: Vec, + #[clap(long)] + pub channel: String, +} + +#[derive(Debug, Parser)] +pub struct BsreadDump { + pub source: String, +} diff --git a/daqingest/src/query.rs b/daqingest/src/query.rs index c58a5a8..a0a4f3f 100644 --- a/daqingest/src/query.rs +++ b/daqingest/src/query.rs @@ -1,3 +1,4 @@ +use crate::FetchEvents; use log::*; use scylla::batch::Consistency; use scylla::transport::errors::{NewSessionError, QueryError}; @@ -27,7 +28,7 @@ pub async fn list_pkey() -> Result<(), Error> { let scy = SessionBuilder::new() .known_node("127.0.0.1:19042") .default_consistency(Consistency::One) - .use_keyspace("ks1", false) + .use_keyspace("ks1", true) .build() .await?; let query = scy @@ -68,7 +69,7 @@ pub async fn list_pulses() -> Result<(), Error> { let scy = SessionBuilder::new() .known_node("127.0.0.1:19042") .default_consistency(Consistency::One) - .use_keyspace("ks1", false) + .use_keyspace("ks1", true) .build() .await?; let query = scy @@ -103,3 +104,39 @@ pub async fn list_pulses() -> Result<(), Error> { } Ok(()) } + +pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> { + let scy = SessionBuilder::new() + .known_nodes(&opts.scylla) + 
.default_consistency(Consistency::One) + .use_keyspace("ks1", true) + .build() + .await?; + let qu_series = scy + .prepare( + "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?", + ) + .await?; + let qres = scy.execute(&qu_series, ("scylla", &opts.channel)).await?; + if let Some(rows) = qres.rows { + info!("Found {} matching series", rows.len()); + for r in &rows { + info!("Got row: {r:?}"); + if false { + if r.columns.len() < 2 { + warn!("see {} columns", r.columns.len()); + } else { + let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); + let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; + let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32; + let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64; + info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); + } + } + } + let _row = rows.into_iter().next().unwrap(); + } else { + warn!("No result from series lookup"); + } + Ok(()) +} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 7c7e2e5..3de3928 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -29,3 +29,4 @@ stats = { path = "../stats" } err = { path = "../../daqbuffer/err" } netpod = { path = "../../daqbuffer/netpod" } taskrun = { path = "../../daqbuffer/taskrun" } +bitshuffle = { path = "../../daqbuffer/bitshuffle" } diff --git a/netfetch/src/bsread.rs b/netfetch/src/bsread.rs index 545b981..7e964f2 100644 --- a/netfetch/src/bsread.rs +++ b/netfetch/src/bsread.rs @@ -21,13 +21,27 @@ pub struct GlobalTimestamp { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelDesc { pub name: String, - #[serde(rename = "type")] + #[serde(rename = "type", default = "bsread_type_default")] pub ty: String, + #[serde(default = "bsread_shape_default")] pub shape: JsVal, + #[serde(default = "bsread_encoding_default")] pub encoding: String, pub compression: Option, } +fn bsread_type_default() -> String { + 
"float64".into() +} + +fn bsread_shape_default() -> JsVal { + JsVal::Array(vec![]) +} + +fn bsread_encoding_default() -> String { + "little".into() +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct HeadA { pub htype: String, @@ -59,65 +73,115 @@ pub struct BsreadMessage { pub head_b_md5: String, } -pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result { - if msg.frames().len() < 2 { - return Err(Error::with_msg_no_trace("not enough frames for bsread")); +pub struct Parser { + tmp1: Vec, +} + +impl Parser { + pub fn new() -> Self { + Self { tmp1: vec![0; 1024] } } - let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?; - let head_b_md5 = { - use md5::Digest; - let mut hasher = md5::Md5::new(); - hasher.update(msg.frames()[1].data()); - let h = hasher.finalize(); - hex::encode(&h) - }; - let dhdecompr = match &head_a.dh_compression { - Some(m) => match m.as_str() { - "none" => msg.frames()[1].data(), - "lz4" => { - error!("data header lz4 compression not yet implemented"); - return Err(Error::with_msg_no_trace( - "data header lz4 compression not yet implemented", - )); - } - "bitshuffle_lz4" => { - error!("data header bitshuffle_lz4 compression not yet implemented"); - return Err(Error::with_msg_no_trace( - "data header bitshuffle_lz4 compression not yet implemented", - )); - } - _ => msg.frames()[1].data(), - }, - None => msg.frames()[1].data(), - }; - let head_b: HeadB = serde_json::from_slice(dhdecompr)?; - if false && msg.frames().len() == head_b.channels.len() + 3 { - for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) { - let sty = ScalarType::from_bsread_str(ch.ty.as_str())?; - let bo = ByteOrder::from_bsread_str(&ch.encoding)?; - let shape = Shape::from_bsread_jsval(&ch.shape)?; - match sty { - ScalarType::I64 => match &bo { - ByteOrder::LE => match &shape { - Shape::Scalar => { - assert_eq!(fr.data().len(), 8); - let _v = i64::from_le_bytes(fr.data().try_into()?); + + pub fn parse_zmtp_message(&mut self, msg: 
&ZmtpMessage) -> Result { + if msg.frames().len() < 2 { + return Err(Error::with_msg_no_trace("not enough frames for bsread")); + } + let head_a: HeadA = + serde_json::from_slice(&msg.frames()[0].data()).map_err(|e| format!("main header parse error {e:?}"))?; + let head_b_md5 = { + use md5::Digest; + let mut hasher = md5::Md5::new(); + hasher.update(msg.frames()[1].data()); + let h = hasher.finalize(); + hex::encode(&h) + }; + let dhdecompr = match &head_a.dh_compression { + Some(m) => match m.as_str() { + "none" => msg.frames()[1].data(), + "lz4" => { + let inp = msg.frames()[1].data(); + let nd = u32::from_be_bytes(inp[0..4].try_into()?) as usize; + loop { + let g = self.tmp1.len(); + if g >= nd { + break; } - Shape::Wave(_) => {} - Shape::Image(_, _) => {} + if g > 1024 * 512 { + return Err(Error::with_public_msg("decomp buffer too large")); + } + let u = self.tmp1.len() * 2; + info!("resize buffer to {u}"); + self.tmp1.resize(u, 0); + } + match bitshuffle::lz4_decompress(&inp[4..], &mut self.tmp1) { + Ok(_) => {} + Err(e) => { + // TODO throttle log output + error!("lz4 error {e:?}"); + return Err(Error::with_public_msg(format!("lz4 error {e:?}"))); + } + } + &self.tmp1[..nd] + } + "bitshuffle_lz4" => { + let inp = msg.frames()[1].data(); + let nd = u64::from_be_bytes(inp[0..8].try_into()?) as usize; + let bs = u32::from_be_bytes(inp[8..12].try_into()?) 
as usize; + loop { + let g = self.tmp1.len(); + if g >= nd { + break; + } + if g > 1024 * 512 { + return Err(Error::with_public_msg("decomp buffer too large")); + } + let u = self.tmp1.len() * 2; + info!("resize buffer to {u}"); + self.tmp1.resize(u, 0); + } + match bitshuffle::bitshuffle_decompress(&inp[12..], &mut self.tmp1, nd, 1, bs) { + Ok(_) => {} + Err(e) => { + // TODO throttle log output + error!("bitshuffle_lz4 error {e:?}"); + return Err(Error::with_public_msg(format!("bitshuffle_lz4 error {e:?}"))); + } + } + &self.tmp1[..nd] + } + _ => msg.frames()[1].data(), + }, + None => msg.frames()[1].data(), + }; + let head_b: HeadB = serde_json::from_slice(dhdecompr).map_err(|e| format!("data header parse error: {e:?}"))?; + if false && msg.frames().len() == head_b.channels.len() + 3 { + for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) { + let sty = ScalarType::from_bsread_str(ch.ty.as_str())?; + let bo = ByteOrder::from_bsread_str(&ch.encoding)?; + let shape = Shape::from_bsread_jsval(&ch.shape)?; + match sty { + ScalarType::I64 => match &bo { + ByteOrder::LE => match &shape { + Shape::Scalar => { + assert_eq!(fr.data().len(), 8); + let _v = i64::from_le_bytes(fr.data().try_into()?); + } + Shape::Wave(_) => {} + Shape::Image(_, _) => {} + }, + _ => {} }, _ => {} - }, - _ => {} + } } } + let ret = BsreadMessage { + head_a, + head_b, + head_b_md5, + }; + Ok(ret) } - let ret = BsreadMessage { - head_a, - head_b, - head_b_md5, - }; - Ok(ret) } pub struct BsreadCollector {} diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 15b9d95..62c9351 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -54,10 +54,7 @@ impl Future for ScyQueryFut { use Poll::*; match self.fut.poll_unpin(cx) { Ready(k) => match k { - Ok(_) => { - info!("ScyQueryFut done Ok"); - Ready(Ok(())) - } + Ok(_) => Ready(Ok(())), Err(e) => { warn!("ScyQueryFut done Err"); Ready(Err(e).err_conv()) @@ -181,7 +178,7 @@ impl 
Future for ScyBatchFutGen { match self.fut.poll_unpin(cx) { Ready(k) => match k { Ok(_) => { - trace!("ScyBatchFutGen done Ok"); + trace!("ScyBatchFutGen done Ok"); Ready(Ok(())) } Err(e) => { @@ -192,7 +189,7 @@ impl Future for ScyBatchFutGen { "ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", self.polled, dt_created, dt_polled ); - warn!("ScyBatchFutGen done Err {e:?}"); + warn!("ScyBatchFutGen done Err {e:?}"); Ready(Err(e).err_conv()) } }, @@ -201,6 +198,102 @@ impl Future for ScyBatchFutGen { } } +pub struct InsertLoopFut { + #[allow(unused)] + scy: Arc, + #[allow(unused)] + query: Arc, + futs: Vec>>>>, + fut_ix: usize, + polled: usize, + ts_create: Instant, + ts_poll_start: Instant, +} + +impl InsertLoopFut { + pub fn new(scy: Arc, query: PreparedStatement, values: Vec) -> Self + where + V: ValueList + 'static, + { + //values.clear(); + let query = Arc::new(query); + let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession; + let query_ref = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement; + // TODO + // Can I store the values in some better generic form? + // Or is it acceptable to generate all insert futures right here and poll them later? 
+ let futs: Vec<_> = values + .into_iter() + .map(|vs| { + // + let fut = scy_ref.execute(query_ref, vs); + Box::pin(fut) as _ + }) + .collect(); + let tsnow = Instant::now(); + Self { + scy, + query, + futs, + fut_ix: 0, + polled: 0, + ts_create: tsnow, + ts_poll_start: tsnow, + } + } +} + +impl Future for InsertLoopFut { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + if self.polled == 0 { + self.ts_poll_start = Instant::now(); + } + self.polled += 1; + if self.futs.is_empty() { + return Ready(Ok(())); + } + loop { + let fut_ix = self.fut_ix; + break match self.futs[fut_ix].poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => { + self.fut_ix += 1; + if self.fut_ix >= self.futs.len() { + if false { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + info!( + "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + } + Ready(Ok(())) // last insert finished: the whole batch is done + } else { + continue; // poll the next queued insert + } + } + Err(e) => { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + warn!( + "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + warn!("InsertLoopFut done Err {e:?}"); + Ready(Err(e).err_conv()) + } + }, + Pending => Pending, + }; + } + } +} + pub struct ChannelWriteRes { pub nrows: u32, pub dt: Duration, @@ -285,6 +378,7 @@ trait MsgAcceptor { fn len(&self) -> usize; fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>; fn should_flush(&self) -> bool; + fn flush_loop(&mut self, scy: Arc) -> Result; fn flush_batch(&mut self, scy: Arc) -> Result; } @@ -292,13 +386,12 @@ macro_rules! 
impl_msg_acceptor_scalar { ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => { struct $sname { query: PreparedStatement, - values: Vec<(i32, i64, i64, i64, $st)>, - series: i32, + values: Vec<(i64, i64, i64, i64, $st)>, + series: i64, } impl $sname { - #[allow(unused)] - pub fn new(series: i32, cq: &CommonQueries) -> Self { + pub fn new(series: i64, cq: &CommonQueries) -> Self { Self { query: cq.$qu_id.clone(), values: vec![], @@ -315,7 +408,16 @@ macro_rules! impl_msg_acceptor_scalar { fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> { type ST = $st; const STL: usize = std::mem::size_of::(); - let value = ST::$from_bytes(fr.data()[0..STL].try_into()?); + let data = fr.data(); + if data.len() < STL { + return Err(Error::with_msg_no_trace(format!( + "data frame too small for type: {} vs {}", + data.len(), + STL + ))); + } + let a = data[..STL].try_into()?; + let value = ST::$from_bytes(a); self.values.push((self.series, ts_msp, ts_lsp, pulse, value)); Ok(()) } @@ -334,6 +436,12 @@ macro_rules! impl_msg_acceptor_scalar { let ret = ScyBatchFutGen::new(scy, batch, vt); Ok(ret) } + + fn flush_loop(&mut self, scy: Arc) -> Result { + let vt = mem::replace(&mut self.values, vec![]); + let ret = InsertLoopFut::new(scy, self.query.clone(), vt); + Ok(ret) + } } }; } @@ -342,19 +450,20 @@ macro_rules! impl_msg_acceptor_array { ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => { struct $sname { query: PreparedStatement, - values: Vec<(i32, i64, i64, i64, Vec<$st>)>, - series: i32, + values: Vec<(i64, i64, i64, i64, Vec<$st>)>, + series: i64, array_truncate: usize, + truncated: usize, } impl $sname { - #[allow(unused)] - pub fn new(series: i32, array_truncate: usize, cq: &CommonQueries) -> Self { + pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self { Self { query: cq.$qu_id.clone(), values: vec![], series, array_truncate, + truncated: 0, } } } @@ -374,7 +483,18 @@ macro_rules! 
impl_msg_acceptor_array { let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?); values.push(value); } - values.truncate(self.array_truncate); + if values.len() > self.array_truncate { + if self.truncated < 10 { + warn!( + "truncate {} to {} for series {}", + values.len(), + self.array_truncate, + self.series + ); + } + values.truncate(self.array_truncate); + self.truncated = self.truncated.saturating_add(1); + } self.values.push((self.series, ts_msp, ts_lsp, pulse, values)); Ok(()) } @@ -393,6 +513,12 @@ macro_rules! impl_msg_acceptor_array { let ret = ScyBatchFutGen::new(scy, batch, vt); Ok(ret) } + + fn flush_loop(&mut self, scy: Arc) -> Result { + let vt = mem::replace(&mut self.values, vec![]); + let ret = InsertLoopFut::new(scy, self.query.clone(), vt); + Ok(ret) + } } }; } @@ -403,6 +529,8 @@ impl_msg_acceptor_scalar!(MsgAcceptorScalarU32LE, i32, qu_insert_scalar_i32, fro impl_msg_acceptor_scalar!(MsgAcceptorScalarU32BE, i32, qu_insert_scalar_i32, from_be_bytes); impl_msg_acceptor_scalar!(MsgAcceptorScalarI16LE, i16, qu_insert_scalar_i16, from_le_bytes); impl_msg_acceptor_scalar!(MsgAcceptorScalarI16BE, i16, qu_insert_scalar_i16, from_be_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarI32LE, i32, qu_insert_scalar_i32, from_le_bytes); +impl_msg_acceptor_scalar!(MsgAcceptorScalarI32BE, i32, qu_insert_scalar_i32, from_be_bytes); impl_msg_acceptor_scalar!(MsgAcceptorScalarF32LE, f32, qu_insert_scalar_f32, from_le_bytes); impl_msg_acceptor_scalar!(MsgAcceptorScalarF32BE, f32, qu_insert_scalar_f32, from_be_bytes); impl_msg_acceptor_scalar!(MsgAcceptorScalarF64LE, f64, qu_insert_scalar_f64, from_le_bytes); @@ -412,24 +540,104 @@ impl_msg_acceptor_array!(MsgAcceptorArrayU16LE, i16, qu_insert_array_u16, from_l impl_msg_acceptor_array!(MsgAcceptorArrayU16BE, i16, qu_insert_array_u16, from_be_bytes); impl_msg_acceptor_array!(MsgAcceptorArrayI16LE, i16, qu_insert_array_i16, from_le_bytes); impl_msg_acceptor_array!(MsgAcceptorArrayI16BE, i16, 
qu_insert_array_i16, from_be_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayI32LE, i32, qu_insert_array_i32, from_le_bytes); +impl_msg_acceptor_array!(MsgAcceptorArrayI32BE, i32, qu_insert_array_i32, from_be_bytes); impl_msg_acceptor_array!(MsgAcceptorArrayF32LE, f32, qu_insert_array_f32, from_le_bytes); impl_msg_acceptor_array!(MsgAcceptorArrayF32BE, f32, qu_insert_array_f32, from_be_bytes); impl_msg_acceptor_array!(MsgAcceptorArrayF64LE, f64, qu_insert_array_f64, from_le_bytes); impl_msg_acceptor_array!(MsgAcceptorArrayF64BE, f64, qu_insert_array_f64, from_be_bytes); +struct MsgAcceptorArrayBool { + query: PreparedStatement, + values: Vec<(i64, i64, i64, i64, Vec)>, + series: i64, + array_truncate: usize, + truncated: usize, +} + +impl MsgAcceptorArrayBool { + pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self { + Self { + query: cq.qu_insert_array_bool.clone(), + values: vec![], + series, + array_truncate, + truncated: 0, + } + } +} + +impl MsgAcceptor for MsgAcceptorArrayBool { + fn len(&self) -> usize { + self.values.len() + } + + fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> { + type ST = bool; + const STL: usize = std::mem::size_of::(); + let vc = fr.data().len() / STL; + let mut values = Vec::with_capacity(vc); + for i in 0..vc { + let h = i * STL; + let value = u8::from_le_bytes(fr.data()[h..h + STL].try_into()?); + values.push(value); + } + if values.len() > self.array_truncate { + if self.truncated < 10 { + warn!( + "truncate {} to {} for series {}", + values.len(), + self.array_truncate, + self.series + ); + } + values.truncate(self.array_truncate); + self.truncated = self.truncated.saturating_add(1); + } + let values = values.into_iter().map(|x| x != 0).collect(); + self.values.push((self.series, ts_msp, ts_lsp, pulse, values)); + Ok(()) + } + + fn should_flush(&self) -> bool { + self.len() >= 40 + ((self.series as usize) & 0x7) + } + + fn flush_batch(&mut self, scy: Arc) -> 
Result { + let vt = mem::replace(&mut self.values, vec![]); + let nn = vt.len(); + let mut batch = Batch::new(BatchType::Unlogged); + for _ in 0..nn { + batch.append_statement(self.query.clone()); + } + let ret = ScyBatchFutGen::new(scy, batch, vt); + Ok(ret) + } + + fn flush_loop(&mut self, scy: Arc) -> Result { + let vt = mem::replace(&mut self.values, vec![]); + let ret = InsertLoopFut::new(scy, self.query.clone(), vt); + Ok(ret) + } +} + pub struct ChannelWriterAll { - series: u32, + series: u64, scy: Arc, common_queries: Arc, - ts_msp_lsp: fn(u64, u32) -> (u64, u64), + ts_msp_lsp: fn(u64, u64) -> (u64, u64), ts_msp_last: u64, acceptor: Box, - dtype_mark: u32, + #[allow(unused)] + scalar_type: ScalarType, + #[allow(unused)] + shape: Shape, + pulse_last: u64, } impl ChannelWriterAll { pub fn new( - series: u32, + series: u64, common_queries: Arc, scy: Arc, scalar_type: ScalarType, @@ -437,60 +645,66 @@ impl ChannelWriterAll { byte_order: ByteOrder, array_truncate: usize, ) -> Result { - let dtype_mark = scalar_type.index() as u32; - let dtype_mark = match &shape { - Shape::Scalar => dtype_mark, - Shape::Wave(_) => 1000 + dtype_mark, - Shape::Image(_, _) => 2000 + dtype_mark, - }; - let (ts_msp_lsp, acc): (fn(u64, u32) -> (u64, u64), Box) = match &shape { + let (ts_msp_lsp, acc): (fn(u64, u64) -> (u64, u64), Box) = match &shape { Shape::Scalar => match &scalar_type { ScalarType::U16 => match &byte_order { - ByteOrder::BE => { - let acc = MsgAcceptorScalarU16BE::new(series as i32, &common_queries); + ByteOrder::LE => { + let acc = MsgAcceptorScalarU16LE::new(series as i64, &common_queries); (ts_msp_lsp_1, Box::new(acc) as _) } - ByteOrder::LE => { - return Err(Error::with_msg_no_trace(format!( - "TODO {:?} {:?} {:?}", - scalar_type, shape, byte_order - ))); + ByteOrder::BE => { + let acc = MsgAcceptorScalarU16BE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::U32 => match &byte_order { - ByteOrder::BE => { - let acc = 
MsgAcceptorScalarU32BE::new(series as i32, &common_queries); + ByteOrder::LE => { + let acc = MsgAcceptorScalarU32LE::new(series as i64, &common_queries); (ts_msp_lsp_1, Box::new(acc) as _) } + ByteOrder::BE => { + let acc = MsgAcceptorScalarU32BE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + }, + ScalarType::I16 => match &byte_order { ByteOrder::LE => { - return Err(Error::with_msg_no_trace(format!( - "TODO {:?} {:?} {:?}", - scalar_type, shape, byte_order - ))); + let acc = MsgAcceptorScalarI16LE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + ByteOrder::BE => { + let acc = MsgAcceptorScalarI16BE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + }, + ScalarType::I32 => match &byte_order { + ByteOrder::LE => { + let acc = MsgAcceptorScalarI32LE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) + } + ByteOrder::BE => { + let acc = MsgAcceptorScalarI32BE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::F32 => match &byte_order { - ByteOrder::BE => { - let acc = MsgAcceptorScalarF32BE::new(series as i32, &common_queries); + ByteOrder::LE => { + let acc = MsgAcceptorScalarF32LE::new(series as i64, &common_queries); (ts_msp_lsp_1, Box::new(acc) as _) } - ByteOrder::LE => { - return Err(Error::with_msg_no_trace(format!( - "TODO {:?} {:?} {:?}", - scalar_type, shape, byte_order - ))); + ByteOrder::BE => { + let acc = MsgAcceptorScalarF32BE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) } }, ScalarType::F64 => match &byte_order { - ByteOrder::BE => { - let acc = MsgAcceptorScalarF64BE::new(series as i32, &common_queries); + ByteOrder::LE => { + let acc = MsgAcceptorScalarF64LE::new(series as i64, &common_queries); (ts_msp_lsp_1, Box::new(acc) as _) } - ByteOrder::LE => { - return Err(Error::with_msg_no_trace(format!( - "TODO {:?} {:?} {:?}", - scalar_type, shape, byte_order - ))); 
+ ByteOrder::BE => { + let acc = MsgAcceptorScalarF64BE::new(series as i64, &common_queries); + (ts_msp_lsp_1, Box::new(acc) as _) } }, _ => { @@ -503,43 +717,59 @@ impl ChannelWriterAll { Shape::Wave(nele) => { info!("set up wave acceptor nele {nele}"); match &scalar_type { + ScalarType::BOOL => match &byte_order { + _ => { + let acc = MsgAcceptorArrayBool::new(series as i64, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + }, ScalarType::U16 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayU16LE::new(series as i32, array_truncate, &common_queries); + let acc = MsgAcceptorArrayU16LE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayU16BE::new(series as i32, array_truncate, &common_queries); + let acc = MsgAcceptorArrayU16BE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::I16 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayI16LE::new(series as i32, array_truncate, &common_queries); + let acc = MsgAcceptorArrayI16LE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayI16BE::new(series as i32, array_truncate, &common_queries); + let acc = MsgAcceptorArrayI16BE::new(series as i64, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + }, + ScalarType::I32 => match &byte_order { + ByteOrder::LE => { + let acc = MsgAcceptorArrayI32LE::new(series as i64, array_truncate, &common_queries); + (ts_msp_lsp_2, Box::new(acc) as _) + } + ByteOrder::BE => { + let acc = MsgAcceptorArrayI32BE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::F32 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayF32LE::new(series as i32, array_truncate, &common_queries); + let acc = 
MsgAcceptorArrayF32LE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayF32BE::new(series as i32, array_truncate, &common_queries); + let acc = MsgAcceptorArrayF32BE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } }, ScalarType::F64 => match &byte_order { ByteOrder::LE => { - let acc = MsgAcceptorArrayF64LE::new(series as i32, array_truncate, &common_queries); + let acc = MsgAcceptorArrayF64LE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } ByteOrder::BE => { - let acc = MsgAcceptorArrayF64BE::new(series as i32, array_truncate, &common_queries); + let acc = MsgAcceptorArrayF64BE::new(series as i64, array_truncate, &common_queries); (ts_msp_lsp_2, Box::new(acc) as _) } }, @@ -565,25 +795,31 @@ impl ChannelWriterAll { ts_msp_lsp, ts_msp_last: 0, acceptor: acc, - dtype_mark, + scalar_type, + shape, + pulse_last: 0, }; Ok(ret) } - pub fn dtype_mark(&self) -> u32 { - self.dtype_mark - } - pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { + // TODO limit log rate + // TODO for many channels, it's normal to have gaps. + if false && pulse != 0 && pulse != self.pulse_last + 1 { + let gap = pulse as i64 - self.pulse_last as i64; + warn!("GAP series {} pulse {} gap {}", self.series, pulse, gap); + } + self.pulse_last = pulse; let (ts_msp, ts_lsp) = (self.ts_msp_lsp)(ts, self.series); let fut1 = if ts_msp != self.ts_msp_last { - info!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}"); + debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}"); self.ts_msp_last = ts_msp; // TODO make the passing of the query parameters type safe. + // TODO the "dtype" table field is not needed here. Drop from database. 
let fut = ScyQueryFut::new( self.scy.clone(), self.common_queries.qu_insert_ts_msp.clone(), - (self.series as i32, ts_msp as i64, self.dtype_mark as i32), + (self.series as i64, ts_msp as i64), ); Some(Box::pin(fut) as _) } else { @@ -592,7 +828,7 @@ impl ChannelWriterAll { self.acceptor.accept(ts_msp as i64, ts_lsp as i64, pulse as i64, fr)?; if self.acceptor.should_flush() { let nn = self.acceptor.len(); - let fut = self.acceptor.flush_batch(self.scy.clone())?; + let fut = self.acceptor.flush_loop(self.scy.clone())?; let fut2 = Some(Box::pin(fut) as _); let ret = ChannelWriteFut { ts1: None, @@ -621,19 +857,19 @@ impl ChannelWriter for ChannelWriterAll { } } -fn ts_msp_lsp_1(ts: u64, series: u32) -> (u64, u64) { +fn ts_msp_lsp_1(ts: u64, series: u64) -> (u64, u64) { ts_msp_lsp_gen(ts, series, 100 * SEC) } -fn ts_msp_lsp_2(ts: u64, series: u32) -> (u64, u64) { +fn ts_msp_lsp_2(ts: u64, series: u64) -> (u64, u64) { ts_msp_lsp_gen(ts, series, 10 * SEC) } -fn ts_msp_lsp_gen(ts: u64, series: u32, fak: u64) -> (u64, u64) { +fn ts_msp_lsp_gen(ts: u64, series: u64, fak: u64) -> (u64, u64) { if ts < u32::MAX as u64 { return (0, 0); } - let off = series as u64; + let off = series & 0xffffffff; let ts_a = ts - off; let ts_b = ts_a / fak; let ts_lsp = ts_a % fak; diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs index 370c6dc..b7bd80c 100644 --- a/netfetch/src/netbuf.rs +++ b/netfetch/src/netbuf.rs @@ -156,7 +156,13 @@ impl NetBuf { } } - fn check_invariant(&self) { + #[allow(unused)] + #[inline(always)] + fn check_invariant(&self) {} + + #[allow(unused)] + #[inline(always)] + fn check_invariant2(&self) { if self.wp > self.buf.len() { eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); std::process::exit(87); diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index b3d4701..4920699 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,4 +1,4 @@ -use crate::bsread::{parse_zmtp_message, BsreadMessage}; +use crate::bsread::{BsreadMessage, 
Parser}; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; use crate::channelwriter::{ChannelWriter, ChannelWriterAll}; use crate::netbuf::NetBuf; @@ -72,12 +72,16 @@ fn test_service() -> Result<(), Error> { taskrun::run(fut) } -pub fn get_series_id(chn: &ChannelDesc) -> u32 { +pub fn get_series_id(chn: &ChannelDesc) -> u64 { + // TODO use a more stable format (with ScalarType, Shape) as hash input. + // TODO do not depend at all on the mapping, instead look it up on demand and cache. use md5::Digest; let mut h = md5::Md5::new(); h.update(chn.name.as_bytes()); + h.update(chn.ty.as_bytes()); + h.update(format!("{:?}", chn.shape).as_bytes()); let f = h.finalize(); - u32::from_le_bytes(f.as_slice()[0..4].try_into().unwrap()) + u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()) } pub struct CommonQueries { @@ -92,8 +96,10 @@ pub struct CommonQueries { pub qu_insert_scalar_f64: PreparedStatement, pub qu_insert_array_u16: PreparedStatement, pub qu_insert_array_i16: PreparedStatement, + pub qu_insert_array_i32: PreparedStatement, pub qu_insert_array_f32: PreparedStatement, pub qu_insert_array_f64: PreparedStatement, + pub qu_insert_array_bool: PreparedStatement, } #[derive(Clone)] @@ -136,9 +142,10 @@ struct BsreadClient { rcvbuf: Option, tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>, scy: Arc, - channel_writers: BTreeMap>, + channel_writers: BTreeMap>, common_queries: Arc, print_stats: CheckEvery, + parser: Parser, } impl BsreadClient { @@ -158,6 +165,7 @@ impl BsreadClient { channel_writers: Default::default(), common_queries, print_stats: CheckEvery::new(Duration::from_millis(2000)), + parser: Parser::new(), }; Ok(ret) } @@ -177,13 +185,14 @@ impl BsreadClient { let mut bytes_payload = 0u64; let mut rows_inserted = 0u32; let mut time_spent_inserting = Duration::from_millis(0); + let mut series_ids = Vec::new(); while let Some(item) = zmtp.next().await { match item { Ok(ev) => match ev { ZmtpEvent::ZmtpCommand(_) => (), ZmtpEvent::ZmtpMessage(msg) 
=> { msgc += 1; - match parse_zmtp_message(&msg) { + match self.parser.parse_zmtp_message(&msg) { Ok(bm) => { if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 { frame_diff_count += 1; @@ -207,9 +216,10 @@ impl BsreadClient { } { if bm.head_b_md5 != dh_md5_last { + series_ids.clear(); head_b = bm.head_b.clone(); if dh_md5_last.is_empty() { - info!("data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash); + info!("data header hash {}", bm.head_b_md5); dh_md5_last = bm.head_b_md5.clone(); for chn in &head_b.channels { info!("Setup writer for {}", chn.name); @@ -221,7 +231,7 @@ impl BsreadClient { } } } else { - error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash); + error!("TODO changed data header hash {}", bm.head_b_md5); dh_md5_last = bm.head_b_md5.clone(); // TODO // Update only the changed channel writers. @@ -253,11 +263,23 @@ impl BsreadClient { let gts = bm.head_a.global_timestamp; let ts = (gts.sec as u64) * SEC + gts.ns as u64; let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0); + // TODO limit warn rate + if pulse != 0 && (pulse < 14781000000 || pulse > 16000000000) { + // TODO limit log rate + warn!("Bad pulse {} for {}", pulse, self.source_addr); + } for i1 in 0..head_b.channels.len() { let chn = &head_b.channels[i1]; let fr = &msg.frames[2 + 2 * i1]; - let series = get_series_id(chn); - if !self.channel_writers.contains_key(&series) {} + if i1 >= series_ids.len() { + series_ids.resize(head_b.channels.len(), (0u8, 0u64)); + } + if series_ids[i1].0 == 0 { + let series = get_series_id(chn); + series_ids[i1].0 = 1; + series_ids[i1].1 = series; + } + let series = series_ids[i1].1; if let Some(cw) = self.channel_writers.get_mut(&series) { let res = cw.write_msg(ts, pulse, fr)?.await?; rows_inserted += res.nrows; @@ -329,13 +351,13 @@ impl BsreadClient { byte_order.clone(), trunc, )?; - let dtype_mark = cw.dtype_mark(); + let shape_dims = shape.to_scylla_vec(); self.channel_writers.insert(series, Box::new(cw)); // TODO insert 
correct facility name self.scy .query( - "insert into series_by_channel (facility, channel_name, dtype, series) values (?, ?, ?, ?)", - ("scylla", &chn.name, dtype_mark as i32, series as i32), + "insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?)", + ("scylla", &chn.name, series as i64, scalar_type.to_scylla_i32(), &shape_dims), ) .await .err_conv()?; @@ -407,7 +429,7 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_ts_msp = scy - .prepare("insert into ts_msp (series, ts_msp, dtype) values (?, ?, ?)") + .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_u16 = scy @@ -442,6 +464,10 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { .prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_i32 = scy + .prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_array_f32 = scy .prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await @@ -450,6 +476,10 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_bool = scy + .prepare("insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let common_queries = CommonQueries { qu1, qu2, @@ -462,8 +492,10 @@ pub async fn 
zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { qu_insert_scalar_f64, qu_insert_array_u16, qu_insert_array_i16, + qu_insert_array_i32, qu_insert_array_f32, qu_insert_array_f64, + qu_insert_array_bool, }; let common_queries = Arc::new(common_queries); let mut clients = vec![]; @@ -520,6 +552,135 @@ fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { Ok(()) } +pub struct BsreadDumper { + source_addr: String, + parser: Parser, +} + +impl BsreadDumper { + pub fn new(source_addr: String) -> Self { + Self { + source_addr, + parser: Parser::new(), + } + } + + pub async fn run(&mut self) -> Result<(), Error> { + let src = if self.source_addr.starts_with("tcp://") { + self.source_addr[6..].into() + } else { + self.source_addr.clone() + }; + let conn = tokio::net::TcpStream::connect(&src).await?; + let mut zmtp = Zmtp::new(conn, SocketType::PULL); + let mut i1 = 0u64; + let mut msgc = 0u64; + let mut dh_md5_last = String::new(); + let mut frame_diff_count = 0u64; + let mut hash_mismatch_count = 0u64; + let mut head_b = HeadB::empty(); + while let Some(item) = zmtp.next().await { + match item { + Ok(ev) => match ev { + ZmtpEvent::ZmtpCommand(_) => (), + ZmtpEvent::ZmtpMessage(msg) => { + msgc += 1; + match self.parser.parse_zmtp_message(&msg) { + Ok(bm) => { + if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 { + frame_diff_count += 1; + if frame_diff_count < 1000 { + warn!( + "chn len {} frame diff {}", + bm.head_b.channels.len(), + msg.frames().len() - 2 * bm.head_b.channels.len() + ); + } + } + if bm.head_b_md5 != bm.head_a.hash { + hash_mismatch_count += 1; + // TODO keep logging data header changes, just suppress too frequent messages. + if hash_mismatch_count < 200 { + error!( + "Invalid bsread message: hash mismatch. 
dhcompr {:?}", + bm.head_a.dh_compression + ); + } + } + if bm.head_b_md5 != dh_md5_last { + head_b = bm.head_b.clone(); + if dh_md5_last.is_empty() { + info!("data header hash {}", bm.head_b_md5); + } else { + error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash); + } + dh_md5_last = bm.head_b_md5.clone(); + } + if msg.frames.len() < 2 + 2 * head_b.channels.len() { + // TODO count always, throttle log. + error!("not enough frames for data header"); + } + let gts = bm.head_a.global_timestamp; + let ts = (gts.sec as u64) * SEC + gts.ns as u64; + let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0); + let mut bytes_payload = 0u64; + for i1 in 0..head_b.channels.len() { + let chn = &head_b.channels[i1]; + let fr = &msg.frames[2 + 2 * i1]; + let _series = get_series_id(chn); + if chn.ty == "string" { + info!("string channel: {} {:?}", chn.name, chn.shape); + if let Ok(shape) = Shape::from_bsread_jsval(&chn.shape) { + if let Ok(_bo) = ByteOrder::from_bsread_str(&chn.encoding) { + match &shape { + Shape::Scalar => { + info!("scalar string..."); + let s = String::from_utf8_lossy(fr.data()); + info!("STRING: {s:?}"); + } + _ => { + warn!( + "non-scalar string channels not yet implemented {}", + chn.name + ); + } + } + } + } + } + bytes_payload += fr.data().len() as u64; + } + info!("zmtp message ts {ts} pulse {pulse} bytes_payload {bytes_payload}"); + } + Err(e) => { + for frame in &msg.frames { + info!("Frame: {:?}", frame); + } + zmtp.dump_input_state(); + zmtp.dump_conn_state(); + error!("bsread parse error: {e:?}"); + break; + } + } + } + }, + Err(e) => { + error!("zmtp item error: {e:?}"); + return Err(e); + } + } + i1 += 1; + if true && i1 > 20 { + break; + } + if true && msgc > 20 { + break; + } + } + Ok(()) + } +} + #[derive(Clone, Debug)] enum ConnState { InitSend,