From 0d73fe972a0591328717a7f944b03bc7b48f6597 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 7 Apr 2022 21:39:03 +0200 Subject: [PATCH] Write f64 array data --- daqingest/src/bin/daqingest.rs | 3 +- daqingest/src/daqingest.rs | 19 ++++- netfetch/src/channelwriter.rs | 133 +++++++++++++++++++++++++++++---- netfetch/src/zmtp.rs | 86 +++++++++++++++------ 4 files changed, 202 insertions(+), 39 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 0a33766..1b14d9b 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -9,8 +9,9 @@ pub fn main() -> Result<(), Error> { } else { } let opts = DaqIngestOpts::parse(); + log::info!("opts: {opts:?}"); match opts.subcmd { - SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.scylla, &k.source, k.rcvbuf, k.do_pulse_id).await?, + SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?, SubCmd::ListPkey => daqingest::query::list_pkey().await?, SubCmd::ListPulses => daqingest::query::list_pulses().await?, } diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index b0f85d9..f9f8c1d 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -1,6 +1,7 @@ pub mod query; use clap::Parser; +use netfetch::zmtp::ZmtpClientOpts; #[derive(Debug, Parser)] //#[clap(name = "daqingest", version)] @@ -22,11 +23,25 @@ pub enum SubCmd { #[derive(Debug, Parser)] pub struct Bsread { #[clap(long)] - pub scylla: String, + pub scylla: Vec, #[clap(long)] pub source: String, #[clap(long)] - pub rcvbuf: Option, + pub rcvbuf: Option, + #[clap(long)] + pub array_truncate: Option, #[clap(long)] pub do_pulse_id: bool, } + +impl From for ZmtpClientOpts { + fn from(k: Bsread) -> Self { + Self { + scylla: k.scylla, + addr: k.source, + rcvbuf: k.rcvbuf, + array_truncate: k.array_truncate, + do_pulse_id: k.do_pulse_id, + } + } +} diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 425cd46..7096378 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -56,7 +56,7 @@ impl Future for ScyQueryFut { Ready(Ok(())) } Err(e) => { - info!("ScyQueryFut done Err"); + warn!("ScyQueryFut done Err"); Ready(Err(e).err_conv()) } }, @@ -73,6 +73,9 @@ pub struct ScyBatchFut { #[allow(unused)] values: Box, fut: Pin>>>, + polled: usize, + ts_create: Instant, + ts_poll_start: Instant, } impl ScyBatchFut { @@ -86,11 +89,15 @@ impl ScyBatchFut { let batch2 = unsafe { &*(&batch as &_ as *const _) } as &Batch; let v2 = unsafe { &*(&values as &_ as *const _) } as &V; let fut = scy2.batch(batch2, v2); + let tsnow = Instant::now(); Self { scy, batch, values, fut: Box::pin(fut), + polled: 0, + ts_create: tsnow, + ts_poll_start: tsnow, } } } @@ -100,14 +107,25 @@ impl Future for ScyBatchFut { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; + if self.polled == 0 { + self.ts_poll_start = Instant::now(); + } + self.polled += 1; match self.fut.poll_unpin(cx) { Ready(k) => match k { Ok(_) => { - info!("ScyBatchFut done Ok"); + trace!("ScyBatchFut done Ok"); Ready(Ok(())) } Err(e) => { - info!("ScyBatchFut done Err"); + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + warn!( + "ScyBatchFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + warn!("ScyBatchFut done Err {e:?}"); Ready(Err(e).err_conv()) } }, @@ -136,7 +154,7 @@ impl Future for ChannelWriteFut { } else if let Some(f) = self.fut1.as_mut() { match f.poll_unpin(cx) { Ready(k) => { - info!("ChannelWriteFut fut1 Ready"); + trace!("ChannelWriteFut fut1 Ready"); self.fut1 = None; self.mask |= 1; match k { @@ -149,7 +167,7 @@ impl Future for ChannelWriteFut { } else if let Some(f) = self.fut2.as_mut() { match f.poll_unpin(cx) { Ready(k) => { - info!("ChannelWriteFut fut2 Ready"); + trace!("ChannelWriteFut fut2 Ready"); self.fut2 = None; self.mask |= 2; match k { @@ -163,7 +181,7 @@ impl Future for ChannelWriteFut { if self.mask != 0 { let ts2 = Instant::now(); let dt = ts2.duration_since(self.ts1.unwrap()).as_secs_f32() * 1e3; - info!("insert f64 nn {} dt {:6.2} ms", self.nn, dt); + info!("inserted nn {} dt {:6.2} ms", self.nn, dt); } Ready(Ok(())) }; @@ -215,7 +233,7 @@ pub async fn run_write_fut_f64(fut: WriteFutF64) -> Result<(), Error> { Ok(()) } -pub struct ChannelWriterF64 { +pub struct ChannelWriterScalarF64 { series: u32, scy: Arc, common_queries: Arc, @@ -223,7 +241,7 @@ pub struct ChannelWriterF64 { tmp_vals: Vec<(i32, i64, i64, i64, f64)>, } -impl ChannelWriterF64 { +impl ChannelWriterScalarF64 { pub fn new(series: u32, common_queries: Arc, scy: Arc) -> Self { Self { series, @@ -235,7 +253,7 @@ impl ChannelWriterF64 { } pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { - let (ts_msp, ts_lsp) = ts_msp_lsp(ts); + let (ts_msp, ts_lsp) = ts_msp_lsp_1(ts); let fut1 = if ts_msp != self.ts_msp_last { info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse); self.ts_msp_last = ts_msp; @@ -251,8 +269,7 @@ impl ChannelWriterF64 { let value = f64::from_be_bytes(fr.data().try_into()?); self.tmp_vals .push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, value)); - if self.tmp_vals.len() >= 180 + ((self.series as usize) & 0x3f) { - info!("write_msg_impl BATCH INSERT ts {} pulse {}", ts, pulse); + if self.tmp_vals.len() >= 100 + ((self.series as usize) & 0xf) { let vt = std::mem::replace(&mut self.tmp_vals, vec![]); let nn = vt.len(); let mut batch = Batch::new(BatchType::Unlogged); @@ -282,15 +299,105 @@ impl ChannelWriterF64 { } } -impl ChannelWriter for ChannelWriterF64 { +impl ChannelWriter for ChannelWriterScalarF64 { fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { self.write_msg_impl(ts, pulse, fr) } } -fn ts_msp_lsp(ts: u64) -> (u64, u64) { +pub struct ChannelWriterArrayF64 { + series: u32, + scy: Arc, + common_queries: Arc, + ts_msp_last: u64, + tmp_vals: Vec<(i32, i64, i64, i64, Vec)>, + truncate: usize, +} + +impl ChannelWriterArrayF64 { + pub fn new(series: u32, common_queries: Arc, scy: Arc, truncate: usize) -> Self { + Self { + series, + scy, + ts_msp_last: 0, + common_queries, + tmp_vals: vec![], + truncate, + } + } + + pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { + let (ts_msp, ts_lsp) = ts_msp_lsp_2(ts); + let fut1 = if ts_msp != self.ts_msp_last { + info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse); + self.ts_msp_last = ts_msp; + let fut = ScyQueryFut::new( + self.scy.clone(), + self.common_queries.qu_insert_ts_msp.clone(), + (self.series as i32, ts_msp as i64), + ); + Some(Box::pin(fut) as _) + } else { + None + }; + type ST = f64; + const STL: usize = std::mem::size_of::(); + let vc = fr.data().len() / STL; + let mut values = Vec::with_capacity(vc); + for i in 0..vc { + let h = i * STL; + let value = f64::from_be_bytes(fr.data()[h..h + STL].try_into()?); + values.push(value); + } + values.truncate(self.truncate); + self.tmp_vals + .push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, values)); + if self.tmp_vals.len() >= 40 + ((self.series as usize) & 0x7) { + let vt = std::mem::replace(&mut self.tmp_vals, vec![]); + let nn = vt.len(); + let mut batch = Batch::new(BatchType::Unlogged); + for _ in 0..nn { + batch.append_statement(self.common_queries.qu_insert_array_f64.clone()); + } + let fut = ScyBatchFut::new(self.scy.clone(), batch, vt); + let fut2 = Some(Box::pin(fut) as _); + let ret = ChannelWriteFut { + ts1: None, + mask: 0, + nn, + fut1, + fut2, + }; + Ok(ret) + } else { + let ret = ChannelWriteFut { + ts1: None, + mask: 0, + nn: 0, + fut1: fut1, + fut2: None, + }; + Ok(ret) + } + } +} + +impl ChannelWriter for ChannelWriterArrayF64 { + fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result { + self.write_msg_impl(ts, pulse, fr) + } +} + +fn ts_msp_lsp_1(ts: u64) -> (u64, u64) { const MASK: u64 = u64::MAX >> 23; let ts_msp = ts & (!MASK); let ts_lsp = ts & MASK; (ts_msp, ts_lsp) } + +fn ts_msp_lsp_2(ts: u64) -> (u64, u64) { + const MASK: u64 = u64::MAX >> 16; + let ts_msp = ts & (!MASK); + let ts_lsp = ts & MASK; + (ts_msp, ts_lsp) +} diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index aacacba..6ebc3b8 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,6 +1,6 @@ use crate::bsread::{parse_zmtp_message, BsreadMessage}; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; -use crate::channelwriter::{ChannelWriter, ChannelWriterF64}; +use crate::channelwriter::{ChannelWriter, ChannelWriterArrayF64, ChannelWriterScalarF64}; use crate::netbuf::NetBuf; use async_channel::{Receiver, Sender}; #[allow(unused)] @@ -22,7 +22,7 @@ use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; @@ -83,14 +83,22 @@ pub struct CommonQueries { pub qu2: PreparedStatement, pub qu_insert_ts_msp: PreparedStatement, pub qu_insert_scalar_f64: PreparedStatement, + pub qu_insert_array_f64: PreparedStatement, +} + +pub struct ZmtpClientOpts { + pub scylla: Vec, + pub addr: String, + pub do_pulse_id: bool, + pub rcvbuf: Option, + pub array_truncate: Option, } struct BsreadClient { - #[allow(unused)] - scylla: String, + opts: ZmtpClientOpts, addr: String, do_pulse_id: bool, - rcvbuf: Option, + rcvbuf: Option, tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>, scy: Arc, channel_writers: BTreeMap>, @@ -98,10 +106,13 @@ struct BsreadClient { } impl BsreadClient { - pub async fn new(scylla: String, addr: String, do_pulse_id: bool, rcvbuf: Option) -> Result { - let scy = SessionBuilder::new() - .default_consistency(Consistency::Quorum) - .known_node(&scylla) + pub async fn new(opts: ZmtpClientOpts) -> Result { + let scy = SessionBuilder::new().default_consistency(Consistency::Quorum); + let mut scy = scy; + for a in &opts.scylla { + scy = scy.known_node(a); + } + let scy = scy .use_keyspace("ks1", false) .build() .await @@ -122,17 +133,22 @@ impl BsreadClient { .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_f64 = scy + .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let common_queries = CommonQueries { qu1, qu2, qu_insert_ts_msp, qu_insert_scalar_f64, + qu_insert_array_f64, }; let ret = Self { - scylla, - addr, - do_pulse_id, - rcvbuf, + addr: opts.addr.clone(), + do_pulse_id: opts.do_pulse_id, + rcvbuf: opts.rcvbuf, + opts, tmp_vals_pulse_map: vec![], scy: Arc::new(scy), channel_writers: Default::default(), @@ -144,7 +160,7 @@ impl BsreadClient { pub async fn run(&mut self) -> Result<(), Error> { let mut conn = tokio::net::TcpStream::connect(&self.addr).await?; if let Some(v) = self.rcvbuf { - set_rcv_sock_opts(&mut conn, v)?; + set_rcv_sock_opts(&mut conn, v as u32)?; } let mut zmtp = Zmtp::new(conn, SocketType::PULL); let mut i1 = 0u64; @@ -153,6 +169,8 @@ impl BsreadClient { let mut frame_diff_count = 0u64; let mut hash_mismatch_count = 0u64; let mut head_b = HeadB::empty(); + let mut status_last = Instant::now(); + let mut bytes_payload = 0u64; while let Some(item) = zmtp.next().await { match item { Ok(ev) => match ev { @@ -231,6 +249,7 @@ impl BsreadClient { if !self.channel_writers.contains_key(&series) {} if let Some(cw) = self.channel_writers.get_mut(&series) { cw.write_msg(ts, pulse, fr)?.await?; + bytes_payload += fr.data().len() as u64; } else { // TODO check for missing writers. //warn!("no writer for {}", chn.name); @@ -260,6 +279,14 @@ impl BsreadClient { if false && msgc > 10000 { break; } + let tsnow = Instant::now(); + let dt = tsnow.duration_since(status_last); + if dt >= Duration::from_millis(2000) { + let r = bytes_payload as f32 / dt.as_secs_f32() * 1e-3; + info!("rate: {r:8.3} kB/s"); + status_last = tsnow; + bytes_payload = 0; + } } Ok(()) } @@ -273,26 +300,39 @@ impl BsreadClient { if let Some(n) = a[0].as_u64() { if n == 1 { if chn.encoding == "big" { - let cw = - ChannelWriterF64::new(series, self.common_queries.clone(), self.scy.clone()); + let cw = ChannelWriterScalarF64::new( + series, + self.common_queries.clone(), + self.scy.clone(), + ); self.channel_writers.insert(series, Box::new(cw)); } else { - warn!("No LE avail"); + warn!("TODO scalar f64 LE"); } } else { - warn!("array f64 writer not yet available.") + if chn.encoding == "big" { + let cw = ChannelWriterArrayF64::new( + series, + self.common_queries.clone(), + self.scy.clone(), + self.opts.array_truncate.unwrap_or(64), + ); + self.channel_writers.insert(series, Box::new(cw)); + } else { + warn!("TODO array f64 LE"); + } } } } else { - warn!("f64 writer not yet available for shape {:?}", a) + warn!("TODO writer f64 shape {:?}", a); } } s => { - warn!("setup_channel_writers shape not supported {:?}", s); + warn!("TODO writer f64 shape {:?}", s); } }, k => { - warn!("setup_channel_writers data type not supported {:?}", k); + warn!("TODO writer dtype {:?}", k); } } Ok(()) @@ -342,8 +382,8 @@ impl BsreadClient { } } -pub async fn zmtp_client(scylla: &str, addr: &str, rcvbuf: Option, do_pulse_id: bool) -> Result<(), Error> { - let mut client = BsreadClient::new(scylla.into(), addr.into(), do_pulse_id, rcvbuf).await?; +pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { + let mut client = BsreadClient::new(opts).await?; client.run().await }