diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 6069c7e..f6ab8bb 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -142,7 +142,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "336d835910fab747186c56586562cb46f42809c2843ef3a84f47509009522838" dependencies = [ "concurrent-queue", - "event-listener 3.0.0", + "event-listener 3.0.1", "event-listener-strategy", "futures-core", "pin-project-lite", @@ -519,9 +519,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fbc60abd742b35f2492f808e1abbb83d45f72db402e14c55057edc9c7b1e9e4" +checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" dependencies = [ "libc", ] @@ -882,6 +882,7 @@ version = "0.0.5" dependencies = [ "anyhow", "async-channel 1.9.0", + "async-channel 2.0.0", "backtrace", "chrono", "http", @@ -913,9 +914,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "event-listener" -version = "3.0.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29e56284f00d94c1bc7fd3c77027b4623c88c1f53d8d2394c6199f2921dea325" +checksum = "01cec0252c2afff729ee6f00e903d479fba81784c8e2bd77447673471fdfaea1" dependencies = [ "concurrent-queue", "parking", @@ -928,7 +929,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d96b852f1345da36d551b9473fa1e2b1eb5c5195585c6c018118bc92a8d91160" dependencies = [ - "event-listener 3.0.0", + "event-listener 3.0.1", "pin-project-lite", ] @@ -998,9 +999,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" dependencies = [ "futures-channel", "futures-core", @@ -1013,9 +1014,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -1023,15 +1024,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" dependencies = [ "futures-core", "futures-task", @@ -1040,15 +1041,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", @@ -1057,21 +1058,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -1420,9 +1421,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.2", @@ -1446,6 +1447,7 @@ dependencies = [ "log 0.0.1", "md-5", "netpod", + "pin-project", "scywr", "serde", "serde_json", @@ -1968,7 +1970,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.4.1", + "redox_syscall", "smallvec", "windows-targets", ] @@ -2297,15 +2299,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "redox_syscall" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -2460,9 +2453,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.20" +version = "0.38.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67ce50cb2e16c2903e30d1cbccfd8387a74b9d4c938b6a4c5ec6cc7556f7a8a0" +checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" dependencies = [ "bitflags 2.4.1", "errno", @@ -2666,9 +2659,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "itoa", "ryu", @@ -2703,7 +2696,7 @@ version = "0.9.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c" dependencies = [ - "indexmap 2.0.2", + "indexmap 2.1.0", "itoa", "ryu", "serde", @@ -2787,6 +2780,11 @@ checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" [[package]] name = "slidebuf" version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c18bf8dc5ecf7df5a3f2a261b052fd69ca925f1f75e2663669aa873444e7522b" +dependencies = [ + "bytes", +] [[package]] name = "smallvec" @@ -2996,13 +2994,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.8.0" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", "fastrand", - "redox_syscall 0.3.5", + "redox_syscall", "rustix", "windows-sys 0.48.0", ] @@ -3213,7 +3211,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.2", + "indexmap 2.1.0", "toml_datetime", "winnow", ] @@ -3534,9 +3532,9 @@ checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "wasmer" -version = "4.2.2" +version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e626f958755a90a6552b9528f59b58a62ae288e6c17fcf40e99495bc33c60f0" +checksum = "50cb1ae2956aac1fbbcf334c543c1143cdf7d5b0a5fb6c3d23a17bf37dd1f47b" dependencies = [ "bytes", "cfg-if", @@ -3562,9 +3560,9 @@ dependencies = [ [[package]] name = "wasmer-compiler" -version = "4.2.2" +version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "848e1922694cf97f4df680a0534c9d72c836378b5eb2313c1708fe1a75b40044" +checksum = "12fd9aeef339095798d1e04957d5657d97490b1112f145cbf08b98f6393b4a0a" dependencies = [ "backtrace", "bytes", @@ -3589,9 +3587,9 @@ dependencies = [ [[package]] name = "wasmer-compiler-cranelift" -version = "4.2.2" +version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d96bce6fad15a954edcfc2749b59e47ea7de524b6ef3df392035636491a40b4" +checksum = "344f5f1186c122756232fe7f156cc8d2e7bf333d5a658e81e25efa3415c26d07" dependencies = [ "cranelift-codegen", "cranelift-entity", @@ -3608,9 +3606,9 @@ dependencies = [ [[package]] name = "wasmer-derive" -version = "4.2.2" +version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f08f80d166a9279671b7af7a09409c28ede2e0b4e3acabbf0e3cb22c8038ba7" +checksum = "2ac8c1f2dc0ed3c7412a5546e468365184a461f8ce7dfe2a707b621724339f91" dependencies = [ "proc-macro-error", "proc-macro2", @@ -3620,9 +3618,9 @@ dependencies = [ [[package]] name = "wasmer-types" -version = "4.2.2" +version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae2c892882f0b416783fb4310e5697f5c30587f6f9555f9d4f2be85ab39d5d3d" +checksum = "5a57ecbf218c0a9348d4dfbdac0f9d42d9201ae276dffb13e61ea4ff939ecce7" dependencies = [ "bytecheck", "enum-iterator", @@ -3636,9 +3634,9 @@ dependencies = [ [[package]] name = "wasmer-vm" -version = "4.2.2" +version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c0a9a57b627fb39e5a491058d4365f099bc9b140031c000fded24a3306d9480" +checksum = "60c3513477bc0097250f6e34a640e2a903bb0ee57e6bb518c427f72c06ac7728" dependencies = [ "backtrace", "cc", @@ -3834,9 +3832,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.17" +version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" +checksum = "176b6138793677221d420fd2f0aeeced263f197688b36484660da767bca2fa32" dependencies = [ "memchr", ] diff --git a/ingest-bsread/Cargo.toml b/ingest-bsread/Cargo.toml index 2f618b6..178eb1d 100644 --- a/ingest-bsread/Cargo.toml +++ b/ingest-bsread/Cargo.toml @@ -13,12 +13,12 @@ bytes = "1.4.0" md-5 = "0.10.5" hex = "0.4.3" pin-project = "1" +slidebuf = "0.0.1" log = { path = "../log" } series = { path = "../series" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } -slidebuf = { path = "../slidebuf" } ingest-linux = { path = "../ingest-linux" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 7ded646..804b7aa 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -4,6 +4,9 @@ version = "0.0.3" authors = ["Dominik Werder "] edition = "2021" +[lib] +doctest = false + [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -29,13 +32,13 @@ humantime-serde = "1.1.1" pin-project = "1" lazy_static = "1" libc = "0.2" +slidebuf = "0.0.1" log = { path = "../log" } series = { path = "../series" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } ingest-linux = { path = "../ingest-linux" } -slidebuf = { path = "../slidebuf" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } items_0 = { path = "../../daqbuffer/crates/items_0" } diff --git a/netfetch/src/ca/connset_input_merge.rs b/netfetch/src/ca/connset_input_merge.rs index 92878f9..3886aa8 100644 --- a/netfetch/src/ca/connset_input_merge.rs +++ b/netfetch/src/ca/connset_input_merge.rs @@ -47,13 +47,16 @@ impl Stream for InputMerge { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let selfp = self.project(); let ret = { - if let Some(inp) = selfp.inp3.as_pin_mut() { + let mut selfp = self.as_mut().project(); + if let Some(inp) = selfp.inp3.as_mut().as_pin_mut() { match inp.poll_next(cx) { Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), Ready(None) => { - // self.inp3 = None; + unsafe { + // TODO what guarantees that I can drop the content here like this? + self.as_mut().get_unchecked_mut().inp3 = None; + } None } Pending => None, @@ -65,11 +68,15 @@ impl Stream for InputMerge { let ret = if let Some(x) = ret { Some(x) } else { - if let Some(inp) = selfp.inp2.as_pin_mut() { + let mut selfp = self.as_mut().project(); + if let Some(inp) = selfp.inp2.as_mut().as_pin_mut() { match inp.poll_next(cx) { Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), Ready(None) => { - // self.inp2 = None; + unsafe { + // TODO what guarantees that I can drop the content here like this? + self.as_mut().get_unchecked_mut().inp2 = None; + } None } Pending => None, @@ -81,11 +88,15 @@ impl Stream for InputMerge { if let Some(x) = ret { Ready(Some(x)) } else { - if let Some(inp) = selfp.inp1.as_pin_mut() { + let mut selfp = self.as_mut().project(); + if let Some(inp) = selfp.inp1.as_mut().as_pin_mut() { match inp.poll_next(cx) { Ready(Some(x)) => Ready(Some(x)), Ready(None) => { - // self.inp1 = None; + unsafe { + // TODO what guarantees that I can drop the content here like this? + self.as_mut().get_unchecked_mut().inp1 = None; + } Ready(None) } Pending => Pending, diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 95b8730..435a6ab 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -217,7 +217,7 @@ async fn finder_worker_single( items.extend(to_add.into_iter()); let items = items; for e in &items { - if crate::ca::connset::trigger.contains(&e.channel.as_str()) { + if true || crate::ca::connset::trigger.contains(&e.channel.as_str()) { debug!("found in database: {e:?}"); } } @@ -262,6 +262,7 @@ async fn finder_network_if_not_found( let mut res = VecDeque::new(); let mut net = VecDeque::new(); for e in item { + trace!("finder_network_if_not_found sees {e:?}"); if e.addr.is_none() { net.push_back(e.channel); } else { diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index f165954..2a4de75 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -402,13 +402,14 @@ impl CaMsgTy { fn payload_len(&self) -> usize { use CaMsgTy::*; + trace!("payload_len for {self:?}"); match self { Version => 0, VersionRes(_) => 0, Error(x) => (16 + x.msg.len() + 1 + 7) / 8 * 8, ClientName => 0x10, ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8, - HostName(_) => 0x18, + HostName(x) => (x.len() + 1 + 7) / 8 * 8, Search(x) => (x.channel.len() + 1 + 7) / 8 * 8, SearchRes(_) => 8, CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8, diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 9171bbc..4cfe5d3 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -22,7 +22,7 @@ pub struct CaIngestOpts { whitelist: Option, blacklist: Option, max_simul: Option, - #[serde(with = "humantime_serde")] + #[serde(default, with = "humantime_serde")] timeout: Option, postgresql: Database, scylla: ScyllaConfig, @@ -35,13 +35,13 @@ pub struct CaIngestOpts { store_workers_rate: Option, insert_frac: Option, use_rate_limit_queue: Option, - #[serde(with = "humantime_serde")] + #[serde(default, with = "humantime_serde")] ttl_index: Option, - #[serde(with = "humantime_serde")] + #[serde(default, with = "humantime_serde")] ttl_d0: Option, - #[serde(with = "humantime_serde")] + #[serde(default, with = "humantime_serde")] ttl_d1: Option, - #[serde(with = "humantime_serde")] + #[serde(default, with = "humantime_serde")] ttl_binned: Option, pub test_bsread_addr: Option, } @@ -138,8 +138,7 @@ impl CaIngestOpts { fn parse_config_minimal() { let conf = r###" backend: scylla -ttl_d1: 10m 3s -ttl_binned: 70d +ttl_d1: 10m 3s 45ms api_bind: "0.0.0.0:3011" channels: /some/path/file.txt search: @@ -158,14 +157,13 @@ scylla: keyspace: ks1 "###; let res: Result = serde_yaml::from_slice(conf.as_bytes()); - assert_eq!(res.is_ok(), true); let conf = res.unwrap(); assert_eq!(conf.channels, PathBuf::from("/some/path/file.txt")); assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string())); assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string())); assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); - assert_eq!(conf.ttl_binned, Some(Duration::from_secs(60 * 60 * 70))); + assert_eq!(conf.ttl_binned, None); } #[test] diff --git a/slidebuf/Cargo.toml b/slidebuf/Cargo.toml deleted file mode 100644 index ab0cc45..0000000 --- a/slidebuf/Cargo.toml +++ /dev/null @@ -1,5 +0,0 @@ -[package] -name = "slidebuf" -version = "0.0.1" -authors = ["Dominik Werder "] -edition = "2021" diff --git a/slidebuf/src/lib.rs b/slidebuf/src/lib.rs deleted file mode 100644 index aa7ccfe..0000000 --- a/slidebuf/src/lib.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod slidebuf; - -pub use slidebuf::Error; -pub use slidebuf::SlideBuf; diff --git a/slidebuf/src/slidebuf.rs b/slidebuf/src/slidebuf.rs deleted file mode 100644 index e595394..0000000 --- a/slidebuf/src/slidebuf.rs +++ /dev/null @@ -1,441 +0,0 @@ -use std::fmt; - -#[derive(Debug)] -pub enum Error { - NotEnoughBytes, - NotEnoughSpace(usize, usize, usize), - TryFromSliceError, -} - -impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{self:?}") - } -} - -impl std::error::Error for Error { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - None - } -} - -impl From for Error { - fn from(_: std::array::TryFromSliceError) -> Self { - Self::TryFromSliceError - } -} - -pub struct SlideBuf { - buf: Vec, - wp: usize, - rp: usize, -} - -macro_rules! check_invariants { - ($self:expr) => { - //$self.check_invariants() - }; -} - -impl SlideBuf { - pub fn new(cap: usize) -> Self { - Self { - buf: vec![0; cap], - wp: 0, - rp: 0, - } - } - - pub fn state(&self) -> (usize, usize) { - (self.rp, self.wp) - } - - pub fn len(&self) -> usize { - check_invariants!(self); - self.wp - self.rp - } - - #[inline(always)] - pub fn cap(&self) -> usize { - check_invariants!(self); - self.buf.len() - } - - pub fn wcap(&self) -> usize { - check_invariants!(self); - self.buf.len() - self.wp - } - - pub fn data(&self) -> &[u8] { - check_invariants!(self); - &self.buf[self.rp..self.wp] - } - - pub fn data_mut(&mut self) -> &mut [u8] { - check_invariants!(self); - &mut self.buf[self.rp..self.wp] - } - - pub fn reset(&mut self) { - self.rp = 0; - self.wp = 0; - } - - pub fn adv(&mut self, x: usize) -> Result<(), Error> { - check_invariants!(self); - if self.len() < x { - return Err(Error::NotEnoughBytes); - } else { - self.rp += x; - Ok(()) - } - } - - pub fn wadv(&mut self, x: usize) -> Result<(), Error> { - check_invariants!(self); - if self.wcap() < x { - self.rewind(); - } - if self.wcap() < x { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), x)); - } else { - self.wp += x; - Ok(()) - } - } - - pub fn rp(&self) -> usize { - self.rp - } - - pub fn set_rp(&mut self, rp: usize) -> Result<(), Error> { - check_invariants!(self); - if rp > self.wp { - Err(Error::NotEnoughBytes) - } else { - self.rp = rp; - Ok(()) - } - } - - pub fn rewind_rp(&mut self, n: usize) -> Result<(), Error> { - check_invariants!(self); - if self.rp < n { - Err(Error::NotEnoughBytes) - } else { - self.rp -= n; - Ok(()) - } - } - - pub fn read_u8(&mut self) -> Result { - check_invariants!(self); - type T = u8; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = self.buf[self.rp]; - self.rp += TS; - Ok(val) - } - } - - pub fn read_u16_be(&mut self) -> Result { - check_invariants!(self); - type T = u16; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_u32_be(&mut self) -> Result { - check_invariants!(self); - type T = u32; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_u64_be(&mut self) -> Result { - check_invariants!(self); - type T = u64; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_i32_be(&mut self) -> Result { - check_invariants!(self); - type T = i32; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_i64_be(&mut self) -> Result { - check_invariants!(self); - type T = i64; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_f32_be(&mut self) -> Result { - check_invariants!(self); - type T = f32; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_f64_be(&mut self) -> Result { - check_invariants!(self); - type T = f64; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::NotEnoughBytes); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> { - check_invariants!(self); - if self.len() < n { - return Err(Error::NotEnoughBytes); - } else { - let val = self.buf[self.rp..self.rp + n].as_ref(); - self.rp += n; - Ok(val) - } - } - - /*pub fn read_buf_for_fill(&mut self, need_min: usize) -> ReadBuf { - check_invariants!(self); - self.rewind_if_needed(need_min); - let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); - read_buf - }*/ - - // TODO issue is that this return exactly the size that was asked for, - // but most of time, we want to first get some scratch space, and later - // advance the write pointer. - pub fn ___write_buf___(&mut self, n: usize) -> Result<&mut [u8], Error> { - check_invariants!(self); - self.rewind_if_needed(n); - if self.wcap() < n { - self.rewind(); - } - if self.wcap() < n { - Err(Error::NotEnoughSpace(self.cap(), self.wcap(), n)) - } else { - let ret = &mut self.buf[self.wp..self.wp + n]; - self.wp += n; - Ok(ret) - } - } - - #[inline(always)] - pub fn rewind(&mut self) { - self.buf.copy_within(self.rp..self.wp, 0); - self.wp -= self.rp; - self.rp = 0; - } - - #[inline(always)] - pub fn rewind_if_needed(&mut self, need_min: usize) { - check_invariants!(self); - if self.rp != 0 && self.rp == self.wp { - self.rp = 0; - self.wp = 0; - } else if self.cap() < self.rp + need_min { - self.rewind(); - } - } - - pub fn available_writable_area(&mut self, need_min: usize) -> Result<&mut [u8], Error> { - check_invariants!(self); - self.rewind_if_needed(need_min); - if self.wcap() < need_min { - self.rewind(); - } - if self.wcap() < need_min { - Err(Error::NotEnoughSpace(self.cap(), self.wcap(), need_min)) - } else { - let ret = &mut self.buf[self.wp..]; - Ok(ret) - } - } - - pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> { - check_invariants!(self); - self.rewind_if_needed(buf.len()); - if self.wcap() < buf.len() { - self.rewind(); - } - if self.wcap() < buf.len() { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), buf.len())); - } else { - self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf); - self.wp += buf.len(); - Ok(()) - } - } - - pub fn put_u8(&mut self, v: u8) -> Result<(), Error> { - check_invariants!(self); - type T = u8; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(TS); - if self.wcap() < TS { - self.rewind(); - } - if self.wcap() < TS { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } - - pub fn put_u16_be(&mut self, v: u16) -> Result<(), Error> { - check_invariants!(self); - type T = u16; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(TS); - if self.wcap() < TS { - self.rewind(); - } - if self.wcap() < TS { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } - - pub fn put_u32_be(&mut self, v: u32) -> Result<(), Error> { - check_invariants!(self); - type T = u32; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(TS); - if self.wcap() < TS { - self.rewind(); - } - if self.wcap() < TS { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } - - pub fn put_u64_be(&mut self, v: u64) -> Result<(), Error> { - check_invariants!(self); - type T = u64; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(TS); - if self.wcap() < TS { - self.rewind(); - } - if self.wcap() < TS { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } - - pub fn put_f32_be(&mut self, v: f32) -> Result<(), Error> { - check_invariants!(self); - type T = f32; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(TS); - if self.wcap() < TS { - self.rewind(); - } - if self.wcap() < TS { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } - - pub fn put_f64_be(&mut self, v: f64) -> Result<(), Error> { - check_invariants!(self); - type T = f64; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(TS); - if self.wcap() < TS { - self.rewind(); - } - if self.wcap() < TS { - return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } - - #[allow(unused)] - fn check_invariants(&self) { - if self.wp > self.buf.len() { - eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); - std::process::exit(87); - } - if self.rp > self.wp { - eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp); - std::process::exit(87); - } - } -} - -impl fmt::Debug for SlideBuf { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("SlideBuf") - .field("cap", &self.cap()) - .field("wp", &self.wp) - .field("rp", &self.rp) - .finish() - } -}