diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 29fa3fc..2dce341 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -19,3 +19,4 @@ scylla = "0.4" err = { path = "../../daqbuffer/err" } taskrun = { path = "../../daqbuffer/taskrun" } netfetch = { path = "../netfetch" } +log = { path = "../log" } diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 24535aa..a91f9b8 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -10,7 +10,10 @@ pub fn main() -> Result<(), Error> { } let opts = DaqIngestOpts::parse(); match opts.subcmd { - SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await, + SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await?, + SubCmd::ListTsa => daqingest::query::list_tsa().await?, + SubCmd::ListPulses => daqingest::query::list_pulses().await?, } + Ok(()) }) } diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 77ba64d..7d47cf1 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -1,3 +1,5 @@ +pub mod query; + use clap::Parser; #[derive(Debug, Parser)] @@ -13,6 +15,8 @@ pub struct DaqIngestOpts { #[derive(Debug, Parser)] pub enum SubCmd { Bsread(Bsread), + ListTsa, + ListPulses, } #[derive(Debug, Parser)] diff --git a/daqingest/src/query.rs b/daqingest/src/query.rs new file mode 100644 index 0000000..0162e90 --- /dev/null +++ b/daqingest/src/query.rs @@ -0,0 +1,105 @@ +use log::*; +use scylla::batch::Consistency; +use scylla::transport::errors::{NewSessionError, QueryError}; +use scylla::SessionBuilder; + +pub struct Error(err::Error); + +impl err::ToErr for Error { + fn to_err(self) -> err::Error { + self.0 + } +} + +impl From for Error { + fn from(e: NewSessionError) -> Self { + Self(err::Error::with_msg_no_trace(format!("{e:?}"))) + } +} + +impl From for Error { + fn from(e: QueryError) -> Self { + Self(err::Error::with_msg_no_trace(format!("{e:?}"))) + } +} + +pub async fn list_tsa() -> Result<(), Error> { + let scy = SessionBuilder::new() + .known_node("127.0.0.1:19042") + .default_consistency(Consistency::One) + .use_keyspace("ks1", false) + .build() + .await?; + let query = scy + .prepare("select distinct token(tsa), tsa from pulse where token(tsa) >= ? and token(tsa) <= ?") + .await?; + let td = i64::MAX / 27; + let mut t1 = i64::MIN; + let mut tsa_max = 0; + loop { + let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX }; + let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000); + info!("Token range {:.2}%", pct as f32 * 1e-3); + let qr = scy.execute(&query, (t1, t2)).await?; + if let Some(rows) = qr.rows { + for r in rows { + if r.columns.len() < 2 { + warn!("see {} columns", r.columns.len()); + } else { + let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); + let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; + info!("tsa_token {tsa_token:?} tsa {tsa:?}"); + tsa_max = tsa_max.max(tsa); + } + } + } + if t2 == i64::MAX { + info!("end of token range"); + break; + } else { + t1 = t2 + 1; + } + } + info!("tsa_max {tsa_max}"); + Ok(()) +} + +pub async fn list_pulses() -> Result<(), Error> { + let scy = SessionBuilder::new() + .known_node("127.0.0.1:19042") + .default_consistency(Consistency::One) + .use_keyspace("ks1", false) + .build() + .await?; + let query = scy + .prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?") + .await?; + let td = i64::MAX / 31; + let mut t1 = i64::MIN; + loop { + let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX }; + let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000); + info!("Token range {:.2}%", pct as f32 * 1e-3); + let qr = scy.execute(&query, (t1, t2)).await?; + if let Some(rows) = qr.rows { + for r in rows { + if r.columns.len() < 2 { + warn!("see {} columns", r.columns.len()); + } else { + let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); + let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; + let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32; + let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64; + info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); + } + } + } + if t2 == i64::MAX { + info!("end of token range"); + break; + } else { + t1 = t2 + 1; + } + } + Ok(()) +} diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index bf9f692..b4b30d0 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,8 +1,5 @@ use crate::bsread::parse_zmtp_message; -use crate::bsread::ChannelDesc; -use crate::bsread::GlobalTimestamp; -use crate::bsread::HeadA; -use crate::bsread::HeadB; +use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; use crate::netbuf::NetBuf; use crate::netbuf::RP_REW_PT; use async_channel::Receiver; @@ -14,16 +11,34 @@ use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use log::*; use netpod::timeunits::*; +use scylla::batch::Batch; +use scylla::batch::BatchType; use scylla::batch::Consistency; +use scylla::transport::errors::QueryError; +use scylla::SessionBuilder; use serde_json::Value as JsVal; use std::ffi::CStr; use std::fmt; use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Instant; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; +pub trait ErrConv { + fn err_conv(self) -> Result; +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + #[allow(unused)] fn test_listen() -> Result<(), Error> { use std::time::Duration; @@ -63,15 +78,17 @@ pub async fn zmtp_client(addr: &str, rcvbuf: Option) -> Result<(), Error> { let mut zmtp = Zmtp::new(conn, SocketType::PULL); let mut i1 = 0u64; let mut msgc = 0u64; - let mut vals = vec![]; - let scy = scylla::SessionBuilder::new() + let mut vals1 = vec![]; + let mut vals2 = vec![]; + let scy = SessionBuilder::new() .known_node("127.0.0.1:19042") + .use_keyspace("ks1", false) .default_consistency(Consistency::One) .build() .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu1 = scy - .prepare("insert into ks1.pulse (tsA, tsB, pulse) values (?, ?, ?) using ttl 120") + let qu2 = scy + .prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; while let Some(item) = zmtp.next().await { @@ -114,14 +131,30 @@ pub async fn zmtp_client(addr: &str, rcvbuf: Option) -> Result<(), Error> { ); info!("ts_a {ts_a} ts_b {ts_b}"); } - vals.push((bm.head_a.global_timestamp.sec, bm.head_a.global_timestamp.ns, pulse)); - if vals.len() >= 20 { - for &(sec, ns, pulse) in &vals { - scy.execute(&qu1, (sec as i32, ns as i32, pulse as i64)) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns; + if false { + let tsa = ts / (SEC * 10); + let tsb = ts % (SEC * 10); + vals1.push((tsa as i32, tsb as i32, pulse as i64)); + } + if true { + let pulse_a = (pulse >> 14) as i64; + let pulse_b = (pulse & 0x3fff) as i32; + let ts_a = bm.head_a.global_timestamp.sec as i64; + let ts_b = bm.head_a.global_timestamp.ns as i32; + vals2.push((pulse_a, pulse_b, ts_a, ts_b)); + } + if vals2.len() >= 200 { + let ts1 = Instant::now(); + let mut batch = Batch::new(BatchType::Unlogged); + for _ in 0..vals2.len() { + batch.append_statement(qu2.clone()); } - vals.clear(); + let _ = scy.batch(&batch, &vals2).await.err_conv()?; + vals2.clear(); + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; + info!("Batch insert took {:6.2} ms", dt); } } } @@ -448,7 +481,7 @@ impl Zmtp { match self.conn_state { ConnState::InitSend => { info!("parse_item InitSend"); - self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0])?; + self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 1])?; self.conn_state = ConnState::InitRecv1; Ok(None) } @@ -600,6 +633,16 @@ impl Zmtp { self.outbuf.put_slice(b"\x04PONG").unwrap(); self.outbuf.put_slice(ctx).unwrap(); } + if self.outbuf.wcap() < 32 { + error!("can not send my PING because output buffer full"); + } else { + let ctx = b"daqingest"; + let size = 5 + ctx.len() as u8; + self.outbuf.put_u8(0x04).unwrap(); + self.outbuf.put_u8(size).unwrap(); + self.outbuf.put_slice(b"\x04PING").unwrap(); + self.outbuf.put_slice(ctx).unwrap(); + } } } }