This commit is contained in:
Dominik Werder
2024-08-27 16:26:12 +02:00
parent b6d8f8830a
commit 3ddcc90363
11 changed files with 400 additions and 33 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.3"
version = "0.2.4-aa.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -1,5 +1,6 @@
pub mod beacons;
pub mod conn;
pub mod conn2;
pub mod connset;
pub mod connset_input_merge;
pub mod finder;

View File

@@ -84,13 +84,14 @@ use std::time::SystemTime;
use taskrun::tokio;
use tokio::net::TcpStream;
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000);
const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 80);
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
const CHANNEL_STATUS_EMIT_IVL: Duration = Duration::from_millis(1000 * 8);
const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 120);
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10);
const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(1000 * 120);
const DO_RATE_CHECK: bool = false;
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(6000);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(8000);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(10000);
const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(120000);
#[allow(unused)]
macro_rules! trace2 {
@@ -1055,7 +1056,6 @@ impl CaConn {
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
) -> Self {
let _ = channel_info_query_tx;
let tsnow = Instant::now();
let (cq_tx, cq_rx) = async_channel::bounded(32);
let mut rng = stats::xoshiro_from_time();
@@ -1063,7 +1063,7 @@ impl CaConn {
opts,
backend,
state: CaConnState::Unconnected(tsnow),
ticker: Self::new_self_ticker(),
ticker: Self::new_self_ticker(&mut rng),
proto: None,
cid_store: CidStore::new_from_time(),
subid_store: SubidStore::new_from_time(),
@@ -1105,19 +1105,28 @@ impl CaConn {
}
fn ioc_ping_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
IOC_PING_IVL * 100 / (70 + (rng.next_u32() % 60))
let b = IOC_PING_IVL;
b + b / 128 * (rng.next_u32() & 0x1f)
}
fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration {
Duration::from_millis(8000 + (rng.next_u32() & 0xfff) as u64)
let b = CHANNEL_STATUS_EMIT_IVL;
b + b / 128 * (rng.next_u32() & 0x1f)
}
fn silence_read_next_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
Duration::from_millis(1000 * 300 + (rng.next_u32() & 0x3fff) as u64)
}
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
Box::pin(tokio::time::sleep(Duration::from_millis(1500)))
fn recv_value_status_emit_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
let b = READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN;
b + b / 128 * (rng.next_u32() & 0x1f)
}
fn new_self_ticker(rng: &mut Xoshiro128PlusPlus) -> Pin<Box<tokio::time::Sleep>> {
let b = Duration::from_millis(1500);
let dur = b + b / 128 * (rng.next_u32() & 0x1f);
Box::pin(tokio::time::sleep(dur))
}
fn proto(&mut self) -> Option<&mut CaProto> {
@@ -1700,6 +1709,7 @@ impl CaConn {
tsnow,
stnow,
stats,
&mut self.rng,
)?;
}
ReadingState::Monitoring(st2) => {
@@ -1729,6 +1739,7 @@ impl CaConn {
tsnow,
stnow,
stats,
&mut self.rng,
)?;
}
ReadingState::StopMonitoringForPolling(st2) => {
@@ -1882,7 +1893,16 @@ impl CaConn {
st2.tick = PollTickState::Idle(tsnow);
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?;
Self::read_notify_res_for_write(
ev,
ch_wrst,
st,
iqdqs,
stnow,
tsnow,
stats,
&mut self.rng,
)?;
}
},
ReadingState::EnableMonitoring(_) => {
@@ -1926,7 +1946,16 @@ impl CaConn {
// More involved check would be to raise a flag, wait for the expected monitor for some
// timeout, and if we get nothing error out.
if false {
Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?;
Self::read_notify_res_for_write(
ev,
ch_wrst,
st,
iqdqs,
stnow,
tsnow,
stats,
&mut self.rng,
)?;
}
}
},
@@ -1956,6 +1985,7 @@ impl CaConn {
stnow: SystemTime,
tsnow: Instant,
stats: &CaConnStats,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
let crst = &mut st.channel;
let writer = &mut st.writer;
@@ -1971,6 +2001,7 @@ impl CaConn {
tsnow,
stnow,
stats,
rng,
)?;
Ok(())
}
@@ -1986,6 +2017,7 @@ impl CaConn {
tsnow: Instant,
stnow: SystemTime,
stats: &CaConnStats,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
{
use proto::CaMetaValue::*;
@@ -2009,7 +2041,7 @@ impl CaConn {
crst.acc_recv.push_written(payload_len);
// TODO should attach these counters already to Writable state.
if crst.ts_recv_value_status_emit_next <= tsnow {
crst.ts_recv_value_status_emit_next = tsnow + READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN;
crst.ts_recv_value_status_emit_next = tsnow + Self::recv_value_status_emit_ivl_rng(rng);
let item = ChannelStatusItem {
ts: stnow,
cssid: crst.cssid,
@@ -2495,16 +2527,6 @@ impl CaConn {
warn!("CaConn sees: {msg:?}");
}
}
#[cfg(DISABLED)]
CaMsgTy::IssueDataCount(hi, stat, sev, secs, nanos) => {
let cid = *self.cid_by_subid.get(&hi.param2()).unwrap();
let name = self.name_by_cid.get(&cid).unwrap();
debug!("ca large count for {name} {hi:?} {stat} {sev} {secs} {nanos}");
self.weird_count += 1;
if self.weird_count > 200 {
std::process::exit(13);
}
}
CaMsgTy::VersionRes(x) => {
debug!("VersionRes({x})");
self.weird_count += 1;
@@ -2825,7 +2847,7 @@ impl CaConn {
// debug!("tick CaConn {}", self.remote_addr_dbg);
let tsnow = Instant::now();
if !self.is_shutdown() {
self.ticker = Self::new_self_ticker();
self.ticker = Self::new_self_ticker(&mut self.rng);
let _ = self.ticker.poll_unpin(cx);
// cx.waker().wake_by_ref();
}

5
netfetch/src/ca/conn2.rs Normal file
View File

@@ -0,0 +1,5 @@
mod channel;
mod channelstateinfo;
mod conn;
mod conncmd;
mod connevent;

View File

@@ -0,0 +1,3 @@
trait Channel {}
struct ChannelAny {}

View File

@@ -0,0 +1,93 @@
use crate::conf::ChannelConfig;
use netpod::ScalarType;
use netpod::Shape;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::net::SocketAddrV4;
use std::time::Instant;
use std::time::SystemTime;
#[derive(Clone, Debug, Serialize)]
pub enum ChannelConnectedInfo {
Disconnected,
Connecting,
Connected,
Error,
}
#[derive(Clone, Debug, Serialize)]
pub struct ChannelStateInfo {
pub stnow: SystemTime,
pub cssid: ChannelStatusSeriesId,
pub addr: SocketAddrV4,
pub series: Option<SeriesId>,
pub channel_connected_info: ChannelConnectedInfo,
pub ping_last: Option<SystemTime>,
pub pong_last: Option<SystemTime>,
pub scalar_type: Option<ScalarType>,
pub shape: Option<Shape>,
// NOTE: this solution can yield to the same Instant serialize to different string representations.
// #[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "ser_instant")]
pub ts_created: Option<Instant>,
// #[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "ser_instant")]
pub ts_event_last: Option<Instant>,
pub recv_count: Option<u64>,
pub recv_bytes: Option<u64>,
// #[serde(skip_serializing_if = "Option::is_none")]
pub item_recv_ivl_ema: Option<f32>,
pub interest_score: f32,
pub conf: ChannelConfig,
pub recv_last: SystemTime,
pub write_st_last: SystemTime,
pub write_mt_last: SystemTime,
pub write_lt_last: SystemTime,
pub status_emit_count: u64,
}
mod ser_instant {
use super::*;
use netpod::DATETIME_FMT_3MS;
use serde::Deserializer;
use serde::Serializer;
pub fn serialize<S>(val: &Option<Instant>, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match val {
Some(val) => {
let now = chrono::Utc::now();
let tsnow = Instant::now();
let t1 = if tsnow >= *val {
let dur = tsnow.duration_since(*val);
let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64)
.unwrap()
.checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_sub_signed(dur2).unwrap()
} else {
let dur = (*val).duration_since(tsnow);
let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64)
.unwrap()
.checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_add_signed(dur2).unwrap()
};
let s = t1.format(DATETIME_FMT_3MS).to_string();
ser.serialize_str(&s)
}
None => ser.serialize_none(),
}
}
pub fn deserialize<'de, D>(_de: D) -> Result<Option<Instant>, D::Error>
where
D: Deserializer<'de>,
{
let e = serde::de::Error::custom("todo deserialize for ser_instant");
Err(e)
}
}

View File

@@ -0,0 +1,83 @@
use super::conncmd::ConnCommand;
use super::connevent::CaConnEvent;
use super::connevent::EndOfStreamReason;
use crate::ca::conn::CaConnOpts;
use crate::ca::proto::CaProto;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use scywr::insertqueues::InsertQueuesTx;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnStats;
use stats::CaProtoStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
use taskrun::tokio;
use tokio::net::TcpStream;
#[derive(Debug)]
pub enum Error {}
type ConnectingFut =
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>;
enum ConnectedState {
Init(CaProto),
Handshake(CaProto),
PeerReady(CaProto),
}
enum CaConnState {
Connecting(Instant, SocketAddrV4, ConnectingFut),
Connected(CaProto),
Shutdown(EndOfStreamReason),
Done,
}
struct CaConn {
opts: CaConnOpts,
backend: String,
state: CaConnState,
rng: Xoshiro128PlusPlus,
}
impl CaConn {
fn new(
opts: CaConnOpts,
backend: String,
remote_addr: SocketAddrV4,
local_epics_hostname: String,
iqtx: InsertQueuesTx,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
) -> Self {
let tsnow = Instant::now();
let (cq_tx, cq_rx) = async_channel::bounded::<ConnCommand>(32);
let mut rng = stats::xoshiro_from_time();
Self {
opts,
backend,
state: CaConnState::Connecting(tsnow, remote_addr, err::todoval()),
rng,
}
}
}
impl Stream for CaConn {
type Item = CaConnEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
todo!()
}
}

View File

@@ -0,0 +1,49 @@
use crate::conf::ChannelConfig;
use atomic::AtomicUsize;
use series::ChannelStatusSeriesId;
use std::sync::atomic;
#[derive(Debug)]
pub enum ConnCommandKind {
ChannelAdd(ChannelConfig, ChannelStatusSeriesId),
ChannelClose(String),
Shutdown,
}
#[derive(Debug)]
pub struct ConnCommand {
id: usize,
kind: ConnCommandKind,
}
impl ConnCommand {
pub fn channel_add(conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::ChannelAdd(conf, cssid),
}
}
pub fn channel_close(name: String) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::ChannelClose(name),
}
}
pub fn shutdown() -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::Shutdown,
}
}
fn make_id() -> usize {
static ID: AtomicUsize = AtomicUsize::new(0);
ID.fetch_add(1, atomic::Ordering::AcqRel)
}
pub fn id(&self) -> usize {
self.id
}
}

View File

@@ -0,0 +1,87 @@
use super::channelstateinfo::ChannelStateInfo;
use core::fmt;
use series::ChannelStatusSeriesId;
use std::collections::BTreeMap;
use std::time::Instant;
#[derive(Debug)]
pub struct CaConnEvent {
pub ts: Instant,
pub value: CaConnEventValue,
}
impl CaConnEvent {
pub fn new(ts: Instant, value: CaConnEventValue) -> Self {
Self { ts, value }
}
pub fn err_now(err: super::conn::Error) -> Self {
Self::new_now(CaConnEventValue::EndOfStream(EndOfStreamReason::Error(err)))
}
pub fn new_now(value: CaConnEventValue) -> Self {
Self {
ts: Instant::now(),
value,
}
}
pub fn desc_short(&self) -> CaConnEventDescShort {
CaConnEventDescShort { inner: self }
}
}
pub struct CaConnEventDescShort<'a> {
inner: &'a CaConnEvent,
}
impl<'a> fmt::Display for CaConnEventDescShort<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"CaConnEventDescShort {{ ts: {:?}, value: {} }}",
self.inner.ts,
self.inner.value.desc_short()
)
}
}
#[derive(Debug)]
pub enum CaConnEventValue {
None,
EchoTimeout,
// ConnCommandResult(ConnCommandResult),
ChannelStatus(ChannelStatusPartial),
ChannelCreateFail(String),
EndOfStream(EndOfStreamReason),
}
impl CaConnEventValue {
pub fn desc_short(&self) -> &'static str {
match self {
CaConnEventValue::None => "None",
CaConnEventValue::EchoTimeout => "EchoTimeout",
// CaConnEventValue::ConnCommandResult(_) => "ConnCommandResult",
CaConnEventValue::ChannelStatus(_) => "ChannelStatus",
CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail",
CaConnEventValue::EndOfStream(_) => "EndOfStream",
}
}
}
#[derive(Debug)]
pub enum EndOfStreamReason {
UnspecifiedReason,
Error(super::conn::Error),
ConnectRefused,
ConnectTimeout,
OnCommand,
RemoteClosed,
IocTimeout,
IoError,
}
#[derive(Debug)]
pub struct ChannelStatusPartial {
pub channel_statuses: BTreeMap<ChannelStatusSeriesId, ChannelStateInfo>,
}

View File

@@ -358,6 +358,7 @@ pub struct CaConnSet {
find_ioc_query_sender: Pin<Box<SenderPolling<IocAddrQuery>>>,
find_ioc_res_rx: Pin<Box<Receiver<VecDeque<FindIocRes>>>>,
iqtx: Pin<Box<InsertQueuesTx>>,
storage_insert_queue_l1: VecDeque<QueryItem>,
storage_insert_queue: VecDeque<VecDeque<QueryItem>>,
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
ca_conn_res_tx: Pin<Box<Sender<(SocketAddr, CaConnEvent)>>>,
@@ -424,6 +425,7 @@ impl CaConnSet {
find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)),
find_ioc_res_rx: Box::pin(find_ioc_res_rx),
iqtx: Box::pin(iqtx.clone()),
storage_insert_queue_l1: VecDeque::new(),
storage_insert_queue: VecDeque::new(),
// TODO simplify for all combinations
@@ -639,11 +641,10 @@ impl CaConnSet {
let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
let state = &mut writer_status_state;
let ts_net = Instant::now();
let mut deque = VecDeque::new();
let deque = &mut self.storage_insert_queue_l1;
writer_status
.write(item, state, ts_net, ts, &mut deque)
.write(item, state, ts_net, ts, deque)
.map_err(Error::from_string)?;
self.storage_insert_queue.push_back(deque);
}
*chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState {
cssid: cmd.cssid,
@@ -706,11 +707,10 @@ impl CaConnSet {
let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
let state = &mut writer_status_state;
let ts_net = Instant::now();
let mut deque = VecDeque::new();
let deque = &mut self.storage_insert_queue_l1;
writer_status
.write(item, state, ts_net, ts, &mut deque)
.write(item, state, ts_net, ts, deque)
.map_err(Error::from_string)?;
self.storage_insert_queue.push_back(deque);
}
*st3 = WithStatusSeriesIdState {
cssid: cmd.cssid.clone(),
@@ -1594,6 +1594,12 @@ impl CaConnSet {
// cx.waker().wake_by_ref();
}
self.handle_check_health()?;
{
if self.storage_insert_queue_l1.len() != 0 {
let a = core::mem::replace(&mut self.storage_insert_queue_l1, VecDeque::new());
self.storage_insert_queue.push_back(a);
}
}
Ok(())
}
}

View File

@@ -1,6 +1,7 @@
use async_channel::Send;
use async_channel::SendError;
use async_channel::Sender;
use core::fmt;
use futures_util::Future;
use pin_project::pin_project;
use std::marker::PhantomPinned;
@@ -14,6 +15,23 @@ pub enum Error<T> {
Closed(T),
}
impl<T> fmt::Debug for Error<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::NoSendInProgress => fmt.debug_tuple("NoSendInProgress").finish(),
Error::Closed(_) => fmt.debug_tuple("Closed").finish(),
}
}
}
impl<T> fmt::Display for Error<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self, fmt)
}
}
impl<T> std::error::Error for Error<T> {}
#[pin_project]
pub struct SenderPolling<T>
where