diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index a788b1a..c8f53a9 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.3" +version = "0.2.4-aa.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 93cc05f..0dfed89 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,5 +1,6 @@ pub mod beacons; pub mod conn; +pub mod conn2; pub mod connset; pub mod connset_input_merge; pub mod finder; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index bb2e344..7983c80 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -84,13 +84,14 @@ use std::time::SystemTime; use taskrun::tokio; use tokio::net::TcpStream; -const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000); -const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 80); +const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6); +const CHANNEL_STATUS_EMIT_IVL: Duration = Duration::from_millis(1000 * 8); +const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 120); +const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6); +const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8); +const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10); +const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(1000 * 120); const DO_RATE_CHECK: bool = false; -const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(6000); -const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(8000); -const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(10000); -const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(120000); #[allow(unused)] macro_rules! trace2 { @@ -1055,7 +1056,6 @@ impl CaConn { stats: Arc, ca_proto_stats: Arc, ) -> Self { - let _ = channel_info_query_tx; let tsnow = Instant::now(); let (cq_tx, cq_rx) = async_channel::bounded(32); let mut rng = stats::xoshiro_from_time(); @@ -1063,7 +1063,7 @@ impl CaConn { opts, backend, state: CaConnState::Unconnected(tsnow), - ticker: Self::new_self_ticker(), + ticker: Self::new_self_ticker(&mut rng), proto: None, cid_store: CidStore::new_from_time(), subid_store: SubidStore::new_from_time(), @@ -1105,19 +1105,28 @@ impl CaConn { } fn ioc_ping_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration { - IOC_PING_IVL * 100 / (70 + (rng.next_u32() % 60)) + let b = IOC_PING_IVL; + b + b / 128 * (rng.next_u32() & 0x1f) } fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration { - Duration::from_millis(8000 + (rng.next_u32() & 0xfff) as u64) + let b = CHANNEL_STATUS_EMIT_IVL; + b + b / 128 * (rng.next_u32() & 0x1f) } fn silence_read_next_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration { Duration::from_millis(1000 * 300 + (rng.next_u32() & 0x3fff) as u64) } - fn new_self_ticker() -> Pin> { - Box::pin(tokio::time::sleep(Duration::from_millis(1500))) + fn recv_value_status_emit_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration { + let b = READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN; + b + b / 128 * (rng.next_u32() & 0x1f) + } + + fn new_self_ticker(rng: &mut Xoshiro128PlusPlus) -> Pin> { + let b = Duration::from_millis(1500); + let dur = b + b / 128 * (rng.next_u32() & 0x1f); + Box::pin(tokio::time::sleep(dur)) } fn proto(&mut self) -> Option<&mut CaProto> { @@ -1700,6 +1709,7 @@ impl CaConn { tsnow, stnow, stats, + &mut self.rng, )?; } ReadingState::Monitoring(st2) => { @@ -1729,6 +1739,7 @@ impl CaConn { tsnow, stnow, stats, + &mut self.rng, )?; } ReadingState::StopMonitoringForPolling(st2) => { @@ -1882,7 +1893,16 @@ impl CaConn { st2.tick = PollTickState::Idle(tsnow); let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?; + Self::read_notify_res_for_write( + ev, + ch_wrst, + st, + iqdqs, + stnow, + tsnow, + stats, + &mut self.rng, + )?; } }, ReadingState::EnableMonitoring(_) => { @@ -1926,7 +1946,16 @@ impl CaConn { // More involved check would be to raise a flag, wait for the expected monitor for some // timeout, and if we get nothing error out. if false { - Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?; + Self::read_notify_res_for_write( + ev, + ch_wrst, + st, + iqdqs, + stnow, + tsnow, + stats, + &mut self.rng, + )?; } } }, @@ -1956,6 +1985,7 @@ impl CaConn { stnow: SystemTime, tsnow: Instant, stats: &CaConnStats, + rng: &mut Xoshiro128PlusPlus, ) -> Result<(), Error> { let crst = &mut st.channel; let writer = &mut st.writer; @@ -1971,6 +2001,7 @@ impl CaConn { tsnow, stnow, stats, + rng, )?; Ok(()) } @@ -1986,6 +2017,7 @@ impl CaConn { tsnow: Instant, stnow: SystemTime, stats: &CaConnStats, + rng: &mut Xoshiro128PlusPlus, ) -> Result<(), Error> { { use proto::CaMetaValue::*; @@ -2009,7 +2041,7 @@ impl CaConn { crst.acc_recv.push_written(payload_len); // TODO should attach these counters already to Writable state. if crst.ts_recv_value_status_emit_next <= tsnow { - crst.ts_recv_value_status_emit_next = tsnow + READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN; + crst.ts_recv_value_status_emit_next = tsnow + Self::recv_value_status_emit_ivl_rng(rng); let item = ChannelStatusItem { ts: stnow, cssid: crst.cssid, @@ -2495,16 +2527,6 @@ impl CaConn { warn!("CaConn sees: {msg:?}"); } } - #[cfg(DISABLED)] - CaMsgTy::IssueDataCount(hi, stat, sev, secs, nanos) => { - let cid = *self.cid_by_subid.get(&hi.param2()).unwrap(); - let name = self.name_by_cid.get(&cid).unwrap(); - debug!("ca large count for {name} {hi:?} {stat} {sev} {secs} {nanos}"); - self.weird_count += 1; - if self.weird_count > 200 { - std::process::exit(13); - } - } CaMsgTy::VersionRes(x) => { debug!("VersionRes({x})"); self.weird_count += 1; @@ -2825,7 +2847,7 @@ impl CaConn { // debug!("tick CaConn {}", self.remote_addr_dbg); let tsnow = Instant::now(); if !self.is_shutdown() { - self.ticker = Self::new_self_ticker(); + self.ticker = Self::new_self_ticker(&mut self.rng); let _ = self.ticker.poll_unpin(cx); // cx.waker().wake_by_ref(); } diff --git a/netfetch/src/ca/conn2.rs b/netfetch/src/ca/conn2.rs new file mode 100644 index 0000000..5a9bd9b --- /dev/null +++ b/netfetch/src/ca/conn2.rs @@ -0,0 +1,5 @@ +mod channel; +mod channelstateinfo; +mod conn; +mod conncmd; +mod connevent; diff --git a/netfetch/src/ca/conn2/channel.rs b/netfetch/src/ca/conn2/channel.rs new file mode 100644 index 0000000..c9c73c2 --- /dev/null +++ b/netfetch/src/ca/conn2/channel.rs @@ -0,0 +1,3 @@ +trait Channel {} + +struct ChannelAny {} diff --git a/netfetch/src/ca/conn2/channelstateinfo.rs b/netfetch/src/ca/conn2/channelstateinfo.rs new file mode 100644 index 0000000..8650593 --- /dev/null +++ b/netfetch/src/ca/conn2/channelstateinfo.rs @@ -0,0 +1,93 @@ +use crate::conf::ChannelConfig; +use netpod::ScalarType; +use netpod::Shape; +use serde::Serialize; +use series::ChannelStatusSeriesId; +use series::SeriesId; +use std::net::SocketAddrV4; +use std::time::Instant; +use std::time::SystemTime; + +#[derive(Clone, Debug, Serialize)] +pub enum ChannelConnectedInfo { + Disconnected, + Connecting, + Connected, + Error, +} + +#[derive(Clone, Debug, Serialize)] +pub struct ChannelStateInfo { + pub stnow: SystemTime, + pub cssid: ChannelStatusSeriesId, + pub addr: SocketAddrV4, + pub series: Option, + pub channel_connected_info: ChannelConnectedInfo, + pub ping_last: Option, + pub pong_last: Option, + pub scalar_type: Option, + pub shape: Option, + // NOTE: this solution can yield to the same Instant serialize to different string representations. + // #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "ser_instant")] + pub ts_created: Option, + // #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "ser_instant")] + pub ts_event_last: Option, + pub recv_count: Option, + pub recv_bytes: Option, + // #[serde(skip_serializing_if = "Option::is_none")] + pub item_recv_ivl_ema: Option, + pub interest_score: f32, + pub conf: ChannelConfig, + pub recv_last: SystemTime, + pub write_st_last: SystemTime, + pub write_mt_last: SystemTime, + pub write_lt_last: SystemTime, + pub status_emit_count: u64, +} + +mod ser_instant { + use super::*; + use netpod::DATETIME_FMT_3MS; + use serde::Deserializer; + use serde::Serializer; + + pub fn serialize(val: &Option, ser: S) -> Result + where + S: Serializer, + { + match val { + Some(val) => { + let now = chrono::Utc::now(); + let tsnow = Instant::now(); + let t1 = if tsnow >= *val { + let dur = tsnow.duration_since(*val); + let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64) + .unwrap() + .checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) + .unwrap(); + now.checked_sub_signed(dur2).unwrap() + } else { + let dur = (*val).duration_since(tsnow); + let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64) + .unwrap() + .checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) + .unwrap(); + now.checked_add_signed(dur2).unwrap() + }; + let s = t1.format(DATETIME_FMT_3MS).to_string(); + ser.serialize_str(&s) + } + None => ser.serialize_none(), + } + } + + pub fn deserialize<'de, D>(_de: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let e = serde::de::Error::custom("todo deserialize for ser_instant"); + Err(e) + } +} diff --git a/netfetch/src/ca/conn2/conn.rs b/netfetch/src/ca/conn2/conn.rs new file mode 100644 index 0000000..378da86 --- /dev/null +++ b/netfetch/src/ca/conn2/conn.rs @@ -0,0 +1,83 @@ +use super::conncmd::ConnCommand; +use super::connevent::CaConnEvent; +use super::connevent::EndOfStreamReason; +use crate::ca::conn::CaConnOpts; +use crate::ca::proto::CaProto; +use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use hashbrown::HashMap; +use log::*; +use scywr::insertqueues::InsertQueuesTx; +use stats::rand_xoshiro::Xoshiro128PlusPlus; +use stats::CaConnStats; +use stats::CaProtoStats; +use std::net::SocketAddrV4; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use std::time::Instant; +use taskrun::tokio; +use tokio::net::TcpStream; + +#[derive(Debug)] +pub enum Error {} + +type ConnectingFut = + Pin, tokio::time::error::Elapsed>> + Send>>; + +enum ConnectedState { + Init(CaProto), + Handshake(CaProto), + PeerReady(CaProto), +} + +enum CaConnState { + Connecting(Instant, SocketAddrV4, ConnectingFut), + Connected(CaProto), + Shutdown(EndOfStreamReason), + Done, +} + +struct CaConn { + opts: CaConnOpts, + backend: String, + state: CaConnState, + rng: Xoshiro128PlusPlus, +} + +impl CaConn { + fn new( + opts: CaConnOpts, + backend: String, + remote_addr: SocketAddrV4, + local_epics_hostname: String, + iqtx: InsertQueuesTx, + channel_info_query_tx: Sender, + stats: Arc, + ca_proto_stats: Arc, + ) -> Self { + let tsnow = Instant::now(); + let (cq_tx, cq_rx) = async_channel::bounded::(32); + let mut rng = stats::xoshiro_from_time(); + Self { + opts, + backend, + state: CaConnState::Connecting(tsnow, remote_addr, err::todoval()), + rng, + } + } +} + +impl Stream for CaConn { + type Item = CaConnEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + todo!() + } +} diff --git a/netfetch/src/ca/conn2/conncmd.rs b/netfetch/src/ca/conn2/conncmd.rs new file mode 100644 index 0000000..576f02c --- /dev/null +++ b/netfetch/src/ca/conn2/conncmd.rs @@ -0,0 +1,49 @@ +use crate::conf::ChannelConfig; +use atomic::AtomicUsize; +use series::ChannelStatusSeriesId; +use std::sync::atomic; + +#[derive(Debug)] +pub enum ConnCommandKind { + ChannelAdd(ChannelConfig, ChannelStatusSeriesId), + ChannelClose(String), + Shutdown, +} + +#[derive(Debug)] +pub struct ConnCommand { + id: usize, + kind: ConnCommandKind, +} + +impl ConnCommand { + pub fn channel_add(conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::ChannelAdd(conf, cssid), + } + } + + pub fn channel_close(name: String) -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::ChannelClose(name), + } + } + + pub fn shutdown() -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::Shutdown, + } + } + + fn make_id() -> usize { + static ID: AtomicUsize = AtomicUsize::new(0); + ID.fetch_add(1, atomic::Ordering::AcqRel) + } + + pub fn id(&self) -> usize { + self.id + } +} diff --git a/netfetch/src/ca/conn2/connevent.rs b/netfetch/src/ca/conn2/connevent.rs new file mode 100644 index 0000000..eddbbfc --- /dev/null +++ b/netfetch/src/ca/conn2/connevent.rs @@ -0,0 +1,87 @@ +use super::channelstateinfo::ChannelStateInfo; +use core::fmt; +use series::ChannelStatusSeriesId; +use std::collections::BTreeMap; +use std::time::Instant; + +#[derive(Debug)] +pub struct CaConnEvent { + pub ts: Instant, + pub value: CaConnEventValue, +} + +impl CaConnEvent { + pub fn new(ts: Instant, value: CaConnEventValue) -> Self { + Self { ts, value } + } + + pub fn err_now(err: super::conn::Error) -> Self { + Self::new_now(CaConnEventValue::EndOfStream(EndOfStreamReason::Error(err))) + } + + pub fn new_now(value: CaConnEventValue) -> Self { + Self { + ts: Instant::now(), + value, + } + } + + pub fn desc_short(&self) -> CaConnEventDescShort { + CaConnEventDescShort { inner: self } + } +} + +pub struct CaConnEventDescShort<'a> { + inner: &'a CaConnEvent, +} + +impl<'a> fmt::Display for CaConnEventDescShort<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "CaConnEventDescShort {{ ts: {:?}, value: {} }}", + self.inner.ts, + self.inner.value.desc_short() + ) + } +} + +#[derive(Debug)] +pub enum CaConnEventValue { + None, + EchoTimeout, + // ConnCommandResult(ConnCommandResult), + ChannelStatus(ChannelStatusPartial), + ChannelCreateFail(String), + EndOfStream(EndOfStreamReason), +} + +impl CaConnEventValue { + pub fn desc_short(&self) -> &'static str { + match self { + CaConnEventValue::None => "None", + CaConnEventValue::EchoTimeout => "EchoTimeout", + // CaConnEventValue::ConnCommandResult(_) => "ConnCommandResult", + CaConnEventValue::ChannelStatus(_) => "ChannelStatus", + CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail", + CaConnEventValue::EndOfStream(_) => "EndOfStream", + } + } +} + +#[derive(Debug)] +pub enum EndOfStreamReason { + UnspecifiedReason, + Error(super::conn::Error), + ConnectRefused, + ConnectTimeout, + OnCommand, + RemoteClosed, + IocTimeout, + IoError, +} + +#[derive(Debug)] +pub struct ChannelStatusPartial { + pub channel_statuses: BTreeMap, +} diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 4a5ae7c..3e53301 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -358,6 +358,7 @@ pub struct CaConnSet { find_ioc_query_sender: Pin>>, find_ioc_res_rx: Pin>>>, iqtx: Pin>, + storage_insert_queue_l1: VecDeque, storage_insert_queue: VecDeque>, storage_insert_sender: Pin>>>, ca_conn_res_tx: Pin>>, @@ -424,6 +425,7 @@ impl CaConnSet { find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)), find_ioc_res_rx: Box::pin(find_ioc_res_rx), iqtx: Box::pin(iqtx.clone()), + storage_insert_queue_l1: VecDeque::new(), storage_insert_queue: VecDeque::new(), // TODO simplify for all combinations @@ -639,11 +641,10 @@ impl CaConnSet { let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64()); let state = &mut writer_status_state; let ts_net = Instant::now(); - let mut deque = VecDeque::new(); + let deque = &mut self.storage_insert_queue_l1; writer_status - .write(item, state, ts_net, ts, &mut deque) + .write(item, state, ts_net, ts, deque) .map_err(Error::from_string)?; - self.storage_insert_queue.push_back(deque); } *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState { cssid: cmd.cssid, @@ -706,11 +707,10 @@ impl CaConnSet { let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64()); let state = &mut writer_status_state; let ts_net = Instant::now(); - let mut deque = VecDeque::new(); + let deque = &mut self.storage_insert_queue_l1; writer_status - .write(item, state, ts_net, ts, &mut deque) + .write(item, state, ts_net, ts, deque) .map_err(Error::from_string)?; - self.storage_insert_queue.push_back(deque); } *st3 = WithStatusSeriesIdState { cssid: cmd.cssid.clone(), @@ -1594,6 +1594,12 @@ impl CaConnSet { // cx.waker().wake_by_ref(); } self.handle_check_health()?; + { + if self.storage_insert_queue_l1.len() != 0 { + let a = core::mem::replace(&mut self.storage_insert_queue_l1, VecDeque::new()); + self.storage_insert_queue.push_back(a); + } + } Ok(()) } } diff --git a/scywr/src/senderpolling.rs b/scywr/src/senderpolling.rs index 920eece..9c3ab23 100644 --- a/scywr/src/senderpolling.rs +++ b/scywr/src/senderpolling.rs @@ -1,6 +1,7 @@ use async_channel::Send; use async_channel::SendError; use async_channel::Sender; +use core::fmt; use futures_util::Future; use pin_project::pin_project; use std::marker::PhantomPinned; @@ -14,6 +15,23 @@ pub enum Error { Closed(T), } +impl fmt::Debug for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::NoSendInProgress => fmt.debug_tuple("NoSendInProgress").finish(), + Error::Closed(_) => fmt.debug_tuple("Closed").finish(), + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self, fmt) + } +} + +impl std::error::Error for Error {} + #[pin_project] pub struct SenderPolling where