diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index a91f9b8..f018178 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -11,7 +11,7 @@ pub fn main() -> Result<(), Error> { let opts = DaqIngestOpts::parse(); match opts.subcmd { SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await?, - SubCmd::ListTsa => daqingest::query::list_tsa().await?, + SubCmd::ListPkey => daqingest::query::list_pkey().await?, SubCmd::ListPulses => daqingest::query::list_pulses().await?, } Ok(()) diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 7d47cf1..79fe212 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -15,7 +15,7 @@ pub struct DaqIngestOpts { #[derive(Debug, Parser)] pub enum SubCmd { Bsread(Bsread), - ListTsa, + ListPkey, ListPulses, } diff --git a/daqingest/src/query.rs b/daqingest/src/query.rs index 0162e90..c58a5a8 100644 --- a/daqingest/src/query.rs +++ b/daqingest/src/query.rs @@ -23,7 +23,7 @@ impl From for Error { } } -pub async fn list_tsa() -> Result<(), Error> { +pub async fn list_pkey() -> Result<(), Error> { let scy = SessionBuilder::new() .known_node("127.0.0.1:19042") .default_consistency(Consistency::One) @@ -31,11 +31,11 @@ pub async fn list_tsa() -> Result<(), Error> { .build() .await?; let query = scy - .prepare("select distinct token(tsa), tsa from pulse where token(tsa) >= ? and token(tsa) <= ?") + .prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?") .await?; let td = i64::MAX / 27; let mut t1 = i64::MIN; - let mut tsa_max = 0; + let mut pulse_a_max = 0; loop { let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX }; let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000); @@ -46,10 +46,10 @@ pub async fn list_tsa() -> Result<(), Error> { if r.columns.len() < 2 { warn!("see {} columns", r.columns.len()); } else { - let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); - let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; - info!("tsa_token {tsa_token:?} tsa {tsa:?}"); - tsa_max = tsa_max.max(tsa); + let pulse_a_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); + let pulse_a = r.columns[1].as_ref().unwrap().as_bigint().unwrap(); + info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}"); + pulse_a_max = pulse_a_max.max(pulse_a); } } } @@ -60,7 +60,7 @@ pub async fn list_tsa() -> Result<(), Error> { t1 = t2 + 1; } } - info!("tsa_max {tsa_max}"); + info!("pulse_a_max {pulse_a_max}"); Ok(()) } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 5d63d72..be66be8 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -19,6 +19,7 @@ arrayref = "0.3" byteorder = "1.4" futures-core = "0.3" futures-util = "0.3" +#pin-project-lite = "0.2" scylla = "0.4" md-5 = "0.9" libc = "0.2" diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs index 48dee57..315b8be 100644 --- a/netfetch/src/netbuf.rs +++ b/netfetch/src/netbuf.rs @@ -1,9 +1,6 @@ use err::Error; use tokio::io::ReadBuf; -pub const BUFCAP: usize = 1024 * 128; -pub const RP_REW_PT: usize = 1024 * 64; - pub struct NetBuf { buf: Vec, wp: usize, @@ -11,9 +8,9 @@ pub struct NetBuf { } impl NetBuf { - pub fn new() -> Self { + pub fn new(cap: usize) -> Self { Self { - buf: vec![0; BUFCAP], + buf: vec![0; cap], wp: 0, rp: 0, } @@ -97,7 +94,7 @@ impl NetBuf { if self.rp != 0 && self.rp == self.wp { self.rp = 0; self.wp = 0; - } else if self.rp > RP_REW_PT { + } else if self.rp > self.cap() / 2 { self.buf.copy_within(self.rp..self.wp, 0); self.wp -= self.rp; self.rp = 0; diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index b4b30d0..af8dc56 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,9 +1,7 @@ use crate::bsread::parse_zmtp_message; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; use crate::netbuf::NetBuf; -use crate::netbuf::RP_REW_PT; -use async_channel::Receiver; -use async_channel::Sender; +use async_channel::{Receiver, Sender}; #[allow(unused)] use bytes::BufMut; use err::Error; @@ -11,9 +9,7 @@ use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use log::*; use netpod::timeunits::*; -use scylla::batch::Batch; -use scylla::batch::BatchType; -use scylla::batch::Consistency; +use scylla::batch::{Batch, BatchType, Consistency}; use scylla::transport::errors::QueryError; use scylla::SessionBuilder; use serde_json::Value as JsVal; @@ -292,8 +288,8 @@ impl Zmtp { socket_type, conn, conn_state: ConnState::InitSend, - buf: NetBuf::new(), - outbuf: NetBuf::new(), + buf: NetBuf::new(1024 * 128), + outbuf: NetBuf::new(1024 * 128), out_enable: false, msglen: 0, has_more: false, @@ -321,7 +317,8 @@ impl Zmtp { fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Option>> { use Poll::*; let mut item_count = 0; - let serialized: Int> = if self.out_enable && self.outbuf.wcap() >= RP_REW_PT { + // TODO should I better keep one serialized item in Self so that I know how much space it needs? + let serialized: Int> = if self.out_enable && self.outbuf.wcap() >= self.outbuf.cap() / 2 { match self.data_rx.poll_next_unpin(cx) { Ready(Some(_item)) => { // TODO item should be something that we can convert into a zmtp message.