Query pulse pkey

This commit is contained in:
Dominik Werder
2022-03-28 10:41:38 +02:00
parent dde0e248ff
commit 1e76cb1391
6 changed files with 20 additions and 25 deletions

View File

@@ -11,7 +11,7 @@ pub fn main() -> Result<(), Error> {
let opts = DaqIngestOpts::parse();
match opts.subcmd {
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await?,
SubCmd::ListTsa => daqingest::query::list_tsa().await?,
SubCmd::ListPkey => daqingest::query::list_pkey().await?,
SubCmd::ListPulses => daqingest::query::list_pulses().await?,
}
Ok(())

View File

@@ -15,7 +15,7 @@ pub struct DaqIngestOpts {
#[derive(Debug, Parser)]
pub enum SubCmd {
Bsread(Bsread),
ListTsa,
ListPkey,
ListPulses,
}

View File

@@ -23,7 +23,7 @@ impl From<QueryError> for Error {
}
}
pub async fn list_tsa() -> Result<(), Error> {
pub async fn list_pkey() -> Result<(), Error> {
let scy = SessionBuilder::new()
.known_node("127.0.0.1:19042")
.default_consistency(Consistency::One)
@@ -31,11 +31,11 @@ pub async fn list_tsa() -> Result<(), Error> {
.build()
.await?;
let query = scy
.prepare("select distinct token(tsa), tsa from pulse where token(tsa) >= ? and token(tsa) <= ?")
.prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?")
.await?;
let td = i64::MAX / 27;
let mut t1 = i64::MIN;
let mut tsa_max = 0;
let mut pulse_a_max = 0;
loop {
let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX };
let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000);
@@ -46,10 +46,10 @@ pub async fn list_tsa() -> Result<(), Error> {
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
info!("tsa_token {tsa_token:?} tsa {tsa:?}");
tsa_max = tsa_max.max(tsa);
let pulse_a_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let pulse_a = r.columns[1].as_ref().unwrap().as_bigint().unwrap();
info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}");
pulse_a_max = pulse_a_max.max(pulse_a);
}
}
}
@@ -60,7 +60,7 @@ pub async fn list_tsa() -> Result<(), Error> {
t1 = t2 + 1;
}
}
info!("tsa_max {tsa_max}");
info!("pulse_a_max {pulse_a_max}");
Ok(())
}

View File

@@ -19,6 +19,7 @@ arrayref = "0.3"
byteorder = "1.4"
futures-core = "0.3"
futures-util = "0.3"
#pin-project-lite = "0.2"
scylla = "0.4"
md-5 = "0.9"
libc = "0.2"

View File

@@ -1,9 +1,6 @@
use err::Error;
use tokio::io::ReadBuf;
pub const BUFCAP: usize = 1024 * 128;
pub const RP_REW_PT: usize = 1024 * 64;
pub struct NetBuf {
buf: Vec<u8>,
wp: usize,
@@ -11,9 +8,9 @@ pub struct NetBuf {
}
impl NetBuf {
pub fn new() -> Self {
pub fn new(cap: usize) -> Self {
Self {
buf: vec![0; BUFCAP],
buf: vec![0; cap],
wp: 0,
rp: 0,
}
@@ -97,7 +94,7 @@ impl NetBuf {
if self.rp != 0 && self.rp == self.wp {
self.rp = 0;
self.wp = 0;
} else if self.rp > RP_REW_PT {
} else if self.rp > self.cap() / 2 {
self.buf.copy_within(self.rp..self.wp, 0);
self.wp -= self.rp;
self.rp = 0;

View File

@@ -1,9 +1,7 @@
use crate::bsread::parse_zmtp_message;
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::netbuf::NetBuf;
use crate::netbuf::RP_REW_PT;
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::{Receiver, Sender};
#[allow(unused)]
use bytes::BufMut;
use err::Error;
@@ -11,9 +9,7 @@ use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use log::*;
use netpod::timeunits::*;
use scylla::batch::Batch;
use scylla::batch::BatchType;
use scylla::batch::Consistency;
use scylla::batch::{Batch, BatchType, Consistency};
use scylla::transport::errors::QueryError;
use scylla::SessionBuilder;
use serde_json::Value as JsVal;
@@ -292,8 +288,8 @@ impl Zmtp {
socket_type,
conn,
conn_state: ConnState::InitSend,
buf: NetBuf::new(),
outbuf: NetBuf::new(),
buf: NetBuf::new(1024 * 128),
outbuf: NetBuf::new(1024 * 128),
out_enable: false,
msglen: 0,
has_more: false,
@@ -321,7 +317,8 @@ impl Zmtp {
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Option<Poll<Result<ZmtpEvent, Error>>> {
use Poll::*;
let mut item_count = 0;
let serialized: Int<Result<(), Error>> = if self.out_enable && self.outbuf.wcap() >= RP_REW_PT {
// TODO should I better keep one serialized item in Self so that I know how much space it needs?
let serialized: Int<Result<(), Error>> = if self.out_enable && self.outbuf.wcap() >= self.outbuf.cap() / 2 {
match self.data_rx.poll_next_unpin(cx) {
Ready(Some(_item)) => {
// TODO item should be something that we can convert into a zmtp message.