Establish status and data series ids

This commit is contained in:
Dominik Werder
2024-01-05 22:53:03 +01:00
parent 4e758dc4b8
commit 04b4f4f454
14 changed files with 221 additions and 89 deletions

View File

@@ -13,6 +13,6 @@ batchtools = { path = "../batchtools" }
stats = { path = "../stats" }
series = { path = "../series" }
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] }
futures-util = "0.3"
async-channel = "2.0.0"
md-5 = "0.10"
futures-util = "0.3.29"
async-channel = "2.1.1"
md-5 = "0.10.6"

View File

@@ -5,6 +5,7 @@ use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use log::*;
use md5::Digest;
use netpod::Database;
@@ -61,6 +62,14 @@ pub trait CanSendChannelInfoResult: Sync {
fn make_send(&self, item: Result<ChannelInfoResult, Error>) -> BoxedSend;
}
/// Lets an `async_channel` sender act as the reply sink for channel-info
/// lookups by packaging the send operation as a boxed future.
impl CanSendChannelInfoResult for async_channel::Sender<Result<ChannelInfoResult, Error>> {
    fn make_send(&self, item: Result<ChannelInfoResult, Error>) -> BoxedSend {
        // Clone so the returned future owns its sender independently of `self`.
        let sender = self.clone();
        Box::pin(async move { sender.send(item).await.map_err(|_| ()) })
    }
}
pub struct ChannelInfoQuery {
pub backend: String,
pub channel: String,
@@ -100,6 +109,7 @@ struct Worker {
qu_insert: PgStatement,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
pg_client_jh: JoinHandle<Result<(), crate::err::Error>>,
}
impl Worker {
@@ -108,7 +118,7 @@ impl Worker {
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
) -> Result<Self, Error> {
let (pg, jh) = crate::conn::make_pg_client(db).await?;
let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?;
let sql = concat!(
"with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])",
" as inp (backend, channel, scalar_type, shape_dims, rid))",
@@ -133,6 +143,7 @@ impl Worker {
qu_insert,
batch_rx,
stats,
pg_client_jh,
};
Ok(ret)
}

View File

@@ -18,13 +18,12 @@ pub enum Error {
CanNotInsertSeriesId,
}
// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
pub async fn get_series_id(
async fn _get_series_id(
backend: &str,
name: &str,
scalar_type: &ScalarType,
shape: &Shape,
pg_client: &PgClient,
backend: String,
) -> Result<Existence<SeriesId>, Error> {
let channel_name = name;
let scalar_type = scalar_type.to_scylla_i32();

View File

@@ -1,2 +1,6 @@
#[allow(unused_imports)]
pub use tracing::{debug, error, info, trace, warn};
#![allow(unused_imports)]
pub use tracing::debug;
pub use tracing::error;
pub use tracing::info;
pub use tracing::trace;
pub use tracing::warn;

View File

@@ -35,6 +35,7 @@ slidebuf = "0.0.1"
dashmap = "5.5.3"
log = { path = "../log" }
series = { path = "../series" }
serieswriter = { path = "../serieswriter" }
stats = { path = "../stats" }
scywr = { path = "../scywr" }
dbpg = { path = "../dbpg" }

View File

@@ -37,6 +37,7 @@ use scywriiq::InsertItem;
use scywriiq::IvlItem;
use scywriiq::MuteItem;
use scywriiq::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
@@ -93,7 +94,7 @@ macro_rules! trace4 {
};
}
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ChannelConnectedInfo {
Disconnected,
Connecting,
@@ -102,7 +103,7 @@ pub enum ChannelConnectedInfo {
Ended,
}
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelStateInfo {
pub cssid: ChannelStatusSeriesId,
pub addr: SocketAddrV4,
@@ -112,10 +113,10 @@ pub struct ChannelStateInfo {
pub shape: Option<Shape>,
// NOTE: this solution can yield to the same Instant serialize to different string representations.
// #[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "ser_instant")]
#[serde(with = "ser_instant")]
pub ts_created: Option<Instant>,
// #[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "ser_instant")]
#[serde(with = "ser_instant")]
pub ts_event_last: Option<Instant>,
pub recv_count: Option<u64>,
// #[serde(skip_serializing_if = "Option::is_none")]
@@ -123,30 +124,47 @@ pub struct ChannelStateInfo {
pub interest_score: f32,
}
fn ser_instant<S: serde::Serializer>(val: &Option<Instant>, ser: S) -> Result<S::Ok, S::Error> {
match val {
Some(val) => {
let now = chrono::Utc::now();
let tsnow = Instant::now();
let t1 = if tsnow >= *val {
let dur = tsnow.duration_since(*val);
let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
.checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_sub_signed(dur2).unwrap()
} else {
let dur = (*val).duration_since(tsnow);
let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
.checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_add_signed(dur2).unwrap()
};
//info!("formatting {:?}", t1);
let s = t1.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
//info!("final string {:?}", s);
ser.serialize_str(&s)
mod ser_instant {
use super::*;
use serde::Deserializer;
use serde::Serializer;
pub fn serialize<S>(val: &Option<Instant>, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match val {
Some(val) => {
let now = chrono::Utc::now();
let tsnow = Instant::now();
let t1 = if tsnow >= *val {
let dur = tsnow.duration_since(*val);
let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
.checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_sub_signed(dur2).unwrap()
} else {
let dur = (*val).duration_since(tsnow);
let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
.checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_add_signed(dur2).unwrap()
};
//info!("formatting {:?}", t1);
let s = t1.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
//info!("final string {:?}", s);
ser.serialize_str(&s)
}
None => ser.serialize_none(),
}
None => ser.serialize_none(),
}
pub fn deserialize<'de, D>(de: D) -> Result<Option<Instant>, D::Error>
where
D: Deserializer<'de>,
{
let e = serde::de::Error::custom("todo deserialize for ser_instant");
Err(e)
}
}
@@ -552,7 +570,7 @@ pub struct CaConn {
cid_by_subid: HashMap<Subid, Cid>,
name_by_cid: HashMap<Cid, String>,
time_binners: HashMap<Cid, ConnTimeBin>,
channel_status_last_done: Option<Cid>,
channel_status_emit_last: Instant,
init_state_count: u64,
insert_item_queue: VecDeque<QueryItem>,
remote_addr_dbg: SocketAddrV4,
@@ -570,6 +588,7 @@ pub struct CaConn {
ioc_ping_start: Option<Instant>,
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
ca_conn_event_out_queue_max: usize,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sending: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
thr_msg_poll: ThrottleTrace,
@@ -612,7 +631,7 @@ impl CaConn {
cid_by_subid: HashMap::new(),
name_by_cid: HashMap::new(),
time_binners: HashMap::new(),
channel_status_last_done: None,
channel_status_emit_last: Instant::now(),
insert_item_queue: VecDeque::new(),
remote_addr_dbg,
local_epics_hostname,
@@ -629,6 +648,7 @@ impl CaConn {
ioc_ping_start: None,
storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)),
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
channel_info_query_queue: VecDeque::new(),
channel_info_query_sending: Box::pin(SenderPolling::new(channel_info_query_tx)),
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
@@ -713,38 +733,38 @@ impl CaConn {
// TODO
// Time this, is it fast enough?
let mut kit = self.cid_by_name.values();
if let Some(mut kk) = kit.next().map(Clone::clone) {
let mut start = Some(kk.clone());
if let Some(last) = self.channel_status_last_done.take() {
while kk <= last {
kk = if let Some(x) = kit.next().map(Clone::clone) {
start = Some(x.clone());
x
} else {
start = None;
break;
};
}
}
if let Some(mut kk) = start {
loop {
kk = if let Some(x) = kit.next().map(Clone::clone) {
x
} else {
break;
};
}
} else {
// Nothing to do, will continue on next call from front.
}
}
while let Some(kk) = kit.next() {}
let mut channel_statuses = BTreeMap::new();
for (k, v) in self.channels.iter() {
let info = v.to_info(v.cssid(), self.remote_addr_dbg);
channel_statuses.insert(v.cssid(), info);
}
// let mut kit = self.cid_by_name.values();
// if let Some(mut kk) = kit.next().map(Clone::clone) {
// let mut start = Some(kk.clone());
// if let Some(last) = self.channel_status_last_done.take() {
// while kk <= last {
// kk = if let Some(x) = kit.next().map(Clone::clone) {
// start = Some(x.clone());
// x
// } else {
// start = None;
// break;
// };
// }
// }
// if let Some(mut kk) = start {
// loop {
// kk = if let Some(x) = kit.next().map(Clone::clone) {
// x
// } else {
// break;
// };
// }
// } else {
// // Nothing to do, will continue on next call from front.
// }
// }
// while let Some(kk) = kit.next() {}
// let mut channel_statuses = BTreeMap::new();
// for (k, v) in self.channels.iter() {
// let info = v.to_info(v.cssid(), self.remote_addr_dbg);
// channel_statuses.insert(v.cssid(), info);
// }
}
fn cmd_find_channel(&self, pattern: &str) {
@@ -1968,9 +1988,13 @@ impl CaConn {
}
}
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
fn handle_own_ticker_tick(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
// debug!("tick CaConn {}", self.remote_addr_dbg);
let tsnow = Instant::now();
// TODO add some random variation
if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow {
self.emit_channel_status()?;
}
// TODO use safe version
let this = unsafe { self.get_unchecked_mut() };
match &this.state {
@@ -1996,22 +2020,28 @@ impl CaConn {
Ok(())
}
fn emit_channel_status(&mut self) {
// TODO limit the queue length.
// Maybe factor the actual push item into new function.
// What to do if limit reached?
// Increase some error counter.
// if self.ca_conn_event_out_queue.len()>
let val = ChannelStatusPartial {
channel_statuses: Default::default(),
};
/// Snapshot the status of every channel on this connection and enqueue it
/// as a `ChannelStatus` event for downstream consumers.
///
/// When the outgoing event queue is already above `ca_conn_event_out_queue_max`
/// the snapshot is dropped and the `out_queue_full` counter is incremented,
/// so a slow consumer can not grow the queue without bound.
fn emit_channel_status(&mut self) -> Result<(), Error> {
    // Check the limit first: if the queue is full the event would be
    // discarded anyway, so skip building the (potentially large) map.
    if self.ca_conn_event_out_queue.len() > self.ca_conn_event_out_queue_max {
        self.stats.out_queue_full().inc();
        return Ok(());
    }
    let mut channel_statuses = BTreeMap::new();
    for (_cid, ch) in self.channels.iter() {
        let chinfo = ch.to_info(ch.cssid(), self.remote_addr_dbg);
        channel_statuses.insert(ch.cssid(), chinfo);
    }
    let val = ChannelStatusPartial { channel_statuses };
    let item = CaConnEvent {
        ts: Instant::now(),
        value: CaConnEventValue::ChannelStatus(val),
    };
    self.ca_conn_event_out_queue.push_back(item);
    Ok(())
}
fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> {

View File

@@ -36,6 +36,7 @@ use scywr::iteminsertqueue::ChannelStatus;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use series::ChannelStatusSeriesId;
use statemap::ActiveChannelState;
use statemap::CaConnStateValue;
@@ -46,7 +47,6 @@ use statemap::ConnectionState;
use statemap::ConnectionStateValue;
use statemap::WithStatusSeriesIdState;
use statemap::WithStatusSeriesIdStateInner;
use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnSetStats;

View File

@@ -11,8 +11,6 @@ use std::ops::RangeBounds;
use std::time::Instant;
use std::time::SystemTime;
pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14;
#[derive(Debug)]
pub enum CaConnStateValue {
Fresh,

View File

@@ -1,9 +1,10 @@
[package]
name = "series"
version = "0.0.1"
version = "0.0.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
log = { path = "../log" }
serde = { version = "1.0", features = ["derive"] }
log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }

View File

@@ -1,6 +1,8 @@
use serde::Deserialize;
use serde::Serialize;
pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14;
#[derive(Clone, Debug)]
pub enum Existence<T> {
Created(T),
@@ -30,7 +32,7 @@ impl SeriesId {
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize, Deserialize)]
pub struct ChannelStatusSeriesId(u64);
impl ChannelStatusSeriesId {

14
serieswriter/Cargo.toml Normal file
View File

@@ -0,0 +1,14 @@
[package]
name = "serieswriter"
version = "0.0.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
async-channel = "2.1.1"
log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }
dbpg = { path = "../dbpg" }
series = { path = "../series" }

1
serieswriter/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod writer;

View File

@@ -0,0 +1,70 @@
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::ScalarType;
use netpod::Shape;
use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use series::ChannelStatusSeriesId;
use series::SeriesId;
/// Errors that can occur while establishing the series ids for a writer.
#[derive(Debug, ThisError)]
pub enum Error {
    // Wrapped error from the series-id machinery in dbpg.
    DbPgSid(#[from] dbpg::seriesid::Error),
    // Sending the lookup query failed (worker receiver dropped).
    ChannelSendError,
    // The reply channel closed before an answer arrived.
    ChannelRecvError,
    // The worker replied, but with an error instead of a series id.
    SeriesLookupError,
}
/// A failed send means the worker's receiver is gone; the returned payload
/// carries no extra information here, so it collapses to a unit variant.
impl<T> From<async_channel::SendError<T>> for Error {
    // Underscore the unused payload to avoid an `unused_variables` warning.
    fn from(_: async_channel::SendError<T>) -> Self {
        Error::ChannelSendError
    }
}
/// A recv error means the reply channel is closed and empty.
impl From<async_channel::RecvError> for Error {
    // Underscore the unused error value to avoid an `unused_variables` warning.
    fn from(_: async_channel::RecvError) -> Self {
        Error::ChannelRecvError
    }
}
/// Holds the pair of series ids established for one channel:
/// a status series id and a data series id.
pub struct SeriesWriter {
    // Series id under which channel-status information is recorded
    // (looked up with CHANNEL_STATUS_DUMMY_SCALAR_TYPE).
    cssid: ChannelStatusSeriesId,
    // Series id for the channel's actual data (uses the real scalar type).
    sid: SeriesId,
}
impl SeriesWriter {
// TODO this requires a database
pub async fn establish(
worker_tx: Sender<Vec<ChannelInfoQuery>>,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
backend: backend.clone(),
channel: channel.clone(),
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
worker_tx.send(vec![item]).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id());
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
backend,
channel,
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
worker_tx.send(vec![item]).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.into_inner();
let res = Self { cssid, sid };
Ok(res)
}
}

View File

@@ -381,6 +381,7 @@ stats_proc::stats_struct!((
inserts_queue_drop,
insert_item_queue_pressure,
insert_item_queue_full,
out_queue_full,
channel_fast_item_drop,
logic_error,
// TODO maybe rename: this is now only the recv of the intermediate queue: