diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 0b4c0ba..e3c974f 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -93,21 +93,10 @@ async fn go() -> Result<(), Error> { #[test] fn simple_fetch() { use netpod::Nanos; - use netpod::{ - timeunits::*, ByteOrder, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape, - }; + use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, ScalarType, Shape}; taskrun::run(async { + let _rh = daqbuffer::nodes::require_test_hosts_running()?; let t1 = chrono::Utc::now(); - let node = Node { - host: "localhost".into(), - listen: "0.0.0.0".into(), - port: 8360, - port_raw: 8360 + 100, - data_base_path: err::todoval(), - ksprefix: "daq_swissfel".into(), - split: 0, - backend: "testbackend".into(), - }; let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { @@ -118,7 +107,7 @@ fn simple_fetch() { time_bin_size: Nanos { ns: DAY }, array: true, scalar_type: ScalarType::F64, - shape: Shape::Wave(err::todoval()), + shape: Shape::Wave(42), byte_order: ByteOrder::big_endian(), compression: true, }, @@ -126,23 +115,7 @@ fn simple_fetch() { tb_file_count: 1, buffer_size: 1024 * 8, }; - let cluster = Cluster { - nodes: vec![node], - database: Database { - name: "daqbuffer".into(), - host: "localhost".into(), - user: "daqbuffer".into(), - pass: "daqbuffer".into(), - }, - }; - let node_config = NodeConfig { - name: format!("{}:{}", cluster.nodes[0].host, cluster.nodes[0].port), - cluster, - }; - let node_config: Result = node_config.into(); - let node_config = node_config?; let query_string = serde_json::to_string(&query).unwrap(); - let host = tokio::spawn(httpret::host(node_config.clone())); let req = hyper::Request::builder() .method(http::Method::POST) .uri("http://localhost:8360/api/4/parsed_raw") @@ -176,8 +149,6 @@ fn simple_fetch() { ntot / 1024 / 1024, throughput ); - drop(host); - //Err::<(), 
_>(format!("test error").into()) Ok(()) }) .unwrap(); diff --git a/daqbuffer/src/lib.rs b/daqbuffer/src/lib.rs index 537908c..5cdb0dd 100644 --- a/daqbuffer/src/lib.rs +++ b/daqbuffer/src/lib.rs @@ -6,6 +6,7 @@ use tracing::{debug, error, info, trace, warn}; pub mod cli; pub mod client; +pub mod nodes; #[cfg(test)] pub mod test; diff --git a/daqbuffer/src/nodes.rs b/daqbuffer/src/nodes.rs new file mode 100644 index 0000000..2535a90 --- /dev/null +++ b/daqbuffer/src/nodes.rs @@ -0,0 +1,57 @@ +use crate::spawn_test_hosts; +use err::Error; +use netpod::{Cluster, Database, Node}; +use std::sync::{Arc, Mutex}; +use tokio::task::JoinHandle; + +pub struct RunningHosts { + pub cluster: Cluster, + _jhs: Vec>>, +} + +lazy_static::lazy_static! { + static ref HOSTS_RUNNING: Mutex>> = Mutex::new(None); +} + +pub fn require_test_hosts_running() -> Result, Error> { + let mut g = HOSTS_RUNNING.lock().unwrap(); + match g.as_ref() { + None => { + let cluster = test_cluster(); + let jhs = spawn_test_hosts(cluster.clone()); + let ret = RunningHosts { + cluster: cluster.clone(), + _jhs: jhs, + }; + let a = Arc::new(ret); + *g = Some(a.clone()); + Ok(a) + } + Some(gg) => Ok(gg.clone()), + } +} + +fn test_cluster() -> Cluster { + let nodes = (0..3) + .into_iter() + .map(|id| Node { + host: "localhost".into(), + listen: "0.0.0.0".into(), + port: 8360 + id as u16, + port_raw: 8360 + id as u16 + 100, + data_base_path: format!("../tmpdata/node{:02}", id).into(), + ksprefix: "ks".into(), + split: id, + backend: "testbackend".into(), + }) + .collect(); + Cluster { + nodes: nodes, + database: Database { + name: "daqbuffer".into(), + host: "localhost".into(), + user: "daqbuffer".into(), + pass: "daqbuffer".into(), + }, + } +} diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs index 7daf557..81f4552 100644 --- a/daqbuffer/src/test.rs +++ b/daqbuffer/src/test.rs @@ -1,4 +1,4 @@ -use crate::spawn_test_hosts; +use crate::nodes::require_test_hosts_running; use bytes::BytesMut; use 
chrono::{DateTime, Utc}; use disk::agg::streams::{StatsItem, StreamItem}; @@ -14,68 +14,14 @@ use futures_util::TryStreamExt; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{AggKind, Channel, Cluster, Database, HostPort, NanoRange, Node, PerfOpts}; +use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange, PerfOpts}; use serde::de::DeserializeOwned; use std::fmt::Debug; use std::future::ready; -use std::sync::{Arc, Mutex}; use tokio::io::AsyncRead; -use tokio::task::JoinHandle; pub mod json; -struct RunningHosts { - cluster: Cluster, - _jhs: Vec>>, -} - -lazy_static::lazy_static! { - static ref HOSTS_RUNNING: Mutex>> = Mutex::new(None); -} - -fn require_test_hosts_running() -> Result, Error> { - let mut g = HOSTS_RUNNING.lock().unwrap(); - match g.as_ref() { - None => { - let cluster = test_cluster(); - let jhs = spawn_test_hosts(cluster.clone()); - let ret = RunningHosts { - cluster: cluster.clone(), - _jhs: jhs, - }; - let a = Arc::new(ret); - *g = Some(a.clone()); - Ok(a) - } - Some(gg) => Ok(gg.clone()), - } -} - -fn test_cluster() -> Cluster { - let nodes = (0..3) - .into_iter() - .map(|id| Node { - host: "localhost".into(), - listen: "0.0.0.0".into(), - port: 8360 + id as u16, - port_raw: 8360 + id as u16 + 100, - data_base_path: format!("../tmpdata/node{:02}", id).into(), - ksprefix: "ks".into(), - split: id, - backend: "testbackend".into(), - }) - .collect(); - Cluster { - nodes: nodes, - database: Database { - name: "daqbuffer".into(), - host: "localhost".into(), - user: "daqbuffer".into(), - pass: "daqbuffer".into(), - }, - } -} - #[test] fn get_binned_binary() { taskrun::run(get_binned_binary_inner()).unwrap(); diff --git a/daqbuffer/src/test/json.rs b/daqbuffer/src/test/json.rs index 0e236c0..c3892d0 100644 --- a/daqbuffer/src/test/json.rs +++ b/daqbuffer/src/test/json.rs @@ -1,4 +1,4 @@ -use crate::test::require_test_hosts_running; +use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; use 
disk::binned::query::BinnedQuery; use err::Error; @@ -6,6 +6,7 @@ use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange}; +use std::time::Duration; #[test] fn get_binned_json_0() { @@ -15,22 +16,46 @@ fn get_binned_json_0() { async fn get_binned_json_0_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; - get_binned_json_0_inner2( - "wave-f64-be-n21", + get_binned_json_common( + "scalar-i32-be", "1970-01-01T00:20:10.000Z", "1970-01-01T01:20:30.000Z", 10, cluster, + 13, + true, ) .await } -async fn get_binned_json_0_inner2( +#[test] +fn get_binned_json_1() { + taskrun::run(get_binned_json_1_inner()).unwrap(); +} + +async fn get_binned_json_1_inner() -> Result<(), Error> { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + get_binned_json_common( + "wave-f64-be-n21", + "1970-01-01T00:20:10.000Z", + "1970-01-01T01:20:45.000Z", + 10, + cluster, + 13, + true, + ) + .await +} + +async fn get_binned_json_common( channel_name: &str, beg_date: &str, end_date: &str, bin_count: u32, cluster: &Cluster, + expect_bin_count: u32, + expect_finalised_range: bool, ) -> Result<(), Error> { let t1 = Utc::now(); let agg_kind = AggKind::DimXBins1; @@ -43,7 +68,8 @@ async fn get_binned_json_0_inner2( name: channel_name.into(), }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = BinnedQuery::new(channel, range, bin_count, agg_kind); + let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); + query.set_timeout(Duration::from_millis(15000)); let url = query.url(&HostPort::from_node(node0)); info!("get_binned_json_0 get {}", url); let req = hyper::Request::builder() @@ -57,14 +83,38 @@ async fn get_binned_json_0_inner2( error!("client response {:?}", res); } let res = hyper::body::to_bytes(res.into_body()).await?; + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + 
info!("get_binned_json_0 DONE time {} ms", ms); let res = String::from_utf8(res.to_vec())?; let res: serde_json::Value = serde_json::from_str(res.as_str())?; info!( "result from endpoint: --------------\n{}\n--------------", serde_json::to_string_pretty(&res)? ); - let t2 = chrono::Utc::now(); - let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("get_binned_json_0 DONE time {} ms", ms); + if expect_finalised_range { + if !res + .get("finalisedRange") + .ok_or(Error::with_msg("missing finalisedRange"))? + .as_bool() + .ok_or(Error::with_msg("key finalisedRange not bool"))? + { + return Err(Error::with_msg("expected finalisedRange")); + } + } else if res.get("finalisedRange").is_some() { + return Err(Error::with_msg("expect absent finalisedRange")); + } + if res.get("counts").unwrap().as_array().unwrap().len() != expect_bin_count as usize { + return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); + } + if res.get("mins").unwrap().as_array().unwrap().len() != expect_bin_count as usize { + return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); + } + if res.get("maxs").unwrap().as_array().unwrap().len() != expect_bin_count as usize { + return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); + } + if res.get("avgs").unwrap().as_array().unwrap().len() != expect_bin_count as usize { + return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); + } Ok(()) } diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index 7b63dc8..fdb1757 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -1,5 +1,6 @@ use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator}; use crate::agg::streams::Appendable; +use crate::agg::{Fits, FitsInside}; use crate::binned::{ EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, @@ -83,9 +84,36 @@ impl RangeOverlapInfo for 
XBinnedScalarEvents { } } +impl FitsInside for XBinnedScalarEvents { + fn fits_inside(&self, range: NanoRange) -> Fits { + if self.tss.is_empty() { + Fits::Empty + } else { + let t1 = *self.tss.first().unwrap(); + let t2 = *self.tss.last().unwrap(); + if t2 < range.beg { + Fits::Lower + } else if t1 > range.end { + Fits::Greater + } else if t1 < range.beg && t2 > range.end { + Fits::PartlyLowerAndGreater + } else if t1 < range.beg { + Fits::PartlyLower + } else if t2 > range.end { + Fits::PartlyGreater + } else { + Fits::Inside + } + } + } +} + impl FilterFittingInside for XBinnedScalarEvents { - fn filter_fitting_inside(self, _fit_range: NanoRange) -> Option { - todo!() + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + match self.fits_inside(fit_range) { + Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), + _ => None, + } } } diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 4262fbb..fac8ee2 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -16,7 +16,7 @@ pub enum StreamItem { Stats(StatsItem), } -pub trait Collector { +pub trait Collector: WithLen { type Input: Collectable; type Output: Serialize; fn ingest(&mut self, src: &Self::Input); diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 9897c88..862a63c 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -220,16 +220,28 @@ fn make_num_pipeline_entry( ) -> Result where PPP: PipelinePostProcessA, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, { match scalar_type { + ScalarType::U8 => match_end!(u8, byte_order, shape, query, ppp, node_config), ScalarType::U16 => match_end!(u16, byte_order, shape, query, ppp, node_config), + ScalarType::U32 => 
match_end!(u32, byte_order, shape, query, ppp, node_config), + ScalarType::U64 => match_end!(u64, byte_order, shape, query, ppp, node_config), + ScalarType::I8 => match_end!(i8, byte_order, shape, query, ppp, node_config), + ScalarType::I16 => match_end!(i16, byte_order, shape, query, ppp, node_config), ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config), + ScalarType::I64 => match_end!(i64, byte_order, shape, query, ppp, node_config), + ScalarType::F32 => match_end!(f32, byte_order, shape, query, ppp, node_config), ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config), - // TODO complete set - _ => todo!(), } } @@ -240,8 +252,15 @@ async fn make_num_pipeline( ) -> Result where PPP: PipelinePostProcessA, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, { if query.channel().backend != node_config.node.backend { @@ -306,11 +325,17 @@ where } } -struct CollectForJson {} +struct CollectForJson { + timeout: Duration, + abort_after_bin_count: u32, +} impl CollectForJson { - pub fn new() -> Self { - Self {} + pub fn new(timeout: Duration, abort_after_bin_count: u32) -> Self { + Self { + timeout, + abort_after_bin_count, + } } } @@ -322,11 +347,16 @@ pub struct JsonCollector { } impl JsonCollector { - pub fn new(inp: Pin>> + Send>>, bin_count_exp: u32) -> Self + pub fn new( + inp: Pin>> + Send>>, + bin_count_exp: u32, + timeout: Duration, + abort_after_bin_count: u32, + ) -> Self where NTY: NumOps + Serialize + 'static, { - let fut = collect_all(inp, bin_count_exp); + let fut = collect_all(inp, bin_count_exp, timeout, abort_after_bin_count); let fut = Box::pin(fut); Self { fut, done: false } } @@ -397,7 +427,7 @@ where inp: Pin>> + Send>>, bin_count_exp: u32, ) -> Pin> + Send>> { - 
let s = JsonCollector::new(inp, bin_count_exp); + let s = JsonCollector::new(inp, bin_count_exp, self.timeout, self.abort_after_bin_count); Box::pin(s) } } @@ -488,12 +518,17 @@ impl Serialize for IsoDateTime { } } -pub async fn collect_all(stream: S, bin_count_exp: u32) -> Result +pub async fn collect_all( + stream: S, + bin_count_exp: u32, + timeout: Duration, + abort_after_bin_count: u32, +) -> Result where S: Stream> + Unpin, T: Collectable, { - let deadline = tokio::time::Instant::now() + Duration::from_millis(1000); + let deadline = tokio::time::Instant::now() + timeout; let mut collector = ::new_collector(bin_count_exp); let mut i1 = 0; let mut stream = stream; @@ -501,11 +536,15 @@ where let item = if i1 == 0 { stream.next().await } else { - match tokio::time::timeout_at(deadline, stream.next()).await { - Ok(k) => k, - Err(_) => { - collector.set_timed_out(); - None + if abort_after_bin_count > 0 && collector.len() >= abort_after_bin_count as usize { + None + } else { + match tokio::time::timeout_at(deadline, stream.next()).await { + Ok(k) => k, + Err(_) => { + collector.set_timed_out(); + None + } } } }; @@ -542,7 +581,12 @@ pub async fn binned_json( node_config: &NodeConfigCached, query: &BinnedQuery, ) -> Result> + Send>>, Error> { - let pl = make_num_pipeline(query, CollectForJson::new(), node_config).await?; + let pl = make_num_pipeline( + query, + CollectForJson::new(query.timeout(), query.abort_after_bin_count()), + node_config, + ) + .await?; let ret = pl.stream.map(|item| { let fr = item.to_json_result()?; let buf = fr.to_json_bytes()?; @@ -933,6 +977,15 @@ impl MinMaxAvgBinsCollector { } } +impl WithLen for MinMaxAvgBinsCollector +where + NTY: NumOps + Serialize, +{ + fn len(&self) -> usize { + self.vals.ts1s.len() + } +} + impl Collector for MinMaxAvgBinsCollector where NTY: NumOps + Serialize, @@ -974,7 +1027,7 @@ where }; let ret = MinMaxAvgBinsCollectedResult:: { ts_bin_edges: tsa, - counts: vec![], + counts: self.vals.counts, mins: 
self.vals.mins, maxs: self.vals.maxs, avgs: self.vals.avgs, diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 911cb3a..7189ae0 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -3,6 +3,7 @@ use err::Error; use netpod::log::*; use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PreBinnedPatchCoord, ToNanos}; use std::collections::BTreeMap; +use std::time::Duration; #[derive(Clone, Debug)] pub struct PreBinnedQuery { @@ -171,6 +172,8 @@ pub struct BinnedQuery { cache_usage: CacheUsage, disk_stats_every: ByteSize, report_error: bool, + timeout: Duration, + abort_after_bin_count: u32, } impl BinnedQuery { @@ -183,6 +186,8 @@ impl BinnedQuery { cache_usage: CacheUsage::Use, disk_stats_every: ByteSize(1024 * 1024 * 4), report_error: false, + timeout: Duration::from_millis(2000), + abort_after_bin_count: 0, } } @@ -217,6 +222,17 @@ impl BinnedQuery { .map_or("false", |k| k) .parse() .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, + timeout: params + .get("timeout") + .map_or("2000", |k| k) + .parse::() + .map(|k| Duration::from_millis(k)) + .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?, + abort_after_bin_count: params + .get("abortAfterBinCount") + .map_or("0", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?, }; info!("BinnedQuery::from_request {:?}", ret); Ok(ret) @@ -250,6 +266,14 @@ impl BinnedQuery { self.report_error } + pub fn timeout(&self) -> Duration { + self.timeout + } + + pub fn abort_after_bin_count(&self) -> u32 { + self.abort_after_bin_count + } + pub fn set_cache_usage(&mut self, k: CacheUsage) { self.cache_usage = k; } @@ -258,12 +282,16 @@ impl BinnedQuery { self.disk_stats_every = k; } + pub fn set_timeout(&mut self, k: Duration) { + self.timeout = k; + } + // TODO the BinnedQuery itself should maybe already carry the full HostPort? 
// On the other hand, want to keep the flexibility for the fail over possibility.. pub fn url(&self, host: &HostPort) -> String { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; format!( - "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}", + "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}&timeout={}&abortAfterBinCount={}", host.host, host.port, self.cache_usage, @@ -273,6 +301,8 @@ impl BinnedQuery { Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt), Utc.timestamp_nanos(self.range.end as i64).format(date_fmt), self.disk_stats_every.bytes() / 1024, + self.timeout.as_millis(), + self.abort_after_bin_count, ) } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 4c69f23..0d7150a 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -76,15 +76,6 @@ impl AsyncRead for HttpBodyAsAsyncRead { } } -pub struct BytesWrap {} - -impl From for Bytes { - fn from(_k: BytesWrap) -> Self { - error!("TODO convert result to octets"); - todo!("TODO convert result to octets") - } -} - pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 { let mut hash = tiny_keccak::Sha3::v256(); hash.update(channel.backend.as_bytes()); diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 3fd2def..dce36c8 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,6 +1,7 @@ use crate::agg::binnedt::TimeBinnableType; use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::streams::{Appendable, StreamItem}; +use crate::agg::{Fits, FitsInside}; use crate::binned::{ EventValuesAggregator, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, @@ -213,9 +214,36 @@ impl RangeOverlapInfo for EventValues { } } +impl FitsInside for EventValues { + fn 
fits_inside(&self, range: NanoRange) -> Fits { + if self.tss.is_empty() { + Fits::Empty + } else { + let t1 = *self.tss.first().unwrap(); + let t2 = *self.tss.last().unwrap(); + if t2 < range.beg { + Fits::Lower + } else if t1 > range.end { + Fits::Greater + } else if t1 < range.beg && t2 > range.end { + Fits::PartlyLowerAndGreater + } else if t1 < range.beg { + Fits::PartlyLower + } else if t2 > range.end { + Fits::PartlyGreater + } else { + Fits::Inside + } + } + } +} + impl FilterFittingInside for EventValues { - fn filter_fitting_inside(self, _fit_range: NanoRange) -> Option { - todo!() + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + match self.fits_inside(fit_range) { + Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), + _ => None, + } } } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index b8d9b13..63cbf59 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -91,8 +91,8 @@ impl Stream for FileReader { } } -#[allow(dead_code)] struct Fopen1 { + #[allow(dead_code)] opts: OpenOptions, fut: Pin>>>, term: bool, @@ -104,8 +104,6 @@ impl Fopen1 { let mut o1 = OpenOptions::new(); let o2 = o1.read(true); let res = o2.open(path); - //() == res; - //todo!() res.await }) as Pin>>>; let _fut2: Box> = Box::new(async { 123 }); diff --git a/err/src/lib.rs b/err/src/lib.rs index 5cb9fda..3406916 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -4,13 +4,13 @@ Error handling and reporting. use http::uri::InvalidUri; use nom::error::ErrorKind; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::net::AddrParseError; use std::num::{ParseFloatError, ParseIntError}; use std::string::FromUtf8Error; -use tokio::task::JoinError; use std::sync::PoisonError; +use tokio::task::JoinError; /** The common error type for this application. 
@@ -23,14 +23,6 @@ pub struct Error { trace_str: Option<String>, } -#[allow(dead_code)] -fn ser_trace<S>(_: &Option<String>, _: S) -> Result<S::Ok, S::Error> -where - S: Serializer, -{ - todoval() -} - impl Error { pub fn with_msg<S: Into<String>>(s: S) -> Self { Self { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index c7ee88d..8308477 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -257,8 +257,8 @@ impl hyper::body::HttpBody for BodyStreamWrap { type Data = bytes::Bytes; type Error = Error; - fn poll_data(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> { - todo!() + fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> { + self.0.inner.poll_next_unpin(cx) } fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<Option<HeaderMap>, Self::Error>> {