From b206bd0eb0d3b317f1c6cdc9cb4e001c8801ceff Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 May 2022 15:50:45 +0200 Subject: [PATCH] WIP insert --- netfetch/src/bsread.rs | 50 ++++++- netfetch/src/ca.rs | 14 +- netfetch/src/ca/conn.rs | 308 ++++++++++++++++++++++++++++++--------- netfetch/src/ca/proto.rs | 32 +++- netfetch/src/ca/store.rs | 65 +++++++++ netfetch/src/netfetch.rs | 1 + netfetch/src/series.rs | 87 +++++++++++ netfetch/src/zmtp.rs | 86 +++++------ 8 files changed, 524 insertions(+), 119 deletions(-) create mode 100644 netfetch/src/ca/store.rs create mode 100644 netfetch/src/series.rs diff --git a/netfetch/src/bsread.rs b/netfetch/src/bsread.rs index 7e964f2..130ae2c 100644 --- a/netfetch/src/bsread.rs +++ b/netfetch/src/bsread.rs @@ -2,7 +2,7 @@ use crate::zmtp::ZmtpMessage; use err::Error; #[allow(unused)] use log::*; -use netpod::{ByteOrder, ScalarType, Shape}; +use netpod::{AggKind, ByteOrder, ScalarType, Shape}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsVal; @@ -42,6 +42,54 @@ fn bsread_encoding_default() -> String { "little".into() } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum CompressionKind { + Lz4, + BitshuffleLz4, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ChannelDescDecoded { + pub name: String, + pub scalar_type: ScalarType, + pub shape: Shape, + pub byte_order: ByteOrder, + pub compression: Option, + pub agg_kind: AggKind, +} + +impl TryFrom<&ChannelDesc> for ChannelDescDecoded { + type Error = Error; + + fn try_from(cd: &ChannelDesc) -> Result { + let ret = ChannelDescDecoded { + name: cd.name.clone(), + scalar_type: ScalarType::from_bsread_str(&cd.ty)?, + shape: Shape::from_bsread_jsval(&cd.shape)?, + compression: match &cd.compression { + None => None, + Some(k) => match k.as_str() { + "none" => None, + "lz4" => Some(CompressionKind::Lz4), + "bitshuffle_lz4" => Some(CompressionKind::BitshuffleLz4), + _ => { + return Err(Error::with_msg_no_trace(format!( + "can not understand bsread compression kind: {k:?}" + ))) + } + }, + }, + byte_order: match cd.encoding.as_str() { + "little" => ByteOrder::LE, + "big" => ByteOrder::BE, + _ => ByteOrder::LE, + }, + agg_kind: AggKind::Plain, + }; + Ok(ret) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct HeadA { pub htype: String, diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index de92af2..4ed0cc3 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,5 +1,6 @@ pub mod conn; pub mod proto; +pub mod store; use conn::{CaConn, FindIoc}; use err::Error; @@ -13,6 +14,7 @@ use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; @@ -20,6 +22,8 @@ use tokio::net::TcpStream; use tokio::task::JoinError; use tokio::time::error::Elapsed; +use self::store::{ChannelRegistry, DataStore}; + #[derive(Debug, Serialize, Deserialize)] struct ChannelConfig { channels: Vec, @@ -229,9 +233,12 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { for (host, channels) in &channels_by_host { info!("Have: {:?} {:?}", host, channels.len()); } + let nil = None::; for ch in &opts.channels { if !channels_set.contains_key(ch) { - error!("Could not locate {ch:?}"); + scy.execute(&qu, (ch, "", nil)) + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; } } Ok(()) @@ -246,6 +253,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { .build() .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let scy = Arc::new(scy); let qu_find_addr = scy .prepare("select addr from ioc_by_channel where channel = ?") .await @@ -274,12 +282,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { if opts.abort_after_search == 1 { return Ok(()); } + let data_store = Arc::new(DataStore::new(scy.clone()).await?); let mut conn_jhs = vec![]; for (host, channels) in channels_by_host { + let data_store = data_store.clone(); let conn_block = async move { info!("Create TCP connection to {:?}", (host.ip(), host.port())); let tcp = TcpStream::connect((host.ip().clone(), host.port())).await?; - let mut conn = CaConn::new(tcp); + let mut conn = CaConn::new(tcp, data_store.clone()); for c in channels { conn.channel_add(c); } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 59fd68e..6e2e78d 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,15 +1,22 @@ use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto}; +use super::store::DataStore; +use crate::bsread::ChannelDescDecoded; use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify}; +use crate::series::{Existence, SeriesId}; use err::Error; -use futures_util::{Future, FutureExt, Stream, StreamExt}; +use futures_util::stream::FuturesUnordered; +use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; use libc::c_int; use log::*; +use netpod::timeunits::SEC; +use netpod::{ScalarType, Shape}; use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use tokio::io::unix::AsyncFd; use tokio::net::TcpStream; @@ -25,8 +32,10 @@ struct EventedState { #[derive(Debug)] enum MonitoringState { - AddingEvent, - Evented(EventedState), + FetchSeriesId, + AddingEvent(SeriesId), + Evented(SeriesId, EventedState), + // TODO we also want to read while staying subscribed: Reading, Read, Muted, @@ -36,6 +45,8 @@ enum MonitoringState { struct CreatedState { cid: u32, sid: u32, + scalar_type: ScalarType, + shape: Shape, ts_created: Instant, state: MonitoringState, } @@ -82,10 +93,15 @@ pub struct CaConn { cid_by_subid: BTreeMap, name_by_cid: BTreeMap, poll_count: usize, + data_store: Arc, + fut_get_series: FuturesUnordered< + Pin), Error>> + Send>>, + >, + value_insert_futs: FuturesUnordered> + Send>>>, } impl CaConn { - pub fn new(tcp: TcpStream) -> Self { + pub fn new(tcp: TcpStream, data_store: Arc) -> Self { Self { state: CaConnState::Init, proto: CaProto::new(tcp), @@ -97,6 +113,9 @@ impl CaConn { cid_by_subid: BTreeMap::new(), name_by_cid: BTreeMap::new(), poll_count: 0, + data_store, + fut_get_series: FuturesUnordered::new(), + value_insert_futs: FuturesUnordered::new(), } } @@ -135,6 +154,65 @@ impl Stream for CaConn { return Ready(None); } loop { + while self.value_insert_futs.len() > 0 { + match self.fut_get_series.poll_next_unpin(cx) { + Pending => break, + _ => {} + } + } + while self.fut_get_series.len() > 0 { + match self.fut_get_series.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + info!("Have SeriesId {k:?}"); + let cid = k.0; + let sid = k.1; + let data_type = k.2; + let data_count = k.3; + let series = match k.4 { + Existence::Created(k) => k, + Existence::Existing(k) => k, + }; + let subid = self.subid_store.next(); + self.cid_by_subid.insert(subid, cid); + let name = self.name_by_cid(cid).unwrap().to_string(); + let msg = CaMsg { + ty: CaMsgTy::EventAdd(EventAdd { + sid, + data_type, + data_count, + subid, + }), + }; + self.proto.push_out(msg); + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + *ch_s = ChannelState::Created(CreatedState { + cid, + sid, + // TODO handle error better! Transition channel to Error state? + scalar_type: ScalarType::from_ca_id(data_type)?, + shape: Shape::from_ca_count(data_count)?, + ts_created: Instant::now(), + state: MonitoringState::AddingEvent(series), + }); + let scalar_type = ScalarType::from_ca_id(data_type)?; + let shape = Shape::from_ca_count(data_count)?; + let _cd = ChannelDescDecoded { + name: name.to_string(), + scalar_type, + shape, + agg_kind: netpod::AggKind::Plain, + // TODO these play no role in series id: + byte_order: netpod::ByteOrder::LE, + compression: None, + }; + cx.waker().wake_by_ref(); + } + Ready(Some(Err(e))) => error!("series error: {e:?}"), + Ready(None) => {} + Pending => break, + } + } break match &self.state { CaConnState::Init => { let msg = CaMsg { ty: CaMsgTy::Version }; @@ -229,75 +307,169 @@ impl Stream for CaConn { let res = match self.proto.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { match k { - CaItem::Msg(k) => match k.ty { - CaMsgTy::SearchRes(k) => { - let a = k.addr.to_be_bytes(); - let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); - info!("Search result indicates server address: {addr}"); - } - CaMsgTy::CreateChanRes(k) => { - // TODO handle cid-not-found which can also indicate peer error. - let cid = k.cid; - let name = self.name_by_cid(cid); - info!("Channel created for {name:?} now register for events"); - let subid = self.subid_store.next(); - self.cid_by_subid.insert(subid, cid); - let msg = CaMsg { - ty: CaMsgTy::EventAdd(EventAdd { - sid: k.sid, - data_type: k.data_type, - data_count: k.data_count, - subid, - }), - }; - self.proto.push_out(msg); - do_wake_again = true; - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&k.cid).unwrap(); - *ch_s = ChannelState::Created(CreatedState { - cid: k.cid, - sid: k.sid, - ts_created: Instant::now(), - state: MonitoringState::AddingEvent, - }); - info!( - "Channel is created cid {} sid {} name {}", - k.cid, k.sid, self.name_by_cid[&k.cid] - ); - } - CaMsgTy::EventAddRes(k) => { - // TODO handle subid-not-found which can also be peer error: - let cid = *self.cid_by_subid.get(&k.subid).unwrap(); - // TODO get rid of the string clone when I don't want the log output any longer: - let name: String = self.name_by_cid(cid).unwrap().into(); - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - match ch_s { - ChannelState::Created(st) => { - match st.state { - MonitoringState::AddingEvent => { - info!("Confirmation {name} is subscribed."); - // TODO get ts from faster common source: - st.state = MonitoringState::Evented(EventedState { - ts_last: Instant::now(), - }); - } - MonitoringState::Evented(ref mut st) => { - // TODO get ts from faster common source: - st.ts_last = Instant::now(); - } - _ => { - warn!("bad state? not always, could be late message."); + CaItem::Msg(k) => { + match k.ty { + CaMsgTy::SearchRes(k) => { + let a = k.addr.to_be_bytes(); + let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); + info!("Search result indicates server address: {addr}"); + } + CaMsgTy::CreateChanRes(k) => { + // TODO handle cid-not-found which can also indicate peer error. + let cid = k.cid; + let sid = k.sid; + // TODO handle error: + let name = self.name_by_cid(cid).unwrap().to_string(); + info!("CreateChanRes {name:?}"); + let scalar_type = ScalarType::from_ca_id(k.data_type)?; + let shape = Shape::from_ca_count(k.data_count)?; + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + *ch_s = ChannelState::Created(CreatedState { + cid, + sid, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + ts_created: Instant::now(), + state: MonitoringState::FetchSeriesId, + }); + // TODO handle error in different way. Should most likely not abort. + let cd = ChannelDescDecoded { + name: name.to_string(), + scalar_type, + shape, + agg_kind: netpod::AggKind::Plain, + // TODO these play no role in series id: + byte_order: netpod::ByteOrder::LE, + compression: None, + }; + let y = self.as_ref(); + let y = unsafe { Pin::into_inner_unchecked(y) }; + let y = unsafe { &*(y as *const CaConn) }; + let fut = + y.data_store.chan_reg.get_series_id(cd).map_ok(move |series| { + (cid, k.sid, k.data_type, k.data_count, series) + }); + // TODO throttle execution rate: + self.fut_get_series.push(Box::pin(fut) as _); + do_wake_again = true; + } + CaMsgTy::EventAddRes(k) => { + // TODO handle subid-not-found which can also be peer error: + let cid = *self.cid_by_subid.get(&k.subid).unwrap(); + // TODO get rid of the string clone when I don't want the log output any longer: + let name: String = self.name_by_cid(cid).unwrap().into(); + // TODO handle not-found error: + let mut series_2 = None; + let ch_s = self.channels.get_mut(&cid).unwrap(); + match ch_s { + ChannelState::Created(st) => { + match st.state { + MonitoringState::AddingEvent(ref series) => { + let series = series.clone(); + series_2 = Some(series.clone()); + info!("Confirmation {name} is subscribed."); + // TODO get ts from faster common source: + st.state = MonitoringState::Evented( + series, + EventedState { + ts_last: Instant::now(), + }, + ); + } + MonitoringState::Evented(ref series, ref mut st) => { + series_2 = Some(series.clone()); + // TODO get ts from faster common source: + st.ts_last = Instant::now(); + } + _ => { + error!( + "unexpected state: EventAddRes while having {ch_s:?}" + ); + } } } + _ => { + error!("unexpected state: EventAddRes while having {ch_s:?}"); + } } - _ => { - warn!("unexpected state: EventAddRes while having {ch_s:?}"); + { + let series = series_2.unwrap(); + // TODO where to actually get the time from? + let ts = SystemTime::now(); + let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64; + let ts_msp = ts / (30 * SEC) * (30 * SEC); + let ts_lsp = ts - ts_msp; + // TODO make sure that I only accept types I expect. + use crate::ca::proto::CaDataScalarValue::*; + use crate::ca::proto::CaDataValue::*; + match k.value { + Scalar(v) => { + match v { + F64(val) => { + match ch_s { + ChannelState::Created(st) => { + match st.shape { + Shape::Scalar => { + match st.scalar_type { + ScalarType::F64 => { + // TODO get msp/lsp: cap at some count. only one writer!! + let y = self.as_ref(); + let y = unsafe { + Pin::into_inner_unchecked(y) + }; + let y = unsafe { + &*(y as *const CaConn) + }; + let fut1 = y.data_store.scy.execute( + &y.data_store.qu_insert_ts_msp, + (ts_msp as i64, ts_lsp as i64), + ).map(|_| Ok::<_, Error>(())) + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); + let fut2 = y.data_store.scy.execute( + &y.data_store.qu_insert_scalar_f64, + (series.id() as i64, ts_msp as i64, ts_lsp as i64, 0i64, val), + ).map(|_| Ok::<_, Error>(())) + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); + let fut = fut1 + .and_then(move |a| fut2); + if self.value_insert_futs.len() + > 10 + { + warn!("can not keep up"); + } else { + self.value_insert_futs + .push( + Box::pin(fut) as _ + ); + } + } + _ => { + error!("unexpected value type"); + } + } + } + _ => { + error!("unexpected value shape"); + } + } + } + _ => { + error!("got value but channel not created"); + } + } + } + _ => {} + } + } + Array => {} + } } } + _ => {} } - _ => {} - }, + } _ => {} } Ready(Some(Ok(()))) diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 463d19d..3db9ad7 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -63,6 +63,7 @@ pub struct EventAddRes { pub data_count: u16, pub status: u32, pub subid: u32, + pub value: CaDataValue, } #[derive(Debug)] @@ -92,6 +93,23 @@ enum CaScalarType { String, } +#[derive(Clone, Debug)] +pub enum CaDataScalarValue { + I8(i8), + I16(i16), + I32(i32), + F32(f32), + F64(f64), + Enum(i16), + String(String), +} + +#[derive(Clone, Debug)] +pub enum CaDataValue { + Scalar(CaDataScalarValue), + Array, +} + impl CaScalarType { fn from_ca_u16(k: u16) -> Result { use CaScalarType::*; @@ -416,7 +434,7 @@ impl CaMsg { } 1 => { let ca_st = CaScalarType::from_ca_u16(hi.data_type)?; - match ca_st { + let value = match ca_st { CaScalarType::F64 => { if payload.len() < 2 { return Err(Error::with_msg_no_trace(format!( @@ -425,7 +443,7 @@ impl CaMsg { ))); } let v = f64::from_be_bytes(payload.try_into()?); - info!("f64: {v}"); + CaDataValue::Scalar(CaDataScalarValue::F64(v)) } CaScalarType::Enum => { if payload.len() < 2 { @@ -434,8 +452,8 @@ impl CaMsg { payload.len() ))); } - let v = u16::from_be_bytes(payload[..2].try_into()?); - info!("enum payload: {v}"); + let v = i16::from_be_bytes(payload[..2].try_into()?); + CaDataValue::Scalar(CaDataScalarValue::I16(v)) } CaScalarType::String => { let mut ixn = payload.len(); @@ -447,17 +465,19 @@ impl CaMsg { } //info!("try to read string from payload len {} ixn {}", payload.len(), ixn); let v = String::from_utf8_lossy(&payload[..ixn]); - info!("String payload: {v}"); + CaDataValue::Scalar(CaDataScalarValue::String(v.into())) } _ => { warn!("TODO handle {ca_st:?}"); + return Err(Error::with_msg_no_trace(format!("can not yet handle type {ca_st:?}"))); } - } + }; let d = EventAddRes { data_type: hi.data_type, data_count: hi.data_count, status: hi.param1, subid: hi.param2, + value, }; CaMsg { ty: CaMsgTy::EventAddRes(d), diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs new file mode 100644 index 0000000..5bb8cf8 --- /dev/null +++ b/netfetch/src/ca/store.rs @@ -0,0 +1,65 @@ +use crate::bsread::ChannelDescDecoded; +use crate::series::{Existence, SeriesId}; +use async_channel::{Receiver, Sender}; +use err::Error; +use scylla::prepared_statement::PreparedStatement; +use scylla::Session as ScySession; +use std::sync::Arc; + +pub struct RegisterJob { + desc: ChannelDescDecoded, +} + +impl RegisterJob { + pub fn new(desc: ChannelDescDecoded) -> Self { + Self { desc } + } +} + +pub struct RegisterChannel { + tx: Sender, + rx: Receiver, +} + +pub struct ChannelRegistry { + scy: Arc, +} + +impl ChannelRegistry { + pub fn new(scy: Arc) -> Self { + Self { scy } + } + + pub async fn get_series_id(&self, cd: ChannelDescDecoded) -> Result, Error> { + crate::series::get_series_id(&self.scy, &cd).await + } +} + +pub struct DataStore { + pub scy: Arc, + pub qu_insert_ts_msp: Arc, + pub qu_insert_scalar_f64: Arc, + pub chan_reg: Arc, +} + +impl DataStore { + pub async fn new(scy: Arc) -> Result { + let q = scy + .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_ts_msp = Arc::new(q); + let q = scy + .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_f64 = Arc::new(q); + let ret = Self { + chan_reg: Arc::new(ChannelRegistry::new(scy.clone())), + scy, + qu_insert_ts_msp, + qu_insert_scalar_f64, + }; + Ok(ret) + } +} diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index 976c3a5..e05c28f 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -2,6 +2,7 @@ pub mod bsread; pub mod ca; pub mod channelwriter; pub mod netbuf; +pub mod series; #[cfg(test)] pub mod test; pub mod zmtp; diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs new file mode 100644 index 0000000..0e8463a --- /dev/null +++ b/netfetch/src/series.rs @@ -0,0 +1,87 @@ +use crate::bsread::ChannelDescDecoded; +use crate::zmtp::ErrConv; +use err::Error; +#[allow(unused)] +use log::*; +use scylla::Session as ScySession; +use std::time::Duration; + +#[derive(Clone, Debug)] +pub enum Existence { + Created(T), + Existing(T), +} + +#[derive(Clone, Debug)] +pub struct SeriesId(u64); + +impl SeriesId { + pub fn id(&self) -> u64 { + self.0 + } +} + +// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration. +pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result, Error> { + let facility = "scylla"; + let channel_name = &cd.name; + let scalar_type = cd.scalar_type.to_scylla_i32(); + let shape = cd.shape.to_scylla_vec(); + info!("get_series_id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}"); + let res = scy + .query( + "select series, agg_kind from series_by_channel where facility = ? and channel_name = ? and scalar_type = ? and shape_dims = ?", + (facility, channel_name, &scalar_type, &shape), + ) + .await + .err_conv()?; + let mut all = vec![]; + for row in res.rows_typed_or_empty::<(i64, Option)>() { + match row { + Ok(k) => { + if k.1.is_none() { + all.push(k.0); + } + } + Err(e) => return Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } + info!("all: {all:?}"); + let rn = all.len(); + if rn == 0 { + use md5::Digest; + let mut h = md5::Md5::new(); + h.update(facility.as_bytes()); + h.update(channel_name.as_bytes()); + h.update(format!("{:?}", scalar_type).as_bytes()); + h.update(format!("{:?}", shape).as_bytes()); + let f = h.finalize(); + let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + for _ in 0..2000 { + let res = scy + .query( + concat!( + "insert into series_by_channel", + " (facility, channel_name, scalar_type, shape_dims, agg_kind, series)", + " values (?, ?, ?, ?, null, ?) if not exists" + ), + (facility, channel_name, &scalar_type, &shape, series as i64), + ) + .await + .err_conv()?; + let row = res.first_row().err_conv()?; + if row.columns[0].as_ref().unwrap().as_boolean().unwrap() { + return Ok(Existence::Created(SeriesId(series))); + } else { + error!("tried to insert but series exists..."); + } + tokio::time::sleep(Duration::from_millis(20)).await; + series += 1; + } + Err(Error::with_msg_no_trace(format!("can not create and insert series id"))) + } else { + let series = all[0] as u64; + info!("series: {:?}", series); + Ok(Existence::Existing(SeriesId(series))) + } +} diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 8787a0d..9305c1d 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,4 +1,4 @@ -use crate::bsread::{BsreadMessage, Parser}; +use crate::bsread::{BsreadMessage, ChannelDescDecoded, Parser}; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; use crate::channelwriter::{ChannelWriter, ChannelWriterAll}; use crate::netbuf::NetBuf; @@ -10,10 +10,10 @@ use futures_core::{Future, Stream}; use futures_util::{pin_mut, FutureExt, StreamExt}; use log::*; use netpod::timeunits::*; -use netpod::{ByteOrder, ScalarType, Shape}; use scylla::batch::{Batch, BatchType, Consistency}; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; +use scylla::transport::query_result::{FirstRowError, RowsExpectedError}; use scylla::{Session as ScySession, SessionBuilder}; use serde_json::Value as JsVal; use stats::CheckEvery; @@ -41,6 +41,24 @@ impl ErrConv for Result { } } +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + #[allow(unused)] fn test_listen() -> Result<(), Error> { use std::time::Duration; @@ -72,7 +90,7 @@ fn test_service() -> Result<(), Error> { taskrun::run(fut) } -pub fn get_series_id(chn: &ChannelDesc) -> u64 { +pub fn __get_series_id(chn: &ChannelDesc) -> u64 { // TODO use a more stable format (with ScalarType, Shape) as hash input. // TODO do not depend at all on the mapping, instead look it up on demand and cache. use md5::Digest; @@ -84,6 +102,11 @@ pub fn get_series_id(chn: &ChannelDesc) -> u64 { u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()) } +pub async fn get_series_id(scy: &ScySession, chn: &ChannelDescDecoded) -> Result { + error!("TODO get_series_id"); + err::todoval() +} + pub struct CommonQueries { pub qu1: PreparedStatement, pub qu2: PreparedStatement, @@ -231,9 +254,11 @@ impl BsreadClient { if dh_md5_last.is_empty() { info!("data header hash {}", bm.head_b_md5); dh_md5_last = bm.head_b_md5.clone(); + let scy = self.scy.clone(); for chn in &head_b.channels { info!("Setup writer for {}", chn.name); - match self.setup_channel_writers(chn).await { + let cd: ChannelDescDecoded = chn.try_into()?; + match self.setup_channel_writers(&scy, &cd).await { Ok(_) => {} Err(e) => { warn!("can not set up writer for {} {e:?}", chn.name); @@ -283,13 +308,16 @@ impl BsreadClient { .len() .min(self.opts.process_channel_count_limit.unwrap_or(4000)) { + // TODO skip decoding if header unchanged. let chn = &head_b.channels[i1]; + let chd: ChannelDescDecoded = chn.try_into()?; let fr = &msg.frames[2 + 2 * i1]; + // TODO refactor to make correctness evident. if i1 >= series_ids.len() { series_ids.resize(head_b.channels.len(), (0u8, 0u64)); } if series_ids[i1].0 == 0 { - let series = get_series_id(chn); + let series = get_series_id(&self.scy, &chd).await?; series_ids[i1].0 = 1; series_ids[i1].1 = series; } @@ -349,38 +377,32 @@ impl BsreadClient { Ok(()) } - async fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> { - let series = get_series_id(chn); - let has_comp = match &chn.compression { - Some(s) => s != "none", - None => false, - }; + async fn setup_channel_writers(&mut self, scy: &ScySession, cd: &ChannelDescDecoded) -> Result<(), Error> { + let series = get_series_id(scy, cd).await?; + let has_comp = cd.compression.is_some(); if has_comp { - warn!("Compression not yet supported [{}]", chn.name); + warn!("Compression not yet supported [{}]", cd.name); return Ok(()); } - let scalar_type = ScalarType::from_bsread_str(&chn.ty)?; - let shape = Shape::from_bsread_jsval(&chn.shape)?; - let byte_order = ByteOrder::from_bsread_str(&chn.encoding)?; let trunc = self.opts.array_truncate.unwrap_or(64); let cw = ChannelWriterAll::new( series, self.common_queries.clone(), self.scy.clone(), - scalar_type.clone(), - shape.clone(), - byte_order.clone(), + cd.scalar_type.clone(), + cd.shape.clone(), + cd.byte_order.clone(), trunc, self.opts.skip_insert, )?; - let shape_dims = shape.to_scylla_vec(); + let shape_dims = cd.shape.to_scylla_vec(); self.channel_writers.insert(series, Box::new(cw)); if !self.opts.skip_insert { // TODO insert correct facility name self.scy .query( - "insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?)", - ("scylla", &chn.name, series as i64, scalar_type.to_scylla_i32(), &shape_dims), + "insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?) if not exists", + ("scylla", &cd.name, series as i64, cd.scalar_type.to_scylla_i32(), &shape_dims), ) .await .err_conv()?; @@ -655,28 +677,8 @@ impl BsreadDumper { let mut bytes_payload = 0u64; for i1 in 0..head_b.channels.len() { let chn = &head_b.channels[i1]; + let _cd: ChannelDescDecoded = chn.try_into()?; let fr = &msg.frames[2 + 2 * i1]; - let _series = get_series_id(chn); - if chn.ty == "string" { - info!("string channel: {} {:?}", chn.name, chn.shape); - if let Ok(shape) = Shape::from_bsread_jsval(&chn.shape) { - if let Ok(_bo) = ByteOrder::from_bsread_str(&chn.encoding) { - match &shape { - Shape::Scalar => { - info!("scalar string..."); - let s = String::from_utf8_lossy(fr.data()); - info!("STRING: {s:?}"); - } - _ => { - warn!( - "non-scalar string channels not yet implemented {}", - chn.name - ); - } - } - } - } - } bytes_payload += fr.data().len() as u64; } info!("zmtp message ts {ts} pulse {pulse} bytes_payload {bytes_payload}");