diff --git a/batchtools/src/batcher.rs b/batchtools/src/batcher.rs index 545b901..5ca0d90 100644 --- a/batchtools/src/batcher.rs +++ b/batchtools/src/batcher.rs @@ -44,7 +44,7 @@ async fn run_batcher(rx: Receiver, batch_tx: Sender>, batch_limit: do_emit = true; } } - Err(e) => { + Err(_e) => { break; } }, diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 08123cb..3b7d7ba 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -19,6 +19,8 @@ use netfetch::throttletrace::ThrottleTrace; use netpod::ttl::RetentionTime; use netpod::Database; use scywr::config::ScyllaIngestConfig; +use scywr::insertqueues::InsertQueuesRx; +use scywr::insertqueues::InsertQueuesTx; use scywr::insertworker::InsertWorkerOpts; use scywr::iteminsertqueue as scywriiq; use scywriiq::QueryItem; @@ -46,7 +48,9 @@ const RUN_WITHOUT_SCYLLA: bool = true; pub struct DaemonOpts { pgconf: Database, - scyconf: ScyllaIngestConfig, + scyconf_st: ScyllaIngestConfig, + scyconf_mt: ScyllaIngestConfig, + scyconf_lt: ScyllaIngestConfig, #[allow(unused)] test_bsread_addr: Option, insert_frac: Arc, @@ -97,51 +101,14 @@ impl Daemon { .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); - let query_item_tx_weak = query_item_tx.downgrade(); - let insert_queue_counter = Arc::new(AtomicUsize::new(0)); - // Insert queue hook - // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); - let wrest_stats = Arc::new(SeriesWriterEstablishStats::new()); let (writer_establis_tx,) = serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone(), wrest_stats.clone()) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let local_epics_hostname = ingest_linux::net::local_hostname(); - let conn_set_ctrl = CaConnSet::start( - ingest_opts.backend().into(), - local_epics_hostname, - query_item_tx, - channel_info_query_tx.clone(), - ingest_opts.clone(), - writer_establis_tx, - ); - - // TODO remove - tokio::spawn({ - let rx = conn_set_ctrl.receiver().clone(); - let tx = daemon_ev_tx.clone(); - async move { - loop { - match rx.recv().await { - Ok(item) => { - let item = DaemonEvent::CaConnSetItem(item); - if let Err(_) = tx.send(item).await { - debug!("CaConnSet to Daemon adapter: tx closed, break"); - break; - } - } - Err(_) => { - debug!("CaConnSet to Daemon adapter: rx done, break"); - break; - } - } - } - } - }); #[cfg(DISABLED)] let query_item_rx = { @@ -172,33 +139,91 @@ impl Daemon { }; let insert_worker_opts = Arc::new(insert_worker_opts); - debug!("TODO RetentionTime"); + let (iqtx, iqrx) = { + let (st_rf3_tx, st_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); + let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); + let (mt_rf3_tx, mt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); + let iqtx = InsertQueuesTx { + st_rf3_tx, + st_rf1_tx, + mt_rf3_tx, + }; + let iqrx = InsertQueuesRx { + st_rf3_rx, + st_rf1_rx, + mt_rf3_rx, + }; + (iqtx, iqrx) + }; - let rett = RetentionTime::Short; + let query_item_tx_weak = iqtx.st_rf3_tx.clone().downgrade(); - #[cfg(DISABLED)] - let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers( - rett, - opts.scyconf.clone(), - ingest_opts.insert_scylla_sessions(), - ingest_opts.insert_worker_count(), - ingest_opts.insert_worker_concurrency(), - query_item_rx, - insert_worker_opts, - insert_worker_stats.clone(), - ingest_opts.use_rate_limit_queue(), - ) - .await?; - let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( - ingest_opts.insert_worker_count(), - ingest_opts.insert_worker_concurrency(), - query_item_rx, - insert_worker_opts, - insert_worker_stats.clone(), - ) - .await?; + let conn_set_ctrl = CaConnSet::start( + ingest_opts.backend().into(), + local_epics_hostname, + iqtx, + channel_info_query_tx.clone(), + ingest_opts.clone(), + writer_establis_tx, + ); + + // TODO remove + tokio::spawn({ + let rx = conn_set_ctrl.receiver().clone(); + let tx = daemon_ev_tx.clone(); + async move { + loop { + match rx.recv().await { + Ok(item) => { + let item = DaemonEvent::CaConnSetItem(item); + if let Err(_) = tx.send(item).await { + debug!("CaConnSet to Daemon adapter: tx closed, break"); + break; + } + } + Err(_) => { + debug!("CaConnSet to Daemon adapter: rx done, break"); + break; + } + } + } + } + }); + + // let query_item_tx_weak = query_item_tx.downgrade(); + // Insert queue hook + // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); + + let mut insert_worker_jhs = Vec::new(); + + if RUN_WITHOUT_SCYLLA { + let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + iqrx.st_rf3_rx, + insert_worker_opts, + insert_worker_stats.clone(), + ) + .await?; + insert_worker_jhs.extend(jh); + } else { + let jh = scywr::insertworker::spawn_scylla_insert_workers( + // TODO does the worker actually need RETT? Yes, to use the correct table names. + RetentionTime::Short, + opts.scyconf_st.clone(), + ingest_opts.insert_scylla_sessions(), + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + iqrx.st_rf3_rx.clone(), + insert_worker_opts, + insert_worker_stats.clone(), + ingest_opts.use_rate_limit_queue(), + ) + .await?; + insert_worker_jhs.extend(jh); + }; let stats = Arc::new(DaemonStats::new()); - stats.insert_worker_spawned().add(insert_workers_jh.len() as _); + stats.insert_worker_spawned().add(insert_worker_jhs.len() as _); #[cfg(feature = "bsread")] if let Some(bsaddr) = &opts.test_bsread_addr { @@ -248,7 +273,7 @@ impl Daemon { count_unassigned: 0, count_assigned: 0, last_status_print: SystemTime::now(), - insert_workers_jh, + insert_workers_jh: insert_worker_jhs, stats, insert_worker_stats, series_by_channel_stats, @@ -289,11 +314,14 @@ impl Daemon { async fn handle_timer_tick(&mut self) -> Result<(), Error> { if self.shutting_down { let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire); - let nitems = self - .query_item_tx_weak - .upgrade() - .map(|x| (x.sender_count(), x.receiver_count(), x.len())); - info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems); + #[cfg(DISABLED)] + { + let nitems = self + .query_item_tx_weak + .upgrade() + .map(|x| (x.sender_count(), x.receiver_count(), x.len())); + info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems); + } if nworkers == 0 { info!("goodbye"); std::process::exit(0); @@ -552,15 +580,15 @@ impl Daemon { } pub async fn daemon(mut self) -> Result<(), Error> { - { + let worker_jh = { let backend = String::new(); - let (item_tx, item_rx) = async_channel::bounded(256); + let (_item_tx, item_rx) = async_channel::bounded(256); let info_worker_tx = self.channel_info_query_tx.clone(); let iiq_tx = self.query_item_tx_weak.upgrade().unwrap(); let worker_fut = netfetch::metrics::postingest::process_api_query_items(backend, item_rx, info_worker_tx, iiq_tx); - let worker_jh = taskrun::spawn(worker_fut); - } + taskrun::spawn(worker_fut) + }; Self::spawn_ticker(self.tx.clone(), self.stats.clone()); loop { if self.shutting_down { @@ -598,12 +626,15 @@ impl Daemon { } } } - info!("Wait for metrics handler"); + info!("wait for metrics handler"); self.metrics_shutdown_tx.send(1).await?; if let Some(jh) = self.metrics_jh.take() { jh.await??; } - info!("Joined metrics handler"); + info!("joined metrics handler"); + info!("\n\n\n-----------------------\n\n\nwait for postingest task"); + worker_jh.await?.map_err(|e| Error::from_string(e))?; + info!("joined postingest task"); Ok(()) } } @@ -630,25 +661,24 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> info!("start up {opts:?}"); ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?; ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?; - - let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()) - .await - .map_err(Error::from_string)?; - dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?; - drop(pg); - jh.await?.map_err(Error::from_string)?; - - if RUN_WITHOUT_SCYLLA { - } else { - scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short) + { + let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()) + .await + .map_err(Error::from_string)?; + dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?; + jh.await?.map_err(Error::from_string)?; + } + if RUN_WITHOUT_SCYLLA { + } else { + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short) + .await + .map_err(Error::from_string)?; + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium) + .await + .map_err(Error::from_string)?; + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long) .await .map_err(Error::from_string)?; - - if let Some(scyconf) = opts.scylla_config_lt() { - scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long) - .await - .map_err(Error::from_string)?; - } } info!("database check done"); @@ -668,7 +698,9 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> let opts2 = DaemonOpts { pgconf: opts.postgresql_config().clone(), - scyconf: opts.scylla_config().clone(), + scyconf_st: opts.scylla_config_st().clone(), + scyconf_mt: opts.scylla_config_mt().clone(), + scyconf_lt: opts.scylla_config_lt().clone(), test_bsread_addr: opts.test_bsread_addr.clone(), insert_frac: insert_frac.clone(), store_workers_rate, diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index d871850..f5ae990 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -1,21 +1,42 @@ +use async_channel::Sender; use bytes::Buf; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use log::*; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsNano; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::ScalarValue; +use serieswriter::writer::SeriesWriter; +use std::collections::VecDeque; use std::io::Cursor; use std::net::Ipv4Addr; +use std::time::SystemTime; use taskrun::tokio::net::UdpSocket; #[derive(Debug, ThisError)] pub enum Error { Io(#[from] std::io::Error), + SeriesWriter(#[from] serieswriter::writer::Error), } -pub async fn listen_beacons(mut cancel: taskrun::tokio::sync::mpsc::Receiver) -> Result<(), Error> { +pub async fn listen_beacons( + mut cancel: taskrun::tokio::sync::mpsc::Receiver, + worker_tx: Sender, + backend: String, +) -> Result<(), Error> { + let stnow = SystemTime::now(); + let channel = "epics-ca-beacons".to_string(); + let scalar_type = ScalarType::U64; + let shape = Shape::Scalar; + let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?; let sock = UdpSocket::bind("0.0.0.0:5065").await?; sock.set_broadcast(true).unwrap(); let mut buf = Vec::new(); buf.resize(1024 * 4, 0); + let mut item_qu = VecDeque::new(); loop { let bb = &mut buf; let (n, remote) = taskrun::tokio::select! { @@ -34,12 +55,23 @@ pub async fn listen_beacons(mut cancel: taskrun::tokio::sync::mpsc::Receiver { - if true { + if false { trace!($($arg)*); } }; } +#[allow(unused)] +macro_rules! trace_flush_queue { + ($($arg:tt)*) => { + if false { + trace3!($($arg)*); + } + }; +} + #[derive(Debug, ThisError)] pub enum Error { NoProtocol, @@ -1932,7 +1941,7 @@ impl CaConn { CaMsgTy::SearchRes(k) => { let a = k.addr.to_be_bytes(); let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); - trace!("Search result indicates server address: {addr}"); + trace!("search result indicates server address: {addr}"); // TODO count this unexpected case. } CaMsgTy::CreateChanRes(k) => { @@ -1940,12 +1949,12 @@ impl CaConn { cx.waker().wake_by_ref(); } CaMsgTy::EventAddRes(ev) => { - trace2!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count); + trace4!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count); self.stats.event_add_res_recv.inc(); Self::handle_event_add_res(self, ev, tsnow)? } CaMsgTy::EventAddResEmpty(ev) => { - trace2!("got EventAddResEmpty {:?}", camsg.ts); + trace4!("got EventAddResEmpty {:?}", camsg.ts); Self::handle_event_add_res_empty(self, ev, tsnow)? } CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?, @@ -1957,7 +1966,7 @@ impl CaConn { self.stats.pong_recv_lat().ingest(dt); } else { let addr = &self.remote_addr_dbg; - warn!("Received Echo even though we didn't asked for it {addr:?}"); + warn!("received Echo even though we didn't asked for it {addr:?}"); } self.ioc_ping_last = tsnow; self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng); @@ -2144,7 +2153,7 @@ impl CaConn { self.backoff_reset(); let proto = CaProto::new( tcp, - self.remote_addr_dbg.clone(), + self.remote_addr_dbg.to_string(), self.opts.array_truncate, self.ca_proto_stats.clone(), ); @@ -2408,7 +2417,7 @@ impl CaConn { { use Poll::*; if qu.len() != 0 { - trace3!("attempt_flush_queue id {:7} len {}", id, qu.len()); + trace_flush_queue!("attempt_flush_queue id {:7} len {}", id, qu.len()); } let mut have_progress = false; let mut i = 0; @@ -2431,7 +2440,7 @@ impl CaConn { if sp.is_sending() { match sp.poll_unpin(cx) { Ready(Ok(())) => { - trace3!("attempt_flush_queue id {:7} send done", id); + trace_flush_queue!("attempt_flush_queue id {:7} send done", id); have_progress = true; } Ready(Err(e)) => { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index adb845c..c470587 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -63,6 +63,7 @@ use std::net::SocketAddrV4; use std::pin::Pin; use netpod::OnDrop; +use scywr::insertqueues::InsertQueuesTx; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -367,7 +368,7 @@ pub struct CaConnSet { find_ioc_query_queue: VecDeque, find_ioc_query_sender: Pin>>, find_ioc_res_rx: Pin>>>, - storage_insert_tx: Pin>>>, + iqtx: Pin>, storage_insert_queue: VecDeque>, storage_insert_sender: Pin>>>, ca_conn_res_tx: Pin>>, @@ -398,7 +399,7 @@ impl CaConnSet { pub fn start( backend: String, local_epics_hostname: String, - storage_insert_tx: Sender>, + iqtx: InsertQueuesTx, channel_info_query_tx: Sender, ingest_opts: CaIngestOpts, establish_worker_tx: async_channel::Sender, @@ -435,9 +436,12 @@ impl CaConnSet { find_ioc_query_queue: VecDeque::new(), find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)), find_ioc_res_rx: Box::pin(find_ioc_res_rx), - storage_insert_tx: Box::pin(storage_insert_tx.clone()), + iqtx: Box::pin(iqtx.clone()), storage_insert_queue: VecDeque::new(), - storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)), + + // TODO simplify for all combinations + storage_insert_sender: Box::pin(SenderPolling::new(iqtx.st_rf3_tx.clone())), + ca_conn_res_tx: Box::pin(ca_conn_res_tx), ca_conn_res_rx: Box::pin(ca_conn_res_rx), shutdown_stopping: false, @@ -480,13 +484,17 @@ impl CaConnSet { async fn run(mut this: CaConnSet) -> Result<(), Error> { trace!("CaConnSet run begin"); let (beacons_cancel_guard_tx, rx) = taskrun::tokio::sync::mpsc::channel(12); - let beacons_jh = tokio::spawn(async move { - if false { - crate::ca::beacons::listen_beacons(rx).await - } else { - Ok(()) - } - }); + let beacons_jh = { + let tx2 = this.channel_info_query_tx.clone().unwrap(); + let backend = this.backend.clone(); + tokio::spawn(async move { + if false { + crate::ca::beacons::listen_beacons(rx, tx2, backend).await + } else { + Ok(()) + } + }) + }; let _g_beacon = OnDrop::new(move || {}); loop { let x = this.next().await; @@ -1039,7 +1047,7 @@ impl CaConnSet { add.backend.clone(), addr_v4, self.local_epics_hostname.clone(), - self.storage_insert_tx.as_ref().get_ref().clone(), + self.iqtx.st_rf3_tx.clone(), self.channel_info_query_tx .clone() .ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?, @@ -1050,8 +1058,7 @@ impl CaConnSet { let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); let tx1 = self.ca_conn_res_tx.as_ref().get_ref().clone(); - let tx2 = self.storage_insert_tx.as_ref().get_ref().clone(); - let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr, self.stats.clone())); + let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, addr, self.stats.clone())); let ca_conn_res = CaConnRes { state: CaConnState::new(CaConnStateValue::Fresh), sender: Box::pin(conn_tx.into()), @@ -1065,7 +1072,6 @@ impl CaConnSet { async fn ca_conn_item_merge( conn: CaConn, tx1: Sender<(SocketAddr, CaConnEvent)>, - _tx2: Sender>, addr: SocketAddr, stats: Arc, ) -> Result<(), Error> { @@ -1546,7 +1552,8 @@ impl Stream for CaConnSet { trace4!("CaConnSet poll loop"); self.stats.poll_loop_begin().inc(); - self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _); + // TODO generalize to all combinations + self.stats.storage_insert_tx_len.set(self.iqtx.st_rf3_tx.len() as _); self.stats .storage_insert_queue_len .set(self.storage_insert_queue.len() as _); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 9b918f3..c177c7e 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -8,7 +8,6 @@ use slidebuf::SlideBuf; use stats::CaProtoStats; use std::collections::VecDeque; use std::io; -use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -1045,7 +1044,7 @@ impl CaState { pub struct CaProto { tcp: TcpStream, tcp_eof: bool, - remote_addr_dbg: SocketAddrV4, + remote_name: String, state: CaState, buf: SlideBuf, outbuf: SlideBuf, @@ -1058,11 +1057,11 @@ pub struct CaProto { } impl CaProto { - pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize, stats: Arc) -> Self { + pub fn new(tcp: TcpStream, remote_name: String, array_truncate: usize, stats: Arc) -> Self { Self { tcp, tcp_eof: false, - remote_addr_dbg, + remote_name, state: CaState::StdHead, buf: SlideBuf::new(PROTO_INPUT_BUF_CAP as usize), outbuf: SlideBuf::new(1024 * 256), @@ -1186,7 +1185,7 @@ impl CaProto { debug!( "peer done {:?} {:?} {:?}", self.tcp.peer_addr(), - self.remote_addr_dbg, + self.remote_name, self.state ); self.tcp_eof = true; diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index c9e9f91..f6876d4 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -27,11 +27,9 @@ pub struct CaIngestOpts { #[serde(default, with = "humantime_serde")] timeout: Option, postgresql: Database, - scylla: ScyllaIngestConfig, - #[serde(default)] - scylla_mt: Option, - #[serde(default)] - scylla_lt: Option, + scylla_st: ScyllaIngestConfig, + scylla_mt: ScyllaIngestConfig, + scylla_lt: ScyllaIngestConfig, array_truncate: Option, insert_worker_count: Option, insert_worker_concurrency: Option, @@ -56,16 +54,16 @@ impl CaIngestOpts { &self.postgresql } - pub fn scylla_config(&self) -> &ScyllaIngestConfig { - &self.scylla + pub fn scylla_config_st(&self) -> &ScyllaIngestConfig { + &self.scylla_st } - pub fn scylla_config_mt(&self) -> Option<&ScyllaIngestConfig> { - self.scylla_mt.as_ref() + pub fn scylla_config_mt(&self) -> &ScyllaIngestConfig { + &self.scylla_mt } - pub fn scylla_config_lt(&self) -> Option<&ScyllaIngestConfig> { - self.scylla_lt.as_ref() + pub fn scylla_config_lt(&self) -> &ScyllaIngestConfig { + &self.scylla_lt } pub fn search(&self) -> &Vec { @@ -140,7 +138,10 @@ scylla: assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt"))); assert_eq!(&conf.api_bind, "0.0.0.0:3011"); assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); - assert_eq!(conf.scylla.hosts().get(1), Some(&"sf-nube-12:19042".to_string())); + assert_eq!( + conf.scylla_config_st().hosts().get(1), + Some(&"sf-nube-12:19042".to_string()) + ); assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); } diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs new file mode 100644 index 0000000..c8ce25f --- /dev/null +++ b/scywr/src/insertqueues.rs @@ -0,0 +1,18 @@ +use crate::iteminsertqueue::QueryItem; +use async_channel::Receiver; +use async_channel::Sender; +use std::collections::VecDeque; + +#[derive(Clone)] +pub struct InsertQueuesTx { + pub st_rf3_tx: Sender>, + pub st_rf1_tx: Sender>, + pub mt_rf3_tx: Sender>, +} + +#[derive(Clone)] +pub struct InsertQueuesRx { + pub st_rf3_rx: Receiver>, + pub st_rf1_rx: Receiver>, + pub mt_rf3_rx: Receiver>, +} diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 56ac313..2389cbd 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -365,13 +365,7 @@ fn inspect_items(item_inp: Receiver>) -> impl Stream { - trace3!( - "execute Insert {:?} {:?} {:?} {:?}", - item.series, - item.ts_msp, - item.val.shape(), - item - ); + trace3!("execute Insert {}", item.string_short()); } QueryItem::TimeBinSimpleF32(_) => { trace2!("execute TimeBinSimpleF32"); diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 868244f..4681d25 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -48,6 +48,7 @@ pub enum ScalarValue { I8(i8), I16(i16), I32(i32), + I64(i64), F32(f32), F64(f64), Enum(i16), @@ -61,6 +62,7 @@ impl ScalarValue { ScalarValue::I8(_) => 1, ScalarValue::I16(_) => 2, ScalarValue::I32(_) => 4, + ScalarValue::I64(_) => 8, ScalarValue::F32(_) => 4, ScalarValue::F64(_) => 8, ScalarValue::Enum(_) => 2, @@ -68,6 +70,20 @@ impl ScalarValue { ScalarValue::Bool(_) => 1, } } + + pub fn string_short(&self) -> String { + match self { + ScalarValue::I8(x) => x.to_string(), + ScalarValue::I16(x) => x.to_string(), + ScalarValue::I32(x) => x.to_string(), + ScalarValue::I64(x) => x.to_string(), + ScalarValue::F32(x) => x.to_string(), + ScalarValue::F64(x) => x.to_string(), + ScalarValue::Enum(x) => x.to_string(), + ScalarValue::String(x) => x.to_string(), + ScalarValue::Bool(x) => x.to_string(), + } + } } #[derive(Clone, Debug)] @@ -177,6 +193,18 @@ impl ArrayValue { } } } + + pub fn string_short(&self) -> String { + use ArrayValue::*; + match self { + I8(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + I16(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + I32(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + F32(x) => format!("{}", x.get(0).map_or(0., |x| *x)), + F64(x) => format!("{}", x.get(0).map_or(0., |x| *x)), + Bool(x) => format!("{}", x.get(0).map_or(false, |x| *x)), + } + } } #[derive(Clone, Debug)] @@ -199,6 +227,7 @@ impl DataValue { ScalarValue::I8(_) => ScalarType::I8, ScalarValue::I16(_) => ScalarType::I16, ScalarValue::I32(_) => ScalarType::I32, + ScalarValue::I64(_) => ScalarType::I64, ScalarValue::F32(_) => ScalarType::F32, ScalarValue::F64(_) => ScalarType::F64, ScalarValue::Enum(_) => ScalarType::U16, @@ -222,6 +251,13 @@ impl DataValue { DataValue::Array(a) => Shape::Wave(a.len() as u32), } } + + pub fn string_short(&self) -> String { + match self { + DataValue::Scalar(x) => x.string_short(), + DataValue::Array(x) => x.string_short(), + } + } } pub trait GetValHelp { @@ -471,6 +507,18 @@ pub struct InsertItem { pub ts_local: TsMs, } +impl InsertItem { + pub fn string_short(&self) -> String { + format!( + "{} {} {} {}", + self.series.id(), + self.ts_msp.ms(), + self.ts_lsp.ms(), + self.val.string_short() + ) + } +} + #[derive(Debug)] pub struct TimeBinSimpleF32 { pub series: SeriesId, @@ -682,6 +730,7 @@ pub async fn insert_item( I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, Enum(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, + I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?, F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?, String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, &data_store).await?, @@ -751,6 +800,7 @@ pub fn insert_item_fut( I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy), I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy), + I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy), F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy), F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy), Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index 14cf810..25bc7d1 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -6,6 +6,7 @@ pub mod futbatch; pub mod futbatchgen; pub mod futinsert; pub mod futinsertloop; +pub mod insertqueues; pub mod insertworker; pub mod iteminsertqueue; pub mod ratelimit; diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 4bb3b26..5a84f7f 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -75,7 +75,7 @@ impl SeriesWriter { channel: String, scalar_type: ScalarType, shape: Shape, - tsnow: SystemTime, + stnow: SystemTime, ) -> Result { let (tx, rx) = async_channel::bounded(1); let item = ChannelInfoQuery { @@ -89,7 +89,7 @@ impl SeriesWriter { worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); - Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await + Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, stnow).await } pub async fn establish_with_cssid(