Simplify channel config lookup

Dominik Werder
2024-07-24 19:45:30 +02:00
parent 3889d8bf37
commit 8f383050f5
19 changed files with 152 additions and 169 deletions

View File

@@ -6,6 +6,8 @@ use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ChConf;
use netpod::ScalarType;
+use netpod::SeriesKind;
+use netpod::SfDbChannel;
use netpod::Shape;
use netpod::TsMs;
use std::time::Duration;
@@ -20,12 +22,11 @@ use tokio_postgres::Client;
/// In the future, we can even try to involve time range information for that, but backends like
/// old archivers and sf databuffer do not support such lookup.
pub(super) async fn chconf_best_matching_for_name_and_range(
-backend: &str,
-name: &str,
+channel: SfDbChannel,
range: NanoRange,
pg: &Client,
) -> Result<ChConf, Error> {
debug!("chconf_best_matching_for_name_and_range {backend} {name} {range:?}");
debug!("chconf_best_matching_for_name_and_range {channel:?} {range:?}");
#[cfg(DISABLED)]
if ncc.node_config.cluster.scylla.is_none() {
let e = Error::with_msg_no_trace(format!(
@@ -44,12 +45,17 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
let sql = concat!(
"select unnest(tscs) as tsc, series, scalar_type, shape_dims",
" from series_by_channel",
" where kind = 2 and facility = $1 and channel = $2",
" where facility = $1",
" and channel = $2",
" and kind = $3",
" order by tsc",
);
-let res = pg.query(sql, &[&backend, &name]).await.err_conv()?;
+let res = pg
+.query(sql, &[&channel.backend(), &channel.name(), &channel.kind().to_db_i16()])
+.await
+.err_conv()?;
if res.len() == 0 {
-let e = Error::with_public_msg_no_trace(format!("can not find channel information for {name}"));
+let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?} {range:?}"));
warn!("{e}");
Err(e)
} else if res.len() > 1 {
@@ -64,12 +70,13 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
let _scalar_type = ScalarType::from_scylla_i32(scalar_type)?;
let _shape = Shape::from_scylla_shape_dims(&shape_dims)?;
let tsms = tsc.signed_duration_since(DateTime::UNIX_EPOCH).num_milliseconds() as u64;
-let ts = TsMs(tsms);
+let ts = TsMs::from_ms_u64(tsms);
rows.push((ts, series));
}
let tsmss: Vec<_> = rows.iter().map(|x| x.0.clone()).collect();
let range = (TsMs(range.beg / 1000), TsMs(range.end / 1000));
let res = decide_best_matching_index(range, &tsmss)?;
+let backend = channel.backend().into();
let ch_conf = chconf_for_series(backend, rows[res].1, pg).await?;
Ok(ch_conf)
} else {
@@ -80,9 +87,10 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
// TODO can I get a slice from psql driver?
let shape_dims: Vec<i32> = r.get(3);
let series = series as u64;
+let kind = channel.kind();
let scalar_type = ScalarType::from_scylla_i32(scalar_type)?;
let shape = Shape::from_scylla_shape_dims(&shape_dims)?;
-let ret = ChConf::new(backend, series, scalar_type, shape, name);
+let ret = ChConf::new(channel.backend(), series, kind, scalar_type, shape, channel.name());
Ok(ret)
}
}
@@ -194,7 +202,7 @@ fn test_decide_best_matching_index_after_01() {
pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) -> Result<ChConf, Error> {
let res = pg
.query(
"select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2",
"select channel, scalar_type, shape_dims, kind from series_by_channel where facility = $1 and series = $2",
&[&backend, &(series as i64)],
)
.await
@@ -211,7 +219,9 @@ pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) -
let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?;
// TODO can I get a slice from psql driver?
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(2))?;
-let ret = ChConf::new(backend, series, scalar_type, shape, name);
+let kind: i16 = row.get(3);
+let kind = SeriesKind::from_db_i16(kind)?;
+let ret = ChConf::new(backend, series, kind, scalar_type, shape, name);
Ok(ret)
}
}
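
The lookup above binds `kind` as a third SQL parameter via `to_db_i16`, and `chconf_for_series` decodes the column back with `from_db_i16`. The numeric encoding itself is not part of this diff; a minimal sketch of what the two helpers might look like, assuming `ChannelData` keeps the value 2 that the old SQL hardcoded:

impl SeriesKind {
    pub fn to_db_i16(&self) -> i16 {
        match self {
            // Discriminants are assumptions; only ChannelData == 2 is hinted
            // at by the old hardcoded `where kind = 2`.
            SeriesKind::ChannelStatus => 1,
            SeriesKind::ChannelData => 2,
            SeriesKind::CaStatus => 3,
        }
    }

    pub fn from_db_i16(x: i16) -> Result<Self, Error> {
        match x {
            1 => Ok(SeriesKind::ChannelStatus),
            2 => Ok(SeriesKind::ChannelData),
            3 => Ok(SeriesKind::CaStatus),
            _ => Err(Error::with_msg_no_trace(format!("unknown SeriesKind {x}"))),
        }
    }
}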

View File

@@ -1,6 +1,5 @@
pub mod channelconfig;
-pub mod channelinfo;
pub mod query;
pub mod scan;
pub mod search;
pub mod worker;

View File

@@ -1,48 +0,0 @@
-use crate::create_connection;
-use crate::ErrConv;
-use err::Error;
-use netpod::log::*;
-use netpod::NodeConfigCached;
-use netpod::SfDbChannel;
-// For sf-databuffer backend, given a Channel, try to complete the information if only id is given.
-async fn sf_databuffer_fetch_channel_by_series(
-channel: SfDbChannel,
-ncc: &NodeConfigCached,
-) -> Result<SfDbChannel, Error> {
-let me = "sf_databuffer_fetch_channel_by_series";
-info!("{me}");
-// TODO should not be needed at some point.
-if channel.backend().is_empty() || channel.name().is_empty() {
-if let Some(series) = channel.series() {
-if series < 1 {
-error!("{me} bad input: {channel:?}");
-Err(Error::with_msg_no_trace(format!("{me} bad input: {channel:?}")))
-} else {
-info!("{me} do the lookup");
-let series = channel
-.series()
-.ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64;
-let (pgcon, _pgjh) = create_connection(&ncc.node_config.cluster.database).await?;
-let mut rows = pgcon
-.query("select name from channels where rowid = $1", &[&series])
-.await
-.err_conv()?;
-if let Some(row) = rows.pop() {
-info!("{me} got a row {row:?}");
-let name: String = row.get(0);
-let channel = SfDbChannel::from_full(&ncc.node_config.cluster.backend, channel.series(), name);
-info!("{me} return {channel:?}");
-Ok(channel)
-} else {
-info!("{me} nothing found");
-Err(Error::with_msg_no_trace("can not find series"))
-}
-}
-} else {
-Err(Error::with_msg_no_trace(format!("{me} bad input: {channel:?}")))
-}
-} else {
-Ok(channel)
-}
-}

View File

@@ -10,6 +10,7 @@ use netpod::ChConf;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
use netpod::Database;
+use netpod::SeriesKind;
use netpod::SfDbChannel;
use taskrun::tokio;
use tokio::task::JoinHandle;
@@ -38,7 +39,7 @@ impl err::ToErr for Error {
#[derive(Debug)]
enum Job {
-ChConfBestMatchingNameRange(String, String, NanoRange, Sender<Result<ChConf, Error>>),
+ChConfBestMatchingNameRange(SfDbChannel, NanoRange, Sender<Result<ChConf, Error>>),
ChConfForSeries(String, u64, Sender<Result<ChConf, Error>>),
InfoForSeriesIds(
Vec<u64>,
@@ -70,12 +71,11 @@ impl PgQueue {
pub async fn chconf_best_matching_name_range(
&self,
-backend: &str,
-name: &str,
+channel: SfDbChannel,
range: NanoRange,
) -> Result<Receiver<Result<ChConf, Error>>, Error> {
let (tx, rx) = async_channel::bounded(1);
-let job = Job::ChConfBestMatchingNameRange(backend.into(), name.into(), range, tx);
+let job = Job::ChConfBestMatchingNameRange(channel, range, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
@@ -144,10 +144,9 @@ impl PgWorker {
}
};
match job {
-Job::ChConfBestMatchingNameRange(backend, name, range, tx) => {
+Job::ChConfBestMatchingNameRange(channel, range, tx) => {
let res =
-crate::channelconfig::chconf_best_matching_for_name_and_range(&backend, &name, range, &self.pg)
-.await;
+crate::channelconfig::chconf_best_matching_for_name_and_range(channel, range, &self.pg).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
@@ -223,7 +222,7 @@ async fn find_sf_channel_by_series(
}
if let Some(row) = rows.into_iter().next() {
let name = row.get::<_, String>(0);
-let channel = SfDbChannel::from_full(channel.backend(), channel.series(), name);
+let channel = SfDbChannel::from_full(channel.backend(), channel.series(), name, SeriesKind::default());
Ok(channel)
} else {
return Err(FindChannelError::NoFound);
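
For context, the queue follows a request/response pattern: each `Job` variant carries a bounded(1) sender on which the worker returns its result. A hedged caller-side sketch of the new signature; queue construction and the concrete values are assumptions, and `from_name` fills the kind with `SeriesKind::default()`:

// Hypothetical usage; `pgqueue` setup, the channel name and the range
// (nanosecond values) are made up.
async fn lookup_chconf(pgqueue: &PgQueue) -> Result<ChConf, Error> {
    let channel = SfDbChannel::from_name("swissfel-daqbuf", "SOME-CHANNEL:NAME");
    let range = NanoRange {
        beg: 1_700_000_000_000_000_000,
        end: 1_700_000_600_000_000_000,
    };
    // Enqueue the job, then await the worker's answer on the bounded channel.
    let rx = pgqueue.chconf_best_matching_name_range(channel, range).await?;
    // First `?` covers the queue recv, the second the worker's Result.
    let ch_conf = rx.recv().await??;
    Ok(ch_conf)
}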

View File

@@ -77,9 +77,7 @@ impl EventDataHandler {
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, EventDataError> {
let (_head, body) = req.into_parts();
-let body = read_body_bytes(body)
-.await
-.map_err(|_e| EventDataError::InternalError)?;
+let body = read_body_bytes(body).await.map_err(|_| EventDataError::InternalError)?;
let inp = futures_util::stream::iter([Ok(body)]);
let frames = nodenet::conn::events_get_input_frames(inp)
.await

View File

@@ -110,7 +110,7 @@ async fn plain_events_cbor_framed(
ctx: &ReqCtx,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("plain_events_cbor_framed chconf_from_events_quorum: {ch_conf:?} {req:?}");
debug!("plain_events_cbor_framed {ch_conf:?} {req:?}");
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?;
let stream = bytes_chunks_to_framed(stream);
@@ -135,7 +135,7 @@ async fn plain_events_json_framed(
ctx: &ReqCtx,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("plain_events_json_framed chconf_from_events_quorum: {ch_conf:?} {req:?}");
debug!("plain_events_json_framed {ch_conf:?} {req:?}");
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?;
let stream = bytes_chunks_to_len_framed_str(stream);
@@ -151,7 +151,7 @@ async fn plain_events_json(
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let self_name = "plain_events_json";
debug!("{self_name} req: {:?}", req);
debug!("{self_name} {ch_conf:?} {req:?}");
let (_head, _body) = req.into_parts();
// TODO handle None case better and return 404
debug!("{self_name} chconf_from_events_quorum: {ch_conf:?}");

View File

@@ -29,6 +29,7 @@ use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::ScalarType;
+use netpod::SfDbChannel;
use netpod::Shape;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
@@ -537,7 +538,7 @@ impl IocForChannel {
#[derive(Clone, Debug, Deserialize)]
pub struct ScyllaSeriesTsMspQuery {
-name: String,
+channel: SfDbChannel,
range: SeriesRange,
}
@@ -548,10 +549,7 @@ impl FromUrl for ScyllaSeriesTsMspQuery {
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
-let name = pairs
-.get("channelName")
-.ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))?
-.into();
+let channel = SfDbChannel::from_pairs(pairs)?;
let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) {
SeriesRange::TimeRange(x.into())
} else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) {
@@ -559,7 +557,7 @@ impl FromUrl for ScyllaSeriesTsMspQuery {
} else {
return Err(err::Error::with_public_msg_no_trace("no time range in url"));
};
-Ok(Self { name, range })
+Ok(Self { channel, range })
}
}
@@ -585,7 +583,7 @@ impl ScyllaSeriesTsMsp {
&self,
req: Requ,
shared_res: &ServiceSharedResources,
-ncc: &NodeConfigCached,
+_ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
@@ -596,7 +594,7 @@ impl ScyllaSeriesTsMsp {
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = req_uri_to_url(req.uri())?;
let q = ScyllaSeriesTsMspQuery::from_url(&url)?;
-match self.get_ts_msps(&q, shared_res, ncc).await {
+match self.get_ts_msps(&q, shared_res).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
@@ -616,10 +614,7 @@ impl ScyllaSeriesTsMsp {
&self,
q: &ScyllaSeriesTsMspQuery,
shared_res: &ServiceSharedResources,
-ncc: &NodeConfigCached,
) -> Result<ScyllaSeriesTsMspResponse, Error> {
-let backend = &ncc.node_config.cluster.backend;
-let name = &q.name;
let nano_range = if let SeriesRange::TimeRange(x) = q.range.clone() {
x
} else {
@@ -627,7 +622,7 @@ impl ScyllaSeriesTsMsp {
};
let chconf = shared_res
.pgqueue
-.chconf_best_matching_name_range(backend, name, nano_range)
+.chconf_best_matching_name_range(q.channel.clone(), nano_range)
.await
.map_err(|e| Error::with_msg_no_trace(format!("error from pg worker: {e}")))?
.recv()
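
With the query now parsing a full `SfDbChannel`, the URL carries the channel pairs (`channelName`, optional `seriesId`, optional `seriesKind`) instead of a bare `channelName`. A hedged sketch of building such a request URL; the path, the `backend` pair name, and the time-range pair names are assumptions, while `channelName`, `seriesId`, and `seriesKind` appear in this diff:

// Hypothetical URL construction for this endpoint.
let mut url = url::Url::parse("http://localhost:8080/api/4/private/scylla/tsmsps")?;
{
    let mut g = url.query_pairs_mut();
    g.append_pair("backend", "swissfel-daqbuf"); // assumed pair name
    g.append_pair("channelName", "SOME-CHANNEL:NAME");
    g.append_pair("begDate", "2024-01-01T00:00:00Z"); // assumed TimeRangeQuery pairs
    g.append_pair("endDate", "2024-01-02T00:00:00Z");
}
// A missing seriesKind pair falls back to SeriesKind::default() (ChannelData).
let q = ScyllaSeriesTsMspQuery::from_url(&url)?;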

View File

@@ -58,7 +58,6 @@ pub mod log2 {
pub use tracing::{self, event, span, Level};
}
use crate::log::*;
use bytes::Bytes;
use chrono::DateTime;
use chrono::TimeZone;
@@ -169,7 +168,7 @@ pub struct BodyStream {
pub inner: Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>,
}
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
pub enum SeriesKind {
ChannelStatus,
ChannelData,
@@ -197,6 +196,44 @@ impl SeriesKind {
}
}
+impl Default for SeriesKind {
+fn default() -> Self {
+SeriesKind::ChannelData
+}
+}
+impl FromUrl for SeriesKind {
+fn from_url(url: &Url) -> Result<Self, Error> {
+let pairs = get_url_query_pairs(url);
+Self::from_pairs(&pairs)
+}
+fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
+let ret = pairs
+.get("seriesKind")
+.and_then(|x| match x.as_str() {
+"channelStatus" => Some(Self::ChannelStatus),
+"channelData" => Some(Self::ChannelData),
+"caStatus" => Some(Self::CaStatus),
+_ => None,
+})
+.unwrap_or(Self::default());
+Ok(ret)
+}
+}
+impl AppendToUrl for SeriesKind {
+fn append_to_url(&self, url: &mut Url) {
+let s = match self {
+SeriesKind::ChannelStatus => "channelStatus",
+SeriesKind::ChannelData => "channelData",
+SeriesKind::CaStatus => "caStatus",
+};
+let mut g = url.query_pairs_mut();
+g.append_pair("seriesKind", &s);
+}
+}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScalarType {
U8,
@@ -975,7 +1012,7 @@ pub struct NodeStatus {
// Also the concept of "backend" could be split into "facility" and some optional other identifier
// for cases like e.g. post-mortem, or to differentiate between channel-access and bsread for cases where
// the same channel-name is delivered via different methods.
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SfDbChannel {
series: Option<u64>,
// "backend" is currently used in the existing systems for multiple purposes:
@@ -983,14 +1020,21 @@ pub struct SfDbChannel {
// some special subsystem (eg. sf-rf-databuffer).
backend: String,
name: String,
+kind: SeriesKind,
}
impl SfDbChannel {
-pub fn from_full<T: Into<String>, U: Into<String>>(backend: T, series: Option<u64>, name: U) -> Self {
+pub fn from_full<T: Into<String>, U: Into<String>>(
+backend: T,
+series: Option<u64>,
+name: U,
+kind: SeriesKind,
+) -> Self {
Self {
backend: backend.into(),
series,
name: name.into(),
+kind,
}
}
@@ -999,6 +1043,7 @@ impl SfDbChannel {
backend: backend.into(),
series: None,
name: name.into(),
+kind: SeriesKind::default(),
}
}
@@ -1014,6 +1059,10 @@ impl SfDbChannel {
&self.name
}
+pub fn kind(&self) -> SeriesKind {
+self.kind.clone()
+}
pub fn set_series(&mut self, series: u64) {
self.series = Some(series);
}
@@ -1039,6 +1088,7 @@ impl FromUrl for SfDbChannel {
series: pairs
.get("seriesId")
.and_then(|x| x.parse::<u64>().map_or(None, |x| Some(x))),
+kind: SeriesKind::from_pairs(pairs)?,
};
if ret.name.is_empty() && ret.series.is_none() {
return Err(Error::with_public_msg_no_trace(format!(
@@ -1059,6 +1109,8 @@ impl AppendToUrl for SfDbChannel {
if let Some(series) = self.series {
g.append_pair("seriesId", &series.to_string());
}
+drop(g);
+self.kind.append_to_url(url);
}
}
@@ -3130,7 +3182,7 @@ impl HasTimeout for ChannelConfigQuery {
Duration::from_millis(10000)
}
-fn set_timeout(&mut self, timeout: Duration) {
+fn set_timeout(&mut self, _timeout: Duration) {
// TODO
// self.timeout = Some(timeout);
}
@@ -3212,6 +3264,8 @@ pub struct DaqbufChannelConfig {
pub backend: String,
#[serde(rename = "seriesId")]
pub series: u64,
#[serde(rename = "seriesKind")]
pub kind: SeriesKind,
#[serde(rename = "scalarType")]
pub scalar_type: ScalarType,
#[serde(rename = "shape")]
@@ -3246,6 +3300,7 @@ impl From<ChConf> for ChannelConfigResponse {
Self::Daqbuf(DaqbufChannelConfig {
backend: value.backend().into(),
series: value.series(),
+kind: value.kind(),
scalar_type: value.scalar_type().clone(),
shape: value.shape().clone(),
name: value.name().into(),
@@ -3278,13 +3333,21 @@ pub struct ChannelInfo {
pub struct ChConf {
backend: String,
series: u64,
+kind: SeriesKind,
scalar_type: ScalarType,
shape: Shape,
name: String,
}
impl ChConf {
-pub fn new<S1, S2>(backend: S1, series: u64, scalar_type: ScalarType, shape: Shape, name: S2) -> Self
+pub fn new<S1, S2>(
+backend: S1,
+series: u64,
+kind: SeriesKind,
+scalar_type: ScalarType,
+shape: Shape,
+name: S2,
+) -> Self
where
S1: Into<String>,
S2: Into<String>,
@@ -3292,6 +3355,7 @@ impl ChConf {
Self {
backend: backend.into(),
series,
+kind,
scalar_type,
shape,
name: name.into(),
@@ -3306,6 +3370,10 @@ impl ChConf {
self.series
}
+pub fn kind(&self) -> SeriesKind {
+self.kind.clone()
+}
pub fn scalar_type(&self) -> &ScalarType {
&self.scalar_type
}
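
Note the two spellings of the kind: the derived serde impls use the variant name itself (for example in `DaqbufChannelConfig::kind` under the renamed `seriesKind` key), while the URL codec above uses camelCase strings and falls back to the default when the pair is absent. A small sketch under those assumptions, with made-up field values:

// Sketch: derived serde form vs. the URL form defined above.
let kind = SeriesKind::ChannelData;
assert_eq!(serde_json::to_string(&kind).unwrap(), r#""ChannelData""#);

// ChConf now carries the kind through construction and getter.
let ch_conf = ChConf::new(
    "swissfel-daqbuf", // backend (made-up value)
    1,                 // series id (made-up value)
    kind.clone(),
    ScalarType::I32,
    Shape::Scalar,
    "SOME-CHANNEL:NAME",
);
assert_eq!(ch_conf.kind(), SeriesKind::ChannelData);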

View File

@@ -108,7 +108,7 @@ pub async fn channel_config(
Ok(Some(channel_config_test_backend(channel)?))
} else if ncc.node_config.cluster.scylla_st().is_some() {
debug!("try to get ChConf for scylla type backend");
-let ret = scylla_chconf_from_sf_db_channel(range, &channel, pgqueue)
+let ret = scylla_chconf_from_sf_db_channel(range, channel, pgqueue)
.await
.map_err(Error::from)?;
Ok(Some(ChannelTypeConfigGen::Scylla(ret)))
@@ -207,7 +207,7 @@ pub async fn http_get_channel_config(
async fn scylla_chconf_from_sf_db_channel(
range: NanoRange,
-channel: &SfDbChannel,
+channel: SfDbChannel,
pgqueue: &PgQueue,
) -> Result<ChConf, Error> {
if let Some(series) = channel.series() {
@@ -220,7 +220,7 @@ async fn scylla_chconf_from_sf_db_channel(
} else {
// TODO let called function allow to return None instead of error-not-found
let ret = pgqueue
-.chconf_best_matching_name_range(channel.backend(), channel.name(), range)
+.chconf_best_matching_name_range(channel, range)
.await?
.recv()
.await??;

View File

@@ -30,7 +30,7 @@ fn decide_sf_ch_config_quorum(inp: Vec<ChannelConfigResponse>) -> Result<Option<
k.shape,
)),
ChannelConfigResponse::Daqbuf(k) => {
-ChannelTypeConfigGen::Scylla(ChConf::new(k.backend, k.series, k.scalar_type, k.shape, k.name))
+ChannelTypeConfigGen::Scylla(ChConf::new(k.backend, k.series, k.kind, k.scalar_type, k.shape, k.name))
}
};
if histo.contains_key(&item) {
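
For context, `decide_sf_ch_config_quorum` (shown only in part here) appears to tally the per-node `ChannelConfigResponse` items in a histogram and pick the configuration most nodes agree on. A generic sketch of that quorum pattern, with `T` standing in for the real item type and an `Ord` bound assumed for the map key:

use std::collections::BTreeMap;

// Hypothetical quorum helper: count identical responses, return the most
// frequent one (None when the input is empty).
fn quorum<T: Ord>(items: Vec<T>) -> Option<T> {
    let mut histo: BTreeMap<T, usize> = BTreeMap::new();
    for item in items {
        *histo.entry(item).or_insert(0) += 1;
    }
    histo.into_iter().max_by_key(|(_, n)| *n).map(|(item, _)| item)
}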

View File

@@ -129,9 +129,8 @@ pub async fn create_response_bytes_stream(
ncc: &NodeConfigCached,
) -> Result<BoxedBytesStream, Error> {
debug!(
"create_response_bytes_stream {:?} {:?} wasm1 {:?}",
evq.ch_conf().scalar_type(),
evq.ch_conf().shape(),
"create_response_bytes_stream {:?} wasm1 {:?}",
evq.ch_conf(),
evq.wasm1()
);
let reqctx = netpod::ReqCtx::new_from_single_reqid(evq.reqid().into()).into();

View File

@@ -41,9 +41,7 @@ pub async fn scylla_channel_event_stream(
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
let x = scyllaconn::events2::events::EventsStreamRt::new(
rt,
-series,
-scalar_type.clone(),
-shape.clone(),
+chconf,
evq.range().into(),
readopts,
scyqueue.clone(),
@@ -51,14 +49,7 @@ pub async fn scylla_channel_event_stream(
.map_err(|e| scyllaconn::events2::mergert::Error::from(e));
Box::pin(x)
} else {
-let x = scyllaconn::events2::mergert::MergeRts::new(
-series,
-scalar_type.clone(),
-shape.clone(),
-evq.range().into(),
-readopts,
-scyqueue.clone(),
-);
+let x = scyllaconn::events2::mergert::MergeRts::new(chconf, evq.range().into(), readopts, scyqueue.clone());
Box::pin(x)
};
let stream = stream

View File

@@ -596,5 +596,6 @@ impl Frame1Parts {
#[test]
fn parse_frame1() {
let inp = r##"{"query":{"select":{"ch_conf":{"Scylla":{"backend":"swissfel-daqbuf-ca","series":2367705320261409690,"scalar_type":"ChannelStatus","shape":[],"name":"SLGRE-LI2C03_CH6:TEMP"}},"range":{"TimeRange":{"beg":1695736001000000000,"end":1695736301000000000}},"transform":{"event":"ValueFull","time_binning":"None"},"wasm1":null},"settings":{"timeout":null,"events_max":200000,"event_delay":null,"stream_batch_len":null,"buf_len_disk_io":null,"queue_len_disk_io":null,"create_errors":[]},"ty":"EventsSubQuery","reqid":"3ea23209"}}"##;
-let v: Frame1Parts = serde_json::from_str(inp).unwrap();
+// TODO assert
+let _v: Frame1Parts = serde_json::from_str(inp).unwrap();
}

View File

@@ -5,34 +5,25 @@ use crate::worker::ScyllaQueue;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::scalar_ops::ScalarOps;
use items_0::Appendable;
use items_0::Empty;
use items_0::Events;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::DtNano;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scylla::frame::response::result::Row;
use scylla::Session;
use series::SeriesId;
use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tracing::Instrument;
#[derive(Debug, ThisError)]

View File

@@ -13,6 +13,7 @@ use items_0::Events;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ttl::RetentionTime;
+use netpod::ChConf;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::Shape;
@@ -97,9 +98,8 @@ enum State {
pub struct EventsStreamRt {
rt: RetentionTime,
+ch_conf: ChConf,
-series: SeriesId,
-scalar_type: ScalarType,
-shape: Shape,
range: ScyllaSeriesRange,
readopts: EventReadOpts,
state: State,
@@ -112,21 +112,18 @@ pub struct EventsStreamRt {
impl EventsStreamRt {
pub fn new(
rt: RetentionTime,
-series: SeriesId,
-scalar_type: ScalarType,
-shape: Shape,
+ch_conf: ChConf,
range: ScyllaSeriesRange,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
) -> Self {
debug!("EventsStreamRt::new {series:?} {range:?} {rt:?} {readopts:?}");
let msp_inp =
crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone());
debug!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}");
let series = SeriesId::new(ch_conf.series());
let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone());
Self {
rt,
+ch_conf,
-series,
-scalar_type,
-shape,
range,
readopts,
state: State::Begin,
@@ -137,12 +134,6 @@ impl EventsStreamRt {
}
}
-fn __handle_reading(self: Pin<&mut Self>, st: &mut Reading, cx: &mut Context) -> Result<(), Error> {
-let _ = st;
-let _ = cx;
-todo!()
-}
fn make_read_events_fut(
&mut self,
ts_msp: TsMs,
@@ -158,8 +149,8 @@ impl EventsStreamRt {
self.readopts.clone(),
scyqueue,
);
-let scalar_type = self.scalar_type.clone();
-let shape = self.shape.clone();
+let scalar_type = self.ch_conf.scalar_type().clone();
+let shape = self.ch_conf.shape().clone();
debug!("make_read_events_fut {:?} {:?}", shape, scalar_type);
let fut = async move {
let ret = match &shape {

View File

@@ -17,9 +17,7 @@ use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::ttl::RetentionTime;
-use netpod::ScalarType;
-use netpod::Shape;
-use series::SeriesId;
+use netpod::ChConf;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
@@ -35,6 +33,7 @@ pub enum Error {
OrderMax,
}
+#[allow(unused)]
enum Resolvable<F>
where
F: Future,
@@ -44,6 +43,7 @@ where
Taken,
}
+#[allow(unused)]
impl<F> Resolvable<F>
where
F: Future,
@@ -100,9 +100,7 @@ enum State {
}
pub struct MergeRts {
-series: SeriesId,
-scalar_type: ScalarType,
-shape: Shape,
+ch_conf: ChConf,
range: ScyllaSeriesRange,
range_mt: ScyllaSeriesRange,
range_lt: ScyllaSeriesRange,
@@ -121,18 +119,9 @@ pub struct MergeRts {
}
impl MergeRts {
-pub fn new(
-series: SeriesId,
-scalar_type: ScalarType,
-shape: Shape,
-range: ScyllaSeriesRange,
-readopts: EventReadOpts,
-scyqueue: ScyllaQueue,
-) -> Self {
+pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self {
Self {
-series,
-scalar_type,
-shape,
+ch_conf,
range_mt: range.clone(),
range_lt: range.clone(),
range,
@@ -160,9 +149,7 @@ impl MergeRts {
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
-self.series.clone(),
-self.scalar_type.clone(),
-self.shape.clone(),
+self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
@@ -181,9 +168,7 @@ impl MergeRts {
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
-self.series.clone(),
-self.scalar_type.clone(),
-self.shape.clone(),
+self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
@@ -201,9 +186,7 @@ impl MergeRts {
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
-self.series.clone(),
-self.scalar_type.clone(),
-self.shape.clone(),
+self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),

View File

@@ -56,7 +56,6 @@ where
}
struct BckAndFirstFwd {
-scyqueue: ScyllaQueue,
fut_bck: Resolvable<Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>>,
fut_fwd: Resolvable<Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>>,
}
@@ -97,7 +96,6 @@ impl MspStreamRt {
series,
range,
state: State::BckAndFirstFwd(BckAndFirstFwd {
-scyqueue,
fut_bck: Resolvable::Future(Box::pin(fut_bck)),
fut_fwd: Resolvable::Future(Box::pin(fut_fwd)),
}),

View File

@@ -164,8 +164,7 @@ impl ScyllaWorker {
let job = match x {
Ok(x) => x,
Err(_) => {
error!("ScyllaWorker can not receive from channel");
return Err(Error::ChannelRecv);
break;
}
};
match job {
@@ -198,6 +197,7 @@ impl ScyllaWorker {
}
}
}
info!("scylla worker ended");
info!("scylla worker finished");
Ok(())
}
}
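
The worker loop now treats a closed job channel as a normal drain-and-shutdown instead of surfacing `Error::ChannelRecv`. The resulting shape, sketched with assumed names for the receiver and the dispatch:

// Sketch of the shutdown behavior after this change; `rx` and `handle`
// stand in for the worker's job receiver and its dispatch logic.
loop {
    let job = match rx.recv().await {
        Ok(job) => job,
        // All senders dropped: no more jobs can arrive, stop cleanly.
        Err(_) => break,
    };
    handle(job).await;
}
info!("scylla worker finished");
Ok(())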

View File

@@ -15,6 +15,7 @@ use netpod::range::evrange::SeriesRange;
use netpod::ChConf;
use netpod::ReqCtx;
use netpod::ScalarType;
+use netpod::SeriesKind;
use netpod::SfDbChannel;
use netpod::Shape;
use query::api4::events::EventsSubQuery;
@@ -30,7 +31,14 @@ async fn merged_events_inner() -> Result<(), Error> {
let ctx = ReqCtx::for_test();
// TODO factor out the channel config lookup such that the test code can use a similar code path,
// except that we don't want to go over the network here.
-let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::I32, Shape::Scalar, "test-gen-i32-dim0-v00");
+let ch_conf = ChConf::new(
+TEST_BACKEND,
+1,
+SeriesKind::ChannelData,
+ScalarType::I32,
+Shape::Scalar,
+"test-gen-i32-dim0-v00",
+);
let channel = SfDbChannel::from_name(ch_conf.backend(), ch_conf.name());
let range = SeriesRange::TimeRange(NanoRange::from_date_time(
"2023-12-18T05:10:00Z".parse().unwrap(),