diff --git a/.gitignore b/.gitignore index b866961..f7a5862 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /Cargo.lock /tmpdoc +/.vscode diff --git a/Cargo.toml b/Cargo.toml index 7d0ecba..5824053 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = ["log", "netfetch", "daqingest"] +resolver = "2" [profile.release] opt-level = 3 diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index e08808c..af436b1 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -4,6 +4,11 @@ version = "0.2.0-alpha.0" authors = ["Dominik Werder "] edition = "2021" +[features] +default = [] +# default = ["bsread"] +bsread = [] + [dependencies] clap = { version = "4.3.24", features = ["derive", "cargo"] } tracing = "0.1" @@ -24,3 +29,5 @@ dbpg = { path = "../dbpg" } series = { path = "../series" } netfetch = { path = "../netfetch" } batchtools = { path = "../batchtools" } +ingest-bsread = { path = "../ingest-bsread" } +ingest-linux = { path = "../ingest-linux" } diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index e09fdbc..27a8fb2 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -16,9 +16,6 @@ pub fn main() -> Result<(), Error> { use daqingest::opts::ChannelAccess; use daqingest::opts::SubCmd; match opts.subcmd { - SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()) - .await - .map_err(|e| Error::from(e.to_string()))?, SubCmd::ListPkey => { // TODO must take scylla config from CLI let scylla_conf = err::todoval(); @@ -34,10 +31,6 @@ pub fn main() -> Result<(), Error> { let scylla_conf = err::todoval(); scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await? } - SubCmd::BsreadDump(k) => { - let mut f = netfetch::zmtp::dumper::BsreadDumper::new(k.source); - f.run().await.map_err(|e| Error::from(e.to_string()))? - } SubCmd::ChannelAccess(k) => match k { ChannelAccess::CaSearch(k) => { info!("daqingest version {}", clap::crate_version!()); @@ -50,6 +43,15 @@ pub fn main() -> Result<(), Error> { daqingest::daemon::run(conf, channels).await? } }, + #[cfg(feature = "bsread")] + SubCmd::Bsread(k) => ingest_bsread::zmtp::zmtp_client(k.into()) + .await + .map_err(|e| Error::from(e.to_string()))?, + #[cfg(feature = "bsread")] + SubCmd::BsreadDump(k) => { + let mut f = ingest_bsread::zmtp::dumper::BsreadDumper::new(k.source); + f.run().await.map_err(|e| Error::from(e.to_string()))? + } SubCmd::Version => { println!("{}", clap::crate_version!()); } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index c6dc127..120b34c 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -1,21 +1,16 @@ pub mod finder; pub mod inserthook; -pub mod types; use async_channel::Receiver; use async_channel::Sender; use async_channel::WeakReceiver; -use dbpg::conn::make_pg_client; use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; -use futures_util::FutureExt; -use futures_util::StreamExt; use log::*; use netfetch::ca::conn::CaConnEvent; use netfetch::ca::conn::ConnCommand; use netfetch::ca::connset::CaConnSet; use netfetch::ca::findioc::FindIocRes; -use netfetch::ca::findioc::FindIocStream; use netfetch::ca::IngestCommons; use netfetch::ca::SlowWarnable; use netfetch::conf::CaIngestOpts; @@ -40,7 +35,6 @@ use series::ChannelStatusSeriesId; use series::SeriesId; use stats::DaemonStats; use std::collections::BTreeMap; -use std::collections::HashMap; use std::collections::VecDeque; use std::net::SocketAddrV4; use std::sync::atomic; @@ -52,11 +46,8 @@ use std::time::Instant; use std::time::SystemTime; use taskrun::tokio; use tokio::task::JoinHandle; -use tokio_postgres::Client as PgClient; -use tokio_postgres::Row as PgRow; use tracing::info_span; use tracing::Instrument; -use types::*; const SEARCH_BATCH_MAX: usize = 256; const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; @@ -186,7 +177,10 @@ pub struct DaemonOpts { pgconf: Database, scyconf: ScyllaConfig, ttls: Ttls, + #[allow(unused)] test_bsread_addr: Option, + insert_worker_count: usize, + insert_scylla_sessions: usize, } impl DaemonOpts { @@ -276,19 +270,16 @@ impl Daemon { } }); - let insert_scylla_sessions = 1; - let insert_worker_count = 1000; let use_rate_limit_queue = false; // TODO use a new stats type: let store_stats = Arc::new(stats::CaConnStats::new()); let ttls = opts.ttls.clone(); let insert_worker_opts = Arc::new(ingest_commons.as_ref().into()); - use scywr::insertworker::spawn_scylla_insert_workers; - let jh_insert_workers = spawn_scylla_insert_workers( + let jh_insert_workers = scywr::insertworker::spawn_scylla_insert_workers( opts.scyconf.clone(), - insert_scylla_sessions, - insert_worker_count, + opts.insert_scylla_sessions, + opts.insert_worker_count, common_insert_item_queue_2.clone(), insert_worker_opts, store_stats.clone(), @@ -297,9 +288,10 @@ impl Daemon { ) .await?; + #[cfg(feature = "bsread")] if let Some(bsaddr) = &opts.test_bsread_addr { //netfetch::zmtp::Zmtp; - let zmtpopts = netfetch::zmtp::ZmtpClientOpts { + let zmtpopts = ingest_bsread::zmtp::ZmtpClientOpts { backend: opts.backend().into(), addr: bsaddr.parse().unwrap(), do_pulse_id: false, @@ -307,9 +299,9 @@ impl Daemon { array_truncate: Some(1024), process_channel_count_limit: Some(32), }; - let client = netfetch::bsreadclient::BsreadClient::new( + let client = ingest_bsread::bsreadclient::BsreadClient::new( zmtpopts, - ingest_commons.clone(), + ingest_commons.insert_item_queue.sender().unwrap().inner().clone(), channel_info_query_tx.clone(), ) .await @@ -495,8 +487,8 @@ impl Daemon { async fn check_channel_states(&mut self) -> Result<(), Error> { let (mut search_pending_count,) = self.update_channel_state_counts(); let k = self.chan_check_next.take(); - trace!("------------ check_chans start at {:?}", k); let it = if let Some(last) = k { + trace!("check_chans start at {:?}", last); self.channel_states.range_mut(last..) } else { self.channel_states.range_mut(..) @@ -1084,14 +1076,12 @@ impl Daemon { ret } - pub async fn daemon(&mut self) -> Result<(), Error> { + fn spawn_ticker(tx: Sender, stats: Arc) -> Sender { let (ticker_inp_tx, ticker_inp_rx) = async_channel::bounded::(1); let ticker = { - let tx = self.tx.clone(); - let stats = self.stats.clone(); async move { loop { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(200)).await; if SIGINT.load(atomic::Ordering::Acquire) != 0 || SIGTERM.load(atomic::Ordering::Acquire) != 0 { if SHUTDOWN_SENT.load(atomic::Ordering::Acquire) == 0 { if let Err(e) = tx.send(DaemonEvent::Shutdown).await { @@ -1119,7 +1109,13 @@ impl Daemon { } } }; + // TODO use join handle taskrun::spawn(ticker); + ticker_inp_tx + } + + pub async fn daemon(&mut self) -> Result<(), Error> { + let ticker_inp_tx = Self::spawn_ticker(self.tx.clone(), self.stats.clone()); loop { match self.rx.recv().await { Ok(item) => match self.handle_event(item, &ticker_inp_tx).await { @@ -1150,34 +1146,30 @@ static SHUTDOWN_SENT: AtomicUsize = AtomicUsize::new(0); fn handler_sigint(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { SIGINT.store(1, atomic::Ordering::Release); - let _ = netfetch::linuxhelper::unset_signal_handler(libc::SIGINT); + let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT); } fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { SIGTERM.store(1, atomic::Ordering::Release); - let _ = netfetch::linuxhelper::unset_signal_handler(libc::SIGTERM); + let _ = ingest_linux::signal::unset_signal_handler(libc::SIGTERM); } pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> { info!("start up {opts:?}"); - netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?; - netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?; + ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?; + ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?; let pg = dbpg::conn::make_pg_client(opts.postgresql_config()) .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + .map_err(Error::from_string)?; - dbpg::schema::schema_check(&pg) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?; scywr::schema::migrate_scylla_data_schema(opts.scylla_config()) .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - info!("migrate_keyspace done"); - if true { - return Ok(()); - } + .map_err(Error::from_string)?; + + info!("database check done"); // TODO use a new stats type: //let store_stats = Arc::new(CaConnStats::new()); @@ -1203,6 +1195,8 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> binned: opts.ttl_binned(), }, test_bsread_addr: opts.test_bsread_addr.clone(), + insert_worker_count: opts.insert_worker_count(), + insert_scylla_sessions: opts.insert_scylla_sessions(), }; let mut daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); @@ -1223,7 +1217,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> let ch = Channel::new(s.into()); tx.send(DaemonEvent::ChannelAdd(ch)).await?; } - info!("all channels sent to daemon"); + info!("configured channels applied"); daemon_jh.await.unwrap(); if false { metrics_jh.await.unwrap(); diff --git a/daqingest/src/daemon/types.rs b/daqingest/src/daemon/types.rs deleted file mode 100644 index 729195c..0000000 --- a/daqingest/src/daemon/types.rs +++ /dev/null @@ -1,11 +0,0 @@ -use async_channel::Receiver; -use netpod::Database; -use netpod::ScyllaConfig; -use scywr::insertworker::Ttls; -use serde::Serialize; -use series::series::Existence; -use series::ChannelStatusSeriesId; -use series::SeriesId; -use std::net::SocketAddrV4; -use std::time::Instant; -use std::time::SystemTime; diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 2b99906..ae1c02f 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -1,6 +1,7 @@ use clap::ArgAction::Count; use clap::Parser; -use netfetch::zmtp::ZmtpClientOpts; +#[cfg(feature = "bsread")] +use ingest_bsread::zmtp::ZmtpClientOpts; use std::net::SocketAddr; #[derive(Debug, Parser)] @@ -18,13 +19,15 @@ pub struct DaqIngestOpts { #[derive(Debug, Parser)] pub enum SubCmd { - Bsread(Bsread), ListPkey, ListPulses, FetchEvents(FetchEvents), - BsreadDump(BsreadDump), #[command(subcommand)] ChannelAccess(ChannelAccess), + #[cfg(feature = "bsread")] + Bsread(Bsread), + #[cfg(feature = "bsread")] + BsreadDump(BsreadDump), Version, } @@ -44,6 +47,7 @@ pub struct Bsread { pub process_channel_count_limit: Option, } +#[cfg(feature = "bsread")] impl From for ZmtpClientOpts { fn from(k: Bsread) -> Self { Self { diff --git a/dbpg/src/pool.rs b/dbpg/src/pool.rs index b167c89..8641a98 100644 --- a/dbpg/src/pool.rs +++ b/dbpg/src/pool.rs @@ -71,6 +71,7 @@ impl PgClientPooled { pub struct PgPool { tx: Sender, rx: Receiver, + #[allow(unused)] handout_count: u64, } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 67c4157..c9072aa 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -15,6 +15,24 @@ use tokio::task::JoinHandle; use tokio_postgres::Client as PgClient; use tokio_postgres::Statement as PgStatement; +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace3 { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + #[derive(Debug, ThisError)] pub enum Error { Postgres(#[from] tokio_postgres::Error), @@ -55,6 +73,8 @@ impl ChannelInfoQuery { struct ChannelInfoResult { series: Existence, tx: Sender, Error>>, + // only for trace: + channel: String, } struct Worker { @@ -70,7 +90,7 @@ impl Worker { let sql = concat!( "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])", " as inp (backend, channel, scalar_type, shape_dims, rid))", - " select t.series, q1.rid from series_by_channel t", + " select t.series, q1.rid, t.channel from series_by_channel t", " join q1 on t.facility = q1.backend and t.channel = q1.channel", " and t.scalar_type = q1.scalar_type and t.shape_dims = q1.shape_dims::int[]", " and t.agg_kind = 0", @@ -138,12 +158,14 @@ impl Worker { for (qrid, tx) in tx { if let Some(row) = &e1 { let rid: i32 = row.get(1); + let channel: String = row.get(2); if rid as u32 == qrid { let series: i64 = row.get(0); let series = SeriesId::new(series as _); let res = ChannelInfoResult { series: Existence::Existing(series), tx, + channel, }; result.push(res); } @@ -236,12 +258,23 @@ impl Worker { } async fn work(&mut self) -> Result<(), Error> { - 'outer: while let Some(batch) = self.batch_rx.next().await { + while let Some(batch) = self.batch_rx.next().await { + trace2!("worker recv batch len {}", batch.len()); + for x in &batch { + trace3!( + "search for {} {} {:?} {:?}", + x.backend, + x.channel, + x.scalar_type, + x.shape_dims + ); + } let (res1, missing) = self.select(batch).await?; let res3 = if missing.len() > 0 { self.insert_missing(&missing).await?; let (res2, missing2) = self.select(missing).await?; if missing2.len() > 0 { + warn!("series ids still missing after insert"); Err(Error::SeriesMissing) } else { Ok(res2) @@ -251,11 +284,12 @@ impl Worker { }; let res4 = res3?; for r in res4 { + trace3!("try to send result for {} {:?}", r.channel, r.series); match r.tx.send(Ok(r.series)).await { Ok(()) => {} Err(_e) => { warn!("can not deliver result"); - break 'outer; + return Err(Error::ChannelError); } } } diff --git a/ingest-bsread/Cargo.toml b/ingest-bsread/Cargo.toml new file mode 100644 index 0000000..21b5b8b --- /dev/null +++ b/ingest-bsread/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "ingest-bsread" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +futures-util = "0.3" +async-channel = "1.9.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +bytes = "1.4.0" +md-5 = "0.10.5" +hex = "0.4.3" +log = { path = "../log" } +series = { path = "../series" } +stats = { path = "../stats" } +scywr = { path = "../scywr" } +dbpg = { path = "../dbpg" } +slidebuf = { path = "../slidebuf" } +ingest-linux = { path = "../ingest-linux" } +err = { path = "../../daqbuffer/crates/err" } +netpod = { path = "../../daqbuffer/crates/netpod" } +taskrun = { path = "../../daqbuffer/crates/taskrun" } +batchtools = { path = "../batchtools" } +items_0 = { path = "../../daqbuffer/crates/items_0" } +items_2 = { path = "../../daqbuffer/crates/items_2" } +streams = { path = "../../daqbuffer/crates/streams" } +bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } diff --git a/netfetch/src/bsread.rs b/ingest-bsread/src/bsread.rs similarity index 100% rename from netfetch/src/bsread.rs rename to ingest-bsread/src/bsread.rs diff --git a/netfetch/src/bsreadclient.rs b/ingest-bsread/src/bsreadclient.rs similarity index 95% rename from netfetch/src/bsreadclient.rs rename to ingest-bsread/src/bsreadclient.rs index b285364..88347d2 100644 --- a/netfetch/src/bsreadclient.rs +++ b/ingest-bsread/src/bsreadclient.rs @@ -2,7 +2,6 @@ use crate::bsread::BsreadMessage; use crate::bsread::ChannelDescDecoded; use crate::bsread::HeadB; use crate::bsread::Parser; -use crate::ca::IngestCommons; use crate::zmtp::zmtpproto; use crate::zmtp::zmtpproto::SocketType; use crate::zmtp::zmtpproto::Zmtp; @@ -23,7 +22,6 @@ use netpod::Shape; use netpod::TS_MSP_GRID_SPACING; use netpod::TS_MSP_GRID_UNIT; use scywr::iteminsertqueue::ArrayValue; -use scywr::iteminsertqueue::CommonInsertItemQueueSender; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::InsertItem; use scywr::iteminsertqueue::QueryItem; @@ -32,7 +30,6 @@ use series::SeriesId; use stats::CheckEvery; use std::io; use std::net::SocketAddr; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; use taskrun::tokio; @@ -51,10 +48,11 @@ pub enum Error { ZmtpProto(#[from] zmtpproto::Error), #[error("BadSlice")] BadSlice, + SystemNet(#[from] ingest_linux::net::Error), } impl From> for Error { - fn from(value: async_channel::SendError) -> Self { + fn from(_value: async_channel::SendError) -> Self { Self::AsyncChannelSend } } @@ -72,8 +70,7 @@ pub struct BsreadClient { rcvbuf: Option, print_stats: CheckEvery, parser: Parser, - ingest_commons: Arc, - insqtx: CommonInsertItemQueueSender, + insqtx: Sender, tmp_evtset_series: Option, channel_info_query_tx: Sender, inserted_in_ts_msp_count: u32, @@ -84,13 +81,9 @@ pub struct BsreadClient { impl BsreadClient { pub async fn new( opts: ZmtpClientOpts, - ingest_commons: Arc, + insqtx: Sender, channel_info_query_tx: Sender, ) -> Result { - let insqtx = ingest_commons - .insert_item_queue - .sender() - .ok_or_else(|| Error::InsertQueueSenderMissing)?; let ret = Self { source_addr: opts.addr, do_pulse_id: opts.do_pulse_id, @@ -98,7 +91,6 @@ impl BsreadClient { opts, print_stats: CheckEvery::new(Duration::from_millis(2000)), parser: Parser::new(), - ingest_commons, insqtx, tmp_evtset_series: None, channel_info_query_tx, @@ -215,7 +207,7 @@ impl BsreadClient { pub async fn run(&mut self) -> Result<(), Error> { let mut conn = tokio::net::TcpStream::connect(&self.source_addr).await?; if let Some(v) = self.rcvbuf { - crate::linuxhelper::set_rcv_sock_opts(&mut conn, v as u32)?; + ingest_linux::net::set_rcv_sock_opts(&mut conn, v as u32)?; } let mut zmtp = Zmtp::new(conn, SocketType::PULL); let mut i1 = 0u64; @@ -342,8 +334,8 @@ impl BsreadClient { for i1 in 0..nlim { // TODO skip decoding if header unchanged. let chn = &head_b.channels[i1]; - let chd: ChannelDescDecoded = chn.try_into()?; - let fr = &msg.frames[2 + 2 * i1]; + let _chd: ChannelDescDecoded = chn.try_into()?; + let _fr = &msg.frames[2 + 2 * i1]; // TODO store the channel information together with series in struct. } } @@ -391,13 +383,14 @@ impl BsreadClient { Ok(()) } - async fn setup_channel_writers(&mut self, scy: &ScySession, cd: &ChannelDescDecoded) -> Result<(), Error> { + #[allow(unused)] + async fn setup_channel_writers(&mut self, _scy: &ScySession, cd: &ChannelDescDecoded) -> Result<(), Error> { let has_comp = cd.compression.is_some(); if has_comp { warn!("Compression not yet supported [{}]", cd.name); return Ok(()); } - let shape_dims = cd.shape.to_scylla_vec(); + let _shape_dims = cd.shape.to_scylla_vec(); Ok(()) } diff --git a/ingest-bsread/src/lib.rs b/ingest-bsread/src/lib.rs new file mode 100644 index 0000000..33eb359 --- /dev/null +++ b/ingest-bsread/src/lib.rs @@ -0,0 +1,3 @@ +pub mod bsread; +pub mod bsreadclient; +pub mod zmtp; diff --git a/netfetch/src/zmtp.rs b/ingest-bsread/src/zmtp.rs similarity index 96% rename from netfetch/src/zmtp.rs rename to ingest-bsread/src/zmtp.rs index 89ffd94..3b2282d 100644 --- a/netfetch/src/zmtp.rs +++ b/ingest-bsread/src/zmtp.rs @@ -90,6 +90,7 @@ struct ClientRun { } impl ClientRun { + #[allow(unused)] fn new(client: BsreadClient) -> Self { let mut client = Box::pin(client); let client2 = unsafe { &mut *(&mut client as &mut _ as *mut _) } as &mut BsreadClient; @@ -114,7 +115,7 @@ pub enum ZmtpEvent { } pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { - let client = BsreadClient::new(opts.clone(), todo!(), todo!()).await?; + let client = BsreadClient::new(opts.clone(), err::todoval(), err::todoval()).await?; let fut = { async move { let mut client = client; diff --git a/netfetch/src/zmtp/dumper.rs b/ingest-bsread/src/zmtp/dumper.rs similarity index 100% rename from netfetch/src/zmtp/dumper.rs rename to ingest-bsread/src/zmtp/dumper.rs diff --git a/netfetch/src/zmtp/zmtpproto.rs b/ingest-bsread/src/zmtp/zmtpproto.rs similarity index 97% rename from netfetch/src/zmtp/zmtpproto.rs rename to ingest-bsread/src/zmtp/zmtpproto.rs index 7fb05a0..43b69c5 100644 --- a/netfetch/src/zmtp/zmtpproto.rs +++ b/ingest-bsread/src/zmtp/zmtpproto.rs @@ -2,7 +2,6 @@ use crate::bsread::ChannelDesc; use crate::bsread::GlobalTimestamp; use crate::bsread::HeadA; use crate::bsread::HeadB; -use crate::netbuf::NetBuf; use crate::zmtp::ZmtpEvent; use async_channel::Receiver; use async_channel::Sender; @@ -14,6 +13,7 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::timeunits::SEC; use serde_json::Value as JsVal; +use slidebuf::SlideBuf; use std::fmt; use std::io; use std::mem; @@ -32,7 +32,7 @@ pub enum Error { #[error("bad")] Bad, #[error("NetBuf({0})")] - NetBuf(#[from] crate::netbuf::Error), + NetBuf(#[from] slidebuf::Error), #[error("zmtp peer is not v3.x")] ZmtpInitPeerNot3x, #[error("zmtp peer is not v3.0 or v3.1")] @@ -107,8 +107,8 @@ pub struct Zmtp { socket_type: SocketType, conn: TcpStream, conn_state: ConnState, - buf: NetBuf, - outbuf: NetBuf, + buf: SlideBuf, + outbuf: SlideBuf, out_enable: bool, msglen: usize, has_more: bool, @@ -134,8 +134,8 @@ impl Zmtp { conn, //conn_state: ConnState::LockScan(1), conn_state: ConnState::InitSend, - buf: NetBuf::new(1024 * 128), - outbuf: NetBuf::new(1024 * 128), + buf: SlideBuf::new(1024 * 128), + outbuf: SlideBuf::new(1024 * 128), out_enable: false, msglen: 0, has_more: false, @@ -156,8 +156,10 @@ impl Zmtp { self.data_tx.clone() } - fn inpbuf_conn(&mut self, need_min: usize) -> (&mut TcpStream, ReadBuf) { - (&mut self.conn, self.buf.read_buf_for_fill(need_min)) + fn inpbuf_conn(&mut self, need_min: usize) -> Result<(&mut TcpStream, ReadBuf), Error> { + let buf = self.buf.available_writable_area(need_min)?; + let buf = ReadBuf::new(buf); + Ok((&mut self.conn, buf)) } fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) { @@ -268,7 +270,11 @@ impl Zmtp { Int::Item(Err(e)) } else if self.buf.len() < need_min { self.record_input_state(); - let (w, mut rbuf) = self.inpbuf_conn(need_min); + // TODO refactor error handling in this function + let (w, mut rbuf) = match self.inpbuf_conn(need_min) { + Ok(x) => x, + Err(e) => return Some(Ready(Err(e.into()))), + }; pin_mut!(w); match w.poll_read(cx, &mut rbuf) { Ready(k) => match k { @@ -495,7 +501,7 @@ impl Zmtp { Ok(None) } ConnState::ReadFrameLong => { - self.msglen = self.buf.read_u64()? as usize; + self.msglen = self.buf.read_u64_be()? as usize; trace!("parse_item ReadFrameLong msglen {}", self.msglen); self.conn_state = ConnState::ReadFrameBody(self.msglen); if self.msglen > self.buf.cap() / 2 { @@ -649,7 +655,7 @@ impl ZmtpMessage { &self.frames } - pub fn emit_to_buffer(&self, out: &mut NetBuf) -> Result<(), Error> { + pub fn emit_to_buffer(&self, out: &mut SlideBuf) -> Result<(), Error> { let n = self.frames.len(); for (i, fr) in self.frames.iter().enumerate() { let mut flags: u8 = 2; @@ -657,7 +663,7 @@ impl ZmtpMessage { flags |= 1; } out.put_u8(flags)?; - out.put_u64(fr.data().len() as u64)?; + out.put_u64_be(fr.data().len() as u64)?; out.put_slice(fr.data())?; } Ok(()) diff --git a/ingest-linux/Cargo.toml b/ingest-linux/Cargo.toml new file mode 100644 index 0000000..5b9ed24 --- /dev/null +++ b/ingest-linux/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "ingest-linux" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +libc = "0.2" +thiserror = "1.0.47" +log = { path = "../log" } +taskrun = { path = "../../daqbuffer/crates/taskrun" } diff --git a/ingest-linux/src/lib.rs b/ingest-linux/src/lib.rs new file mode 100644 index 0000000..61fddd0 --- /dev/null +++ b/ingest-linux/src/lib.rs @@ -0,0 +1,2 @@ +pub mod net; +pub mod signal; diff --git a/ingest-linux/src/net.rs b/ingest-linux/src/net.rs new file mode 100644 index 0000000..a752a15 --- /dev/null +++ b/ingest-linux/src/net.rs @@ -0,0 +1,75 @@ +use std::ffi::CStr; +use taskrun::tokio; +use thiserror::Error; +use tokio::net::TcpStream; + +#[derive(Debug, Error)] +#[error("{self}")] +pub enum Error { + SocketOptionSet, + SocketOptionGet, +} + +pub fn local_hostname() -> String { + let mut buf = vec![0u8; 128]; + let hostname = unsafe { + let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2); + if ec != 0 { + panic!(); + } + let hostname = CStr::from_ptr(&buf[0] as *const _ as _); + hostname.to_str().unwrap() + }; + hostname.into() +} + +#[test] +fn test_get_local_hostname() { + assert_ne!(local_hostname().len(), 0); +} + +pub fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { + use std::mem::size_of; + use std::os::unix::prelude::AsRawFd; + let fd = conn.as_raw_fd(); + unsafe { + type N = libc::c_int; + let n: N = rcvbuf as _; + let ec = libc::setsockopt( + fd, + libc::SOL_SOCKET, + libc::SO_RCVBUF, + &n as *const N as _, + size_of::() as _, + ); + if ec != 0 { + // error!("ec {ec}"); + return Err(Error::SocketOptionSet); + } + } + unsafe { + type N = libc::c_int; + let mut n: N = -1; + let mut l = size_of::() as libc::socklen_t; + let ec = libc::getsockopt( + fd, + libc::SOL_SOCKET, + libc::SO_RCVBUF, + &mut n as *mut N as _, + &mut l as _, + ); + if ec != 0 { + let errno = *libc::__errno_location(); + let _es = CStr::from_ptr(libc::strerror(errno)); + // error!("can not query socket option ec {ec} errno {errno} es {es:?}"); + Err(Error::SocketOptionGet) + } else { + if (n as u32) < rcvbuf * 5 / 6 { + // warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}"); + } else { + // info!("SO_RCVBUF {n}"); + } + Ok(()) + } + } +} diff --git a/ingest-linux/src/signal.rs b/ingest-linux/src/signal.rs new file mode 100644 index 0000000..1dc6e8f --- /dev/null +++ b/ingest-linux/src/signal.rs @@ -0,0 +1,60 @@ +use log::*; +use std::ffi::CStr; +use std::mem::MaybeUninit; +use thiserror::Error; + +#[derive(Debug, Error)] +#[error("{self}")] +pub enum Error { + SignalHandlerSet, + SignalHandlerUnset, +} + +pub fn set_signal_handler( + signum: libc::c_int, + cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> (), +) -> Result<(), Error> { + //let cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> () = handler_sigaction; + // Safe because it creates a valid value: + let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; + let sa_sigaction: libc::sighandler_t = cb as *const libc::c_void as _; + let act = libc::sigaction { + sa_sigaction, + sa_mask: mask, + sa_flags: 0, + sa_restorer: None, + }; + let (ec, msg) = unsafe { + let ec = libc::sigaction(signum, &act, std::ptr::null_mut()); + let errno = *libc::__errno_location(); + (ec, CStr::from_ptr(libc::strerror(errno))) + }; + if ec != 0 { + // Not valid to print here, but we will panic anyways after that. + eprintln!("error: {:?}", msg); + return Err(Error::SignalHandlerSet); + } + Ok(()) +} + +pub fn unset_signal_handler(signum: libc::c_int) -> Result<(), Error> { + // Safe because it creates a valid value: + let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; + let act = libc::sigaction { + sa_sigaction: libc::SIG_DFL, + sa_mask: mask, + sa_flags: 0, + sa_restorer: None, + }; + let (ec, msg) = unsafe { + let ec = libc::sigaction(signum, &act, std::ptr::null_mut()); + let errno = *libc::__errno_location(); + (ec, CStr::from_ptr(libc::strerror(errno))) + }; + if ec != 0 { + // Not valid to print here, but we will panic anyways after that. + eprintln!("error: {:?}", msg); + return Err(Error::SignalHandlerUnset); + } + Ok(()) +} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 6ef03e2..3a243ad 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -16,9 +16,8 @@ bytes = "1.4" arrayref = "0.3" byteorder = "1.4" futures-util = "0.3" -md-5 = "0.10" -hex = "0.4" -libc = "0.2" +md-5 = "0.10.5" +hex = "0.4.3" regex = "1.8.4" axum = "0.6.18" http = "0.2" @@ -29,11 +28,13 @@ humantime = "2.1" humantime-serde = "1.1" pin-project = "1" lazy_static = "1" +libc = "0.2" log = { path = "../log" } series = { path = "../series" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } +ingest-linux = { path = "../ingest-linux" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } items_0 = { path = "../../daqbuffer/crates/items_0" } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 9dde75b..c9ae0d6 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -7,7 +7,6 @@ pub mod search; use crate::ca::connset::CaConnSet; use crate::metrics::ExtraInsertsConf; use crate::rt::TokMx; -use err::Error; use futures_util::Future; use futures_util::FutureExt; use log::*; @@ -16,7 +15,6 @@ use scywr::insertworker::InsertWorkerOpts; use scywr::iteminsertqueue::CommonInsertItemQueue; use scywr::store::DataStore; use stats::CaConnStatsAgg; -use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::atomic::AtomicU32; use std::sync::atomic::AtomicU64; @@ -146,5 +144,5 @@ where fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { crate::ca::SIGINT.store(1, Ordering::Release); - let _ = crate::linuxhelper::unset_signal_handler(libc::SIGINT); + let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT); } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index eb4db27..02dee76 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -4,7 +4,6 @@ use super::proto::CaMsg; use super::proto::CaMsgTy; use super::proto::CaProto; use super::ExtraInsertsConf; -use crate::bsread::ChannelDescDecoded; use crate::ca::proto::CreateChan; use crate::ca::proto::EventAdd; use crate::timebin::ConnTimeBin; @@ -59,6 +58,24 @@ use std::time::SystemTime; use taskrun::tokio; use tokio::net::TcpStream; +#[allow(unused)] +macro_rules! trace3 { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace4 { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + #[derive(Clone, Debug, Serialize)] pub enum ChannelConnectedInfo { Disconnected, @@ -440,6 +457,7 @@ pub struct CaConn { Pin), Error>> + Send>>, >, time_binners: BTreeMap, + ts_earliest_warn_poll_slow: Instant, } impl CaConn { @@ -493,6 +511,7 @@ impl CaConn { series_lookup_schedule: BTreeMap::new(), series_lookup_futs: FuturesUnordered::new(), time_binners: BTreeMap::new(), + ts_earliest_warn_poll_slow: Instant::now(), } } @@ -988,15 +1007,6 @@ impl CaConn { *ch_s = ChannelState::Created(series, created_state); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; - let _cd = ChannelDescDecoded { - name: name.to_string(), - scalar_type, - shape, - agg_kind: netpod::AggKind::Plain, - // TODO these play no role in series id: - byte_order: netpod::ByteOrder::Little, - compression: None, - }; cx.waker().wake_by_ref(); Ok(()) } @@ -1012,9 +1022,17 @@ impl CaConn { entry.remove(); continue; } - Err(e) => { - *entry.get_mut() = e.into_inner(); - } + Err(e) => match e { + async_channel::TrySendError::Full(_) => { + warn!("series lookup channel full"); + *entry.get_mut() = e.into_inner(); + } + async_channel::TrySendError::Closed(_) => { + warn!("series lookup channel closed"); + // *entry.get_mut() = e.into_inner(); + entry.remove(); + } + }, } } else { () @@ -1473,24 +1491,15 @@ impl CaConn { }; *ch_s = ChannelState::FetchingSeriesId(created_state); // TODO handle error in different way. Should most likely not abort. - let _cd = ChannelDescDecoded { - name: name.clone(), - scalar_type: scalar_type.clone(), - shape: shape.clone(), - agg_kind: netpod::AggKind::Plain, - // TODO these play no role in series id: - byte_order: netpod::ByteOrder::Little, - compression: None, - }; - let (tx, rx) = async_channel::bounded(1); - let query = ChannelInfoQuery { - backend: self.backend.clone(), - channel: name.clone(), - scalar_type: scalar_type.to_scylla_i32(), - shape_dims: shape.to_scylla_vec(), - tx, - }; if !self.series_lookup_schedule.contains_key(&cid) { + let (tx, rx) = async_channel::bounded(1); + let query = ChannelInfoQuery { + backend: self.backend.clone(), + channel: name.clone(), + scalar_type: scalar_type.to_scylla_i32(), + shape_dims: shape.to_scylla_vec(), + tx, + }; self.series_lookup_schedule.insert(cid, query); let fut = async move { match rx.recv().await { @@ -1749,7 +1758,9 @@ impl CaConn { Self::apply_channel_ops_with_res(res) } - fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { + fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { + self.emit_series_lookup(cx); + self.poll_channel_info_results(cx); let this = self.get_mut(); for (_, tb) in this.time_binners.iter_mut() { let iiq = &mut this.insert_item_queue; @@ -1769,12 +1780,12 @@ impl Stream for CaConn { if self.channel_set_ops.flag.load(atomic::Ordering::Acquire) > 0 { self.apply_channel_ops(); } - self.emit_series_lookup(cx); - self.poll_channel_info_results(cx); match self.ticker.poll_unpin(cx) { Ready(()) => { match self.as_mut().handle_own_ticker_tick(cx) { - Ok(_) => {} + Ok(_) => { + let _ = self.ticker.poll_unpin(cx); + } Err(e) => { error!("{e}"); self.trigger_shutdown(ChannelStatusClosedReason::InternalError); @@ -1782,7 +1793,8 @@ impl Stream for CaConn { } } self.ticker = Self::new_self_ticker(); - cx.waker().wake_by_ref(); + let _ = self.ticker.poll_unpin(cx); + // cx.waker().wake_by_ref(); } Pending => {} } @@ -1823,8 +1835,8 @@ impl Stream for CaConn { } if self.is_shutdown() { if self.insert_item_queue.len() == 0 { - trace!("no more items to flush"); if i1 >= 10 { + trace!("no more items to flush"); break Ready(Ok(())); } } else { @@ -1863,9 +1875,14 @@ impl Stream for CaConn { } }; { - let dt = poll_ts1.elapsed(); + let tsnow = Instant::now(); + let dt = tsnow.saturating_duration_since(poll_ts1); if dt > Duration::from_millis(40) { - warn!("slow poll: {}ms", dt.as_secs_f32() * 1e3); + if poll_ts1 > self.ts_earliest_warn_poll_slow { + // TODO factor out the rate limit logic in reusable type + self.ts_earliest_warn_poll_slow = tsnow + Duration::from_millis(2000); + warn!("slow poll: {}ms", dt.as_secs_f32() * 1e3); + } } } ret diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 0b4a18c..df8da87 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -1,5 +1,5 @@ -use crate::linuxhelper::local_hostname; use err::Error; +use ingest_linux::net::local_hostname; use netpod::log::*; use netpod::Database; use netpod::ScyllaConfig; diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 0e6a424..35a343f 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -1,5 +1,3 @@ -pub mod bsread; -pub mod bsreadclient; pub mod ca; pub mod conf; pub mod daemon_common; @@ -13,4 +11,3 @@ pub mod rt; #[cfg(test)] pub mod test; pub mod timebin; -pub mod zmtp; diff --git a/netfetch/src/linuxhelper.rs b/netfetch/src/linuxhelper.rs index 9a5f4a6..8b13789 100644 --- a/netfetch/src/linuxhelper.rs +++ b/netfetch/src/linuxhelper.rs @@ -1,120 +1 @@ -use err::Error; -use log::*; -use std::ffi::CStr; -use std::mem::MaybeUninit; -use taskrun::tokio; -use tokio::net::TcpStream; -pub fn local_hostname() -> String { - let mut buf = vec![0u8; 128]; - let hostname = unsafe { - let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2); - if ec != 0 { - panic!(); - } - let hostname = CStr::from_ptr(&buf[0] as *const _ as _); - hostname.to_str().unwrap() - }; - hostname.into() -} - -#[test] -fn test_get_local_hostname() { - assert_ne!(local_hostname().len(), 0); -} - -pub fn set_signal_handler( - signum: libc::c_int, - cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> (), -) -> Result<(), Error> { - //let cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> () = handler_sigaction; - // Safe because it creates a valid value: - let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; - let sa_sigaction: libc::sighandler_t = cb as *const libc::c_void as _; - let act = libc::sigaction { - sa_sigaction, - sa_mask: mask, - sa_flags: 0, - sa_restorer: None, - }; - let (ec, msg) = unsafe { - let ec = libc::sigaction(signum, &act, std::ptr::null_mut()); - let errno = *libc::__errno_location(); - (ec, CStr::from_ptr(libc::strerror(errno))) - }; - if ec != 0 { - // Not valid to print here, but we will panic anyways after that. - eprintln!("error: {:?}", msg); - panic!(); - } - Ok(()) -} - -pub fn unset_signal_handler(signum: libc::c_int) -> Result<(), Error> { - // Safe because it creates a valid value: - let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; - let act = libc::sigaction { - sa_sigaction: libc::SIG_DFL, - sa_mask: mask, - sa_flags: 0, - sa_restorer: None, - }; - let (ec, msg) = unsafe { - let ec = libc::sigaction(signum, &act, std::ptr::null_mut()); - let errno = *libc::__errno_location(); - (ec, CStr::from_ptr(libc::strerror(errno))) - }; - if ec != 0 { - // Not valid to print here, but we will panic anyways after that. - eprintln!("error: {:?}", msg); - panic!(); - } - Ok(()) -} - -pub fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { - use std::mem::size_of; - use std::os::unix::prelude::AsRawFd; - let fd = conn.as_raw_fd(); - unsafe { - type N = libc::c_int; - let n: N = rcvbuf as _; - let ec = libc::setsockopt( - fd, - libc::SOL_SOCKET, - libc::SO_RCVBUF, - &n as *const N as _, - size_of::() as _, - ); - if ec != 0 { - error!("ec {ec}"); - if ec != 0 { - return Err(Error::with_msg_no_trace(format!("can not set socket option"))); - } - } - } - unsafe { - type N = libc::c_int; - let mut n: N = -1; - let mut l = size_of::() as libc::socklen_t; - let ec = libc::getsockopt( - fd, - libc::SOL_SOCKET, - libc::SO_RCVBUF, - &mut n as *mut N as _, - &mut l as _, - ); - if ec != 0 { - let errno = *libc::__errno_location(); - let es = CStr::from_ptr(libc::strerror(errno)); - error!("can not query socket option ec {ec} errno {errno} es {es:?}"); - } else { - if (n as u32) < rcvbuf * 5 / 6 { - warn!("SO_RCVBUF {n} smaller than requested {rcvbuf}"); - } else { - info!("SO_RCVBUF {n}"); - } - } - } - Ok(()) -} diff --git a/scywr/src/fut.rs b/scywr/src/fut.rs index 963f5bd..2fcbffe 100644 --- a/scywr/src/fut.rs +++ b/scywr/src/fut.rs @@ -19,7 +19,12 @@ impl<'a> ScyQueryFut<'a> { where V: ValueList + Send + 'static, { - todo!("ScyQueryFut"); + let _ = scy; + let _ = query; + let _ = values; + if true { + todo!("ScyQueryFut") + }; //let fut = scy.execute(query, values); let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); Self { fut: Box::pin(fut) } diff --git a/scywr/src/futinsertloop.rs b/scywr/src/futinsertloop.rs index 108a513..bd81921 100644 --- a/scywr/src/futinsertloop.rs +++ b/scywr/src/futinsertloop.rs @@ -25,6 +25,8 @@ impl<'a> InsertLoopFut<'a> { where V: ValueList + Send + Sync + 'static, { + let _ = scy; + let _ = query; let mut values = values; if skip_insert { values.clear(); @@ -34,8 +36,10 @@ impl<'a> InsertLoopFut<'a> { // Or is it acceptable to generate all insert futures right here and poll them later? let futs: Vec<_> = values .into_iter() - .map(|vs| { - todo!("InsertLoopFut"); + .map(|_vs| { + if true { + todo!("InsertLoopFut") + }; //let fut = scy.execute(query, vs); let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); Box::pin(fut) as _ diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index ac0e5f5..881c44d 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -253,6 +253,10 @@ impl CommonInsertItemQueueSender { pub fn is_full(&self) -> bool { self.sender.is_full() } + + pub fn inner(&self) -> &async_channel::Sender { + &self.sender + } } pub struct CommonInsertItemQueue { diff --git a/slidebuf/Cargo.toml b/slidebuf/Cargo.toml new file mode 100644 index 0000000..ab0cc45 --- /dev/null +++ b/slidebuf/Cargo.toml @@ -0,0 +1,5 @@ +[package] +name = "slidebuf" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" diff --git a/slidebuf/src/lib.rs b/slidebuf/src/lib.rs new file mode 100644 index 0000000..aa7ccfe --- /dev/null +++ b/slidebuf/src/lib.rs @@ -0,0 +1,4 @@ +mod slidebuf; + +pub use slidebuf::Error; +pub use slidebuf::SlideBuf; diff --git a/slidebuf/src/slidebuf.rs b/slidebuf/src/slidebuf.rs new file mode 100644 index 0000000..e595394 --- /dev/null +++ b/slidebuf/src/slidebuf.rs @@ -0,0 +1,441 @@ +use std::fmt; + +#[derive(Debug)] +pub enum Error { + NotEnoughBytes, + NotEnoughSpace(usize, usize, usize), + TryFromSliceError, +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{self:?}") + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + None + } +} + +impl From for Error { + fn from(_: std::array::TryFromSliceError) -> Self { + Self::TryFromSliceError + } +} + +pub struct SlideBuf { + buf: Vec, + wp: usize, + rp: usize, +} + +macro_rules! check_invariants { + ($self:expr) => { + //$self.check_invariants() + }; +} + +impl SlideBuf { + pub fn new(cap: usize) -> Self { + Self { + buf: vec![0; cap], + wp: 0, + rp: 0, + } + } + + pub fn state(&self) -> (usize, usize) { + (self.rp, self.wp) + } + + pub fn len(&self) -> usize { + check_invariants!(self); + self.wp - self.rp + } + + #[inline(always)] + pub fn cap(&self) -> usize { + check_invariants!(self); + self.buf.len() + } + + pub fn wcap(&self) -> usize { + check_invariants!(self); + self.buf.len() - self.wp + } + + pub fn data(&self) -> &[u8] { + check_invariants!(self); + &self.buf[self.rp..self.wp] + } + + pub fn data_mut(&mut self) -> &mut [u8] { + check_invariants!(self); + &mut self.buf[self.rp..self.wp] + } + + pub fn reset(&mut self) { + self.rp = 0; + self.wp = 0; + } + + pub fn adv(&mut self, x: usize) -> Result<(), Error> { + check_invariants!(self); + if self.len() < x { + return Err(Error::NotEnoughBytes); + } else { + self.rp += x; + Ok(()) + } + } + + pub fn wadv(&mut self, x: usize) -> Result<(), Error> { + check_invariants!(self); + if self.wcap() < x { + self.rewind(); + } + if self.wcap() < x { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), x)); + } else { + self.wp += x; + Ok(()) + } + } + + pub fn rp(&self) -> usize { + self.rp + } + + pub fn set_rp(&mut self, rp: usize) -> Result<(), Error> { + check_invariants!(self); + if rp > self.wp { + Err(Error::NotEnoughBytes) + } else { + self.rp = rp; + Ok(()) + } + } + + pub fn rewind_rp(&mut self, n: usize) -> Result<(), Error> { + check_invariants!(self); + if self.rp < n { + Err(Error::NotEnoughBytes) + } else { + self.rp -= n; + Ok(()) + } + } + + pub fn read_u8(&mut self) -> Result { + check_invariants!(self); + type T = u8; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = self.buf[self.rp]; + self.rp += TS; + Ok(val) + } + } + + pub fn read_u16_be(&mut self) -> Result { + check_invariants!(self); + type T = u16; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_u32_be(&mut self) -> Result { + check_invariants!(self); + type T = u32; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_u64_be(&mut self) -> Result { + check_invariants!(self); + type T = u64; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_i32_be(&mut self) -> Result { + check_invariants!(self); + type T = i32; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_i64_be(&mut self) -> Result { + check_invariants!(self); + type T = i64; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_f32_be(&mut self) -> Result { + check_invariants!(self); + type T = f32; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_f64_be(&mut self) -> Result { + check_invariants!(self); + type T = f64; + const TS: usize = std::mem::size_of::(); + if self.len() < TS { + return Err(Error::NotEnoughBytes); + } else { + let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); + self.rp += TS; + Ok(val) + } + } + + pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> { + check_invariants!(self); + if self.len() < n { + return Err(Error::NotEnoughBytes); + } else { + let val = self.buf[self.rp..self.rp + n].as_ref(); + self.rp += n; + Ok(val) + } + } + + /*pub fn read_buf_for_fill(&mut self, need_min: usize) -> ReadBuf { + check_invariants!(self); + self.rewind_if_needed(need_min); + let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); + read_buf + }*/ + + // TODO issue is that this return exactly the size that was asked for, + // but most of time, we want to first get some scratch space, and later + // advance the write pointer. + pub fn ___write_buf___(&mut self, n: usize) -> Result<&mut [u8], Error> { + check_invariants!(self); + self.rewind_if_needed(n); + if self.wcap() < n { + self.rewind(); + } + if self.wcap() < n { + Err(Error::NotEnoughSpace(self.cap(), self.wcap(), n)) + } else { + let ret = &mut self.buf[self.wp..self.wp + n]; + self.wp += n; + Ok(ret) + } + } + + #[inline(always)] + pub fn rewind(&mut self) { + self.buf.copy_within(self.rp..self.wp, 0); + self.wp -= self.rp; + self.rp = 0; + } + + #[inline(always)] + pub fn rewind_if_needed(&mut self, need_min: usize) { + check_invariants!(self); + if self.rp != 0 && self.rp == self.wp { + self.rp = 0; + self.wp = 0; + } else if self.cap() < self.rp + need_min { + self.rewind(); + } + } + + pub fn available_writable_area(&mut self, need_min: usize) -> Result<&mut [u8], Error> { + check_invariants!(self); + self.rewind_if_needed(need_min); + if self.wcap() < need_min { + self.rewind(); + } + if self.wcap() < need_min { + Err(Error::NotEnoughSpace(self.cap(), self.wcap(), need_min)) + } else { + let ret = &mut self.buf[self.wp..]; + Ok(ret) + } + } + + pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> { + check_invariants!(self); + self.rewind_if_needed(buf.len()); + if self.wcap() < buf.len() { + self.rewind(); + } + if self.wcap() < buf.len() { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), buf.len())); + } else { + self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf); + self.wp += buf.len(); + Ok(()) + } + } + + pub fn put_u8(&mut self, v: u8) -> Result<(), Error> { + check_invariants!(self); + type T = u8; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } + if self.wcap() < TS { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } + + pub fn put_u16_be(&mut self, v: u16) -> Result<(), Error> { + check_invariants!(self); + type T = u16; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } + if self.wcap() < TS { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } + + pub fn put_u32_be(&mut self, v: u32) -> Result<(), Error> { + check_invariants!(self); + type T = u32; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } + if self.wcap() < TS { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } + + pub fn put_u64_be(&mut self, v: u64) -> Result<(), Error> { + check_invariants!(self); + type T = u64; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } + if self.wcap() < TS { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } + + pub fn put_f32_be(&mut self, v: f32) -> Result<(), Error> { + check_invariants!(self); + type T = f32; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } + if self.wcap() < TS { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } + + pub fn put_f64_be(&mut self, v: f64) -> Result<(), Error> { + check_invariants!(self); + type T = f64; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } + if self.wcap() < TS { + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } + + #[allow(unused)] + fn check_invariants(&self) { + if self.wp > self.buf.len() { + eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); + std::process::exit(87); + } + if self.rp > self.wp { + eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); + std::process::exit(87); + } + } +} + +impl fmt::Debug for SlideBuf { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("SlideBuf") + .field("cap", &self.cap()) + .field("wp", &self.wp) + .field("rp", &self.rp) + .finish() + } +}