From 6660531e5b168b6136513bc4188a9811101a1fe2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 21 Apr 2024 12:54:51 +0100 Subject: [PATCH] Intentional protocol error for testing, refactor --- daqingest/src/tools.rs | 2 +- netfetch/src/ca/conn.rs | 222 ++++++++++++++++++++++++++--------- netfetch/src/ca/connset.rs | 2 + netfetch/src/ca/proto.rs | 94 ++++++++++++--- netfetch/src/ca/search.rs | 130 +------------------- scywr/src/iteminsertqueue.rs | 14 +++ serieswriter/src/writer.rs | 2 + 7 files changed, 269 insertions(+), 197 deletions(-) diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index b6b21dc..ffceaa4 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -100,7 +100,7 @@ async fn remove_older_series( } pub async fn find_older_msp( - backend: String, + _backend: String, params: FindOlder, pgconf: &Database, scyconf: &ScyllaIngestConfig, diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 89cf10b..f35cb86 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,6 +1,7 @@ use super::proto; use super::proto::CaEventValue; use super::proto::ReadNotify; +use crate::ca::proto::ChannelClose; use crate::ca::proto::EventCancel; use crate::conf::ChannelConfig; use crate::senderpolling::SenderPolling; @@ -34,6 +35,7 @@ use scywr::iteminsertqueue as scywriiq; use scywr::iteminsertqueue::Accounting; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::ShutdownReason; use scywriiq::ChannelStatus; use scywriiq::ChannelStatusClosedReason; use scywriiq::ChannelStatusItem; @@ -70,6 +72,8 @@ use tokio::net::TcpStream; const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000); const IOC_PING_IVL: Duration = Duration::from_millis(80000); const DO_RATE_CHECK: bool = false; +const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(3000); +const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(3000); #[allow(unused)] macro_rules! 
trace2 { @@ -116,6 +120,7 @@ pub enum Error { NoProgressNoPending, ShutdownWithQueuesNoProgressNoPending, Error, + DurationOutOfBounds, } impl err::ToErr for Error { @@ -170,13 +175,17 @@ mod ser_instant { let tsnow = Instant::now(); let t1 = if tsnow >= *val { let dur = tsnow.duration_since(*val); - let dur2 = chrono::Duration::seconds(dur.as_secs() as i64) + let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64) + .ok_or(Error::DurationOutOfBounds) + .unwrap() .checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) .unwrap(); now.checked_sub_signed(dur2).unwrap() } else { let dur = (*val).duration_since(tsnow); - let dur2 = chrono::Duration::seconds(dur.as_secs() as i64) + let dur2 = chrono::Duration::try_seconds(dur.as_secs() as i64) + .ok_or(Error::DurationOutOfBounds) + .unwrap() .checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) .unwrap(); now.checked_add_signed(dur2).unwrap() @@ -190,7 +199,7 @@ mod ser_instant { } } - pub fn deserialize<'de, D>(de: D) -> Result, D::Error> + pub fn deserialize<'de, D>(_de: D) -> Result, D::Error> where D: Deserializer<'de>, { @@ -247,10 +256,22 @@ struct EnableMonitoringState { subid: Subid, } +#[derive(Debug, Clone)] +struct ReadPendingState { + tsbeg: Instant, +} + +#[derive(Debug, Clone)] +enum Monitoring2State { + Passive, + ReadPending(Ioid, Instant), +} + #[derive(Debug, Clone)] struct MonitoringState { tsbeg: Instant, subid: Subid, + mon2state: Monitoring2State, } #[derive(Debug, Clone)] @@ -351,10 +372,17 @@ enum ChannelState { Creating(CreatingState), MakingSeriesWriter(MakingSeriesWriterState), Writable(WritableState), + Closing(ClosingState), Error(ChannelError), Ended(ChannelStatusSeriesId), } +#[derive(Debug)] +struct ClosingState { + tsbeg: Instant, + cssid: ChannelStatusSeriesId, +} + #[derive(Debug)] struct ChannelConf { conf: ChannelConfig, @@ -370,6 +398,7 @@ impl ChannelState { ChannelState::Writable(_) => ChannelConnectedInfo::Connected, 
ChannelState::Error(_) => ChannelConnectedInfo::Error, ChannelState::Ended(_) => ChannelConnectedInfo::Disconnected, + ChannelState::Closing(_) => ChannelConnectedInfo::Disconnected, }; let scalar_type = match self { ChannelState::Writable(s) => Some(s.writer.scalar_type().clone()), @@ -438,6 +467,7 @@ impl ChannelState { ChannelError::CreateChanFail(cssid) => cssid.clone(), }, ChannelState::Ended(cssid) => cssid.clone(), + ChannelState::Closing(st) => st.cssid.clone(), } } } @@ -669,6 +699,8 @@ pub enum EndOfStreamReason { ConnectFail, OnCommand, RemoteClosed, + IocTimeout, + IoError, } pub struct CaConnOpts { @@ -823,40 +855,37 @@ impl CaConn { } } - fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { - self.proto = None; - match &channel_reason { - ChannelStatusClosedReason::ConnectFail => { + fn trigger_shutdown(&mut self, reason: ShutdownReason) { + let channel_reason = match &reason { + ShutdownReason::ConnectFail => { self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail); + ChannelStatusClosedReason::ConnectFail } - ChannelStatusClosedReason::ShutdownCommand => { + ShutdownReason::IoError => { + self.state = CaConnState::Shutdown(EndOfStreamReason::IoError); + ChannelStatusClosedReason::IoError + } + ShutdownReason::ShutdownCommand => { self.state = CaConnState::Shutdown(EndOfStreamReason::OnCommand); + ChannelStatusClosedReason::ShutdownCommand } - ChannelStatusClosedReason::ChannelRemove => { - self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail); - } - ChannelStatusClosedReason::ProtocolError => { - self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError)); - } - ChannelStatusClosedReason::FrequencyQuota => { - self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue)); - } - ChannelStatusClosedReason::BandwidthQuota => { - self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue)); - } - ChannelStatusClosedReason::InternalError => { + 
ShutdownReason::InternalError => { self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::Error)); + ChannelStatusClosedReason::InternalError } - ChannelStatusClosedReason::IocTimeout => { - self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue)); + ShutdownReason::Protocol => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError)); + ChannelStatusClosedReason::ProtocolError } - ChannelStatusClosedReason::NoProtocol => { - self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::NoProtocol)); + ShutdownReason::ProtocolMissing => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError)); + ChannelStatusClosedReason::ProtocolError } - ChannelStatusClosedReason::ProtocolDone => { - self.state = CaConnState::Shutdown(EndOfStreamReason::RemoteClosed); + ShutdownReason::IocTimeout => { + self.state = CaConnState::Shutdown(EndOfStreamReason::IoError); + ChannelStatusClosedReason::IocTimeout } - } + }; self.channel_state_on_shutdown(channel_reason); let addr = self.remote_addr_dbg.clone(); self.insert_item_queue @@ -866,6 +895,7 @@ impl CaConn { // TODO map to appropriate status status: ConnectionStatus::Closing, })); + self.proto = None; } fn cmd_check_health(&mut self) { @@ -883,7 +913,7 @@ impl CaConn { Ok(_) => {} Err(e) => { error!("{e}"); - self.trigger_shutdown(ChannelStatusClosedReason::InternalError); + self.trigger_shutdown(ShutdownReason::InternalError); } } @@ -936,7 +966,7 @@ impl CaConn { fn cmd_shutdown(&mut self) { debug!("cmd_shutdown {}", self.remote_addr_dbg); - self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand); + self.trigger_shutdown(ShutdownReason::ShutdownCommand); } fn handle_conn_command(&mut self, cx: &mut Context) -> Result>, Error> { @@ -1179,6 +1209,7 @@ impl CaConn { // *chst = ChannelState::Ended; } ChannelState::Ended(_) => {} + ChannelState::Closing(_) => {} } } } @@ -1196,7 +1227,7 @@ impl CaConn { value: 
CaConnEventValue::EchoTimeout, }; self.ca_conn_event_out_queue.push_back(item); - self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout); + self.trigger_shutdown(ShutdownReason::IocTimeout); } } else { if self.ioc_ping_next < tsnow { @@ -1209,7 +1240,7 @@ impl CaConn { } else { self.stats.ping_no_proto().inc(); warn!("can not ping {} no proto", self.remote_addr_dbg); - self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol); + self.trigger_shutdown(ShutdownReason::Protocol); } } } @@ -1261,6 +1292,7 @@ impl CaConn { // TODO need last-save-ts for this state. } ChannelState::Ended(_) => {} + ChannelState::Closing(_) => {} } } Ok(()) @@ -1350,7 +1382,14 @@ impl CaConn { if crst.stwin_count > 30000 || crst.stwin_bytes > 1024 * 1024 * 500 { let subid = match &mut st.reading { ReadingState::EnableMonitoring(x) => Some(x.subid.clone()), - ReadingState::Monitoring(x) => Some(x.subid.clone()), + ReadingState::Monitoring(x) => { + match x.mon2state { + // actually, no differing behavior needed so far. 
+ Monitoring2State::Passive => (), + Monitoring2State::ReadPending(ioid, since) => (), + } + Some(x.subid.clone()) + } ReadingState::StopMonitoringForPolling(_) => { self.stats.transition_to_polling_bad_state().inc(); None @@ -1376,6 +1415,7 @@ impl CaConn { st.reading = ReadingState::Monitoring(MonitoringState { tsbeg: tsnow, subid: st2.subid, + mon2state: Monitoring2State::Passive, }); let crst = &mut st.channel; let writer = &mut st.writer; @@ -1383,7 +1423,13 @@ impl CaConn { let stats = self.stats.as_ref(); Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?; } - ReadingState::Monitoring(_st2) => { + ReadingState::Monitoring(st2) => { + match st2.mon2state { + Monitoring2State::Passive => {} + Monitoring2State::ReadPending(ioid, since) => { + error!("TODO actually, EventAddRes can anyway not be a response to a ReadNotify"); + } + } let crst = &mut st.channel; let writer = &mut st.writer; let iiq = &mut self.insert_item_queue; @@ -1452,7 +1498,12 @@ impl CaConn { let name = self.name_by_cid(cid); warn!("received event-cancel but channel {name:?} in wrong state"); } - ReadingState::Monitoring(..) 
=> { + ReadingState::Monitoring(st2) => { + match st2.mon2state { + // no special discrimination needed + Monitoring2State::Passive => {} + Monitoring2State::ReadPending(ioid, since) => {} + } let name = self.name_by_cid(cid); warn!("received event-cancel but channel {name:?} in wrong state"); } @@ -1506,31 +1557,28 @@ impl CaConn { PollTickState::Wait(st3, ioid) => { let dt = tsnow.saturating_duration_since(*st3); self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32); - self.read_ioids.remove(ioid); // TODO maintain histogram of read-notify latencies + self.read_ioids.remove(ioid); st2.tick = PollTickState::Idle(tsnow); - let crst = &mut st.channel; - let writer = &mut st.writer; let iiq = &mut self.insert_item_queue; let stats = self.stats.as_ref(); - Self::event_add_ingest( - ev.payload_len, - ev.value, - crst, - writer, - iiq, - tsnow, - stnow, - stats, - )?; + Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?; } }, ReadingState::EnableMonitoring(..) => { error!("TODO handle_read_notify_res handle EnableMonitoring"); } - ReadingState::Monitoring(..) => { - error!("TODO handle_read_notify_res handle Monitoring"); - } + ReadingState::Monitoring(st2) => match st2.mon2state { + Monitoring2State::Passive => { + error!("ReadNotifyRes even though we do not expect one"); + } + Monitoring2State::ReadPending(ioid, since) => { + self.read_ioids.remove(&ioid); + let iiq = &mut self.insert_item_queue; + let stats = self.stats.as_ref(); + Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?; + } + }, ReadingState::StopMonitoringForPolling(..) 
=> { error!("TODO handle_read_notify_res handle StopMonitoringForPolling"); } @@ -1548,6 +1596,20 @@ impl CaConn { Ok(()) } + fn read_notify_res_for_write( + ev: proto::ReadNotifyRes, + st: &mut WritableState, + iiq: &mut VecDeque, + stnow: SystemTime, + tsnow: Instant, + stats: &CaConnStats, + ) -> Result<(), Error> { + let crst = &mut st.channel; + let writer = &mut st.writer; + Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?; + Ok(()) + } + fn event_add_ingest( payload_len: u32, value: CaEventValue, @@ -1732,6 +1794,7 @@ impl CaConn { fn check_channels_state_poll(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> { let mut do_wake_again = false; + let mut do_shutdown = None; let channels = &mut self.channels; for (_k, conf) in channels { let chst = &mut conf.state; @@ -1741,7 +1804,42 @@ impl CaConn { ChannelState::MakingSeriesWriter(_) => {} ChannelState::Writable(st2) => match &mut st2.reading { ReadingState::EnableMonitoring(_) => {} - ReadingState::Monitoring(_) => {} + ReadingState::Monitoring(st3) => match st3.mon2state { + Monitoring2State::Passive => { + // nothing to do + } + Monitoring2State::ReadPending(ioid, since) => { + error!("TODO check for timeout"); + if since + MONITOR_POLL_TIMEOUT < tsnow { + let name = conf.conf.name(); + warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid); + + // Something is wrong with this channel. + // Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only + // this or a subset of the subscribed channels no longer give updates. + // Here we try to close the channel at hand. + // If the close-state does not complete within TIMEOUT_CHANNEL_CLOSING, the whole connection is shut down. + + // TODO need to define the transition from operating channel to inoperable channel in + // a better and reusable way: + // Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply. 
+ + let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?; + let item = CaMsg { + ty: CaMsgTy::ChannelClose(ChannelClose { + sid: st2.channel.sid.0, + cid: st2.channel.cid.0, + }), + ts: tsnow, + }; + proto.push_out(item); + *chst = ChannelState::Closing(ClosingState { + tsbeg: tsnow, + cssid: st2.channel.cssid, + }); + } + } + }, ReadingState::StopMonitoringForPolling(_) => {} ReadingState::Polling(st3) => match &mut st3.tick { PollTickState::Idle(x) => { @@ -1777,8 +1875,18 @@ impl CaConn { }, ChannelState::Error(_) => {} ChannelState::Ended(_) => {} + ChannelState::Closing(st2) => { + if st2.tsbeg + TIMEOUT_CHANNEL_CLOSING < tsnow { + let name = conf.conf.name(); + warn!("timeout while closing channel {name}"); + do_shutdown = Some(ShutdownReason::IocTimeout); + } + } } } + if let Some(reason) = do_shutdown { + self.trigger_shutdown(reason); + } if do_wake_again { cx.waker().wake_by_ref(); } @@ -1896,12 +2004,12 @@ impl CaConn { } Ready(Some(Err(e))) => { error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg); - self.trigger_shutdown(ChannelStatusClosedReason::ProtocolError); + self.trigger_shutdown(ShutdownReason::Protocol); Ready(Some(Err(e))) } Ready(None) => { warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg); - self.trigger_shutdown(ChannelStatusClosedReason::ProtocolDone); + self.trigger_shutdown(ShutdownReason::ProtocolMissing); Ready(None) } Pending => Pending, @@ -2032,7 +2140,7 @@ impl CaConn { addr, status: ConnectionStatus::ConnectError, })); - self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); + self.trigger_shutdown(ShutdownReason::IoError); Ok(Ready(Some(()))) } Err(e) => { @@ -2045,7 +2153,7 @@ impl CaConn { addr, status: ConnectionStatus::ConnectTimeout, })); - self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); + self.trigger_shutdown(ShutdownReason::IocTimeout); Ok(Ready(Some(()))) } } @@ -2133,7 +2241,7 @@ impl CaConn { Ok(_) => Ok(Pending), Err(e) => { error!("handle_own_ticker 
{e}"); - self.trigger_shutdown(ChannelStatusClosedReason::InternalError); + self.trigger_shutdown(ShutdownReason::InternalError); Err(e) } }, diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index ac91559..2346dfc 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -928,6 +928,8 @@ impl CaConnSet { warn!("TODO make sure no channel is in state which could trigger health timeout") } EndOfStreamReason::RemoteClosed => self.handle_connect_fail(addr)?, + EndOfStreamReason::IocTimeout => self.handle_connect_fail(addr)?, + EndOfStreamReason::IoError => self.handle_connect_fail(addr)?, } // self.remove_channel_status_for_addr(addr)?; trace2!("still CaConn left {}", self.ca_conn_ress.len()); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index f52ee52..5d8bf94 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -53,6 +53,12 @@ const EPICS_EPOCH_OFFSET: u64 = 631152000; const PAYLOAD_LEN_MAX: u32 = 1024 * 1024 * 32; const PROTO_INPUT_BUF_CAP: u32 = 1024 * 1024 * 40; +const TESTING_UNRESPONSIVE_TODO_REMOVE: bool = true; +const TESTING_EVENT_ADD_RES_MAX: u32 = 3; + +const TESTING_PROTOCOL_ERROR_TODO_REMOVE: bool = true; +const TESTING_PROTOCOL_ERROR_AFTER_BYTES: u32 = 400; + #[derive(Debug)] pub struct Search { pub id: u32, @@ -163,6 +169,24 @@ pub struct ReadNotifyRes { pub value: CaEventValue, } +#[derive(Debug)] +pub struct ChannelClose { + pub sid: u32, + pub cid: u32, +} + +#[derive(Debug)] +pub struct ChannelCloseRes { + pub sid: u32, + pub cid: u32, +} + +// This message is only sent from server to client, on server's initiative. 
+#[derive(Debug)] +pub struct ChannelDisconnect { + pub cid: u32, +} + #[derive(Debug)] enum CaScalarType { I8, @@ -313,6 +337,9 @@ pub enum CaMsgTy { EventCancelRes(EventCancelRes), ReadNotify(ReadNotify), ReadNotifyRes(ReadNotifyRes), + ChannelClose(ChannelClose), + ChannelCloseRes(ChannelCloseRes), + ChannelDisconnect(ChannelDisconnect), Echo, } @@ -341,6 +368,9 @@ impl CaMsgTy { EventCancelRes(_) => 0x01, ReadNotify(_) => 0x0f, ReadNotifyRes(_) => 0x0f, + ChannelClose(_) => 0x0c, + ChannelCloseRes(_) => 0x0c, + ChannelDisconnect(_) => 0x1b, Echo => 0x17, } } @@ -381,6 +411,9 @@ impl CaMsgTy { error!("should not attempt to serialize the response again"); panic!(); } + ChannelClose(_) => 0, + ChannelCloseRes(_) => 0, + ChannelDisconnect(_) => 0, Echo => 0, } } @@ -410,6 +443,9 @@ impl CaMsgTy { EventCancelRes(x) => x.data_type, ReadNotify(x) => x.data_type, ReadNotifyRes(x) => x.data_type, + ChannelClose(_) => 0, + ChannelCloseRes(_) => 0, + ChannelDisconnect(_) => 0, Echo => 0, } } @@ -445,6 +481,9 @@ impl CaMsgTy { panic!(); // x.data_count as _ } + ChannelClose(_) => 0, + ChannelCloseRes(_) => 0, + ChannelDisconnect(_) => 0, Echo => 0, } } @@ -471,6 +510,9 @@ impl CaMsgTy { EventCancelRes(x) => x.sid, ReadNotify(x) => x.sid, ReadNotifyRes(x) => x.sid, + ChannelClose(x) => x.sid, + ChannelCloseRes(x) => x.sid, + ChannelDisconnect(x) => x.cid, Echo => 0, } } @@ -497,6 +539,9 @@ impl CaMsgTy { EventCancelRes(x) => x.subid, ReadNotify(x) => x.ioid, ReadNotifyRes(x) => x.ioid, + ChannelClose(x) => x.cid, + ChannelCloseRes(x) => x.cid, + ChannelDisconnect(_) => 0, Echo => 0, } } @@ -564,6 +609,9 @@ impl CaMsgTy { EventCancelRes(_) => {} ReadNotify(_) => {} ReadNotifyRes(_) => {} + ChannelClose(_) => {} + ChannelCloseRes(_) => {} + ChannelDisconnect(_) => {} Echo => {} } } @@ -994,6 +1042,8 @@ pub struct CaProto { array_truncate: usize, stats: Arc, resqu: VecDeque, + event_add_res_cnt: u32, + bytes_recv_testing: u32, } impl CaProto { @@ -1009,6 +1059,8 @@ impl 
CaProto { array_truncate, stats, resqu: VecDeque::with_capacity(256), + event_add_res_cnt: 0, + bytes_recv_testing: 0, } } @@ -1133,18 +1185,23 @@ impl CaProto { let t = rbuf.filled().len().min(32); debug!("received data {:?}", &rbuf.filled()[0..t]); } - match self.buf.wadv(nf) { - Ok(()) => { - have_progress = true; - self.stats.tcp_recv_bytes().add(nf as _); - self.stats.tcp_recv_count().inc(); - continue; - } - Err(e) => { - error!("netbuf wadv fail nf {nf} {e}"); - return Err(e.into()); + if TESTING_PROTOCOL_ERROR_TODO_REMOVE { + self.bytes_recv_testing = self.bytes_recv_testing.saturating_add(nf as u32); + if self.bytes_recv_testing <= TESTING_PROTOCOL_ERROR_AFTER_BYTES { + self.buf.wadv(nf)?; + } else { + let nr = + (self.bytes_recv_testing - TESTING_PROTOCOL_ERROR_AFTER_BYTES).min(nf as u32); + self.buf.wadv(nf - nr as usize)?; + for _ in 0..nr { + self.buf.put_u8(0x55)?; + } } } + have_progress = true; + self.stats.tcp_recv_bytes().add(nf as _); + self.stats.tcp_recv_count().inc(); + continue; } } Err(e) => { @@ -1236,11 +1293,20 @@ impl CaProto { let g = self.buf.read_bytes(hi.payload_len() as usize)?; let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?; // data-count is only reasonable for event messages - if let CaMsgTy::EventAddRes(..) = &msg.ty { - self.stats.data_count().ingest(hi.data_count() as u32); - } + let ret = match &msg.ty { + CaMsgTy::EventAddRes(..) 
=> { + self.stats.data_count().ingest(hi.data_count() as u32); + if TESTING_UNRESPONSIVE_TODO_REMOVE && self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX { + self.event_add_res_cnt += 1; + Ok(Some(CaItem::Msg(msg))) + } else { + Ok(None) + } + } + _ => Ok(Some(CaItem::Msg(msg))), + }; self.state = CaState::StdHead; - Ok(Some(CaItem::Msg(msg))) + ret } CaState::Done => Err(Error::ParseAttemptInDoneState), } diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index f34e0da..0b802e5 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -3,9 +3,6 @@ use crate::ca::findioc::FindIocStream; use crate::conf::CaIngestOpts; use async_channel::Receiver; use async_channel::Sender; -use dbpg::conn::PgClient; -use dbpg::iocindex::IocItem; -use dbpg::iocindex::IocSearchIndexWorker; use err::Error; use futures_util::StreamExt; use log::*; @@ -40,13 +37,11 @@ async fn resolve_address(addr_str: &str) -> Result { }; let host = format!("{}:{}", hostname.clone(), port); match tokio::net::lookup_host(host.clone()).await { - Ok(mut k) => { - if let Some(k) = k.next() { - k - } else { - return Err(Error::with_msg_no_trace(format!("can not lookup host {host}"))); - } - } + Ok(k) => k + .into_iter() + .filter(|addr| if let SocketAddr::V4(_) = addr { true } else { false }) + .next() + .ok_or_else(|| Error::with_msg_no_trace(format!("can not lookup host {host}")))?, Err(e) => return Err(e.into()), } } @@ -56,121 +51,6 @@ async fn resolve_address(addr_str: &str) -> Result { Ok(ac) } -struct DbUpdateWorker { - jh: JoinHandle<()>, -} - -impl DbUpdateWorker { - async fn new(rx: Receiver, backend: String, pg: PgClient) -> Result { - let worker = IocSearchIndexWorker::prepare(rx, backend, pg) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let jh = tokio::spawn(async move { worker.worker().await }); - Ok(Self { jh }) - } -} - -#[cfg(DISABLED)] -pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), Error> { - 
info!("ca_search begin"); - let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - dbpg::schema::schema_check(&pg) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - - let (search_tgts, blacklist) = search_tgts_from_opts(&opts).await?; - - // let mut finder = FindIocStream::new(search_tgts, Duration::from_millis(800), 20, 16); - // finder.set_stop_on_empty_queue(); - // for ch in channels.iter() { - // finder.push(ch.into()); - // } - - const DB_WORKER_COUNT: usize = 1; - let (dbtx, dbrx) = async_channel::bounded(64); - let mut dbworkers = Vec::new(); - for _ in 0..DB_WORKER_COUNT { - let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), pg).await?; - dbworkers.push(w); - } - drop(dbrx); - let dbtx: Sender<_> = dbtx; - - let mut ts_last = Instant::now(); - 'outer: loop { - let ts_now = Instant::now(); - if ts_now.duration_since(ts_last) >= Duration::from_millis(2000) { - ts_last = ts_now; - info!("{}", finder.quick_state()); - } - let k = tokio::time::timeout(Duration::from_millis(1500), finder.next()).await; - let item = match k { - Ok(Some(k)) => k, - Ok(None) => { - info!("Search stream exhausted"); - break; - } - Err(_) => { - continue; - } - }; - let item = match item { - Ok(k) => k, - Err(e) => { - error!("ca_search {e:?}"); - continue; - } - }; - for item in item { - let mut do_block = false; - for a2 in &gw_addrs { - if let Some(response_addr) = &item.response_addr { - if &SocketAddr::V4(*response_addr) == a2 { - do_block = true; - warn!("gateways responded to search"); - } - } - } - if let Some(a1) = item.addr.as_ref() { - for a2 in &gw_addrs { - if &SocketAddr::V4(*a1) == a2 { - do_block = true; - warn!("do not use gateways as ioc address"); - } - } - } - if do_block { - info!("blacklisting {item:?}"); - } else 
{ - let item = IocItem::new(item.channel, item.response_addr, item.addr, item.dt); - match dbtx.send(item).await { - Ok(_) => {} - Err(_) => { - error!("dbtx broken"); - break 'outer; - } - } - } - } - } - drop(dbtx); - for w in dbworkers { - match w.jh.await { - Ok(_) => {} - Err(e) => { - error!("see error while join on db worker: {e}"); - } - } - } - info!("all done"); - Ok(()) -} - pub async fn ca_search_workers_start( opts: &CaIngestOpts, stats: Arc, diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index bf2d589..37d72c2 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -370,6 +370,7 @@ pub enum ChannelStatusClosedReason { NoProtocol, ProtocolDone, ConnectFail, + IoError, } #[derive(Debug)] @@ -397,6 +398,7 @@ impl ChannelStatus { NoProtocol => 9, ProtocolDone => 10, ConnectFail => 11, + IoError => 12, }, } } @@ -416,6 +418,7 @@ impl ChannelStatus { 9 => Closed(NoProtocol), 10 => Closed(ProtocolDone), 11 => Closed(ConnectFail), + 12 => Closed(IoError), 24 => AssignedToAddress, _ => { return Err(err::Error::with_msg_no_trace(format!( @@ -427,6 +430,17 @@ impl ChannelStatus { } } +#[derive(Debug, Clone)] +pub enum ShutdownReason { + ConnectFail, + IoError, + ShutdownCommand, + InternalError, + Protocol, + ProtocolMissing, + IocTimeout, +} + #[derive(Debug)] pub struct ChannelStatusItem { pub ts: SystemTime, diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 052a452..4bb3b26 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -65,6 +65,7 @@ pub struct SeriesWriter { // TODO this should be in an Option: ts_msp_grid_last: u32, binner: ConnTimeBin, + written_last: Option, } impl SeriesWriter { @@ -126,6 +127,7 @@ impl SeriesWriter { msp_max_bytes: 1024 * 1024 * 20, ts_msp_grid_last: 0, binner, + written_last: None, }; Ok(res) }