Write f64 values

This commit is contained in:
Dominik Werder
2022-04-05 15:36:18 +02:00
parent 1e76cb1391
commit 83bfc66ec4
8 changed files with 759 additions and 138 deletions

View File

@@ -10,7 +10,7 @@ pub fn main() -> Result<(), Error> {
}
let opts = DaqIngestOpts::parse();
match opts.subcmd {
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await?,
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.scylla, &k.source, k.rcvbuf, k.do_pulse_id).await?,
SubCmd::ListPkey => daqingest::query::list_pkey().await?,
SubCmd::ListPulses => daqingest::query::list_pulses().await?,
}

View File

@@ -21,8 +21,12 @@ pub enum SubCmd {
// CLI options for the `bsread` subcommand.
// NOTE: plain `//` comments are used on purpose — `///` doc comments would
// become clap-generated help text and change the CLI's observable output.
#[derive(Debug, Parser)]
pub struct Bsread {
    // Scylla node address to connect to (passed to SessionBuilder::known_node).
    #[clap(long)]
    pub scylla: String,
    // ZMTP source address (host:port) to pull bsread data from.
    #[clap(long)]
    pub source: String,
    // Optional socket receive buffer size; applied via set_rcv_sock_opts.
    #[clap(long)]
    pub rcvbuf: Option<u32>,
    // When set, also extract the pulse-id channel and store the pulse map.
    #[clap(long)]
    pub do_pulse_id: bool,
}

View File

@@ -22,6 +22,7 @@ futures-util = "0.3"
#pin-project-lite = "0.2"
scylla = "0.4"
md-5 = "0.9"
hex = "0.4"
libc = "0.2"
log = { path = "../log" }
err = { path = "../../daqbuffer/err" }

View File

@@ -5,7 +5,6 @@ use log::*;
use netpod::{ByteOrder, ScalarType, Shape};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsVal;
use std::fmt;
// TODO
pub struct ParseError {
@@ -13,50 +12,86 @@ pub struct ParseError {
pub msg: ZmtpMessage,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GlobalTimestamp {
pub sec: u64,
pub ns: u64,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelDesc {
pub name: String,
#[serde(rename = "type")]
pub ty: String,
pub shape: JsVal,
pub encoding: String,
pub compression: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeadA {
pub htype: String,
pub hash: String,
pub pulse_id: serde_json::Number,
pub global_timestamp: GlobalTimestamp,
pub dh_compression: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeadB {
pub htype: String,
pub channels: Vec<ChannelDesc>,
}
#[derive(Debug)]
impl HeadB {
pub fn empty() -> Self {
Self {
htype: String::new(),
channels: vec![],
}
}
}
#[derive(Clone, Debug)]
pub struct BsreadMessage {
pub head_a: HeadA,
pub head_b: HeadB,
pub values: Vec<Box<dyn fmt::Debug>>,
pub head_b_md5: String,
}
pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
if msg.frames().len() < 3 {
if msg.frames().len() < 2 {
return Err(Error::with_msg_no_trace("not enough frames for bsread"));
}
let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?;
let head_b: HeadB = serde_json::from_slice(&msg.frames()[1].data())?;
let mut values = vec![];
if msg.frames().len() == head_b.channels.len() + 3 {
let head_b_md5 = {
use md5::Digest;
let mut hasher = md5::Md5::new();
hasher.update(msg.frames()[1].data());
let h = hasher.finalize();
hex::encode(&h)
};
let dhdecompr = match &head_a.dh_compression {
Some(m) => match m.as_str() {
"none" => msg.frames()[1].data(),
"lz4" => {
error!("data header lz4 compression not yet implemented");
return Err(Error::with_msg_no_trace(
"data header lz4 compression not yet implemented",
));
}
"bitshuffle_lz4" => {
error!("data header bitshuffle_lz4 compression not yet implemented");
return Err(Error::with_msg_no_trace(
"data header bitshuffle_lz4 compression not yet implemented",
));
}
_ => msg.frames()[1].data(),
},
None => msg.frames()[1].data(),
};
let head_b: HeadB = serde_json::from_slice(dhdecompr)?;
if false && msg.frames().len() == head_b.channels.len() + 3 {
for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) {
let sty = ScalarType::from_bsread_str(ch.ty.as_str())?;
let bo = ByteOrder::from_bsread_str(&ch.encoding)?;
@@ -66,8 +101,7 @@ pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
ByteOrder::LE => match &shape {
Shape::Scalar => {
assert_eq!(fr.data().len(), 8);
let v = i64::from_le_bytes(fr.data().try_into()?);
values.push(Box::new(v) as _);
let _v = i64::from_le_bytes(fr.data().try_into()?);
}
Shape::Wave(_) => {}
Shape::Image(_, _) => {}
@@ -78,14 +112,11 @@ pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
}
}
}
{
let fr = &msg.frames()[msg.frames().len() - 1];
if fr.data().len() == 8 {
let pulse = u64::from_le_bytes(fr.data().try_into()?);
info!("pulse {}", pulse);
}
}
let ret = BsreadMessage { head_a, head_b, values };
let ret = BsreadMessage {
head_a,
head_b,
head_b_md5,
};
Ok(ret)
}

View File

@@ -0,0 +1,296 @@
use crate::zmtp::ErrConv;
use crate::zmtp::{CommonQueries, ZmtpFrame};
use err::Error;
use futures_core::Future;
use futures_util::FutureExt;
use log::*;
use scylla::batch::{Batch, BatchType};
use scylla::frame::value::{BatchValues, ValueList};
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::{BatchResult, QueryResult, Session as ScySession};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
/// Future that executes one prepared statement on Scylla.
///
/// The statement and bound values are stored alongside the erased driver
/// future because that future borrows them (see `new`).
pub struct ScyQueryFut<V> {
    // Kept alive for the duration of `fut`; not read directly.
    #[allow(unused)]
    scy: Arc<ScySession>,
    #[allow(unused)]
    query: Box<PreparedStatement>,
    #[allow(unused)]
    values: Box<V>,
    // Boxed future returned by `ScySession::execute`.
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>>>>,
}
impl<V> ScyQueryFut<V> {
    /// Start executing `query` with `values` on `scy`.
    ///
    /// NOTE(review): the `unsafe` casts below erase lifetimes by taking
    /// references *through the stack slots* of `scy`/`query`/`values` before
    /// those locals are moved into `Self`. Even though the Arc/Box heap data
    /// does not move, forging these references looks unsound — TODO confirm
    /// and replace with an owned `async move` block that captures the values.
    pub fn new(scy: Arc<ScySession>, query: PreparedStatement, values: V) -> Self
    where
        V: ValueList + 'static,
    {
        let query = Box::new(query);
        let values = Box::new(values);
        // SAFETY(review): lifetime-erasing casts; see note above.
        let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
        let query2 = unsafe { &*(&query as &_ as *const _) } as &PreparedStatement;
        let v2 = unsafe { &*(&values as &_ as *const _) } as &V;
        let fut = scy2.execute(query2, v2);
        Self {
            scy,
            query,
            values,
            fut: Box::pin(fut),
        }
    }
}
impl<V> Future for ScyQueryFut<V> {
    type Output = Result<(), Error>;

    /// Drive the inner driver future; collapse its result to `Result<(), Error>`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.fut.poll_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(_)) => {
                info!("ScyQueryFut done Ok");
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(e)) => {
                info!("ScyQueryFut done Err");
                Poll::Ready(Err(e).err_conv())
            }
        }
    }
}
/// Future that executes one unlogged batch on Scylla.
///
/// Mirrors `ScyQueryFut`: the batch and its values outlive the erased driver
/// future that borrows them (see `new`).
pub struct ScyBatchFut<V> {
    // Kept alive for the duration of `fut`; not read directly.
    #[allow(unused)]
    scy: Arc<ScySession>,
    #[allow(unused)]
    batch: Box<Batch>,
    #[allow(unused)]
    values: Box<V>,
    // Boxed future returned by `ScySession::batch`.
    fut: Pin<Box<dyn Future<Output = Result<BatchResult, QueryError>>>>,
}
impl<V> ScyBatchFut<V> {
    /// Start executing `batch` with `values` on `scy`.
    ///
    /// NOTE(review): same lifetime-erasing `unsafe` pattern as
    /// `ScyQueryFut::new` — references are forged through the stack slots of
    /// the locals before they are moved into `Self`. Looks unsound; TODO
    /// confirm and replace with an owned `async move` block.
    pub fn new(scy: Arc<ScySession>, batch: Batch, values: V) -> Self
    where
        V: BatchValues + 'static,
    {
        let batch = Box::new(batch);
        let values = Box::new(values);
        // SAFETY(review): lifetime-erasing casts; see note above.
        let scy2 = unsafe { &*(&scy as &_ as *const _) } as &ScySession;
        let batch2 = unsafe { &*(&batch as &_ as *const _) } as &Batch;
        let v2 = unsafe { &*(&values as &_ as *const _) } as &V;
        let fut = scy2.batch(batch2, v2);
        Self {
            scy,
            batch,
            values,
            fut: Box::pin(fut),
        }
    }
}
impl<V> Future for ScyBatchFut<V> {
    type Output = Result<(), Error>;

    /// Drive the inner batch future; collapse its result to `Result<(), Error>`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.fut.poll_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(_)) => {
                info!("ScyBatchFut done Ok");
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(e)) => {
                info!("ScyBatchFut done Err");
                Poll::Ready(Err(e).err_conv())
            }
        }
    }
}
/// Combined future for one channel write: an optional ts_msp insert (`fut1`)
/// followed by an optional batched value insert (`fut2`).
pub struct ChannelWriteFut {
    // Number of values in the batch; used for the timing log line.
    nn: usize,
    fut1: Option<Pin<Box<dyn Future<Output = Result<(), Error>>>>>,
    fut2: Option<Pin<Box<dyn Future<Output = Result<(), Error>>>>>,
    // Set on first poll; start time for the insert-duration log.
    ts1: Option<Instant>,
    // Bit 1: fut1 completed; bit 2: fut2 completed. Non-zero enables timing log.
    mask: u8,
}
impl Future for ChannelWriteFut {
    type Output = Result<(), Error>;

    /// Sequentially drives `fut1` then `fut2`.
    ///
    /// The `break if … { continue; } else …` shape means: each arm either
    /// `continue`s the loop (more work to do this poll) or yields the value
    /// that `break` returns from the loop (Pending or a final Ready).
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        loop {
            break if self.ts1.is_none() {
                // First poll: record the start time, then re-enter the loop.
                self.ts1 = Some(Instant::now());
                continue;
            } else if let Some(f) = self.fut1.as_mut() {
                match f.poll_unpin(cx) {
                    Ready(k) => {
                        info!("ChannelWriteFut fut1 Ready");
                        self.fut1 = None;
                        self.mask |= 1;
                        match k {
                            Ok(_) => continue,
                            Err(e) => Ready(Err(e)),
                        }
                    }
                    Pending => Pending,
                }
            } else if let Some(f) = self.fut2.as_mut() {
                match f.poll_unpin(cx) {
                    Ready(k) => {
                        info!("ChannelWriteFut fut2 Ready");
                        self.fut2 = None;
                        self.mask |= 2;
                        match k {
                            Ok(_) => continue,
                            Err(e) => Ready(Err(e)),
                        }
                    }
                    Pending => Pending,
                }
            } else {
                // Both sub-futures done (or never existed). Only log timing if
                // at least one actually ran (mask != 0).
                if self.mask != 0 {
                    let ts2 = Instant::now();
                    let dt = ts2.duration_since(self.ts1.unwrap()).as_secs_f32() * 1e3;
                    info!("insert f64 nn {} dt {:6.2} ms", self.nn, dt);
                }
                Ready(Ok(()))
            };
        }
    }
}
/// Sink for one channel's decoded values; produces the write future for a
/// single (timestamp, pulse, frame) triple.
pub trait ChannelWriter {
    fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error>;
}
/// Scaffolding for a dedicated f64 write future; not yet functional
/// (see the `todo!()` in its `Future` impl).
pub struct WriteFutF64 {
    nn: usize,
    fut1: Pin<Box<dyn Future<Output = Result<(), Error>>>>,
    fut2: Pin<Box<dyn Future<Output = Result<(), Error>>>>,
}
impl Future for WriteFutF64 {
    type Output = Result<(), Error>;

    // Stub: polling this future panics via `todo!()`.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        todo!()
    }
}
/// Alternative driver that awaits the sub-futures of a `ChannelWriteFut`
/// directly. Currently disabled: `err::todo()` fires before any work is done.
pub async fn run_write_fut(fut: ChannelWriteFut) -> Result<(), Error> {
    err::todo();
    let ts1 = Instant::now();
    if let Some(f) = fut.fut1 {
        f.await?;
    }
    if let Some(f) = fut.fut2 {
        f.await?;
    }
    let ts2 = Instant::now();
    let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
    info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt);
    Ok(())
}
/// Driver for `WriteFutF64`. Currently disabled: `err::todo()` fires before
/// any work is done.
pub async fn run_write_fut_f64(fut: WriteFutF64) -> Result<(), Error> {
    err::todo();
    let ts1 = Instant::now();
    fut.fut1.await?;
    fut.fut2.await?;
    let ts2 = Instant::now();
    let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
    info!("insert f64 nn {} dt {:6.2} ms", fut.nn, dt);
    Ok(())
}
/// Writer for scalar big-endian f64 channels: buffers rows and flushes them
/// as unlogged batches to the `events_scalar_f64` table.
pub struct ChannelWriterF64 {
    // Series id derived from the channel name (see `get_series_id`).
    series: u32,
    scy: Arc<ScySession>,
    common_queries: Arc<CommonQueries>,
    // Last ts_msp bucket that was registered in the ts_msp table.
    ts_msp_last: u64,
    // Buffered rows: (series, ts_msp, ts_lsp, pulse, value).
    tmp_vals: Vec<(i32, i64, i64, i64, f64)>,
}
impl ChannelWriterF64 {
/// Create a writer for one scalar-f64 series with an empty row buffer.
pub fn new(series: u32, common_queries: Arc<CommonQueries>, scy: Arc<ScySession>) -> Self {
    Self {
        series,
        scy,
        common_queries,
        ts_msp_last: 0,
        tmp_vals: Vec::new(),
    }
}
/// Buffer one scalar f64 sample and build the write future for it.
///
/// Splits `ts` into (msp, lsp) buckets; when the msp bucket changes, the
/// returned future first registers the new bucket in `ts_msp` (`fut1`).
/// Rows accumulate in `tmp_vals` and are flushed as one unlogged batch
/// (`fut2`) once the per-series threshold is reached.
///
/// Errors: fails if the frame payload is not exactly 8 bytes (the
/// `try_into` below), since scalar f64 values are fixed-size big-endian.
pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
    let (ts_msp, ts_lsp) = ts_msp_lsp(ts);
    let fut1 = if ts_msp != self.ts_msp_last {
        info!("write_msg_impl TS MSP CHANGED ts {} pulse {}", ts, pulse);
        self.ts_msp_last = ts_msp;
        let fut = ScyQueryFut::new(
            self.scy.clone(),
            self.common_queries.qu_insert_ts_msp.clone(),
            (self.series as i32, ts_msp as i64),
        );
        Some(Box::pin(fut) as _)
    } else {
        None
    };
    // bsread scalar payloads for this writer are big-endian f64.
    let value = f64::from_be_bytes(fr.data().try_into()?);
    self.tmp_vals
        .push((self.series as i32, ts_msp as i64, ts_lsp as i64, pulse as i64, value));
    // Stagger the flush threshold per series (180..=243) so not all series
    // batch-insert at the same moment.
    if self.tmp_vals.len() >= 180 + ((self.series as usize) & 0x3f) {
        info!("write_msg_impl BATCH INSERT ts {} pulse {}", ts, pulse);
        // `take` hands the buffer to the batch and leaves an empty Vec behind.
        let vt = std::mem::take(&mut self.tmp_vals);
        let nn = vt.len();
        let mut batch = Batch::new(BatchType::Unlogged);
        for _ in 0..nn {
            batch.append_statement(self.common_queries.qu_insert_scalar_f64.clone());
        }
        let fut = ScyBatchFut::new(self.scy.clone(), batch, vt);
        Ok(ChannelWriteFut {
            ts1: None,
            mask: 0,
            nn,
            fut1,
            fut2: Some(Box::pin(fut) as _),
        })
    } else {
        Ok(ChannelWriteFut {
            ts1: None,
            mask: 0,
            nn: 0,
            fut1,
            fut2: None,
        })
    }
}
}
impl ChannelWriter for ChannelWriterF64 {
    // Thin delegation to the inherent implementation.
    fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
        self.write_msg_impl(ts, pulse, fr)
    }
}
/// Split a timestamp into its most-significant and least-significant parts:
/// the upper 23 bits (bucket key) and the lower 41 bits (offset within the
/// bucket). `msp | lsp == ts` and the two parts never overlap.
fn ts_msp_lsp(ts: u64) -> (u64, u64) {
    const LSP_BITS: u32 = 41;
    let lsp = ts & ((1u64 << LSP_BITS) - 1);
    (ts - lsp, lsp)
}

View File

@@ -16,23 +16,32 @@ impl NetBuf {
}
}
pub fn state(&self) -> (usize, usize) {
(self.rp, self.wp)
}
pub fn len(&self) -> usize {
self.check_invariant();
self.wp - self.rp
}
pub fn cap(&self) -> usize {
self.check_invariant();
self.buf.len()
}
pub fn wcap(&self) -> usize {
self.check_invariant();
self.buf.len() - self.wp
}
pub fn data(&self) -> &[u8] {
self.check_invariant();
&self.buf[self.rp..self.wp]
}
pub fn adv(&mut self, x: usize) -> Result<(), Error> {
self.check_invariant();
if self.len() < x {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
@@ -42,6 +51,7 @@ impl NetBuf {
}
pub fn wadv(&mut self, x: usize) -> Result<(), Error> {
self.check_invariant();
if self.wcap() < x {
return Err(Error::with_msg_no_trace("not enough space"));
} else {
@@ -51,6 +61,7 @@ impl NetBuf {
}
pub fn read_u8(&mut self) -> Result<u8, Error> {
self.check_invariant();
type T = u8;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
@@ -63,6 +74,7 @@ impl NetBuf {
}
pub fn read_u64(&mut self) -> Result<u64, Error> {
self.check_invariant();
type T = u64;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
@@ -75,6 +87,7 @@ impl NetBuf {
}
pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> {
self.check_invariant();
if self.len() < n {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
@@ -85,12 +98,14 @@ impl NetBuf {
}
pub fn read_buf_for_fill(&mut self) -> ReadBuf {
self.check_invariant();
self.rewind_if_needed();
let read_buf = ReadBuf::new(&mut self.buf[self.wp..]);
read_buf
}
pub fn rewind_if_needed(&mut self) {
self.check_invariant();
if self.rp != 0 && self.rp == self.wp {
self.rp = 0;
self.wp = 0;
@@ -102,6 +117,7 @@ impl NetBuf {
}
pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> {
self.check_invariant();
self.rewind_if_needed();
if self.wcap() < buf.len() {
return Err(Error::with_msg_no_trace("not enough space"));
@@ -113,6 +129,7 @@ impl NetBuf {
}
pub fn put_u8(&mut self, v: u8) -> Result<(), Error> {
self.check_invariant();
type T = u8;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed();
@@ -126,6 +143,7 @@ impl NetBuf {
}
pub fn put_u64(&mut self, v: u64) -> Result<(), Error> {
self.check_invariant();
type T = u64;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed();
@@ -137,4 +155,15 @@ impl NetBuf {
Ok(())
}
}
/// Abort the process if the buffer pointers are inconsistent.
/// Invariant: 0 <= rp <= wp <= buf.len().
fn check_invariant(&self) {
    let broken = self.wp > self.buf.len() || self.rp > self.wp;
    if broken {
        eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp);
        std::process::exit(87);
    }
}
}

View File

@@ -1,5 +1,6 @@
pub mod bsread;
pub mod ca;
pub mod channelwriter;
pub mod netbuf;
#[cfg(test)]
pub mod test;

View File

@@ -1,5 +1,6 @@
use crate::bsread::parse_zmtp_message;
use crate::bsread::{parse_zmtp_message, BsreadMessage};
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::channelwriter::{ChannelWriter, ChannelWriterF64};
use crate::netbuf::NetBuf;
use async_channel::{Receiver, Sender};
#[allow(unused)]
@@ -10,13 +11,16 @@ use futures_util::{pin_mut, StreamExt};
use log::*;
use netpod::timeunits::*;
use scylla::batch::{Batch, BatchType, Consistency};
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::SessionBuilder;
use scylla::{Session as ScySession, SessionBuilder};
use serde_json::Value as JsVal;
use std::collections::BTreeMap;
use std::ffi::CStr;
use std::fmt;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
@@ -66,118 +70,281 @@ fn test_service() -> Result<(), Error> {
taskrun::run(fut)
}
pub async fn zmtp_client(addr: &str, rcvbuf: Option<u32>) -> Result<(), Error> {
let mut conn = tokio::net::TcpStream::connect(addr).await?;
if let Some(v) = rcvbuf {
set_rcv_sock_opts(&mut conn, v)?;
/// Derive a stable series id for a channel: the first 4 bytes of the MD5 of
/// its name, interpreted little-endian.
pub fn get_series_id(chn: &ChannelDesc) -> u32 {
    use md5::Digest;
    let mut hasher = md5::Md5::new();
    hasher.update(chn.name.as_bytes());
    let digest = hasher.finalize();
    let mut first4 = [0u8; 4];
    first4.copy_from_slice(&digest.as_slice()[..4]);
    u32::from_le_bytes(first4)
}
/// Prepared statements shared by all channel writers.
pub struct CommonQueries {
    // insert into pulse_pkey (a, pulse_a)
    pub qu1: PreparedStatement,
    // insert into pulse (pulse_a, pulse_b, ts_a, ts_b)
    pub qu2: PreparedStatement,
    // insert into ts_msp (series, ts_msp)
    pub qu_insert_ts_msp: PreparedStatement,
    // insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value)
    pub qu_insert_scalar_f64: PreparedStatement,
}
/// Client that pulls bsread messages over ZMTP and writes them to Scylla.
struct BsreadClient {
    #[allow(unused)]
    scylla: String,
    // ZMTP source address to connect to.
    addr: String,
    // When true, also extract the pulse-id channel into the pulse map tables.
    do_pulse_id: bool,
    // Optional socket receive buffer size.
    rcvbuf: Option<u32>,
    // Buffered pulse-map rows: (pulse_a, pulse_b, ts_a, ts_b).
    tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>,
    scy: Arc<ScySession>,
    // One writer per series id (see `get_series_id`).
    channel_writers: BTreeMap<u32, Box<dyn ChannelWriter>>,
    common_queries: Arc<CommonQueries>,
}
impl BsreadClient {
/// Connect to Scylla (keyspace `ks1`, quorum consistency) and prepare all
/// statements used by this client and its channel writers.
///
/// Errors: any session-build or statement-prepare failure is converted to a
/// trace-less `Error` carrying the driver error's debug text.
pub async fn new(scylla: String, addr: String, do_pulse_id: bool, rcvbuf: Option<u32>) -> Result<Self, Error> {
    let scy = SessionBuilder::new()
        .default_consistency(Consistency::Quorum)
        .known_node(&scylla)
        .use_keyspace("ks1", false)
        .build()
        .await
        .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
    let qu1 = scy
        .prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)")
        .await
        .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
    let qu2 = scy
        .prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)")
        .await
        .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
    let qu_insert_ts_msp = scy
        .prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
        .await
        .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
    let qu_insert_scalar_f64 = scy
        .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
        .await
        .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
    let common_queries = CommonQueries {
        qu1,
        qu2,
        qu_insert_ts_msp,
        qu_insert_scalar_f64,
    };
    let ret = Self {
        scylla,
        addr,
        do_pulse_id,
        rcvbuf,
        tmp_vals_pulse_map: vec![],
        scy: Arc::new(scy),
        channel_writers: Default::default(),
        common_queries: Arc::new(common_queries),
    };
    Ok(ret)
}
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
let mut i1 = 0u64;
let mut msgc = 0u64;
let mut vals1 = vec![];
let mut vals2 = vec![];
let scy = SessionBuilder::new()
.known_node("127.0.0.1:19042")
.use_keyspace("ks1", false)
.default_consistency(Consistency::One)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu2 = scy
.prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
ZmtpEvent::ZmtpCommand(cmd) => {
info!("{:?}", cmd);
}
ZmtpEvent::ZmtpMessage(msg) => {
msgc += 1;
trace!("Message frames: {}", msg.frames.len());
match parse_zmtp_message(&msg) {
Ok(bm) => {
trace!("{:?}", bm);
trace!("len A {} len B {}", bm.head_b.channels.len(), bm.values.len());
let mut i3 = u32::MAX;
for (i, ch) in bm.head_b.channels.iter().enumerate() {
if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID" {
i3 = i as u32;
pub async fn run(&mut self) -> Result<(), Error> {
let mut conn = tokio::net::TcpStream::connect(&self.addr).await?;
if let Some(v) = self.rcvbuf {
set_rcv_sock_opts(&mut conn, v)?;
}
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
let mut i1 = 0u64;
let mut msgc = 0u64;
let mut dh_md5_last = String::new();
let mut frame_diff_count = 0u64;
let mut hash_mismatch_count = 0u64;
let mut head_b = HeadB::empty();
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
ZmtpEvent::ZmtpCommand(_) => (),
ZmtpEvent::ZmtpMessage(msg) => {
msgc += 1;
match parse_zmtp_message(&msg) {
Ok(bm) => {
if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 {
frame_diff_count += 1;
if frame_diff_count < 1000 {
warn!(
"chn len {} frame diff {}",
bm.head_b.channels.len(),
msg.frames().len() - 2 * bm.head_b.channels.len()
);
}
}
}
if i3 < u32::MAX {
trace!("insert value frame {}", i3);
let i4 = 2 * i3 + 2;
if i4 >= msg.frames.len() as u32 {
} else {
let fr = &msg.frames[i4 as usize];
trace!("data len {}", fr.data.len());
let pulse_f64 = f64::from_be_bytes(fr.data[..].try_into().unwrap());
trace!("pulse_f64 {pulse_f64}");
let pulse = pulse_f64 as u64;
if false {
// TODO this next frame should be described somehow in the json header or?
info!("next val len {}", msg.frames[i4 as usize + 1].data.len());
let ts_a = u64::from_be_bytes(
msg.frames[i4 as usize + 1].data[0..8].try_into().unwrap(),
if bm.head_b_md5 != bm.head_a.hash {
hash_mismatch_count += 1;
// TODO keep logging data header changes, just suppress too frequent messages.
if hash_mismatch_count < 200 {
error!(
"Invalid bsread message: hash mismatch. dhcompr {:?}",
bm.head_a.dh_compression
);
let ts_b = u64::from_be_bytes(
msg.frames[i4 as usize + 1].data[8..16].try_into().unwrap(),
);
info!("ts_a {ts_a} ts_b {ts_b}");
}
let ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns;
if false {
let tsa = ts / (SEC * 10);
let tsb = ts % (SEC * 10);
vals1.push((tsa as i32, tsb as i32, pulse as i64));
}
if true {
let pulse_a = (pulse >> 14) as i64;
let pulse_b = (pulse & 0x3fff) as i32;
let ts_a = bm.head_a.global_timestamp.sec as i64;
let ts_b = bm.head_a.global_timestamp.ns as i32;
vals2.push((pulse_a, pulse_b, ts_a, ts_b));
}
if vals2.len() >= 200 {
let ts1 = Instant::now();
let mut batch = Batch::new(BatchType::Unlogged);
for _ in 0..vals2.len() {
batch.append_statement(qu2.clone());
}
{
if bm.head_b_md5 != dh_md5_last {
head_b = bm.head_b.clone();
if dh_md5_last.is_empty() {
info!("data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash);
dh_md5_last = bm.head_b_md5.clone();
for chn in &head_b.channels {
info!("Setup writer for {}", chn.name);
self.setup_channel_writers(chn)?;
}
} else {
error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash);
dh_md5_last = bm.head_b_md5.clone();
// TODO
// Update only the changed channel writers.
// Flush buffers before creating new channel writer.
}
let _ = scy.batch(&batch, &vals2).await.err_conv()?;
vals2.clear();
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
info!("Batch insert took {:6.2} ms", dt);
}
}
if self.do_pulse_id {
let mut i3 = u32::MAX;
for (i, ch) in head_b.channels.iter().enumerate() {
if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID" {
i3 = i as u32;
}
}
// TODO need to know the facility!
if i3 < u32::MAX {
let i4 = 2 * i3 + 2;
if i4 >= msg.frames.len() as u32 {
} else {
let fr = &msg.frames[i4 as usize];
self.insert_pulse_map(fr, &msg, &bm).await?;
}
}
}
if msg.frames.len() < 2 + 2 * head_b.channels.len() {
// TODO count always, throttle log.
error!("not enough frames for data header");
}
let gts = bm.head_a.global_timestamp;
let ts = (gts.sec as u64) * SEC + gts.ns as u64;
let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0);
for i1 in 0..head_b.channels.len() {
let chn = &head_b.channels[i1];
let fr = &msg.frames[2 + 2 * i1];
let series = get_series_id(chn);
if !self.channel_writers.contains_key(&series) {}
if let Some(cw) = self.channel_writers.get_mut(&series) {
cw.write_msg(ts, pulse, fr)?.await?;
} else {
// TODO check for missing writers.
//warn!("no writer for {}", chn.name);
}
}
}
}
Err(e) => {
error!("{}", e);
for frame in &msg.frames {
info!("Frame: {:?}", frame);
Err(e) => {
error!("{}", e);
for frame in &msg.frames {
info!("Frame: {:?}", frame);
}
zmtp.dump_input_state();
zmtp.dump_conn_state();
}
}
}
},
Err(e) => {
error!("{}", e);
return Err(e);
}
},
Err(e) => {
error!("{}", e);
return Err(e);
}
i1 += 1;
if false && i1 > 10000 {
break;
}
if false && msgc > 10000 {
break;
}
}
i1 += 1;
if false && i1 > 10000 {
break;
}
if false && msgc > 10000 {
break;
}
Ok(())
}
Ok(())
/// Create and register a writer for one channel described in the data header.
///
/// Currently only scalar (shape [1]) big-endian float64 channels get a
/// writer; every other combination is logged and skipped.
/// NOTE(review): if `a[0]` is not a u64 (e.g. a JSON string) the function
/// silently does nothing — no warn branch covers that case; TODO confirm
/// this is intended.
fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> {
    let series = get_series_id(chn);
    match chn.ty.as_str() {
        "float64" => match &chn.shape {
            JsVal::Array(a) => {
                if a.len() == 1 {
                    if let Some(n) = a[0].as_u64() {
                        if n == 1 {
                            // Scalar f64; only big-endian is supported so far.
                            if chn.encoding == "big" {
                                let cw =
                                    ChannelWriterF64::new(series, self.common_queries.clone(), self.scy.clone());
                                self.channel_writers.insert(series, Box::new(cw));
                            } else {
                                warn!("No LE avail");
                            }
                        } else {
                            warn!("array f64 writer not yet available.")
                        }
                    }
                } else {
                    warn!("f64 writer not yet available for shape {:?}", a)
                }
            }
            // Shape was not a JSON array.
            s => {
                warn!("setup_channel_writers shape not supported {:?}", s);
            }
        },
        // Data type other than float64.
        k => {
            warn!("setup_channel_writers data type not supported {:?}", k);
        }
    }
    Ok(())
}
/// Decode the pulse-id value from `fr` (big-endian f64) and buffer a
/// (pulse_a, pulse_b, ts_a, ts_b) row; every 200 rows, write the partition
/// key and flush the buffer as one unlogged batch.
///
/// Errors: fails if the frame payload is not exactly 8 bytes, or on any
/// Scylla execute/batch error.
async fn insert_pulse_map(&mut self, fr: &ZmtpFrame, msg: &ZmtpMessage, bm: &BsreadMessage) -> Result<(), Error> {
    trace!("data len {}", fr.data.len());
    // TODO take pulse-id also from main header and compare.
    // Was `.unwrap()`: a short frame from the network must not panic the
    // ingest loop; propagate the conversion error instead (same pattern as
    // the f64 decode in ChannelWriterF64::write_msg_impl).
    let pulse_f64 = f64::from_be_bytes(fr.data[..].try_into()?);
    trace!("pulse_f64 {pulse_f64}");
    let pulse = pulse_f64 as u64;
    if false {
        let i4 = 3;
        // TODO this next frame should be described somehow in the json header or?
        info!("next val len {}", msg.frames[i4 as usize + 1].data.len());
        let ts_a = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[0..8].try_into()?);
        let ts_b = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[8..16].try_into()?);
        info!("ts_a {ts_a} ts_b {ts_b}");
    }
    let _ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns;
    if true {
        // Split the pulse id: upper bits as partition-ish key, lower 14 bits
        // as offset within it.
        let pulse_a = (pulse >> 14) as i64;
        let pulse_b = (pulse & 0x3fff) as i32;
        let ts_a = bm.head_a.global_timestamp.sec as i64;
        let ts_b = bm.head_a.global_timestamp.ns as i32;
        self.tmp_vals_pulse_map.push((pulse_a, pulse_b, ts_a, ts_b));
    }
    if self.tmp_vals_pulse_map.len() >= 200 {
        let ts1 = Instant::now();
        // TODO use facility, channel_name, ... as partition key.
        self.scy
            .execute(&self.common_queries.qu1, (1i32, self.tmp_vals_pulse_map[0].0))
            .await
            .err_conv()?;
        let mut batch = Batch::new(BatchType::Unlogged);
        for _ in 0..self.tmp_vals_pulse_map.len() {
            batch.append_statement(self.common_queries.qu2.clone());
        }
        let _ = self.scy.batch(&batch, &self.tmp_vals_pulse_map).await.err_conv()?;
        let nn = self.tmp_vals_pulse_map.len();
        self.tmp_vals_pulse_map.clear();
        let ts2 = Instant::now();
        let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
        info!("insert {} items in {:6.2} ms", nn, dt);
    }
    Ok(())
}
}
/// Entry point: connect a `BsreadClient` to the given ZMTP source and run it
/// until the stream ends or an error occurs.
pub async fn zmtp_client(scylla: &str, addr: &str, rcvbuf: Option<u32>, do_pulse_id: bool) -> Result<(), Error> {
    BsreadClient::new(scylla.into(), addr.into(), do_pulse_id, rcvbuf)
        .await?
        .run()
        .await
}
fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
@@ -224,6 +391,7 @@ fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
Ok(())
}
#[derive(Clone, Debug)]
enum ConnState {
InitSend,
InitRecv1,
@@ -260,6 +428,18 @@ pub enum SocketType {
PULL,
}
// Snapshot of the input buffer state, kept in a small ring for post-mortem
// dumps (see `record_input_state` / `dump_input_state`).
#[derive(Debug)]
enum InpState {
    Empty,
    // (rp, wp, remaining write capacity) of the input NetBuf.
    Netbuf(usize, usize, usize),
}

impl Default for InpState {
    fn default() -> Self {
        InpState::Empty
    }
}
pub struct Zmtp {
done: bool,
complete: bool,
@@ -277,6 +457,10 @@ pub struct Zmtp {
inp_eof: bool,
data_tx: Sender<u32>,
data_rx: Receiver<u32>,
input_state: Vec<InpState>,
input_state_ix: usize,
conn_state_log: Vec<ConnState>,
conn_state_log_ix: usize,
}
impl Zmtp {
@@ -299,6 +483,10 @@ impl Zmtp {
inp_eof: false,
data_tx: tx,
data_rx: rx,
input_state: vec![0; 64].iter().map(|_| InpState::default()).collect(),
input_state_ix: 0,
conn_state_log: vec![0; 64].iter().map(|_| ConnState::InitSend).collect(),
conn_state_log_ix: 0,
}
}
@@ -310,8 +498,41 @@ impl Zmtp {
(&mut self.conn, self.buf.read_buf_for_fill())
}
fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) {
(self.outbuf.data(), &mut self.conn)
fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) {
(&mut self.conn, self.outbuf.data())
}
/// Append the current input-buffer state to the diagnostic ring buffer.
fn record_input_state(&mut self) {
    let (rp, wp) = self.buf.state();
    let slot = self.input_state_ix;
    self.input_state[slot] = InpState::Netbuf(rp, wp, self.buf.cap() - wp);
    self.input_state_ix = (slot + 1) % self.input_state.len();
}
/// Append the current connection state to the diagnostic ring buffer.
fn record_conn_state(&mut self) {
    let slot = self.conn_state_log_ix;
    self.conn_state_log[slot] = self.conn_state.clone();
    self.conn_state_log_ix = (slot + 1) % self.conn_state_log.len();
}
/// Log the recorded input-buffer states, oldest entry first.
fn dump_input_state(&self) {
    info!("---------------------------------------------------------");
    info!("INPUT STATE DUMP");
    let n = self.input_state.len();
    for k in 0..n {
        let i = (self.input_state_ix + k) % n;
        info!("{i:4} {:?}", self.input_state[i]);
    }
    info!("---------------------------------------------------------");
}
/// Log the recorded connection states, oldest entry first.
fn dump_conn_state(&self) {
    info!("---------------------------------------------------------");
    info!("CONN STATE DUMP");
    let n = self.conn_state_log.len();
    for k in 0..n {
        let i = (self.conn_state_log_ix + k) % n;
        info!("{i:4} {:?}", self.conn_state_log[i]);
    }
    info!("---------------------------------------------------------");
}
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Option<Poll<Result<ZmtpEvent, Error>>> {
@@ -334,13 +555,13 @@ impl Zmtp {
let write: Int<Result<(), _>> = if item_count > 0 {
Int::NoWork
} else if self.outbuf.len() > 0 {
let (b, w) = self.outbuf_conn();
let (w, b) = self.outbuf_conn();
pin_mut!(w);
match w.poll_write(cx, b) {
Ready(k) => match k {
Ok(k) => match self.outbuf.adv(k) {
Ok(()) => {
info!("sent {} bytes", k);
trace!("sent {} bytes", k);
self.outbuf.rewind_if_needed();
Int::Empty
}
@@ -362,7 +583,7 @@ impl Zmtp {
match write {
Int::NoWork => {}
_ => {
info!("write result: {:?} {}", write, self.outbuf.len());
trace!("write result: {:?} {}", write, self.outbuf.len());
}
}
item_count += write.item_count();
@@ -378,6 +599,7 @@ impl Zmtp {
));
Int::Item(Err(e))
} else if self.buf.len() < self.conn_state.need_min() {
self.record_input_state();
let (w, mut rbuf) = self.inpbuf_conn();
pin_mut!(w);
match w.poll_read(cx, &mut rbuf) {
@@ -387,6 +609,7 @@ impl Zmtp {
if nf == 0 {
info!("EOF");
self.inp_eof = true;
self.record_input_state();
Int::Done
} else {
trace!("received {} bytes", rbuf.filled().len());
@@ -396,8 +619,14 @@ impl Zmtp {
trace!("got data {:?}", &rbuf.filled()[0..t]);
}
match self.buf.wadv(nf) {
Ok(()) => Int::Empty,
Err(e) => Int::Item(Err(e)),
Ok(()) => {
self.record_input_state();
Int::Empty
}
Err(e) => {
error!("netbuf wadv fail nf {nf}");
Int::Item(Err(e))
}
}
}
}
@@ -475,6 +704,7 @@ impl Zmtp {
}
fn parse_item(&mut self) -> Result<Option<ZmtpEvent>, Error> {
self.record_conn_state();
match self.conn_state {
ConnState::InitSend => {
info!("parse_item InitSend");
@@ -561,7 +791,19 @@ impl Zmtp {
let has_more = flags & 0x01 != 0;
let long_size = flags & 0x02 != 0;
let is_command = flags & 0x04 != 0;
self.has_more = has_more;
if is_command {
if has_more {
error!("received command with has_more flag (error in peer)");
}
if self.has_more {
debug!(
"received command frame while in multipart, having {}",
self.frames.len()
);
}
} else {
self.has_more = has_more;
}
self.is_command = is_command;
trace!(
"parse_item ReadFrameFlags has_more {} long_size {} is_command {}",
@@ -580,7 +822,8 @@ impl Zmtp {
self.msglen = self.buf.read_u8()? as usize;
trace!("parse_item ReadFrameShort msglen {}", self.msglen);
self.conn_state = ConnState::ReadFrameBody(self.msglen);
if self.msglen > 1024 * 64 {
if self.msglen > self.buf.cap() / 2 {
error!("msglen {} too large for this client", self.msglen);
return Err(Error::with_msg_no_trace(format!(
"larger msglen not yet supported {}",
self.msglen,
@@ -592,7 +835,8 @@ impl Zmtp {
self.msglen = self.buf.read_u64()? as usize;
trace!("parse_item ReadFrameShort msglen {}", self.msglen);
self.conn_state = ConnState::ReadFrameBody(self.msglen);
if self.msglen > 1024 * 64 {
if self.msglen > self.buf.cap() / 2 {
error!("msglen {} too large for this client", self.msglen);
return Err(Error::with_msg_no_trace(format!(
"larger msglen not yet supported {}",
self.msglen,
@@ -603,15 +847,14 @@ impl Zmtp {
ConnState::ReadFrameBody(msglen) => {
// TODO do not copy here...
let data = self.buf.read_bytes(msglen)?.to_vec();
self.conn_state = ConnState::ReadFrameFlags;
self.msglen = 0;
if false {
let n1 = data.len().min(256);
let s = String::from_utf8_lossy(&data[..n1]);
trace!("parse_item ReadFrameBody msglen {} string {}", msglen, s);
}
self.conn_state = ConnState::ReadFrameFlags;
if self.is_command {
info!("command data {:?}", data);
if data.len() >= 7 {
if &data[0..5] == b"\x04PING" {
if data.len() > 32 {
@@ -620,9 +863,9 @@ impl Zmtp {
} else {
let ttl = u16::from_be_bytes(data[5..7].try_into().unwrap());
let ctx = &data[7..];
info!("GOT PING ttl {ttl} ctx.len {}", ctx.len());
debug!("received PING ttl {ttl} ctx {:?}", &ctx);
if self.outbuf.wcap() < data.len() {
error!("can not respond with PONG because output buffer full");
warn!("can not respond with PONG because output buffer full");
} else {
let size = 5 + ctx.len() as u8;
self.outbuf.put_u8(0x04).unwrap();
@@ -631,7 +874,7 @@ impl Zmtp {
self.outbuf.put_slice(ctx).unwrap();
}
if self.outbuf.wcap() < 32 {
error!("can not send my PING because output buffer full");
warn!("can not send my PING because output buffer full");
} else {
let ctx = b"daqingest";
let size = 5 + ctx.len() as u8;
@@ -649,7 +892,6 @@ impl Zmtp {
is_command: self.is_command,
data,
};
self.frames.clear();
Ok(Some(ZmtpEvent::ZmtpCommand(g)))
} else {
let g = ZmtpFrame {
@@ -665,6 +907,21 @@ impl Zmtp {
let g = ZmtpMessage {
frames: mem::replace(&mut self.frames, vec![]),
};
if false && g.frames.len() != 118 {
info!("EMIT {} frames", g.frames.len());
if let Some(fr) = g.frames.get(0) {
let d = fr.data();
let nn = d.len().min(16);
let s = String::from_utf8_lossy(&d[..nn]);
info!("DATA 0 {} {:?} {:?}", nn, &d[..nn], s);
}
if let Some(fr) = g.frames.get(1) {
let d = fr.data();
let nn = d.len().min(16);
let s = String::from_utf8_lossy(&d[..nn]);
info!("DATA 1 {} {:?} {:?}", nn, &d[..nn], s);
}
}
Ok(Some(ZmtpEvent::ZmtpMessage(g)))
}
}
@@ -807,8 +1064,9 @@ impl DummyData {
channels: vec![ChannelDesc {
name: "TESTCHAN".into(),
ty: "int64".into(),
shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]),
shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1i32))]),
encoding: "little".into(),
compression: todo!(),
}],
};
let hb = serde_json::to_vec(&head_b).unwrap();
@@ -828,6 +1086,7 @@ impl DummyData {
sec: self.ts / SEC,
ns: self.ts % SEC,
},
dh_compression: None,
};
// TODO write directly to output buffer.
let ha = serde_json::to_vec(&head_a).unwrap();