diff --git a/batchtools/Cargo.toml b/batchtools/Cargo.toml index 62ff17d..7adf462 100644 --- a/batchtools/Cargo.toml +++ b/batchtools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "batchtools" -version = "0.0.1" +version = "0.0.2" authors = ["Dominik Werder "] edition = "2021" @@ -9,6 +9,6 @@ doctest = false [dependencies] log = { path = "../log" } -err = { path = "../../daqbuffer/crates/err" } +err = { path = "../../daqbuf-err", package = "daqbuf-err" } taskrun = { path = "../../daqbuffer/crates/taskrun" } async-channel = "2.1.0" diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index add5ccf..b2d691d 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.4-aa.4" +version = "0.2.5-aa.0" authors = ["Dominik Werder "] edition = "2021" @@ -10,23 +10,23 @@ default = [] bsread = [] [dependencies] -clap = { version = "4.5.1", features = ["derive", "cargo"] } +clap = { version = "4.5.20", features = ["derive", "cargo"] } tracing = "0.1" serde = { version = "1.0", features = ["derive"] } -tokio-postgres = "0.7.10" +tokio-postgres = "0.7.12" async-channel = "2.3.1" futures-util = "0.3" chrono = "0.4.38" -bytes = "1.7.1" +bytes = "1.8.0" libc = "0.2" -err = { path = "../../daqbuffer/crates/err" } -netpod = { path = "../../daqbuffer/crates/netpod" } +err = { path = "../../daqbuf-err", package = "daqbuf-err" } +netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } +series = { path = "../../daqbuf-series", package = "daqbuf-series" } log = { path = "../log" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } -series = { path = "../series" } netfetch = { path = "../netfetch" } serieswriter = { path = "../serieswriter" } #ingest-bsread = { path = "../ingest-bsread" } diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 62f406e..779c80c 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -27,7 +27,9 @@ pub fn main() -> Result<(), Error> { } async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> { - taskrun::tokio::spawn(main_run_inner(opts)).await? + taskrun::tokio::spawn(main_run_inner(opts)) + .await + .map_err(Error::from_string)? } async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { @@ -129,10 +131,10 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { error!("log-test"); debug!("log-test"); trace!("log-test"); - series::log_test(); + netfetch::log_test(); let _spg = tracing::span!(tracing::Level::INFO, "log_span_debug"); _spg.in_scope(|| { - series::log_test(); + netfetch::log_test(); }) } SubCmd::Ca(subcmd) => { @@ -143,7 +145,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { .await .map_err(|e| Error::from_string(e))?; } - Get(cmd) => todo!(), + Get(_cmd) => todo!(), } } } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 5b5f3e1..712404c 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -716,7 +716,7 @@ impl Daemon { debug!("wait for metrics handler"); self.metrics_shutdown_tx.send(1).await?; if let Some(jh) = self.metrics_jh.take() { - jh.await??; + jh.await.map_err(Error::from_string)??; } debug!("joined metrics handler"); debug!("wait for insert workers"); @@ -771,7 +771,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> .map_err(Error::from_string)?; dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?; drop(pg); - jh.await?.map_err(Error::from_string)?; + jh.await.map_err(Error::from_string)?.map_err(Error::from_string)?; } if opts.scylla_disable() { warn!("scylla_disable config flag enabled"); diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml index a954ee0..8cd62e7 100644 --- a/dbpg/Cargo.toml +++ b/dbpg/Cargo.toml @@ -9,12 +9,12 @@ doctest = false [dependencies] log = { path = "../log" } -err = { path = "../../daqbuffer/crates/err" } -netpod = { path = "../../daqbuffer/crates/netpod" } +err = { path = "../../daqbuf-err", package = "daqbuf-err" } +netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } batchtools = { path = "../batchtools" } stats = { path = "../stats" } -series = { path = "../series" } +series = { path = "../../daqbuf-series", package = "daqbuf-series" } chrono = "0.4.33" tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-serde_json-1"] } futures-util = "0.3.29" diff --git a/ingest-bsread/Cargo.toml b/ingest-bsread/Cargo.toml index 178eb1d..4df07c3 100644 --- a/ingest-bsread/Cargo.toml +++ b/ingest-bsread/Cargo.toml @@ -15,16 +15,16 @@ hex = "0.4.3" pin-project = "1" slidebuf = "0.0.1" log = { path = "../log" } -series = { path = "../series" } +series = { path = "../../daqbuf-series", package = "daqbuf-series" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } ingest-linux = { path = "../ingest-linux" } -err = { path = "../../daqbuffer/crates/err" } -netpod = { path = "../../daqbuffer/crates/netpod" } +err = { path = "../../daqbuf-err", package = "daqbuf-err" } +netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } batchtools = { path = "../batchtools" } -items_0 = { path = "../../daqbuffer/crates/items_0" } -items_2 = { path = "../../daqbuffer/crates/items_2" } -streams = { path = "../../daqbuffer/crates/streams" } +items_0 = { path = "../../daqbuf-items-0", package = "daqbuf-items-0" } +items_2 = { path = "../../daqbuf-items-2", package = "daqbuf-items-2" } +streams = { path = "../../daqbuf-streams", package = "daqbuf-streams" } bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 66d45fb..dbad665 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -37,20 +37,27 @@ slidebuf = "0.0.1" dashmap = "6.0.1" hashbrown = "0.14.3" smallvec = "1.13.2" +thiserror = "=0.0.1" log = { path = "../log" } -series = { path = "../series" } +series = { path = "../../daqbuf-series", package = "daqbuf-series" } serieswriter = { path = "../serieswriter" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } serde_helper = { path = "../serde_helper" } ingest-linux = { path = "../ingest-linux" } -err = { path = "../../daqbuffer/crates/err" } -netpod = { path = "../../daqbuffer/crates/netpod" } -items_0 = { path = "../../daqbuffer/crates/items_0" } -items_2 = { path = "../../daqbuffer/crates/items_2" } -streams = { path = "../../daqbuffer/crates/streams" } +err = { path = "../../daqbuf-err", package = "daqbuf-err" } +netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } +items_0 = { path = "../../daqbuf-items-0", package = "daqbuf-items-0" } +items_2 = { path = "../../daqbuf-items-2", package = "daqbuf-items-2" } +streams = { path = "../../daqbuf-streams", package = "daqbuf-streams" } taskrun = { path = "../../daqbuffer/crates/taskrun" } #bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } mrucache = { path = "../mrucache" } batchtools = { path = "../batchtools" } + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } + +[features] +disabled = [] diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 5d3aa00..7b0189c 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -194,6 +194,7 @@ pub enum Error { MissingTimestamp, EnumFetch(#[from] enumfetch::Error), SeriesLookup(#[from] dbpg::seriesbychannel::Error), + Netpod(#[from] netpod::NetpodError), } impl err::ToErr for Error { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index b896810..a56b43a 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -136,7 +136,7 @@ pub enum Error { PushCmdsNoSendInProgress(SocketAddr), SenderPollingSend, NoProgressNoPending, - IocFinder(::err::Error), + IocFinder(#[from] crate::ca::finder::Error), ChannelAssignedWithoutConnRess, } @@ -453,7 +453,7 @@ pub struct CaConnSet { chan_check_next: Option, stats: Arc, ca_conn_stats: Arc, - ioc_finder_jh: JoinHandle>, + ioc_finder_jh: JoinHandle>, await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, thr_msg_poll_1: ThrottleTrace, thr_msg_storage_len: ThrottleTrace, @@ -582,7 +582,7 @@ impl CaConnSet { trace!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len()); this.find_ioc_query_sender.as_mut().drop(); trace!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len()); - this.ioc_finder_jh.await?.map_err(|e| Error::IocFinder(e))?; + this.ioc_finder_jh.await??; trace!("joined ioc_finder_jh"); this.connset_out_tx.close(); this.connset_inp_rx.close(); diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 1c630d0..4be133d 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -10,7 +10,6 @@ use dbpg::conn::make_pg_client; use dbpg::iocindex::IocItem; use dbpg::iocindex::IocSearchIndexWorker; use dbpg::postgres::Row as PgRow; -use err::Error; use hashbrown::HashMap; use log::*; use netpod::Database; @@ -24,18 +23,17 @@ use tokio::task::JoinHandle; const SEARCH_DB_PIPELINE_LEN: usize = 2; -#[allow(unused)] -macro_rules! debug_batch { - ($($arg:tt)*) => (if false { - debug!($($arg)*); - }); -} +macro_rules! debug_batch { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) } -#[allow(unused)] -macro_rules! trace_batch { - ($($arg:tt)*) => (if false { - trace!($($arg)*); - }); +macro_rules! trace_batch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Finder")] +pub enum Error { + Join(#[from] tokio::task::JoinError), + DbPg(#[from] dbpg::err::Error), + Postgres(#[from] dbpg::postgres::Error), + IocSearch(#[from] crate::ca::search::Error), } fn transform_pgres(rows: Vec) -> VecDeque { @@ -145,19 +143,14 @@ async fn finder_worker_single( stats: Arc, ) -> Result<(), Error> { debug!("finder_worker_single make_pg_client"); - let (pg, jh) = make_pg_client(&db) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let (pg, jh) = make_pg_client(&db).await?; let sql = concat!( "with q1 as (select * from unnest($2::text[]) as unn (ch))", " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr", " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null", " order by tt.facility, tt.channel, tsmod desc", ); - let qu_select_multi = pg - .prepare(sql) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let qu_select_multi = pg.prepare(sql).await?; let mut resdiff = 0; loop { match inp.recv().await { @@ -235,7 +228,7 @@ async fn finder_worker_single( } } drop(pg); - jh.await?.map_err(|e| Error::from_string(e))?; + jh.await??; trace!("finder_worker_single done"); Ok(()) } @@ -281,7 +274,7 @@ async fn finder_network_if_not_found( } async fn process_net_result( - net_rx: Receiver, Error>>, + net_rx: Receiver, crate::ca::findioc::Error>>, tx: Sender>, opts: CaIngestOpts, ) -> Result<(), Error> { @@ -291,13 +284,9 @@ async fn process_net_result( let mut index_worker_pg_jh = Vec::new(); for _ in 0..IOC_SEARCH_INDEX_WORKER_COUNT { let backend = opts.backend().into(); - let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()) - .await - .map_err(Error::from_string)?; + let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()).await?; index_worker_pg_jh.push(jh); - let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg) - .await - .map_err(Error::from_string)?; + let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg).await?; let jh = tokio::spawn(async move { worker.worker().await }); ioc_search_index_worker_jhs.push(jh); } @@ -332,8 +321,7 @@ async fn process_net_result( Ok(()) } -#[cfg(DISABLED)] -#[allow(unused)] +#[cfg(feature = "disabled")] fn start_finder_ca(tx: Sender, tgts: Vec) -> (Sender, JoinHandle<()>) { let (qtx, qrx) = async_channel::bounded(32); let (atx, arx) = async_channel::bounded(32); diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index e0b8331..fa5b06e 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -3,7 +3,6 @@ use crate::ca::proto::CaMsgTy; use crate::ca::proto::HeadInfo; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; -use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -25,13 +24,20 @@ use std::time::Instant; use taskrun::tokio; use tokio::io::unix::AsyncFd; -#[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "FindIoc")] +pub enum Error { + SocketCreate, + SocketConvertTokio, + BroadcastEnable, + NonblockEnable, + SocketBind, + SendFailure, + ReadFailure, + ReadEmpty, + Proto(#[from] crate::ca::proto::Error), + Slidebuf(#[from] slidebuf::Error), + IO(#[from] std::io::Error), } struct SockBox(c_int); @@ -182,7 +188,7 @@ impl FindIocStream { unsafe fn create_socket() -> Result { let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0); if ec == -1 { - return Err("can not create socket".into()); + return Err(Error::SocketCreate); } let sock = SockBox(ec); { @@ -195,13 +201,13 @@ impl FindIocStream { std::mem::size_of::() as _, ); if ec == -1 { - return Err("can not enable broadcast".into()); + return Err(Error::BroadcastEnable); } } { let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK); if ec == -1 { - return Err("can not set nonblock".into()); + return Err(Error::NonblockEnable); } } let ip: [u8; 4] = [0, 0, 0, 0]; @@ -216,7 +222,7 @@ impl FindIocStream { let addr_len = std::mem::size_of::(); let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _); if ec == -1 { - return Err("can not bind socket".into()); + return Err(Error::SocketBind); } { let mut addr = libc::sockaddr_in { @@ -229,7 +235,7 @@ impl FindIocStream { let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _); if ec == -1 { error!("getsockname {ec}"); - return Err("can not convert raw socket to tokio socket".into()); + return Err(Error::SocketConvertTokio); } else { if true { let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); @@ -266,7 +272,7 @@ impl FindIocStream { if errno == libc::EAGAIN { return Poll::Pending; } else { - return Poll::Ready(Err("FindIocStream can not send".into())); + return Poll::Ready(Err(Error::SendFailure)); } } Poll::Ready(Ok(())) @@ -293,15 +299,15 @@ impl FindIocStream { if errno == libc::EAGAIN { return Poll::Pending; } else { - return Poll::Ready(Err("FindIocStream can not read".into())); + return Poll::Ready(Err(Error::ReadFailure)); } } else if ec < 0 { stats.ca_udp_io_error().inc(); error!("unexpected received {ec}"); - Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) + Poll::Ready(Err(Error::ReadFailure)) } else if ec == 0 { stats.ca_udp_io_empty().inc(); - Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) + Poll::Ready(Err(Error::ReadEmpty)) } else { stats.ca_udp_io_recv().inc(); let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); @@ -324,7 +330,7 @@ impl FindIocStream { panic!(); } let mut nb = slidebuf::SlideBuf::new(2048); - nb.put_slice(&buf[..ec as usize]).map_err(|e| e.to_string())?; + nb.put_slice(&buf[..ec as usize])?; let mut msgs = Vec::new(); let mut accounted = 0; loop { @@ -336,7 +342,7 @@ impl FindIocStream { error!("incomplete message, not enough for header"); break; } - let hi = HeadInfo::from_netbuf(&mut nb).map_err(|e| e.to_string())?; + let hi = HeadInfo::from_netbuf(&mut nb)?; if hi.cmdid() == 0 && hi.payload_len() == 0 { } else if hi.cmdid() == 6 && hi.payload_len() == 8 { } else { @@ -346,8 +352,8 @@ impl FindIocStream { error!("incomplete message, missing payload"); break; } - let msg = CaMsg::from_proto_infos(&hi, nb.data(), tsnow, 32).map_err(|e| e.to_string())?; - nb.adv(hi.payload_len() as usize).map_err(|e| e.to_string())?; + let msg = CaMsg::from_proto_infos(&hi, nb.data(), tsnow, 32)?; + nb.adv(hi.payload_len() as usize)?; msgs.push(msg); accounted += 16 + hi.payload_len(); } @@ -612,7 +618,7 @@ impl Stream for FindIocStream { }, Ready(Err(e)) => { error!("poll_write_ready {e}"); - let e = Error::from_string(e); + // TODO should we abort? } Pending => {} } @@ -711,9 +717,8 @@ impl Stream for FindIocStream { } } Ready(Err(e)) => { - let e = Error::with_msg_no_trace(format!("{e:?}")); error!("poll_read_ready {e:?}"); - Ready(Some(Err(e))) + Ready(Some(Err(e.into()))) } Pending => { // debug!("BLOCK BB"); diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 0b802e5..73271e8 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -1,9 +1,7 @@ -use super::findioc::FindIocRes; use crate::ca::findioc::FindIocStream; use crate::conf::CaIngestOpts; use async_channel::Receiver; use async_channel::Sender; -use err::Error; use futures_util::StreamExt; use log::*; use stats::IocFinderStats; @@ -16,6 +14,13 @@ use std::time::Duration; use taskrun::tokio; use tokio::task::JoinHandle; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "IocSearch")] +pub enum Error { + LookupFailure(String), + IO(#[from] std::io::Error), +} + async fn resolve_address(addr_str: &str) -> Result { const PORT_DEFAULT: u16 = 5064; let ac = match addr_str.parse::() { @@ -41,7 +46,7 @@ async fn resolve_address(addr_str: &str) -> Result { .into_iter() .filter(|addr| if let SocketAddr::V4(_) = addr { true } else { false }) .next() - .ok_or_else(|| Error::with_msg_no_trace(format!("can not lookup host {host}")))?, + .ok_or_else(|| Error::LookupFailure(host))?, Err(e) => return Err(e.into()), } } @@ -57,7 +62,7 @@ pub async fn ca_search_workers_start( ) -> Result< ( Sender, - Receiver, Error>>, + Receiver, crate::ca::findioc::Error>>, JoinHandle>, ), Error, @@ -116,7 +121,10 @@ async fn search_tgts_from_opts(opts: &CaIngestOpts) -> Result<(Vec Ok((addrs, blacklist)) } -async fn finder_run(finder: FindIocStream, tx: Sender, Error>>) -> Result<(), Error> { +async fn finder_run( + finder: FindIocStream, + tx: Sender, crate::ca::findioc::Error>>, +) -> Result<(), Error> { let mut finder = Box::pin(finder); while let Some(item) = finder.next().await { if let Err(_) = tx.send(item).await { diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 66044e5..2edae26 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -11,3 +11,13 @@ pub mod rt; #[cfg(test)] pub mod test; pub mod throttletrace; + +use log::*; + +pub fn log_test() { + info!("log-test"); + warn!("log-test"); + error!("log-test"); + debug!("log-test"); + trace!("log-test"); +} diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 272b071..6159c01 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -15,8 +15,8 @@ bytes = "1.7.1" serde = { version = "1", features = ["derive"] } log = { path = "../log" } stats = { path = "../stats" } -series = { path = "../series" } -err = { path = "../../daqbuffer/crates/err" } -netpod = { path = "../../daqbuffer/crates/netpod" } +series = { path = "../../daqbuf-series", package = "daqbuf-series" } +err = { path = "../../daqbuf-err", package = "daqbuf-err" } +netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } #bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index d324824..6c2b9e4 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -144,14 +144,6 @@ pub async fn spawn_scylla_insert_workers( } for worker_ix in 0..insert_worker_count { let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone(); - #[cfg(DISABLED)] - let jh = tokio::spawn(worker( - worker_ix, - item_inp.clone(), - insert_worker_opts.clone(), - data_store, - store_stats.clone(), - )); let jh = tokio::spawn(worker_streamed( worker_ix, insert_worker_concurrency, diff --git a/series/Cargo.toml b/series/Cargo.toml deleted file mode 100644 index 311b682..0000000 --- a/series/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "series" -version = "0.0.2" -authors = ["Dominik Werder "] -edition = "2021" - -[dependencies] -serde = { version = "1.0", features = ["derive"] } -log = { path = "../log" } -err = { path = "../../daqbuffer/crates/err" } diff --git a/series/src/lib.rs b/series/src/lib.rs deleted file mode 100644 index fe705a3..0000000 --- a/series/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub mod series; - -pub use series::ChannelStatusSeriesId; -pub use series::SeriesId; - -use log::*; - -pub fn log_test() { - info!("log-test"); - warn!("log-test"); - error!("log-test"); - debug!("log-test"); - trace!("log-test"); -} diff --git a/series/src/series.rs b/series/src/series.rs deleted file mode 100644 index 5befc30..0000000 --- a/series/src/series.rs +++ /dev/null @@ -1,59 +0,0 @@ -use core::fmt; -use serde::Deserialize; -use serde::Serialize; - -#[derive(Clone, Debug)] -pub enum Existence { - Created(T), - Existing(T), -} - -impl Existence { - pub fn into_inner(self) -> T { - use Existence::*; - match self { - Created(x) => x, - Existing(x) => x, - } - } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -pub struct SeriesId(u64); - -impl SeriesId { - pub fn new(id: u64) -> Self { - Self(id) - } - - pub fn id(&self) -> u64 { - self.0 - } - - pub fn to_i64(&self) -> i64 { - self.0 as i64 - } -} - -impl fmt::Display for SeriesId { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "SeriesId {{ {:20} }}", self.0) - } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] -pub struct ChannelStatusSeriesId(u64); - -impl ChannelStatusSeriesId { - pub fn new(id: u64) -> Self { - Self(id) - } - - pub fn id(&self) -> u64 { - self.0 - } - - pub fn to_i64(&self) -> i64 { - self.0 as i64 - } -} diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml index 927db6e..b998f9e 100644 --- a/serieswriter/Cargo.toml +++ b/serieswriter/Cargo.toml @@ -10,12 +10,12 @@ async-channel = "2.1.1" futures-util = "0.3.30" smallvec = "1.13.2" log = { path = "../log" } -err = { path = "../../daqbuffer/crates/err" } -netpod = { path = "../../daqbuffer/crates/netpod" } -items_0 = { path = "../../daqbuffer/crates/items_0" } -items_2 = { path = "../../daqbuffer/crates/items_2" } +err = { path = "../../daqbuf-err", package = "daqbuf-err" } +netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } +items_0 = { path = "../../daqbuf-items-0", package = "daqbuf-items-0" } +items_2 = { path = "../../daqbuf-items-2", package = "daqbuf-items-2" } dbpg = { path = "../dbpg" } scywr = { path = "../scywr" } -series = { path = "../series" } +series = { path = "../../daqbuf-series", package = "daqbuf-series" } stats = { path = "../stats" } taskrun = { path = "../../daqbuffer/crates/taskrun" } diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 8ec85c4..a34bb04 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -19,13 +19,8 @@ use series::ChannelStatusSeriesId; use series::SeriesId; use std::mem; -#[allow(unused)] macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } - -#[allow(unused)] macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } - -#[allow(unused)] macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[derive(Debug, ThisError)]