Intentional protocol error for testing, refactor

This commit is contained in:
Dominik Werder
2024-04-21 12:54:51 +01:00
parent 9f04c7616c
commit 6660531e5b
7 changed files with 269 additions and 197 deletions
+1 -1
View File
@@ -100,7 +100,7 @@ async fn remove_older_series(
}
pub async fn find_older_msp(
backend: String,
_backend: String,
params: FindOlder,
pgconf: &Database,
scyconf: &ScyllaIngestConfig,
+165 -57
View File
@@ -1,6 +1,7 @@
use super::proto;
use super::proto::CaEventValue;
use super::proto::ReadNotify;
use crate::ca::proto::ChannelClose;
use crate::ca::proto::EventCancel;
use crate::conf::ChannelConfig;
use crate::senderpolling::SenderPolling;
@@ -34,6 +35,7 @@ use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::Accounting;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ShutdownReason;
use scywriiq::ChannelStatus;
use scywriiq::ChannelStatusClosedReason;
use scywriiq::ChannelStatusItem;
@@ -70,6 +72,8 @@ use tokio::net::TcpStream;
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000);
const IOC_PING_IVL: Duration = Duration::from_millis(80000);
const DO_RATE_CHECK: bool = false;
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(3000);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(3000);
#[allow(unused)]
macro_rules! trace2 {
@@ -116,6 +120,7 @@ pub enum Error {
NoProgressNoPending,
ShutdownWithQueuesNoProgressNoPending,
Error,
DurationOutOfBounds,
}
impl err::ToErr for Error {
@@ -170,13 +175,17 @@ mod ser_instant {
let tsnow = Instant::now();
let t1 = if tsnow >= *val {
let dur = tsnow.duration_since(*val);
let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64)
.ok_or(Error::DurationOutOfBounds)
.unwrap()
.checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_sub_signed(dur2).unwrap()
} else {
let dur = (*val).duration_since(tsnow);
let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64)
.ok_or(Error::DurationOutOfBounds)
.unwrap()
.checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
.unwrap();
now.checked_add_signed(dur2).unwrap()
@@ -190,7 +199,7 @@ mod ser_instant {
}
}
pub fn deserialize<'de, D>(de: D) -> Result<Option<Instant>, D::Error>
pub fn deserialize<'de, D>(_de: D) -> Result<Option<Instant>, D::Error>
where
D: Deserializer<'de>,
{
@@ -247,10 +256,22 @@ struct EnableMonitoringState {
subid: Subid,
}
#[derive(Debug, Clone)]
struct ReadPendingState {
tsbeg: Instant,
}
#[derive(Debug, Clone)]
enum Monitoring2State {
Passive,
ReadPending(Ioid, Instant),
}
#[derive(Debug, Clone)]
struct MonitoringState {
tsbeg: Instant,
subid: Subid,
mon2state: Monitoring2State,
}
#[derive(Debug, Clone)]
@@ -351,10 +372,17 @@ enum ChannelState {
Creating(CreatingState),
MakingSeriesWriter(MakingSeriesWriterState),
Writable(WritableState),
Closing(ClosingState),
Error(ChannelError),
Ended(ChannelStatusSeriesId),
}
#[derive(Debug)]
struct ClosingState {
tsbeg: Instant,
cssid: ChannelStatusSeriesId,
}
#[derive(Debug)]
struct ChannelConf {
conf: ChannelConfig,
@@ -370,6 +398,7 @@ impl ChannelState {
ChannelState::Writable(_) => ChannelConnectedInfo::Connected,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
ChannelState::Ended(_) => ChannelConnectedInfo::Disconnected,
ChannelState::Closing(_) => ChannelConnectedInfo::Disconnected,
};
let scalar_type = match self {
ChannelState::Writable(s) => Some(s.writer.scalar_type().clone()),
@@ -438,6 +467,7 @@ impl ChannelState {
ChannelError::CreateChanFail(cssid) => cssid.clone(),
},
ChannelState::Ended(cssid) => cssid.clone(),
ChannelState::Closing(st) => st.cssid.clone(),
}
}
}
@@ -669,6 +699,8 @@ pub enum EndOfStreamReason {
ConnectFail,
OnCommand,
RemoteClosed,
IocTimeout,
IoError,
}
pub struct CaConnOpts {
@@ -823,40 +855,37 @@ impl CaConn {
}
}
fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
self.proto = None;
match &channel_reason {
ChannelStatusClosedReason::ConnectFail => {
fn trigger_shutdown(&mut self, reason: ShutdownReason) {
let channel_reason = match &reason {
ShutdownReason::ConnectFail => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail);
ChannelStatusClosedReason::ConnectFail
}
ChannelStatusClosedReason::ShutdownCommand => {
ShutdownReason::IoError => {
self.state = CaConnState::Shutdown(EndOfStreamReason::IoError);
ChannelStatusClosedReason::IoError
}
ShutdownReason::ShutdownCommand => {
self.state = CaConnState::Shutdown(EndOfStreamReason::OnCommand);
ChannelStatusClosedReason::ShutdownCommand
}
ChannelStatusClosedReason::ChannelRemove => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail);
}
ChannelStatusClosedReason::ProtocolError => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError));
}
ChannelStatusClosedReason::FrequencyQuota => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue));
}
ChannelStatusClosedReason::BandwidthQuota => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue));
}
ChannelStatusClosedReason::InternalError => {
ShutdownReason::InternalError => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::Error));
ChannelStatusClosedReason::InternalError
}
ChannelStatusClosedReason::IocTimeout => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue));
ShutdownReason::Protocol => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError));
ChannelStatusClosedReason::ProtocolError
}
ChannelStatusClosedReason::NoProtocol => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::NoProtocol));
ShutdownReason::ProtocolMissing => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError));
ChannelStatusClosedReason::ProtocolError
}
ChannelStatusClosedReason::ProtocolDone => {
self.state = CaConnState::Shutdown(EndOfStreamReason::RemoteClosed);
ShutdownReason::IocTimeout => {
self.state = CaConnState::Shutdown(EndOfStreamReason::IoError);
ChannelStatusClosedReason::IocTimeout
}
}
};
self.channel_state_on_shutdown(channel_reason);
let addr = self.remote_addr_dbg.clone();
self.insert_item_queue
@@ -866,6 +895,7 @@ impl CaConn {
// TODO map to appropriate status
status: ConnectionStatus::Closing,
}));
self.proto = None;
}
fn cmd_check_health(&mut self) {
@@ -883,7 +913,7 @@ impl CaConn {
Ok(_) => {}
Err(e) => {
error!("{e}");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
self.trigger_shutdown(ShutdownReason::InternalError);
}
}
@@ -936,7 +966,7 @@ impl CaConn {
fn cmd_shutdown(&mut self) {
debug!("cmd_shutdown {}", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand);
self.trigger_shutdown(ShutdownReason::ShutdownCommand);
}
fn handle_conn_command(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
@@ -1179,6 +1209,7 @@ impl CaConn {
// *chst = ChannelState::Ended;
}
ChannelState::Ended(_) => {}
ChannelState::Closing(_) => {}
}
}
}
@@ -1196,7 +1227,7 @@ impl CaConn {
value: CaConnEventValue::EchoTimeout,
};
self.ca_conn_event_out_queue.push_back(item);
self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout);
self.trigger_shutdown(ShutdownReason::IocTimeout);
}
} else {
if self.ioc_ping_next < tsnow {
@@ -1209,7 +1240,7 @@ impl CaConn {
} else {
self.stats.ping_no_proto().inc();
warn!("can not ping {} no proto", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol);
self.trigger_shutdown(ShutdownReason::Protocol);
}
}
}
@@ -1261,6 +1292,7 @@ impl CaConn {
// TODO need last-save-ts for this state.
}
ChannelState::Ended(_) => {}
ChannelState::Closing(_) => {}
}
}
Ok(())
@@ -1350,7 +1382,14 @@ impl CaConn {
if crst.stwin_count > 30000 || crst.stwin_bytes > 1024 * 1024 * 500 {
let subid = match &mut st.reading {
ReadingState::EnableMonitoring(x) => Some(x.subid.clone()),
ReadingState::Monitoring(x) => Some(x.subid.clone()),
ReadingState::Monitoring(x) => {
match x.mon2state {
// actually, no differing behavior needed so far.
Monitoring2State::Passive => (),
Monitoring2State::ReadPending(ioid, since) => (),
}
Some(x.subid.clone())
}
ReadingState::StopMonitoringForPolling(_) => {
self.stats.transition_to_polling_bad_state().inc();
None
@@ -1376,6 +1415,7 @@ impl CaConn {
st.reading = ReadingState::Monitoring(MonitoringState {
tsbeg: tsnow,
subid: st2.subid,
mon2state: Monitoring2State::Passive,
});
let crst = &mut st.channel;
let writer = &mut st.writer;
@@ -1383,7 +1423,13 @@ impl CaConn {
let stats = self.stats.as_ref();
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
}
ReadingState::Monitoring(_st2) => {
ReadingState::Monitoring(st2) => {
match st2.mon2state {
Monitoring2State::Passive => {}
Monitoring2State::ReadPending(ioid, since) => {
error!("TODO actually, EventAddRes can anyway not be a response to a ReadNotify");
}
}
let crst = &mut st.channel;
let writer = &mut st.writer;
let iiq = &mut self.insert_item_queue;
@@ -1452,7 +1498,12 @@ impl CaConn {
let name = self.name_by_cid(cid);
warn!("received event-cancel but channel {name:?} in wrong state");
}
ReadingState::Monitoring(..) => {
ReadingState::Monitoring(st2) => {
match st2.mon2state {
// no special discrimination needed
Monitoring2State::Passive => {}
Monitoring2State::ReadPending(ioid, since) => {}
}
let name = self.name_by_cid(cid);
warn!("received event-cancel but channel {name:?} in wrong state");
}
@@ -1506,31 +1557,28 @@ impl CaConn {
PollTickState::Wait(st3, ioid) => {
let dt = tsnow.saturating_duration_since(*st3);
self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32);
self.read_ioids.remove(ioid);
// TODO maintain histogram of read-notify latencies
self.read_ioids.remove(ioid);
st2.tick = PollTickState::Idle(tsnow);
let crst = &mut st.channel;
let writer = &mut st.writer;
let iiq = &mut self.insert_item_queue;
let stats = self.stats.as_ref();
Self::event_add_ingest(
ev.payload_len,
ev.value,
crst,
writer,
iiq,
tsnow,
stnow,
stats,
)?;
Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?;
}
},
ReadingState::EnableMonitoring(..) => {
error!("TODO handle_read_notify_res handle EnableMonitoring");
}
ReadingState::Monitoring(..) => {
error!("TODO handle_read_notify_res handle Monitoring");
}
ReadingState::Monitoring(st2) => match st2.mon2state {
Monitoring2State::Passive => {
error!("ReadNotifyRes even though we do not expect one");
}
Monitoring2State::ReadPending(ioid, since) => {
self.read_ioids.remove(&ioid);
let iiq = &mut self.insert_item_queue;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?;
}
},
ReadingState::StopMonitoringForPolling(..) => {
error!("TODO handle_read_notify_res handle StopMonitoringForPolling");
}
@@ -1548,6 +1596,20 @@ impl CaConn {
Ok(())
}
fn read_notify_res_for_write(
ev: proto::ReadNotifyRes,
st: &mut WritableState,
iiq: &mut VecDeque<QueryItem>,
stnow: SystemTime,
tsnow: Instant,
stats: &CaConnStats,
) -> Result<(), Error> {
let crst = &mut st.channel;
let writer = &mut st.writer;
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
Ok(())
}
fn event_add_ingest(
payload_len: u32,
value: CaEventValue,
@@ -1732,6 +1794,7 @@ impl CaConn {
fn check_channels_state_poll(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> {
let mut do_wake_again = false;
let mut do_shutdown = None;
let channels = &mut self.channels;
for (_k, conf) in channels {
let chst = &mut conf.state;
@@ -1741,7 +1804,42 @@ impl CaConn {
ChannelState::MakingSeriesWriter(_) => {}
ChannelState::Writable(st2) => match &mut st2.reading {
ReadingState::EnableMonitoring(_) => {}
ReadingState::Monitoring(_) => {}
ReadingState::Monitoring(st3) => match st3.mon2state {
Monitoring2State::Passive => {
// nothing to do
}
Monitoring2State::ReadPending(ioid, since) => {
error!("TODO check for timeout");
if since + MONITOR_POLL_TIMEOUT < tsnow {
let name = conf.conf.name();
warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid);
// Something is wrong with this channel.
// Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only
// this or a subset of the subscribed channels no longer give updates.
// Here we try to close the channel at hand.
// If the close-state does not
// TODO need to define the transition from operating channel to inoperable channel in
// a better and reusable way:
// Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply.
let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?;
let item = CaMsg {
ty: CaMsgTy::ChannelClose(ChannelClose {
sid: st2.channel.sid.0,
cid: st2.channel.cid.0,
}),
ts: tsnow,
};
proto.push_out(item);
*chst = ChannelState::Closing(ClosingState {
tsbeg: tsnow,
cssid: st2.channel.cssid,
});
}
}
},
ReadingState::StopMonitoringForPolling(_) => {}
ReadingState::Polling(st3) => match &mut st3.tick {
PollTickState::Idle(x) => {
@@ -1777,8 +1875,18 @@ impl CaConn {
},
ChannelState::Error(_) => {}
ChannelState::Ended(_) => {}
ChannelState::Closing(st2) => {
if st2.tsbeg + TIMEOUT_CHANNEL_CLOSING < tsnow {
let name = conf.conf.name();
warn!("timeout while closing channel {name}");
do_shutdown = Some(ShutdownReason::IocTimeout);
}
}
}
}
if let Some(reason) = do_shutdown {
self.trigger_shutdown(reason);
}
if do_wake_again {
cx.waker().wake_by_ref();
}
@@ -1896,12 +2004,12 @@ impl CaConn {
}
Ready(Some(Err(e))) => {
error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::ProtocolError);
self.trigger_shutdown(ShutdownReason::Protocol);
Ready(Some(Err(e)))
}
Ready(None) => {
warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::ProtocolDone);
self.trigger_shutdown(ShutdownReason::ProtocolMissing);
Ready(None)
}
Pending => Pending,
@@ -2032,7 +2140,7 @@ impl CaConn {
addr,
status: ConnectionStatus::ConnectError,
}));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
self.trigger_shutdown(ShutdownReason::IoError);
Ok(Ready(Some(())))
}
Err(e) => {
@@ -2045,7 +2153,7 @@ impl CaConn {
addr,
status: ConnectionStatus::ConnectTimeout,
}));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
self.trigger_shutdown(ShutdownReason::IocTimeout);
Ok(Ready(Some(())))
}
}
@@ -2133,7 +2241,7 @@ impl CaConn {
Ok(_) => Ok(Pending),
Err(e) => {
error!("handle_own_ticker {e}");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
self.trigger_shutdown(ShutdownReason::InternalError);
Err(e)
}
},
+2
View File
@@ -928,6 +928,8 @@ impl CaConnSet {
warn!("TODO make sure no channel is in state which could trigger health timeout")
}
EndOfStreamReason::RemoteClosed => self.handle_connect_fail(addr)?,
EndOfStreamReason::IocTimeout => self.handle_connect_fail(addr)?,
EndOfStreamReason::IoError => self.handle_connect_fail(addr)?,
}
// self.remove_channel_status_for_addr(addr)?;
trace2!("still CaConn left {}", self.ca_conn_ress.len());
+80 -14
View File
@@ -53,6 +53,12 @@ const EPICS_EPOCH_OFFSET: u64 = 631152000;
const PAYLOAD_LEN_MAX: u32 = 1024 * 1024 * 32;
const PROTO_INPUT_BUF_CAP: u32 = 1024 * 1024 * 40;
const TESTING_UNRESPONSIVE_TODO_REMOVE: bool = true;
const TESTING_EVENT_ADD_RES_MAX: u32 = 3;
const TESTING_PROTOCOL_ERROR_TODO_REMOVE: bool = true;
const TESTING_PROTOCOL_ERROR_AFTER_BYTES: u32 = 400;
#[derive(Debug)]
pub struct Search {
pub id: u32,
@@ -163,6 +169,24 @@ pub struct ReadNotifyRes {
pub value: CaEventValue,
}
#[derive(Debug)]
pub struct ChannelClose {
pub sid: u32,
pub cid: u32,
}
#[derive(Debug)]
pub struct ChannelCloseRes {
pub sid: u32,
pub cid: u32,
}
// This message is only sent from server to client, on server's initiative.
#[derive(Debug)]
pub struct ChannelDisconnect {
pub cid: u32,
}
#[derive(Debug)]
enum CaScalarType {
I8,
@@ -313,6 +337,9 @@ pub enum CaMsgTy {
EventCancelRes(EventCancelRes),
ReadNotify(ReadNotify),
ReadNotifyRes(ReadNotifyRes),
ChannelClose(ChannelClose),
ChannelCloseRes(ChannelCloseRes),
ChannelDisconnect(ChannelDisconnect),
Echo,
}
@@ -341,6 +368,9 @@ impl CaMsgTy {
EventCancelRes(_) => 0x01,
ReadNotify(_) => 0x0f,
ReadNotifyRes(_) => 0x0f,
ChannelClose(_) => 0x0c,
ChannelCloseRes(_) => 0x0c,
ChannelDisconnect(_) => 0x1b,
Echo => 0x17,
}
}
@@ -381,6 +411,9 @@ impl CaMsgTy {
error!("should not attempt to serialize the response again");
panic!();
}
ChannelClose(_) => 0,
ChannelCloseRes(_) => 0,
ChannelDisconnect(_) => 0,
Echo => 0,
}
}
@@ -410,6 +443,9 @@ impl CaMsgTy {
EventCancelRes(x) => x.data_type,
ReadNotify(x) => x.data_type,
ReadNotifyRes(x) => x.data_type,
ChannelClose(_) => 0,
ChannelCloseRes(_) => 0,
ChannelDisconnect(_) => 0,
Echo => 0,
}
}
@@ -445,6 +481,9 @@ impl CaMsgTy {
panic!();
// x.data_count as _
}
ChannelClose(_) => 0,
ChannelCloseRes(_) => 0,
ChannelDisconnect(_) => 0,
Echo => 0,
}
}
@@ -471,6 +510,9 @@ impl CaMsgTy {
EventCancelRes(x) => x.sid,
ReadNotify(x) => x.sid,
ReadNotifyRes(x) => x.sid,
ChannelClose(x) => x.sid,
ChannelCloseRes(x) => x.sid,
ChannelDisconnect(x) => x.cid,
Echo => 0,
}
}
@@ -497,6 +539,9 @@ impl CaMsgTy {
EventCancelRes(x) => x.subid,
ReadNotify(x) => x.ioid,
ReadNotifyRes(x) => x.ioid,
ChannelClose(x) => x.cid,
ChannelCloseRes(x) => x.cid,
ChannelDisconnect(_) => 0,
Echo => 0,
}
}
@@ -564,6 +609,9 @@ impl CaMsgTy {
EventCancelRes(_) => {}
ReadNotify(_) => {}
ReadNotifyRes(_) => {}
ChannelClose(_) => {}
ChannelCloseRes(_) => {}
ChannelDisconnect(_) => {}
Echo => {}
}
}
@@ -994,6 +1042,8 @@ pub struct CaProto {
array_truncate: usize,
stats: Arc<CaProtoStats>,
resqu: VecDeque<CaItem>,
event_add_res_cnt: u32,
bytes_recv_testing: u32,
}
impl CaProto {
@@ -1009,6 +1059,8 @@ impl CaProto {
array_truncate,
stats,
resqu: VecDeque::with_capacity(256),
event_add_res_cnt: 0,
bytes_recv_testing: 0,
}
}
@@ -1133,18 +1185,23 @@ impl CaProto {
let t = rbuf.filled().len().min(32);
debug!("received data {:?}", &rbuf.filled()[0..t]);
}
match self.buf.wadv(nf) {
Ok(()) => {
have_progress = true;
self.stats.tcp_recv_bytes().add(nf as _);
self.stats.tcp_recv_count().inc();
continue;
}
Err(e) => {
error!("netbuf wadv fail nf {nf} {e}");
return Err(e.into());
if TESTING_PROTOCOL_ERROR_TODO_REMOVE {
self.bytes_recv_testing = self.bytes_recv_testing.saturating_add(nf as u32);
if self.bytes_recv_testing <= TESTING_PROTOCOL_ERROR_AFTER_BYTES {
self.buf.wadv(nf)?;
} else {
let nr =
(self.bytes_recv_testing - TESTING_PROTOCOL_ERROR_AFTER_BYTES).min(nf as u32);
self.buf.wadv(nf - nr as usize)?;
for _ in 0..nr {
self.buf.put_u8(0x55)?;
}
}
}
have_progress = true;
self.stats.tcp_recv_bytes().add(nf as _);
self.stats.tcp_recv_count().inc();
continue;
}
}
Err(e) => {
@@ -1236,11 +1293,20 @@ impl CaProto {
let g = self.buf.read_bytes(hi.payload_len() as usize)?;
let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?;
// data-count is only reasonable for event messages
if let CaMsgTy::EventAddRes(..) = &msg.ty {
self.stats.data_count().ingest(hi.data_count() as u32);
}
let ret = match &msg.ty {
CaMsgTy::EventAddRes(..) => {
self.stats.data_count().ingest(hi.data_count() as u32);
if TESTING_UNRESPONSIVE_TODO_REMOVE && self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX {
self.event_add_res_cnt += 1;
Ok(Some(CaItem::Msg(msg)))
} else {
Ok(None)
}
}
_ => Ok(Some(CaItem::Msg(msg))),
};
self.state = CaState::StdHead;
Ok(Some(CaItem::Msg(msg)))
ret
}
CaState::Done => Err(Error::ParseAttemptInDoneState),
}
+5 -125
View File
@@ -3,9 +3,6 @@ use crate::ca::findioc::FindIocStream;
use crate::conf::CaIngestOpts;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::conn::PgClient;
use dbpg::iocindex::IocItem;
use dbpg::iocindex::IocSearchIndexWorker;
use err::Error;
use futures_util::StreamExt;
use log::*;
@@ -40,13 +37,11 @@ async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
};
let host = format!("{}:{}", hostname.clone(), port);
match tokio::net::lookup_host(host.clone()).await {
Ok(mut k) => {
if let Some(k) = k.next() {
k
} else {
return Err(Error::with_msg_no_trace(format!("can not lookup host {host}")));
}
}
Ok(k) => k
.into_iter()
.filter(|addr| if let SocketAddr::V4(_) = addr { true } else { false })
.next()
.ok_or_else(|| Error::with_msg_no_trace(format!("can not lookup host {host}")))?,
Err(e) => return Err(e.into()),
}
}
@@ -56,121 +51,6 @@ async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
Ok(ac)
}
struct DbUpdateWorker {
jh: JoinHandle<()>,
}
impl DbUpdateWorker {
async fn new(rx: Receiver<IocItem>, backend: String, pg: PgClient) -> Result<Self, Error> {
let worker = IocSearchIndexWorker::prepare(rx, backend, pg)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let jh = tokio::spawn(async move { worker.worker().await });
Ok(Self { jh })
}
}
#[cfg(DISABLED)]
pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
info!("ca_search begin");
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
dbpg::schema::schema_check(&pg)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (search_tgts, blacklist) = search_tgts_from_opts(&opts).await?;
// let mut finder = FindIocStream::new(search_tgts, Duration::from_millis(800), 20, 16);
// finder.set_stop_on_empty_queue();
// for ch in channels.iter() {
// finder.push(ch.into());
// }
const DB_WORKER_COUNT: usize = 1;
let (dbtx, dbrx) = async_channel::bounded(64);
let mut dbworkers = Vec::new();
for _ in 0..DB_WORKER_COUNT {
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), pg).await?;
dbworkers.push(w);
}
drop(dbrx);
let dbtx: Sender<_> = dbtx;
let mut ts_last = Instant::now();
'outer: loop {
let ts_now = Instant::now();
if ts_now.duration_since(ts_last) >= Duration::from_millis(2000) {
ts_last = ts_now;
info!("{}", finder.quick_state());
}
let k = tokio::time::timeout(Duration::from_millis(1500), finder.next()).await;
let item = match k {
Ok(Some(k)) => k,
Ok(None) => {
info!("Search stream exhausted");
break;
}
Err(_) => {
continue;
}
};
let item = match item {
Ok(k) => k,
Err(e) => {
error!("ca_search {e:?}");
continue;
}
};
for item in item {
let mut do_block = false;
for a2 in &gw_addrs {
if let Some(response_addr) = &item.response_addr {
if &SocketAddr::V4(*response_addr) == a2 {
do_block = true;
warn!("gateways responded to search");
}
}
}
if let Some(a1) = item.addr.as_ref() {
for a2 in &gw_addrs {
if &SocketAddr::V4(*a1) == a2 {
do_block = true;
warn!("do not use gateways as ioc address");
}
}
}
if do_block {
info!("blacklisting {item:?}");
} else {
let item = IocItem::new(item.channel, item.response_addr, item.addr, item.dt);
match dbtx.send(item).await {
Ok(_) => {}
Err(_) => {
error!("dbtx broken");
break 'outer;
}
}
}
}
}
drop(dbtx);
for w in dbworkers {
match w.jh.await {
Ok(_) => {}
Err(e) => {
error!("see error while join on db worker: {e}");
}
}
}
info!("all done");
Ok(())
}
pub async fn ca_search_workers_start(
opts: &CaIngestOpts,
stats: Arc<IocFinderStats>,
+14
View File
@@ -370,6 +370,7 @@ pub enum ChannelStatusClosedReason {
NoProtocol,
ProtocolDone,
ConnectFail,
IoError,
}
#[derive(Debug)]
@@ -397,6 +398,7 @@ impl ChannelStatus {
NoProtocol => 9,
ProtocolDone => 10,
ConnectFail => 11,
IoError => 12,
},
}
}
@@ -416,6 +418,7 @@ impl ChannelStatus {
9 => Closed(NoProtocol),
10 => Closed(ProtocolDone),
11 => Closed(ConnectFail),
12 => Closed(IoError),
24 => AssignedToAddress,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
@@ -427,6 +430,17 @@ impl ChannelStatus {
}
}
#[derive(Debug, Clone)]
pub enum ShutdownReason {
ConnectFail,
IoError,
ShutdownCommand,
InternalError,
Protocol,
ProtocolMissing,
IocTimeout,
}
#[derive(Debug)]
pub struct ChannelStatusItem {
pub ts: SystemTime,
+2
View File
@@ -65,6 +65,7 @@ pub struct SeriesWriter {
// TODO this should be in an Option:
ts_msp_grid_last: u32,
binner: ConnTimeBin,
written_last: Option<DataValue>,
}
impl SeriesWriter {
@@ -126,6 +127,7 @@ impl SeriesWriter {
msp_max_bytes: 1024 * 1024 * 20,
ts_msp_grid_last: 0,
binner,
written_last: None,
};
Ok(res)
}