This commit is contained in:
Dominik Werder
2023-11-09 22:35:12 +01:00
parent 613a7e130b
commit 8351dabb72
5 changed files with 148 additions and 101 deletions

View File

@@ -1,4 +1,5 @@
use super::proto;
use super::proto::CreateChanRes;
use super::ExtraInsertsConf;
use crate::senderpolling::SenderPolling;
use crate::throttletrace::ThrottleTrace;
@@ -151,6 +152,9 @@ fn ser_instant<S: serde::Serializer>(val: &Option<Instant>, ser: S) -> Result<S:
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Cid(pub u32);
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Subid(pub u32);
#[derive(Clone, Debug)]
enum ChannelError {
CreateChanFail,
@@ -314,34 +318,52 @@ fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
}
struct CidStore {
next: u32,
rng: XorShift32,
}
impl CidStore {
fn new() -> Self {
Self { next: 0 }
fn new(seed: u32) -> Self {
Self {
rng: XorShift32::new(seed),
}
}
fn new_from_time() -> Self {
Self::new(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.subsec_nanos(),
)
}
fn next(&mut self) -> Cid {
self.next += 1;
let ret = self.next;
Cid(ret)
Cid(self.rng.next())
}
}
struct SubidStore {
next: u32,
rng: XorShift32,
}
impl SubidStore {
fn new() -> Self {
Self { next: 0 }
fn new(seed: u32) -> Self {
Self {
rng: XorShift32::new(seed),
}
}
fn next(&mut self) -> u32 {
self.next += 1;
let ret = self.next;
ret
fn new_from_time() -> Self {
Self::new(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.subsec_nanos(),
)
}
fn next(&mut self) -> Subid {
Subid(self.rng.next())
}
}
@@ -500,10 +522,11 @@ pub struct CaConn {
cid_store: CidStore,
subid_store: SubidStore,
channels: BTreeMap<Cid, ChannelState>,
init_state_count: u64,
cid_by_name: BTreeMap<String, Cid>,
cid_by_subid: BTreeMap<u32, Cid>,
cid_by_subid: BTreeMap<Subid, Cid>,
name_by_cid: BTreeMap<Cid, String>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
init_state_count: u64,
insert_item_queue: VecDeque<QueryItem>,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
@@ -522,7 +545,6 @@ pub struct CaConn {
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sending: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
weird_count: usize,
@@ -555,13 +577,14 @@ impl CaConn {
state: CaConnState::Unconnected(Instant::now()),
ticker: Self::new_self_ticker(),
proto: None,
cid_store: CidStore::new(),
subid_store: SubidStore::new(),
channels: BTreeMap::new(),
cid_store: CidStore::new_from_time(),
subid_store: SubidStore::new_from_time(),
init_state_count: 0,
channels: BTreeMap::new(),
cid_by_name: BTreeMap::new(),
cid_by_subid: BTreeMap::new(),
name_by_cid: BTreeMap::new(),
time_binners: BTreeMap::new(),
insert_item_queue: VecDeque::new(),
remote_addr_dbg,
local_epics_hostname,
@@ -580,7 +603,6 @@ impl CaConn {
ca_conn_event_out_queue: VecDeque::new(),
channel_info_query_queue: VecDeque::new(),
channel_info_query_sending: Box::pin(SenderPolling::new(channel_info_query_tx)),
time_binners: BTreeMap::new(),
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
ca_proto_stats,
weird_count: 0,
@@ -978,10 +1000,10 @@ impl CaConn {
let tsnow = Instant::now();
trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
if let Some(started) = self.ioc_ping_start {
if started.elapsed() >= Duration::from_millis(4000) {
if started + Duration::from_millis(4000) < tsnow {
self.stats.pong_timeout().inc();
self.ioc_ping_start = None;
warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
self.ioc_ping_start = None;
let item = CaConnEvent {
ts: tsnow,
value: CaConnEventValue::EchoTimeout,
@@ -993,6 +1015,7 @@ impl CaConn {
if self.ioc_ping_next < tsnow {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
info!("start ping");
self.ioc_ping_start = Some(Instant::now());
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
@@ -1097,7 +1120,7 @@ impl CaConn {
sid,
data_type: data_type_asked,
data_count: data_count as _,
subid,
subid: subid.0,
});
let msg = CaMsg::from_ty_ts(ty, tsnow);
let proto = self.proto.as_mut().unwrap();
@@ -1252,15 +1275,36 @@ impl CaConn {
}
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
let subid = Subid(ev.subid);
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
} else {
warn!("can not find cid for subid {subid:?}");
// return Err(Error::with_msg_no_trace());
return Ok(());
};
if false {
let name = self.name_by_cid(cid);
info!("event {name:?} {ev:?}");
}
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
x
} else {
// TODO return better as error and let caller decide (with more structured errors)
warn!("can not find channel for {cid:?} {subid:?}");
// TODO
// When removing a channel, keep it in "closed" btree for some time because messages can
// still arrive from all buffers.
// If we don't have it in the "closed" btree, then close connection to the IOC and count
// as logic error.
// Close connection to the IOC. Cout as logic error.
// return Err(Error::with_msg_no_trace());
std::process::exit(1);
return Ok(());
};
match ch_s {
ChannelState::Created(_series, st) => {
st.ts_alive_last = tsnow;
@@ -1448,7 +1492,7 @@ impl CaConn {
}
}
fn check_channels_state_init(&mut self, msgs_tmp: &mut Vec<CaMsg>) -> Result<(), Error> {
fn check_channels_state_init(&mut self, do_wake_again: &mut bool) -> Result<(), Error> {
// TODO profile, efficient enough?
if self.init_state_count == 0 {
return Ok(());
@@ -1472,7 +1516,8 @@ impl CaConn {
}),
Instant::now(),
);
msgs_tmp.push(msg);
*do_wake_again = true;
self.proto.as_mut().unwrap().push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Creating {
@@ -1494,24 +1539,13 @@ impl CaConn {
use Poll::*;
let mut ts1 = Instant::now();
// TODO unify with Listen state where protocol gets polled as well.
let mut msgs_tmp = Vec::new();
self.check_channels_state_init(&mut msgs_tmp)?;
let mut do_wake_again = false;
self.check_channels_state_init(&mut do_wake_again)?;
let ts2 = Instant::now();
self.stats
.time_check_channels_state_init
.add((ts2.duration_since(ts1) * MS as u32).as_secs());
ts1 = ts2;
let mut do_wake_again = false;
if msgs_tmp.len() > 0 {
do_wake_again = true;
}
{
let proto = self.proto.as_mut().unwrap();
// TODO be careful to not overload outgoing message queue.
for msg in msgs_tmp {
proto.push_out(msg);
}
}
let tsnow = Instant::now();
let res = match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
@@ -1525,62 +1559,7 @@ impl CaConn {
// TODO count this unexpected case.
}
CaMsgTy::CreateChanRes(k) => {
// TODO handle cid-not-found which can also indicate peer error.
let cid = Cid(k.cid);
let sid = k.sid;
// TODO handle error:
let name = self.name_by_cid(cid).unwrap().to_string();
trace3!("CreateChanRes {name:?}");
if k.data_type > 6 {
error!("CreateChanRes with unexpected data_type {}", k.data_type);
}
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating { cssid, .. } => cssid.clone(),
_ => {
// TODO handle in better way:
// Remove channel and emit notice that channel is removed with reason.
let e = Error::with_msg_no_trace("handle_peer_ready bad state");
return Ready(Some(Err(e)));
}
};
let created_state = CreatedState {
cssid,
cid,
sid,
data_type: k.data_type,
data_count: k.data_count,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: tsnow,
ts_alive_last: tsnow,
state: MonitoringState::FetchSeriesId,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
};
*ch_s = ChannelState::FetchingSeriesId(created_state);
// TODO handle error in different way. Should most likely not abort.
let tx = SendSeriesLookup {
tx: self.conn_command_tx(),
};
let query = ChannelInfoQuery {
backend: self.backend.clone(),
channel: name.clone(),
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
self.channel_info_query_queue.push_back(query);
self.handle_create_chan_res(k, tsnow)?;
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => {
@@ -1679,6 +1658,69 @@ impl CaConn {
res.map_err(|e| Error::from(e.to_string()))
}
fn handle_create_chan_res(&mut self, k: CreateChanRes, tsnow: Instant) -> Result<(), Error> {
// TODO handle cid-not-found which can also indicate peer error.
let cid = Cid(k.cid);
let sid = k.sid;
let name = if let Some(x) = self.name_by_cid(cid) {
x.to_string()
} else {
return Err(Error::with_msg_no_trace(format!("no name for {cid:?}")));
};
trace3!("CreateChanRes {name:?}");
if k.data_type > 6 {
error!("CreateChanRes with unexpected data_type {}", k.data_type);
}
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating { cssid, .. } => cssid.clone(),
_ => {
// TODO handle in better way:
// Remove channel and emit notice that channel is removed with reason.
let e = Error::with_msg_no_trace("handle_peer_ready bad state");
return Err(e);
}
};
let created_state = CreatedState {
cssid,
cid,
sid,
data_type: k.data_type,
data_count: k.data_count,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: tsnow,
ts_alive_last: tsnow,
state: MonitoringState::FetchSeriesId,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
};
*ch_s = ChannelState::FetchingSeriesId(created_state);
// TODO handle error in different way. Should most likely not abort.
let tx = SendSeriesLookup {
tx: self.conn_command_tx(),
};
let query = ChannelInfoQuery {
backend: self.backend.clone(),
channel: name.clone(),
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
self.channel_info_query_queue.push_back(query);
Ok(())
}
// `?` works not in here.
fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow<Poll<Result<(), Error>>> {
use ControlFlow::*;

View File

@@ -1284,6 +1284,9 @@ impl CaConnSet {
if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow {
self.stats.channel_health_timeout().inc();
trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
// TODO
error!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
std::process::exit(1);
let addr = SocketAddr::V4(*addr_v4);
cmd_remove_channel.push((addr, ch.clone()));
if st.health_timeout_count < 3 {

View File

@@ -1026,7 +1026,7 @@ impl CaProto {
tcp,
remote_addr_dbg,
state: CaState::StdHead,
buf: SlideBuf::new(1024 * 512),
buf: SlideBuf::new(1024 * 1024 * 4),
outbuf: SlideBuf::new(1024 * 128),
out: VecDeque::new(),
array_truncate,

View File

@@ -15,7 +15,7 @@ use tokio::io::AsyncReadExt;
pub struct CaIngestOpts {
backend: String,
channels: PathBuf,
api_bind: Option<String>,
api_bind: String,
search: Vec<String>,
#[serde(default)]
search_blacklist: Vec<String>,
@@ -52,7 +52,7 @@ impl CaIngestOpts {
}
pub fn api_bind(&self) -> String {
self.api_bind.clone().unwrap_or_else(|| "0.0.0.0:3011".into())
self.api_bind.clone()
}
pub fn postgresql_config(&self) -> &Database {
@@ -159,7 +159,7 @@ scylla:
let res: Result<CaIngestOpts, _> = serde_yaml::from_slice(conf.as_bytes());
let conf = res.unwrap();
assert_eq!(conf.channels, PathBuf::from("/some/path/file.txt"));
assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string()));
assert_eq!(&conf.api_bind, "0.0.0.0:3011");
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));

View File

@@ -421,7 +421,9 @@ fn prepare_query_insert_futs(
data_store.qu_insert_ts_msp.clone(),
stats.clone(),
);
futs.push(fut);
if item_ts_local % 100000 == 7461 {
futs.push(fut);
}
}
#[cfg(DISABLED)]
if let Some(ts_msp_grid) = item.ts_msp_grid {