Feature-gate bsread logic

This commit is contained in:
Dominik Werder
2023-09-01 09:47:03 +02:00
parent ca7d949fa6
commit 0d2fc7862f
32 changed files with 831 additions and 261 deletions

View File

@@ -1,237 +0,0 @@
use crate::zmtp::zmtpproto::ZmtpMessage;
use err::Error;
#[allow(unused)]
use log::*;
use netpod::AggKind;
use netpod::ByteOrder;
use netpod::ScalarType;
use netpod::Shape;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsVal;
// TODO
/// A bsread parse failure, carrying the error together with the original
/// ZMTP message so the caller can inspect or log the offending frames.
pub struct ParseError {
    pub err: Error,
    pub msg: ZmtpMessage,
}
/// Timestamp from the bsread main header, split into seconds plus
/// nanoseconds (presumably unix epoch — TODO confirm against the source).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GlobalTimestamp {
    // Seconds part.
    pub sec: u64,
    // Nanoseconds part.
    pub ns: u64,
}
/// One channel entry of the bsread data header, as received on the wire.
/// Missing fields fall back to the bsread defaults declared below.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelDesc {
    pub name: String,
    // Wire name is "type"; defaults to "float64".
    #[serde(rename = "type", default = "bsread_type_default")]
    pub ty: String,
    // Kept as raw JSON; decoded later via Shape::from_bsread_jsval.
    #[serde(default = "bsread_shape_default")]
    pub shape: JsVal,
    // Byte order tag, "little" or "big"; defaults to "little".
    #[serde(default = "bsread_encoding_default")]
    pub encoding: String,
    // e.g. "none", "lz4", "bitshuffle_lz4" (see TryFrom below).
    pub compression: Option<String>,
}
/// Serde default for the channel `type` field: "float64".
fn bsread_type_default() -> String {
    String::from("float64")
}
/// Serde default for the channel `shape` field: an empty JSON array,
/// which downstream decoding treats as scalar.
fn bsread_shape_default() -> JsVal {
    let empty: Vec<JsVal> = Vec::new();
    JsVal::Array(empty)
}
/// Serde default for the channel `encoding` field: "little" endian.
fn bsread_encoding_default() -> String {
    String::from("little")
}
/// Per-channel compression schemes this parser understands.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CompressionKind {
    Lz4,
    BitshuffleLz4,
}
/// Fully decoded form of [`ChannelDesc`], with the wire strings parsed
/// into typed values.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelDescDecoded {
    pub name: String,
    pub scalar_type: ScalarType,
    pub shape: Shape,
    pub byte_order: ByteOrder,
    pub compression: Option<CompressionKind>,
    // Always AggKind::Plain when produced by the TryFrom below.
    pub agg_kind: AggKind,
}
impl TryFrom<&ChannelDesc> for ChannelDescDecoded {
type Error = Error;
fn try_from(cd: &ChannelDesc) -> Result<Self, Self::Error> {
let ret = ChannelDescDecoded {
name: cd.name.clone(),
scalar_type: ScalarType::from_bsread_str(&cd.ty)?,
shape: Shape::from_bsread_jsval(&cd.shape)?,
compression: match &cd.compression {
None => None,
Some(k) => match k.as_str() {
"none" => None,
"lz4" => Some(CompressionKind::Lz4),
"bitshuffle_lz4" => Some(CompressionKind::BitshuffleLz4),
_ => {
return Err(Error::with_msg_no_trace(format!(
"can not understand bsread compression kind: {k:?}"
)))
}
},
},
byte_order: match cd.encoding.as_str() {
"little" => ByteOrder::Little,
"big" => ByteOrder::Big,
_ => ByteOrder::Little,
},
agg_kind: AggKind::Plain,
};
Ok(ret)
}
}
/// bsread main header (first ZMTP frame of a message).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeadA {
    pub htype: String,
    // MD5 of the data-header frame; compared against the computed hash.
    pub hash: String,
    pub pulse_id: serde_json::Number,
    pub global_timestamp: GlobalTimestamp,
    // Compression of the data-header frame itself ("none"/"lz4"/"bitshuffle_lz4").
    pub dh_compression: Option<String>,
}
/// bsread data header (second ZMTP frame): the list of channels carried
/// by the following frames.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeadB {
    pub htype: String,
    pub channels: Vec<ChannelDesc>,
}
impl HeadB {
    /// A placeholder data header with no channels, used before the first
    /// real data header has been received.
    pub fn empty() -> Self {
        Self {
            htype: "".into(),
            channels: Vec::new(),
        }
    }
}
/// A parsed bsread message: both headers plus the MD5 computed over the
/// raw (possibly still compressed) data-header frame.
#[derive(Clone, Debug)]
pub struct BsreadMessage {
    pub head_a: HeadA,
    pub head_b: HeadB,
    // Used by consumers to detect data-header changes between messages.
    pub head_b_md5: String,
}
/// Stateful bsread message parser.
pub struct Parser {
    // Reusable scratch buffer for data-header decompression; grown on demand.
    tmp1: Vec<u8>,
}
impl Parser {
    /// Creates a parser with a 1 KiB scratch buffer.
    pub fn new() -> Self {
        Self { tmp1: vec![0; 1024] }
    }

    /// Parses a ZMTP message as bsread: frame 0 is the main header,
    /// frame 1 the (possibly compressed) data header.
    ///
    /// Returns the decoded headers together with the MD5 over the raw
    /// data-header bytes, or an error for malformed input.
    pub fn parse_zmtp_message(&mut self, msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
        if msg.frames().len() < 2 {
            return Err(Error::with_msg_no_trace("not enough frames for bsread"));
        }
        let head_a: HeadA =
            serde_json::from_slice(&msg.frames()[0].data()).map_err(|e| format!("main header parse error {e:?}"))?;
        // Hash is computed over the frame as transmitted, before decompression.
        let head_b_md5 = {
            use md5::Digest;
            let mut hasher = md5::Md5::new();
            hasher.update(msg.frames()[1].data());
            let h = hasher.finalize();
            hex::encode(&h)
        };
        // Decompress the data header if the main header says it is compressed.
        let dhdecompr = match &head_a.dh_compression {
            Some(m) => match m.as_str() {
                "none" => msg.frames()[1].data(),
                "lz4" => {
                    // Frame layout: 4-byte big-endian decompressed size, then lz4 data.
                    let inp = msg.frames()[1].data();
                    let nd = u32::from_be_bytes(inp[0..4].try_into()?) as usize;
                    // Double the scratch buffer until it fits; error out once the
                    // current size already exceeds 512 KiB.
                    loop {
                        let g = self.tmp1.len();
                        if g >= nd {
                            break;
                        }
                        if g > 1024 * 512 {
                            return Err(Error::with_public_msg("decomp buffer too large"));
                        }
                        let u = self.tmp1.len() * 2;
                        info!("resize buffer to {u}");
                        self.tmp1.resize(u, 0);
                    }
                    match bitshuffle::lz4_decompress(&inp[4..], &mut self.tmp1) {
                        Ok(_) => {}
                        Err(e) => {
                            // TODO throttle log output
                            error!("lz4 error {e:?}");
                            return Err(Error::with_public_msg(format!("lz4 error {e:?}")));
                        }
                    }
                    &self.tmp1[..nd]
                }
                "bitshuffle_lz4" => {
                    // Frame layout: 8-byte BE decompressed size, 4-byte BE block size, data.
                    let inp = msg.frames()[1].data();
                    let nd = u64::from_be_bytes(inp[0..8].try_into()?) as usize;
                    let bs = u32::from_be_bytes(inp[8..12].try_into()?) as usize;
                    loop {
                        let g = self.tmp1.len();
                        if g >= nd {
                            break;
                        }
                        if g > 1024 * 512 {
                            return Err(Error::with_public_msg("decomp buffer too large"));
                        }
                        let u = self.tmp1.len() * 2;
                        info!("resize buffer to {u}");
                        self.tmp1.resize(u, 0);
                    }
                    match bitshuffle::bitshuffle_decompress(&inp[12..], &mut self.tmp1, nd, 1, bs) {
                        Ok(_) => {}
                        Err(e) => {
                            // TODO throttle log output
                            error!("bitshuffle_lz4 error {e:?}");
                            return Err(Error::with_public_msg(format!("bitshuffle_lz4 error {e:?}")));
                        }
                    }
                    &self.tmp1[..nd]
                }
                // Unknown compression tag: pass the frame through untouched.
                _ => msg.frames()[1].data(),
            },
            None => msg.frames()[1].data(),
        };
        let head_b: HeadB = serde_json::from_slice(dhdecompr).map_err(|e| format!("data header parse error: {e:?}"))?;
        // Dead debug path (disabled via `false &&`): spot-checks little-endian
        // i64 scalar decoding of the data frames.
        if false && msg.frames().len() == head_b.channels.len() + 3 {
            for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) {
                let sty = ScalarType::from_bsread_str(ch.ty.as_str())?;
                let bo = ByteOrder::from_bsread_str(&ch.encoding)?;
                let shape = Shape::from_bsread_jsval(&ch.shape)?;
                match sty {
                    ScalarType::I64 => match &bo {
                        ByteOrder::Little => match &shape {
                            Shape::Scalar => {
                                assert_eq!(fr.data().len(), 8);
                                let _v = i64::from_le_bytes(fr.data().try_into()?);
                            }
                            Shape::Wave(_) => {}
                            Shape::Image(_, _) => {}
                        },
                        _ => {}
                    },
                    _ => {}
                }
            }
        }
        let ret = BsreadMessage {
            head_a,
            head_b,
            head_b_md5,
        };
        Ok(ret)
    }
}

View File

@@ -1,434 +0,0 @@
use crate::bsread::BsreadMessage;
use crate::bsread::ChannelDescDecoded;
use crate::bsread::HeadB;
use crate::bsread::Parser;
use crate::ca::IngestCommons;
use crate::zmtp::zmtpproto;
use crate::zmtp::zmtpproto::SocketType;
use crate::zmtp::zmtpproto::Zmtp;
use crate::zmtp::zmtpproto::ZmtpFrame;
use crate::zmtp::zmtpproto::ZmtpMessage;
use crate::zmtp::ZmtpClientOpts;
use crate::zmtp::ZmtpEvent;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TS_MSP_GRID_SPACING;
use netpod::TS_MSP_GRID_UNIT;
use scywr::iteminsertqueue::ArrayValue;
use scywr::iteminsertqueue::CommonInsertItemQueueSender;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::InsertItem;
use scywr::iteminsertqueue::QueryItem;
use scywr::session::ScySession;
use series::SeriesId;
use stats::CheckEvery;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
/// Errors of the bsread ingest client.
#[derive(Debug, ThisError)]
pub enum Error {
    // The shared insert queue could not hand out a sender.
    #[error("InsertQueueSenderMissing")]
    InsertQueueSenderMissing,
    // A send on an async channel failed (receiver gone).
    #[error("AsyncChannelSend")]
    AsyncChannelSend,
    #[error("IO({0})")]
    IO(#[from] io::Error),
    // Catch-all for errors converted from the legacy err::Error type.
    #[error("Msg({0})")]
    Msg(String),
    #[error("ZmtpProto({0})")]
    ZmtpProto(#[from] zmtpproto::Error),
    // A byte slice did not have the expected length for decoding.
    #[error("BadSlice")]
    BadSlice,
}
impl<T> From<async_channel::SendError<T>> for Error {
    /// Collapses any failed channel send into `AsyncChannelSend`.
    /// The unsent value itself is dropped here.
    // Fix: the parameter was named `value` but never read, which
    // triggers an unused-variable warning; underscore it.
    fn from(_value: async_channel::SendError<T>) -> Self {
        Self::AsyncChannelSend
    }
}
impl From<err::Error> for Error {
fn from(value: err::Error) -> Self {
Self::Msg(value.to_string())
}
}
/// Client that connects to one bsread source over ZMTP, parses its
/// messages and feeds insert items into the common queue.
pub struct BsreadClient {
    opts: ZmtpClientOpts,
    source_addr: SocketAddr,
    do_pulse_id: bool,
    // Desired SO_RCVBUF size, if configured.
    rcvbuf: Option<usize>,
    print_stats: CheckEvery,
    parser: Parser,
    ingest_commons: Arc<IngestCommons>,
    insqtx: CommonInsertItemQueueSender,
    // Cached series id for the hard-coded EvtSet test channel.
    tmp_evtset_series: Option<SeriesId>,
    channel_info_query_tx: Sender<ChannelInfoQuery>,
    // State for the MSP (most-significant-part) timestamp bucketing.
    inserted_in_ts_msp_count: u32,
    ts_msp_last: u64,
    ts_msp_grid_last: u32,
}
impl BsreadClient {
    /// Builds a client from the given options; fails with
    /// `InsertQueueSenderMissing` if the shared insert queue can no
    /// longer hand out a sender.
    pub async fn new(
        opts: ZmtpClientOpts,
        ingest_commons: Arc<IngestCommons>,
        channel_info_query_tx: Sender<ChannelInfoQuery>,
    ) -> Result<Self, Error> {
        let insqtx = ingest_commons
            .insert_item_queue
            .sender()
            .ok_or_else(|| Error::InsertQueueSenderMissing)?;
        let ret = Self {
            // Copied out of opts before it is moved into the struct.
            source_addr: opts.addr,
            do_pulse_id: opts.do_pulse_id,
            rcvbuf: opts.rcvbuf,
            opts,
            print_stats: CheckEvery::new(Duration::from_millis(2000)),
            parser: Parser::new(),
            ingest_commons,
            insqtx,
            tmp_evtset_series: None,
            channel_info_query_tx,
            inserted_in_ts_msp_count: 0,
            ts_msp_last: 0,
            ts_msp_grid_last: 0,
        };
        Ok(ret)
    }
    /// Extracts the hard-coded "SAR-CVME-TIFALL5:EvtSet" channel from
    /// `msg` (when present) and inserts it as a boolean wave.
    ///
    /// Resolves and caches the series id on first use; `ts` and `pulse`
    /// come from the already-parsed main header.
    async fn test_evtset_extract(
        &mut self,
        msg: &ZmtpMessage,
        bm: &BsreadMessage,
        ts: u64,
        pulse: u64,
    ) -> Result<(), Error> {
        let chname = "SAR-CVME-TIFALL5:EvtSet";
        // Test the bool set write
        let mut i3 = usize::MAX;
        for (i, ch) in bm.head_b.channels.iter().enumerate() {
            if ch.name == chname {
                i3 = i;
                break;
            }
        }
        if i3 != usize::MAX {
            // Data frames come in pairs per channel, starting at frame 2.
            if let Some(fr) = msg.frames.get(2 + 2 * i3) {
                debug!("try to extract bools {} {}", fr.msglen, fr.data.len());
                let setlen = fr.data.len();
                debug!("flags {:?}", &fr.data[..setlen.min(16)]);
                // Every non-zero byte counts as `true`.
                let evtset: Vec<_> = fr.data.iter().map(|&x| x != 0).collect();
                let scalar_type = ScalarType::BOOL;
                let shape = Shape::Wave(256);
                if self.tmp_evtset_series.is_none() {
                    // First use: ask the channel-info service for the series id.
                    debug!("try to fetch series id");
                    let (tx, rx) = async_channel::bounded(8);
                    let item = ChannelInfoQuery {
                        backend: self.opts.backend.clone(),
                        channel: chname.into(),
                        scalar_type: ScalarType::BOOL.to_scylla_i32(),
                        shape_dims: Shape::Wave(setlen as _).to_scylla_vec(),
                        tx,
                    };
                    self.channel_info_query_tx.send(item).await?;
                    match rx.recv().await {
                        Ok(res) => match res {
                            Ok(res) => {
                                debug!("got series id: {res:?}");
                                self.tmp_evtset_series = Some(res.into_inner());
                            }
                            Err(e) => {
                                // Lookup failure is logged but not fatal.
                                error!("{e}");
                            }
                        },
                        Err(e) => {
                            error!("{e}");
                        }
                    }
                }
                if let Some(series) = self.tmp_evtset_series.clone() {
                    // Start a new MSP bucket after 6400 inserts or one hour,
                    // rounding the timestamp down to a 10 s boundary.
                    let (ts_msp, ts_msp_changed) =
                        if self.inserted_in_ts_msp_count >= 6400 || self.ts_msp_last + HOUR <= ts {
                            let div = SEC * 10;
                            let ts_msp = ts / div * div;
                            if ts_msp == self.ts_msp_last {
                                (ts_msp, false)
                            } else {
                                self.ts_msp_last = ts_msp;
                                self.inserted_in_ts_msp_count = 1;
                                (ts_msp, true)
                            }
                        } else {
                            self.inserted_in_ts_msp_count += 1;
                            (self.ts_msp_last, false)
                        };
                    let ts_lsp = ts - ts_msp;
                    // Only emit the coarse grid value when it changes.
                    let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32;
                    let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid {
                        self.ts_msp_grid_last = ts_msp_grid;
                        Some(ts_msp_grid)
                    } else {
                        None
                    };
                    let item = InsertItem {
                        series: series.into(),
                        ts_msp,
                        ts_lsp,
                        msp_bump: ts_msp_changed,
                        ts_msp_grid,
                        pulse,
                        scalar_type,
                        shape,
                        val: DataValue::Array(ArrayValue::Bool(evtset)),
                    };
                    let item = QueryItem::Insert(item);
                    match self.insqtx.send(item).await {
                        Ok(_) => {
                            debug!("item send ok pulse {}", pulse);
                        }
                        Err(e) => {
                            error!("can not send item {:?}", e.0);
                        }
                    }
                } else {
                    // Series lookup failed above; back off instead of spinning.
                    error!("still no series id");
                    tokio::time::sleep(Duration::from_millis(1000)).await;
                }
            }
        }
        Ok(())
    }
    /// Main loop: connects to the bsread source, consumes ZMTP messages,
    /// validates headers, dispatches inserts and prints periodic stats
    /// until the stream ends or a fatal error occurs.
    pub async fn run(&mut self) -> Result<(), Error> {
        let mut conn = tokio::net::TcpStream::connect(&self.source_addr).await?;
        if let Some(v) = self.rcvbuf {
            crate::linuxhelper::set_rcv_sock_opts(&mut conn, v as u32)?;
        }
        let mut zmtp = Zmtp::new(conn, SocketType::PULL);
        let mut i1 = 0u64;
        let mut msgc = 0u64;
        // MD5 of the last seen data header, to detect header changes.
        let mut dh_md5_last = String::new();
        let mut frame_diff_count = 0u64;
        let mut hash_mismatch_count = 0u64;
        let mut head_b = HeadB::empty();
        let mut bytes_payload = 0u64;
        let mut rows_inserted = 0u32;
        let mut time_spent_inserting = Duration::from_millis(0);
        // Exponential moving average of the inter-message interval.
        let mut msg_dt_ema = stats::EMA::with_k(0.01);
        let mut msg_ts_last = Instant::now();
        while let Some(item) = zmtp.next().await {
            let tsnow = Instant::now();
            match item {
                Ok(ev) => match ev {
                    ZmtpEvent::ZmtpCommand(_) => (),
                    ZmtpEvent::ZmtpMessage(msg) => {
                        msgc += 1;
                        {
                            let dt = tsnow.duration_since(msg_ts_last);
                            msg_dt_ema.update(dt.as_secs_f32());
                            msg_ts_last = tsnow;
                        }
                        match self.parser.parse_zmtp_message(&msg) {
                            Ok(bm) => {
                                // Expect 2 header frames plus 2 frames per channel.
                                // NOTE(review): this usize subtraction underflows if a
                                // message carries fewer frames than 2 per channel —
                                // confirm upstream guarantees.
                                if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 {
                                    frame_diff_count += 1;
                                    if frame_diff_count < 1000 {
                                        warn!(
                                            "chn len {} frame diff {}",
                                            bm.head_b.channels.len(),
                                            msg.frames().len() - 2 * bm.head_b.channels.len()
                                        );
                                    }
                                }
                                if bm.head_b_md5 != bm.head_a.hash {
                                    hash_mismatch_count += 1;
                                    // TODO keep logging data header changes, just suppress too frequent messages.
                                    if hash_mismatch_count < 200 {
                                        error!(
                                            "Invalid bsread message: hash mismatch. dhcompr {:?}",
                                            bm.head_a.dh_compression
                                        );
                                    }
                                }
                                {
                                    if bm.head_b_md5 != dh_md5_last {
                                        // TODO header changed, don't support this at the moment.
                                        head_b = bm.head_b.clone();
                                        if dh_md5_last.is_empty() {
                                            debug!("data header hash {}", bm.head_b_md5);
                                            dh_md5_last = bm.head_b_md5.clone();
                                            // TODO must fetch series ids on-demand.
                                            // For the time being, assume that channel list never changes, but WARN!
                                            /*let scy = self.scy.clone();
                                            for chn in &head_b.channels {
                                                info!("Setup writer for {}", chn.name);
                                                let cd: ChannelDescDecoded = chn.try_into()?;
                                                match self.setup_channel_writers(&scy, &cd).await {
                                                    Ok(_) => {}
                                                    Err(e) => {
                                                        warn!("can not set up writer for {} {e:?}", chn.name);
                                                    }
                                                }
                                            }*/
                                        } else {
                                            error!("TODO changed data header hash {}", bm.head_b_md5);
                                            dh_md5_last = bm.head_b_md5.clone();
                                            // TODO
                                            // Update only the changed channel writers.
                                            // Flush buffers before creating new channel writer.
                                        }
                                    }
                                }
                                if self.do_pulse_id {
                                    let nframes = msg.frames().len();
                                    debug!("nframes {nframes}");
                                    // Find the index of a known pulse-id channel.
                                    let mut i3 = u32::MAX;
                                    for (i, ch) in head_b.channels.iter().enumerate() {
                                        if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID"
                                            || ch.name == "SAR-CVME-TIFALL4:EvtSet"
                                        {
                                            i3 = i as u32;
                                        }
                                    }
                                    // TODO need to know the facility!
                                    if i3 < u32::MAX {
                                        let i4 = 2 * i3 + 2;
                                        if let Some(fr) = msg.frames.get(i4 as usize) {
                                            self.insert_pulse_map(fr, &msg, &bm).await?;
                                        }
                                    }
                                }
                                if msg.frames.len() < 2 + 2 * head_b.channels.len() {
                                    // TODO count always, throttle log.
                                    error!("not enough frames for data header");
                                }
                                let gts = &bm.head_a.global_timestamp;
                                let ts = (gts.sec as u64) * SEC + gts.ns as u64;
                                let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0);
                                debug!("ts {ts:20} pulse{pulse:20}");
                                // TODO limit warn rate
                                // NOTE(review): magic pulse bounds — presumably
                                // facility-specific; confirm and document.
                                if pulse != 0 && (pulse < 14781000000 || pulse > 49000000000) {
                                    // TODO limit log rate
                                    warn!("pulse out of range {} addr {}", pulse, self.source_addr);
                                }
                                if pulse % 1000000 != ts % 1000000 {
                                    warn!(
                                        "pulse-ts mismatch ts {} pulse {} addr {}",
                                        ts, pulse, self.source_addr
                                    );
                                }
                                self.test_evtset_extract(&msg, &bm, ts, pulse).await?;
                                let nch = head_b.channels.len();
                                let nmax = self.opts.process_channel_count_limit.unwrap_or(4000);
                                let nlim = if nch > nmax {
                                    // TODO count this event
                                    4000
                                } else {
                                    nch
                                };
                                // NOTE(review): this inner `i1` shadows the outer
                                // message counter of the same name.
                                for i1 in 0..nlim {
                                    // TODO skip decoding if header unchanged.
                                    let chn = &head_b.channels[i1];
                                    let chd: ChannelDescDecoded = chn.try_into()?;
                                    let fr = &msg.frames[2 + 2 * i1];
                                    // TODO store the channel information together with series in struct.
                                }
                            }
                            Err(e) => {
                                error!("{}", e);
                                for frame in &msg.frames {
                                    info!("Frame: {:?}", frame);
                                }
                                zmtp.dump_input_state();
                                zmtp.dump_conn_state();
                            }
                        }
                    }
                },
                Err(e) => {
                    error!("{}", e);
                    return Err(e)?;
                }
            }
            i1 += 1;
            // Disabled early-exit hooks for debugging sessions.
            if false && i1 > 10000 {
                break;
            }
            if false && msgc > 10000 {
                break;
            }
            // Periodic stats output, rate-limited by CheckEvery.
            let dt = self.print_stats.is_elapsed_now();
            if dt > 0. {
                let nrs = rows_inserted as f32 / dt;
                let dt_ins = time_spent_inserting.as_secs_f32() * 1e3;
                let r = bytes_payload as f32 / dt * 1e-3;
                info!("insert {nrs:.0} 1/s dt-ins {dt_ins:4.0} ms payload {r:8.3} kB/s");
                rows_inserted = 0;
                time_spent_inserting = Duration::from_millis(0);
                bytes_payload = 0;
                if msg_dt_ema.update_count() > 100 {
                    let ema = msg_dt_ema.ema();
                    if ema < 0.005 {
                        let emv = msg_dt_ema.emv().sqrt();
                        warn!("MSG FREQ {} {:9.5} {:9.5}", self.source_addr, ema, emv);
                    }
                }
            }
        }
        Ok(())
    }
async fn setup_channel_writers(&mut self, scy: &ScySession, cd: &ChannelDescDecoded) -> Result<(), Error> {
let has_comp = cd.compression.is_some();
if has_comp {
warn!("Compression not yet supported [{}]", cd.name);
return Ok(());
}
let shape_dims = cd.shape.to_scylla_vec();
Ok(())
}
    /// Decodes the pulse-id channel frame (big-endian f64) and — for now —
    /// only logs it; the actual map insert is still TODO.
    async fn insert_pulse_map(&mut self, fr: &ZmtpFrame, msg: &ZmtpMessage, bm: &BsreadMessage) -> Result<(), Error> {
        debug!("data len {}", fr.data.len());
        // TODO take pulse-id also from main header and compare.
        // NOTE(review): the `[..8]` slice panics if the frame holds fewer
        // than 8 bytes; BadSlice is only reached for other try_into failures.
        let pulse_f64 = f64::from_be_bytes(fr.data[..8].try_into().map_err(|_| Error::BadSlice)?);
        debug!("pulse_f64 {pulse_f64}");
        let pulse = pulse_f64 as u64;
        // Disabled exploration of the companion timestamp frame.
        if false {
            let i4 = 3;
            // TODO this next frame should be described somehow in the json header or?
            debug!("next val len {}", msg.frames[i4 as usize + 1].data.len());
            let ts_a = u64::from_be_bytes(
                msg.frames[i4 as usize + 1].data[0..8]
                    .try_into()
                    .map_err(|_| Error::BadSlice)?,
            );
            let ts_b = u64::from_be_bytes(
                msg.frames[i4 as usize + 1].data[8..16]
                    .try_into()
                    .map_err(|_| Error::BadSlice)?,
            );
            debug!("ts_a {ts_a} ts_b {ts_b}");
        }
        let ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns;
        /*let pulse_a = (pulse >> 14) as i64;
        let pulse_b = (pulse & 0x3fff) as i32;
        let ts_a = bm.head_a.global_timestamp.sec as i64;
        let ts_b = bm.head_a.global_timestamp.ns as i32;*/
        debug!("ts {ts:20} pulse {pulse:20}");
        Ok(())
    }
}

View File

@@ -7,7 +7,6 @@ pub mod search;
use crate::ca::connset::CaConnSet;
use crate::metrics::ExtraInsertsConf;
use crate::rt::TokMx;
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use log::*;
@@ -16,7 +15,6 @@ use scywr::insertworker::InsertWorkerOpts;
use scywr::iteminsertqueue::CommonInsertItemQueue;
use scywr::store::DataStore;
use stats::CaConnStatsAgg;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU64;
@@ -146,5 +144,5 @@ where
fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) {
crate::ca::SIGINT.store(1, Ordering::Release);
let _ = crate::linuxhelper::unset_signal_handler(libc::SIGINT);
let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT);
}

View File

@@ -4,7 +4,6 @@ use super::proto::CaMsg;
use super::proto::CaMsgTy;
use super::proto::CaProto;
use super::ExtraInsertsConf;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::CreateChan;
use crate::ca::proto::EventAdd;
use crate::timebin::ConnTimeBin;
@@ -59,6 +58,24 @@ use std::time::SystemTime;
use taskrun::tokio;
use tokio::net::TcpStream;
#[allow(unused)]
macro_rules! trace3 {
    // Extra-verbose trace call site; the `if true` acts as a
    // compile-time toggle for this family of call sites.
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
#[allow(unused)]
macro_rules! trace4 {
    // Same gating as trace3 — presumably intended to be toggled
    // independently per verbosity level; confirm intended use.
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
#[derive(Clone, Debug, Serialize)]
pub enum ChannelConnectedInfo {
Disconnected,
@@ -440,6 +457,7 @@ pub struct CaConn {
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
ts_earliest_warn_poll_slow: Instant,
}
impl CaConn {
@@ -493,6 +511,7 @@ impl CaConn {
series_lookup_schedule: BTreeMap::new(),
series_lookup_futs: FuturesUnordered::new(),
time_binners: BTreeMap::new(),
ts_earliest_warn_poll_slow: Instant::now(),
}
}
@@ -988,15 +1007,6 @@ impl CaConn {
*ch_s = ChannelState::Created(series, created_state);
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
let _cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::Little,
compression: None,
};
cx.waker().wake_by_ref();
Ok(())
}
@@ -1012,9 +1022,17 @@ impl CaConn {
entry.remove();
continue;
}
Err(e) => {
*entry.get_mut() = e.into_inner();
}
Err(e) => match e {
async_channel::TrySendError::Full(_) => {
warn!("series lookup channel full");
*entry.get_mut() = e.into_inner();
}
async_channel::TrySendError::Closed(_) => {
warn!("series lookup channel closed");
// *entry.get_mut() = e.into_inner();
entry.remove();
}
},
}
} else {
()
@@ -1473,24 +1491,15 @@ impl CaConn {
};
*ch_s = ChannelState::FetchingSeriesId(created_state);
// TODO handle error in different way. Should most likely not abort.
let _cd = ChannelDescDecoded {
name: name.clone(),
scalar_type: scalar_type.clone(),
shape: shape.clone(),
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::Little,
compression: None,
};
let (tx, rx) = async_channel::bounded(1);
let query = ChannelInfoQuery {
backend: self.backend.clone(),
channel: name.clone(),
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx,
};
if !self.series_lookup_schedule.contains_key(&cid) {
let (tx, rx) = async_channel::bounded(1);
let query = ChannelInfoQuery {
backend: self.backend.clone(),
channel: name.clone(),
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx,
};
self.series_lookup_schedule.insert(cid, query);
let fut = async move {
match rx.recv().await {
@@ -1749,7 +1758,9 @@ impl CaConn {
Self::apply_channel_ops_with_res(res)
}
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
self.emit_series_lookup(cx);
self.poll_channel_info_results(cx);
let this = self.get_mut();
for (_, tb) in this.time_binners.iter_mut() {
let iiq = &mut this.insert_item_queue;
@@ -1769,12 +1780,12 @@ impl Stream for CaConn {
if self.channel_set_ops.flag.load(atomic::Ordering::Acquire) > 0 {
self.apply_channel_ops();
}
self.emit_series_lookup(cx);
self.poll_channel_info_results(cx);
match self.ticker.poll_unpin(cx) {
Ready(()) => {
match self.as_mut().handle_own_ticker_tick(cx) {
Ok(_) => {}
Ok(_) => {
let _ = self.ticker.poll_unpin(cx);
}
Err(e) => {
error!("{e}");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
@@ -1782,7 +1793,8 @@ impl Stream for CaConn {
}
}
self.ticker = Self::new_self_ticker();
cx.waker().wake_by_ref();
let _ = self.ticker.poll_unpin(cx);
// cx.waker().wake_by_ref();
}
Pending => {}
}
@@ -1823,8 +1835,8 @@ impl Stream for CaConn {
}
if self.is_shutdown() {
if self.insert_item_queue.len() == 0 {
trace!("no more items to flush");
if i1 >= 10 {
trace!("no more items to flush");
break Ready(Ok(()));
}
} else {
@@ -1863,9 +1875,14 @@ impl Stream for CaConn {
}
};
{
let dt = poll_ts1.elapsed();
let tsnow = Instant::now();
let dt = tsnow.saturating_duration_since(poll_ts1);
if dt > Duration::from_millis(40) {
warn!("slow poll: {}ms", dt.as_secs_f32() * 1e3);
if poll_ts1 > self.ts_earliest_warn_poll_slow {
// TODO factor out the rate limit logic in reusable type
self.ts_earliest_warn_poll_slow = tsnow + Duration::from_millis(2000);
warn!("slow poll: {}ms", dt.as_secs_f32() * 1e3);
}
}
}
ret

View File

@@ -1,5 +1,5 @@
use crate::linuxhelper::local_hostname;
use err::Error;
use ingest_linux::net::local_hostname;
use netpod::log::*;
use netpod::Database;
use netpod::ScyllaConfig;

View File

@@ -1,5 +1,3 @@
pub mod bsread;
pub mod bsreadclient;
pub mod ca;
pub mod conf;
pub mod daemon_common;
@@ -13,4 +11,3 @@ pub mod rt;
#[cfg(test)]
pub mod test;
pub mod timebin;
pub mod zmtp;

View File

@@ -1,120 +1 @@
use err::Error;
use log::*;
use std::ffi::CStr;
use std::mem::MaybeUninit;
use taskrun::tokio;
use tokio::net::TcpStream;
/// Returns the system hostname via `libc::gethostname`.
///
/// # Panics
/// Panics if `gethostname` fails or the result is not valid UTF-8.
pub fn local_hostname() -> String {
    let mut buf = vec![0u8; 128];
    let hostname = unsafe {
        // SAFETY: buf outlives the call; passing len - 2 guarantees at
        // least one trailing NUL byte remains for CStr::from_ptr.
        let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2);
        if ec != 0 {
            // Fix: was a bare `panic!()` with no diagnostic at all.
            panic!("gethostname failed with code {ec}");
        }
        let hostname = CStr::from_ptr(&buf[0] as *const _ as _);
        // Fix: was a bare `unwrap()`; state why the panic would happen.
        hostname.to_str().expect("hostname is not valid UTF-8")
    };
    hostname.into()
}
// Smoke test: the hostname must be non-empty on any sane system.
#[test]
fn test_get_local_hostname() {
    assert_ne!(local_hostname().len(), 0);
}
/// Installs `cb` as the sigaction handler for signal `signum`.
///
/// # Panics
/// Panics (after printing the errno text) if `sigaction` fails.
pub fn set_signal_handler(
    signum: libc::c_int,
    cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> (),
) -> Result<(), Error> {
    //let cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> () = handler_sigaction;
    // Safe because it creates a valid value:
    let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
    // Reinterpret the Rust fn pointer as the C handler address.
    let sa_sigaction: libc::sighandler_t = cb as *const libc::c_void as _;
    let act = libc::sigaction {
        sa_sigaction,
        sa_mask: mask,
        sa_flags: 0,
        sa_restorer: None,
    };
    // SAFETY: `act` is fully initialized; errno is read immediately after
    // the call on the same thread.
    let (ec, msg) = unsafe {
        let ec = libc::sigaction(signum, &act, std::ptr::null_mut());
        let errno = *libc::__errno_location();
        (ec, CStr::from_ptr(libc::strerror(errno)))
    };
    if ec != 0 {
        // Not valid to print here, but we will panic anyways after that.
        eprintln!("error: {:?}", msg);
        panic!();
    }
    Ok(())
}
/// Restores the default disposition (`SIG_DFL`) for signal `signum`.
///
/// # Panics
/// Panics (after printing the errno text) if `sigaction` fails.
pub fn unset_signal_handler(signum: libc::c_int) -> Result<(), Error> {
    // Safe because it creates a valid value:
    let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
    let act = libc::sigaction {
        sa_sigaction: libc::SIG_DFL,
        sa_mask: mask,
        sa_flags: 0,
        sa_restorer: None,
    };
    // SAFETY: `act` is fully initialized; errno is read immediately after
    // the call on the same thread.
    let (ec, msg) = unsafe {
        let ec = libc::sigaction(signum, &act, std::ptr::null_mut());
        let errno = *libc::__errno_location();
        (ec, CStr::from_ptr(libc::strerror(errno)))
    };
    if ec != 0 {
        // Not valid to print here, but we will panic anyways after that.
        eprintln!("error: {:?}", msg);
        panic!();
    }
    Ok(())
}
pub fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
use std::mem::size_of;
use std::os::unix::prelude::AsRawFd;
let fd = conn.as_raw_fd();
unsafe {
type N = libc::c_int;
let n: N = rcvbuf as _;
let ec = libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&n as *const N as _,
size_of::<N>() as _,
);
if ec != 0 {
error!("ec {ec}");
if ec != 0 {
return Err(Error::with_msg_no_trace(format!("can not set socket option")));
}
}
}
unsafe {
type N = libc::c_int;
let mut n: N = -1;
let mut l = size_of::<N>() as libc::socklen_t;
let ec = libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&mut n as *mut N as _,
&mut l as _,
);
if ec != 0 {
let errno = *libc::__errno_location();
let es = CStr::from_ptr(libc::strerror(errno));
error!("can not query socket option ec {ec} errno {errno} es {es:?}");
} else {
if (n as u32) < rcvbuf * 5 / 6 {
warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}");
} else {
info!("SO_RCVBUF {n}");
}
}
}
Ok(())
}

View File

@@ -1,131 +0,0 @@
pub mod dumper;
pub mod zmtpproto;
use self::zmtpproto::ZmtpFrame;
use self::zmtpproto::ZmtpMessage;
use crate::bsread::ChannelDescDecoded;
use crate::bsreadclient;
use crate::bsreadclient::BsreadClient;
use crate::zmtp::zmtpproto::SocketType;
use crate::zmtp::zmtpproto::Zmtp;
#[allow(unused)]
use bytes::BufMut;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use log::*;
use scywr::session::ScySession;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use taskrun::tokio;
/// Errors of the zmtp client layer.
#[derive(Debug, ThisError)]
pub enum Error {
    #[error("Msg({0})")]
    Msg(String),
    // A spawned task's join handle returned an error.
    #[error("TaskJoin")]
    TaskJoin,
    #[error("BsreadClient({0})")]
    BsreadClient(#[from] bsreadclient::Error),
    #[error("IO({0})")]
    IO(#[from] io::Error),
}
// Manual smoke helper: drives a trivially-ready future under a 16 s
// timeout on the taskrun runtime. Not wired into any test harness.
#[allow(unused)]
fn test_listen() -> Result<(), Error> {
    use std::time::Duration;
    let fut = async move {
        let _ = tokio::time::timeout(Duration::from_millis(16000), futures_util::future::ready(0u32)).await;
        Ok::<_, Error>(())
    };
    taskrun::run(fut).map_err(|e| Error::Msg(e.to_string()))
}
// Manual helper: accepts TCP connections on port 9999 and logs every ZMTP
// item each peer produces. Loops forever; not wired into any harness.
#[allow(unused)]
fn test_service() -> Result<(), Error> {
    let fut = async move {
        let sock = tokio::net::TcpListener::bind("0.0.0.0:9999").await?;
        loop {
            info!("accepting...");
            let (conn, remote) = sock.accept().await?;
            info!("new connection from {:?}", remote);
            let mut zmtp = Zmtp::new(conn, SocketType::PUSH);
            // Each connection is served on its own spawned task.
            let fut = async move {
                while let Some(item) = zmtp.next().await {
                    info!("item from {:?} {:?}", remote, item);
                }
                Ok::<_, Error>(())
            };
            taskrun::spawn(fut);
        }
    };
    taskrun::run(fut)
}
/// Stub: series-id lookup is not implemented on this path. Logs and then
/// returns a placeholder via `err::todoval()` — presumably panics at
/// runtime; confirm before calling.
pub async fn get_series_id(_scy: &ScySession, _chn: &ChannelDescDecoded) -> Result<u64, Error> {
    error!("TODO get_series_id");
    err::todoval()
}
/// Configuration for one zmtp/bsread client connection.
#[derive(Clone)]
pub struct ZmtpClientOpts {
    pub backend: String,
    pub addr: SocketAddr,
    // Whether to decode and process the pulse-id channel.
    pub do_pulse_id: bool,
    // Desired SO_RCVBUF size in bytes, if any.
    pub rcvbuf: Option<usize>,
    pub array_truncate: Option<usize>,
    // Upper bound on channels processed per message.
    pub process_channel_count_limit: Option<usize>,
}
/// Owns a pinned `BsreadClient` together with the future produced by its
/// `run()` method; the future borrows the pinned client, making this a
/// self-referential pair that must be kept together.
struct ClientRun {
    #[allow(unused)]
    client: Pin<Box<BsreadClient>>,
    fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
}
impl ClientRun {
    /// Couples the client with its own `run()` future.
    ///
    /// NOTE(review): the unsafe cast manufactures a second mutable
    /// reference into the pinned box so the future can borrow the client
    /// while the box is stored next to it. Soundness relies on `client`
    /// never being moved or borrowed elsewhere while `fut` is alive —
    /// confirm before reusing this pattern.
    fn new(client: BsreadClient) -> Self {
        let mut client = Box::pin(client);
        let client2 = unsafe { &mut *(&mut client as &mut _ as *mut _) } as &mut BsreadClient;
        let fut = client2.run().map_err(|e| e.into());
        let fut = Box::pin(fut) as _;
        Self { client, fut }
    }
}
impl Future for ClientRun {
    type Output = Result<(), Error>;

    /// Delegates polling to the stored client future.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx)
    }
}
/// Items produced by the `Zmtp` stream: protocol commands or complete
/// multi-frame messages.
#[derive(Debug)]
pub enum ZmtpEvent {
    ZmtpCommand(ZmtpFrame),
    ZmtpMessage(ZmtpMessage),
}
/// Creates a `BsreadClient` from `opts` and runs it on a spawned task.
///
/// NOTE(review): the two `todo!()` arguments make this function panic as
/// soon as it is called; it cannot work until the ingest-commons and
/// channel-info-query handles are actually threaded through.
pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
    let client = BsreadClient::new(opts.clone(), todo!(), todo!()).await?;
    let fut = {
        async move {
            let mut client = client;
            client.run().await?;
            Ok::<_, Error>(())
        }
    };
    let jh = tokio::spawn(fut);
    //let mut jhs = Vec::new();
    //jhs.push(jh);
    //futures_util::future::join_all(jhs).await;
    jh.await.map_err(|_| Error::TaskJoin)??;
    Ok(())
}

View File

@@ -1,138 +0,0 @@
use crate::bsread::ChannelDescDecoded;
use crate::bsread::HeadB;
use crate::bsread::Parser;
use crate::zmtp::zmtpproto;
use crate::zmtp::zmtpproto::SocketType;
use crate::zmtp::zmtpproto::Zmtp;
use crate::zmtp::ZmtpEvent;
use err::thiserror;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::SEC;
use std::io;
use taskrun::tokio;
/// Errors of the bsread dump tool.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("IO({0})")]
    IO(#[from] io::Error),
    // Catch-all for errors converted from the legacy err::Error type.
    #[error("Msg({0})")]
    Msg(String),
    #[error("ZmtpProto({0})")]
    ZmtpProto(#[from] zmtpproto::Error),
}
impl From<err::Error> for Error {
fn from(value: err::Error) -> Self {
Self::Msg(value.to_string())
}
}
/// Diagnostic tool: connects to a bsread source and prints a short
/// summary for the first few messages.
pub struct BsreadDumper {
    source_addr: String,
    parser: Parser,
}
impl BsreadDumper {
    /// Creates a dumper for `source_addr` ("tcp://" prefix optional).
    pub fn new(source_addr: String) -> Self {
        Self {
            source_addr,
            parser: Parser::new(),
        }
    }

    /// Connects, parses and summarizes messages; stops after roughly 20
    /// messages or on the first bsread parse error.
    pub async fn run(&mut self) -> Result<(), Error> {
        // Strip a leading "tcp://" scheme if present.
        let src = if self.source_addr.starts_with("tcp://") {
            self.source_addr[6..].into()
        } else {
            self.source_addr.clone()
        };
        let conn = tokio::net::TcpStream::connect(&src).await?;
        let mut zmtp = Zmtp::new(conn, SocketType::PULL);
        let mut i1 = 0u64;
        let mut msgc = 0u64;
        // MD5 of the last seen data header, to detect header changes.
        let mut dh_md5_last = String::new();
        let mut frame_diff_count = 0u64;
        let mut hash_mismatch_count = 0u64;
        let mut head_b = HeadB::empty();
        while let Some(item) = zmtp.next().await {
            match item {
                Ok(ev) => match ev {
                    ZmtpEvent::ZmtpCommand(_) => (),
                    ZmtpEvent::ZmtpMessage(msg) => {
                        msgc += 1;
                        match self.parser.parse_zmtp_message(&msg) {
                            Ok(bm) => {
                                // Expect 2 header frames plus 2 per channel.
                                // NOTE(review): this usize subtraction underflows
                                // if fewer frames than 2 per channel arrive —
                                // confirm upstream guarantees.
                                if msg.frames().len() - 2 * bm.head_b.channels.len() != 2 {
                                    frame_diff_count += 1;
                                    if frame_diff_count < 1000 {
                                        warn!(
                                            "chn len {} frame diff {}",
                                            bm.head_b.channels.len(),
                                            msg.frames().len() - 2 * bm.head_b.channels.len()
                                        );
                                    }
                                }
                                if bm.head_b_md5 != bm.head_a.hash {
                                    hash_mismatch_count += 1;
                                    // TODO keep logging data header changes, just suppress too frequent messages.
                                    if hash_mismatch_count < 200 {
                                        error!(
                                            "Invalid bsread message: hash mismatch. dhcompr {:?}",
                                            bm.head_a.dh_compression
                                        );
                                    }
                                }
                                if bm.head_b_md5 != dh_md5_last {
                                    head_b = bm.head_b.clone();
                                    if dh_md5_last.is_empty() {
                                        info!("data header hash {}", bm.head_b_md5);
                                    } else {
                                        error!("changed data header hash {} mh {}", bm.head_b_md5, bm.head_a.hash);
                                    }
                                    dh_md5_last = bm.head_b_md5.clone();
                                }
                                if msg.frames.len() < 2 + 2 * head_b.channels.len() {
                                    // TODO count always, throttle log.
                                    error!("not enough frames for data header");
                                }
                                let gts = bm.head_a.global_timestamp;
                                let ts = (gts.sec as u64) * SEC + gts.ns as u64;
                                let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0);
                                // Sum the payload bytes over all channel data frames.
                                let mut bytes_payload = 0u64;
                                for i1 in 0..head_b.channels.len() {
                                    let chn = &head_b.channels[i1];
                                    let _cd: ChannelDescDecoded = chn.try_into()?;
                                    let fr = &msg.frames[2 + 2 * i1];
                                    bytes_payload += fr.data().len() as u64;
                                }
                                info!("zmtp message ts {ts} pulse {pulse} bytes_payload {bytes_payload}");
                            }
                            Err(e) => {
                                for frame in &msg.frames {
                                    info!("Frame: {:?}", frame);
                                }
                                zmtp.dump_input_state();
                                zmtp.dump_conn_state();
                                error!("bsread parse error: {e:?}");
                                break;
                            }
                        }
                    }
                },
                Err(e) => {
                    error!("zmtp item error: {e:?}");
                    return Err(e.into());
                }
            }
            i1 += 1;
            // Dump only a short sample, then stop.
            if true && i1 > 20 {
                break;
            }
            if true && msgc > 20 {
                break;
            }
        }
        Ok(())
    }
}

View File

@@ -1,829 +0,0 @@
use crate::bsread::ChannelDesc;
use crate::bsread::GlobalTimestamp;
use crate::bsread::HeadA;
use crate::bsread::HeadB;
use crate::netbuf::NetBuf;
use crate::zmtp::ZmtpEvent;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use futures_util::pin_mut;
use futures_util::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::SEC;
use serde_json::Value as JsVal;
use std::fmt;
use std::io;
use std::mem;
use std::pin::Pin;
use std::string::FromUtf8Error;
use std::task::Context;
use std::task::Poll;
use taskrun::tokio;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
use tokio::net::TcpStream;
/// Errors raised by the ZMTP transport layer: handshake validation,
/// framing limits, buffer management and socket I/O.
#[derive(Debug, ThisError)]
pub enum Error {
    /// Unspecific failure.
    #[error("bad")]
    Bad,
    /// Propagated error from the underlying network buffer.
    #[error("NetBuf({0})")]
    NetBuf(#[from] crate::netbuf::Error),
    /// Peer greeting did not announce ZMTP 3.x.
    #[error("zmtp peer is not v3.x")]
    ZmtpInitPeerNot3x,
    /// Peer announced a 3.x minor version other than 0 or 1.
    #[error("zmtp peer is not v3.0 or v3.1")]
    ZmtpInitPeerUnsupportedVersion,
    /// Peer's security mechanism field was malformed (no NUL terminator found).
    #[error("zmtp bad mechanism")]
    BadPeerMechanism,
    /// Announced frame length exceeds what this client is willing to buffer.
    #[error("zmtp message too large {0}")]
    MsgTooLarge(usize),
    /// Input buffer capacity can never satisfy the parser's minimum requirement.
    #[error("buffer too small, need-min {0} cap {1}")]
    BufferTooSmallForNeedMin(usize, usize),
    /// Non-UTF-8 bytes where text was expected.
    #[error("FromUtf8Error")]
    FromUtf8Error(#[from] FromUtf8Error),
    /// Socket read/write failure.
    #[error("IO")]
    IO(#[from] io::Error),
}
/// Position in the greeting/framing state machine; `need_min` gives the
/// number of buffered bytes each state requires before it can make progress.
#[derive(Clone, Debug)]
enum ConnState {
    /// Send our greeting signature and version.
    InitSend,
    /// Expect the peer's first signature byte (0xff).
    InitRecv1,
    /// Expect signature padding plus the final signature byte.
    InitRecv2,
    /// Expect the peer's major protocol version.
    InitRecv3,
    /// Expect the peer's minor protocol version.
    InitRecv4,
    /// Expect the peer's mechanism, as-server flag and filler.
    InitRecv5,
    /// Expect a frame flags byte.
    ReadFrameFlags,
    /// Expect a one-byte frame length.
    ReadFrameShort,
    /// Expect an eight-byte frame length.
    ReadFrameLong,
    /// Expect a frame body of the given length.
    ReadFrameBody(usize),
    /// Debug-only: scan the raw stream to re-lock onto a frame boundary.
    #[allow(unused)]
    LockScan(usize),
}
impl ConnState {
fn need_min(&self) -> usize {
use ConnState::*;
match self {
InitSend => 0,
InitRecv1 => 1,
InitRecv2 => 9,
InitRecv3 => 1,
InitRecv4 => 1,
InitRecv5 => 52,
ReadFrameFlags => 1,
ReadFrameShort => 1,
ReadFrameLong => 8,
ReadFrameBody(msglen) => *msglen,
LockScan(n) => *n,
}
}
}
/// ZMQ socket type announced in the READY command during the handshake.
pub enum SocketType {
    PUSH,
    PULL,
}
/// Snapshot of the input buffer for the debug ring log (`dump_input_state`).
#[derive(Debug)]
enum InpState {
    /// No snapshot recorded yet in this slot.
    Empty,
    // Recorded as (state().0, state().1, cap() - state().1) in record_input_state_2;
    // presumably read position, write position and free space — confirm in netbuf.
    Netbuf(usize, usize, usize),
}
impl Default for InpState {
fn default() -> Self {
InpState::Empty
}
}
/// State machine for one ZMTP connection over TCP: performs the greeting
/// handshake, then decodes the frame stream; consumed via the `Stream` impl
/// which yields `ZmtpEvent`s.
pub struct Zmtp {
    /// Stream has terminated; the next poll yields the final `None`.
    done: bool,
    /// The final `None` has been yielded; polling again panics.
    complete: bool,
    socket_type: SocketType,
    conn: TcpStream,
    /// Current position in the handshake/framing state machine.
    conn_state: ConnState,
    /// Input buffer (socket -> parser).
    buf: NetBuf,
    /// Output buffer (handshake/commands -> socket).
    outbuf: NetBuf,
    /// Set once the handshake completed; enables serializing outgoing items.
    out_enable: bool,
    /// Body length of the frame currently being read.
    msglen: usize,
    /// MORE flag of the last data frame (multipart message in progress).
    has_more: bool,
    /// COMMAND flag of the frame currently being read.
    is_command: bool,
    /// Peer's (major, minor) protocol version from the greeting.
    peer_ver: (u8, u8),
    /// Accumulated frames of the current multipart message.
    frames: Vec<ZmtpFrame>,
    /// Peer closed the read side (zero-byte read observed).
    inp_eof: bool,
    // Channel for items to be serialized onto the wire; payload type is a
    // placeholder (see TODO in loop_body).
    data_tx: Sender<u32>,
    data_rx: Receiver<u32>,
    /// Debug ring log of input-buffer snapshots; see `dump_input_state`.
    input_state: Vec<InpState>,
    input_state_ix: usize,
    /// Debug ring log of connection states; see `dump_conn_state`.
    conn_state_log: Vec<ConnState>,
    conn_state_log_ix: usize,
}
impl Zmtp {
pub fn new(conn: TcpStream, socket_type: SocketType) -> Self {
let (tx, rx) = async_channel::bounded(1);
Self {
done: false,
complete: false,
socket_type,
conn,
//conn_state: ConnState::LockScan(1),
conn_state: ConnState::InitSend,
buf: NetBuf::new(1024 * 128),
outbuf: NetBuf::new(1024 * 128),
out_enable: false,
msglen: 0,
has_more: false,
is_command: false,
peer_ver: (0, 0),
frames: Vec::new(),
inp_eof: false,
data_tx: tx,
data_rx: rx,
input_state: vec![0; 64].iter().map(|_| InpState::default()).collect(),
input_state_ix: 0,
conn_state_log: vec![0; 64].iter().map(|_| ConnState::InitSend).collect(),
conn_state_log_ix: 0,
}
}
    /// Sender side of the outbound item channel; items queued here are picked
    /// up by `loop_body` for serialization once output is enabled.
    pub fn out_channel(&self) -> Sender<u32> {
        self.data_tx.clone()
    }
    /// Split-borrow the socket together with a writable view into the input
    /// buffer sized to accept at least `need_min` bytes.
    fn inpbuf_conn(&mut self, need_min: usize) -> (&mut TcpStream, ReadBuf) {
        (&mut self.conn, self.buf.read_buf_for_fill(need_min))
    }
    /// Split-borrow the socket together with the pending output bytes.
    fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) {
        (&mut self.conn, self.outbuf.data())
    }
    /// No-op stub; swap the call sites to `record_input_state_2` to actually
    /// record input-buffer snapshots for `dump_input_state`.
    #[allow(unused)]
    #[inline(always)]
    fn record_input_state(&mut self) {}
#[allow(unused)]
fn record_input_state_2(&mut self) {
let st = self.buf.state();
self.input_state[self.input_state_ix] = InpState::Netbuf(st.0, st.1, self.buf.cap() - st.1);
self.input_state_ix = (1 + self.input_state_ix) % self.input_state.len();
}
    /// No-op stub; swap the call sites to `record_conn_state_2` to actually
    /// record connection states for `dump_conn_state`.
    #[allow(unused)]
    #[inline(always)]
    fn record_conn_state(&mut self) {}
#[allow(unused)]
fn record_conn_state_2(&mut self) {
self.conn_state_log[self.conn_state_log_ix] = self.conn_state.clone();
self.conn_state_log_ix = (1 + self.conn_state_log_ix) % self.conn_state_log.len();
}
pub fn dump_input_state(&self) {
info!("---------------------------------------------------------");
info!("INPUT STATE DUMP");
let mut i = self.input_state_ix;
for _ in 0..self.input_state.len() {
info!("{i:4} {:?}", self.input_state[i]);
i = (1 + i) % self.input_state.len();
}
info!("---------------------------------------------------------");
}
pub fn dump_conn_state(&self) {
info!("---------------------------------------------------------");
info!("CONN STATE DUMP");
let mut i = self.conn_state_log_ix;
for _ in 0..self.conn_state_log.len() {
info!("{i:4} {:?}", self.conn_state_log[i]);
i = (1 + i) % self.conn_state_log.len();
}
info!("---------------------------------------------------------");
}
    /// Drive one round of the four sub-state machines (serialize, write, read,
    /// parse) and combine their outcomes into a single poll result.
    ///
    /// Returns `Some(Ready(..))` to yield an event or error, `Some(Pending)`
    /// when no progress is possible right now, and `None` when progress was
    /// made and the caller should loop again. At most one stage produces an
    /// item per round (`item_count` short-circuits the later stages).
    fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Option<Poll<Result<ZmtpEvent, Error>>> {
        use Poll::*;
        let mut item_count = 0;
        // TODO should I better keep one serialized item in Self so that I know how much space it needs?
        // Stage 1: pull an outbound item from the channel, but only after the
        // handshake enabled output and at least half the output buffer is free.
        let serialized: Int<Result<(), Error>> = if self.out_enable && self.outbuf.wcap() >= self.outbuf.cap() / 2 {
            match self.data_rx.poll_next_unpin(cx) {
                Ready(Some(_item)) => {
                    // TODO item should be something that we can convert into a zmtp message.
                    Int::Empty
                }
                Ready(None) => Int::Done,
                Pending => Int::Pend,
            }
        } else {
            Int::NoWork
        };
        item_count += serialized.item_count();
        // Stage 2: flush pending output bytes to the socket.
        let write: Int<Result<(), Error>> = if item_count > 0 {
            Int::NoWork
        } else if self.outbuf.len() > 0 {
            let (w, b) = self.outbuf_conn();
            pin_mut!(w);
            match w.poll_write(cx, b) {
                Ready(k) => match k {
                    Ok(k) => match self.outbuf.adv(k) {
                        Ok(()) => {
                            trace!("sent {} bytes", k);
                            Int::Empty
                        }
                        Err(e) => {
                            error!("advance error {:?}", e);
                            Int::Item(Err(e.into()))
                        }
                    },
                    Err(e) => {
                        error!("output write error {:?}", e);
                        Int::Item(Err(e.into()))
                    }
                },
                Pending => Int::Pend,
            }
        } else {
            Int::NoWork
        };
        match write {
            Int::NoWork => {}
            _ => {
                trace!("write result: {:?} {}", write, self.outbuf.len());
            }
        }
        item_count += write.item_count();
        // Stage 3: read from the socket when the parser needs more bytes than
        // are currently buffered.
        let read: Int<Result<(), _>> = if item_count > 0 || self.inp_eof {
            Int::NoWork
        } else {
            let need_min = self.conn_state.need_min();
            if self.buf.cap() < need_min {
                // The buffer can never satisfy this state; fail permanently.
                self.done = true;
                let e = Error::BufferTooSmallForNeedMin(self.conn_state.need_min(), self.buf.cap());
                Int::Item(Err(e))
            } else if self.buf.len() < need_min {
                self.record_input_state();
                let (w, mut rbuf) = self.inpbuf_conn(need_min);
                pin_mut!(w);
                match w.poll_read(cx, &mut rbuf) {
                    Ready(k) => match k {
                        Ok(()) => {
                            let nf = rbuf.filled().len();
                            if nf == 0 {
                                // Zero-byte read: peer closed the connection.
                                info!("EOF");
                                self.inp_eof = true;
                                self.record_input_state();
                                Int::Done
                            } else {
                                trace!("received {} bytes", rbuf.filled().len());
                                if false {
                                    let t = rbuf.filled().len();
                                    let t = if t < 32 { t } else { 32 };
                                    trace!("got data {:?}", &rbuf.filled()[0..t]);
                                }
                                // Advance the buffer's write position past the received bytes.
                                match self.buf.wadv(nf) {
                                    Ok(()) => {
                                        self.record_input_state();
                                        Int::Empty
                                    }
                                    Err(e) => {
                                        error!("netbuf wadv fail nf {nf}");
                                        Int::Item(Err(e.into()))
                                    }
                                }
                            }
                        }
                        Err(e) => Int::Item(Err(e.into())),
                    },
                    Pending => Int::Pend,
                }
            } else {
                Int::NoWork
            }
        };
        item_count += read.item_count();
        // Stage 4: run the parser once enough bytes are available for the
        // current connection state.
        let parsed = if item_count > 0 || self.buf.len() < self.conn_state.need_min() {
            Int::NoWork
        } else {
            match self.parse_item() {
                Ok(k) => match k {
                    Some(k) => Int::Item(Ok(k)),
                    None => Int::Empty,
                },
                Err(e) => Int::Item(Err(e)),
            }
        };
        item_count += parsed.item_count();
        let _ = item_count;
        {
            // Combine the four outcomes: any error ends the stream, an item is
            // yielded, any Empty means "loop again", and all-Pending (or done)
            // parks the task.
            use Int::*;
            match (serialized, write, read, parsed) {
                (NoWork | Done, NoWork | Done, NoWork | Done, NoWork | Done) => {
                    warn!("all NoWork or Done");
                    return Some(Pending);
                }
                (Item(Err(e)), _, _, _) => {
                    self.done = true;
                    return Some(Ready(Err(e.into())));
                }
                (_, Item(Err(e)), _, _) => {
                    self.done = true;
                    return Some(Ready(Err(e.into())));
                }
                (_, _, Item(Err(e)), _) => {
                    self.done = true;
                    return Some(Ready(Err(e.into())));
                }
                (_, _, _, Item(Err(e))) => {
                    self.done = true;
                    return Some(Ready(Err(e.into())));
                }
                (Item(_), _, _, _) => {
                    return None;
                }
                (_, Item(_), _, _) => {
                    return None;
                }
                (_, _, Item(_), _) => {
                    return None;
                }
                (_, _, _, Item(Ok(item))) => {
                    return Some(Ready(Ok(item)));
                }
                (Empty, _, _, _) => return None,
                (_, Empty, _, _) => return None,
                (_, _, Empty, _) => return None,
                (_, _, _, Empty) => return None,
                #[allow(unreachable_patterns)]
                (Pend, Pend | NoWork | Done, Pend | NoWork | Done, Pend | NoWork | Done) => return Some(Pending),
                #[allow(unreachable_patterns)]
                (Pend | NoWork | Done, Pend, Pend | NoWork | Done, Pend | NoWork | Done) => return Some(Pending),
                #[allow(unreachable_patterns)]
                (Pend | NoWork | Done, Pend | NoWork | Done, Pend, Pend | NoWork | Done) => return Some(Pending),
                #[allow(unreachable_patterns)]
                (Pend | NoWork | Done, Pend | NoWork | Done, Pend | NoWork | Done, Pend) => return Some(Pending),
            }
        };
    }
    /// Advance the connection state machine by consuming buffered input bytes.
    ///
    /// Returns `Ok(Some(event))` when a complete command or message is ready,
    /// `Ok(None)` when the state advanced without producing an event, and
    /// `Err` on protocol violations. The caller (`loop_body`) guarantees that
    /// at least `need_min()` bytes are buffered before calling.
    fn parse_item(&mut self) -> Result<Option<ZmtpEvent>, Error> {
        self.record_conn_state();
        match self.conn_state {
            ConnState::InitSend => {
                info!("parse_item InitSend");
                // Greeting: signature (0xff .. 0x7f) plus version major 3, minor 1.
                self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 1])?;
                self.conn_state = ConnState::InitRecv1;
                Ok(None)
            }
            ConnState::InitRecv1 => {
                // First signature byte from the peer must be 0xff.
                let b = self.buf.read_u8()?;
                if b != 0xff {
                    Err(Error::ZmtpInitPeerNot3x)
                } else {
                    self.conn_state = ConnState::InitRecv2;
                    Ok(None)
                }
            }
            ConnState::InitRecv2 => {
                // Skip 8 padding bytes; the final signature byte needs bit 0 set.
                self.buf.adv(8)?;
                let b = self.buf.read_u8()?;
                if b & 0x01 != 1 {
                    Err(Error::ZmtpInitPeerNot3x)
                } else {
                    self.conn_state = ConnState::InitRecv3;
                    Ok(None)
                }
            }
            ConnState::InitRecv3 => {
                // Peer's major version; we only speak 3.x. On success send our
                // mechanism name ("NULL" as raw bytes) plus 48 bytes of padding.
                let maj = self.buf.read_u8()?;
                if maj != 3 {
                    Err(Error::ZmtpInitPeerNot3x)
                } else {
                    self.peer_ver.0 = maj;
                    self.outbuf.put_slice(&[0x4e, 0x55, 0x4c, 0x4c])?;
                    let a = vec![0; 48];
                    self.outbuf.put_slice(&a)?;
                    self.conn_state = ConnState::InitRecv4;
                    Ok(None)
                }
            }
            ConnState::InitRecv4 => {
                // Peer's minor version: accept 3.0 and 3.1 only.
                let minver = self.buf.read_u8()?;
                if minver > 1 {
                    Err(Error::ZmtpInitPeerUnsupportedVersion)
                } else {
                    self.peer_ver.1 = minver;
                    info!("InitRecv4 peer version {:?}", self.peer_ver);
                    self.conn_state = ConnState::InitRecv5;
                    Ok(None)
                }
            }
            ConnState::InitRecv5 => {
                {
                    // Security mechanism: 20 bytes, NUL-padded ASCII name.
                    let b2 = self.buf.read_bytes(20)?;
                    let mut i = 0;
                    while i < b2.len() && b2[i] != 0 {
                        i += 1;
                    }
                    if i >= b2.len() {
                        // No NUL terminator: malformed mechanism field.
                        return Err(Error::BadPeerMechanism);
                    } else {
                        let sec = String::from_utf8(b2[..i].to_vec())?;
                        info!("Peer security mechanism {} [{}]", sec.len(), sec);
                    }
                }
                // Skip the remaining greeting bytes, then send our READY command
                // announcing the socket type; after this, frame traffic may flow.
                self.buf.adv(32)?;
                match self.socket_type {
                    SocketType::PUSH => {
                        self.outbuf
                            .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PUSH"[..])?;
                    }
                    SocketType::PULL => {
                        self.outbuf
                            .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?;
                    }
                }
                self.out_enable = true;
                self.conn_state = ConnState::ReadFrameFlags;
                Ok(None)
            }
            ConnState::ReadFrameFlags => {
                // Frame flags byte: bit 0 MORE, bit 1 LONG size, bit 2 COMMAND.
                let flags = self.buf.read_u8()?;
                let has_more = flags & 0x01 != 0;
                let long_size = flags & 0x02 != 0;
                let is_command = flags & 0x04 != 0;
                if is_command {
                    if has_more {
                        error!("received command with has_more flag (error in peer)");
                    }
                    if self.has_more {
                        debug!(
                            "received command frame while in multipart, having {}",
                            self.frames.len()
                        );
                    }
                } else {
                    // Commands may be interleaved with a multipart message, so
                    // only data frames update the multipart continuation flag.
                    self.has_more = has_more;
                }
                self.is_command = is_command;
                trace!(
                    "parse_item ReadFrameFlags has_more {} long_size {} is_command {}",
                    has_more,
                    long_size,
                    is_command
                );
                if long_size {
                    self.conn_state = ConnState::ReadFrameLong;
                } else {
                    self.conn_state = ConnState::ReadFrameShort;
                }
                Ok(None)
            }
            ConnState::ReadFrameShort => {
                // Short frame: length in a single byte.
                self.msglen = self.buf.read_u8()? as usize;
                trace!("parse_item ReadFrameShort msglen {}", self.msglen);
                self.conn_state = ConnState::ReadFrameBody(self.msglen);
                // Refuse frames larger than half the buffer; they could never
                // be fully buffered for parsing.
                if self.msglen > self.buf.cap() / 2 {
                    error!("msglen {} too large for this client", self.msglen);
                    return Err(Error::MsgTooLarge(self.msglen as usize));
                }
                Ok(None)
            }
            ConnState::ReadFrameLong => {
                // Long frame: length in eight bytes.
                self.msglen = self.buf.read_u64()? as usize;
                trace!("parse_item ReadFrameLong msglen {}", self.msglen);
                self.conn_state = ConnState::ReadFrameBody(self.msglen);
                if self.msglen > self.buf.cap() / 2 {
                    error!("msglen {} too large for this client", self.msglen);
                    return Err(Error::MsgTooLarge(self.msglen));
                }
                Ok(None)
            }
            ConnState::ReadFrameBody(msglen) => {
                // TODO do not copy here...
                let data = self.buf.read_bytes(msglen)?.to_vec();
                self.conn_state = ConnState::ReadFrameFlags;
                self.msglen = 0;
                if false {
                    let n1 = data.len().min(256);
                    let s = String::from_utf8_lossy(&data[..n1]);
                    trace!("parse_item ReadFrameBody msglen {} string {}", msglen, s);
                }
                if self.is_command {
                    // Answer PING commands with a PONG echoing the context and
                    // also send our own PING; every command frame is passed up
                    // to the caller unmodified.
                    if data.len() >= 7 {
                        if &data[0..5] == b"\x04PING" {
                            if data.len() > 32 {
                                // TODO close connection?
                                error!("Oversized PING");
                            } else {
                                let ttl = u16::from_be_bytes(data[5..7].try_into().unwrap());
                                let ctx = &data[7..];
                                debug!("received PING ttl {ttl} ctx {:?}", &ctx);
                                if self.outbuf.wcap() < data.len() {
                                    warn!("can not respond with PONG because output buffer full");
                                } else {
                                    let size = 5 + ctx.len() as u8;
                                    self.outbuf.put_u8(0x04).unwrap();
                                    self.outbuf.put_u8(size).unwrap();
                                    self.outbuf.put_slice(b"\x04PONG").unwrap();
                                    self.outbuf.put_slice(ctx).unwrap();
                                }
                                if self.outbuf.wcap() < 32 {
                                    warn!("can not send my PING because output buffer full");
                                } else {
                                    let ctx = b"daqingest";
                                    let size = 5 + ctx.len() as u8;
                                    self.outbuf.put_u8(0x04).unwrap();
                                    self.outbuf.put_u8(size).unwrap();
                                    self.outbuf.put_slice(b"\x04PING").unwrap();
                                    self.outbuf.put_slice(ctx).unwrap();
                                }
                            }
                        }
                    }
                    let g = ZmtpFrame {
                        msglen: msglen,
                        has_more: self.has_more,
                        is_command: self.is_command,
                        data,
                    };
                    Ok(Some(ZmtpEvent::ZmtpCommand(g)))
                } else {
                    // Data frame: accumulate into the current multipart message
                    // and emit it once the final (no-MORE) frame arrives.
                    let g = ZmtpFrame {
                        msglen: msglen,
                        has_more: self.has_more,
                        is_command: self.is_command,
                        data,
                    };
                    self.frames.push(g);
                    if self.has_more {
                        Ok(None)
                    } else {
                        let g = ZmtpMessage {
                            frames: mem::replace(&mut self.frames, Vec::new()),
                        };
                        if false && g.frames.len() != 118 {
                            info!("EMIT {} frames", g.frames.len());
                            if let Some(fr) = g.frames.get(0) {
                                let d = fr.data();
                                let nn = d.len().min(16);
                                let s = String::from_utf8_lossy(&d[..nn]);
                                info!("DATA 0 {} {:?} {:?}", nn, &d[..nn], s);
                            }
                            if let Some(fr) = g.frames.get(1) {
                                let d = fr.data();
                                let nn = d.len().min(16);
                                let s = String::from_utf8_lossy(&d[..nn]);
                                info!("DATA 1 {} {:?} {:?}", nn, &d[..nn], s);
                            }
                        }
                        Ok(Some(ZmtpEvent::ZmtpMessage(g)))
                    }
                }
            }
            ConnState::LockScan(n) => {
                // Debug-only resynchronization: scan the raw byte stream for the
                // start of a bsread data header to re-lock on frame boundaries.
                if n > 1024 * 20 {
                    warn!("could not lock within {n} bytes");
                }
                const NBACK: usize = 2;
                let data = self.buf.data();
                let mut found_at = None;
                debug!("{}", String::from_utf8_lossy(data));
                debug!("try to lock within {} bytes", data.len());
                let needle = br##"{"dh_compression":"##;
                for (i1, b) in data.iter().enumerate() {
                    if i1 >= NBACK && *b == needle[0] {
                        let dd = &data[i1..];
                        {
                            let nn = dd.len().min(32);
                            debug!("pre {}", String::from_utf8_lossy(&dd[..nn]));
                        }
                        if dd.len() >= needle.len() {
                            if &dd[..needle.len()] == needle {
                                debug!("found at {i1}");
                                found_at = Some(i1);
                                break;
                            }
                        }
                    }
                }
                let mut locked = false;
                if let Some(nf) = found_at {
                    if nf >= NBACK {
                        if false {
                            let s1 = data[nf - NBACK..].iter().take(32).fold(String::new(), |mut a, x| {
                                use std::fmt::Write;
                                let _ = write!(a, "{:02x} ", *x);
                                a
                            });
                            debug!("BUF {s1}");
                        }
                        // Heuristic: the two bytes before the JSON should look
                        // like a plausible short-frame header (flags 0x01 and a
                        // length byte in a sane range).
                        if data[nf - 2] == 0x01 && data[nf - 1] > 0x70 && data[nf - 1] < 0xd0 {
                            locked = true;
                        }
                    }
                }
                if locked {
                    self.conn_state = ConnState::ReadFrameFlags;
                } else {
                    self.conn_state = ConnState::LockScan(data.len() + 1);
                }
                Ok(None)
            }
        }
    }
}
/// A complete (possibly multipart) ZMTP message: all data frames up to and
/// including the first frame without the MORE flag.
#[derive(Debug)]
pub struct ZmtpMessage {
    pub frames: Vec<ZmtpFrame>,
}
impl ZmtpMessage {
    /// Borrow the list of frames that make up this message.
    pub fn frames(&self) -> &Vec<ZmtpFrame> {
        &self.frames
    }
    /// Serialize all frames into `out` using the long-size ZMTP frame format.
    /// Every frame except the last carries the MORE flag.
    pub fn emit_to_buffer(&self, out: &mut NetBuf) -> Result<(), Error> {
        let last = self.frames.len().saturating_sub(1);
        for (i, fr) in self.frames.iter().enumerate() {
            // Bit 1 (LONG size) is always set; bit 0 (MORE) on all but the final frame.
            let flags: u8 = if i == last { 2 } else { 2 | 1 };
            out.put_u8(flags)?;
            out.put_u64(fr.data().len() as u64)?;
            out.put_slice(fr.data())?;
        }
        Ok(())
    }
}
/// A single ZMTP frame: flags decoded from the wire plus the payload bytes.
pub struct ZmtpFrame {
    /// Body length as announced by the frame header.
    pub msglen: usize,
    /// MORE flag: another frame of the same message follows.
    pub has_more: bool,
    /// COMMAND flag: this frame is a protocol command, not data.
    pub is_command: bool,
    /// Frame payload.
    pub data: Vec<u8>,
}
impl ZmtpFrame {
    /// Borrow the frame payload as a byte slice.
    pub fn data(&self) -> &[u8] {
        self.data.as_slice()
    }
}
impl fmt::Debug for ZmtpFrame {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Build a printable preview of the payload: the first 32 chars of a
        // valid UTF-8 body, reduced to ascii alnum/punctuation/whitespace;
        // binary bodies are shown as a length marker only.
        let preview = if let Ok(s) = String::from_utf8(self.data.clone()) {
            s.chars()
                .take(32)
                .filter(|c| c.is_ascii_alphanumeric() || c.is_ascii_punctuation() || c.is_ascii_whitespace())
                .collect::<String>()
        } else {
            format!("Binary {{ len: {} }}", self.data.len())
        };
        f.debug_struct("ZmtpFrame")
            .field("msglen", &self.msglen)
            .field("has_more", &self.has_more)
            .field("is_command", &self.is_command)
            .field("data.len", &self.data.len())
            .field("data", &preview)
            .finish()
    }
}
/// Outcome of one sub-state machine step inside `Zmtp::loop_body`.
enum Int<T> {
    /// The stage had nothing to do this round.
    NoWork,
    /// The stage is waiting on its underlying resource.
    Pend,
    /// Progress was made but no item was produced.
    Empty,
    /// An item (or error) was produced.
    Item(T),
    /// The stage is finished for good (e.g. EOF or closed channel).
    Done,
}
impl<T> Int<T> {
    /// 1 if this outcome carries an item, 0 otherwise; summed per poll round
    /// to let at most one stage produce an item.
    fn item_count(&self) -> u32 {
        match self {
            Int::Item(_) => 1,
            _ => 0,
        }
    }
}
impl<T> fmt::Debug for Int<T> {
    /// Manual Debug: prints only the variant name, never the payload, so `T`
    /// needs no Debug bound.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            Self::NoWork => "NoWork",
            Self::Pend => "Pend",
            Self::Empty => "Empty",
            Self::Item(_) => "Item",
            Self::Done => "Done",
        };
        f.write_str(name)
    }
}
impl Stream for Zmtp {
    type Item = Result<ZmtpEvent, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Polling after the final None is a caller bug.
        if self.complete {
            panic!("poll_next on complete")
        }
        // After `done`, yield the final None exactly once.
        if self.done {
            self.complete = true;
            return Poll::Ready(None);
        }
        // Run loop_body until it yields an item/error or reports Pending;
        // None means "made progress, go again".
        loop {
            match Self::loop_body(self.as_mut(), cx) {
                Some(Poll::Ready(k)) => return Poll::Ready(Some(k)),
                Some(Poll::Pending) => return Poll::Pending,
                None => (),
            }
        }
    }
}
/// Sample values used to synthesize a bsread test message.
#[allow(unused)]
struct DummyData {
    /// Timestamp in nanoseconds (split into sec/ns via SEC).
    ts: u64,
    /// Pulse id for the main header.
    pulse: u64,
    /// Scalar payload value.
    value: i64,
}
impl DummyData {
    /// Build a complete four-frame bsread message for this sample:
    /// main header, data header, value payload, timestamp payload.
    #[allow(unused)]
    fn make_zmtp_msg(&self) -> Result<ZmtpMessage, Error> {
        let head_b = HeadB {
            htype: "bsr_d-1.1".into(),
            channels: vec![ChannelDesc {
                name: "TESTCHAN".into(),
                ty: "int64".into(),
                shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1i32))]),
                encoding: "little".into(),
                // Fixed: was `todo!()`, which panicked on every call. The dummy
                // channel is uncompressed, matching `dh_compression: None` below.
                compression: None,
            }],
        };
        let hb = serde_json::to_vec(&head_b).unwrap();
        // The main header references the data header by its md5 hex digest.
        use md5::Digest;
        let mut h = md5::Md5::new();
        h.update(&hb);
        let mut md5hex = String::with_capacity(32);
        for c in h.finalize() {
            use fmt::Write;
            write!(&mut md5hex, "{:02x}", c).unwrap();
        }
        let head_a = HeadA {
            htype: "bsr_m-1.1".into(),
            hash: md5hex,
            pulse_id: serde_json::Number::from(self.pulse),
            global_timestamp: GlobalTimestamp {
                sec: self.ts / SEC,
                ns: self.ts % SEC,
            },
            dh_compression: None,
        };
        // TODO write directly to output buffer.
        let ha = serde_json::to_vec(&head_a).unwrap();
        // Value payload: little-endian i64; timestamp payload: big-endian (sec, ns).
        let hf = self.value.to_le_bytes().to_vec();
        let hp = [(self.ts / SEC).to_be_bytes(), (self.ts % SEC).to_be_bytes()].concat();
        // All four frames share the same shape; msglen/flags are filled in by
        // the emitter, so they stay zeroed here.
        let frame = |data: Vec<u8>| ZmtpFrame {
            msglen: 0,
            has_more: false,
            is_command: false,
            data,
        };
        let msg = ZmtpMessage {
            frames: vec![frame(ha), frame(hb), frame(hf), frame(hp)],
        };
        Ok(msg)
    }
}