Ingest separate pulse map

This commit is contained in:
Dominik Werder
2022-03-25 14:21:31 +01:00
parent 5986d370c6
commit dde0e248ff
5 changed files with 173 additions and 17 deletions

View File

@@ -19,3 +19,4 @@ scylla = "0.4"
err = { path = "../../daqbuffer/err" }
taskrun = { path = "../../daqbuffer/taskrun" }
netfetch = { path = "../netfetch" }
log = { path = "../log" }

View File

@@ -10,7 +10,10 @@ pub fn main() -> Result<(), Error> {
}
let opts = DaqIngestOpts::parse();
match opts.subcmd {
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await,
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await?,
SubCmd::ListTsa => daqingest::query::list_tsa().await?,
SubCmd::ListPulses => daqingest::query::list_pulses().await?,
}
Ok(())
})
}

View File

@@ -1,3 +1,5 @@
pub mod query;
use clap::Parser;
#[derive(Debug, Parser)]
@@ -13,6 +15,8 @@ pub struct DaqIngestOpts {
// Subcommands of the ingest command line tool.
// Plain `//` comments are used on purpose: `///` doc comments on a clap-derived
// type would be picked up as help text and change the program's output.
#[derive(Debug, Parser)]
pub enum SubCmd {
// Handled in `main` by running `netfetch::zmtp::zmtp_client` with the given source and rcvbuf.
Bsread(Bsread),
// Handled in `main` by running `daqingest::query::list_tsa`.
ListTsa,
// Handled in `main` by running `daqingest::query::list_pulses`.
ListPulses,
}
#[derive(Debug, Parser)]

105
daqingest/src/query.rs Normal file
View File

@@ -0,0 +1,105 @@
use log::*;
use scylla::batch::Consistency;
use scylla::transport::errors::{NewSessionError, QueryError};
use scylla::SessionBuilder;
/// Error type of the query module: a newtype around `err::Error`.
///
/// The wrapper exists so that this module can provide its own `From` conversions
/// for the scylla error types it encounters.
pub struct Error(err::Error);
impl err::ToErr for Error {
    /// Consumes the wrapper and hands back the inner `err::Error`.
    fn to_err(self) -> err::Error {
        let Self(inner) = self;
        inner
    }
}
impl From<NewSessionError> for Error {
fn from(e: NewSessionError) -> Self {
Self(err::Error::with_msg_no_trace(format!("{e:?}")))
}
}
impl From<QueryError> for Error {
fn from(e: QueryError) -> Self {
Self(err::Error::with_msg_no_trace(format!("{e:?}")))
}
}
/// Walks the full token range of the `pulse` table in keyspace `ks1` and logs every
/// distinct `tsa` value together with its token, followed by the largest `tsa` seen.
///
/// The signed 64-bit token range is queried in consecutive slices of `i64::MAX / 27`
/// tokens so that each query covers a bounded token interval.
///
/// Rows with fewer than two columns, or with a null or unexpectedly typed column,
/// are logged as warnings and skipped.
///
/// # Errors
///
/// Returns an error if the session cannot be built, the statement cannot be prepared,
/// or one of the range queries fails.
pub async fn list_tsa() -> Result<(), Error> {
    let scy = SessionBuilder::new()
        .known_node("127.0.0.1:19042")
        .default_consistency(Consistency::One)
        .use_keyspace("ks1", false)
        .build()
        .await?;
    let query = scy
        .prepare("select distinct token(tsa), tsa from pulse where token(tsa) >= ? and token(tsa) <= ?")
        .await?;
    let td = i64::MAX / 27;
    let mut t1 = i64::MIN;
    let mut tsa_max = 0u32;
    loop {
        // Inclusive upper bound of this slice, clamped to the end of the token range.
        let t2 = t1.saturating_add(td);
        // Offset of `t1` from the start of the token range. The true difference does not
        // fit into an `i64` once `t1 >= 0`, so it is computed with wrapping arithmetic and
        // reinterpreted as `u64`. A plain `t1 - i64::MIN` panics when overflow checks are on.
        let pct = t1.wrapping_sub(i64::MIN) as u64 / (u64::MAX / 100000);
        info!("Token range {:.2}%", pct as f32 * 1e-3);
        let qr = scy.execute(&query, (t1, t2)).await?;
        if let Some(rows) = qr.rows {
            for r in rows {
                if r.columns.len() < 2 {
                    warn!("see {} columns", r.columns.len());
                    continue;
                }
                let tsa_token = r.columns[0].as_ref().and_then(|v| v.as_bigint());
                let tsa = r.columns[1].as_ref().and_then(|v| v.as_int());
                match (tsa_token, tsa) {
                    (Some(tsa_token), Some(tsa)) => {
                        // NOTE(review): the stored 32-bit int is reinterpreted as unsigned;
                        // confirm that `tsa` is never negative in the table.
                        let tsa = tsa as u32;
                        info!("tsa_token {tsa_token:?} tsa {tsa:?}");
                        tsa_max = tsa_max.max(tsa);
                    }
                    // Database content must not be able to abort the scan.
                    _ => warn!("skip row with null or unexpectedly typed column"),
                }
            }
        }
        if t2 == i64::MAX {
            info!("end of token range");
            break;
        }
        t1 = t2 + 1;
    }
    info!("tsa_max {tsa_max}");
    Ok(())
}
/// Walks the full token range of the `pulse` table in keyspace `ks1` and logs the
/// token, `tsa`, `tsb` and `pulse` columns of every row.
///
/// The signed 64-bit token range is queried in consecutive slices of `i64::MAX / 31`
/// tokens so that each query covers a bounded token interval.
///
/// Rows with fewer than the four selected columns, or with a null or unexpectedly
/// typed column, are logged as warnings and skipped.
///
/// # Errors
///
/// Returns an error if the session cannot be built, the statement cannot be prepared,
/// or one of the range queries fails.
pub async fn list_pulses() -> Result<(), Error> {
    let scy = SessionBuilder::new()
        .known_node("127.0.0.1:19042")
        .default_consistency(Consistency::One)
        .use_keyspace("ks1", false)
        .build()
        .await?;
    let query = scy
        .prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?")
        .await?;
    let td = i64::MAX / 31;
    let mut t1 = i64::MIN;
    loop {
        // Inclusive upper bound of this slice, clamped to the end of the token range.
        let t2 = t1.saturating_add(td);
        // Offset of `t1` from the start of the token range. The true difference does not
        // fit into an `i64` once `t1 >= 0`, so it is computed with wrapping arithmetic and
        // reinterpreted as `u64`. A plain `t1 - i64::MIN` panics when overflow checks are on.
        let pct = t1.wrapping_sub(i64::MIN) as u64 / (u64::MAX / 100000);
        info!("Token range {:.2}%", pct as f32 * 1e-3);
        let qr = scy.execute(&query, (t1, t2)).await?;
        if let Some(rows) = qr.rows {
            for r in rows {
                // Four columns are read below, so anything shorter must be rejected here;
                // otherwise the indexing would panic.
                if r.columns.len() < 4 {
                    warn!("see {} columns", r.columns.len());
                    continue;
                }
                let tsa_token = r.columns[0].as_ref().and_then(|v| v.as_bigint());
                let tsa = r.columns[1].as_ref().and_then(|v| v.as_int());
                let tsb = r.columns[2].as_ref().and_then(|v| v.as_int());
                let pulse = r.columns[3].as_ref().and_then(|v| v.as_bigint());
                match (tsa_token, tsa, tsb, pulse) {
                    (Some(tsa_token), Some(tsa), Some(tsb), Some(pulse)) => {
                        // NOTE(review): the signed database integers are reinterpreted as
                        // unsigned; confirm that the stored values are never negative.
                        let tsa = tsa as u32;
                        let tsb = tsb as u32;
                        let pulse = pulse as u64;
                        info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
                    }
                    // Database content must not be able to abort the scan.
                    _ => warn!("skip row with null or unexpectedly typed column"),
                }
            }
        }
        if t2 == i64::MAX {
            info!("end of token range");
            break;
        }
        t1 = t2 + 1;
    }
    Ok(())
}

View File

@@ -1,8 +1,5 @@
use crate::bsread::parse_zmtp_message;
use crate::bsread::ChannelDesc;
use crate::bsread::GlobalTimestamp;
use crate::bsread::HeadA;
use crate::bsread::HeadB;
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::netbuf::NetBuf;
use crate::netbuf::RP_REW_PT;
use async_channel::Receiver;
@@ -14,16 +11,34 @@ use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use log::*;
use netpod::timeunits::*;
use scylla::batch::Batch;
use scylla::batch::BatchType;
use scylla::batch::Consistency;
use scylla::transport::errors::QueryError;
use scylla::SessionBuilder;
use serde_json::Value as JsVal;
use std::ffi::CStr;
use std::fmt;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
pub trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, QueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
#[allow(unused)]
fn test_listen() -> Result<(), Error> {
use std::time::Duration;
@@ -63,15 +78,17 @@ pub async fn zmtp_client(addr: &str, rcvbuf: Option<u32>) -> Result<(), Error> {
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
let mut i1 = 0u64;
let mut msgc = 0u64;
let mut vals = vec![];
let scy = scylla::SessionBuilder::new()
let mut vals1 = vec![];
let mut vals2 = vec![];
let scy = SessionBuilder::new()
.known_node("127.0.0.1:19042")
.use_keyspace("ks1", false)
.default_consistency(Consistency::One)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu1 = scy
.prepare("insert into ks1.pulse (tsA, tsB, pulse) values (?, ?, ?) using ttl 120")
let qu2 = scy
.prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
while let Some(item) = zmtp.next().await {
@@ -114,14 +131,30 @@ pub async fn zmtp_client(addr: &str, rcvbuf: Option<u32>) -> Result<(), Error> {
);
info!("ts_a {ts_a} ts_b {ts_b}");
}
vals.push((bm.head_a.global_timestamp.sec, bm.head_a.global_timestamp.ns, pulse));
if vals.len() >= 20 {
for &(sec, ns, pulse) in &vals {
scy.execute(&qu1, (sec as i32, ns as i32, pulse as i64))
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns;
if false {
let tsa = ts / (SEC * 10);
let tsb = ts % (SEC * 10);
vals1.push((tsa as i32, tsb as i32, pulse as i64));
}
if true {
let pulse_a = (pulse >> 14) as i64;
let pulse_b = (pulse & 0x3fff) as i32;
let ts_a = bm.head_a.global_timestamp.sec as i64;
let ts_b = bm.head_a.global_timestamp.ns as i32;
vals2.push((pulse_a, pulse_b, ts_a, ts_b));
}
if vals2.len() >= 200 {
let ts1 = Instant::now();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..vals2.len() {
batch.append_statement(qu2.clone());
}
vals.clear();
let _ = scy.batch(&batch, &vals2).await.err_conv()?;
vals2.clear();
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
info!("Batch insert took {:6.2} ms", dt);
}
}
}
@@ -448,7 +481,7 @@ impl Zmtp {
match self.conn_state {
ConnState::InitSend => {
info!("parse_item InitSend");
self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0])?;
self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 1])?;
self.conn_state = ConnState::InitRecv1;
Ok(None)
}
@@ -600,6 +633,16 @@ impl Zmtp {
self.outbuf.put_slice(b"\x04PONG").unwrap();
self.outbuf.put_slice(ctx).unwrap();
}
if self.outbuf.wcap() < 32 {
error!("can not send my PING because output buffer full");
} else {
let ctx = b"daqingest";
let size = 5 + ctx.len() as u8;
self.outbuf.put_u8(0x04).unwrap();
self.outbuf.put_u8(size).unwrap();
self.outbuf.put_slice(b"\x04PING").unwrap();
self.outbuf.put_slice(ctx).unwrap();
}
}
}
}