Write f64 array data

This commit is contained in:
Dominik Werder
2022-04-07 21:39:03 +02:00
parent 83bfc66ec4
commit 0d73fe972a
4 changed files with 202 additions and 39 deletions

View File

@@ -9,8 +9,9 @@ pub fn main() -> Result<(), Error> {
} else {
}
let opts = DaqIngestOpts::parse();
log::info!("opts: {opts:?}");
match opts.subcmd {
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.scylla, &k.source, k.rcvbuf, k.do_pulse_id).await?,
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?,
SubCmd::ListPkey => daqingest::query::list_pkey().await?,
SubCmd::ListPulses => daqingest::query::list_pulses().await?,
}

View File

@@ -1,6 +1,7 @@
pub mod query;
use clap::Parser;
use netfetch::zmtp::ZmtpClientOpts;
#[derive(Debug, Parser)]
//#[clap(name = "daqingest", version)]
@@ -22,11 +23,25 @@ pub enum SubCmd {
#[derive(Debug, Parser)]
pub struct Bsread {
#[clap(long)]
pub scylla: String,
pub scylla: Vec<String>,
#[clap(long)]
pub source: String,
#[clap(long)]
pub rcvbuf: Option<u32>,
pub rcvbuf: Option<usize>,
#[clap(long)]
pub array_truncate: Option<usize>,
#[clap(long)]
pub do_pulse_id: bool,
}
impl From<Bsread> for ZmtpClientOpts {
fn from(k: Bsread) -> Self {
Self {
scylla: k.scylla,
addr: k.source,
rcvbuf: k.rcvbuf,
array_truncate: k.array_truncate,
do_pulse_id: k.do_pulse_id,
}
}
}

View File

@@ -56,7 +56,7 @@ impl<V> Future for ScyQueryFut<V> {
Ready(Ok(()))
}
Err(e) => {
info!("ScyQueryFut done Err");
warn!("ScyQueryFut done Err");
Ready(Err(e).err_conv())
}
},
@@ -73,6 +73,9 @@ pub struct ScyBatchFut<V> {
#[allow(unused)]
values: Box<V>,
fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>>>>,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
}
impl<V> ScyBatchFut<V> {
@@ -86,11 +89,15 @@ impl<V> ScyBatchFut<V> {
let batch2 = unsafe { &*(&batch as &_ as *const _) } as &Batch;
let v2 = unsafe { &*(&values as &_ as *const _) } as &V;
let fut = scy2.batch(batch2, v2);
let tsnow = Instant::now();
Self {
scy,
batch,
values,
fut: Box::pin(fut),
polled: 0,
ts_create: tsnow,
ts_poll_start: tsnow,
}
}
}
@@ -100,14 +107,25 @@ impl<V> Future for ScyBatchFut<V> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if self.polled == 0 {
self.ts_poll_start = Instant::now();
}
self.polled += 1;
match self.fut.poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => {
info!("ScyBatchFut done Ok");
trace!("ScyBatchFut done Ok");
Ready(Ok(()))
}
Err(e) => {
info!("ScyBatchFut done Err");
let tsnow = Instant::now();
let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
warn!(
"ScyBatchFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
self.polled, dt_created, dt_polled
);
warn!("ScyBatchFut done Err {e:?}");
Ready(Err(e).err_conv())
}
},
@@ -136,7 +154,7 @@ impl Future for ChannelWriteFut {
} else if let Some(f) = self.fut1.as_mut() {
match f.poll_unpin(cx) {
Ready(k) => {
info!("ChannelWriteFut fut1 Ready");
trace!("ChannelWriteFut fut1 Ready");
self.fut1 = None;
self.mask |= 1;
match k {
@@ -149,7 +167,7 @@ impl Future for ChannelWriteFut {
} else if let Some(f) = self.fut2.as_mut() {
match f.poll_unpin(cx) {
Ready(k) => {
info!("ChannelWriteFut fut2 Ready");
trace!("ChannelWriteFut fut2 Ready");
self.fut2 = None;
self.mask |= 2;
match k {
@@ -163,7 +181,7 @@ impl Future for ChannelWriteFut {
if self.mask != 0 {
let ts2 = Instant::now();
let dt = ts2.duration_since(self.ts1.unwrap()).as_secs_f32() * 1e3;
info!("insert f64 nn {} dt {:6.2} ms", self.nn, dt);
info!("inserted nn {} dt {:6.2} ms", self.nn, dt);
}
Ready(Ok(()))
};
@@ -215,7 +233,7 @@ pub async fn run_write_fut_f64(fut: WriteFutF64) -> Result<(), Error> {
Ok(())
}
pub struct ChannelWriterF64 {
pub struct ChannelWriterScalarF64 {
series: u32,
scy: Arc<ScySession>,
common_queries: Arc<CommonQueries>,
@@ -223,7 +241,7 @@ pub struct ChannelWriterF64 {
tmp_vals: Vec<(i32, i64, i64, i64, f64)>,
}
impl ChannelWriterF64 {
impl ChannelWriterScalarF64 {
pub fn new(series: u32, common_queries: Arc<CommonQueries>, scy: Arc<ScySession>) -> Self {
Self {
series,
@@ -235,7 +253,7 @@ impl ChannelWriterF64 {
}
pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
let (ts_msp, ts_lsp) = ts_msp_lsp(ts);
let (ts_msp, ts_lsp) = ts_msp_lsp_1(ts);
let fut1 = if ts_msp != self.ts_msp_last {
info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse);
self.ts_msp_last = ts_msp;
@@ -251,8 +269,7 @@ impl ChannelWriterF64 {
let value = f64::from_be_bytes(fr.data().try_into()?);
self.tmp_vals
.push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, value));
if self.tmp_vals.len() >= 180 + ((self.series as usize) & 0x3f) {
info!("write_msg_impl BATCH INSERT ts {} pulse {}", ts, pulse);
if self.tmp_vals.len() >= 100 + ((self.series as usize) & 0xf) {
let vt = std::mem::replace(&mut self.tmp_vals, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
@@ -282,15 +299,105 @@ impl ChannelWriterF64 {
}
}
impl ChannelWriter for ChannelWriterF64 {
impl ChannelWriter for ChannelWriterScalarF64 {
fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
self.write_msg_impl(ts, pulse, fr)
}
}
fn ts_msp_lsp(ts: u64) -> (u64, u64) {
pub struct ChannelWriterArrayF64 {
series: u32,
scy: Arc<ScySession>,
common_queries: Arc<CommonQueries>,
ts_msp_last: u64,
tmp_vals: Vec<(i32, i64, i64, i64, Vec<f64>)>,
truncate: usize,
}
impl ChannelWriterArrayF64 {
pub fn new(series: u32, common_queries: Arc<CommonQueries>, scy: Arc<ScySession>, truncate: usize) -> Self {
Self {
series,
scy,
ts_msp_last: 0,
common_queries,
tmp_vals: vec![],
truncate,
}
}
pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
let (ts_msp, ts_lsp) = ts_msp_lsp_2(ts);
let fut1 = if ts_msp != self.ts_msp_last {
info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse);
self.ts_msp_last = ts_msp;
let fut = ScyQueryFut::new(
self.scy.clone(),
self.common_queries.qu_insert_ts_msp.clone(),
(self.series as i32, ts_msp as i64),
);
Some(Box::pin(fut) as _)
} else {
None
};
type ST = f64;
const STL: usize = std::mem::size_of::<ST>();
let vc = fr.data().len() / STL;
let mut values = Vec::with_capacity(vc);
for i in 0..vc {
let h = i * STL;
let value = f64::from_be_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
values.truncate(self.truncate);
self.tmp_vals
.push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, values));
if self.tmp_vals.len() >= 40 + ((self.series as usize) & 0x7) {
let vt = std::mem::replace(&mut self.tmp_vals, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..nn {
batch.append_statement(self.common_queries.qu_insert_array_f64.clone());
}
let fut = ScyBatchFut::new(self.scy.clone(), batch, vt);
let fut2 = Some(Box::pin(fut) as _);
let ret = ChannelWriteFut {
ts1: None,
mask: 0,
nn,
fut1,
fut2,
};
Ok(ret)
} else {
let ret = ChannelWriteFut {
ts1: None,
mask: 0,
nn: 0,
fut1: fut1,
fut2: None,
};
Ok(ret)
}
}
}
impl ChannelWriter for ChannelWriterArrayF64 {
fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
self.write_msg_impl(ts, pulse, fr)
}
}
fn ts_msp_lsp_1(ts: u64) -> (u64, u64) {
const MASK: u64 = u64::MAX >> 23;
let ts_msp = ts & (!MASK);
let ts_lsp = ts & MASK;
(ts_msp, ts_lsp)
}
fn ts_msp_lsp_2(ts: u64) -> (u64, u64) {
const MASK: u64 = u64::MAX >> 16;
let ts_msp = ts & (!MASK);
let ts_lsp = ts & MASK;
(ts_msp, ts_lsp)
}

View File

@@ -1,6 +1,6 @@
use crate::bsread::{parse_zmtp_message, BsreadMessage};
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::channelwriter::{ChannelWriter, ChannelWriterF64};
use crate::channelwriter::{ChannelWriter, ChannelWriterArrayF64, ChannelWriterScalarF64};
use crate::netbuf::NetBuf;
use async_channel::{Receiver, Sender};
#[allow(unused)]
@@ -22,7 +22,7 @@ use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
@@ -83,14 +83,22 @@ pub struct CommonQueries {
pub qu2: PreparedStatement,
pub qu_insert_ts_msp: PreparedStatement,
pub qu_insert_scalar_f64: PreparedStatement,
pub qu_insert_array_f64: PreparedStatement,
}
pub struct ZmtpClientOpts {
pub scylla: Vec<String>,
pub addr: String,
pub do_pulse_id: bool,
pub rcvbuf: Option<usize>,
pub array_truncate: Option<usize>,
}
struct BsreadClient {
#[allow(unused)]
scylla: String,
opts: ZmtpClientOpts,
addr: String,
do_pulse_id: bool,
rcvbuf: Option<u32>,
rcvbuf: Option<usize>,
tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>,
scy: Arc<ScySession>,
channel_writers: BTreeMap<u32, Box<dyn ChannelWriter>>,
@@ -98,10 +106,13 @@ struct BsreadClient {
}
impl BsreadClient {
pub async fn new(scylla: String, addr: String, do_pulse_id: bool, rcvbuf: Option<u32>) -> Result<Self, Error> {
let scy = SessionBuilder::new()
.default_consistency(Consistency::Quorum)
.known_node(&scylla)
pub async fn new(opts: ZmtpClientOpts) -> Result<Self, Error> {
let scy = SessionBuilder::new().default_consistency(Consistency::Quorum);
let mut scy = scy;
for a in &opts.scylla {
scy = scy.known_node(a);
}
let scy = scy
.use_keyspace("ks1", false)
.build()
.await
@@ -122,17 +133,22 @@ impl BsreadClient {
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f64 = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let common_queries = CommonQueries {
qu1,
qu2,
qu_insert_ts_msp,
qu_insert_scalar_f64,
qu_insert_array_f64,
};
let ret = Self {
scylla,
addr,
do_pulse_id,
rcvbuf,
addr: opts.addr.clone(),
do_pulse_id: opts.do_pulse_id,
rcvbuf: opts.rcvbuf,
opts,
tmp_vals_pulse_map: vec![],
scy: Arc::new(scy),
channel_writers: Default::default(),
@@ -144,7 +160,7 @@ impl BsreadClient {
pub async fn run(&mut self) -> Result<(), Error> {
let mut conn = tokio::net::TcpStream::connect(&self.addr).await?;
if let Some(v) = self.rcvbuf {
set_rcv_sock_opts(&mut conn, v)?;
set_rcv_sock_opts(&mut conn, v as u32)?;
}
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
let mut i1 = 0u64;
@@ -153,6 +169,8 @@ impl BsreadClient {
let mut frame_diff_count = 0u64;
let mut hash_mismatch_count = 0u64;
let mut head_b = HeadB::empty();
let mut status_last = Instant::now();
let mut bytes_payload = 0u64;
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
@@ -231,6 +249,7 @@ impl BsreadClient {
if !self.channel_writers.contains_key(&series) {}
if let Some(cw) = self.channel_writers.get_mut(&series) {
cw.write_msg(ts, pulse, fr)?.await?;
bytes_payload += fr.data().len() as u64;
} else {
// TODO check for missing writers.
//warn!("no writer for {}", chn.name);
@@ -260,6 +279,14 @@ impl BsreadClient {
if false && msgc > 10000 {
break;
}
let tsnow = Instant::now();
let dt = tsnow.duration_since(status_last);
if dt >= Duration::from_millis(2000) {
let r = bytes_payload as f32 / dt.as_secs_f32() * 1e-3;
info!("rate: {r:8.3} kB/s");
status_last = tsnow;
bytes_payload = 0;
}
}
Ok(())
}
@@ -273,26 +300,39 @@ impl BsreadClient {
if let Some(n) = a[0].as_u64() {
if n == 1 {
if chn.encoding == "big" {
let cw =
ChannelWriterF64::new(series, self.common_queries.clone(), self.scy.clone());
let cw = ChannelWriterScalarF64::new(
series,
self.common_queries.clone(),
self.scy.clone(),
);
self.channel_writers.insert(series, Box::new(cw));
} else {
warn!("No LE avail");
warn!("TODO scalar f64 LE");
}
} else {
warn!("array f64 writer not yet available.")
if chn.encoding == "big" {
let cw = ChannelWriterArrayF64::new(
series,
self.common_queries.clone(),
self.scy.clone(),
self.opts.array_truncate.unwrap_or(64),
);
self.channel_writers.insert(series, Box::new(cw));
} else {
warn!("TODO array f64 LE");
}
}
}
} else {
warn!("f64 writer not yet available for shape {:?}", a)
warn!("TODO writer f64 shape {:?}", a);
}
}
s => {
warn!("setup_channel_writers shape not supported {:?}", s);
warn!("TODO writer f64 shape {:?}", s);
}
},
k => {
warn!("setup_channel_writers data type not supported {:?}", k);
warn!("TODO writer dtype {:?}", k);
}
}
Ok(())
@@ -342,8 +382,8 @@ impl BsreadClient {
}
}
pub async fn zmtp_client(scylla: &str, addr: &str, rcvbuf: Option<u32>, do_pulse_id: bool) -> Result<(), Error> {
let mut client = BsreadClient::new(scylla.into(), addr.into(), do_pulse_id, rcvbuf).await?;
pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
let mut client = BsreadClient::new(opts).await?;
client.run().await
}