diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index f018178..0a33766 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -10,7 +10,7 @@ pub fn main() -> Result<(), Error> { } let opts = DaqIngestOpts::parse(); match opts.subcmd { - SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await?, + SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.scylla, &k.source, k.rcvbuf, k.do_pulse_id).await?, SubCmd::ListPkey => daqingest::query::list_pkey().await?, SubCmd::ListPulses => daqingest::query::list_pulses().await?, } diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 79fe212..b0f85d9 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -21,8 +21,12 @@ pub enum SubCmd { #[derive(Debug, Parser)] pub struct Bsread { + #[clap(long)] + pub scylla: String, #[clap(long)] pub source: String, #[clap(long)] pub rcvbuf: Option, + #[clap(long)] + pub do_pulse_id: bool, } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index be66be8..b71151e 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -22,6 +22,7 @@ futures-util = "0.3" #pin-project-lite = "0.2" scylla = "0.4" md-5 = "0.9" +hex = "0.4" libc = "0.2" log = { path = "../log" } err = { path = "../../daqbuffer/err" } diff --git a/netfetch/src/bsread.rs b/netfetch/src/bsread.rs index 2965d0a..545b981 100644 --- a/netfetch/src/bsread.rs +++ b/netfetch/src/bsread.rs @@ -5,7 +5,6 @@ use log::*; use netpod::{ByteOrder, ScalarType, Shape}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsVal; -use std::fmt; // TODO pub struct ParseError { @@ -13,50 +12,86 @@ pub struct ParseError { pub msg: ZmtpMessage, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct GlobalTimestamp { pub sec: u64, pub ns: u64, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelDesc { pub name: String, #[serde(rename = "type")] pub ty: String, pub shape: JsVal, pub encoding: String, + pub compression: Option, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct HeadA { pub htype: String, pub hash: String, pub pulse_id: serde_json::Number, pub global_timestamp: GlobalTimestamp, + pub dh_compression: Option, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct HeadB { pub htype: String, pub channels: Vec, } -#[derive(Debug)] +impl HeadB { + pub fn empty() -> Self { + Self { + htype: String::new(), + channels: vec![], + } + } +} + +#[derive(Clone, Debug)] pub struct BsreadMessage { pub head_a: HeadA, pub head_b: HeadB, - pub values: Vec>, + pub head_b_md5: String, } pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result { - if msg.frames().len() < 3 { + if msg.frames().len() < 2 { return Err(Error::with_msg_no_trace("not enough frames for bsread")); } let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?; - let head_b: HeadB = serde_json::from_slice(&msg.frames()[1].data())?; - let mut values = vec![]; - if msg.frames().len() == head_b.channels.len() + 3 { + let head_b_md5 = { + use md5::Digest; + let mut hasher = md5::Md5::new(); + hasher.update(msg.frames()[1].data()); + let h = hasher.finalize(); + hex::encode(&h) + }; + let dhdecompr = match &head_a.dh_compression { + Some(m) => match m.as_str() { + "none" => msg.frames()[1].data(), + "lz4" => { + error!("data header lz4 compression not yet implemented"); + return Err(Error::with_msg_no_trace( + "data header lz4 compression not yet implemented", + )); + } + "bitshuffle_lz4" => { + error!("data header bitshuffle_lz4 compression not yet implemented"); + return Err(Error::with_msg_no_trace( + "data header bitshuffle_lz4 compression not yet implemented", + )); + } + _ => msg.frames()[1].data(), + }, + None => msg.frames()[1].data(), + }; + let head_b: HeadB = serde_json::from_slice(dhdecompr)?; + if false && msg.frames().len() == head_b.channels.len() + 3 { for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) { let sty = ScalarType::from_bsread_str(ch.ty.as_str())?; let bo = ByteOrder::from_bsread_str(&ch.encoding)?; @@ -66,8 +101,7 @@ pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result { ByteOrder::LE => match &shape { Shape::Scalar => { assert_eq!(fr.data().len(), 8); - let v = i64::from_le_bytes(fr.data().try_into()?); - values.push(Box::new(v) as _); + let _v = i64::from_le_bytes(fr.data().try_into()?); } Shape::Wave(_) => {} Shape::Image(_, _) => {} @@ -78,14 +112,11 @@ pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result { } } } - { - let fr = &msg.frames()[msg.frames().len() - 1]; - if fr.data().len() == 8 { - let pulse = u64::from_le_bytes(fr.data().try_into()?); - info!("pulse {}", pulse); - } - } - let ret = BsreadMessage { head_a, head_b, values }; + let ret = BsreadMessage { + head_a, + head_b, + head_b_md5, + }; Ok(ret) } diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs new file mode 100644 index 0000000..425cd46 --- /dev/null +++ b/netfetch/src/channelwriter.rs @@ -0,0 +1,296 @@ +use crate::zmtp::ErrConv; +use crate::zmtp::{CommonQueries, ZmtpFrame}; +use err::Error; +use futures_core::Future; +use futures_util::FutureExt; +use log::*; +use scylla::batch::{Batch, BatchType}; +use scylla::frame::value::{BatchValues, ValueList}; +use scylla::prepared_statement::PreparedStatement; +use scylla::transport::errors::QueryError; +use scylla::{BatchResult, QueryResult, Session as ScySession}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Instant; + +pub struct ScyQueryFut { + #[allow(unused)] + scy: Arc, + #[allow(unused)] + query: Box, + #[allow(unused)] + values: Box, + fut: Pin>>>, +} + +impl ScyQueryFut { + pub fn new(scy: Arc, query: PreparedStatement, values: V) -> Self + where + V: ValueList + 'static, + { + let query = Box::new(query); + let values = Box::new(values); + let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession; + let query2 = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement; + let v2 = unsafe { &*(&values as &_ as *const _) } as &V; + let fut = scy2.execute(query2, v2); + Self { + scy, + query, + values, + fut: Box::pin(fut), + } + } +} + +impl Future for ScyQueryFut { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + match self.fut.poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => { + info!("ScyQueryFut done Ok"); + Ready(Ok(())) + } + Err(e) => { + info!("ScyQueryFut done Err"); + Ready(Err(e).err_conv()) + } + }, + Pending => Pending, + } + } +} + +pub struct ScyBatchFut { + #[allow(unused)] + scy: Arc, + #[allow(unused)] + batch: Box, + #[allow(unused)] + values: Box, + fut: Pin>>>, +} + +impl ScyBatchFut { + pub fn new(scy: Arc, batch: Batch, values: V) -> Self + where + V: BatchValues + 'static, + { + let batch = Box::new(batch); + let values = Box::new(values); + let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession; + let batch2 = unsafe { &*(&batch as &_ as *const _) } as &Batch; + let v2 = unsafe { &*(&values as &_ as *const _) } as &V; + let fut = scy2.batch(batch2, v2); + Self { + scy, + batch, + values, + fut: Box::pin(fut), + } + } +} + +impl Future for ScyBatchFut { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + match self.fut.poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => { + info!("ScyBatchFut done Ok"); + Ready(Ok(())) + } + Err(e) => { + info!("ScyBatchFut done Err"); + Ready(Err(e).err_conv()) + } + }, + Pending => Pending, + } + } +} + +pub struct ChannelWriteFut { + nn: usize, + fut1: Option>>>>, + fut2: Option>>>>, + ts1: Option, + mask: u8, +} + +impl Future for ChannelWriteFut { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + loop { + break if self.ts1.is_none() { + self.ts1 = Some(Instant::now()); + continue; + } else if let Some(f) = self.fut1.as_mut() { + match f.poll_unpin(cx) { + Ready(k) => { + info!("ChannelWriteFut fut1 Ready"); + self.fut1 = None; + self.mask |= 1; + match k { + Ok(_) => continue, + Err(e) => Ready(Err(e)), + } + } + Pending => Pending, + } + } else if let Some(f) = self.fut2.as_mut() { + match f.poll_unpin(cx) { + Ready(k) => { + info!("ChannelWriteFut fut2 Ready"); + self.fut2 = None; + self.mask |= 2; + match k { + Ok(_) => continue, + Err(e) => Ready(Err(e)), + } + } + Pending => Pending, + } + } else { + if self.mask != 0 { + let ts2 = Instant::now(); + let dt = ts2.duration_since(self.ts1.unwrap()).as_secs_f32() * 1e3; + info!("insert f64 nn {} dt {:6.2} ms", self.nn, dt); + } + Ready(Ok(())) + }; + } + } +} + +pub trait ChannelWriter { + fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result; +} + +pub struct WriteFutF64 { + nn: usize, + fut1: Pin>>>, + fut2: Pin>>>, +} + +impl Future for WriteFutF64 { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { + todo!() + } +} + +pub async fn run_write_fut(fut: ChannelWriteFut) -> Result<(), Error> { + err::todo(); + let ts1 = Instant::now(); + if let Some(f) = fut.fut1 { + f.await?; + } + if let Some(f) = fut.fut2 { + f.await?; + } + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; + info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt); + Ok(()) +} + +pub async fn run_write_fut_f64(fut: WriteFutF64) -> Result<(), Error> { + err::todo(); + let ts1 = Instant::now(); + fut.fut1.await?; + fut.fut2.await?; + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; + info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt); + Ok(()) +} + +pub struct ChannelWriterF64 { + series: u32, + scy: Arc, + common_queries: Arc, + ts_msp_last: u64, + tmp_vals: Vec<(i32, i64, i64, i64, f64)>, +} + +impl ChannelWriterF64 { + pub fn new(series: u32, common_queries: Arc, scy: Arc) -> Self { + Self { + series, + scy, + ts_msp_last: 0, + common_queries, + tmp_vals: vec![], + } + } + + pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { + let (ts_msp, ts_lsp) = ts_msp_lsp(ts); + let fut1 = if ts_msp != self.ts_msp_last { + info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse); + self.ts_msp_last = ts_msp; + let fut = ScyQueryFut::new( + self.scy.clone(), + self.common_queries.qu_insert_ts_msp.clone(), + (self.series as i32, ts_msp as i64), + ); + Some(Box::pin(fut) as _) + } else { + None + }; + let value = f64::from_be_bytes(fr.data().try_into()?); + self.tmp_vals + .push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, value)); + if self.tmp_vals.len() >= 180 + ((self.series as usize) & 0x3f) { + info!("write_msg_impl BATCH INSERT ts {} pulse {}", ts, pulse); + let vt = std::mem::replace(&mut self.tmp_vals, vec![]); + let nn = vt.len(); + let mut batch = Batch::new(BatchType::Unlogged); + for _ in 0..nn { + batch.append_statement(self.common_queries.qu_insert_scalar_f64.clone()); + } + let fut = ScyBatchFut::new(self.scy.clone(), batch, vt); + let fut2 = Some(Box::pin(fut) as _); + let ret = ChannelWriteFut { + ts1: None, + mask: 0, + nn, + fut1, + fut2, + }; + Ok(ret) + } else { + let ret = ChannelWriteFut { + ts1: None, + mask: 0, + nn: 0, + fut1: fut1, + fut2: None, + }; + Ok(ret) + } + } +} + +impl ChannelWriter for ChannelWriterF64 { + fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { + self.write_msg_impl(ts, pulse, fr) + } +} + +fn ts_msp_lsp(ts: u64) -> (u64, u64) { + const MASK: u64 = u64::MAX >> 23; + let ts_msp = ts & (!MASK); + let ts_lsp = ts & MASK; + (ts_msp, ts_lsp) +} diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs index 315b8be..370c6dc 100644 --- a/netfetch/src/netbuf.rs +++ b/netfetch/src/netbuf.rs @@ -16,23 +16,32 @@ impl NetBuf { } } + pub fn state(&self) -> (usize, usize) { + (self.rp, self.wp) + } + pub fn len(&self) -> usize { + self.check_invariant(); self.wp - self.rp } pub fn cap(&self) -> usize { + self.check_invariant(); self.buf.len() } pub fn wcap(&self) -> usize { + self.check_invariant(); self.buf.len() - self.wp } pub fn data(&self) -> &[u8] { + self.check_invariant(); &self.buf[self.rp..self.wp] } pub fn adv(&mut self, x: usize) -> Result<(), Error> { + self.check_invariant(); if self.len() < x { return Err(Error::with_msg_no_trace("not enough bytes")); } else { @@ -42,6 +51,7 @@ impl NetBuf { } pub fn wadv(&mut self, x: usize) -> Result<(), Error> { + self.check_invariant(); if self.wcap() < x { return Err(Error::with_msg_no_trace("not enough space")); } else { @@ -51,6 +61,7 @@ impl NetBuf { } pub fn read_u8(&mut self) -> Result { + self.check_invariant(); type T = u8; const TS: usize = std::mem::size_of::(); if self.len() < TS { @@ -63,6 +74,7 @@ impl NetBuf { } pub fn read_u64(&mut self) -> Result { + self.check_invariant(); type T = u64; const TS: usize = std::mem::size_of::(); if self.len() < TS { @@ -75,6 +87,7 @@ impl NetBuf { } pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> { + self.check_invariant(); if self.len() < n { return Err(Error::with_msg_no_trace("not enough bytes")); } else { @@ -85,12 +98,14 @@ impl NetBuf { } pub fn read_buf_for_fill(&mut self) -> ReadBuf { + self.check_invariant(); self.rewind_if_needed(); let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); read_buf } pub fn rewind_if_needed(&mut self) { + self.check_invariant(); if self.rp != 0 && self.rp == self.wp { self.rp = 0; self.wp = 0; @@ -102,6 +117,7 @@ impl NetBuf { } pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> { + self.check_invariant(); self.rewind_if_needed(); if self.wcap() < buf.len() { return Err(Error::with_msg_no_trace("not enough space")); @@ -113,6 +129,7 @@ impl NetBuf { } pub fn put_u8(&mut self, v: u8) -> Result<(), Error> { + self.check_invariant(); type T = u8; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(); @@ -126,6 +143,7 @@ impl NetBuf { } pub fn put_u64(&mut self, v: u64) -> Result<(), Error> { + self.check_invariant(); type T = u64; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(); @@ -137,4 +155,15 @@ impl NetBuf { Ok(()) } } + + fn check_invariant(&self) { + if self.wp > self.buf.len() { + eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); + std::process::exit(87); + } + if self.rp > self.wp { + eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); + std::process::exit(87); + } + } } diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index 983e103..976c3a5 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -1,5 +1,6 @@ pub mod bsread; pub mod ca; +pub mod channelwriter; pub mod netbuf; #[cfg(test)] pub mod test; diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index af8dc56..aacacba 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,5 +1,6 @@ -use crate::bsread::parse_zmtp_message; +use crate::bsread::{parse_zmtp_message, BsreadMessage}; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; +use crate::channelwriter::{ChannelWriter, ChannelWriterF64}; use crate::netbuf::NetBuf; use async_channel::{Receiver, Sender}; #[allow(unused)] @@ -10,13 +11,16 @@ use futures_util::{pin_mut, StreamExt}; use log::*; use netpod::timeunits::*; use scylla::batch::{Batch, BatchType, Consistency}; +use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; -use scylla::SessionBuilder; +use scylla::{Session as ScySession, SessionBuilder}; use serde_json::Value as JsVal; +use std::collections::BTreeMap; use std::ffi::CStr; use std::fmt; use std::mem; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -66,118 +70,281 @@ fn test_service() -> Result<(), Error> { taskrun::run(fut) } -pub async fn zmtp_client(addr: &str, rcvbuf: Option) -> Result<(), Error> { - let mut conn = tokio::net::TcpStream::connect(addr).await?; - if let Some(v) = rcvbuf { - set_rcv_sock_opts(&mut conn, v)?; +pub fn get_series_id(chn: &ChannelDesc) -> u32 { + use md5::Digest; + let mut h = md5::Md5::new(); + h.update(chn.name.as_bytes()); + let f = h.finalize(); + u32::from_le_bytes(f.as_slice()[0..4].try_into().unwrap()) +} + +pub struct CommonQueries { + pub qu1: PreparedStatement, + pub qu2: PreparedStatement, + pub qu_insert_ts_msp: PreparedStatement, + pub qu_insert_scalar_f64: PreparedStatement, +} + +struct BsreadClient { + #[allow(unused)] + scylla: String, + addr: String, + do_pulse_id: bool, + rcvbuf: Option, + tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>, + scy: Arc, + channel_writers: BTreeMap>, + common_queries: Arc, +} + +impl BsreadClient { + pub async fn new(scylla: String, addr: String, do_pulse_id: bool, rcvbuf: Option) -> Result { + let scy = SessionBuilder::new() + .default_consistency(Consistency::Quorum) + .known_node(&scylla) + .use_keyspace("ks1", false) + .build() + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu1 = scy + .prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu2 = scy + .prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_ts_msp = scy + .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_f64 = scy + .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let common_queries = CommonQueries { + qu1, + qu2, + qu_insert_ts_msp, + qu_insert_scalar_f64, + }; + let ret = Self { + scylla, + addr, + do_pulse_id, + rcvbuf, + tmp_vals_pulse_map: vec![], + scy: Arc::new(scy), + channel_writers: Default::default(), + common_queries: Arc::new(common_queries), + }; + Ok(ret) } - let mut zmtp = Zmtp::new(conn, SocketType::PULL); - let mut i1 = 0u64; - let mut msgc = 0u64; - let mut vals1 = vec![]; - let mut vals2 = vec![]; - let scy = SessionBuilder::new() - .known_node("127.0.0.1:19042") - .use_keyspace("ks1", false) - .default_consistency(Consistency::One) - .build() - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu2 = scy - .prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - while let Some(item) = zmtp.next().await { - match item { - Ok(ev) => match ev { - ZmtpEvent::ZmtpCommand(cmd) => { - info!("{:?}", cmd); - } - ZmtpEvent::ZmtpMessage(msg) => { - msgc += 1; - trace!("Message frames: {}", msg.frames.len()); - match parse_zmtp_message(&msg) { - Ok(bm) => { - trace!("{:?}", bm); - trace!("len A {} len B {}", bm.head_b.channels.len(), bm.values.len()); - let mut i3 = u32::MAX; - for (i, ch) in bm.head_b.channels.iter().enumerate() { - if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID" { - i3 = i as u32; + + pub async fn run(&mut self) -> Result<(), Error> { + let mut conn = tokio::net::TcpStream::connect(&self.addr).await?; + if let Some(v) = self.rcvbuf { + set_rcv_sock_opts(&mut conn, v)?; + } + let mut zmtp = Zmtp::new(conn, SocketType::PULL); + let mut i1 = 0u64; + let mut msgc = 0u64; + let mut dh_md5_last = String::new(); + let mut frame_diff_count = 0u64; + let mut hash_mismatch_count = 0u64; + let mut head_b = HeadB::empty(); + while let Some(item) = zmtp.next().await { + match item { + Ok(ev) => match ev { + ZmtpEvent::ZmtpCommand(_) => (), + ZmtpEvent::ZmtpMessage(msg) => { + msgc += 1; + match parse_zmtp_message(&msg) { + Ok(bm) => { + if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 { + frame_diff_count += 1; + if frame_diff_count < 1000 { + warn!( + "chn len {} frame diff {}", + bm.head_b.channels.len(), + msg.frames().len() - 2 * bm.head_b.channels.len() + ); + } } - } - if i3 < u32::MAX { - trace!("insert value frame {}", i3); - let i4 = 2 * i3 + 2; - if i4 >= msg.frames.len() as u32 { - } else { - let fr = &msg.frames[i4 as usize]; - trace!("data len {}", fr.data.len()); - let pulse_f64 = f64::from_be_bytes(fr.data[..].try_into().unwrap()); - trace!("pulse_f64 {pulse_f64}"); - let pulse = pulse_f64 as u64; - if false { - // TODO this next frame should be described somehow in the json header or? - info!("next val len {}", msg.frames[i4 as usize + 1].data.len()); - let ts_a = u64::from_be_bytes( - msg.frames[i4 as usize + 1].data[0..8].try_into().unwrap(), + if bm.head_b_md5 != bm.head_a.hash { + hash_mismatch_count += 1; + // TODO keep logging data header changes, just suppress too frequent messages. + if hash_mismatch_count < 200 { + error!( + "Invalid bsread message: hash mismatch. dhcompr {:?}", + bm.head_a.dh_compression ); - let ts_b = u64::from_be_bytes( - msg.frames[i4 as usize + 1].data[8..16].try_into().unwrap(), - ); - info!("ts_a {ts_a} ts_b {ts_b}"); } - let ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns; - if false { - let tsa = ts / (SEC * 10); - let tsb = ts % (SEC * 10); - vals1.push((tsa as i32, tsb as i32, pulse as i64)); - } - if true { - let pulse_a = (pulse >> 14) as i64; - let pulse_b = (pulse & 0x3fff) as i32; - let ts_a = bm.head_a.global_timestamp.sec as i64; - let ts_b = bm.head_a.global_timestamp.ns as i32; - vals2.push((pulse_a, pulse_b, ts_a, ts_b)); - } - if vals2.len() >= 200 { - let ts1 = Instant::now(); - let mut batch = Batch::new(BatchType::Unlogged); - for _ in 0..vals2.len() { - batch.append_statement(qu2.clone()); + } + { + if bm.head_b_md5 != dh_md5_last { + head_b = bm.head_b.clone(); + if dh_md5_last.is_empty() { + info!("data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash); + dh_md5_last = bm.head_b_md5.clone(); + for chn in &head_b.channels { + info!("Setup writer for {}", chn.name); + self.setup_channel_writers(chn)?; + } + } else { + error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash); + dh_md5_last = bm.head_b_md5.clone(); + // TODO + // Update only the changed channel writers. + // Flush buffers before creating new channel writer. } - let _ = scy.batch(&batch, &vals2).await.err_conv()?; - vals2.clear(); - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; - info!("Batch insert took {:6.2} ms", dt); + } + } + if self.do_pulse_id { + let mut i3 = u32::MAX; + for (i, ch) in head_b.channels.iter().enumerate() { + if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID" { + i3 = i as u32; + } + } + // TODO need to know the facility! + if i3 < u32::MAX { + let i4 = 2 * i3 + 2; + if i4 >= msg.frames.len() as u32 { + } else { + let fr = &msg.frames[i4 as usize]; + self.insert_pulse_map(fr, &msg, &bm).await?; + } + } + } + if msg.frames.len() < 2 + 2 * head_b.channels.len() { + // TODO count always, throttle log. + error!("not enough frames for data header"); + } + let gts = bm.head_a.global_timestamp; + let ts = (gts.sec as u64) * SEC + gts.ns as u64; + let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0); + for i1 in 0..head_b.channels.len() { + let chn = &head_b.channels[i1]; + let fr = &msg.frames[2 + 2 * i1]; + let series = get_series_id(chn); + if !self.channel_writers.contains_key(&series) {} + if let Some(cw) = self.channel_writers.get_mut(&series) { + cw.write_msg(ts, pulse, fr)?.await?; + } else { + // TODO check for missing writers. + //warn!("no writer for {}", chn.name); } } } - } - Err(e) => { - error!("{}", e); - for frame in &msg.frames { - info!("Frame: {:?}", frame); + Err(e) => { + error!("{}", e); + for frame in &msg.frames { + info!("Frame: {:?}", frame); + } + zmtp.dump_input_state(); + zmtp.dump_conn_state(); } } } + }, + Err(e) => { + error!("{}", e); + return Err(e); } - }, - Err(e) => { - error!("{}", e); - return Err(e); + } + i1 += 1; + if false && i1 > 10000 { + break; + } + if false && msgc > 10000 { + break; } } - i1 += 1; - if false && i1 > 10000 { - break; - } - if false && msgc > 10000 { - break; - } + Ok(()) } - Ok(()) + + fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> { + let series = get_series_id(chn); + match chn.ty.as_str() { + "float64" => match &chn.shape { + JsVal::Array(a) => { + if a.len() == 1 { + if let Some(n) = a[0].as_u64() { + if n == 1 { + if chn.encoding == "big" { + let cw = + ChannelWriterF64::new(series, self.common_queries.clone(), self.scy.clone()); + self.channel_writers.insert(series, Box::new(cw)); + } else { + warn!("No LE avail"); + } + } else { + warn!("array f64 writer not yet available.") + } + } + } else { + warn!("f64 writer not yet available for shape {:?}", a) + } + } + s => { + warn!("setup_channel_writers shape not supported {:?}", s); + } + }, + k => { + warn!("setup_channel_writers data type not supported {:?}", k); + } + } + Ok(()) + } + + async fn insert_pulse_map(&mut self, fr: &ZmtpFrame, msg: &ZmtpMessage, bm: &BsreadMessage) -> Result<(), Error> { + trace!("data len {}", fr.data.len()); + // TODO take pulse-id also from main header and compare. + let pulse_f64 = f64::from_be_bytes(fr.data[..].try_into().unwrap()); + trace!("pulse_f64 {pulse_f64}"); + let pulse = pulse_f64 as u64; + if false { + let i4 = 3; + // TODO this next frame should be described somehow in the json header or? + info!("next val len {}", msg.frames[i4 as usize + 1].data.len()); + let ts_a = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[0..8].try_into().unwrap()); + let ts_b = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[8..16].try_into().unwrap()); + info!("ts_a {ts_a} ts_b {ts_b}"); + } + let _ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns; + if true { + let pulse_a = (pulse >> 14) as i64; + let pulse_b = (pulse & 0x3fff) as i32; + let ts_a = bm.head_a.global_timestamp.sec as i64; + let ts_b = bm.head_a.global_timestamp.ns as i32; + self.tmp_vals_pulse_map.push((pulse_a, pulse_b, ts_a, ts_b)); + } + if self.tmp_vals_pulse_map.len() >= 200 { + let ts1 = Instant::now(); + // TODO use facility, channel_name, ... as partition key. + self.scy + .execute(&self.common_queries.qu1, (1i32, self.tmp_vals_pulse_map[0].0)) + .await + .err_conv()?; + let mut batch = Batch::new(BatchType::Unlogged); + for _ in 0..self.tmp_vals_pulse_map.len() { + batch.append_statement(self.common_queries.qu2.clone()); + } + let _ = self.scy.batch(&batch, &self.tmp_vals_pulse_map).await.err_conv()?; + let nn = self.tmp_vals_pulse_map.len(); + self.tmp_vals_pulse_map.clear(); + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; + info!("insert {} items in {:6.2} ms", nn, dt); + } + Ok(()) + } +} + +pub async fn zmtp_client(scylla: &str, addr: &str, rcvbuf: Option, do_pulse_id: bool) -> Result<(), Error> { + let mut client = BsreadClient::new(scylla.into(), addr.into(), do_pulse_id, rcvbuf).await?; + client.run().await } fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { @@ -224,6 +391,7 @@ fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { Ok(()) } +#[derive(Clone, Debug)] enum ConnState { InitSend, InitRecv1, @@ -260,6 +428,18 @@ pub enum SocketType { PULL, } +#[derive(Debug)] +enum InpState { + Empty, + Netbuf(usize, usize, usize), +} + +impl Default for InpState { + fn default() -> Self { + InpState::Empty + } +} + pub struct Zmtp { done: bool, complete: bool, @@ -277,6 +457,10 @@ pub struct Zmtp { inp_eof: bool, data_tx: Sender, data_rx: Receiver, + input_state: Vec, + input_state_ix: usize, + conn_state_log: Vec, + conn_state_log_ix: usize, } impl Zmtp { @@ -299,6 +483,10 @@ impl Zmtp { inp_eof: false, data_tx: tx, data_rx: rx, + input_state: vec![0; 64].iter().map(|_| InpState::default()).collect(), + input_state_ix: 0, + conn_state_log: vec![0; 64].iter().map(|_| ConnState::InitSend).collect(), + conn_state_log_ix: 0, } } @@ -310,8 +498,41 @@ impl Zmtp { (&mut self.conn, self.buf.read_buf_for_fill()) } - fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) { - (self.outbuf.data(), &mut self.conn) + fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) { + (&mut self.conn, self.outbuf.data()) + } + + fn record_input_state(&mut self) { + let st = self.buf.state(); + self.input_state[self.input_state_ix] = InpState::Netbuf(st.0, st.1, self.buf.cap() - st.1); + self.input_state_ix = (1 + self.input_state_ix) % self.input_state.len(); + } + + fn record_conn_state(&mut self) { + self.conn_state_log[self.conn_state_log_ix] = self.conn_state.clone(); + self.conn_state_log_ix = (1 + self.conn_state_log_ix) % self.conn_state_log.len(); + } + + fn dump_input_state(&self) { + info!("---------------------------------------------------------"); + info!("INPUT STATE DUMP"); + let mut i = self.input_state_ix; + for _ in 0..self.input_state.len() { + info!("{i:4} {:?}", self.input_state[i]); + i = (1 + i) % self.input_state.len(); + } + info!("---------------------------------------------------------"); + } + + fn dump_conn_state(&self) { + info!("---------------------------------------------------------"); + info!("CONN STATE DUMP"); + let mut i = self.conn_state_log_ix; + for _ in 0..self.conn_state_log.len() { + info!("{i:4} {:?}", self.conn_state_log[i]); + i = (1 + i) % self.conn_state_log.len(); + } + info!("---------------------------------------------------------"); } fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Option>> { @@ -334,13 +555,13 @@ impl Zmtp { let write: Int> = if item_count > 0 { Int::NoWork } else if self.outbuf.len() > 0 { - let (b, w) = self.outbuf_conn(); + let (w, b) = self.outbuf_conn(); pin_mut!(w); match w.poll_write(cx, b) { Ready(k) => match k { Ok(k) => match self.outbuf.adv(k) { Ok(()) => { - info!("sent {} bytes", k); + trace!("sent {} bytes", k); self.outbuf.rewind_if_needed(); Int::Empty } @@ -362,7 +583,7 @@ impl Zmtp { match write { Int::NoWork => {} _ => { - info!("write result: {:?} {}", write, self.outbuf.len()); + trace!("write result: {:?} {}", write, self.outbuf.len()); } } item_count += write.item_count(); @@ -378,6 +599,7 @@ impl Zmtp { )); Int::Item(Err(e)) } else if self.buf.len() < self.conn_state.need_min() { + self.record_input_state(); let (w, mut rbuf) = self.inpbuf_conn(); pin_mut!(w); match w.poll_read(cx, &mut rbuf) { @@ -387,6 +609,7 @@ impl Zmtp { if nf == 0 { info!("EOF"); self.inp_eof = true; + self.record_input_state(); Int::Done } else { trace!("received {} bytes", rbuf.filled().len()); @@ -396,8 +619,14 @@ impl Zmtp { trace!("got data {:?}", &rbuf.filled()[0..t]); } match self.buf.wadv(nf) { - Ok(()) => Int::Empty, - Err(e) => Int::Item(Err(e)), + Ok(()) => { + self.record_input_state(); + Int::Empty + } + Err(e) => { + error!("netbuf wadv fail nf {nf}"); + Int::Item(Err(e)) + } } } } @@ -475,6 +704,7 @@ impl Zmtp { } fn parse_item(&mut self) -> Result, Error> { + self.record_conn_state(); match self.conn_state { ConnState::InitSend => { info!("parse_item InitSend"); @@ -561,7 +791,19 @@ impl Zmtp { let has_more = flags & 0x01 != 0; let long_size = flags & 0x02 != 0; let is_command = flags & 0x04 != 0; - self.has_more = has_more; + if is_command { + if has_more { + error!("received command with has_more flag (error in peer)"); + } + if self.has_more { + debug!( + "received command frame while in multipart, having {}", + self.frames.len() + ); + } + } else { + self.has_more = has_more; + } self.is_command = is_command; trace!( "parse_item ReadFrameFlags has_more {} long_size {} is_command {}", @@ -580,7 +822,8 @@ impl Zmtp { self.msglen = self.buf.read_u8()? as usize; trace!("parse_item ReadFrameShort msglen {}", self.msglen); self.conn_state = ConnState::ReadFrameBody(self.msglen); - if self.msglen > 1024 * 64 { + if self.msglen > self.buf.cap() / 2 { + error!("msglen {} too large for this client", self.msglen); return Err(Error::with_msg_no_trace(format!( "larger msglen not yet supported {}", self.msglen, @@ -592,7 +835,8 @@ impl Zmtp { self.msglen = self.buf.read_u64()? as usize; trace!("parse_item ReadFrameShort msglen {}", self.msglen); self.conn_state = ConnState::ReadFrameBody(self.msglen); - if self.msglen > 1024 * 64 { + if self.msglen > self.buf.cap() / 2 { + error!("msglen {} too large for this client", self.msglen); return Err(Error::with_msg_no_trace(format!( "larger msglen not yet supported {}", self.msglen, @@ -603,15 +847,14 @@ impl Zmtp { ConnState::ReadFrameBody(msglen) => { // TODO do not copy here... let data = self.buf.read_bytes(msglen)?.to_vec(); + self.conn_state = ConnState::ReadFrameFlags; self.msglen = 0; if false { let n1 = data.len().min(256); let s = String::from_utf8_lossy(&data[..n1]); trace!("parse_item ReadFrameBody msglen {} string {}", msglen, s); } - self.conn_state = ConnState::ReadFrameFlags; if self.is_command { - info!("command data {:?}", data); if data.len() >= 7 { if &data[0..5] == b"\x04PING" { if data.len() > 32 { @@ -620,9 +863,9 @@ impl Zmtp { } else { let ttl = u16::from_be_bytes(data[5..7].try_into().unwrap()); let ctx = &data[7..]; - info!("GOT PING ttl {ttl} ctx.len {}", ctx.len()); + debug!("received PING ttl {ttl} ctx {:?}", &ctx); if self.outbuf.wcap() < data.len() { - error!("can not respond with PONG because output buffer full"); + warn!("can not respond with PONG because output buffer full"); } else { let size = 5 + ctx.len() as u8; self.outbuf.put_u8(0x04).unwrap(); @@ -631,7 +874,7 @@ impl Zmtp { self.outbuf.put_slice(ctx).unwrap(); } if self.outbuf.wcap() < 32 { - error!("can not send my PING because output buffer full"); + warn!("can not send my PING because output buffer full"); } else { let ctx = b"daqingest"; let size = 5 + ctx.len() as u8; @@ -649,7 +892,6 @@ impl Zmtp { is_command: self.is_command, data, }; - self.frames.clear(); Ok(Some(ZmtpEvent::ZmtpCommand(g))) } else { let g = ZmtpFrame { @@ -665,6 +907,21 @@ impl Zmtp { let g = ZmtpMessage { frames: mem::replace(&mut self.frames, vec![]), }; + if false && g.frames.len() != 118 { + info!("EMIT {} frames", g.frames.len()); + if let Some(fr) = g.frames.get(0) { + let d = fr.data(); + let nn = d.len().min(16); + let s = String::from_utf8_lossy(&d[..nn]); + info!("DATA 0 {} {:?} {:?}", nn, &d[..nn], s); + } + if let Some(fr) = g.frames.get(1) { + let d = fr.data(); + let nn = d.len().min(16); + let s = String::from_utf8_lossy(&d[..nn]); + info!("DATA 1 {} {:?} {:?}", nn, &d[..nn], s); + } + } Ok(Some(ZmtpEvent::ZmtpMessage(g))) } } @@ -807,8 +1064,9 @@ impl DummyData { channels: vec![ChannelDesc { name: "TESTCHAN".into(), ty: "int64".into(), - shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]), + shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1i32))]), encoding: "little".into(), + compression: todo!(), }], }; let hb = serde_json::to_vec(&head_b).unwrap(); @@ -828,6 +1086,7 @@ impl DummyData { sec: self.ts / SEC, ns: self.ts % SEC, }, + dh_compression: None, }; // TODO write directly to output buffer. let ha = serde_json::to_vec(&head_a).unwrap();