diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 8fc6934..e3c4bd7 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.0-aa.3" +version = "0.2.1-aa.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index fe687ed..23821da 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -89,7 +89,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { netfetch::ca::search::ca_search(conf, &channels).await? } ChannelAccess::CaIngest(k) => { - info!("daqingest version {}", clap::crate_version!()); + info!("daqingest version {} +0001", clap::crate_version!()); let (conf, channels_config) = parse_config(k.config.into()).await?; daqingest::daemon::run(conf, channels_config).await? } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 9c7ade4..dc49529 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -44,7 +44,6 @@ const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000); const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000); const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000); const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500); -const RUN_WITHOUT_SCYLLA: bool = true; pub struct DaemonOpts { pgconf: Database, @@ -104,9 +103,11 @@ impl Daemon { let insert_queue_counter = Arc::new(AtomicUsize::new(0)); let wrest_stats = Arc::new(SeriesWriterEstablishStats::new()); - let (writer_establis_tx,) = - serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone(), wrest_stats.clone()) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let (writer_establis_tx,) = serieswriter::establish_worker::start_writer_establish_worker( + channel_info_query_tx.clone(), + wrest_stats.clone(), + ) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let local_epics_hostname = ingest_linux::net::local_hostname(); @@ -199,26 +200,71 @@ impl Daemon { let mut insert_worker_jhs = Vec::new(); - if RUN_WITHOUT_SCYLLA { + if ingest_opts.scylla_disable() { let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), iqrx.st_rf3_rx, - insert_worker_opts, + insert_worker_opts.clone(), + insert_worker_stats.clone(), + ) + .await?; + insert_worker_jhs.extend(jh); + let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + iqrx.mt_rf3_rx, + insert_worker_opts.clone(), + insert_worker_stats.clone(), + ) + .await?; + insert_worker_jhs.extend(jh); + let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + iqrx.lt_rf3_rx, + insert_worker_opts.clone(), insert_worker_stats.clone(), ) .await?; insert_worker_jhs.extend(jh); } else { let jh = scywr::insertworker::spawn_scylla_insert_workers( - // TODO does the worker actually need RETT? Yes, to use the correct table names. RetentionTime::Short, opts.scyconf_st.clone(), ingest_opts.insert_scylla_sessions(), ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), - iqrx.st_rf3_rx.clone(), - insert_worker_opts, + iqrx.st_rf3_rx, + insert_worker_opts.clone(), + insert_worker_stats.clone(), + ingest_opts.use_rate_limit_queue(), + ) + .await?; + insert_worker_jhs.extend(jh); + + let jh = scywr::insertworker::spawn_scylla_insert_workers( + RetentionTime::Medium, + opts.scyconf_mt.clone(), + ingest_opts.insert_scylla_sessions(), + ingest_opts.insert_worker_count().min(2), + ingest_opts.insert_worker_concurrency().min(8), + iqrx.mt_rf3_rx, + insert_worker_opts.clone(), + insert_worker_stats.clone(), + ingest_opts.use_rate_limit_queue(), + ) + .await?; + insert_worker_jhs.extend(jh); + + let jh = scywr::insertworker::spawn_scylla_insert_workers( + RetentionTime::Long, + opts.scyconf_lt.clone(), + ingest_opts.insert_scylla_sessions(), + ingest_opts.insert_worker_count().min(2), + ingest_opts.insert_worker_concurrency().min(8), + iqrx.lt_rf3_rx, + insert_worker_opts.clone(), insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ) @@ -629,15 +675,15 @@ impl Daemon { } } } - info!("wait for metrics handler"); + debug!("wait for metrics handler"); self.metrics_shutdown_tx.send(1).await?; if let Some(jh) = self.metrics_jh.take() { jh.await??; } - info!("joined metrics handler"); - info!("\n\n\n-----------------------\n\n\nwait for postingest task"); + debug!("joined metrics handler"); + debug!("wait for postingest task"); worker_jh.await?.map_err(|e| Error::from_string(e))?; - info!("joined postingest task"); + debug!("joined postingest task"); Ok(()) } } @@ -669,10 +715,13 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> .await .map_err(Error::from_string)?; dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?; + drop(pg); jh.await?.map_err(Error::from_string)?; } - if RUN_WITHOUT_SCYLLA { + if opts.scylla_disable() { + warn!("scylla_disable config flag enabled"); } else { + info!("start scylla schema check"); scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short) .await .map_err(Error::from_string)?; @@ -682,6 +731,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long) .await .map_err(Error::from_string)?; + info!("stop scylla schema check"); } info!("database check done"); diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index bb59d19..10c5864 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -7,10 +7,10 @@ use log::*; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; -use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::ScalarValue; use serieswriter::writer::SeriesWriter; +use std::collections::VecDeque; use std::io::Cursor; use std::net::Ipv4Addr; use std::time::SystemTime; @@ -36,7 +36,7 @@ pub async fn listen_beacons( sock.set_broadcast(true).unwrap(); let mut buf = Vec::new(); buf.resize(1024 * 4, 0); - let mut iqdqs = InsertDeques::new(); + let mut deque = VecDeque::new(); loop { let bb = &mut buf; let (n, remote) = taskrun::tokio::select! { @@ -65,12 +65,12 @@ pub async fn listen_beacons( let ts_local = ts; let blob = addr_u32 as i64; let val = DataValue::Scalar(ScalarValue::I64(blob)); - writer.write(ts, ts_local, val, &mut iqdqs)?; + writer.write(ts, ts_local, val, &mut deque)?; } } - if iqdqs.len() != 0 { + if deque.len() != 0 { // TODO deliver to insert queue - iqdqs.clear(); + deque.clear(); } } Ok(()) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 755d1bf..82ed8dd 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -46,9 +46,9 @@ use scywriiq::ConnectionStatusItem; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; -use serieswriter::writer::EstablishWorkerJob; -use serieswriter::writer::JobId; -use serieswriter::writer::SeriesWriter; +use serieswriter::establish_worker::EstablishWorkerJob; +use serieswriter::establish_worker::JobId; +use serieswriter::rtwriter::RtWriter; use stats::rand_xoshiro::rand_core::RngCore; use stats::rand_xoshiro::rand_core::SeedableRng; use stats::rand_xoshiro::Xoshiro128PlusPlus; @@ -72,11 +72,12 @@ use taskrun::tokio; use tokio::net::TcpStream; const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000); -const IOC_PING_IVL: Duration = Duration::from_millis(80000); +const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 80); const DO_RATE_CHECK: bool = false; -const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(3000); -const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(3000); -const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(3000); +const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(6000); +const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(8000); +const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(1000 * 68); +const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(10000); #[allow(unused)] macro_rules! trace2 { @@ -90,7 +91,7 @@ macro_rules! trace2 { #[allow(unused)] macro_rules! trace3 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -120,7 +121,7 @@ pub enum Error { ProtocolError, IocIssue, Protocol(#[from] crate::ca::proto::Error), - Writer(#[from] serieswriter::writer::Error), + RtWriter(#[from] serieswriter::rtwriter::Error), // TODO remove false positive from ThisError derive #[allow(private_interfaces)] UnknownCid(Cid), @@ -137,6 +138,7 @@ pub enum Error { Error, DurationOutOfBounds, NoFreeCid, + InsertQueues(#[from] scywr::insertqueues::Error), } impl err::ToErr for Error { @@ -324,7 +326,7 @@ enum PollTickState { struct WritableState { tsbeg: Instant, channel: CreatedState, - writer: SeriesWriter, + writer: RtWriter, reading: ReadingState, } @@ -344,7 +346,10 @@ struct CreatedState { ca_dbr_type: u16, ca_dbr_count: u32, ts_created: Instant, + // Updated when we receive something via monitoring or polling ts_alive_last: Instant, + // Updated on monitoring, polling or when the channel config changes to reset the timeout + ts_activity_last: Instant, ts_msp_last: u64, ts_msp_grid_last: u32, inserted_in_ts_msp: u64, @@ -374,6 +379,7 @@ impl CreatedState { ca_dbr_count: 0, ts_created: tsnow, ts_alive_last: tsnow, + ts_activity_last: tsnow, ts_msp_last: 0, ts_msp_grid_last: 0, inserted_in_ts_msp: 0, @@ -417,6 +423,12 @@ struct ChannelConf { state: ChannelState, } +impl ChannelConf { + pub fn poll_conf(&self) -> Option<(u64,)> { + self.conf.poll_conf() + } +} + impl ChannelState { fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo { let channel_connected_info = match self { @@ -701,6 +713,18 @@ impl CaConnEvent { value, } } + + pub fn desc_short(&self) -> CaConnEventDescShort { + CaConnEventDescShort {} + } +} + +pub struct CaConnEventDescShort {} + +impl fmt::Display for CaConnEventDescShort { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "CaConnEventDescShort {{ TODO-impl }}") + } } #[derive(Debug)] @@ -781,8 +805,8 @@ pub struct CaConn { rng: Xoshiro128PlusPlus, writer_establish_qu: VecDeque, writer_establish_tx: Pin>>, - writer_tx: Sender<(JobId, Result)>, - writer_rx: Pin)>>>, + writer_tx: Sender<(JobId, Result)>, + writer_rx: Pin)>>>, tmp_ts_poll: SystemTime, poll_tsnow: Instant, ioid: u32, @@ -909,9 +933,10 @@ impl CaConn { }; self.channel_state_on_shutdown(channel_reason); let addr = self.remote_addr_dbg.clone(); - self.iqdqs - .lt_rf3_rx - .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + // TODO handle Err: + let _ = self + .iqdqs + .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, // TODO map to appropriate status @@ -990,7 +1015,7 @@ impl CaConn { } } - fn handle_writer_establish_inner(&mut self, cid: Cid, writer: SeriesWriter) -> Result<(), Error> { + fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> { trace!("handle_writer_establish_inner {cid:?}"); // At this point we have created the channel and created a writer for that type and sid. // We do not yet monitor. @@ -999,6 +1024,8 @@ impl CaConn { // Create a monitor for the channel. // NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled! if let Some(conf) = self.channels.get_mut(&cid) { + // TODO refactor, should only execute this when required: + let conf_poll_conf = conf.poll_conf(); let chst = &mut conf.state; if let ChannelState::MakingSeriesWriter(st2) = chst { self.stats.get_series_id_ok.inc(); @@ -1008,21 +1035,21 @@ impl CaConn { cssid: st2.channel.cssid.clone(), status: ChannelStatus::Opened, }); - self.iqdqs.lt_rf3_rx.push_back(item); + self.iqdqs.emit_status_item(item); } - let name = conf.conf.name(); - if name.starts_with("TEST:PEAKING:") { + if let Some((ivl,)) = conf_poll_conf { let created_state = WritableState { tsbeg: self.poll_tsnow, channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), + // channel: st2.channel.clone(), writer, reading: ReadingState::Polling(PollingState { tsbeg: self.poll_tsnow, - poll_ivl: Duration::from_millis(1000), + poll_ivl: Duration::from_millis(ivl), tick: PollTickState::Idle(self.poll_tsnow), }), }; - *chst = ChannelState::Writable(created_state); + conf.state = ChannelState::Writable(created_state); Ok(()) } else { let subid = { @@ -1058,7 +1085,7 @@ impl CaConn { subid, }), }; - *chst = ChannelState::Writable(created_state); + conf.state = ChannelState::Writable(created_state); Ok(()) } } else { @@ -1175,7 +1202,7 @@ impl CaConn { cssid: cssid.clone(), status: ChannelStatus::Closed(channel_reason.clone()), }); - self.iqdqs.lt_rf3_rx.push_back(item); + self.iqdqs.emit_status_item(item); *chst = ChannelState::Ended(cssid); } ChannelState::Error(..) => { @@ -1307,8 +1334,8 @@ impl CaConn { ReadingState::Monitoring(x) => { match x.mon2state { // actually, no differing behavior needed so far. - Monitoring2State::Passive(_) => (), - Monitoring2State::ReadPending(ioid, since) => (), + Monitoring2State::Passive(_) => {} + Monitoring2State::ReadPending(ioid, since) => {} } Some(x.subid.clone()) } @@ -1350,9 +1377,10 @@ impl CaConn { Monitoring2State::Passive(st3) => { st3.tsbeg = tsnow; } - Monitoring2State::ReadPending(ioid, since) => { - warn!("TODO we are waiting for a explicit caget, but received a monitor event"); - st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }); + Monitoring2State::ReadPending(_ioid, _since) => { + // Received EventAdd while still waiting for answer to explicit ReadNotify. + // This is fine. + self.stats.recv_event_add_while_wait_on_read_notify.inc(); } } let crst = &mut st.channel; @@ -1377,9 +1405,11 @@ impl CaConn { } } } - _ => { - // TODO count instead of print - error!("unexpected state: EventAddRes while having {ch_s:?}"); + ChannelState::Creating(_) | ChannelState::Init(_) | ChannelState::MakingSeriesWriter(_) => { + self.stats.recv_read_notify_but_not_init_yet.inc(); + } + ChannelState::Closing(_) | ChannelState::Ended(_) | ChannelState::Error(_) => { + self.stats.recv_read_notify_but_no_longer_ready.inc(); } } Ok(()) @@ -1479,7 +1509,7 @@ impl CaConn { match &mut st.reading { ReadingState::Polling(st2) => match &mut st2.tick { PollTickState::Idle(_st3) => { - warn!("received ReadNotifyRes while in Wait state"); + self.stats.recv_read_notify_while_polling_idle.inc(); } PollTickState::Wait(st3, ioid) => { let dt = tsnow.saturating_duration_since(*st3); @@ -1492,21 +1522,26 @@ impl CaConn { Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; } }, - ReadingState::EnableMonitoring(..) => { - error!("TODO handle_read_notify_res handle EnableMonitoring"); + ReadingState::EnableMonitoring(_) => { + self.stats.recv_read_notify_while_enabling_monitoring.inc(); } ReadingState::Monitoring(st2) => match &mut st2.mon2state { Monitoring2State::Passive(st3) => { - self.read_ioids.remove(&ioid); + if self.read_ioids.remove(&ioid).is_some() { + self.stats.recv_read_notify_state_passive_found_ioid.inc(); + } else { + self.stats.recv_read_notify_state_passive.inc(); + } st3.tsbeg = tsnow; - error!("ReadNotifyRes even though we do not expect one"); } Monitoring2State::ReadPending(ioid2, _since) => { - trace!("\nhandle_read_notify_res received ReadNotify in Monitoring2State::ReadPending\n\n"); // We don't check again for `since` here. That's done in timeout checking. // So we could be here a little beyond timeout but we don't care about that. if ioid != *ioid2 { - warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); + // warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); + self.stats.recv_read_notify_state_read_pending_bad_ioid.inc(); + } else { + self.stats.recv_read_notify_state_read_pending.inc(); } self.read_ioids.remove(&ioid); st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }); @@ -1550,7 +1585,7 @@ impl CaConn { payload_len: u32, value: CaEventValue, crst: &mut CreatedState, - writer: &mut SeriesWriter, + writer: &mut RtWriter, iqdqs: &mut InsertDeques, tsnow: Instant, stnow: SystemTime, @@ -1558,6 +1593,7 @@ impl CaConn { ) -> Result<(), Error> { // debug!("event_add_ingest payload_len {} value {:?}", payload_len, value); crst.ts_alive_last = tsnow; + crst.ts_activity_last = tsnow; crst.item_recv_ivl_ema.tick(tsnow); crst.recv_count += 1; crst.recv_bytes += payload_len as u64; @@ -1578,7 +1614,7 @@ impl CaConn { crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); } - Self::check_ev_value_data(&value.data, writer.scalar_type())?; + Self::check_ev_value_data(&value.data, &writer.scalar_type())?; { let val: DataValue = value.data.into(); writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?; @@ -1849,12 +1885,12 @@ impl CaConn { Ok(()) } - fn check_channels_alive(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> { - trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg); + fn check_channels_alive(&mut self, tsnow: Instant, _cx: &mut Context) -> Result<(), Error> { + trace3!("check_channels_alive {}", self.remote_addr_dbg); if let Some(started) = self.ioc_ping_start { - if started + Duration::from_millis(4000) < tsnow { + if started + TIMEOUT_PONG_WAIT < tsnow { self.stats.pong_timeout().inc(); - warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg); + warn!("pong timeout {}", self.remote_addr_dbg); self.ioc_ping_start = None; let item = CaConnEvent { ts: tsnow, @@ -1867,7 +1903,6 @@ impl CaConn { if self.ioc_ping_next < tsnow { if let Some(proto) = &mut self.proto { self.stats.ping_start().inc(); - info!("start ping"); self.ioc_ping_start = Some(tsnow); let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow); proto.push_out(msg); @@ -1889,8 +1924,8 @@ impl CaConn { // TODO handle timeout check } ReadingState::Monitoring(st3) => match &st3.mon2state { - Monitoring2State::Passive(st4) => {} - Monitoring2State::ReadPending(_, tsbeg) => { + Monitoring2State::Passive(_st4) => {} + Monitoring2State::ReadPending(_, _) => { // This is handled in check_channels_state_poll // TODO should unify. } @@ -1898,14 +1933,14 @@ impl CaConn { ReadingState::StopMonitoringForPolling(_) => { // TODO handle timeout check } - ReadingState::Polling(st3) => { + ReadingState::Polling(_st3) => { // This is handled in check_channels_state_poll // TODO should unify. } } - if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) { - warn!("TODO assume channel not alive because nothing received, but should do CAGET"); + if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow { not_alive_count += 1; + self.stats.channel_not_alive_no_activity.inc(); } else { alive_count += 1; } @@ -2082,6 +2117,7 @@ impl CaConn { ca_dbr_count: k.data_count, ts_created: tsnow, ts_alive_last: tsnow, + ts_activity_last: tsnow, ts_msp_last: 0, ts_msp_grid_last: 0, inserted_in_ts_msp: u64::MAX, @@ -2104,8 +2140,10 @@ impl CaConn { JobId(cid.0 as _), self.backend.clone(), conf.conf.name().into(), + cssid, scalar_type, shape, + conf.conf.min_quiets(), self.writer_tx.clone(), self.tmp_ts_poll, ); @@ -2149,12 +2187,11 @@ impl CaConn { self.stats.tcp_connected.inc(); let addr = addr.clone(); self.iqdqs - .lt_rf3_rx - .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, status: ConnectionStatus::Established, - })); + }))?; self.backoff_reset(); let proto = CaProto::new( tcp, @@ -2167,29 +2204,27 @@ impl CaConn { Ok(Ready(Some(()))) } Ok(Err(e)) => { - debug!("error connect to {addr} {e}"); + info!("error connect to {addr} {e}"); let addr = addr.clone(); self.iqdqs - .lt_rf3_rx - .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, status: ConnectionStatus::ConnectError, - })); + }))?; self.trigger_shutdown(ShutdownReason::IoError); Ok(Ready(Some(()))) } Err(e) => { // TODO log with exponential backoff - debug!("timeout connect to {addr} {e}"); + info!("timeout connect to {addr} {e}"); let addr = addr.clone(); self.iqdqs - .lt_rf3_rx - .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { ts: self.tmp_ts_poll, addr, status: ConnectionStatus::ConnectTimeout, - })); + }))?; self.trigger_shutdown(ShutdownReason::IocTimeout); Ok(Ready(Some(()))) } @@ -2372,7 +2407,7 @@ impl CaConn { count, bytes, }); - self.iqdqs.lt_rf3_rx.push_back(item); + self.iqdqs.emit_status_item(item)?; } } } @@ -2454,7 +2489,10 @@ impl CaConn { use scywr::senderpolling::Error as SpErr; match e { SpErr::NoSendInProgress => return Err(Error::NotSending), - SpErr::Closed(_) => return Err(Error::ClosedSending), + SpErr::Closed(_) => { + error!("{self_name} queue closed id {:10}", id); + return Err(Error::ClosedSending); + } } } Pending => { @@ -2471,6 +2509,11 @@ impl CaConn { Ok(Ready(None)) } } + + fn log_queues_summary(&self) { + self.iqdqs.log_summary(); + self.iqsp.log_summary(); + } } // $have is tuple (have_progress, have_pending)) @@ -2592,6 +2635,7 @@ impl Stream for CaConn { cx, stats_fn ); + let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { stats2.iiq_batch_len().ingest(item.len() as u32); @@ -2607,6 +2651,7 @@ impl Stream for CaConn { cx, stats_fn ); + let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { stats2.iiq_batch_len().ingest(item.len() as u32); @@ -2622,6 +2667,22 @@ impl Stream for CaConn { cx, stats_fn ); + + let stats2 = self.stats.clone(); + let stats_fn = move |item: &VecDeque| { + stats2.iiq_batch_len().ingest(item.len() as u32); + }; + flush_queue_dqs!( + self, + lt_rf3_rx, + lt_rf3_sp_pin, + send_batched::<256, _>, + 32, + (&mut have_progress, &mut have_pending), + "lt_rf3_rx", + cx, + stats_fn + ); } let lts3 = Instant::now(); @@ -2728,6 +2789,7 @@ impl Stream for CaConn { continue; } else if have_pending { debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg); + self.log_queues_summary(); self.stats.poll_pending().inc(); Pending } else { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 67054ac..efaceb0 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -40,7 +40,6 @@ use scywr::iteminsertqueue::QueryItem; use scywr::senderpolling::SenderPolling; use serde::Serialize; use series::ChannelStatusSeriesId; -use serieswriter::writer::EstablishWorkerJob; use statemap::ActiveChannelState; use statemap::CaConnStateValue; use statemap::ChannelState; @@ -64,6 +63,7 @@ use std::pin::Pin; use netpod::OnDrop; use scywr::insertqueues::InsertQueuesTx; +use serieswriter::establish_worker::EstablishWorkerJob; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -1102,7 +1102,7 @@ impl CaConnSet { ) -> Result { let mut eos_reason = None; while let Some(item) = conn.next().await { - trace!("ca_conn_item_merge_inner item {item:?}"); + trace!("ca_conn_item_merge_inner item {}", item.desc_short()); if let Some(x) = eos_reason { let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}")); error!("{e}"); diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 4ce8658..68168ce 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -603,17 +603,17 @@ impl Stream for FindIocStream { have_progress = true; } Ready(Err(e)) => { - error!("{e:?}"); + error!("{e}"); } Pending => { g.clear_ready(); - warn!("socket seemed ready for write, but is not"); + // warn!("socket seemed ready for write, but is not"); have_progress = true; } }, Ready(Err(e)) => { - let e = Error::with_msg_no_trace(format!("{e:?}")); - error!("poll_write_ready {e:?}"); + error!("poll_write_ready {e}"); + let e = Error::from_string(e); } Pending => {} } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 0a08d6c..a46331f 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -788,15 +788,7 @@ impl CaMsg { }; CaMsg::from_ty_ts(CaMsgTy::Error(e), tsnow) } - 20 => { - let name = std::ffi::CString::new(payload) - .map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}"))) - .unwrap_or_else(|e| format!("{e:?}")); - CaMsg::from_ty_ts(CaMsgTy::ClientNameRes(ClientNameRes { name }), tsnow) - } - // TODO make response type for host name: - 21 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow), - 6 => { + 0x06 => { if hi.payload_len() != 8 { warn!("protocol error: search result is expected with fixed payload size 8"); } @@ -815,29 +807,6 @@ impl CaMsg { }); CaMsg::from_ty_ts(ty, tsnow) } - 18 => { - let ty = CaMsgTy::CreateChanRes(CreateChanRes { - data_type: hi.data_type, - // TODO what am I supposed to use here in case of extended header? - data_count: hi.data_count() as _, - cid: hi.param1, - sid: hi.param2, - }); - CaMsg::from_ty_ts(ty, tsnow) - } - 22 => { - // TODO use different structs for request and response: - let ty = CaMsgTy::AccessRightsRes(AccessRightsRes { - cid: hi.param1, - rights: hi.param2, - }); - CaMsg::from_ty_ts(ty, tsnow) - } - 26 => { - // TODO use different structs for request and response: - let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }); - CaMsg::from_ty_ts(ty, tsnow) - } 0x01 => { if payload.len() < 12 { if payload.len() == 0 { @@ -897,7 +866,40 @@ impl CaMsg { }); CaMsg::from_ty_ts(ty, tsnow) } - 0x11 => CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow), + 0x12 => { + let ty = CaMsgTy::CreateChanRes(CreateChanRes { + data_type: hi.data_type, + // TODO what am I supposed to use here in case of extended header? + data_count: hi.data_count() as _, + cid: hi.param1, + sid: hi.param2, + }); + CaMsg::from_ty_ts(ty, tsnow) + } + 0x16 => { + let ty = CaMsgTy::AccessRightsRes(AccessRightsRes { + cid: hi.param1, + rights: hi.param2, + }); + CaMsg::from_ty_ts(ty, tsnow) + } + 0x17 => { + let ty = CaMsgTy::Echo; + CaMsg::from_ty_ts(ty, tsnow) + } + 0x1a => { + // TODO use different structs for request and response: + let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }); + CaMsg::from_ty_ts(ty, tsnow) + } + 0x14 => { + let name = std::ffi::CString::new(payload) + .map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}"))) + .unwrap_or_else(|e| format!("{e:?}")); + CaMsg::from_ty_ts(CaMsgTy::ClientNameRes(ClientNameRes { name }), tsnow) + } + // TODO make response type for host name: + 0x15 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow), x => return Err(Error::CaCommandNotSupported(x)), }; Ok(msg) diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index ba550fe..db33002 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -5,6 +5,7 @@ use regex::Regex; use scywr::config::ScyllaIngestConfig; use serde::Deserialize; use serde::Serialize; +use serieswriter::rtwriter::MinQuiets; use std::collections::BTreeMap; use std::path::Path; use std::path::PathBuf; @@ -39,6 +40,8 @@ pub struct CaIngestOpts { insert_frac: Option, use_rate_limit_queue: Option, pub test_bsread_addr: Option, + #[serde(default)] + scylla_disable: bool, } impl CaIngestOpts { @@ -109,12 +112,16 @@ impl CaIngestOpts { pub fn use_rate_limit_queue(&self) -> bool { self.use_rate_limit_queue.unwrap_or(false) } + + pub fn scylla_disable(&self) -> bool { + self.scylla_disable + } } #[test] fn parse_config_minimal() { let conf = r###" -backend: scylla +backend: test_backend timeout: 10m 3s 45ms api_bind: "0.0.0.0:3011" channels: /some/path/file.txt @@ -127,7 +134,7 @@ postgresql: user: USER pass: PASS name: NAME -scylla: +scylla_st: hosts: - sf-nube-11:19042 - sf-nube-12:19042 @@ -525,4 +532,75 @@ impl ChannelConfig { pub fn name(&self) -> &str { &self.name } + + pub fn is_polled(&self) -> bool { + self.arch.is_polled + } + + pub fn poll_conf(&self) -> Option<(u64,)> { + if self.is_polled() { + if let Some(ChannelReadConfig::Poll(x)) = self.arch.short_term { + Some((x.as_millis() as u64,)) + } else if let Some(ChannelReadConfig::Poll(x)) = self.arch.medium_term { + Some((x.as_millis() as u64,)) + } else if let Some(ChannelReadConfig::Poll(x)) = self.arch.long_term { + Some((x.as_millis() as u64,)) + } else { + Some((60,)) + } + } else { + None + } + } + + /// Only used when in monitoring mode. If we do not see activity for this Duration then + /// we issue a manual read to see if the channel is alive. + pub fn manual_poll_on_quiet(&self) -> Duration { + Duration::from_secs(120) + } + + pub fn expect_activity_within(&self) -> Duration { + let dur = if self.is_polled() { + // It would be anyway invalid to be polled and specify a monitor record policy. + match self.arch.short_term { + Some(ChannelReadConfig::Poll(x)) => x, + Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(), + None => match self.arch.medium_term { + Some(ChannelReadConfig::Poll(x)) => x, + Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(), + None => match self.arch.long_term { + Some(ChannelReadConfig::Poll(x)) => x, + Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(), + None => { + // This is an invalid configuration, so just a fallback + self.manual_poll_on_quiet() + } + }, + }, + } + } else { + self.manual_poll_on_quiet() + }; + dur + Duration::from_millis(1000 * 10) + } + + pub fn min_quiets(&self) -> MinQuiets { + MinQuiets { + st: match self.arch.short_term { + Some(ChannelReadConfig::Monitor) => Duration::ZERO, + Some(ChannelReadConfig::Poll(x)) => x, + None => Duration::MAX, + }, + mt: match self.arch.medium_term { + Some(ChannelReadConfig::Monitor) => Duration::ZERO, + Some(ChannelReadConfig::Poll(x)) => x, + None => Duration::MAX, + }, + lt: match self.arch.long_term { + Some(ChannelReadConfig::Monitor) => Duration::ZERO, + Some(ChannelReadConfig::Poll(x)) => x, + None => Duration::MAX, + }, + } + } } diff --git a/netfetch/src/metrics/postingest.rs b/netfetch/src/metrics/postingest.rs index ea361b7..94c3927 100644 --- a/netfetch/src/metrics/postingest.rs +++ b/netfetch/src/metrics/postingest.rs @@ -55,10 +55,11 @@ pub async fn process_api_query_items( #[allow(irrefutable_let_patterns)] while let item = taskrun::tokio::time::timeout(Duration::from_millis(500), item_rx.recv()).await { + let deque = &mut iqdqs.st_rf3_rx; let tsnow = Instant::now(); if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) { sw_tick_last = tsnow; - tick_writers(mucache.all_ref_mut(), &mut iqdqs)?; + tick_writers(mucache.all_ref_mut(), deque)?; } let item = match item { Ok(Ok(item)) => item, @@ -83,23 +84,25 @@ pub async fn process_api_query_items( stnow, ) .await?; - sw.write(item.ts, item.ts, item.val, &mut iqdqs)?; + sw.write(item.ts, item.ts, item.val, deque)?; iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; } - finish_writers(mucache.all_ref_mut(), &mut iqdqs)?; + let deque = &mut iqdqs.st_rf3_rx; + finish_writers(mucache.all_ref_mut(), deque)?; + iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; Ok(()) } -fn tick_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> { +fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { for sw in sws { - sw.tick(iqdqs)?; + sw.tick(deque)?; } Ok(()) } -fn finish_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> { +fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { for sw in sws { - sw.tick(iqdqs)?; + sw.tick(deque)?; } Ok(()) } diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 0aab5de..e42803e 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -2,10 +2,18 @@ use crate::iteminsertqueue::QueryItem; use crate::senderpolling::SenderPolling; use async_channel::Receiver; use async_channel::Sender; +use err::thiserror; +use err::ThisError; +use netpod::log::*; use pin_project::pin_project; use std::collections::VecDeque; use std::pin::Pin; +#[derive(Debug, ThisError)] +pub enum Error { + QueuePush, +} + #[derive(Clone)] pub struct InsertQueuesTx { pub st_rf1_tx: Sender>, @@ -71,6 +79,32 @@ impl InsertDeques { self.mt_rf3_rx.clear(); self.lt_rf3_rx.clear(); } + + pub fn log_summary(&self) { + let summ = InsertDequesSummary { + st_rf1_len: self.st_rf1_rx.len(), + st_rf3_len: self.st_rf3_rx.len(), + mt_rf3_len: self.mt_rf3_rx.len(), + lt_rf3_len: self.lt_rf3_rx.len(), + }; + info!("{summ:?}"); + } + + // Should be used only for connection and channel status items. + // It encapsulates the decision to which queue(s) we want to send these kind of items. + pub fn emit_status_item(&mut self, item: QueryItem) -> Result<(), Error> { + self.lt_rf3_rx.push_back(item); + Ok(()) + } +} + +#[derive(Debug)] +#[allow(unused)] +struct InsertDequesSummary { + st_rf1_len: usize, + st_rf3_len: usize, + mt_rf3_len: usize, + lt_rf3_len: usize, } #[pin_project] @@ -81,6 +115,8 @@ pub struct InsertSenderPolling { pub st_rf3_sp: SenderPolling>, #[pin] pub mt_rf3_sp: SenderPolling>, + #[pin] + pub lt_rf3_sp: SenderPolling>, } impl InsertSenderPolling { @@ -89,19 +125,15 @@ impl InsertSenderPolling { st_rf1_sp: SenderPolling::new(iqtx.st_rf1_tx), st_rf3_sp: SenderPolling::new(iqtx.st_rf3_tx), mt_rf3_sp: SenderPolling::new(iqtx.mt_rf3_tx), + lt_rf3_sp: SenderPolling::new(iqtx.lt_rf3_tx), } } pub fn is_idle(&self) -> bool { - self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle() + self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle() && self.lt_rf3_sp.is_idle() } pub fn st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { - // unsafe { - // let this = self.get_unchecked_mut(); - // let pp1 = &mut this.st_rf1_sp; - // Pin::new_unchecked(pp1) - // } self.project().st_rf1_sp } @@ -112,4 +144,34 @@ impl InsertSenderPolling { pub fn mt_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { self.project().mt_rf3_sp } + + pub fn lt_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { + self.project().lt_rf3_sp + } + + pub fn __st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { + if true { + panic!("encapsulated by pin_project"); + } + unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) } + } + + pub fn log_summary(&self) { + let summ = InsertSenderPollingSummary { + st_rf1_idle: self.st_rf1_sp.is_idle(), + st_rf3_idle: self.st_rf3_sp.is_idle(), + mt_rf3_idle: self.mt_rf3_sp.is_idle(), + lt_rf3_idle: self.lt_rf3_sp.is_idle(), + }; + info!("{summ:?}"); + } +} + +#[derive(Debug)] +#[allow(unused)] +struct InsertSenderPollingSummary { + st_rf1_idle: bool, + st_rf3_idle: bool, + mt_rf3_idle: bool, + lt_rf3_idle: bool, } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 2389cbd..2fc4fa5 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -50,6 +50,15 @@ macro_rules! trace3 { }; } +#[allow(unused)] +macro_rules! trace_item_execute { + ($($arg:tt)*) => { + if true { + debug!($($arg)*); + } + }; +} + fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::Error) { use crate::iteminsertqueue::Error; match err { @@ -266,7 +275,10 @@ async fn worker_streamed( .insert_workers_running .fetch_add(1, atomic::Ordering::AcqRel); let stream = item_inp; - let stream = inspect_items(stream); + let worker_name = data_store + .as_ref() + .map_or_else(|| format!("dummy"), |x| x.rett.debug_tag().to_string()); + let stream = inspect_items(stream, worker_name.clone()); if let Some(data_store) = data_store { let stream = transform_to_db_futures(stream, data_store, stats.clone()); let stream = stream @@ -351,7 +363,10 @@ where }) } -fn inspect_items(item_inp: Receiver>) -> impl Stream> { +fn inspect_items( + item_inp: Receiver>, + worker_name: String, +) -> impl Stream> { trace!("transform_to_db_futures begin"); // TODO possible without box? // let item_inp = Box::pin(item_inp); @@ -359,19 +374,19 @@ fn inspect_items(item_inp: Receiver>) -> impl Stream { - trace2!("execute ConnectionStatus {item:?}"); + trace_item_execute!("execute {worker_name} ConnectionStatus {item:?}"); } QueryItem::ChannelStatus(_) => { - trace2!("execute ChannelStatus {item:?}"); + trace_item_execute!("execute {worker_name} ChannelStatus {item:?}"); } QueryItem::Insert(item) => { - trace3!("execute Insert {}", item.string_short()); + trace_item_execute!("execute {worker_name} Insert {}", item.string_short()); } QueryItem::TimeBinSimpleF32(_) => { - trace2!("execute TimeBinSimpleF32"); + trace_item_execute!("execute {worker_name} TimeBinSimpleF32"); } QueryItem::Accounting(_) => { - trace2!("execute Accounting {item:?}"); + trace_item_execute!("execute {worker_name} Accounting {item:?}"); } } } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 92e3663..78c7802 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -7,6 +7,8 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::FutureExt; +#[allow(unused)] +use netpod::log::*; use netpod::DtNano; use netpod::ScalarType; use netpod::Shape; @@ -43,7 +45,7 @@ pub enum Error { GetValHelpInnerTypeMismatch, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum ScalarValue { I8(i8), I16(i16), @@ -86,7 +88,7 @@ impl ScalarValue { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum ArrayValue { I8(Vec), I16(Vec), @@ -207,7 +209,7 @@ impl ArrayValue { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum DataValue { Scalar(ScalarValue), Array(ArrayValue), @@ -620,6 +622,14 @@ impl InsertFut { // let fut = StackFuture::from(fut); Self { scy, qu, fut } } + + pub fn dummy(scy: Arc, qu: Arc) -> Self { + Self { + scy, + qu, + fut: Box::pin(async { Err(QueryError::InvalidMessage("no longer used".into())) }), + } + } } impl Future for InsertFut { @@ -834,6 +844,16 @@ pub fn insert_item_fut( } } +#[cfg(DISABLED)] +pub fn insert_connection_status_fut( + item: ConnectionStatusItem, + data_store: &DataStore, + stats: Arc, +) -> InsertFut { + warn!("separate connection status table no longer used"); + InsertFut::dummy(data_store.scy.clone(), data_store.qu_dummy.clone()) +} + pub fn insert_connection_status_fut( item: ConnectionStatusItem, data_store: &DataStore, @@ -855,6 +875,16 @@ pub fn insert_connection_status_fut( ) } +#[cfg(DISABLED)] +pub fn insert_channel_status_fut( + item: ChannelStatusItem, + data_store: &DataStore, + stats: Arc, +) -> SmallVec<[InsertFut; 4]> { + warn!("separate channel status table no longer used"); + SmallVec::new() +} + pub fn insert_channel_status_fut( item: ChannelStatusItem, data_store: &DataStore, @@ -884,6 +914,12 @@ pub fn insert_channel_status_fut( smallvec![fut1, fut2] } +#[cfg(DISABLED)] +pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &DataStore) -> Result<(), Error> { + warn!("separate connection status table no longer used"); + Ok(()) +} + pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &DataStore) -> Result<(), Error> { let ts = TsMs::from_system_time(item.ts); let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); @@ -897,6 +933,12 @@ pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &D Ok(()) } +#[cfg(DISABLED)] +pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> { + warn!("separate channel status table no longer used"); + Ok(()) +} + pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> { let ts = TsMs::from_system_time(item.ts); let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 41c355d..2b7a9d6 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -80,26 +80,6 @@ pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result Result<(), Error> { - use std::fmt::Write; - // seconds: - let default_time_to_live = 60 * 60 * 5; - // hours: - let twcs_window_index = 24 * 4; - let mut s = String::new(); - s.write_str("create table ")?; - s.write_str(table_name)?; - s.write_str(" (series bigint, ts_msp bigint, primary key (series, ts_msp))")?; - write!(s, " with default_time_to_live = {}", default_time_to_live)?; - s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy'")?; - s.write_str(", 'compaction_window_unit': 'HOURS'")?; - write!(s, ", 'compaction_window_size': {}", twcs_window_index)?; - s.write_str(" }")?; - eprintln!("create table cql {s}"); - scy.query(s, ()).await?; - Ok(()) -} - #[allow(unused)] fn dhours(x: u64) -> Duration { Duration::from_secs(60 * 60 * x) diff --git a/scywr/src/store.rs b/scywr/src/store.rs index fe16e28..53dc56b 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -40,6 +40,7 @@ pub struct DataStore { pub qu_insert_channel_status_by_ts_msp: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, pub qu_account_00: Arc, + pub qu_dummy: Arc, } macro_rules! prep_qu_ins_a { @@ -57,6 +58,35 @@ macro_rules! prep_qu_ins_a { }}; } +macro_rules! prep_qu_ins_b { + ($id1:expr, $rett:expr, $scy:expr) => {{ + let cql = format!( + concat!( + "insert into {}{} (series, ts_msp, ts_lsp, pulse, valueblob)", + " values (?, ?, ?, ?, ?)" + ), + $rett.table_prefix(), + $id1 + ); + let q = $scy.prepare(cql).await?; + Arc::new(q) + }}; +} + +macro_rules! prep_qu_ins_c { + ($id1:expr, $fields:expr, $values:expr, $rett:expr, $scy:expr) => {{ + let cql = format!( + concat!("insert into {}{} ({})", " values ({})"), + $rett.table_prefix(), + $id1, + $fields, + $values, + ); + let q = $scy.prepare(cql).await?; + Arc::new(q) + }}; +} + impl DataStore { pub async fn new(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result { let scy = create_session(scyconf).await.map_err(|_| Error::NewSession)?; @@ -80,62 +110,63 @@ impl DataStore { let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); // array - let cql = "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_array_i8 = Arc::new(q); - - let cql = "insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_array_i16 = Arc::new(q); - - let cql = "insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_array_i32 = Arc::new(q); - - let cql = "insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_array_i64 = Arc::new(q); - - let cql = "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_array_f32 = Arc::new(q); - - let cql = "insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_array_f64 = Arc::new(q); - - let cql = "insert into events_array_bool (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_array_bool = Arc::new(q); + let qu_insert_array_i8 = prep_qu_ins_b!("events_array_i8", rett, scy); + let qu_insert_array_i16 = prep_qu_ins_b!("events_array_i16", rett, scy); + let qu_insert_array_i32 = prep_qu_ins_b!("events_array_i32", rett, scy); + let qu_insert_array_i64 = prep_qu_ins_b!("events_array_i64", rett, scy); + let qu_insert_array_f32 = prep_qu_ins_b!("events_array_f32", rett, scy); + let qu_insert_array_f64 = prep_qu_ins_b!("events_array_f64", rett, scy); + let qu_insert_array_bool = prep_qu_ins_b!("events_array_bool", rett, scy); // Connection status: - let cql = "insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_connection_status = Arc::new(q); - - let cql = "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_channel_status = Arc::new(q); - - let cql = "insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_channel_status_by_ts_msp = Arc::new(q); - - let cql = concat!( - "insert into binned_scalar_f32 (", - "series, bin_len_ms, ts_msp, off, count, min, max, avg)", - " values (?, ?, ?, ?, ?, ?, ?, ?)" + let qu_insert_connection_status = prep_qu_ins_c!( + "connection_status", + "ts_msp, ts_lsp, kind, addr", + "?, ?, ?, ?", + rett, + scy ); - let q = scy.prepare(cql).await?; - let qu_insert_binned_scalar_f32_v02 = Arc::new(q); - let cql = concat!( - "insert into account_00", - " (part, ts, series, count, bytes)", - " values (?, ?, ?, ?, ?)" + let qu_insert_channel_status = prep_qu_ins_c!( + "channel_status", + "series, ts_msp, ts_lsp, kind", + "?, ?, ?, ?", + rett, + scy ); - let q = scy.prepare(cql).await?; - let qu_account_00 = Arc::new(q); + + let qu_insert_channel_status_by_ts_msp = prep_qu_ins_c!( + "channel_status_by_ts_msp", + "ts_msp, ts_lsp, series, kind", + "?, ?, ?, ?", + rett, + scy + ); + + let qu_insert_binned_scalar_f32_v02 = prep_qu_ins_c!( + "binned_scalar_f32", + "series, bin_len_ms, ts_msp, off, count, min, max, avg", + "?, ?, ?, ?, ?, ?, ?, ?", + rett, + scy + ); + + let qu_account_00 = prep_qu_ins_c!( + "account_00", + "part, ts, series, count, bytes", + "?, ?, ?, ?, ?", + rett, + scy + ); + + let q = scy + .prepare(format!( + concat!("select * from {}{} limit 1"), + rett.table_prefix(), + "ts_msp" + )) + .await?; + let qu_dummy = Arc::new(q); let ret = Self { rett, @@ -161,6 +192,7 @@ impl DataStore { qu_insert_channel_status_by_ts_msp, qu_insert_binned_scalar_f32_v02, qu_account_00, + qu_dummy, }; Ok(ret) } diff --git a/serieswriter/src/establish_worker.rs b/serieswriter/src/establish_worker.rs new file mode 100644 index 0000000..a2f8489 --- /dev/null +++ b/serieswriter/src/establish_worker.rs @@ -0,0 +1,193 @@ +use crate::rtwriter::MinQuiets; +use crate::rtwriter::RtWriter; +use crate::writer::SeriesWriter; +use async_channel::Receiver; +use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; +use err::thiserror; +use err::ThisError; +use futures_util::future; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::timeunits::HOUR; +use netpod::timeunits::SEC; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsNano; +use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::DataValue; +use series::ChannelStatusSeriesId; +use series::SeriesId; +use stats::SeriesWriterEstablishStats; +use std::sync::atomic; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; + +#[derive(Debug, ThisError)] +pub enum Error { + Postgres(#[from] dbpg::err::Error), + PostgresSchema(#[from] dbpg::schema::Error), + ScyllaSession(#[from] scywr::session::Error), + ScyllaSchema(#[from] scywr::schema::Error), + SeriesWriter(#[from] crate::writer::Error), + SeriesByChannel(#[from] dbpg::seriesbychannel::Error), +} + +pub struct JobId(pub u64); + +pub struct EstablishWriterWorker { + worker_tx: Sender, + jobrx: Receiver, + stats: Arc, +} + +impl EstablishWriterWorker { + fn new( + worker_tx: Sender, + jobrx: Receiver, + stats: Arc, + ) -> Self { + Self { + worker_tx, + jobrx, + stats, + } + } + + async fn work(self) { + let cnt = Arc::new(AtomicU64::new(0)); + taskrun::spawn({ + let cnt = cnt.clone(); + async move { + if true { + return Ok::<_, Error>(()); + } + loop { + taskrun::tokio::time::sleep(Duration::from_millis(10000)).await; + debug!("EstablishWriterWorker cnt {}", cnt.load(atomic::Ordering::SeqCst)); + } + Ok::<_, Error>(()) + } + }); + self.jobrx + .map(move |item| { + let wtx = self.worker_tx.clone(); + let cnt = cnt.clone(); + let stats = self.stats.clone(); + async move { + let res = RtWriter::new( + wtx.clone(), + item.cssid, + item.backend, + item.channel, + item.scalar_type, + item.shape, + item.min_quiets, + item.tsnow, + ) + .await; + cnt.fetch_add(1, atomic::Ordering::SeqCst); + if item.restx.send((item.job_id, res)).await.is_err() { + stats.result_send_fail().inc(); + trace!("can not send writer establish result"); + } + } + }) + .buffer_unordered(512) + .for_each(|_| future::ready(())) + .await; + } +} + +pub struct EstablishWorkerJob { + job_id: JobId, + backend: String, + channel: String, + cssid: ChannelStatusSeriesId, + scalar_type: ScalarType, + shape: Shape, + min_quiets: MinQuiets, + restx: Sender<(JobId, Result)>, + tsnow: SystemTime, +} + +impl EstablishWorkerJob { + pub fn new( + job_id: JobId, + backend: String, + channel: String, + cssid: ChannelStatusSeriesId, + scalar_type: ScalarType, + shape: Shape, + min_quiets: MinQuiets, + restx: Sender<(JobId, Result)>, + tsnow: SystemTime, + ) -> Self { + Self { + job_id, + backend, + channel, + cssid, + scalar_type, + shape, + min_quiets, + restx, + tsnow, + } + } +} + +pub fn start_writer_establish_worker( + worker_tx: Sender, + stats: Arc, +) -> Result<(Sender,), Error> { + let (tx, rx) = async_channel::bounded(256); + let worker = EstablishWriterWorker::new(worker_tx, rx, stats); + taskrun::spawn(worker.work()); + Ok((tx,)) +} + +#[test] +fn write_00() { + use netpod::Database; + use scywr::config::ScyllaIngestConfig; + use stats::SeriesByChannelStats; + use std::sync::Arc; + let fut = async { + let dbconf = &Database { + name: "daqbuffer".into(), + host: "localhost".into(), + port: 5432, + user: "daqbuffer".into(), + pass: "daqbuffer".into(), + }; + let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1"); + let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?; + dbpg::schema::schema_check(&pgc).await?; + scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; + let scy = scywr::session::create_session(scyconf).await?; + let stats = SeriesByChannelStats::new(); + let stats = Arc::new(stats); + let (tx, jhs, jh) = + dbpg::seriesbychannel::start_lookup_workers::(1, dbconf, stats) + .await?; + let backend = "bck-test-00"; + let channel = "chn-test-00"; + let scalar_type = ScalarType::I16; + let shape = Shape::Scalar; + let tsnow = SystemTime::now(); + let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?; + eprintln!("{writer:?}"); + let mut iqdqs = InsertDeques::new(); + for i in 0..10 { + let ts = TsNano::from_ns(HOUR * 24 + SEC * i); + let ts_local = ts.clone(); + let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _)); + writer.write(ts, ts_local, val, &mut iqdqs.st_rf3_rx)?; + } + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index c91fd88..eac21c1 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -1,3 +1,5 @@ +pub mod establish_worker; pub mod patchcollect; +pub mod rtwriter; pub mod timebin; pub mod writer; diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs new file mode 100644 index 0000000..56f382a --- /dev/null +++ b/serieswriter/src/rtwriter.rs @@ -0,0 +1,186 @@ +use crate::writer::SeriesWriter; +use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; +use err::thiserror; +use err::ThisError; +use netpod::ScalarType; +use netpod::SeriesKind; +use netpod::Shape; +use netpod::TsNano; +use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::DataValue; +use series::ChannelStatusSeriesId; +use series::SeriesId; +use std::time::Duration; +use std::time::SystemTime; + +#[derive(Debug, ThisError)] +pub enum Error { + SeriesLookupError, + SeriesWriter(#[from] crate::writer::Error), +} + +#[derive(Debug)] +pub struct MinQuiets { + pub st: Duration, + pub mt: Duration, + pub lt: Duration, +} + +#[derive(Debug)] +pub struct RtWriter { + sid: SeriesId, + scalar_type: ScalarType, + shape: Shape, + state_st: State, + state_mt: State, + state_lt: State, + min_quiets: MinQuiets, +} + +impl RtWriter { + pub async fn new( + channel_info_tx: Sender, + cssid: ChannelStatusSeriesId, + backend: String, + channel: String, + scalar_type: ScalarType, + shape: Shape, + min_quiets: MinQuiets, + stnow: SystemTime, + ) -> Result { + let sid = { + let (tx, rx) = async_channel::bounded(1); + let item = ChannelInfoQuery { + backend, + channel, + kind: SeriesKind::ChannelData, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + tx: Box::pin(tx), + }; + channel_info_tx.send(item).await.map_err(|_| Error::SeriesLookupError)?; + let res = rx + .recv() + .await + .map_err(|_| Error::SeriesLookupError)? + .map_err(|_| Error::SeriesLookupError)?; + res.series.to_series() + }; + let state_st = { + let writer = + SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?; + State { writer, last_ins: None } + }; + let state_mt = { + let writer = + SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?; + State { writer, last_ins: None } + }; + let state_lt = { + let writer = + SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?; + State { writer, last_ins: None } + }; + let ret = Self { + sid, + scalar_type, + shape, + state_st, + state_mt, + state_lt, + min_quiets, + }; + Ok(ret) + } + + pub fn sid(&self) -> SeriesId { + self.sid.clone() + } + + pub fn scalar_type(&self) -> ScalarType { + self.scalar_type.clone() + } + + pub fn shape(&self) -> Shape { + self.shape.clone() + } + + pub fn write( + &mut self, + ts: TsNano, + ts_local: TsNano, + val: DataValue, + iqdqs: &mut InsertDeques, + ) -> Result<(), Error> { + // Decide whether we want to write. + { + let min_quiet = self.min_quiets.st; + let deque = &mut iqdqs.st_rf3_rx; + if self.state_st.last_ins.as_ref().map_or(true, |x| { + if x.0 >= ts_local { + // bad clock, ignore. + // TODO count in stats. + false + } else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() { + false + } else { + val != x.1 + } + }) { + self.state_st.last_ins = Some((ts, val.clone())); + self.state_st.writer.write(ts, ts_local, val.clone(), deque)?; + } + } + { + let min_quiet = self.min_quiets.mt; + let deque = &mut iqdqs.mt_rf3_rx; + if self.state_mt.last_ins.as_ref().map_or(true, |x| { + if x.0 >= ts_local { + // bad clock, ignore. + // TODO count in stats. + false + } else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() { + false + } else { + val != x.1 + } + }) { + self.state_mt.last_ins = Some((ts, val.clone())); + self.state_mt.writer.write(ts, ts_local, val.clone(), deque)?; + } + } + { + let min_quiet = self.min_quiets.lt; + let deque = &mut iqdqs.lt_rf3_rx; + if self.state_lt.last_ins.as_ref().map_or(true, |x| { + if x.0 >= ts_local { + // bad clock, ignore. + // TODO count in stats. + false + } else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() { + false + } else { + val != x.1 + } + }) { + self.state_lt.last_ins = Some((ts, val.clone())); + self.state_lt.writer.write(ts, ts_local, val.clone(), deque)?; + } + } + Ok(()) + } + + pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + self.state_st.writer.tick(&mut iqdqs.st_rf3_rx)?; + self.state_mt.writer.tick(&mut iqdqs.mt_rf3_rx)?; + self.state_lt.writer.tick(&mut iqdqs.lt_rf3_rx)?; + Ok(()) + } +} + +#[derive(Debug)] +struct State { + writer: SeriesWriter, + last_ins: Option<(TsNano, DataValue)>, +} diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index 722ba52..4754111 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -388,8 +388,8 @@ fn store_bins( // TODO check which RT we want to push into iqdqs.st_rf3_rx.push_back(item.clone()); - iqdqs.mt_rf3_rx.push_back(item.clone()); - iqdqs.lt_rf3_rx.push_back(item); + // iqdqs.mt_rf3_rx.push_back(item.clone()); + // iqdqs.lt_rf3_rx.push_back(item); } } Ok(()) diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index eb5e052..c62bde9 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -1,12 +1,8 @@ use crate::timebin::ConnTimeBin; -use async_channel::Receiver; use async_channel::Sender; use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; -use futures_util::future; -use futures_util::StreamExt; -use log::*; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; use netpod::ScalarType; @@ -19,12 +15,7 @@ use scywr::iteminsertqueue::InsertItem; use scywr::iteminsertqueue::QueryItem; use series::ChannelStatusSeriesId; use series::SeriesId; -use stats::SeriesWriterEstablishStats; use std::collections::VecDeque; -use std::sync::atomic; -use std::sync::atomic::AtomicU64; -use std::sync::Arc; -use std::time::Duration; use std::time::SystemTime; #[derive(Debug, ThisError)] @@ -42,12 +33,13 @@ pub enum Error { } impl From> for Error { - fn from(value: async_channel::SendError) -> Self { + fn from(_value: async_channel::SendError) -> Self { Error::ChannelSendError } } + impl From for Error { - fn from(value: async_channel::RecvError) -> Self { + fn from(_value: async_channel::RecvError) -> Self { Error::ChannelRecvError } } @@ -65,8 +57,7 @@ pub struct SeriesWriter { msp_max_bytes: u32, // TODO this should be in an Option: ts_msp_grid_last: u32, - binner: ConnTimeBin, - written_last: Option, + binner: Option, } impl SeriesWriter { @@ -94,13 +85,13 @@ impl SeriesWriter { } pub async fn establish_with_cssid( - worker_tx: Sender, + channel_info_tx: Sender, cssid: ChannelStatusSeriesId, backend: String, channel: String, scalar_type: ScalarType, shape: Shape, - tsnow: SystemTime, + stnow: SystemTime, ) -> Result { let (tx, rx) = async_channel::bounded(1); let item = ChannelInfoQuery { @@ -111,11 +102,23 @@ impl SeriesWriter { shape: shape.clone(), tx: Box::pin(tx), }; - worker_tx.send(item).await?; + channel_info_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let sid = res.series.to_series(); + Self::establish_with_cssid_sid(cssid, sid, scalar_type, shape, stnow).await + } + + pub async fn establish_with_cssid_sid( + cssid: ChannelStatusSeriesId, + sid: SeriesId, + scalar_type: ScalarType, + shape: Shape, + stnow: SystemTime, + ) -> Result { let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10)); - binner.setup_for(&scalar_type, &shape, tsnow)?; + binner.setup_for(&scalar_type, &shape, stnow)?; + let _ = binner; + let binner = None; let res = Self { cssid, sid, @@ -128,7 +131,6 @@ impl SeriesWriter { msp_max_bytes: 1024 * 1024 * 20, ts_msp_grid_last: 0, binner, - written_last: None, }; Ok(res) } @@ -150,10 +152,12 @@ impl SeriesWriter { ts: TsNano, ts_local: TsNano, val: DataValue, - iqdqs: &mut InsertDeques, + deque: &mut VecDeque, ) -> Result<(), Error> { // TODO compute the binned data here as well and flush completed bins if needed. - self.binner.push(ts.clone(), &val)?; + if let Some(binner) = self.binner.as_mut() { + binner.push(ts.clone(), &val)?; + } // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. @@ -203,161 +207,15 @@ impl SeriesWriter { ts_local: ts_local.to_ts_ms(), }; // TODO decide on the path in the new deques struct - iqdqs.st_rf3_rx.push_back(QueryItem::Insert(item)); + deque.push_back(QueryItem::Insert(item)); Ok(()) } - pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - self.binner.tick(iqdqs)?; + pub fn tick(&mut self, deque: &mut VecDeque) -> Result<(), Error> { + if let Some(binner) = self.binner.as_mut() { + // TODO + //binner.tick(deque)?; + } Ok(()) } } - -pub struct JobId(pub u64); - -pub struct EstablishWriterWorker { - worker_tx: Sender, - jobrx: Receiver, - stats: Arc, -} - -impl EstablishWriterWorker { - fn new( - worker_tx: Sender, - jobrx: Receiver, - stats: Arc, - ) -> Self { - Self { - worker_tx, - jobrx, - stats, - } - } - - async fn work(self) { - let cnt = Arc::new(AtomicU64::new(0)); - taskrun::spawn({ - let cnt = cnt.clone(); - async move { - if true { - return Ok::<_, Error>(()); - } - loop { - taskrun::tokio::time::sleep(Duration::from_millis(10000)).await; - debug!("EstablishWriterWorker cnt {}", cnt.load(atomic::Ordering::SeqCst)); - } - Ok::<_, Error>(()) - } - }); - self.jobrx - .map(move |item| { - let wtx = self.worker_tx.clone(); - let cnt = cnt.clone(); - let stats = self.stats.clone(); - async move { - let res = SeriesWriter::establish( - wtx.clone(), - item.backend, - item.channel, - item.scalar_type, - item.shape, - item.tsnow, - ) - .await; - cnt.fetch_add(1, atomic::Ordering::SeqCst); - if item.restx.send((item.job_id, res)).await.is_err() { - stats.result_send_fail().inc(); - trace!("can not send writer establish result"); - } - } - }) - .buffer_unordered(512) - .for_each(|_| future::ready(())) - .await; - } -} - -pub struct EstablishWorkerJob { - job_id: JobId, - backend: String, - channel: String, - scalar_type: ScalarType, - shape: Shape, - restx: Sender<(JobId, Result)>, - tsnow: SystemTime, -} - -impl EstablishWorkerJob { - pub fn new( - job_id: JobId, - backend: String, - channel: String, - scalar_type: ScalarType, - shape: Shape, - restx: Sender<(JobId, Result)>, - tsnow: SystemTime, - ) -> Self { - Self { - job_id, - backend, - channel, - scalar_type, - shape, - restx, - tsnow, - } - } -} - -pub fn start_writer_establish_worker( - worker_tx: Sender, - stats: Arc, -) -> Result<(Sender,), Error> { - let (tx, rx) = async_channel::bounded(256); - let worker = EstablishWriterWorker::new(worker_tx, rx, stats); - taskrun::spawn(worker.work()); - Ok((tx,)) -} - -#[test] -fn write_00() { - use netpod::Database; - use scywr::config::ScyllaIngestConfig; - use stats::SeriesByChannelStats; - use std::sync::Arc; - let fut = async { - let dbconf = &Database { - name: "daqbuffer".into(), - host: "localhost".into(), - port: 5432, - user: "daqbuffer".into(), - pass: "daqbuffer".into(), - }; - let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1"); - let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?; - dbpg::schema::schema_check(&pgc).await?; - scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; - let scy = scywr::session::create_session(scyconf).await?; - let stats = SeriesByChannelStats::new(); - let stats = Arc::new(stats); - let (tx, jhs, jh) = - dbpg::seriesbychannel::start_lookup_workers::(1, dbconf, stats) - .await?; - let backend = "bck-test-00"; - let channel = "chn-test-00"; - let scalar_type = ScalarType::I16; - let shape = Shape::Scalar; - let tsnow = SystemTime::now(); - let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?; - eprintln!("{writer:?}"); - let mut iqdqs = InsertDeques::new(); - for i in 0..10 { - let ts = TsNano::from_ns(HOUR * 24 + SEC * i); - let ts_local = ts.clone(); - let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _)); - writer.write(ts, ts_local, val, &mut iqdqs)?; - } - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 0b54a16..66a7c17 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -347,6 +347,16 @@ stats_proc::stats_struct!(( transition_to_polling_already_in, transition_to_polling_bad_state, channel_add_exists, + recv_event_add_while_wait_on_read_notify, + recv_read_notify_state_passive_found_ioid, + recv_read_notify_state_passive, + recv_read_notify_state_read_pending_bad_ioid, + recv_read_notify_state_read_pending, + recv_read_notify_but_not_init_yet, + recv_read_notify_but_no_longer_ready, + recv_read_notify_while_enabling_monitoring, + recv_read_notify_while_polling_idle, + channel_not_alive_no_activity, ), values(inter_ivl_ema, read_ioids_len, proto_out_len,), histolog2s(