From 12778fe121a722418e821a5c29a3e80392219cf5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 Feb 2025 16:31:28 +0100 Subject: [PATCH] WIP binner2 --- daqingest/Cargo.toml | 4 +- daqingest/src/daemon.rs | 8 + dbpg/Cargo.toml | 3 +- dbpg/src/confbyseries.rs | 50 ++ dbpg/src/lib.rs | 2 +- dbpg/src/seriesbychannel.rs | 5 +- dbpg/src/seriesid.rs | 12 - netfetch/Cargo.toml | 4 +- netfetch/src/metrics.rs | 50 +- netfetch/src/metrics/ingest.rs | 7 +- netfetch/src/metrics/ingest/write_v02.rs | 642 +++++++++++++++++++++++ scywr/Cargo.toml | 7 +- scywr/src/insertqueues.rs | 96 ++-- scywr/src/schema.rs | 162 +++--- serieswriter/Cargo.toml | 2 +- serieswriter/src/binwriter.rs | 94 +++- serieswriter/src/writer.rs | 1 - 17 files changed, 976 insertions(+), 173 deletions(-) create mode 100644 dbpg/src/confbyseries.rs delete mode 100644 dbpg/src/seriesid.rs create mode 100644 netfetch/src/metrics/ingest/write_v02.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 4523fe6..2bf32d7 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "daqingest" -version = "0.2.7-aa.1" +version = "0.2.7-aa.4" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [features] default = [] diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 5cae419..6e87ae7 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -81,6 +81,8 @@ pub struct Daemon { metrics_shutdown_rx: Receiver, metrics_jh: Option>>, channel_info_query_tx: Sender, + // TODO + series_conf_by_id_tx: Sender<()>, iqtx: Option, } @@ -98,6 +100,9 @@ impl Daemon { .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + // TODO so far a dummy + let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16); + let insert_queue_counter = Arc::new(AtomicUsize::new(0)); let local_epics_hostname = ingest_linux::net::local_hostname(); @@ -341,6 +346,7 @@ impl Daemon { metrics_shutdown_rx, metrics_jh: None, channel_info_query_tx, + series_conf_by_id_tx, iqtx: Some(iqtx2), }; Ok(ret) @@ -675,12 +681,14 @@ impl Daemon { let rres = RoutesResources::new( self.ingest_opts.backend().into(), self.channel_info_query_tx.clone(), + self.series_conf_by_id_tx.clone(), self.iqtx .take() .ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?, self.ingest_opts.scylla_config_st().clone(), self.ingest_opts.scylla_config_mt().clone(), self.ingest_opts.scylla_config_lt().clone(), + self.ingest_opts.postgresql_config().clone(), ); let rres = Arc::new(rres); let metrics_jh = { diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml index 8cd62e7..34fd69d 100644 --- a/dbpg/Cargo.toml +++ b/dbpg/Cargo.toml @@ -2,7 +2,7 @@ name = "dbpg" version = "0.0.1" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [lib] doctest = false @@ -22,3 +22,4 @@ async-channel = "2.1.1" md-5 = "0.10.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1" +autoerr = "0.0.3" diff --git a/dbpg/src/confbyseries.rs b/dbpg/src/confbyseries.rs new file mode 100644 index 0000000..d4c6b94 --- /dev/null +++ b/dbpg/src/confbyseries.rs @@ -0,0 +1,50 @@ +use crate::conn::PgClient; +use netpod::ChannelConfigResponse; +use netpod::DaqbufChannelConfig; +use netpod::ScalarType; +use netpod::SeriesKind; +use netpod::Shape; +use series::SeriesId; +use tokio_postgres::types::Type as PgTy; + +autoerr::create_error_v1!( + name(Error, "ConfigBySeries"), + enum variants { + Postgres(#[from] tokio_postgres::Error), + Netpod(#[from] netpod::Error), + }, +); + +pub async fn channel_config_by_series(pg: &PgClient, series: SeriesId) -> Result, Error> { + let sql = concat!( + "select t.series, t.facility, t.kind, t.scalar_type, t.shape_dims, t.channel", + " from series_by_channel t", + " where t.series = $1", + " limit 10", + ); + let qu_select = pg.prepare_typed(sql, &[PgTy::INT8]).await?; + let id = series.id() as i64; + let res = pg.query(&qu_select, &[&id]).await?; + for row in res { + let series: i64 = row.try_get(0)?; + let backend = row.try_get(1)?; + let kind = row.try_get(2)?; + let scalar_type = row.try_get(3)?; + let shape_dims: Vec<_> = row.try_get(4)?; + let name = row.try_get(5)?; + let series = series as u64; + let series_kind = SeriesKind::from_db_i16(kind)?; + let scalar_type = ScalarType::from_scylla_i32(scalar_type)?; + let shape = Shape::from_scylla_shape_dims(&shape_dims)?; + let conf = DaqbufChannelConfig { + backend, + series, + kind: series_kind, + scalar_type, + shape, + name, + }; + return Ok(Some(conf)); + } + Ok(None) +} diff --git a/dbpg/src/lib.rs b/dbpg/src/lib.rs index 0b7eba0..8927034 100644 --- a/dbpg/src/lib.rs +++ b/dbpg/src/lib.rs @@ -1,3 +1,4 @@ +pub mod confbyseries; pub mod conn; pub mod err; pub mod findaddr; @@ -5,7 +6,6 @@ pub mod iocindex; pub mod pool; pub mod schema; pub mod seriesbychannel; -pub mod seriesid; pub mod testerr; pub use tokio_postgres as postgres; diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 38ca596..064910c 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -13,6 +13,7 @@ use netpod::Database; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; +use serde::Serialize; use series::SeriesId; use stats::SeriesByChannelStats; use std::pin::Pin; @@ -102,7 +103,7 @@ impl fmt::Debug for ChannelInfoQuery { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct ChannelInfoResult { pub backend: String, pub channel: String, @@ -121,7 +122,7 @@ enum MatchingSeries { Latest(SeriesId), } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize)] pub enum RegisteredSeries { Created(SeriesId), Updated(SeriesId), diff --git a/dbpg/src/seriesid.rs b/dbpg/src/seriesid.rs deleted file mode 100644 index 4d911f9..0000000 --- a/dbpg/src/seriesid.rs +++ /dev/null @@ -1,12 +0,0 @@ -use err::thiserror; -use err::ThisError; - -// TODO still needed? -#[derive(Debug, ThisError)] -#[cstm(name = "PgSeriesId")] -pub enum Error { - Postgres(#[from] tokio_postgres::Error), - IocAddrNotFound, - BadIdGenerated, - CanNotInsertSeriesId, -} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index c89da0b..777a7a0 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -2,7 +2,7 @@ name = "netfetch" version = "0.0.3" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [lib] doctest = false @@ -25,7 +25,7 @@ hex = "0.4.3" regex = "1.8.4" axum = "0.8.1" http-body = "1" -url = "2.2" +url = "2.5" chrono = "0.4" humantime = "2.1.0" humantime-serde = "1.1.1" diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 7680703..be2a0b0 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -335,16 +335,19 @@ pub struct RoutesResources { scyconf_st: ScyllaIngestConfig, scyconf_mt: ScyllaIngestConfig, scyconf_lt: ScyllaIngestConfig, + pgconf: netpod::Database, } impl RoutesResources { pub fn new( backend: String, worker_tx: Sender, + series_conf_by_id_tx: Sender<()>, iqtx: InsertQueuesTx, scyconf_st: ScyllaIngestConfig, scyconf_mt: ScyllaIngestConfig, scyconf_lt: ScyllaIngestConfig, + pgconf: netpod::Database, ) -> Self { Self { backend, @@ -353,6 +356,7 @@ impl RoutesResources { scyconf_st, scyconf_mt, scyconf_lt, + pgconf, } } } @@ -416,17 +420,7 @@ fn make_routes( ) .nest( "/ingest", - Router::new().route( - "/v1", - post({ - let rres = rres.clone(); - move |(headers, params, body): ( - HeaderMap, - Query>, - axum::body::Body, - )| { ingest::post_v01((headers, params, body), rres) } - }), - ), + make_routes_ingest(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()), ) .nest( "/private", @@ -560,6 +554,40 @@ fn make_routes_channel( ) } +fn make_routes_ingest( + rres: Arc, + dcom: Arc, + connset_cmd_tx: Sender, + stats_set: StatsSet, +) -> axum::Router { + use axum::extract; + use axum::routing::{get, post, put}; + use axum::Router; + use http::StatusCode; + Router::new() + .nest( + "/private", + Router::new().route( + "/write", + put({ + let rres = rres.clone(); + move |(headers, params, body): (HeaderMap, Query>, axum::body::Body)| { + ingest::write_v02::write_with_fresh_msps((headers, params, body), rres) + } + }), + ), + ) + .route( + "/v1", + post({ + let rres = rres.clone(); + move |(headers, params, body): (HeaderMap, Query>, axum::body::Body)| { + ingest::post_v01((headers, params, body), rres) + } + }), + ) +} + pub async fn metrics_service( bind_to: String, dcom: Arc, diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index 4772a47..3ff1a5b 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -1,3 +1,5 @@ +pub mod write_v02; + use super::RoutesResources; use axum::extract::FromRequest; use axum::extract::Query; @@ -214,7 +216,10 @@ async fn post_v01_try( debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); let mut iqtx = rres.iqtx.clone(); - let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic)); + let mut frames = FramedBytesStream::new( + body.into_data_stream() + .map_err(|_| streams::framed_bytes::Error::DataInput), + ); loop { let x = timeout(Duration::from_millis(2000), frames.try_next()).await; let x = match x { diff --git a/netfetch/src/metrics/ingest/write_v02.rs b/netfetch/src/metrics/ingest/write_v02.rs new file mode 100644 index 0000000..cc23247 --- /dev/null +++ b/netfetch/src/metrics/ingest/write_v02.rs @@ -0,0 +1,642 @@ +use crate::metrics::RoutesResources; +use axum::extract::FromRequest; +use axum::extract::Query; +use axum::http::HeaderMap; +use axum::Json; +use bytes::Bytes; +use core::fmt; +use dbpg::seriesbychannel::ChannelInfoQuery; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use items_2::binning::container_events::ContainerEvents; +use items_2::binning::container_events::EventValueType; +use netpod::log; +use netpod::ttl::RetentionTime; +use netpod::DaqbufChannelConfig; +use netpod::EnumVariant; +use netpod::ScalarType; +use netpod::SeriesKind; +use netpod::Shape; +use netpod::TsNano; +use netpod::APP_CBOR_FRAMED; +use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::ArrayValue; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::ScalarValue; +use serde::Deserialize; +use series::SeriesId; +use serieswriter::msptool::MspSplit; +use serieswriter::writer::EmittableType; +use serieswriter::writer::SeriesWriter; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::io::Cursor; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use std::time::SystemTime; +use streams::framed_bytes::FramedBytesStream; +use taskrun::tokio::time::timeout; + +macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*) } ); } + +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*) } ); } + +macro_rules! debug_setup { ($($arg:expr),*) => ( if true { log::debug!($($arg),*) } ); } + +macro_rules! trace_input { ($($arg:expr),*) => ( if true { log::trace!($($arg),*) } ); } + +macro_rules! trace_queues { ($($arg:expr),*) => ( if true { log::trace!($($arg),*) } ); } + +autoerr::create_error_v1!( + name(Error, "MetricsIngestV02Write"), + enum variants { + UnsupportedContentType, + Logic, + SeriesWriter(#[from] serieswriter::writer::Error), + MissingChannelName, + MissingScalarType, + MissingShape, + MissingSeriesId, + SendError, + Decode, + FramedBytes(#[from] streams::framed_bytes::Error), + InsertQueues(#[from] scywr::insertqueues::Error), + Serde(#[from] serde_json::Error), + Parse(String), + NotSupported, + WorkerQueueMissing, + ConfigLookup, + Postgres(#[from] dbpg::err::Error), + TaskJoin(#[from] taskrun::tokio::task::JoinError), + ConfBySeries(#[from] dbpg::confbyseries::Error), + }, +); + +type ValueSeriesWriter = SeriesWriter; + +struct WritableTypeState { + series: SeriesId, + msp_split_data: MspSplit, +} + +impl WritableTypeState { + fn new(series: SeriesId) -> Self { + Self { + series, + msp_split_data: MspSplit::new(10000, 1024 * 256), + } + } +} + +#[derive(Debug, Clone)] +struct WritableType(TsNano, DataValue); + +impl EmittableType for WritableType { + type State = WritableTypeState; + + fn ts(&self) -> TsNano { + self.0.clone() + } + + fn has_change(&self, k: &Self) -> bool { + true + } + + fn byte_size(&self) -> u32 { + 8 + self.1.byte_size() + } + + fn into_query_item( + self, + ts_net: Instant, + tsev: TsNano, + state: &mut ::State, + ) -> serieswriter::writer::EmitRes { + let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); + let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { + series: state.series.clone(), + ts_msp: ts_msp.to_ts_ms(), + ts_lsp, + val: self.1.clone(), + ts_net, + }); + let mut items = smallvec::SmallVec::new(); + items.push(item); + if ts_msp_chg { + items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( + state.series.clone(), + ts_msp.to_ts_ms(), + ts_net, + ))); + } + serieswriter::writer::EmitRes { + items, + bytes: self.byte_size(), + status: 0, + } + } +} + +fn evpush_dim0( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut ValueSeriesWriter, + f1: F1, +) -> Result<(), Error> +where + T: EventValueType, + F1: Fn(::IterTy1<'_>) -> DataValue, +{ + let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + // trace_input!("see events {:?}", evs); + let stnow = SystemTime::now(); + let tsev = TsNano::from_system_time(stnow); + let tsnow = Instant::now(); + let mut emit_state = WritableTypeState::new(writer.sid()); + for (i, (ts, val)) in evs.iter_zip().enumerate() { + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = f1(val); + writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + } + Ok(()) +} + +fn evpush_dim0_enum( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut ValueSeriesWriter, +) -> Result<(), Error> { + let evs: ContainerEvents = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + // trace_input!("see events {:?}", evs); + let stnow = SystemTime::now(); + let tsev = TsNano::from_system_time(stnow); + let tsnow = Instant::now(); + let mut emit_state = WritableTypeState::new(writer.sid()); + for (i, (ts, val)) in evs.iter_zip().enumerate() { + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = DataValue::Scalar(ScalarValue::Enum(val.ix as i16, val.name.into())); + writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + } + Ok(()) +} + +fn evpush_dim1( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut ValueSeriesWriter, + f1: F1, +) -> Result<(), Error> +where + Vec: EventValueType, + F1: Fn( as EventValueType>::IterTy1<'_>) -> DataValue, +{ + let evs: ContainerEvents> = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + trace_input!("see events {:?}", evs); + error!("TODO require timestamp in input format"); + let stnow = SystemTime::now(); + let tsev = TsNano::from_system_time(stnow); + let tsnow = Instant::now(); + let mut emit_state = WritableTypeState::new(writer.sid()); + for (i, (ts, val)) in evs.iter_zip().enumerate() { + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = f1(val); + writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?; + } + Ok(()) +} + +fn frame_write( + frame: &Bytes, + scalar_type: ScalarType, + shape: Shape, + writer: &mut SeriesWriter, + deque: &mut VecDeque, +) -> Result<(), Error> { + match &shape { + Shape::Scalar => match &scalar_type { + ScalarType::U8 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U8(x as _)))?; + } + ScalarType::U16 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U16(x as _)))?; + } + ScalarType::U32 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U32(x as _)))?; + } + ScalarType::U64 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U64(x as _)))?; + } + ScalarType::I8 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?; + } + ScalarType::I16 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?; + } + ScalarType::I32 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?; + } + ScalarType::I64 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?; + } + ScalarType::F32 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?; + } + ScalarType::F64 => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?; + } + ScalarType::BOOL => { + evpush_dim0::(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::Bool(x)))?; + } + ScalarType::STRING => { + evpush_dim0::(&frame, deque, writer, |x| { + DataValue::Scalar(ScalarValue::String(x.into())) + })?; + } + ScalarType::Enum => { + evpush_dim0_enum(&frame, deque, writer)?; + } + }, + Shape::Wave(_) => match &scalar_type { + ScalarType::U8 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U8(x)))?; + } + ScalarType::U16 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U16(x)))?; + } + ScalarType::U32 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U32(x)))?; + } + ScalarType::U64 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U64(x)))?; + } + ScalarType::I8 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I8(x)))?; + } + ScalarType::I16 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I16(x)))?; + } + ScalarType::I32 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I32(x)))?; + } + ScalarType::I64 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I64(x)))?; + } + ScalarType::F32 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F32(x)))?; + } + ScalarType::F64 => { + evpush_dim1::(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F64(x)))?; + } + ScalarType::BOOL => return Err(Error::NotSupported), + ScalarType::STRING => return Err(Error::NotSupported), + ScalarType::Enum => return Err(Error::NotSupported), + }, + Shape::Image(_, _) => return Err(Error::NotSupported), + } + Ok(()) +} + +async fn write_with_fresh_msps_inner( + headers: HeaderMap, + params: HashMap, + body: axum::body::Body, + rres: Arc, +) -> Result, Error> { + if let Some(ct) = headers.get("content-type") { + if let Ok(s) = ct.to_str() { + if s == APP_CBOR_FRAMED { + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + }; + debug_setup!("params {:?}", params); + let stnow = SystemTime::now(); + let worker_tx = rres.worker_tx.clone(); + let backend = rres.backend.clone(); + let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into(); + let s = params.get("scalarType").ok_or(Error::MissingScalarType)?; + let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?; + let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?; + let rt: RetentionTime = params + .get("retentionTime") + .and_then(|x| x.parse().ok()) + .unwrap_or(RetentionTime::Short); + debug_setup!( + "establishing series writer for {:?} {:?} {:?} {:?}", + channel, + scalar_type, + shape, + rt + ); + let (tx, rx) = async_channel::bounded(8); + let qu = ChannelInfoQuery { + backend, + channel, + kind: SeriesKind::ChannelData, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + tx: Box::pin(tx), + }; + rres.worker_tx.send(qu).await.map_err(|_| Error::WorkerQueueMissing)?; + let chinfo = rx + .recv() + .await + .map_err(|_| Error::ConfigLookup)? + .map_err(|_| Error::ConfigLookup)?; + let mut writer = SeriesWriter::new(chinfo.series.to_series())?; + debug_setup!("series writer established"); + let mut iqdqs = InsertDeques::new(); + let mut iqtx = rres.iqtx.clone(); + let mut frames = FramedBytesStream::new( + body.into_data_stream() + .map_err(|_| streams::framed_bytes::Error::DataInput), + ); + loop { + let x = timeout(Duration::from_millis(2000), frames.try_next()).await; + let x = match x { + Ok(x) => x, + Err(_) => { + tick_writers(&mut writer, &mut iqdqs, rt.clone())?; + continue; + } + }; + let frame = match x? { + Some(x) => x, + None => { + trace_input!("input stream done"); + break; + } + }; + trace_input!("got frame len {}", frame.len()); + let deque = iqdqs.deque(rt.clone()); + frame_write(&frame, scalar_type.clone(), shape.clone(), &mut writer, deque)?; + trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary()); + tick_writers(&mut writer, &mut iqdqs, rt.clone())?; + trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary()); + } + + trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary()); + finish_writers(&mut writer, &mut iqdqs, rt.clone())?; + trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary()); + + let ret = Json(serde_json::json!({})); + Ok(ret) +} + +pub async fn write_with_fresh_msps( + (headers, Query(params), body): (HeaderMap, Query>, axum::body::Body), + rres: Arc, +) -> Json { + match write_with_fresh_msps_inner(headers, params, body, rres).await { + Ok(k) => k, + Err(e) => Json(serde_json::json!({ + "error": e.to_string(), + })), + } +} + +fn tick_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { + writer.tick(deques.deque(rt))?; + Ok(()) +} + +fn finish_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { + writer.tick(deques.deque(rt))?; + Ok(()) +} + +async fn register_series( + headers: HeaderMap, + params: HashMap, + body: axum::body::Body, + rres: Arc, +) -> Result, Error> { + if let Some(ct) = headers.get("content-type") { + if let Ok(s) = ct.to_str() { + if s == APP_CBOR_FRAMED { + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + }; + debug_setup!("params {:?}", params); + let stnow = SystemTime::now(); + let worker_tx = rres.worker_tx.clone(); + let backend = rres.backend.clone(); + let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into(); + let s = params.get("scalarType").ok_or(Error::MissingScalarType)?; + let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?; + let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?; + debug_setup!("register series {:?} {:?} {:?}", channel, scalar_type, shape); + let (tx, rx) = async_channel::bounded(8); + let qu = ChannelInfoQuery { + backend, + channel, + kind: SeriesKind::ChannelData, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + tx: Box::pin(tx), + }; + rres.worker_tx.send(qu).await.map_err(|_| Error::WorkerQueueMissing)?; + let chinfo = rx + .recv() + .await + .map_err(|_| Error::ConfigLookup)? + .map_err(|_| Error::ConfigLookup)?; + let ret = serde_json::json!({ + "register_series": { + "status": "ok", + "chinfo": chinfo, + "seriesId": chinfo.series.to_series().id(), + } + }); + let ret = Json(ret); + Ok(ret) +} + +async fn write_msp( + headers: HeaderMap, + params: HashMap, + body: axum::body::Body, + rres: Arc, +) -> Result, Error> { + if let Some(ct) = headers.get("content-type") { + if let Ok(s) = ct.to_str() { + if s == APP_CBOR_FRAMED { + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + }; + debug_setup!("params {:?}", params); + let stnow = SystemTime::now(); + let worker_tx = rres.worker_tx.clone(); + let backend = rres.backend.clone(); + let series_id: u64 = params + .get("seriesId") + .ok_or(Error::MissingSeriesId)? + .parse() + .map_err(|_| Error::MissingSeriesId)?; + let series = SeriesId::new(series_id); + let rt: RetentionTime = params + .get("retentionTime") + .and_then(|x| x.parse().ok()) + .unwrap_or(RetentionTime::Short); + let (conn, pgjh) = dbpg::conn::make_pg_client(&rres.pgconf).await?; + let conf = dbpg::confbyseries::channel_config_by_series(&conn, series).await?; + drop(conn); + pgjh.await?; + match conf { + Some(conf) => write_events_exact_2(conf, rt, body, rres).await, + None => { + info!("series id not found {:?}", series); + let ret = serde_json::json!({ + "write_events_exact": { + "status": "SeriesIdNotFound", + } + }); + let ret = Json(ret); + Ok(ret) + } + } +} + +async fn write_events_exact_2( + conf: DaqbufChannelConfig, + rt: RetentionTime, + body: axum::body::Body, + rres: Arc, +) -> Result, Error> { + debug_setup!("write_events_exact {:?} {:?}", conf, rt); + let series = SeriesId::new(conf.series); + let mut writer = SeriesWriter::new(series)?; + debug_setup!("series writer established"); + let mut iqdqs = InsertDeques::new(); + let mut iqtx = rres.iqtx.clone(); + let mut frames = FramedBytesStream::new( + body.into_data_stream() + .map_err(|_| streams::framed_bytes::Error::DataInput), + ); + loop { + match timeout(Duration::from_millis(2000), frames.try_next()).await { + Ok(k) => match k? { + Some(frame) => { + trace_input!("got frame len {}", frame.len()); + let deque = iqdqs.deque(rt.clone()); + frame_write(&frame, conf.scalar_type.clone(), conf.shape.clone(), &mut writer, deque)?; + trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary()); + tick_writers(&mut writer, &mut iqdqs, rt.clone())?; + trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary()); + } + None => { + trace_input!("input stream done"); + break; + } + }, + Err(_) => { + tick_writers(&mut writer, &mut iqdqs, rt.clone())?; + continue; + } + } + } + trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary()); + finish_writers(&mut writer, &mut iqdqs, rt.clone())?; + trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary()); + let ret = serde_json::json!({ + "write_events_exact": { + "status": "ok", + "seriesId": series.id(), + } + }); + let ret = Json(ret); + Ok(ret) +} + +async fn write_events_exact( + headers: HeaderMap, + params: HashMap, + body: axum::body::Body, + rres: Arc, +) -> Result, Error> { + if let Some(ct) = headers.get("content-type") { + if let Ok(s) = ct.to_str() { + if s == APP_CBOR_FRAMED { + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + }; + debug_setup!("params {:?}", params); + let stnow = SystemTime::now(); + let worker_tx = rres.worker_tx.clone(); + let backend = rres.backend.clone(); + let series_id: u64 = params + .get("seriesId") + .ok_or(Error::MissingSeriesId)? + .parse() + .map_err(|_| Error::MissingSeriesId)?; + let series = SeriesId::new(series_id); + let rt: RetentionTime = params + .get("retentionTime") + .and_then(|x| x.parse().ok()) + .unwrap_or(RetentionTime::Short); + let (conn, pgjh) = dbpg::conn::make_pg_client(&rres.pgconf).await?; + let conf = dbpg::confbyseries::channel_config_by_series(&conn, series).await?; + drop(conn); + pgjh.await?; + match conf { + Some(conf) => write_events_exact_2(conf, rt, body, rres).await, + None => { + info!("series id not found {:?}", series); + let ret = serde_json::json!({ + "write_events_exact": { + "status": "SeriesIdNotFound", + } + }); + let ret = Json(ret); + Ok(ret) + } + } +} diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index b11fd7f..2da1185 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -2,19 +2,20 @@ name = "scywr" version = "0.0.1" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [dependencies] futures-util = "0.3.28" async-channel = "2.3.1" -scylla = "0.15.0" +scylla = "0.15.1" smallvec = "1.11.0" pin-project = "1.1.5" -bytes = "1.7.1" +bytes = "1.10.0" autoerr = "0.0.3" serde = { version = "1", features = ["derive"] } log = { path = "../log" } stats = { path = "../stats" } series = { path = "../../daqbuf-series", package = "daqbuf-series" } netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } +scydb = { path = "../../daqbuf-scydb", package = "daqbuf-scydb" } taskrun = { path = "../../daqbuffer/crates/taskrun" } diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 59d0be1..09e6608 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -27,43 +27,71 @@ pub struct InsertQueuesTx { pub lt_rf3_lat5_tx: Sender>, } +async fn send_nonempty(qu: &mut VecDeque, tx: &Sender>) -> Result<(), Error> { + let item = core::mem::replace(qu, VecDeque::new()); + if item.len() != 0 { + tx.send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?; + } + Ok(()) +} + impl InsertQueuesTx { /// Send all accumulated batches pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - { - let item = core::mem::replace(&mut iqdqs.st_rf1_qu, VecDeque::new()); - self.st_rf1_tx - .send(item) - .await - .map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?; - } - { - let item = core::mem::replace(&mut iqdqs.st_rf3_qu, VecDeque::new()); - self.st_rf3_tx - .send(item) - .await - .map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?; - } - { - let item = core::mem::replace(&mut iqdqs.mt_rf3_qu, VecDeque::new()); - self.mt_rf3_tx - .send(item) - .await - .map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?; - } - { - let item = core::mem::replace(&mut iqdqs.lt_rf3_qu, VecDeque::new()); - self.lt_rf3_tx - .send(item) - .await - .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; - } - { - let item = core::mem::replace(&mut iqdqs.lt_rf3_lat5_qu, VecDeque::new()); - self.lt_rf3_tx - .send(item) - .await - .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; + if true { + send_nonempty(&mut iqdqs.st_rf1_qu, &self.st_rf1_tx).await?; + send_nonempty(&mut iqdqs.st_rf3_qu, &self.st_rf3_tx).await?; + send_nonempty(&mut iqdqs.mt_rf3_qu, &self.mt_rf3_tx).await?; + send_nonempty(&mut iqdqs.lt_rf3_qu, &self.lt_rf3_tx).await?; + send_nonempty(&mut iqdqs.lt_rf3_lat5_qu, &self.lt_rf3_tx).await?; + } else { + { + let item = core::mem::replace(&mut iqdqs.st_rf1_qu, VecDeque::new()); + if item.len() != 0 { + self.st_rf1_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?; + } + } + { + let item = core::mem::replace(&mut iqdqs.st_rf3_qu, VecDeque::new()); + if item.len() != 0 { + self.st_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?; + } + } + { + let item = core::mem::replace(&mut iqdqs.mt_rf3_qu, VecDeque::new()); + if item.len() != 0 { + self.mt_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?; + } + } + { + let item = core::mem::replace(&mut iqdqs.lt_rf3_qu, VecDeque::new()); + if item.len() != 0 { + self.lt_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; + } + } + { + let item = core::mem::replace(&mut iqdqs.lt_rf3_lat5_qu, VecDeque::new()); + if item.len() != 0 { + self.lt_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; + } + } } Ok(()) } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index b2cb15f..c57e161 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -1,6 +1,6 @@ use crate::config::ScyllaIngestConfig; -use crate::session::create_session_no_ks; use crate::session::ScySession; +use crate::session::create_session_no_ks; use futures_util::StreamExt; use futures_util::TryStreamExt; use log::*; @@ -20,7 +20,9 @@ autoerr::create_error_v1!( ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), ScyllaTypecheck(#[from] scylla::deserialize::TypeCheckError), MissingData, - AddColumnImpossible, + AddColumnExists(String, String, String), + AddColumnPk(String, String, String, String), + AddColumnCk(String, String, String, String), BadSchema, }, ); @@ -35,16 +37,16 @@ impl From for Error { struct Changeset { do_change: bool, - would_do: Vec, - done: Vec, + todo: Vec, + cql_done: Vec, } impl Changeset { fn new() -> Self { Self { do_change: false, - would_do: Vec::new(), - done: Vec::new(), + todo: Vec::new(), + cql_done: Vec::new(), } } @@ -58,27 +60,16 @@ impl Changeset { self.do_change } - fn add_would_do(&mut self, cql: String) { - self.would_do.push(cql); + fn add_todo(&mut self, cql: String) { + self.todo.push(cql); } - fn add_done(&mut self, cql: String) { - self.done.push(cql); - } - - fn differs(&self) -> bool { - if self.would_do.len() != 0 { - true - } else { - false - } + fn has_to_do(&self) -> bool { + if self.todo.len() != 0 { true } else { false } } fn log_statements(&self) { - for q in &self.done { - info!("DONE {q}"); - } - for q in &self.would_do { + for q in &self.todo { info!("WOULD DO {q}"); } } @@ -231,25 +222,17 @@ impl GenTwcsTab { } async fn setup(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { - self.create_if_missing(chs, scy).await?; - self.check_table_options(chs, scy).await?; - self.check_columns(chs, scy).await?; + if self.has_table_name(scy).await? { + self.check_table_options(chs, scy).await?; + self.check_columns(chs, scy).await?; + } else { + chs.add_todo(self.cql()); + } Ok(()) } - async fn create_if_missing(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { - // TODO check for more details (all columns, correct types, correct kinds, etc) - if !has_table(self.name(), scy).await? { - let cql = self.cql(); - if chs.do_change() { - info!("scylla create table {} {}", self.name(), cql); - scy.query_unpaged(cql.clone(), ()).await?; - chs.add_done(cql); - } else { - chs.add_would_do(cql); - } - } - Ok(()) + async fn has_table_name(&self, scy: &ScySession) -> Result { + has_table(self.name(), scy).await } fn cql(&self) -> String { @@ -278,21 +261,24 @@ impl GenTwcsTab { write!(s, " ({})", cols).unwrap(); write!( s, - " with default_time_to_live = {}", - self.default_time_to_live.as_secs() + " with default_time_to_live = {}, gc_grace_seconds = {}", + self.default_time_to_live.as_secs(), + self.gc_grace.as_secs() ) .unwrap(); s.write_str(" and compaction = { ").unwrap(); - write!( - s, - concat!( - "'class': 'TimeWindowCompactionStrategy'", - ", 'compaction_window_unit': 'MINUTES'", - ", 'compaction_window_size': {}", - ), - self.compaction_window_size.as_secs() / 60 - ) - .unwrap(); + { + let mut s2 = String::new(); + // TODO merge with builder code in check_table_options + for e in self.compaction_options() { + if s2.len() != 0 { + s2.push_str(", "); + } + let op = format!("'{}': '{}'", e.0, e.1); + s2.push_str(&op); + } + s.write_str(&s2).unwrap(); + } s.write_str(" }").unwrap(); s } @@ -331,6 +317,11 @@ impl GenTwcsTab { set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs())); } if row.2 != self.compaction_options() { + info!( + "compaction options differ {:?} vs {:?}", + row.2, + self.compaction_options() + ); let params: Vec<_> = self .compaction_options() .iter() @@ -341,16 +332,10 @@ impl GenTwcsTab { } if set_opts.len() != 0 { let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); - if chs.do_change() { - info!("EXECUTE {cql}"); - scy.query_unpaged(cql.clone(), ()).await?; - chs.add_done(cql); - } else { - chs.add_would_do(cql); - } + chs.add_todo(cql); } } else { - return Err(Error::MissingData); + chs.add_todo(self.cql()); } Ok(()) } @@ -384,32 +369,36 @@ impl GenTwcsTab { ct, ty2 ); - return Err(Error::AddColumnImpossible); + return Err(Error::AddColumnExists(cn.into(), ct.into(), ty2.into())); } } else { if self.partition_keys.contains(cn) { error!("pk {} {}", cn, ct); - return Err(Error::AddColumnImpossible); + return Err(Error::AddColumnPk( + self.keyspace().into(), + self.name().into(), + cn.into(), + ct.into(), + )); } if self.cluster_keys.contains(cn) { error!("ck {} {}", cn, ct); - return Err(Error::AddColumnImpossible); + return Err(Error::AddColumnCk( + self.keyspace().into(), + self.name().into(), + cn.into(), + ct.into(), + )); } - self.add_column(cn, ct, chs, scy).await?; + self.add_column(cn, ct, chs).await?; } } Ok(()) } - async fn add_column(&self, name: &str, ty: &str, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { + async fn add_column(&self, name: &str, ty: &str, chs: &mut Changeset) -> Result<(), Error> { let cql = format!(concat!("alter table {} add {} {}"), self.name(), name, ty); - if chs.do_change() { - info!("EXECUTE add_column CQL {}", cql); - scy.query_unpaged(cql.clone(), ()).await?; - chs.add_done(cql); - } else { - chs.add_would_do(cql); - } + chs.add_todo(cql); Ok(()) } } @@ -673,6 +662,25 @@ pub async fn migrate_scylla_data_schema( ); tab.setup(chs, scy).await?; } + { + let tab = GenTwcsTab::new( + ks, + rett.table_prefix(), + "bin_write_index_v00", + &[ + ("series", "bigint"), + ("div", "int"), + ("quo", "bigint"), + ("rem", "int"), + ("rt", "int"), + ("binlen", "int"), + ], + ["series", "div", "quo"], + ["rem", "rt", "binlen"], + rett.ttl_binned(), + ); + tab.setup(chs, scy).await?; + } { let tab = GenTwcsTab::new( ks, @@ -733,9 +741,17 @@ pub async fn migrate_scylla_data_schema( tab.setup(chs, scy).await?; } - if chs.differs() { - chs.log_statements(); - Err(Error::BadSchema) + if chs.has_to_do() { + if do_change { + for cql in chs.todo.iter() { + scy.query_unpaged(cql.as_str(), ()).await?; + } + let fut = migrate_scylla_data_schema(scyconf, rett, false); + Box::pin(fut).await + } else { + chs.log_statements(); + Err(Error::BadSchema) + } } else { Ok(()) } diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml index 6d9e504..bf3b974 100644 --- a/serieswriter/Cargo.toml +++ b/serieswriter/Cargo.toml @@ -2,7 +2,7 @@ name = "serieswriter" version = "0.0.2" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [dependencies] serde = { version = "1.0", features = ["derive"] } diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index b398415..6568644 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -1,4 +1,4 @@ -use crate::log::*; +use crate::log; use crate::rtwriter::MinQuiets; use items_0::timebin::BinnedBinsTimeweightTrait; use items_0::timebin::BinnedEventsTimeweightTrait; @@ -9,26 +9,28 @@ use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight; use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy; use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightLazy; -use netpod::ttl::RetentionTime; use netpod::BinnedRange; use netpod::DtMs; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinSimpleF32V02; -use series::msp::PrebinnedPartitioning; use series::ChannelStatusSeriesId; use series::SeriesId; +use series::msp::PrebinnedPartitioning; use std::time::Duration; -macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -macro_rules! trace_tick { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) } -macro_rules! debug_bin2 { ($t:expr, $($arg:expr),*) => ( if true { if $t { debug!($($arg),*); } } ) } -macro_rules! trace_bin2 { ($t:expr, $($arg:expr),*) => ( if false { if $t { trace!($($arg),*); } } ) } +macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_tick { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } + +macro_rules! debug_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::debug!($($arg),*); } } ) } +macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if false { if $t { log::trace!($($arg),*); } } ) } autoerr::create_error_v1!( name(Error, "SerieswriterBinwriter"), @@ -62,11 +64,6 @@ fn bin_len_clamp(dur: DtMs) -> PrebinnedPartitioning { } } -fn get_div(pbp: PrebinnedPartitioning) -> Result { - let ret = pbp.msp_div(); - Ok(ret) -} - #[derive(Debug, Clone)] enum WriteCntZero { Enable, @@ -82,6 +79,30 @@ impl WriteCntZero { } } +#[derive(Debug)] +struct IndexWritten { + last: (u32, u64, u32), +} + +impl IndexWritten { + fn new() -> Self { + Self { last: (0, 0, 0) } + } + + fn should_write(&self, div: u32, quo: u64, rem: u32) -> bool { + let (div0, quo0, rem0) = self.last; + if div0 == 0 || quo0 != quo || rem0 != rem { + true + } else { + false + } + } + + fn mark_written(&mut self, div: u32, quo: u64, rem: u32) { + self.last = (div, quo, rem); + } +} + #[derive(Debug)] pub struct BinWriter { chname: String, @@ -92,6 +113,7 @@ pub struct BinWriter { evbuf: ContainerEvents, binner_1st: Option<(RetentionTime, BinnedEventsTimeweight, WriteCntZero)>, binner_others: Vec<(RetentionTime, BinnedBinsTimeweight, WriteCntZero)>, + index_written: IndexWritten, trd: bool, } @@ -108,7 +130,7 @@ impl BinWriter { ) -> Result { let trd = series::dbg::dbg_chn(&chname); if trd { - debug_bin2!(trd, "enabled debug for {}", chname); + debug_bin!(trd, "enabled debug for {}", chname); } const DUR_ZERO: DtMs = DtMs::from_ms_u64(0); const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 123); @@ -142,16 +164,8 @@ impl BinWriter { if !is_polled && combs.len() > 1 { combs.remove(0); } - // check - for e in combs.iter() { - if get_div(e.1.clone()).is_err() { - info!("unsupported bin length {:?} {:?} {:?}", e.0, e.1, chname); - combs.clear(); - break; - } - } let combs = combs; - debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs); + debug_bin!(trd, "{:?} binning combs {:?}", chname, combs); for (rt, pbp, write_zero) in combs { if binner_1st.is_none() { let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len()); @@ -179,6 +193,7 @@ impl BinWriter { evbuf: ContainerEvents::new(), binner_1st, binner_others, + index_written: IndexWritten::new(), trd, }; let _ = ret.cssid; @@ -255,8 +270,16 @@ impl BinWriter { }; let bins = binner.output(); if bins.len() > 0 { - trace_bin2!(self.trd, "binner_1st out len {}", bins.len()); - Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?; + trace_bin!(self.trd, "binner_1st out len {}", bins.len()); + Self::handle_output_ready( + self.trd, + self.sid, + rt, + &bins, + write_zero, + &mut self.index_written, + iqdqs, + )?; // TODO avoid boxing let mut bins2: BinsBoxed = Box::new(bins); for i in 0..self.binner_others.len() { @@ -267,9 +290,17 @@ impl BinWriter { match bb { Some(bb) => { if bb.len() > 0 { - trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len()); + trace_bin!(self.trd, "binner_others {} out len {}", i, bb.len()); if let Some(bb2) = bb.as_any_ref().downcast_ref::>() { - Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, write_zero, iqdqs)?; + Self::handle_output_ready( + self.trd, + self.sid, + rt.clone(), + &bb2, + write_zero, + todo!(), + iqdqs, + )?; } else { return Err(Error::UnexpectedContainerType); } @@ -301,6 +332,7 @@ impl BinWriter { rt: RetentionTime, bins: &ContainerBins, write_zero: WriteCntZero, + index_written: &mut IndexWritten, iqdqs: &mut InsertDeques, ) -> Result<(), Error> { let selfname = "handle_output_ready"; @@ -317,7 +349,7 @@ impl BinWriter { info!("zero count bin {:?}", series); } else { let pbp = PrebinnedPartitioning::try_from(bin_len)?; - let div = get_div(pbp)?; + let div = pbp.msp_div(); if div.ns() % bin_len.ns() != 0 { let e = Error::UnsupportedGridDiv(bin_len, div); return Err(e); @@ -337,7 +369,7 @@ impl BinWriter { lst, }); if bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) { - debug_bin2!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item); + debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item); } match rt { RetentionTime::Short => { @@ -350,6 +382,10 @@ impl BinWriter { iqdqs.lt_rf3_qu.push_back(item); } } + + let div = PrebinnedPartitioning::Day1; + series.id(); + ts1.ms() / div.msp_div().ms(); } } Ok(()) diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index b254f68..0ff0b08 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -13,7 +13,6 @@ macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg) autoerr::create_error_v1!( name(Error, "SerieswriterWriter"), enum variants { - DbPgSid(#[from] dbpg::seriesid::Error), ChannelSendError, ChannelRecvError, SeriesLookupError,