Support more data types

This commit is contained in:
Dominik Werder
2022-04-22 22:45:14 +02:00
parent de26b5b7c4
commit e965845ba6
9 changed files with 672 additions and 145 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.1.0"
version = "0.1.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -14,6 +14,11 @@ pub fn main() -> Result<(), Error> {
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?,
SubCmd::ListPkey => daqingest::query::list_pkey().await?,
SubCmd::ListPulses => daqingest::query::list_pulses().await?,
SubCmd::FetchEvents(k) => daqingest::query::fetch_events(k).await?,
SubCmd::BsreadDump(k) => {
let mut f = netfetch::zmtp::BsreadDumper::new(k.source);
f.run().await?
}
}
Ok(())
})

View File

@@ -5,10 +5,12 @@ use netfetch::zmtp::ZmtpClientOpts;
#[derive(Debug, Parser)]
//#[clap(name = "daqingest", version)]
//#[clap(version)]
#[clap(version)]
pub struct DaqIngestOpts {
#[clap(long, parse(from_occurrences))]
pub verbose: u32,
#[clap(long)]
pub tag: String,
#[clap(subcommand)]
pub subcmd: SubCmd,
}
@@ -18,6 +20,8 @@ pub enum SubCmd {
Bsread(Bsread),
ListPkey,
ListPulses,
FetchEvents(FetchEvents),
BsreadDump(BsreadDump),
}
#[derive(Debug, Parser)]
@@ -45,3 +49,16 @@ impl From<Bsread> for ZmtpClientOpts {
}
}
}
#[derive(Debug, Parser)]
pub struct FetchEvents {
#[clap(long, min_values(1))]
pub scylla: Vec<String>,
#[clap(long)]
pub channel: String,
}
#[derive(Debug, Parser)]
pub struct BsreadDump {
pub source: String,
}

View File

@@ -1,3 +1,4 @@
use crate::FetchEvents;
use log::*;
use scylla::batch::Consistency;
use scylla::transport::errors::{NewSessionError, QueryError};
@@ -27,7 +28,7 @@ pub async fn list_pkey() -> Result<(), Error> {
let scy = SessionBuilder::new()
.known_node("127.0.0.1:19042")
.default_consistency(Consistency::One)
.use_keyspace("ks1", false)
.use_keyspace("ks1", true)
.build()
.await?;
let query = scy
@@ -68,7 +69,7 @@ pub async fn list_pulses() -> Result<(), Error> {
let scy = SessionBuilder::new()
.known_node("127.0.0.1:19042")
.default_consistency(Consistency::One)
.use_keyspace("ks1", false)
.use_keyspace("ks1", true)
.build()
.await?;
let query = scy
@@ -103,3 +104,39 @@ pub async fn list_pulses() -> Result<(), Error> {
}
Ok(())
}
pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> {
let scy = SessionBuilder::new()
.known_nodes(&opts.scylla)
.default_consistency(Consistency::One)
.use_keyspace("ks1", true)
.build()
.await?;
let qu_series = scy
.prepare(
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",
)
.await?;
let qres = scy.execute(&qu_series, ("scylla", &opts.channel)).await?;
if let Some(rows) = qres.rows {
info!("Found {} matching series", rows.len());
for r in &rows {
info!("Got row: {r:?}");
if false {
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32;
let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64;
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
}
}
let _row = rows.into_iter().next().unwrap();
} else {
warn!("No result from series lookup");
}
Ok(())
}

View File

@@ -29,3 +29,4 @@ stats = { path = "../stats" }
err = { path = "../../daqbuffer/err" }
netpod = { path = "../../daqbuffer/netpod" }
taskrun = { path = "../../daqbuffer/taskrun" }
bitshuffle = { path = "../../daqbuffer/bitshuffle" }

View File

@@ -21,13 +21,27 @@ pub struct GlobalTimestamp {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelDesc {
pub name: String,
#[serde(rename = "type")]
#[serde(rename = "type", default = "bsread_type_default")]
pub ty: String,
#[serde(default = "bsread_shape_default")]
pub shape: JsVal,
#[serde(default = "bsread_encoding_default")]
pub encoding: String,
pub compression: Option<String>,
}
fn bsread_type_default() -> String {
"float64".into()
}
fn bsread_shape_default() -> JsVal {
JsVal::Array(vec![])
}
fn bsread_encoding_default() -> String {
"little".into()
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeadA {
pub htype: String,
@@ -59,65 +73,115 @@ pub struct BsreadMessage {
pub head_b_md5: String,
}
pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
if msg.frames().len() < 2 {
return Err(Error::with_msg_no_trace("not enough frames for bsread"));
pub struct Parser {
tmp1: Vec<u8>,
}
impl Parser {
pub fn new() -> Self {
Self { tmp1: vec![0; 1024] }
}
let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?;
let head_b_md5 = {
use md5::Digest;
let mut hasher = md5::Md5::new();
hasher.update(msg.frames()[1].data());
let h = hasher.finalize();
hex::encode(&h)
};
let dhdecompr = match &head_a.dh_compression {
Some(m) => match m.as_str() {
"none" => msg.frames()[1].data(),
"lz4" => {
error!("data header lz4 compression not yet implemented");
return Err(Error::with_msg_no_trace(
"data header lz4 compression not yet implemented",
));
}
"bitshuffle_lz4" => {
error!("data header bitshuffle_lz4 compression not yet implemented");
return Err(Error::with_msg_no_trace(
"data header bitshuffle_lz4 compression not yet implemented",
));
}
_ => msg.frames()[1].data(),
},
None => msg.frames()[1].data(),
};
let head_b: HeadB = serde_json::from_slice(dhdecompr)?;
if false && msg.frames().len() == head_b.channels.len() + 3 {
for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) {
let sty = ScalarType::from_bsread_str(ch.ty.as_str())?;
let bo = ByteOrder::from_bsread_str(&ch.encoding)?;
let shape = Shape::from_bsread_jsval(&ch.shape)?;
match sty {
ScalarType::I64 => match &bo {
ByteOrder::LE => match &shape {
Shape::Scalar => {
assert_eq!(fr.data().len(), 8);
let _v = i64::from_le_bytes(fr.data().try_into()?);
pub fn parse_zmtp_message(&mut self, msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
if msg.frames().len() < 2 {
return Err(Error::with_msg_no_trace("not enough frames for bsread"));
}
let head_a: HeadA =
serde_json::from_slice(&msg.frames()[0].data()).map_err(|e| format!("main header parse error {e:?}"))?;
let head_b_md5 = {
use md5::Digest;
let mut hasher = md5::Md5::new();
hasher.update(msg.frames()[1].data());
let h = hasher.finalize();
hex::encode(&h)
};
let dhdecompr = match &head_a.dh_compression {
Some(m) => match m.as_str() {
"none" => msg.frames()[1].data(),
"lz4" => {
let inp = msg.frames()[1].data();
let nd = u32::from_be_bytes(inp[0..4].try_into()?) as usize;
loop {
let g = self.tmp1.len();
if g >= nd {
break;
}
Shape::Wave(_) => {}
Shape::Image(_, _) => {}
if g > 1024 * 512 {
return Err(Error::with_public_msg("decomp buffer too large"));
}
let u = self.tmp1.len() * 2;
info!("resize buffer to {u}");
self.tmp1.resize(u, 0);
}
match bitshuffle::lz4_decompress(&inp[4..], &mut self.tmp1) {
Ok(_) => {}
Err(e) => {
// TODO throttle log output
error!("lz4 error {e:?}");
return Err(Error::with_public_msg(format!("lz4 error {e:?}")));
}
}
&self.tmp1[..nd]
}
"bitshuffle_lz4" => {
let inp = msg.frames()[1].data();
let nd = u64::from_be_bytes(inp[0..8].try_into()?) as usize;
let bs = u32::from_be_bytes(inp[8..12].try_into()?) as usize;
loop {
let g = self.tmp1.len();
if g >= nd {
break;
}
if g > 1024 * 512 {
return Err(Error::with_public_msg("decomp buffer too large"));
}
let u = self.tmp1.len() * 2;
info!("resize buffer to {u}");
self.tmp1.resize(u, 0);
}
match bitshuffle::bitshuffle_decompress(&inp[12..], &mut self.tmp1, nd, 1, bs) {
Ok(_) => {}
Err(e) => {
// TODO throttle log output
error!("bitshuffle_lz4 error {e:?}");
return Err(Error::with_public_msg(format!("bitshuffle_lz4 error {e:?}")));
}
}
&self.tmp1[..nd]
}
_ => msg.frames()[1].data(),
},
None => msg.frames()[1].data(),
};
let head_b: HeadB = serde_json::from_slice(dhdecompr).map_err(|e| format!("data header parse error: {e:?}"))?;
if false && msg.frames().len() == head_b.channels.len() + 3 {
for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) {
let sty = ScalarType::from_bsread_str(ch.ty.as_str())?;
let bo = ByteOrder::from_bsread_str(&ch.encoding)?;
let shape = Shape::from_bsread_jsval(&ch.shape)?;
match sty {
ScalarType::I64 => match &bo {
ByteOrder::LE => match &shape {
Shape::Scalar => {
assert_eq!(fr.data().len(), 8);
let _v = i64::from_le_bytes(fr.data().try_into()?);
}
Shape::Wave(_) => {}
Shape::Image(_, _) => {}
},
_ => {}
},
_ => {}
},
_ => {}
}
}
}
let ret = BsreadMessage {
head_a,
head_b,
head_b_md5,
};
Ok(ret)
}
let ret = BsreadMessage {
head_a,
head_b,
head_b_md5,
};
Ok(ret)
}
pub struct BsreadCollector {}

View File

@@ -54,10 +54,7 @@ impl<V> Future for ScyQueryFut<V> {
use Poll::*;
match self.fut.poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => {
info!("ScyQueryFut done Ok");
Ready(Ok(()))
}
Ok(_) => Ready(Ok(())),
Err(e) => {
warn!("ScyQueryFut done Err");
Ready(Err(e).err_conv())
@@ -181,7 +178,7 @@ impl Future for ScyBatchFutGen {
match self.fut.poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => {
trace!("ScyBatchFutGen done Ok");
trace!("ScyBatchFutGen done Ok");
Ready(Ok(()))
}
Err(e) => {
@@ -192,7 +189,7 @@ impl Future for ScyBatchFutGen {
"ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
self.polled, dt_created, dt_polled
);
warn!("ScyBatchFutGen done Err {e:?}");
warn!("ScyBatchFutGen done Err {e:?}");
Ready(Err(e).err_conv())
}
},
@@ -201,6 +198,102 @@ impl Future for ScyBatchFutGen {
}
}
pub struct InsertLoopFut {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
query: Arc<PreparedStatement>,
futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>>>>>,
fut_ix: usize,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
}
impl InsertLoopFut {
pub fn new<V>(scy: Arc<ScySession>, query: PreparedStatement, values: Vec<V>) -> Self
where
V: ValueList + 'static,
{
//values.clear();
let query = Arc::new(query);
let scy_ref = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
let query_ref = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement;
// TODO
// Can I store the values in some better generic form?
// Or is it acceptable to generate all insert futures right here and poll them later?
let futs: Vec<_> = values
.into_iter()
.map(|vs| {
//
let fut = scy_ref.execute(query_ref, vs);
Box::pin(fut) as _
})
.collect();
let tsnow = Instant::now();
Self {
scy,
query,
futs,
fut_ix: 0,
polled: 0,
ts_create: tsnow,
ts_poll_start: tsnow,
}
}
}
impl Future for InsertLoopFut {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if self.polled == 0 {
self.ts_poll_start = Instant::now();
}
self.polled += 1;
if self.futs.is_empty() {
return Ready(Ok(()));
}
loop {
let fut_ix = self.fut_ix;
break match self.futs[fut_ix].poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => {
self.fut_ix += 1;
if self.fut_ix >= self.futs.len() {
if false {
let tsnow = Instant::now();
let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
info!(
"InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
self.polled, dt_created, dt_polled
);
}
continue;
} else {
Ready(Ok(()))
}
}
Err(e) => {
let tsnow = Instant::now();
let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
warn!(
"InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
self.polled, dt_created, dt_polled
);
warn!("InsertLoopFut done Err {e:?}");
Ready(Err(e).err_conv())
}
},
Pending => Pending,
};
}
}
}
pub struct ChannelWriteRes {
pub nrows: u32,
pub dt: Duration,
@@ -285,6 +378,7 @@ trait MsgAcceptor {
fn len(&self) -> usize;
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>;
fn should_flush(&self) -> bool;
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error>;
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error>;
}
@@ -292,13 +386,12 @@ macro_rules! impl_msg_acceptor_scalar {
($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => {
struct $sname {
query: PreparedStatement,
values: Vec<(i32, i64, i64, i64, $st)>,
series: i32,
values: Vec<(i64, i64, i64, i64, $st)>,
series: i64,
}
impl $sname {
#[allow(unused)]
pub fn new(series: i32, cq: &CommonQueries) -> Self {
pub fn new(series: i64, cq: &CommonQueries) -> Self {
Self {
query: cq.$qu_id.clone(),
values: vec![],
@@ -315,7 +408,16 @@ macro_rules! impl_msg_acceptor_scalar {
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
type ST = $st;
const STL: usize = std::mem::size_of::<ST>();
let value = ST::$from_bytes(fr.data()[0..STL].try_into()?);
let data = fr.data();
if data.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"data frame too small for type: {} vs {}",
data.len(),
STL
)));
}
let a = data[..STL].try_into()?;
let value = ST::$from_bytes(a);
self.values.push((self.series, ts_msp, ts_lsp, pulse, value));
Ok(())
}
@@ -334,6 +436,12 @@ macro_rules! impl_msg_acceptor_scalar {
let ret = ScyBatchFutGen::new(scy, batch, vt);
Ok(ret)
}
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt);
Ok(ret)
}
}
};
}
@@ -342,19 +450,20 @@ macro_rules! impl_msg_acceptor_array {
($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => {
struct $sname {
query: PreparedStatement,
values: Vec<(i32, i64, i64, i64, Vec<$st>)>,
series: i32,
values: Vec<(i64, i64, i64, i64, Vec<$st>)>,
series: i64,
array_truncate: usize,
truncated: usize,
}
impl $sname {
#[allow(unused)]
pub fn new(series: i32, array_truncate: usize, cq: &CommonQueries) -> Self {
pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self {
Self {
query: cq.$qu_id.clone(),
values: vec![],
series,
array_truncate,
truncated: 0,
}
}
}
@@ -374,7 +483,18 @@ macro_rules! impl_msg_acceptor_array {
let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
values.truncate(self.array_truncate);
if values.len() > self.array_truncate {
if self.truncated < 10 {
warn!(
"truncate {} to {} for series {}",
values.len(),
self.array_truncate,
self.series
);
}
values.truncate(self.array_truncate);
self.truncated = self.truncated.saturating_add(1);
}
self.values.push((self.series, ts_msp, ts_lsp, pulse, values));
Ok(())
}
@@ -393,6 +513,12 @@ macro_rules! impl_msg_acceptor_array {
let ret = ScyBatchFutGen::new(scy, batch, vt);
Ok(ret)
}
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt);
Ok(ret)
}
}
};
}
@@ -403,6 +529,8 @@ impl_msg_acceptor_scalar!(MsgAcceptorScalarU32LE, i32, qu_insert_scalar_i32, fro
impl_msg_acceptor_scalar!(MsgAcceptorScalarU32BE, i32, qu_insert_scalar_i32, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarI16LE, i16, qu_insert_scalar_i16, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarI16BE, i16, qu_insert_scalar_i16, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarI32LE, i32, qu_insert_scalar_i32, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarI32BE, i32, qu_insert_scalar_i32, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarF32LE, f32, qu_insert_scalar_f32, from_le_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarF32BE, f32, qu_insert_scalar_f32, from_be_bytes);
impl_msg_acceptor_scalar!(MsgAcceptorScalarF64LE, f64, qu_insert_scalar_f64, from_le_bytes);
@@ -412,24 +540,104 @@ impl_msg_acceptor_array!(MsgAcceptorArrayU16LE, i16, qu_insert_array_u16, from_l
impl_msg_acceptor_array!(MsgAcceptorArrayU16BE, i16, qu_insert_array_u16, from_be_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayI16LE, i16, qu_insert_array_i16, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayI16BE, i16, qu_insert_array_i16, from_be_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayI32LE, i32, qu_insert_array_i32, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayI32BE, i32, qu_insert_array_i32, from_be_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF32LE, f32, qu_insert_array_f32, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF32BE, f32, qu_insert_array_f32, from_be_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF64LE, f64, qu_insert_array_f64, from_le_bytes);
impl_msg_acceptor_array!(MsgAcceptorArrayF64BE, f64, qu_insert_array_f64, from_be_bytes);
struct MsgAcceptorArrayBool {
query: PreparedStatement,
values: Vec<(i64, i64, i64, i64, Vec<bool>)>,
series: i64,
array_truncate: usize,
truncated: usize,
}
impl MsgAcceptorArrayBool {
pub fn new(series: i64, array_truncate: usize, cq: &CommonQueries) -> Self {
Self {
query: cq.qu_insert_array_bool.clone(),
values: vec![],
series,
array_truncate,
truncated: 0,
}
}
}
impl MsgAcceptor for MsgAcceptorArrayBool {
fn len(&self) -> usize {
self.values.len()
}
fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
type ST = bool;
const STL: usize = std::mem::size_of::<ST>();
let vc = fr.data().len() / STL;
let mut values = Vec::with_capacity(vc);
for i in 0..vc {
let h = i * STL;
let value = u8::from_le_bytes(fr.data()[h..h + STL].try_into()?);
values.push(value);
}
if values.len() > self.array_truncate {
if self.truncated < 10 {
warn!(
"truncate {} to {} for series {}",
values.len(),
self.array_truncate,
self.series
);
}
values.truncate(self.array_truncate);
self.truncated = self.truncated.saturating_add(1);
}
let values = values.into_iter().map(|x| x != 0).collect();
self.values.push((self.series, ts_msp, ts_lsp, pulse, values));
Ok(())
}
fn should_flush(&self) -> bool {
self.len() >= 40 + ((self.series as usize) & 0x7)
}
fn flush_batch(&mut self, scy: Arc<ScySession>) -> Result<ScyBatchFutGen, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let nn = vt.len();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..nn {
batch.append_statement(self.query.clone());
}
let ret = ScyBatchFutGen::new(scy, batch, vt);
Ok(ret)
}
fn flush_loop(&mut self, scy: Arc<ScySession>) -> Result<InsertLoopFut, Error> {
let vt = mem::replace(&mut self.values, vec![]);
let ret = InsertLoopFut::new(scy, self.query.clone(), vt);
Ok(ret)
}
}
pub struct ChannelWriterAll {
series: u32,
series: u64,
scy: Arc<ScySession>,
common_queries: Arc<CommonQueries>,
ts_msp_lsp: fn(u64, u32) -> (u64, u64),
ts_msp_lsp: fn(u64, u64) -> (u64, u64),
ts_msp_last: u64,
acceptor: Box<dyn MsgAcceptor>,
dtype_mark: u32,
#[allow(unused)]
scalar_type: ScalarType,
#[allow(unused)]
shape: Shape,
pulse_last: u64,
}
impl ChannelWriterAll {
pub fn new(
series: u32,
series: u64,
common_queries: Arc<CommonQueries>,
scy: Arc<ScySession>,
scalar_type: ScalarType,
@@ -437,60 +645,66 @@ impl ChannelWriterAll {
byte_order: ByteOrder,
array_truncate: usize,
) -> Result<Self, Error> {
let dtype_mark = scalar_type.index() as u32;
let dtype_mark = match &shape {
Shape::Scalar => dtype_mark,
Shape::Wave(_) => 1000 + dtype_mark,
Shape::Image(_, _) => 2000 + dtype_mark,
};
let (ts_msp_lsp, acc): (fn(u64, u32) -> (u64, u64), Box<dyn MsgAcceptor>) = match &shape {
let (ts_msp_lsp, acc): (fn(u64, u64) -> (u64, u64), Box<dyn MsgAcceptor>) = match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U16 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarU16BE::new(series as i32, &common_queries);
ByteOrder::LE => {
let acc = MsgAcceptorScalarU16LE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
ByteOrder::BE => {
let acc = MsgAcceptorScalarU16BE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::U32 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarU32BE::new(series as i32, &common_queries);
ByteOrder::LE => {
let acc = MsgAcceptorScalarU32LE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarU32BE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::I16 => match &byte_order {
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
let acc = MsgAcceptorScalarI16LE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarI16BE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::I32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorScalarI32LE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorScalarI32BE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::F32 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarF32BE::new(series as i32, &common_queries);
ByteOrder::LE => {
let acc = MsgAcceptorScalarF32LE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
ByteOrder::BE => {
let acc = MsgAcceptorScalarF32BE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
ScalarType::F64 => match &byte_order {
ByteOrder::BE => {
let acc = MsgAcceptorScalarF64BE::new(series as i32, &common_queries);
ByteOrder::LE => {
let acc = MsgAcceptorScalarF64LE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
ByteOrder::LE => {
return Err(Error::with_msg_no_trace(format!(
"TODO {:?} {:?} {:?}",
scalar_type, shape, byte_order
)));
ByteOrder::BE => {
let acc = MsgAcceptorScalarF64BE::new(series as i64, &common_queries);
(ts_msp_lsp_1, Box::new(acc) as _)
}
},
_ => {
@@ -503,43 +717,59 @@ impl ChannelWriterAll {
Shape::Wave(nele) => {
info!("set up wave acceptor nele {nele}");
match &scalar_type {
ScalarType::BOOL => match &byte_order {
_ => {
let acc = MsgAcceptorArrayBool::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::U16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayU16LE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayU16LE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayU16BE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayU16BE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::I16 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayI16LE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayI16LE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayI16BE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayI16BE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::I32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayI32LE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayI32BE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::F32 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayF32LE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF32LE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayF32BE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF32BE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
ScalarType::F64 => match &byte_order {
ByteOrder::LE => {
let acc = MsgAcceptorArrayF64LE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF64LE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
ByteOrder::BE => {
let acc = MsgAcceptorArrayF64BE::new(series as i32, array_truncate, &common_queries);
let acc = MsgAcceptorArrayF64BE::new(series as i64, array_truncate, &common_queries);
(ts_msp_lsp_2, Box::new(acc) as _)
}
},
@@ -565,25 +795,31 @@ impl ChannelWriterAll {
ts_msp_lsp,
ts_msp_last: 0,
acceptor: acc,
dtype_mark,
scalar_type,
shape,
pulse_last: 0,
};
Ok(ret)
}
pub fn dtype_mark(&self) -> u32 {
self.dtype_mark
}
pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
// TODO limit log rate
// TODO for many channels, it's normal to have gaps.
if false && pulse != 0 && pulse != self.pulse_last + 1 {
let gap = pulse as i64 - self.pulse_last as i64;
warn!("GAP series {} pulse {} gap {}", self.series, pulse, gap);
}
self.pulse_last = pulse;
let (ts_msp, ts_lsp) = (self.ts_msp_lsp)(ts, self.series);
let fut1 = if ts_msp != self.ts_msp_last {
info!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}");
debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}");
self.ts_msp_last = ts_msp;
// TODO make the passing of the query parameters type safe.
// TODO the "dtype" table field is not needed here. Drop from database.
let fut = ScyQueryFut::new(
self.scy.clone(),
self.common_queries.qu_insert_ts_msp.clone(),
(self.series as i32, ts_msp as i64, self.dtype_mark as i32),
(self.series as i64, ts_msp as i64),
);
Some(Box::pin(fut) as _)
} else {
@@ -592,7 +828,7 @@ impl ChannelWriterAll {
self.acceptor.accept(ts_msp as i64, ts_lsp as i64, pulse as i64, fr)?;
if self.acceptor.should_flush() {
let nn = self.acceptor.len();
let fut = self.acceptor.flush_batch(self.scy.clone())?;
let fut = self.acceptor.flush_loop(self.scy.clone())?;
let fut2 = Some(Box::pin(fut) as _);
let ret = ChannelWriteFut {
ts1: None,
@@ -621,19 +857,19 @@ impl ChannelWriter for ChannelWriterAll {
}
}
fn ts_msp_lsp_1(ts: u64, series: u32) -> (u64, u64) {
fn ts_msp_lsp_1(ts: u64, series: u64) -> (u64, u64) {
ts_msp_lsp_gen(ts, series, 100 * SEC)
}
fn ts_msp_lsp_2(ts: u64, series: u32) -> (u64, u64) {
fn ts_msp_lsp_2(ts: u64, series: u64) -> (u64, u64) {
ts_msp_lsp_gen(ts, series, 10 * SEC)
}
fn ts_msp_lsp_gen(ts: u64, series: u32, fak: u64) -> (u64, u64) {
fn ts_msp_lsp_gen(ts: u64, series: u64, fak: u64) -> (u64, u64) {
if ts < u32::MAX as u64 {
return (0, 0);
}
let off = series as u64;
let off = series & 0xffffffff;
let ts_a = ts - off;
let ts_b = ts_a / fak;
let ts_lsp = ts_a % fak;

View File

@@ -156,7 +156,13 @@ impl NetBuf {
}
}
fn check_invariant(&self) {
#[allow(unused)]
#[inline(always)]
fn check_invariant(&self) {}
#[allow(unused)]
#[inline(always)]
fn check_invariant2(&self) {
if self.wp > self.buf.len() {
eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp);
std::process::exit(87);

View File

@@ -1,4 +1,4 @@
use crate::bsread::{parse_zmtp_message, BsreadMessage};
use crate::bsread::{BsreadMessage, Parser};
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::channelwriter::{ChannelWriter, ChannelWriterAll};
use crate::netbuf::NetBuf;
@@ -72,12 +72,16 @@ fn test_service() -> Result<(), Error> {
taskrun::run(fut)
}
pub fn get_series_id(chn: &ChannelDesc) -> u32 {
pub fn get_series_id(chn: &ChannelDesc) -> u64 {
// TODO use a more stable format (with ScalarType, Shape) as hash input.
// TODO do not depend at all on the mapping, instead look it up on demand and cache.
use md5::Digest;
let mut h = md5::Md5::new();
h.update(chn.name.as_bytes());
h.update(chn.ty.as_bytes());
h.update(format!("{:?}", chn.shape).as_bytes());
let f = h.finalize();
u32::from_le_bytes(f.as_slice()[0..4].try_into().unwrap())
u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap())
}
pub struct CommonQueries {
@@ -92,8 +96,10 @@ pub struct CommonQueries {
pub qu_insert_scalar_f64: PreparedStatement,
pub qu_insert_array_u16: PreparedStatement,
pub qu_insert_array_i16: PreparedStatement,
pub qu_insert_array_i32: PreparedStatement,
pub qu_insert_array_f32: PreparedStatement,
pub qu_insert_array_f64: PreparedStatement,
pub qu_insert_array_bool: PreparedStatement,
}
#[derive(Clone)]
@@ -136,9 +142,10 @@ struct BsreadClient {
rcvbuf: Option<usize>,
tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>,
scy: Arc<ScySession>,
channel_writers: BTreeMap<u32, Box<dyn ChannelWriter>>,
channel_writers: BTreeMap<u64, Box<dyn ChannelWriter>>,
common_queries: Arc<CommonQueries>,
print_stats: CheckEvery,
parser: Parser,
}
impl BsreadClient {
@@ -158,6 +165,7 @@ impl BsreadClient {
channel_writers: Default::default(),
common_queries,
print_stats: CheckEvery::new(Duration::from_millis(2000)),
parser: Parser::new(),
};
Ok(ret)
}
@@ -177,13 +185,14 @@ impl BsreadClient {
let mut bytes_payload = 0u64;
let mut rows_inserted = 0u32;
let mut time_spent_inserting = Duration::from_millis(0);
let mut series_ids = Vec::new();
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
ZmtpEvent::ZmtpCommand(_) => (),
ZmtpEvent::ZmtpMessage(msg) => {
msgc += 1;
match parse_zmtp_message(&msg) {
match self.parser.parse_zmtp_message(&msg) {
Ok(bm) => {
if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 {
frame_diff_count += 1;
@@ -207,9 +216,10 @@ impl BsreadClient {
}
{
if bm.head_b_md5 != dh_md5_last {
series_ids.clear();
head_b = bm.head_b.clone();
if dh_md5_last.is_empty() {
info!("data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash);
info!("data header hash {}", bm.head_b_md5);
dh_md5_last = bm.head_b_md5.clone();
for chn in &head_b.channels {
info!("Setup writer for {}", chn.name);
@@ -221,7 +231,7 @@ impl BsreadClient {
}
}
} else {
error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash);
error!("TODO changed data header hash {}", bm.head_b_md5);
dh_md5_last = bm.head_b_md5.clone();
// TODO
// Update only the changed channel writers.
@@ -253,11 +263,23 @@ impl BsreadClient {
let gts = bm.head_a.global_timestamp;
let ts = (gts.sec as u64) * SEC + gts.ns as u64;
let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0);
// TODO limit warn rate
if pulse != 0 && (pulse < 14781000000 || pulse > 16000000000) {
// TODO limit log rate
warn!("Bad pulse {} for {}", pulse, self.source_addr);
}
for i1 in 0..head_b.channels.len() {
let chn = &head_b.channels[i1];
let fr = &msg.frames[2 + 2 * i1];
let series = get_series_id(chn);
if !self.channel_writers.contains_key(&series) {}
if i1 >= series_ids.len() {
series_ids.resize(head_b.channels.len(), (0u8, 0u64));
}
if series_ids[i1].0 == 0 {
let series = get_series_id(chn);
series_ids[i1].0 = 1;
series_ids[i1].1 = series;
}
let series = series_ids[i1].1;
if let Some(cw) = self.channel_writers.get_mut(&series) {
let res = cw.write_msg(ts, pulse, fr)?.await?;
rows_inserted += res.nrows;
@@ -329,13 +351,13 @@ impl BsreadClient {
byte_order.clone(),
trunc,
)?;
let dtype_mark = cw.dtype_mark();
let shape_dims = shape.to_scylla_vec();
self.channel_writers.insert(series, Box::new(cw));
// TODO insert correct facility name
self.scy
.query(
"insert into series_by_channel (facility, channel_name, dtype, series) values (?, ?, ?, ?)",
("scylla", &chn.name, dtype_mark as i32, series as i32),
"insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?)",
("scylla", &chn.name, series as i64, scalar_type.to_scylla_i32(), &shape_dims),
)
.await
.err_conv()?;
@@ -407,7 +429,7 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_ts_msp = scy
.prepare("insert into ts_msp (series, ts_msp, dtype) values (?, ?, ?)")
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_u16 = scy
@@ -442,6 +464,10 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
.prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i32 = scy
.prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f32 = scy
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
@@ -450,6 +476,10 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_bool = scy
.prepare("insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let common_queries = CommonQueries {
qu1,
qu2,
@@ -462,8 +492,10 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
qu_insert_scalar_f64,
qu_insert_array_u16,
qu_insert_array_i16,
qu_insert_array_i32,
qu_insert_array_f32,
qu_insert_array_f64,
qu_insert_array_bool,
};
let common_queries = Arc::new(common_queries);
let mut clients = vec![];
@@ -520,6 +552,135 @@ fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
Ok(())
}
/// Diagnostic helper that connects to a single bsread source and logs
/// information about the messages it receives (see [`BsreadDumper::run`]).
pub struct BsreadDumper {
    /// Source address, either `host:port` or with a `tcp://` prefix.
    source_addr: String,
    /// Parser used to decode bsread structures out of zmtp messages.
    parser: Parser,
}
impl BsreadDumper {
pub fn new(source_addr: String) -> Self {
Self {
source_addr,
parser: Parser::new(),
}
}
pub async fn run(&mut self) -> Result<(), Error> {
let src = if self.source_addr.starts_with("tcp://") {
self.source_addr[6..].into()
} else {
self.source_addr.clone()
};
let conn = tokio::net::TcpStream::connect(&src).await?;
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
let mut i1 = 0u64;
let mut msgc = 0u64;
let mut dh_md5_last = String::new();
let mut frame_diff_count = 0u64;
let mut hash_mismatch_count = 0u64;
let mut head_b = HeadB::empty();
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
ZmtpEvent::ZmtpCommand(_) => (),
ZmtpEvent::ZmtpMessage(msg) => {
msgc += 1;
match self.parser.parse_zmtp_message(&msg) {
Ok(bm) => {
if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 {
frame_diff_count += 1;
if frame_diff_count < 1000 {
warn!(
"chn len {} frame diff {}",
bm.head_b.channels.len(),
msg.frames().len() - 2 * bm.head_b.channels.len()
);
}
}
if bm.head_b_md5 != bm.head_a.hash {
hash_mismatch_count += 1;
// TODO keep logging data header changes, just suppress too frequent messages.
if hash_mismatch_count < 200 {
error!(
"Invalid bsread message: hash mismatch. dhcompr {:?}",
bm.head_a.dh_compression
);
}
}
if bm.head_b_md5 != dh_md5_last {
head_b = bm.head_b.clone();
if dh_md5_last.is_empty() {
info!("data header hash {}", bm.head_b_md5);
} else {
error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash);
}
dh_md5_last = bm.head_b_md5.clone();
}
if msg.frames.len() < 2 + 2 * head_b.channels.len() {
// TODO count always, throttle log.
error!("not enough frames for data header");
}
let gts = bm.head_a.global_timestamp;
let ts = (gts.sec as u64) * SEC + gts.ns as u64;
let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0);
let mut bytes_payload = 0u64;
for i1 in 0..head_b.channels.len() {
let chn = &head_b.channels[i1];
let fr = &msg.frames[2 + 2 * i1];
let _series = get_series_id(chn);
if chn.ty == "string" {
info!("string channel: {} {:?}", chn.name, chn.shape);
if let Ok(shape) = Shape::from_bsread_jsval(&chn.shape) {
if let Ok(_bo) = ByteOrder::from_bsread_str(&chn.encoding) {
match &shape {
Shape::Scalar => {
info!("scalar string...");
let s = String::from_utf8_lossy(fr.data());
info!("STRING: {s:?}");
}
_ => {
warn!(
"non-scalar string channels not yet implemented {}",
chn.name
);
}
}
}
}
}
bytes_payload += fr.data().len() as u64;
}
info!("zmtp message ts {ts} pulse {pulse} bytes_payload {bytes_payload}");
}
Err(e) => {
for frame in &msg.frames {
info!("Frame: {:?}", frame);
}
zmtp.dump_input_state();
zmtp.dump_conn_state();
error!("bsread parse error: {e:?}");
break;
}
}
}
},
Err(e) => {
error!("zmtp item error: {e:?}");
return Err(e);
}
}
i1 += 1;
if true && i1 > 20 {
break;
}
if true && msgc > 20 {
break;
}
}
Ok(())
}
}
#[derive(Clone, Debug)]
enum ConnState {
InitSend,