WIP binner2

This commit is contained in:
Dominik Werder
2025-02-26 16:31:28 +01:00
parent dd88288ba7
commit 12778fe121
17 changed files with 976 additions and 173 deletions
+2 -2
View File
@@ -1,8 +1,8 @@
[package]
name = "daqingest"
version = "0.2.7-aa.1"
version = "0.2.7-aa.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
edition = "2024"
[features]
default = []
+8
View File
@@ -81,6 +81,8 @@ pub struct Daemon {
metrics_shutdown_rx: Receiver<u32>,
metrics_jh: Option<JoinHandle<Result<(), Error>>>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
// TODO
series_conf_by_id_tx: Sender<()>,
iqtx: Option<InsertQueuesTx>,
}
@@ -98,6 +100,9 @@ impl Daemon {
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
// TODO so far a dummy
let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16);
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
let local_epics_hostname = ingest_linux::net::local_hostname();
@@ -341,6 +346,7 @@ impl Daemon {
metrics_shutdown_rx,
metrics_jh: None,
channel_info_query_tx,
series_conf_by_id_tx,
iqtx: Some(iqtx2),
};
Ok(ret)
@@ -675,12 +681,14 @@ impl Daemon {
let rres = RoutesResources::new(
self.ingest_opts.backend().into(),
self.channel_info_query_tx.clone(),
self.series_conf_by_id_tx.clone(),
self.iqtx
.take()
.ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?,
self.ingest_opts.scylla_config_st().clone(),
self.ingest_opts.scylla_config_mt().clone(),
self.ingest_opts.scylla_config_lt().clone(),
self.ingest_opts.postgresql_config().clone(),
);
let rres = Arc::new(rres);
let metrics_jh = {
+2 -1
View File
@@ -2,7 +2,7 @@
name = "dbpg"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
edition = "2024"
[lib]
doctest = false
@@ -22,3 +22,4 @@ async-channel = "2.1.1"
md-5 = "0.10.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
autoerr = "0.0.3"
+50
View File
@@ -0,0 +1,50 @@
use crate::conn::PgClient;
use netpod::ChannelConfigResponse;
use netpod::DaqbufChannelConfig;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use series::SeriesId;
use tokio_postgres::types::Type as PgTy;
autoerr::create_error_v1!(
name(Error, "ConfigBySeries"),
enum variants {
Postgres(#[from] tokio_postgres::Error),
Netpod(#[from] netpod::Error),
},
);
pub async fn channel_config_by_series(pg: &PgClient, series: SeriesId) -> Result<Option<DaqbufChannelConfig>, Error> {
let sql = concat!(
"select t.series, t.facility, t.kind, t.scalar_type, t.shape_dims, t.channel",
" from series_by_channel t",
" where t.series = $1",
" limit 10",
);
let qu_select = pg.prepare_typed(sql, &[PgTy::INT8]).await?;
let id = series.id() as i64;
let res = pg.query(&qu_select, &[&id]).await?;
for row in res {
let series: i64 = row.try_get(0)?;
let backend = row.try_get(1)?;
let kind = row.try_get(2)?;
let scalar_type = row.try_get(3)?;
let shape_dims: Vec<_> = row.try_get(4)?;
let name = row.try_get(5)?;
let series = series as u64;
let series_kind = SeriesKind::from_db_i16(kind)?;
let scalar_type = ScalarType::from_scylla_i32(scalar_type)?;
let shape = Shape::from_scylla_shape_dims(&shape_dims)?;
let conf = DaqbufChannelConfig {
backend,
series,
kind: series_kind,
scalar_type,
shape,
name,
};
return Ok(Some(conf));
}
Ok(None)
}
+1 -1
View File
@@ -1,3 +1,4 @@
pub mod confbyseries;
pub mod conn;
pub mod err;
pub mod findaddr;
@@ -5,7 +6,6 @@ pub mod iocindex;
pub mod pool;
pub mod schema;
pub mod seriesbychannel;
pub mod seriesid;
pub mod testerr;
pub use tokio_postgres as postgres;
+3 -2
View File
@@ -13,6 +13,7 @@ use netpod::Database;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use serde::Serialize;
use series::SeriesId;
use stats::SeriesByChannelStats;
use std::pin::Pin;
@@ -102,7 +103,7 @@ impl fmt::Debug for ChannelInfoQuery {
}
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct ChannelInfoResult {
pub backend: String,
pub channel: String,
@@ -121,7 +122,7 @@ enum MatchingSeries {
Latest(SeriesId),
}
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum RegisteredSeries {
Created(SeriesId),
Updated(SeriesId),
-12
View File
@@ -1,12 +0,0 @@
use err::thiserror;
use err::ThisError;
// TODO still needed?
#[derive(Debug, ThisError)]
#[cstm(name = "PgSeriesId")]
pub enum Error {
Postgres(#[from] tokio_postgres::Error),
IocAddrNotFound,
BadIdGenerated,
CanNotInsertSeriesId,
}
+2 -2
View File
@@ -2,7 +2,7 @@
name = "netfetch"
version = "0.0.3"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
edition = "2024"
[lib]
doctest = false
@@ -25,7 +25,7 @@ hex = "0.4.3"
regex = "1.8.4"
axum = "0.8.1"
http-body = "1"
url = "2.2"
url = "2.5"
chrono = "0.4"
humantime = "2.1.0"
humantime-serde = "1.1.1"
+39 -11
View File
@@ -335,16 +335,19 @@ pub struct RoutesResources {
scyconf_st: ScyllaIngestConfig,
scyconf_mt: ScyllaIngestConfig,
scyconf_lt: ScyllaIngestConfig,
pgconf: netpod::Database,
}
impl RoutesResources {
pub fn new(
backend: String,
worker_tx: Sender<ChannelInfoQuery>,
series_conf_by_id_tx: Sender<()>,
iqtx: InsertQueuesTx,
scyconf_st: ScyllaIngestConfig,
scyconf_mt: ScyllaIngestConfig,
scyconf_lt: ScyllaIngestConfig,
pgconf: netpod::Database,
) -> Self {
Self {
backend,
@@ -353,6 +356,7 @@ impl RoutesResources {
scyconf_st,
scyconf_mt,
scyconf_lt,
pgconf,
}
}
}
@@ -416,17 +420,7 @@ fn make_routes(
)
.nest(
"/ingest",
Router::new().route(
"/v1",
post({
let rres = rres.clone();
move |(headers, params, body): (
HeaderMap,
Query<HashMap<String, String>>,
axum::body::Body,
)| { ingest::post_v01((headers, params, body), rres) }
}),
),
make_routes_ingest(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()),
)
.nest(
"/private",
@@ -560,6 +554,40 @@ fn make_routes_channel(
)
}
fn make_routes_ingest(
rres: Arc<RoutesResources>,
dcom: Arc<DaemonComm>,
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::extract;
use axum::routing::{get, post, put};
use axum::Router;
use http::StatusCode;
Router::new()
.nest(
"/private",
Router::new().route(
"/write",
put({
let rres = rres.clone();
move |(headers, params, body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body)| {
ingest::write_v02::write_with_fresh_msps((headers, params, body), rres)
}
}),
),
)
.route(
"/v1",
post({
let rres = rres.clone();
move |(headers, params, body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body)| {
ingest::post_v01((headers, params, body), rres)
}
}),
)
}
pub async fn metrics_service(
bind_to: String,
dcom: Arc<DaemonComm>,
+6 -1
View File
@@ -1,3 +1,5 @@
pub mod write_v02;
use super::RoutesResources;
use axum::extract::FromRequest;
use axum::extract::Query;
@@ -214,7 +216,10 @@ async fn post_v01_try(
debug_setup!("series writer established");
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic));
let mut frames = FramedBytesStream::new(
body.into_data_stream()
.map_err(|_| streams::framed_bytes::Error::DataInput),
);
loop {
let x = timeout(Duration::from_millis(2000), frames.try_next()).await;
let x = match x {
+642
View File
@@ -0,0 +1,642 @@
use crate::metrics::RoutesResources;
use axum::extract::FromRequest;
use axum::extract::Query;
use axum::http::HeaderMap;
use axum::Json;
use bytes::Bytes;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use netpod::log;
use netpod::ttl::RetentionTime;
use netpod::DaqbufChannelConfig;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use netpod::APP_CBOR_FRAMED;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::ArrayValue;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use series::SeriesId;
use serieswriter::msptool::MspSplit;
use serieswriter::writer::EmittableType;
use serieswriter::writer::SeriesWriter;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use streams::framed_bytes::FramedBytesStream;
use taskrun::tokio::time::timeout;
macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*) } ); }
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*) } ); }
macro_rules! debug_setup { ($($arg:expr),*) => ( if true { log::debug!($($arg),*) } ); }
macro_rules! trace_input { ($($arg:expr),*) => ( if true { log::trace!($($arg),*) } ); }
macro_rules! trace_queues { ($($arg:expr),*) => ( if true { log::trace!($($arg),*) } ); }
autoerr::create_error_v1!(
name(Error, "MetricsIngestV02Write"),
enum variants {
UnsupportedContentType,
Logic,
SeriesWriter(#[from] serieswriter::writer::Error),
MissingChannelName,
MissingScalarType,
MissingShape,
MissingSeriesId,
SendError,
Decode,
FramedBytes(#[from] streams::framed_bytes::Error),
InsertQueues(#[from] scywr::insertqueues::Error),
Serde(#[from] serde_json::Error),
Parse(String),
NotSupported,
WorkerQueueMissing,
ConfigLookup,
Postgres(#[from] dbpg::err::Error),
TaskJoin(#[from] taskrun::tokio::task::JoinError),
ConfBySeries(#[from] dbpg::confbyseries::Error),
},
);
type ValueSeriesWriter = SeriesWriter<WritableType>;
struct WritableTypeState {
series: SeriesId,
msp_split_data: MspSplit,
}
impl WritableTypeState {
fn new(series: SeriesId) -> Self {
Self {
series,
msp_split_data: MspSplit::new(10000, 1024 * 256),
}
}
}
#[derive(Debug, Clone)]
struct WritableType(TsNano, DataValue);
impl EmittableType for WritableType {
type State = WritableTypeState;
fn ts(&self) -> TsNano {
self.0.clone()
}
fn has_change(&self, k: &Self) -> bool {
true
}
fn byte_size(&self) -> u32 {
8 + self.1.byte_size()
}
fn into_query_item(
self,
ts_net: Instant,
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size());
let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem {
series: state.series.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
val: self.1.clone(),
ts_net,
});
let mut items = smallvec::SmallVec::new();
items.push(item);
if ts_msp_chg {
items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new(
state.series.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
}
serieswriter::writer::EmitRes {
items,
bytes: self.byte_size(),
status: 0,
}
}
}
fn evpush_dim0<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
f1: F1,
) -> Result<(), Error>
where
T: EventValueType,
F1: Fn(<T as EventValueType>::IterTy1<'_>) -> DataValue,
{
let evs: ContainerEvents<T> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
// trace_input!("see events {:?}", evs);
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
fn evpush_dim0_enum(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
) -> Result<(), Error> {
let evs: ContainerEvents<EnumVariant> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
// trace_input!("see events {:?}", evs);
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = DataValue::Scalar(ScalarValue::Enum(val.ix as i16, val.name.into()));
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
fn evpush_dim1<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
f1: F1,
) -> Result<(), Error>
where
Vec<T>: EventValueType,
F1: Fn(<Vec<T> as EventValueType>::IterTy1<'_>) -> DataValue,
{
let evs: ContainerEvents<Vec<T>> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
trace_input!("see events {:?}", evs);
error!("TODO require timestamp in input format");
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
fn frame_write(
frame: &Bytes,
scalar_type: ScalarType,
shape: Shape,
writer: &mut SeriesWriter<WritableType>,
deque: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U8 => {
evpush_dim0::<u8, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U8(x as _)))?;
}
ScalarType::U16 => {
evpush_dim0::<u16, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U16(x as _)))?;
}
ScalarType::U32 => {
evpush_dim0::<u32, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U32(x as _)))?;
}
ScalarType::U64 => {
evpush_dim0::<u64, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::U64(x as _)))?;
}
ScalarType::I8 => {
evpush_dim0::<i8, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?;
}
ScalarType::I16 => {
evpush_dim0::<i16, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?;
}
ScalarType::I32 => {
evpush_dim0::<i32, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?;
}
ScalarType::I64 => {
evpush_dim0::<i64, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?;
}
ScalarType::F32 => {
evpush_dim0::<f32, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?;
}
ScalarType::F64 => {
evpush_dim0::<f64, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?;
}
ScalarType::BOOL => {
evpush_dim0::<bool, _>(&frame, deque, writer, |x| DataValue::Scalar(ScalarValue::Bool(x)))?;
}
ScalarType::STRING => {
evpush_dim0::<String, _>(&frame, deque, writer, |x| {
DataValue::Scalar(ScalarValue::String(x.into()))
})?;
}
ScalarType::Enum => {
evpush_dim0_enum(&frame, deque, writer)?;
}
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => {
evpush_dim1::<u8, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U8(x)))?;
}
ScalarType::U16 => {
evpush_dim1::<u16, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U16(x)))?;
}
ScalarType::U32 => {
evpush_dim1::<u32, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U32(x)))?;
}
ScalarType::U64 => {
evpush_dim1::<u64, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::U64(x)))?;
}
ScalarType::I8 => {
evpush_dim1::<i8, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I8(x)))?;
}
ScalarType::I16 => {
evpush_dim1::<i16, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I16(x)))?;
}
ScalarType::I32 => {
evpush_dim1::<i32, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I32(x)))?;
}
ScalarType::I64 => {
evpush_dim1::<i64, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::I64(x)))?;
}
ScalarType::F32 => {
evpush_dim1::<f32, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F32(x)))?;
}
ScalarType::F64 => {
evpush_dim1::<f64, _>(&frame, deque, writer, |x| DataValue::Array(ArrayValue::F64(x)))?;
}
ScalarType::BOOL => return Err(Error::NotSupported),
ScalarType::STRING => return Err(Error::NotSupported),
ScalarType::Enum => return Err(Error::NotSupported),
},
Shape::Image(_, _) => return Err(Error::NotSupported),
}
Ok(())
}
async fn write_with_fresh_msps_inner(
headers: HeaderMap,
params: HashMap<String, String>,
body: axum::body::Body,
rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
if let Some(ct) = headers.get("content-type") {
if let Ok(s) = ct.to_str() {
if s == APP_CBOR_FRAMED {
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
};
debug_setup!("params {:?}", params);
let stnow = SystemTime::now();
let worker_tx = rres.worker_tx.clone();
let backend = rres.backend.clone();
let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into();
let s = params.get("scalarType").ok_or(Error::MissingScalarType)?;
let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?;
let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?;
let rt: RetentionTime = params
.get("retentionTime")
.and_then(|x| x.parse().ok())
.unwrap_or(RetentionTime::Short);
debug_setup!(
"establishing series writer for {:?} {:?} {:?} {:?}",
channel,
scalar_type,
shape,
rt
);
let (tx, rx) = async_channel::bounded(8);
let qu = ChannelInfoQuery {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
rres.worker_tx.send(qu).await.map_err(|_| Error::WorkerQueueMissing)?;
let chinfo = rx
.recv()
.await
.map_err(|_| Error::ConfigLookup)?
.map_err(|_| Error::ConfigLookup)?;
let mut writer = SeriesWriter::new(chinfo.series.to_series())?;
debug_setup!("series writer established");
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
let mut frames = FramedBytesStream::new(
body.into_data_stream()
.map_err(|_| streams::framed_bytes::Error::DataInput),
);
loop {
let x = timeout(Duration::from_millis(2000), frames.try_next()).await;
let x = match x {
Ok(x) => x,
Err(_) => {
tick_writers(&mut writer, &mut iqdqs, rt.clone())?;
continue;
}
};
let frame = match x? {
Some(x) => x,
None => {
trace_input!("input stream done");
break;
}
};
trace_input!("got frame len {}", frame.len());
let deque = iqdqs.deque(rt.clone());
frame_write(&frame, scalar_type.clone(), shape.clone(), &mut writer, deque)?;
trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary());
tick_writers(&mut writer, &mut iqdqs, rt.clone())?;
trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary());
}
trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary());
finish_writers(&mut writer, &mut iqdqs, rt.clone())?;
trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary());
let ret = Json(serde_json::json!({}));
Ok(ret)
}
pub async fn write_with_fresh_msps(
(headers, Query(params), body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body),
rres: Arc<RoutesResources>,
) -> Json<serde_json::Value> {
match write_with_fresh_msps_inner(headers, params, body, rres).await {
Ok(k) => k,
Err(e) => Json(serde_json::json!({
"error": e.to_string(),
})),
}
}
fn tick_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
writer.tick(deques.deque(rt))?;
Ok(())
}
fn finish_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
writer.tick(deques.deque(rt))?;
Ok(())
}
async fn register_series(
headers: HeaderMap,
params: HashMap<String, String>,
body: axum::body::Body,
rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
if let Some(ct) = headers.get("content-type") {
if let Ok(s) = ct.to_str() {
if s == APP_CBOR_FRAMED {
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
};
debug_setup!("params {:?}", params);
let stnow = SystemTime::now();
let worker_tx = rres.worker_tx.clone();
let backend = rres.backend.clone();
let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into();
let s = params.get("scalarType").ok_or(Error::MissingScalarType)?;
let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?;
let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?;
debug_setup!("register series {:?} {:?} {:?}", channel, scalar_type, shape);
let (tx, rx) = async_channel::bounded(8);
let qu = ChannelInfoQuery {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
rres.worker_tx.send(qu).await.map_err(|_| Error::WorkerQueueMissing)?;
let chinfo = rx
.recv()
.await
.map_err(|_| Error::ConfigLookup)?
.map_err(|_| Error::ConfigLookup)?;
let ret = serde_json::json!({
"register_series": {
"status": "ok",
"chinfo": chinfo,
"seriesId": chinfo.series.to_series().id(),
}
});
let ret = Json(ret);
Ok(ret)
}
async fn write_msp(
headers: HeaderMap,
params: HashMap<String, String>,
body: axum::body::Body,
rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
if let Some(ct) = headers.get("content-type") {
if let Ok(s) = ct.to_str() {
if s == APP_CBOR_FRAMED {
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
};
debug_setup!("params {:?}", params);
let stnow = SystemTime::now();
let worker_tx = rres.worker_tx.clone();
let backend = rres.backend.clone();
let series_id: u64 = params
.get("seriesId")
.ok_or(Error::MissingSeriesId)?
.parse()
.map_err(|_| Error::MissingSeriesId)?;
let series = SeriesId::new(series_id);
let rt: RetentionTime = params
.get("retentionTime")
.and_then(|x| x.parse().ok())
.unwrap_or(RetentionTime::Short);
let (conn, pgjh) = dbpg::conn::make_pg_client(&rres.pgconf).await?;
let conf = dbpg::confbyseries::channel_config_by_series(&conn, series).await?;
drop(conn);
pgjh.await?;
match conf {
Some(conf) => write_events_exact_2(conf, rt, body, rres).await,
None => {
info!("series id not found {:?}", series);
let ret = serde_json::json!({
"write_events_exact": {
"status": "SeriesIdNotFound",
}
});
let ret = Json(ret);
Ok(ret)
}
}
}
async fn write_events_exact_2(
conf: DaqbufChannelConfig,
rt: RetentionTime,
body: axum::body::Body,
rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
debug_setup!("write_events_exact {:?} {:?}", conf, rt);
let series = SeriesId::new(conf.series);
let mut writer = SeriesWriter::new(series)?;
debug_setup!("series writer established");
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
let mut frames = FramedBytesStream::new(
body.into_data_stream()
.map_err(|_| streams::framed_bytes::Error::DataInput),
);
loop {
match timeout(Duration::from_millis(2000), frames.try_next()).await {
Ok(k) => match k? {
Some(frame) => {
trace_input!("got frame len {}", frame.len());
let deque = iqdqs.deque(rt.clone());
frame_write(&frame, conf.scalar_type.clone(), conf.shape.clone(), &mut writer, deque)?;
trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary());
tick_writers(&mut writer, &mut iqdqs, rt.clone())?;
trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary());
}
None => {
trace_input!("input stream done");
break;
}
},
Err(_) => {
tick_writers(&mut writer, &mut iqdqs, rt.clone())?;
continue;
}
}
}
trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary());
iqtx.send_all(&mut iqdqs).await?;
trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary());
finish_writers(&mut writer, &mut iqdqs, rt.clone())?;
trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary());
let ret = serde_json::json!({
"write_events_exact": {
"status": "ok",
"seriesId": series.id(),
}
});
let ret = Json(ret);
Ok(ret)
}
async fn write_events_exact(
headers: HeaderMap,
params: HashMap<String, String>,
body: axum::body::Body,
rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
if let Some(ct) = headers.get("content-type") {
if let Ok(s) = ct.to_str() {
if s == APP_CBOR_FRAMED {
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
}
} else {
return Err(Error::UnsupportedContentType);
};
debug_setup!("params {:?}", params);
let stnow = SystemTime::now();
let worker_tx = rres.worker_tx.clone();
let backend = rres.backend.clone();
let series_id: u64 = params
.get("seriesId")
.ok_or(Error::MissingSeriesId)?
.parse()
.map_err(|_| Error::MissingSeriesId)?;
let series = SeriesId::new(series_id);
let rt: RetentionTime = params
.get("retentionTime")
.and_then(|x| x.parse().ok())
.unwrap_or(RetentionTime::Short);
let (conn, pgjh) = dbpg::conn::make_pg_client(&rres.pgconf).await?;
let conf = dbpg::confbyseries::channel_config_by_series(&conn, series).await?;
drop(conn);
pgjh.await?;
match conf {
Some(conf) => write_events_exact_2(conf, rt, body, rres).await,
None => {
info!("series id not found {:?}", series);
let ret = serde_json::json!({
"write_events_exact": {
"status": "SeriesIdNotFound",
}
});
let ret = Json(ret);
Ok(ret)
}
}
}
+4 -3
View File
@@ -2,19 +2,20 @@
name = "scywr"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
edition = "2024"
[dependencies]
futures-util = "0.3.28"
async-channel = "2.3.1"
scylla = "0.15.0"
scylla = "0.15.1"
smallvec = "1.11.0"
pin-project = "1.1.5"
bytes = "1.7.1"
bytes = "1.10.0"
autoerr = "0.0.3"
serde = { version = "1", features = ["derive"] }
log = { path = "../log" }
stats = { path = "../stats" }
series = { path = "../../daqbuf-series", package = "daqbuf-series" }
netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" }
scydb = { path = "../../daqbuf-scydb", package = "daqbuf-scydb" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
+62 -34
View File
@@ -27,43 +27,71 @@ pub struct InsertQueuesTx {
pub lt_rf3_lat5_tx: Sender<VecDeque<QueryItem>>,
}
async fn send_nonempty(qu: &mut VecDeque<QueryItem>, tx: &Sender<VecDeque<QueryItem>>) -> Result<(), Error> {
let item = core::mem::replace(qu, VecDeque::new());
if item.len() != 0 {
tx.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?;
}
Ok(())
}
impl InsertQueuesTx {
/// Send all accumulated batches
pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
{
let item = core::mem::replace(&mut iqdqs.st_rf1_qu, VecDeque::new());
self.st_rf1_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?;
}
{
let item = core::mem::replace(&mut iqdqs.st_rf3_qu, VecDeque::new());
self.st_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?;
}
{
let item = core::mem::replace(&mut iqdqs.mt_rf3_qu, VecDeque::new());
self.mt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?;
}
{
let item = core::mem::replace(&mut iqdqs.lt_rf3_qu, VecDeque::new());
self.lt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
}
{
let item = core::mem::replace(&mut iqdqs.lt_rf3_lat5_qu, VecDeque::new());
self.lt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
if true {
send_nonempty(&mut iqdqs.st_rf1_qu, &self.st_rf1_tx).await?;
send_nonempty(&mut iqdqs.st_rf3_qu, &self.st_rf3_tx).await?;
send_nonempty(&mut iqdqs.mt_rf3_qu, &self.mt_rf3_tx).await?;
send_nonempty(&mut iqdqs.lt_rf3_qu, &self.lt_rf3_tx).await?;
send_nonempty(&mut iqdqs.lt_rf3_lat5_qu, &self.lt_rf3_tx).await?;
} else {
{
let item = core::mem::replace(&mut iqdqs.st_rf1_qu, VecDeque::new());
if item.len() != 0 {
self.st_rf1_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?;
}
}
{
let item = core::mem::replace(&mut iqdqs.st_rf3_qu, VecDeque::new());
if item.len() != 0 {
self.st_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?;
}
}
{
let item = core::mem::replace(&mut iqdqs.mt_rf3_qu, VecDeque::new());
if item.len() != 0 {
self.mt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?;
}
}
{
let item = core::mem::replace(&mut iqdqs.lt_rf3_qu, VecDeque::new());
if item.len() != 0 {
self.lt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
}
}
{
let item = core::mem::replace(&mut iqdqs.lt_rf3_lat5_qu, VecDeque::new());
if item.len() != 0 {
self.lt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
}
}
}
Ok(())
}
+89 -73
View File
@@ -1,6 +1,6 @@
use crate::config::ScyllaIngestConfig;
use crate::session::create_session_no_ks;
use crate::session::ScySession;
use crate::session::create_session_no_ks;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::*;
@@ -20,7 +20,9 @@ autoerr::create_error_v1!(
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypecheck(#[from] scylla::deserialize::TypeCheckError),
MissingData,
AddColumnImpossible,
AddColumnExists(String, String, String),
AddColumnPk(String, String, String, String),
AddColumnCk(String, String, String, String),
BadSchema,
},
);
@@ -35,16 +37,16 @@ impl From<crate::session::Error> for Error {
struct Changeset {
do_change: bool,
would_do: Vec<String>,
done: Vec<String>,
todo: Vec<String>,
cql_done: Vec<String>,
}
impl Changeset {
fn new() -> Self {
Self {
do_change: false,
would_do: Vec::new(),
done: Vec::new(),
todo: Vec::new(),
cql_done: Vec::new(),
}
}
@@ -58,27 +60,16 @@ impl Changeset {
self.do_change
}
fn add_would_do(&mut self, cql: String) {
self.would_do.push(cql);
fn add_todo(&mut self, cql: String) {
self.todo.push(cql);
}
fn add_done(&mut self, cql: String) {
self.done.push(cql);
}
fn differs(&self) -> bool {
if self.would_do.len() != 0 {
true
} else {
false
}
fn has_to_do(&self) -> bool {
if self.todo.len() != 0 { true } else { false }
}
fn log_statements(&self) {
for q in &self.done {
info!("DONE {q}");
}
for q in &self.would_do {
for q in &self.todo {
info!("WOULD DO {q}");
}
}
@@ -231,25 +222,17 @@ impl GenTwcsTab {
}
async fn setup(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
self.create_if_missing(chs, scy).await?;
self.check_table_options(chs, scy).await?;
self.check_columns(chs, scy).await?;
if self.has_table_name(scy).await? {
self.check_table_options(chs, scy).await?;
self.check_columns(chs, scy).await?;
} else {
chs.add_todo(self.cql());
}
Ok(())
}
async fn create_if_missing(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
// TODO check for more details (all columns, correct types, correct kinds, etc)
if !has_table(self.name(), scy).await? {
let cql = self.cql();
if chs.do_change() {
info!("scylla create table {} {}", self.name(), cql);
scy.query_unpaged(cql.clone(), ()).await?;
chs.add_done(cql);
} else {
chs.add_would_do(cql);
}
}
Ok(())
async fn has_table_name(&self, scy: &ScySession) -> Result<bool, Error> {
has_table(self.name(), scy).await
}
fn cql(&self) -> String {
@@ -278,21 +261,24 @@ impl GenTwcsTab {
write!(s, " ({})", cols).unwrap();
write!(
s,
" with default_time_to_live = {}",
self.default_time_to_live.as_secs()
" with default_time_to_live = {}, gc_grace_seconds = {}",
self.default_time_to_live.as_secs(),
self.gc_grace.as_secs()
)
.unwrap();
s.write_str(" and compaction = { ").unwrap();
write!(
s,
concat!(
"'class': 'TimeWindowCompactionStrategy'",
", 'compaction_window_unit': 'MINUTES'",
", 'compaction_window_size': {}",
),
self.compaction_window_size.as_secs() / 60
)
.unwrap();
{
let mut s2 = String::new();
// TODO merge with builder code in check_table_options
for e in self.compaction_options() {
if s2.len() != 0 {
s2.push_str(", ");
}
let op = format!("'{}': '{}'", e.0, e.1);
s2.push_str(&op);
}
s.write_str(&s2).unwrap();
}
s.write_str(" }").unwrap();
s
}
@@ -331,6 +317,11 @@ impl GenTwcsTab {
set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs()));
}
if row.2 != self.compaction_options() {
info!(
"compaction options differ {:?} vs {:?}",
row.2,
self.compaction_options()
);
let params: Vec<_> = self
.compaction_options()
.iter()
@@ -341,16 +332,10 @@ impl GenTwcsTab {
}
if set_opts.len() != 0 {
let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and "));
if chs.do_change() {
info!("EXECUTE {cql}");
scy.query_unpaged(cql.clone(), ()).await?;
chs.add_done(cql);
} else {
chs.add_would_do(cql);
}
chs.add_todo(cql);
}
} else {
return Err(Error::MissingData);
chs.add_todo(self.cql());
}
Ok(())
}
@@ -384,32 +369,36 @@ impl GenTwcsTab {
ct,
ty2
);
return Err(Error::AddColumnImpossible);
return Err(Error::AddColumnExists(cn.into(), ct.into(), ty2.into()));
}
} else {
if self.partition_keys.contains(cn) {
error!("pk {} {}", cn, ct);
return Err(Error::AddColumnImpossible);
return Err(Error::AddColumnPk(
self.keyspace().into(),
self.name().into(),
cn.into(),
ct.into(),
));
}
if self.cluster_keys.contains(cn) {
error!("ck {} {}", cn, ct);
return Err(Error::AddColumnImpossible);
return Err(Error::AddColumnCk(
self.keyspace().into(),
self.name().into(),
cn.into(),
ct.into(),
));
}
self.add_column(cn, ct, chs, scy).await?;
self.add_column(cn, ct, chs).await?;
}
}
Ok(())
}
async fn add_column(&self, name: &str, ty: &str, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
async fn add_column(&self, name: &str, ty: &str, chs: &mut Changeset) -> Result<(), Error> {
let cql = format!(concat!("alter table {} add {} {}"), self.name(), name, ty);
if chs.do_change() {
info!("EXECUTE add_column CQL {}", cql);
scy.query_unpaged(cql.clone(), ()).await?;
chs.add_done(cql);
} else {
chs.add_would_do(cql);
}
chs.add_todo(cql);
Ok(())
}
}
@@ -673,6 +662,25 @@ pub async fn migrate_scylla_data_schema(
);
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
ks,
rett.table_prefix(),
"bin_write_index_v00",
&[
("series", "bigint"),
("div", "int"),
("quo", "bigint"),
("rem", "int"),
("rt", "int"),
("binlen", "int"),
],
["series", "div", "quo"],
["rem", "rt", "binlen"],
rett.ttl_binned(),
);
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
ks,
@@ -733,9 +741,17 @@ pub async fn migrate_scylla_data_schema(
tab.setup(chs, scy).await?;
}
if chs.differs() {
chs.log_statements();
Err(Error::BadSchema)
if chs.has_to_do() {
if do_change {
for cql in chs.todo.iter() {
scy.query_unpaged(cql.as_str(), ()).await?;
}
let fut = migrate_scylla_data_schema(scyconf, rett, false);
Box::pin(fut).await
} else {
chs.log_statements();
Err(Error::BadSchema)
}
} else {
Ok(())
}
+1 -1
View File
@@ -2,7 +2,7 @@
name = "serieswriter"
version = "0.0.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
edition = "2024"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
+65 -29
View File
@@ -1,4 +1,4 @@
use crate::log::*;
use crate::log;
use crate::rtwriter::MinQuiets;
use items_0::timebin::BinnedBinsTimeweightTrait;
use items_0::timebin::BinnedEventsTimeweightTrait;
@@ -9,26 +9,28 @@ use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight;
use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy;
use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightLazy;
use netpod::ttl::RetentionTime;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use netpod::ttl::RetentionTime;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use series::msp::PrebinnedPartitioning;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use series::msp::PrebinnedPartitioning;
use std::time::Duration;
macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace_tick { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) }
macro_rules! debug_bin2 { ($t:expr, $($arg:expr),*) => ( if true { if $t { debug!($($arg),*); } } ) }
macro_rules! trace_bin2 { ($t:expr, $($arg:expr),*) => ( if false { if $t { trace!($($arg),*); } } ) }
macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_tick { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! debug_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::debug!($($arg),*); } } ) }
macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if false { if $t { log::trace!($($arg),*); } } ) }
autoerr::create_error_v1!(
name(Error, "SerieswriterBinwriter"),
@@ -62,11 +64,6 @@ fn bin_len_clamp(dur: DtMs) -> PrebinnedPartitioning {
}
}
fn get_div(pbp: PrebinnedPartitioning) -> Result<DtMs, Error> {
let ret = pbp.msp_div();
Ok(ret)
}
#[derive(Debug, Clone)]
enum WriteCntZero {
Enable,
@@ -82,6 +79,30 @@ impl WriteCntZero {
}
}
#[derive(Debug)]
struct IndexWritten {
last: (u32, u64, u32),
}
impl IndexWritten {
fn new() -> Self {
Self { last: (0, 0, 0) }
}
fn should_write(&self, div: u32, quo: u64, rem: u32) -> bool {
let (div0, quo0, rem0) = self.last;
if div0 == 0 || quo0 != quo || rem0 != rem {
true
} else {
false
}
}
fn mark_written(&mut self, div: u32, quo: u64, rem: u32) {
self.last = (div, quo, rem);
}
}
#[derive(Debug)]
pub struct BinWriter {
chname: String,
@@ -92,6 +113,7 @@ pub struct BinWriter {
evbuf: ContainerEvents<f32>,
binner_1st: Option<(RetentionTime, BinnedEventsTimeweight<f32>, WriteCntZero)>,
binner_others: Vec<(RetentionTime, BinnedBinsTimeweight<f32, f32>, WriteCntZero)>,
index_written: IndexWritten,
trd: bool,
}
@@ -108,7 +130,7 @@ impl BinWriter {
) -> Result<Self, Error> {
let trd = series::dbg::dbg_chn(&chname);
if trd {
debug_bin2!(trd, "enabled debug for {}", chname);
debug_bin!(trd, "enabled debug for {}", chname);
}
const DUR_ZERO: DtMs = DtMs::from_ms_u64(0);
const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 123);
@@ -142,16 +164,8 @@ impl BinWriter {
if !is_polled && combs.len() > 1 {
combs.remove(0);
}
// check
for e in combs.iter() {
if get_div(e.1.clone()).is_err() {
info!("unsupported bin length {:?} {:?} {:?}", e.0, e.1, chname);
combs.clear();
break;
}
}
let combs = combs;
debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs);
debug_bin!(trd, "{:?} binning combs {:?}", chname, combs);
for (rt, pbp, write_zero) in combs {
if binner_1st.is_none() {
let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len());
@@ -179,6 +193,7 @@ impl BinWriter {
evbuf: ContainerEvents::new(),
binner_1st,
binner_others,
index_written: IndexWritten::new(),
trd,
};
let _ = ret.cssid;
@@ -255,8 +270,16 @@ impl BinWriter {
};
let bins = binner.output();
if bins.len() > 0 {
trace_bin2!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?;
trace_bin!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(
self.trd,
self.sid,
rt,
&bins,
write_zero,
&mut self.index_written,
iqdqs,
)?;
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
@@ -267,9 +290,17 @@ impl BinWriter {
match bb {
Some(bb) => {
if bb.len() > 0 {
trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len());
trace_bin!(self.trd, "binner_others {} out len {}", i, bb.len());
if let Some(bb2) = bb.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, write_zero, iqdqs)?;
Self::handle_output_ready(
self.trd,
self.sid,
rt.clone(),
&bb2,
write_zero,
todo!(),
iqdqs,
)?;
} else {
return Err(Error::UnexpectedContainerType);
}
@@ -301,6 +332,7 @@ impl BinWriter {
rt: RetentionTime,
bins: &ContainerBins<f32, f32>,
write_zero: WriteCntZero,
index_written: &mut IndexWritten,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let selfname = "handle_output_ready";
@@ -317,7 +349,7 @@ impl BinWriter {
info!("zero count bin {:?}", series);
} else {
let pbp = PrebinnedPartitioning::try_from(bin_len)?;
let div = get_div(pbp)?;
let div = pbp.msp_div();
if div.ns() % bin_len.ns() != 0 {
let e = Error::UnsupportedGridDiv(bin_len, div);
return Err(e);
@@ -337,7 +369,7 @@ impl BinWriter {
lst,
});
if bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
debug_bin2!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
}
match rt {
RetentionTime::Short => {
@@ -350,6 +382,10 @@ impl BinWriter {
iqdqs.lt_rf3_qu.push_back(item);
}
}
let div = PrebinnedPartitioning::Day1;
series.id();
ts1.ms() / div.msp_div().ms();
}
}
Ok(())
-1
View File
@@ -13,7 +13,6 @@ macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)
autoerr::create_error_v1!(
name(Error, "SerieswriterWriter"),
enum variants {
DbPgSid(#[from] dbpg::seriesid::Error),
ChannelSendError,
ChannelRecvError,
SeriesLookupError,