From 8f383050f55d422971f890bfed8c4119942de9ea Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 24 Jul 2024 19:45:30 +0200 Subject: [PATCH] Simplify channel config lookup --- crates/dbconn/src/channelconfig.rs | 30 ++++++--- crates/dbconn/src/dbconn.rs | 1 - crates/dbconn/src/query.rs | 48 -------------- crates/dbconn/src/worker.rs | 15 +++-- crates/httpret/src/api4/eventdata.rs | 4 +- crates/httpret/src/api4/events.rs | 6 +- crates/httpret/src/channelconfig.rs | 19 +++--- crates/netpod/src/netpod.rs | 80 ++++++++++++++++++++++-- crates/nodenet/src/channelconfig.rs | 6 +- crates/nodenet/src/configquorum.rs | 2 +- crates/nodenet/src/conn.rs | 5 +- crates/nodenet/src/scylla.rs | 13 +--- crates/query/src/api4/events.rs | 3 +- crates/scyllaconn/src/events.rs | 9 --- crates/scyllaconn/src/events2/events.rs | 27 +++----- crates/scyllaconn/src/events2/mergert.rs | 35 +++-------- crates/scyllaconn/src/events2/msp.rs | 2 - crates/scyllaconn/src/worker.rs | 6 +- crates/streams/src/test/events.rs | 10 ++- 19 files changed, 152 insertions(+), 169 deletions(-) delete mode 100644 crates/dbconn/src/query.rs diff --git a/crates/dbconn/src/channelconfig.rs b/crates/dbconn/src/channelconfig.rs index 09c8dc9..f2ff1d2 100644 --- a/crates/dbconn/src/channelconfig.rs +++ b/crates/dbconn/src/channelconfig.rs @@ -6,6 +6,8 @@ use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::ChConf; use netpod::ScalarType; +use netpod::SeriesKind; +use netpod::SfDbChannel; use netpod::Shape; use netpod::TsMs; use std::time::Duration; @@ -20,12 +22,11 @@ use tokio_postgres::Client; /// In the future, we can even try to involve time range information for that, but backends like /// old archivers and sf databuffer do not support such lookup. pub(super) async fn chconf_best_matching_for_name_and_range( - backend: &str, - name: &str, + channel: SfDbChannel, range: NanoRange, pg: &Client, ) -> Result { - debug!("chconf_best_matching_for_name_and_range {backend} {name} {range:?}"); + debug!("chconf_best_matching_for_name_and_range {channel:?} {range:?}"); #[cfg(DISABLED)] if ncc.node_config.cluster.scylla.is_none() { let e = Error::with_msg_no_trace(format!( @@ -44,12 +45,17 @@ pub(super) async fn chconf_best_matching_for_name_and_range( let sql = concat!( "select unnest(tscs) as tsc, series, scalar_type, shape_dims", " from series_by_channel", - " where kind = 2 and facility = $1 and channel = $2", + " where facility = $1", + " and channel = $2", + " and kind = $3", " order by tsc", ); - let res = pg.query(sql, &[&backend, &name]).await.err_conv()?; + let res = pg + .query(sql, &[&channel.backend(), &channel.name(), &channel.kind().to_db_i16()]) + .await + .err_conv()?; if res.len() == 0 { - let e = Error::with_public_msg_no_trace(format!("can not find channel information for {name}")); + let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?} {range:?}")); warn!("{e}"); Err(e) } else if res.len() > 1 { @@ -64,12 +70,13 @@ pub(super) async fn chconf_best_matching_for_name_and_range( let _scalar_type = ScalarType::from_scylla_i32(scalar_type)?; let _shape = Shape::from_scylla_shape_dims(&shape_dims)?; let tsms = tsc.signed_duration_since(DateTime::UNIX_EPOCH).num_milliseconds() as u64; - let ts = TsMs(tsms); + let ts = TsMs::from_ms_u64(tsms); rows.push((ts, series)); } let tsmss: Vec<_> = rows.iter().map(|x| x.0.clone()).collect(); let range = (TsMs(range.beg / 1000), TsMs(range.end / 1000)); let res = decide_best_matching_index(range, &tsmss)?; + let backend = channel.backend().into(); let ch_conf = chconf_for_series(backend, rows[res].1, pg).await?; Ok(ch_conf) } else { @@ -80,9 +87,10 @@ pub(super) async fn chconf_best_matching_for_name_and_range( // TODO can I get a slice from psql driver? let shape_dims: Vec = r.get(3); let series = series as u64; + let kind = channel.kind(); let scalar_type = ScalarType::from_scylla_i32(scalar_type)?; let shape = Shape::from_scylla_shape_dims(&shape_dims)?; - let ret = ChConf::new(backend, series, scalar_type, shape, name); + let ret = ChConf::new(channel.backend(), series, kind, scalar_type, shape, channel.name()); Ok(ret) } } @@ -194,7 +202,7 @@ fn test_decide_best_matching_index_after_01() { pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) -> Result { let res = pg .query( - "select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2", + "select channel, scalar_type, shape_dims, kind from series_by_channel where facility = $1 and series = $2", &[&backend, &(series as i64)], ) .await @@ -211,7 +219,9 @@ pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; // TODO can I get a slice from psql driver? let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; - let ret = ChConf::new(backend, series, scalar_type, shape, name); + let kind: i16 = row.get(3); + let kind = SeriesKind::from_db_i16(kind)?; + let ret = ChConf::new(backend, series, kind, scalar_type, shape, name); Ok(ret) } } diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 2d5a2ba..3b9e41f 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -1,6 +1,5 @@ pub mod channelconfig; pub mod channelinfo; -pub mod query; pub mod scan; pub mod search; pub mod worker; diff --git a/crates/dbconn/src/query.rs b/crates/dbconn/src/query.rs deleted file mode 100644 index 3c85a85..0000000 --- a/crates/dbconn/src/query.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::create_connection; -use crate::ErrConv; -use err::Error; -use netpod::log::*; -use netpod::NodeConfigCached; -use netpod::SfDbChannel; - -// For sf-databuffer backend, given a Channel, try to complete the information if only id is given. -async fn sf_databuffer_fetch_channel_by_series( - channel: SfDbChannel, - ncc: &NodeConfigCached, -) -> Result { - let me = "sf_databuffer_fetch_channel_by_series"; - info!("{me}"); - // TODO should not be needed at some point. - if channel.backend().is_empty() || channel.name().is_empty() { - if let Some(series) = channel.series() { - if series < 1 { - error!("{me} bad input: {channel:?}"); - Err(Error::with_msg_no_trace(format!("{me} bad input: {channel:?}"))) - } else { - info!("{me} do the lookup"); - let series = channel - .series() - .ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64; - let (pgcon, _pgjh) = create_connection(&ncc.node_config.cluster.database).await?; - let mut rows = pgcon - .query("select name from channels where rowid = $1", &[&series]) - .await - .err_conv()?; - if let Some(row) = rows.pop() { - info!("{me} got a row {row:?}"); - let name: String = row.get(0); - let channel = SfDbChannel::from_full(&ncc.node_config.cluster.backend, channel.series(), name); - info!("{me} return {channel:?}"); - Ok(channel) - } else { - info!("{me} nothing found"); - Err(Error::with_msg_no_trace("can not find series")) - } - } - } else { - Err(Error::with_msg_no_trace(format!("{me} bad input: {channel:?}"))) - } - } else { - Ok(channel) - } -} diff --git a/crates/dbconn/src/worker.rs b/crates/dbconn/src/worker.rs index de37d51..664718f 100644 --- a/crates/dbconn/src/worker.rs +++ b/crates/dbconn/src/worker.rs @@ -10,6 +10,7 @@ use netpod::ChConf; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; use netpod::Database; +use netpod::SeriesKind; use netpod::SfDbChannel; use taskrun::tokio; use tokio::task::JoinHandle; @@ -38,7 +39,7 @@ impl err::ToErr for Error { #[derive(Debug)] enum Job { - ChConfBestMatchingNameRange(String, String, NanoRange, Sender>), + ChConfBestMatchingNameRange(SfDbChannel, NanoRange, Sender>), ChConfForSeries(String, u64, Sender>), InfoForSeriesIds( Vec, @@ -70,12 +71,11 @@ impl PgQueue { pub async fn chconf_best_matching_name_range( &self, - backend: &str, - name: &str, + channel: SfDbChannel, range: NanoRange, ) -> Result>, Error> { let (tx, rx) = async_channel::bounded(1); - let job = Job::ChConfBestMatchingNameRange(backend.into(), name.into(), range, tx); + let job = Job::ChConfBestMatchingNameRange(channel, range, tx); self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; Ok(rx) } @@ -144,10 +144,9 @@ impl PgWorker { } }; match job { - Job::ChConfBestMatchingNameRange(backend, name, range, tx) => { + Job::ChConfBestMatchingNameRange(channel, range, tx) => { let res = - crate::channelconfig::chconf_best_matching_for_name_and_range(&backend, &name, range, &self.pg) - .await; + crate::channelconfig::chconf_best_matching_for_name_and_range(channel, range, &self.pg).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } @@ -223,7 +222,7 @@ async fn find_sf_channel_by_series( } if let Some(row) = rows.into_iter().next() { let name = row.get::<_, String>(0); - let channel = SfDbChannel::from_full(channel.backend(), channel.series(), name); + let channel = SfDbChannel::from_full(channel.backend(), channel.series(), name, SeriesKind::default()); Ok(channel) } else { return Err(FindChannelError::NoFound); diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index f1901ca..d01569a 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -77,9 +77,7 @@ impl EventDataHandler { shared_res: Arc, ) -> Result { let (_head, body) = req.into_parts(); - let body = read_body_bytes(body) - .await - .map_err(|_e| EventDataError::InternalError)?; + let body = read_body_bytes(body).await.map_err(|_| EventDataError::InternalError)?; let inp = futures_util::stream::iter([Ok(body)]); let frames = nodenet::conn::events_get_input_frames(inp) .await diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index edde9dc..14c903b 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -110,7 +110,7 @@ async fn plain_events_cbor_framed( ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result { - debug!("plain_events_cbor_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); + debug!("plain_events_cbor_framed {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; let stream = bytes_chunks_to_framed(stream); @@ -135,7 +135,7 @@ async fn plain_events_json_framed( ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result { - debug!("plain_events_json_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); + debug!("plain_events_json_framed {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; let stream = bytes_chunks_to_len_framed_str(stream); @@ -151,7 +151,7 @@ async fn plain_events_json( ncc: &NodeConfigCached, ) -> Result { let self_name = "plain_events_json"; - debug!("{self_name} req: {:?}", req); + debug!("{self_name} {ch_conf:?} {req:?}"); let (_head, _body) = req.into_parts(); // TODO handle None case better and return 404 debug!("{self_name} chconf_from_events_quorum: {ch_conf:?}"); diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 307e6ed..e2803e0 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -29,6 +29,7 @@ use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ScalarType; +use netpod::SfDbChannel; use netpod::Shape; use netpod::ACCEPT_ALL; use netpod::APP_JSON; @@ -537,7 +538,7 @@ impl IocForChannel { #[derive(Clone, Debug, Deserialize)] pub struct ScyllaSeriesTsMspQuery { - name: String, + channel: SfDbChannel, range: SeriesRange, } @@ -548,10 +549,7 @@ impl FromUrl for ScyllaSeriesTsMspQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { - let name = pairs - .get("channelName") - .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? - .into(); + let channel = SfDbChannel::from_pairs(pairs)?; let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { SeriesRange::TimeRange(x.into()) } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { @@ -559,7 +557,7 @@ impl FromUrl for ScyllaSeriesTsMspQuery { } else { return Err(err::Error::with_public_msg_no_trace("no time range in url")); }; - Ok(Self { name, range }) + Ok(Self { channel, range }) } } @@ -585,7 +583,7 @@ impl ScyllaSeriesTsMsp { &self, req: Requ, shared_res: &ServiceSharedResources, - ncc: &NodeConfigCached, + _ncc: &NodeConfigCached, ) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; @@ -596,7 +594,7 @@ impl ScyllaSeriesTsMsp { if accept == APP_JSON || accept == ACCEPT_ALL { let url = req_uri_to_url(req.uri())?; let q = ScyllaSeriesTsMspQuery::from_url(&url)?; - match self.get_ts_msps(&q, shared_res, ncc).await { + match self.get_ts_msps(&q, shared_res).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) @@ -616,10 +614,7 @@ impl ScyllaSeriesTsMsp { &self, q: &ScyllaSeriesTsMspQuery, shared_res: &ServiceSharedResources, - ncc: &NodeConfigCached, ) -> Result { - let backend = &ncc.node_config.cluster.backend; - let name = &q.name; let nano_range = if let SeriesRange::TimeRange(x) = q.range.clone() { x } else { @@ -627,7 +622,7 @@ impl ScyllaSeriesTsMsp { }; let chconf = shared_res .pgqueue - .chconf_best_matching_name_range(backend, name, nano_range) + .chconf_best_matching_name_range(q.channel.clone(), nano_range) .await .map_err(|e| Error::with_msg_no_trace(format!("error from pg worker: {e}")))? .recv() diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index be5dd8c..2c1f28b 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -58,7 +58,6 @@ pub mod log2 { pub use tracing::{self, event, span, Level}; } -use crate::log::*; use bytes::Bytes; use chrono::DateTime; use chrono::TimeZone; @@ -169,7 +168,7 @@ pub struct BodyStream { pub inner: Box> + Send + Unpin>, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] pub enum SeriesKind { ChannelStatus, ChannelData, @@ -197,6 +196,44 @@ impl SeriesKind { } } +impl Default for SeriesKind { + fn default() -> Self { + SeriesKind::ChannelData + } +} + +impl FromUrl for SeriesKind { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let ret = pairs + .get("seriesKind") + .and_then(|x| match x.as_str() { + "channelStatus" => Some(Self::ChannelStatus), + "channelData" => Some(Self::ChannelData), + "caStatus" => Some(Self::CaStatus), + _ => None, + }) + .unwrap_or(Self::default()); + Ok(ret) + } +} + +impl AppendToUrl for SeriesKind { + fn append_to_url(&self, url: &mut Url) { + let s = match self { + SeriesKind::ChannelStatus => "channelStatus", + SeriesKind::ChannelData => "channelData", + SeriesKind::CaStatus => "caStatus", + }; + let mut g = url.query_pairs_mut(); + g.append_pair("seriesKind", &s); + } +} + #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum ScalarType { U8, @@ -975,7 +1012,7 @@ pub struct NodeStatus { // Also the concept of "backend" could be split into "facility" and some optional other identifier // for cases like e.g. post-mortem, or to differentiate between channel-access and bsread for cases where // the same channel-name is delivered via different methods. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct SfDbChannel { series: Option, // "backend" is currently used in the existing systems for multiple purposes: @@ -983,14 +1020,21 @@ pub struct SfDbChannel { // some special subsystem (eg. sf-rf-databuffer). backend: String, name: String, + kind: SeriesKind, } impl SfDbChannel { - pub fn from_full, U: Into>(backend: T, series: Option, name: U) -> Self { + pub fn from_full, U: Into>( + backend: T, + series: Option, + name: U, + kind: SeriesKind, + ) -> Self { Self { backend: backend.into(), series, name: name.into(), + kind, } } @@ -999,6 +1043,7 @@ impl SfDbChannel { backend: backend.into(), series: None, name: name.into(), + kind: SeriesKind::default(), } } @@ -1014,6 +1059,10 @@ impl SfDbChannel { &self.name } + pub fn kind(&self) -> SeriesKind { + self.kind.clone() + } + pub fn set_series(&mut self, series: u64) { self.series = Some(series); } @@ -1039,6 +1088,7 @@ impl FromUrl for SfDbChannel { series: pairs .get("seriesId") .and_then(|x| x.parse::().map_or(None, |x| Some(x))), + kind: SeriesKind::from_pairs(pairs)?, }; if ret.name.is_empty() && ret.series.is_none() { return Err(Error::with_public_msg_no_trace(format!( @@ -1059,6 +1109,8 @@ impl AppendToUrl for SfDbChannel { if let Some(series) = self.series { g.append_pair("seriesId", &series.to_string()); } + drop(g); + self.kind.append_to_url(url); } } @@ -3130,7 +3182,7 @@ impl HasTimeout for ChannelConfigQuery { Duration::from_millis(10000) } - fn set_timeout(&mut self, timeout: Duration) { + fn set_timeout(&mut self, _timeout: Duration) { // TODO // self.timeout = Some(timeout); } @@ -3212,6 +3264,8 @@ pub struct DaqbufChannelConfig { pub backend: String, #[serde(rename = "seriesId")] pub series: u64, + #[serde(rename = "seriesKind")] + pub kind: SeriesKind, #[serde(rename = "scalarType")] pub scalar_type: ScalarType, #[serde(rename = "shape")] @@ -3246,6 +3300,7 @@ impl From for ChannelConfigResponse { Self::Daqbuf(DaqbufChannelConfig { backend: value.backend().into(), series: value.series(), + kind: value.kind(), scalar_type: value.scalar_type().clone(), shape: value.shape().clone(), name: value.name().into(), @@ -3278,13 +3333,21 @@ pub struct ChannelInfo { pub struct ChConf { backend: String, series: u64, + kind: SeriesKind, scalar_type: ScalarType, shape: Shape, name: String, } impl ChConf { - pub fn new(backend: S1, series: u64, scalar_type: ScalarType, shape: Shape, name: S2) -> Self + pub fn new( + backend: S1, + series: u64, + kind: SeriesKind, + scalar_type: ScalarType, + shape: Shape, + name: S2, + ) -> Self where S1: Into, S2: Into, @@ -3292,6 +3355,7 @@ impl ChConf { Self { backend: backend.into(), series, + kind, scalar_type, shape, name: name.into(), @@ -3306,6 +3370,10 @@ impl ChConf { self.series } + pub fn kind(&self) -> SeriesKind { + self.kind.clone() + } + pub fn scalar_type(&self) -> &ScalarType { &self.scalar_type } diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index 8a21e00..8487f71 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -108,7 +108,7 @@ pub async fn channel_config( Ok(Some(channel_config_test_backend(channel)?)) } else if ncc.node_config.cluster.scylla_st().is_some() { debug!("try to get ChConf for scylla type backend"); - let ret = scylla_chconf_from_sf_db_channel(range, &channel, pgqueue) + let ret = scylla_chconf_from_sf_db_channel(range, channel, pgqueue) .await .map_err(Error::from)?; Ok(Some(ChannelTypeConfigGen::Scylla(ret))) @@ -207,7 +207,7 @@ pub async fn http_get_channel_config( async fn scylla_chconf_from_sf_db_channel( range: NanoRange, - channel: &SfDbChannel, + channel: SfDbChannel, pgqueue: &PgQueue, ) -> Result { if let Some(series) = channel.series() { @@ -220,7 +220,7 @@ async fn scylla_chconf_from_sf_db_channel( } else { // TODO let called function allow to return None instead of error-not-found let ret = pgqueue - .chconf_best_matching_name_range(channel.backend(), channel.name(), range) + .chconf_best_matching_name_range(channel, range) .await? .recv() .await??; diff --git a/crates/nodenet/src/configquorum.rs b/crates/nodenet/src/configquorum.rs index 64aaf1b..b5618d0 100644 --- a/crates/nodenet/src/configquorum.rs +++ b/crates/nodenet/src/configquorum.rs @@ -30,7 +30,7 @@ fn decide_sf_ch_config_quorum(inp: Vec) -> Result { - ChannelTypeConfigGen::Scylla(ChConf::new(k.backend, k.series, k.scalar_type, k.shape, k.name)) + ChannelTypeConfigGen::Scylla(ChConf::new(k.backend, k.series, k.kind, k.scalar_type, k.shape, k.name)) } }; if histo.contains_key(&item) { diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 31e7737..40b7b52 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -129,9 +129,8 @@ pub async fn create_response_bytes_stream( ncc: &NodeConfigCached, ) -> Result { debug!( - "create_response_bytes_stream {:?} {:?} wasm1 {:?}", - evq.ch_conf().scalar_type(), - evq.ch_conf().shape(), + "create_response_bytes_stream {:?} wasm1 {:?}", + evq.ch_conf(), evq.wasm1() ); let reqctx = netpod::ReqCtx::new_from_single_reqid(evq.reqid().into()).into(); diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index cafd95a..9afe9f9 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -41,9 +41,7 @@ pub async fn scylla_channel_event_stream( let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { let x = scyllaconn::events2::events::EventsStreamRt::new( rt, - series, - scalar_type.clone(), - shape.clone(), + chconf, evq.range().into(), readopts, scyqueue.clone(), @@ -51,14 +49,7 @@ pub async fn scylla_channel_event_stream( .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); Box::pin(x) } else { - let x = scyllaconn::events2::mergert::MergeRts::new( - series, - scalar_type.clone(), - shape.clone(), - evq.range().into(), - readopts, - scyqueue.clone(), - ); + let x = scyllaconn::events2::mergert::MergeRts::new(chconf, evq.range().into(), readopts, scyqueue.clone()); Box::pin(x) }; let stream = stream diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 3da6465..af442ec 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -596,5 +596,6 @@ impl Frame1Parts { #[test] fn parse_frame1() { let inp = r##"{"query":{"select":{"ch_conf":{"Scylla":{"backend":"swissfel-daqbuf-ca","series":2367705320261409690,"scalar_type":"ChannelStatus","shape":[],"name":"SLGRE-LI2C03_CH6:TEMP"}},"range":{"TimeRange":{"beg":1695736001000000000,"end":1695736301000000000}},"transform":{"event":"ValueFull","time_binning":"None"},"wasm1":null},"settings":{"timeout":null,"events_max":200000,"event_delay":null,"stream_batch_len":null,"buf_len_disk_io":null,"queue_len_disk_io":null,"create_errors":[]},"ty":"EventsSubQuery","reqid":"3ea23209"}}"##; - let v: Frame1Parts = serde_json::from_str(inp).unwrap(); + // TODO assert + let _v: Frame1Parts = serde_json::from_str(inp).unwrap(); } diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index f2e42f3..e0f89bb 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -5,34 +5,25 @@ use crate::worker::ScyllaQueue; use err::thiserror; use err::ThisError; use futures_util::Future; -use futures_util::FutureExt; -use futures_util::Stream; use futures_util::StreamExt; use items_0::scalar_ops::ScalarOps; use items_0::Appendable; use items_0::Empty; use items_0::Events; use items_0::WithLen; -use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::DtNano; use netpod::EnumVariant; -use netpod::ScalarType; -use netpod::Shape; use netpod::TsMs; use netpod::TsNano; use scylla::frame::response::result::Row; use scylla::Session; use series::SeriesId; -use std::collections::VecDeque; -use std::mem; use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use tracing::Instrument; #[derive(Debug, ThisError)] diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index da347a6..979295e 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -13,6 +13,7 @@ use items_0::Events; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ttl::RetentionTime; +use netpod::ChConf; use netpod::EnumVariant; use netpod::ScalarType; use netpod::Shape; @@ -97,9 +98,8 @@ enum State { pub struct EventsStreamRt { rt: RetentionTime, + ch_conf: ChConf, series: SeriesId, - scalar_type: ScalarType, - shape: Shape, range: ScyllaSeriesRange, readopts: EventReadOpts, state: State, @@ -112,21 +112,18 @@ pub struct EventsStreamRt { impl EventsStreamRt { pub fn new( rt: RetentionTime, - series: SeriesId, - scalar_type: ScalarType, - shape: Shape, + ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue, ) -> Self { - debug!("EventsStreamRt::new {series:?} {range:?} {rt:?} {readopts:?}"); - let msp_inp = - crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone()); + debug!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}"); + let series = SeriesId::new(ch_conf.series()); + let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone()); Self { rt, + ch_conf, series, - scalar_type, - shape, range, readopts, state: State::Begin, @@ -137,12 +134,6 @@ impl EventsStreamRt { } } - fn __handle_reading(self: Pin<&mut Self>, st: &mut Reading, cx: &mut Context) -> Result<(), Error> { - let _ = st; - let _ = cx; - todo!() - } - fn make_read_events_fut( &mut self, ts_msp: TsMs, @@ -158,8 +149,8 @@ impl EventsStreamRt { self.readopts.clone(), scyqueue, ); - let scalar_type = self.scalar_type.clone(); - let shape = self.shape.clone(); + let scalar_type = self.ch_conf.scalar_type().clone(); + let shape = self.ch_conf.shape().clone(); debug!("make_read_events_fut {:?} {:?}", shape, scalar_type); let fut = async move { let ret = match &shape { diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs index 6eff0d2..8244957 100644 --- a/crates/scyllaconn/src/events2/mergert.rs +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -17,9 +17,7 @@ use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::ttl::RetentionTime; -use netpod::ScalarType; -use netpod::Shape; -use series::SeriesId; +use netpod::ChConf; use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; @@ -35,6 +33,7 @@ pub enum Error { OrderMax, } +#[allow(unused)] enum Resolvable where F: Future, @@ -44,6 +43,7 @@ where Taken, } +#[allow(unused)] impl Resolvable where F: Future, @@ -100,9 +100,7 @@ enum State { } pub struct MergeRts { - series: SeriesId, - scalar_type: ScalarType, - shape: Shape, + ch_conf: ChConf, range: ScyllaSeriesRange, range_mt: ScyllaSeriesRange, range_lt: ScyllaSeriesRange, @@ -121,18 +119,9 @@ pub struct MergeRts { } impl MergeRts { - pub fn new( - series: SeriesId, - scalar_type: ScalarType, - shape: Shape, - range: ScyllaSeriesRange, - readopts: EventReadOpts, - scyqueue: ScyllaQueue, - ) -> Self { + pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self { Self { - series, - scalar_type, - shape, + ch_conf, range_mt: range.clone(), range_lt: range.clone(), range, @@ -160,9 +149,7 @@ impl MergeRts { let tsbeg = range.beg(); let inp = EventsStreamRt::new( rt, - self.series.clone(), - self.scalar_type.clone(), - self.shape.clone(), + self.ch_conf.clone(), range, self.readopts.clone(), self.scyqueue.clone(), @@ -181,9 +168,7 @@ impl MergeRts { let tsbeg = range.beg(); let inp = EventsStreamRt::new( rt, - self.series.clone(), - self.scalar_type.clone(), - self.shape.clone(), + self.ch_conf.clone(), range, self.readopts.clone(), self.scyqueue.clone(), @@ -201,9 +186,7 @@ impl MergeRts { let tsbeg = range.beg(); let inp = EventsStreamRt::new( rt, - self.series.clone(), - self.scalar_type.clone(), - self.shape.clone(), + self.ch_conf.clone(), range, self.readopts.clone(), self.scyqueue.clone(), diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index 8a4aa8a..df913c0 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -56,7 +56,6 @@ where } struct BckAndFirstFwd { - scyqueue: ScyllaQueue, fut_bck: Resolvable, crate::worker::Error>> + Send>>>, fut_fwd: Resolvable, crate::worker::Error>> + Send>>>, } @@ -97,7 +96,6 @@ impl MspStreamRt { series, range, state: State::BckAndFirstFwd(BckAndFirstFwd { - scyqueue, fut_bck: Resolvable::Future(Box::pin(fut_bck)), fut_fwd: Resolvable::Future(Box::pin(fut_fwd)), }), diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 4196efb..470a709 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -164,8 +164,7 @@ impl ScyllaWorker { let job = match x { Ok(x) => x, Err(_) => { - error!("ScyllaWorker can not receive from channel"); - return Err(Error::ChannelRecv); + break; } }; match job { @@ -198,6 +197,7 @@ impl ScyllaWorker { } } } - info!("scylla worker ended"); + info!("scylla worker finished"); + Ok(()) } } diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs index 5ee66a2..ba18ae2 100644 --- a/crates/streams/src/test/events.rs +++ b/crates/streams/src/test/events.rs @@ -15,6 +15,7 @@ use netpod::range::evrange::SeriesRange; use netpod::ChConf; use netpod::ReqCtx; use netpod::ScalarType; +use netpod::SeriesKind; use netpod::SfDbChannel; use netpod::Shape; use query::api4::events::EventsSubQuery; @@ -30,7 +31,14 @@ async fn merged_events_inner() -> Result<(), Error> { let ctx = ReqCtx::for_test(); // TODO factor out the channel config lookup such that the test code can use a similar code path, // except that we don't want to go over the network here. - let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::I32, Shape::Scalar, "test-gen-i32-dim0-v00"); + let ch_conf = ChConf::new( + TEST_BACKEND, + 1, + SeriesKind::ChannelData, + ScalarType::I32, + Shape::Scalar, + "test-gen-i32-dim0-v00", + ); let channel = SfDbChannel::from_name(ch_conf.backend(), ch_conf.name()); let range = SeriesRange::TimeRange(NanoRange::from_date_time( "2023-12-18T05:10:00Z".parse().unwrap(),