Deliver channel status events

This commit is contained in:
Dominik Werder
2023-09-27 14:07:48 +02:00
parent 76c61f564c
commit 921c3c1498
22 changed files with 431 additions and 307 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
[workspace]
members = ["crates/*"]
resolver = "2"
[profile.release]
opt-level = 2
@@ -22,3 +23,4 @@ incremental = true
[patch.crates-io]
#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }
thiserror = { git = "https://github.com/dominikwerder/thiserror.git" }

View File

@@ -16,6 +16,7 @@ use netpod::Shape;
/// In the future, we can even try to involve time range information for that, but backends like
/// old archivers and sf databuffer do not support such lookup.
pub async fn chconf_from_scylla_type_backend(channel: &SfDbChannel, ncc: &NodeConfigCached) -> Result<ChConf, Error> {
debug!("chconf_from_scylla_type_backend {channel:?}");
if channel.backend() != ncc.node_config.cluster.backend {
warn!(
"mismatched backend {} vs {}",

View File

@@ -8,7 +8,6 @@ use netpod::ChannelSearchSingleResult;
use netpod::Database;
use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::ScyllaConfig;
use netpod::Shape;
use serde_json::Value as JsVal;
@@ -85,24 +84,28 @@ pub async fn search_channel_databuffer(
Ok(ret)
}
pub async fn search_channel_scylla(
query: ChannelSearchQuery,
_scyconf: &ScyllaConfig,
pgconf: &Database,
) -> Result<ChannelSearchResult, Error> {
pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) -> Result<ChannelSearchResult, Error> {
let empty = if !query.name_regex.is_empty() { false } else { true };
if empty {
let ret = ChannelSearchResult { channels: Vec::new() };
return Ok(ret);
}
let sql = format!(concat!(
"select",
" series, facility, channel, scalar_type, shape_dims",
" from series_by_channel",
" where channel ~* $1",
" and scalar_type != -2147483647",
" limit 400000",
));
let cond_status = if query.channel_status {
"scalar_type = 14"
} else {
"scalar_type != 14"
};
let sql = format!(
concat!(
"select",
" series, facility, channel, scalar_type, shape_dims",
" from series_by_channel",
" where channel ~* $1",
" and {}",
" limit 400000",
),
cond_status
);
let pgclient = crate::create_connection(pgconf).await?;
let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;
let mut res = Vec::new();
@@ -112,21 +115,28 @@ pub async fn search_channel_scylla(
let backend: String = row.get(1);
let channel: String = row.get(2);
let a: i32 = row.get(3);
let scalar_type = ScalarType::from_scylla_i32(a)?;
let a: Vec<i32> = row.get(4);
let shape = Shape::from_scylla_shape_dims(&a)?;
let k = ChannelSearchSingleResult {
backend,
name: channel,
series,
source: "".into(),
ty: scalar_type.to_variant_str().into(),
shape: shape.to_scylla_vec().into_iter().map(|x| x as u32).collect(),
unit: "".into(),
description: "".into(),
is_api_0: None,
};
res.push(k);
// TODO count the failure cases
if let Ok(scalar_type) = ScalarType::from_scylla_i32(a) {
let a: Vec<i32> = row.get(4);
if let Ok(shape) = Shape::from_scylla_shape_dims(&a) {
let k = ChannelSearchSingleResult {
backend,
name: channel,
series,
source: "".into(),
ty: scalar_type.to_variant_str().into(),
shape: shape.to_scylla_vec().into_iter().map(|x| x as u32).collect(),
unit: "".into(),
description: "".into(),
is_api_0: None,
};
res.push(k);
} else {
netpod::log::warn!("unknown shape {a:?}");
}
} else {
netpod::log::warn!("unknown scalar_type {a:?}");
}
}
let ret = ChannelSearchResult { channels: res };
Ok(ret)
@@ -249,8 +259,8 @@ pub async fn search_channel(
node_config: &NodeConfigCached,
) -> Result<ChannelSearchResult, Error> {
let pgconf = &node_config.node_config.cluster.database;
if let Some(scyconf) = node_config.node_config.cluster.scylla.as_ref() {
search_channel_scylla(query, scyconf, pgconf).await
if let Some(_scyconf) = node_config.node_config.cluster.scylla.as_ref() {
search_channel_scylla(query, pgconf).await
} else if let Some(conf) = node_config.node.channel_archiver.as_ref() {
search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, pgconf).await
} else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() {

View File

@@ -269,6 +269,7 @@ fn make_scalar_conv(
ScalarType::F64 => ValueDim0FromBytesImpl::<f64>::boxed(),
ScalarType::BOOL => ValueDim0FromBytesImpl::<bool>::boxed(),
ScalarType::STRING => ValueDim0FromBytesImpl::<String>::boxed(),
ScalarType::ChannelStatus => ValueDim0FromBytesImpl::<u32>::boxed(),
},
Shape::Wave(_) => {
let shape = shape.clone();
@@ -285,6 +286,7 @@ fn make_scalar_conv(
ScalarType::F64 => ValueDim1FromBytesImpl::<f64>::boxed(shape),
ScalarType::BOOL => ValueDim1FromBytesImpl::<bool>::boxed(shape),
ScalarType::STRING => ValueDim1FromBytesImpl::<String>::boxed(shape),
ScalarType::ChannelStatus => ValueDim1FromBytesImpl::<u32>::boxed(shape),
}
}
Shape::Image(_, _) => {

View File

@@ -20,6 +20,7 @@ regex = "1.9.1"
http = "0.2.9"
thiserror = "=0.0.1"
anyhow = "1.0"
tokio = "1"
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git" }

View File

@@ -115,7 +115,7 @@ impl Error {
where
E: ToString,
{
Self::with_msg(e.to_string())
Self::with_msg_no_trace(e.to_string())
}
pub fn add_backtrace(self) -> Self {
@@ -425,6 +425,12 @@ impl From<anyhow::Error> for Error {
}
}
impl From<tokio::task::JoinError> for Error {
    /// Converts a failed task join into a crate `Error`, carrying the
    /// join error's display text as the message.
    fn from(e: tokio::task::JoinError) -> Self {
        Self::with_msg(e.to_string())
    }
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PublicError {
reason: Option<Reason>,

View File

@@ -144,6 +144,7 @@ pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConf
name_regex: query.regex.map_or(String::new(), |k| k),
source_regex: query.source_regex.map_or(String::new(), |k| k),
description_regex: query.description_regex.map_or(String::new(), |k| k),
channel_status: false,
};
let urls = proxy_config
.backends
@@ -250,6 +251,7 @@ pub async fn channel_search_configs_v1(
name_regex: query.regex.map_or(String::new(), |k| k),
source_regex: query.source_regex.map_or(String::new(), |k| k),
description_regex: query.description_regex.map_or(String::new(), |k| k),
channel_status: false,
};
let urls = proxy_config
.backends

View File

@@ -7,11 +7,12 @@ use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use items_2::channelevents::ChannelStatusEvent;
use items_0::Empty;
use items_0::Extendable;
use items_2::channelevents::ChannelStatusEvents;
use items_2::channelevents::ConnStatusEvent;
use netpod::log::*;
use netpod::query::ChannelStateEventsQuery;
use netpod::ChannelTypeConfigGen;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ACCEPT_ALL;
@@ -75,26 +76,26 @@ impl ConnectionStatusEvents {
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let _scy = scyllaconn::create_scy_session(scyco).await?;
let chconf =
let _chconf =
nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?;
let _do_one_before_range = true;
let ret = Vec::new();
if true {
return Err(Error::with_msg_no_trace("TODO channel_status fetch_data"));
}
/*let mut stream =
scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy);
while let Some(item) = stream.next().await {
let item = item?;
ret.push(item);
}*/
// let mut stream =
// scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy);
// while let Some(item) = stream.next().await {
// let item = item?;
// ret.push(item);
// }
Ok(ret)
}
}
pub struct ChannelStatusEvents {}
pub struct ChannelStatusEventsHandler {}
impl ChannelStatusEvents {
impl ChannelStatusEventsHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/status/channel/events" {
Some(Self {})
@@ -141,7 +142,7 @@ impl ChannelStatusEvents {
&self,
q: &ChannelStateEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Vec<ChannelStatusEvent>, Error> {
) -> Result<ChannelStatusEvents, Error> {
let scyco = node_config
.node_config
.cluster
@@ -149,26 +150,28 @@ impl ChannelStatusEvents {
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::create_scy_session(scyco).await?;
let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel config not found"))?;
let do_one_before_range = true;
match chconf {
ChannelTypeConfigGen::Scylla(ch_conf) => {
let mut stream = scyllaconn::status::StatusStreamScylla::new(
ch_conf.series(),
q.range().clone(),
do_one_before_range,
scy,
);
let mut ret = Vec::new();
while let Some(item) = stream.next().await {
let item = item?;
ret.push(item);
}
Ok(ret)
if false {
let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel config not found"))?;
use netpod::ChannelTypeConfigGen;
match chconf {
ChannelTypeConfigGen::Scylla(_x) => todo!(),
ChannelTypeConfigGen::SfDatabuffer(_x) => todo!(),
}
ChannelTypeConfigGen::SfDatabuffer(k) => todo!(),
}
let mut stream = scyllaconn::status::StatusStreamScylla::new(
q.channel().series().unwrap(),
q.range().clone(),
do_one_before_range,
scy,
);
let mut ret = ChannelStatusEvents::empty();
while let Some(item) = stream.next().await {
let mut item = item?;
ret.extend_from(&mut item);
}
Ok(ret)
}
}

View File

@@ -407,7 +407,7 @@ async fn http_service_inner(
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
} else if let Some(h) = channel_status::ChannelStatusEvents::handler(&req) {
} else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
} else if path == "/api/4/prebinned" {
if req.method() == Method::GET {

View File

@@ -290,8 +290,8 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
if v == APP_JSON {
let url = Url::parse(&format!("dummy:{}", head.uri))?;
let query = ChannelSearchQuery::from_url(&url)?;
let mut methods = vec![];
let mut bodies = vec![];
let mut methods = Vec::new();
let mut bodies = Vec::new();
let mut urls = proxy_config
.backends
.iter()

View File

@@ -6,9 +6,18 @@ use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
#[derive(Clone, Debug, Deserialize)]
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct IsoDateTime(DateTime<Utc>);
impl IsoDateTime {
    /// Builds an `IsoDateTime` from a Unix timestamp given in milliseconds.
    ///
    /// # Panics
    /// Panics if `ms` does not fit in `i64` or is outside the range that
    /// `chrono` can represent. Both are invariant violations for any
    /// timestamp this service actually produces.
    pub fn from_unix_millis(ms: u64) -> Self {
        // `as i64` would silently wrap for values above i64::MAX; make the
        // narrowing explicit and fail loudly with context instead.
        let ms = i64::try_from(ms).expect("millisecond timestamp exceeds i64 range");
        let datetime = chrono::NaiveDateTime::from_timestamp_millis(ms)
            .expect("millisecond timestamp outside chrono-representable range")
            .and_utc();
        Self(datetime)
    }
}
impl Serialize for IsoDateTime {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where

View File

@@ -39,6 +39,10 @@ pub trait Appendable<STY>: Empty + WithLen {
fn push(&mut self, ts: u64, pulse: u64, value: STY);
}
/// Containers that can absorb the contents of another instance of the
/// same type.
pub trait Extendable: Empty + WithLen {
    /// Appends the elements of `src` to `self`. `src` is taken by `&mut`
    /// so implementations may drain it rather than clone.
    fn extend_from(&mut self, src: &mut Self);
}
/// Provides a textual name for the concrete implementing type.
pub trait TypeName {
    /// Returns the type's name as an owned string.
    fn type_name(&self) -> String;
}

View File

@@ -6,6 +6,7 @@ use items_0::collect_s::Collected;
use items_0::collect_s::Collector;
use items_0::container::ByteEstimate;
use items_0::framable::FrameTypeInnerStatic;
use items_0::isodate::IsoDateTime;
use items_0::overlap::RangeOverlapInfo;
use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
use items_0::timebin::TimeBinnable;
@@ -15,7 +16,9 @@ use items_0::timebin::TimeBinner;
use items_0::timebin::TimeBinnerTy;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::EventsNonObj;
use items_0::Extendable;
use items_0::MergeError;
use items_0::TypeName;
use items_0::WithLen;
@@ -85,6 +88,41 @@ impl ChannelStatus {
}
}
/// Column-oriented batch of channel status events: index `i` of each
/// deque belongs to the same event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ChannelStatusEvents {
    // Event timestamps — presumably nanoseconds since the Unix epoch
    // (producers convert with `ts / MS` before building `datetimes`);
    // TODO confirm against the writer side.
    pub tss: VecDeque<u64>,
    // Same instants as `tss`, pre-converted for serialization.
    pub datetimes: VecDeque<IsoDateTime>,
    // Decoded status for each event.
    pub statuses: VecDeque<ChannelStatus>,
}
impl Empty for ChannelStatusEvents {
fn empty() -> Self {
Self {
tss: VecDeque::new(),
datetimes: VecDeque::new(),
statuses: VecDeque::new(),
}
}
}
impl WithLen for ChannelStatusEvents {
    /// Number of stored events; `tss` is used as the authoritative
    /// column, the other deques are kept the same length by construction.
    fn len(&self) -> usize {
        self.tss.len()
    }
}
impl Extendable for ChannelStatusEvents {
fn extend_from(&mut self, src: &mut Self) {
use core::mem::replace;
let v = replace(&mut src.tss, VecDeque::new());
self.tss.extend(v.into_iter());
let v = replace(&mut src.datetimes, VecDeque::new());
self.datetimes.extend(v.into_iter());
let v = replace(&mut src.statuses, VecDeque::new());
self.statuses.extend(v.into_iter());
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelStatusEvent {
pub ts: u64,
@@ -269,6 +307,11 @@ mod serde_channel_events {
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
String::SUB => {
let obj: EventsDim0<String> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => {
error!("TODO serde cty {cty} nty {nty}");
Err(de::Error::custom(&format!("unknown nty {nty}")))
@@ -296,6 +339,11 @@ mod serde_channel_events {
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
String::SUB => {
let obj: EventsDim1<String> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => {
error!("TODO serde cty {cty} nty {nty}");
Err(de::Error::custom(&format!("unknown nty {nty}")))

View File

@@ -25,6 +25,7 @@ pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result<Bo
F64 => Box::new(K::<f64>::empty()),
BOOL => Box::new(K::<bool>::empty()),
STRING => Box::new(K::<String>::empty()),
ChannelStatus => Box::new(K::<u32>::empty()),
}
}
Shape::Wave(..) => {
@@ -43,6 +44,7 @@ pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result<Bo
F64 => Box::new(K::<f64>::empty()),
BOOL => Box::new(K::<bool>::empty()),
STRING => Box::new(K::<String>::empty()),
ChannelStatus => Box::new(K::<u32>::empty()),
}
}
Shape::Image(..) => {

View File

@@ -95,6 +95,7 @@ pub enum ScalarType {
F64,
BOOL,
STRING,
ChannelStatus,
}
impl fmt::Debug for ScalarType {
@@ -128,6 +129,7 @@ impl Serialize for ScalarType {
F64 => ser.serialize_str("f64"),
BOOL => ser.serialize_str("bool"),
STRING => ser.serialize_str("string"),
ChannelStatus => ser.serialize_str("ChannelStatus"),
}
}
}
@@ -156,6 +158,7 @@ impl<'de> serde::de::Visitor<'de> for ScalarTypeVis {
"f64" => ScalarType::F64,
"bool" => ScalarType::BOOL,
"string" => ScalarType::STRING,
"channelstatus" => ScalarType::ChannelStatus,
k => return Err(E::custom(format!("can not understand variant {k:?}"))),
};
Ok(ret)
@@ -192,6 +195,7 @@ impl ScalarType {
11 => F32,
12 => F64,
13 => STRING,
14 => ChannelStatus,
//13 => return Err(Error::with_msg(format!("STRING not supported"))),
6 => return Err(Error::with_msg(format!("CHARACTER not supported"))),
_ => return Err(Error::with_msg(format!("unknown dtype code: {:?}", ix))),
@@ -214,6 +218,7 @@ impl ScalarType {
F64 => "f64",
BOOL => "bool",
STRING => "string",
ChannelStatus => "ChannelStatus",
}
}
@@ -232,6 +237,7 @@ impl ScalarType {
F64 => "float64",
BOOL => "bool",
STRING => "string",
ChannelStatus => "ChannelStatus",
}
}
@@ -252,6 +258,7 @@ impl ScalarType {
"float64" => F64,
"string" => STRING,
"bool" => BOOL,
"ChannelStatus" => ChannelStatus,
_ => {
return Err(Error::with_msg_no_trace(format!(
"from_bsread_str can not understand bsread {:?}",
@@ -323,6 +330,7 @@ impl ScalarType {
F64 => 8,
BOOL => 1,
STRING => 1,
ChannelStatus => 4,
}
}
@@ -341,6 +349,7 @@ impl ScalarType {
F64 => 12,
BOOL => 0,
STRING => 13,
ChannelStatus => 14,
}
}
@@ -2469,6 +2478,8 @@ pub struct ChannelSearchQuery {
pub name_regex: String,
pub source_regex: String,
pub description_regex: String,
#[serde(default)]
pub channel_status: bool,
}
impl ChannelSearchQuery {
@@ -2479,6 +2490,11 @@ impl ChannelSearchQuery {
name_regex: pairs.get("nameRegex").map_or(String::new(), |k| k.clone()),
source_regex: pairs.get("sourceRegex").map_or(String::new(), |k| k.clone()),
description_regex: pairs.get("descriptionRegex").map_or(String::new(), |k| k.clone()),
channel_status: pairs
.get("channelStatus")
.map(|k| k.parse().ok())
.unwrap_or(None)
.unwrap_or(false),
};
Ok(ret)
}
@@ -2491,6 +2507,8 @@ impl ChannelSearchQuery {
qp.append_pair("nameRegex", &self.name_regex);
qp.append_pair("sourceRegex", &self.source_regex);
qp.append_pair("descriptionRegex", &self.description_regex);
let v = &self.channel_status;
qp.append_pair("channelStatus", &v.to_string());
}
}

View File

@@ -317,8 +317,9 @@ pub fn events_parse_input_query(frames: Vec<InMemoryFrame>) -> Result<(EventsSub
Err(e) => return Err(e),
};
let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| {
let e = Error::with_msg_no_trace(format!("json parse error: {} inp {:?}", e, qitem.str()));
let e = Error::with_msg_no_trace(format!("json parse error: {} inp {}", e, qitem.str()));
error!("{e}");
error!("input was {}", qitem.str());
e
})?;
Ok(frame1.parts())

View File

@@ -110,6 +110,7 @@ impl From<&ScalarType> for Api1ScalarType {
A::F64 => B::F64,
A::BOOL => B::BOOL,
A::STRING => B::STRING,
A::ChannelStatus => todo!("ChannelStatus not in Api1ScalarType"),
}
}
}

View File

@@ -503,3 +503,9 @@ impl Frame1Parts {
(self.query,)
}
}
/// Regression test: a query frame captured from production must keep
/// deserializing after schema changes to `Frame1Parts`.
#[test]
fn parse_frame1() {
    let inp = r##"{"query":{"select":{"ch_conf":{"Scylla":{"backend":"swissfel-daqbuf-ca","series":2367705320261409690,"scalar_type":"ChannelStatus","shape":[],"name":"SLGRE-LI2C03_CH6:TEMP"}},"range":{"TimeRange":{"beg":1695736001000000000,"end":1695736301000000000}},"transform":{"event":"ValueFull","time_binning":"None"},"wasm1":null},"settings":{"timeout":null,"events_max":200000,"event_delay":null,"stream_batch_len":null,"buf_len_disk_io":null,"queue_len_disk_io":null,"create_errors":[]},"ty":"EventsSubQuery","reqid":"3ea23209"}}"##;
    // The assertion is that deserialization succeeds at all; bind with `_`
    // to avoid the unused-variable warning the old `let v = …` produced.
    let _parts: Frame1Parts = serde_json::from_str(inp).expect("Frame1Parts must deserialize");
}

View File

@@ -115,6 +115,7 @@ impl_scaty_scalar!(i64, i64, "events_scalar_i64");
impl_scaty_scalar!(f32, f32, "events_scalar_f32");
impl_scaty_scalar!(f64, f64, "events_scalar_f64");
impl_scaty_scalar!(bool, bool, "events_scalar_bool");
impl_scaty_scalar!(String, String, "events_scalar_string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "events_array_u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "events_array_u16");
@@ -127,6 +128,7 @@ impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "events_array_i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "events_array_f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "events_array_f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "events_array_bool");
impl_scaty_array!(Vec<String>, String, Vec<String>, "events_array_string");
struct ReadNextValuesOpts {
series: u64,
@@ -350,6 +352,7 @@ impl ReadValues {
ScalarType::F32 => read_next_values::<f32>(opts).await,
ScalarType::F64 => read_next_values::<f64>(opts).await,
ScalarType::BOOL => read_next_values::<bool>(opts).await,
ScalarType::STRING => read_next_values::<String>(opts).await,
_ => {
error!("TODO ReadValues add more types");
err::todoval()

View File

@@ -3,11 +3,15 @@ use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use items_0::isodate::IsoDateTime;
use items_0::Empty;
use items_0::Extendable;
use items_0::WithLen;
use items_2::channelevents::ChannelStatus;
use items_2::channelevents::ChannelStatusEvent;
use items_2::channelevents::ChannelStatusEvents;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::MS;
use netpod::CONNECTION_STATUS_DIV;
use scylla::Session as ScySession;
use std::collections::VecDeque;
@@ -15,8 +19,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::SystemTime;
async fn read_next_status_events(
series: u64,
@@ -25,7 +27,7 @@ async fn read_next_status_events(
fwd: bool,
do_one_before: bool,
scy: Arc<ScySession>,
) -> Result<VecDeque<ChannelStatusEvent>, Error> {
) -> Result<ChannelStatusEvents, Error> {
if ts_msp >= range.end {
warn!(
"given ts_msp {} >= range.end {} not necessary to read this",
@@ -75,27 +77,27 @@ async fn read_next_status_events(
.err_conv()?
};
let mut last_before = None;
let mut ret = VecDeque::new();
let mut ret = ChannelStatusEvents::empty();
for row in res.rows_typed_or_empty::<(i64, i32)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let kind = row.1 as u32;
let ev = ChannelStatusEvent {
ts,
datetime: SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000),
status: ChannelStatus::from_ca_ingest_status_kind(kind),
};
let datetime = IsoDateTime::from_unix_millis(ts / MS);
let status = ChannelStatus::from_ca_ingest_status_kind(kind);
if ts >= range.end {
} else if ts >= range.beg {
ret.push_back(ev);
ret.tss.push_back(ts);
ret.datetimes.push_back(datetime);
ret.statuses.push_back(status);
} else {
last_before = Some(ev);
last_before = Some((ts, datetime, status));
}
}
if do_one_before {
if let Some(ev) = last_before {
debug!("PREPENDING THE LAST BEFORE {ev:?}");
ret.push_front(ev);
if let Some((ts, datetime, status)) = last_before {
ret.tss.push_front(ts);
ret.datetimes.push_front(datetime);
ret.statuses.push_front(status);
}
}
trace!("found in total {} events ts_msp {}", ret.len(), ts_msp);
@@ -108,7 +110,7 @@ struct ReadValues {
ts_msps: VecDeque<u64>,
fwd: bool,
do_one_before_range: bool,
fut: Pin<Box<dyn Future<Output = Result<VecDeque<ChannelStatusEvent>, Error>> + Send>>,
fut: Pin<Box<dyn Future<Output = Result<ChannelStatusEvents, Error>> + Send>>,
scy: Arc<ScySession>,
}
@@ -146,10 +148,7 @@ impl ReadValues {
}
}
fn make_fut(
&mut self,
ts_msp: u64,
) -> Pin<Box<dyn Future<Output = Result<VecDeque<ChannelStatusEvent>, Error>> + Send>> {
fn make_fut(&mut self, ts_msp: u64) -> Pin<Box<dyn Future<Output = Result<ChannelStatusEvents, Error>> + Send>> {
info!("make fut for {ts_msp}");
let fut = read_next_status_events(
self.series,
@@ -175,7 +174,7 @@ pub struct StatusStreamScylla {
range: NanoRange,
do_one_before_range: bool,
scy: Arc<ScySession>,
outbuf: VecDeque<ChannelStatusEvent>,
outbuf: ChannelStatusEvents,
}
impl StatusStreamScylla {
@@ -186,28 +185,29 @@ impl StatusStreamScylla {
range,
do_one_before_range,
scy,
outbuf: VecDeque::new(),
outbuf: ChannelStatusEvents::empty(),
}
}
}
impl Stream for StatusStreamScylla {
type Item = Result<ChannelStatusEvent, Error>;
type Item = Result<ChannelStatusEvents, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let span = tracing::span!(tracing::Level::TRACE, "poll_next");
let _spg = span.enter();
loop {
if let Some(x) = self.outbuf.pop_front() {
break Ready(Some(Ok(x)));
if self.outbuf.len() > 0 {
let item = std::mem::replace(&mut self.outbuf, ChannelStatusEvents::empty());
break Ready(Some(Ok(item)));
}
break match self.state {
FrState::New => {
let mut ts_msps = VecDeque::new();
let mut ts = self.range.beg / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
while ts < self.range.end {
info!("Use ts {ts}");
debug!("Use ts {ts}");
ts_msps.push_back(ts);
ts += CONNECTION_STATUS_DIV;
}
@@ -223,14 +223,12 @@ impl Stream for StatusStreamScylla {
continue;
}
FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
Ready(Ok(mut item)) => {
if !st.next() {
debug!("ReadValues exhausted");
self.state = FrState::Done;
}
for x in item {
self.outbuf.push_back(x);
}
self.outbuf.extend_from(&mut item);
continue;
}
Ready(Err(e)) => {

View File

@@ -23,7 +23,7 @@ pub fn get_runtime() -> Arc<Runtime> {
get_runtime_opts(24, 128)
}
#[allow(unused)]
// #[allow(unused)]
fn on_thread_start() {
let old = panic::take_hook();
panic::set_hook(Box::new(move |info| {
@@ -58,7 +58,7 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc<Runtime> {
.worker_threads(nworkers)
.max_blocking_threads(nblocking)
.enable_all()
.on_thread_start(on_thread_start)
// .on_thread_start(on_thread_start)
.build();
let res = match res {
Ok(x) => x,