diff --git a/disk/src/agg.rs b/disk/src/agg.rs index f324640..0d2284a 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -405,7 +405,7 @@ where let ele_count = n1 / ty.bytes() as usize; let mut j = Vec::with_capacity(ele_count); let mut p1 = 0; - for i1 in 0..ele_count { + for _ in 0..ele_count { let u = unsafe { let mut r = [0u8; BY]; std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY); diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 1e59616..c2b6e7f 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -3,13 +3,13 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::BinSpecDimT; +use netpod::{BinSpecDimT, BinnedRange}; use std::pin::Pin; use std::task::{Context, Poll}; pub trait IntoBinnedT { type StreamOut: Stream; - fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut; + fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; } impl IntoBinnedT for T @@ -20,7 +20,7 @@ where { type StreamOut = IntoBinnedTDefaultStream; - fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut { + fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut { IntoBinnedTDefaultStream::new(self, spec) } } @@ -32,7 +32,7 @@ where { inp: S, aggtor: Option, - spec: BinSpecDimT, + spec: BinnedRange, curbin: u32, left: Option>>>, errored: bool, @@ -45,7 +45,7 @@ where I: AggregatableTdim, S: Stream>, { - pub fn new(inp: S, spec: BinSpecDimT) -> Self { + pub fn new(inp: S, spec: BinnedRange) -> Self { let range = spec.get_range(0); Self { inp, diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 36b92d4..bd1d59c 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -5,7 +5,7 @@ use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::make_test_node; use futures_util::StreamExt; use netpod::timeunits::*; -use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape}; +use netpod::{BinSpecDimT, BinnedRange, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape}; use std::future::ready; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -66,7 +66,7 @@ async fn agg_x_dim_0_inner() { } k }) - .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) + .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap()) .map(|k| { if false { trace!("after T binning {:?}", k.as_ref().unwrap()); @@ -134,7 +134,7 @@ async fn agg_x_dim_1_inner() { //info!("after X binning {:?}", k.as_ref().unwrap()); k }) - .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) + .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap()) .map(|k| { info!("after T binning {:?}", k.as_ref().unwrap()); k diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 99325c9..fb13b5c 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -5,7 +5,7 @@ use futures_core::Stream; use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; -use netpod::{AggKind, Channel, NodeConfig, PreBinnedPatchIterator}; +use netpod::{AggKind, BinSpecDimT, BinnedRange, Channel, NodeConfig, PreBinnedPatchIterator}; use netpod::{NanoRange, RetStreamExt}; use std::future::ready; use std::pin::Pin; @@ -19,29 +19,30 @@ impl BinnedStream { pub fn new( patch_it: PreBinnedPatchIterator, channel: Channel, - range: NanoRange, + range: BinnedRange, agg_kind: AggKind, node_config: &NodeConfig, ) -> Self { let patches: Vec<_> = patch_it.collect(); - warn!("BinnedStream will open a PreBinnedValueStream"); + warn!("BinnedStream::new"); for p in &patches { - info!("BinnedStream -> patch {:?}", p); + info!("BinnedStream::new patch {:?}", p); } + use super::agg::binnedt::IntoBinnedT; let inp = futures_util::stream::iter(patches.into_iter()) .map({ let node_config = node_config.clone(); move |coord| PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config) }) .flatten() - .only_first_error() .filter_map({ let range = range.clone(); move |k: Result| { + let fit_range = range.full_range(); let g = match k { Ok(k) => { use super::agg::{Fits, FitsInside}; - match k.fits_inside(range.clone()) { + match k.fits_inside(fit_range) { Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower @@ -56,6 +57,17 @@ impl BinnedStream { }; ready(g) } + }) + .map(|k| k) + .into_binned_t(range) + .map(|k| match k { + Ok(k) => { + // TODO instead of converting, let binner already return batches. + let mut ret = MinMaxAvgScalarBinBatch::empty(); + ret.push_single(&k); + Ok(ret) + } + Err(e) => Err(e), }); Self { inp: Box::pin(inp) } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 1cff45a..0a61ec2 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -12,7 +12,7 @@ use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use hyper::Response; use netpod::{ - AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, + AggKind, BinnedRange, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos, }; use serde::{Deserialize, Serialize}; @@ -73,15 +73,24 @@ pub async fn binned_bytes_for_http( let channel_config = read_local_config(&query.channel, node).await?; let entry = extract_matching_config_entry(range, &channel_config); info!("found config entry {:?}", entry); - let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); - match grid { - Some(spec) => { - info!("GOT PreBinnedPatchGridSpec: {:?}", spec); - warn!("Pass here to BinnedStream what kind of Agg, range, ..."); + let pre_range = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); + match pre_range { + Some(pre_range) => { + info!("Found pre_range: {:?}", pre_range); + let range = BinnedRange::covering_range(range.clone(), query.count) + .ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?; + if range.grid_spec.bin_t_len() > pre_range.grid_spec.bin_t_len() { + let msg = format!( + "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", + pre_range, range + ); + error!("{}", msg); + return Err(Error::with_msg(msg)); + } let s1 = BinnedStream::new( - PreBinnedPatchIterator::from_range(spec), + PreBinnedPatchIterator::from_range(pre_range), query.channel.clone(), - query.range.clone(), + range, query.agg_kind.clone(), node_config, ); @@ -89,7 +98,7 @@ pub async fn binned_bytes_for_http( Ok(ret) } None => { - // Merge raw data + // TODO Merge raw data. error!("binned_bytes_for_http TODO merge raw data"); todo!() } @@ -153,8 +162,10 @@ pub struct PreBinnedQuery { impl PreBinnedQuery { pub fn from_request(req: &http::request::Parts) -> Result { let params = netpod::query_params(req.uri.query()); + let patch_ix = params.get("patch_ix").unwrap().parse().unwrap(); + let bin_t_len = params.get("bin_t_len").unwrap().parse().unwrap(); let ret = PreBinnedQuery { - patch: PreBinnedPatchCoord::from_query_params(¶ms), + patch: PreBinnedPatchCoord::new(bin_t_len, patch_ix), agg_kind: AggKind::DimXBins1, channel: Channel { backend: params.get("channel_backend").unwrap().into(), diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 2305f8c..cd2ba0c 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -10,7 +10,7 @@ use futures_core::Stream; use futures_util::{FutureExt, StreamExt, TryStreamExt}; use netpod::log::*; use netpod::{ - AggKind, BinSpecDimT, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, + AggKind, BinSpecDimT, BinnedRange, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, }; use std::future::{ready, Future}; @@ -135,12 +135,13 @@ impl PreBinnedValueStream { // TODO use a ctor, remove from BinSpecDimT the redundant variable. // If given a timestamp range, verify that it divides. // For ranges, use a range type. - let spec = BinSpecDimT { + let _spec = BinSpecDimT { bs: self.patch_coord.bin_t_len(), ts1: self.patch_coord.patch_beg(), ts2: self.patch_coord.patch_end(), count, }; + let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap(); let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); let s2 = s1 .map(|k| { @@ -151,7 +152,7 @@ impl PreBinnedValueStream { } k }) - .into_binned_t(spec) + .into_binned_t(range) .map_ok({ let mut a = MinMaxAvgScalarBinBatch::empty(); move |k| { diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index db88108..d73289c 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -15,6 +15,8 @@ pub struct PreBinnedValueFetchedStream { uri: http::Uri, resfut: Option, res: Option>, + errored: bool, + completed: bool, } impl PreBinnedValueFetchedStream { @@ -44,6 +46,8 @@ impl PreBinnedValueFetchedStream { uri, resfut: None, res: None, + errored: false, + completed: false, } } } @@ -58,6 +62,13 @@ impl Stream for PreBinnedValueFetchedStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if self.completed { + panic!("poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } 'outer: loop { break if let Some(res) = self.res.as_mut() { pin_mut!(res); diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index d3df7cf..5d4bd11 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -265,68 +265,6 @@ impl BinSpecDimT { } } -#[derive(Clone)] -pub struct PreBinnedPatchGridSpec { - bin_t_len: u64, -} - -impl PreBinnedPatchGridSpec { - pub fn new(bin_t_len: u64) -> Self { - let mut ok = false; - for &j in PATCH_T_LEN_OPTIONS.iter() { - if bin_t_len == j { - ok = true; - break; - } - } - if !ok { - panic!("invalid bin_t_len for PreBinnedPatchGridSpec {}", bin_t_len); - } - Self { bin_t_len } - } - - pub fn from_query_params(params: &BTreeMap) -> Self { - let bin_t_len = params.get("bin_t_len").unwrap().parse().unwrap(); - if !Self::is_valid_bin_t_len(bin_t_len) { - panic!("invalid bin_t_len {}", bin_t_len); - } - Self { bin_t_len: bin_t_len } - } - - pub fn bin_t_len(&self) -> u64 { - self.bin_t_len - } - - pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { - for &j in BIN_T_LEN_OPTIONS.iter() { - if bin_t_len == j { - return true; - } - } - return false; - } - - pub fn patch_t_len(&self) -> u64 { - for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() { - if self.bin_t_len == j { - return PATCH_T_LEN_OPTIONS[i1]; - } - } - panic!() - } -} - -impl std::fmt::Debug for PreBinnedPatchGridSpec { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - fmt, - "PreBinnedPatchGridSpec {{ bin_t_len: {:?}, patch_t_len(): {:?} }}", - self.bin_t_len / SEC, - self.patch_t_len() / SEC, - ) - } -} - const BIN_T_LEN_OPTIONS: [u64; 6] = [SEC * 10, MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4]; const PATCH_T_LEN_OPTIONS: [u64; 6] = [MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4, DAY * 12]; @@ -367,6 +305,53 @@ const BIN_THRESHOLDS: [u64; 33] = [ WEEK * 60, ]; +#[derive(Clone)] +pub struct PreBinnedPatchGridSpec { + bin_t_len: u64, +} + +impl PreBinnedPatchGridSpec { + pub fn new(bin_t_len: u64) -> Self { + if !Self::is_valid_bin_t_len(bin_t_len) { + panic!("PreBinnedPatchGridSpec invalid bin_t_len {}", bin_t_len); + } + Self { bin_t_len } + } + + pub fn bin_t_len(&self) -> u64 { + self.bin_t_len + } + + pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { + for &j in BIN_T_LEN_OPTIONS.iter() { + if bin_t_len == j { + return true; + } + } + return false; + } + + pub fn patch_t_len(&self) -> u64 { + for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() { + if self.bin_t_len == j { + return PATCH_T_LEN_OPTIONS[i1]; + } + } + panic!() + } +} + +impl std::fmt::Debug for PreBinnedPatchGridSpec { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "PreBinnedPatchGridSpec {{ bin_t_len: {:?}, patch_t_len(): {:?} }}", + self.bin_t_len / SEC, + self.patch_t_len() / SEC, + ) + } +} + #[derive(Clone, Debug)] pub struct PreBinnedPatchRange { pub grid_spec: PreBinnedPatchGridSpec, @@ -453,10 +438,9 @@ impl PreBinnedPatchCoord { ) } - pub fn from_query_params(params: &BTreeMap) -> Self { - let patch_ix = params.get("patch_ix").unwrap().parse().unwrap(); + pub fn new(bin_t_len: u64, patch_ix: u64) -> Self { Self { - spec: PreBinnedPatchGridSpec::from_query_params(params), + spec: PreBinnedPatchGridSpec::new(bin_t_len), ix: patch_ix, } } @@ -496,6 +480,94 @@ impl Iterator for PreBinnedPatchIterator { } } +#[derive(Clone)] +pub struct BinnedGridSpec { + bin_t_len: u64, +} + +impl BinnedGridSpec { + pub fn new(bin_t_len: u64) -> Self { + if !Self::is_valid_bin_t_len(bin_t_len) { + panic!("BinnedGridSpec::new invalid bin_t_len {}", bin_t_len); + } + Self { bin_t_len } + } + + pub fn bin_t_len(&self) -> u64 { + self.bin_t_len + } + + pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { + for &j in BIN_T_LEN_OPTIONS.iter() { + if bin_t_len == j { + return true; + } + } + return false; + } +} + +impl std::fmt::Debug for BinnedGridSpec { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + if self.bin_t_len < SEC * 90 { + write!(fmt, "BinnedGridSpec {{ bin_t_len: {:?} ms }}", self.bin_t_len / MS,) + } else { + write!(fmt, "BinnedGridSpec {{ bin_t_len: {:?} s }}", self.bin_t_len / SEC,) + } + } +} + +#[derive(Clone, Debug)] +pub struct BinnedRange { + pub grid_spec: BinnedGridSpec, + pub offset: u64, + pub count: u64, +} + +impl BinnedRange { + pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option { + assert!(min_bin_count >= 1); + assert!(min_bin_count <= 2000); + let dt = range.delta(); + assert!(dt <= DAY * 14); + let bs = dt / min_bin_count; + let mut i1 = BIN_THRESHOLDS.len(); + loop { + if i1 <= 0 { + break None; + } else { + i1 -= 1; + let t = BIN_THRESHOLDS[i1]; + if t <= bs || i1 == 0 { + let grid_spec = BinnedGridSpec { bin_t_len: t }; + let pl = grid_spec.bin_t_len(); + let ts1 = range.beg / pl * pl; + let ts2 = (range.end + pl - 1) / pl * pl; + let count = (ts2 - ts1) / pl; + let offset = ts1 / pl; + break Some(Self { + grid_spec, + count, + offset, + }); + } + } + } + } + pub fn get_range(&self, ix: u32) -> NanoRange { + NanoRange { + beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len, + end: (self.offset + ix as u64 + 1) * self.grid_spec.bin_t_len, + } + } + pub fn full_range(&self) -> NanoRange { + NanoRange { + beg: (self.offset + 0) * self.grid_spec.bin_t_len, + end: (self.offset + self.count) * self.grid_spec.bin_t_len, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum AggKind { DimXBins1, diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index ba82c3c..f8e5567 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -17,7 +17,7 @@ pub fn main() { async fn go() -> Result<(), Error> { use clap::Clap; - use retrieval::cli::{Opts, SubCmd}; + use retrieval::cli::{ClientType, Opts, SubCmd}; let opts = Opts::parse(); match opts.subcmd { SubCmd::Retrieval(subcmd) => { @@ -33,6 +33,13 @@ async fn go() -> Result<(), Error> { .ok_or(Error::with_msg(format!("nodeid config error")))?; retrieval::run_node(node_config.clone(), node.clone()).await?; } + SubCmd::Client(client) => match client.client_type { + ClientType::Binned(opts) => { + let beg = opts.beg.parse()?; + let end = opts.end.parse()?; + retrieval::client::get_binned(opts.host, opts.port, opts.channel, beg, end, opts.bins).await?; + } + }, } Ok(()) } diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs index 361fed8..a1f581a 100644 --- a/retrieval/src/cli.rs +++ b/retrieval/src/cli.rs @@ -12,6 +12,7 @@ pub struct Opts { #[derive(Debug, Clap)] pub enum SubCmd { Retrieval(Retrieval), + Client(Client), } #[derive(Debug, Clap)] @@ -19,3 +20,30 @@ pub struct Retrieval { #[clap(long)] pub config: String, } + +#[derive(Debug, Clap)] +pub struct Client { + #[clap(subcommand)] + pub client_type: ClientType, +} + +#[derive(Debug, Clap)] +pub enum ClientType { + Binned(BinnedClient), +} + +#[derive(Debug, Clap)] +pub struct BinnedClient { + #[clap(long)] + pub host: String, + #[clap(long)] + pub port: u16, + #[clap(long)] + pub channel: String, + #[clap(long)] + pub beg: String, + #[clap(long)] + pub end: String, + #[clap(long)] + pub bins: u32, +} diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs new file mode 100644 index 0000000..0916194 --- /dev/null +++ b/retrieval/src/client.rs @@ -0,0 +1,86 @@ +use chrono::{DateTime, Utc}; +use disk::frame::inmem::InMemoryFrameAsyncReadStream; +use err::Error; +use futures_util::TryStreamExt; +use hyper::Body; +use netpod::log::*; + +pub async fn get_binned( + host: String, + port: u16, + channel_name: String, + beg_date: DateTime, + end_date: DateTime, + bin_count: u32, +) -> Result<(), Error> { + let t1 = Utc::now(); + let channel_backend = "NOBACKEND"; + let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + let uri = format!( + "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}", + host, + port, + channel_backend, + channel_name, + beg_date.format(date_fmt), + end_date.format(date_fmt), + bin_count, + ); + info!("URI {:?}", uri); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(uri) + .body(Body::empty())?; + info!("Request for {:?}", req); + let client = hyper::Client::new(); + let res = client.request(req).await?; + info!("client response {:?}", res); + //let (res_head, mut res_body) = res.into_parts(); + let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); + let s2 = InMemoryFrameAsyncReadStream::new(s1); + use futures_util::StreamExt; + use std::future::ready; + let mut bin_count = 0; + let s3 = s2 + .map_err(|e| error!("{:?}", e)) + .filter_map(|item| { + let g = match item { + Ok(frame) => { + type ExpectedType = disk::cache::BinnedBytesForHttpStreamFrame; + info!("frame len {}", frame.buf().len()); + match bincode::deserialize::(frame.buf()) { + Ok(item) => match item { + Ok(item) => { + info!("item: {:?}", item); + bin_count += 1; + Some(Ok(item)) + } + Err(e) => { + error!("error frame: {:?}", e); + Some(Err(e)) + } + }, + Err(e) => { + error!("bincode error: {:?}", e); + Some(Err(e.into())) + } + } + } + Err(e) => Some(Err(Error::with_msg(format!("{:?}", e)))), + }; + ready(g) + }) + .for_each(|_| ready(())); + s3.await; + let t2 = chrono::Utc::now(); + let ntot = 0; + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + let throughput = ntot / 1024 * 1000 / ms; + info!( + "get_cached_0 DONE total download {} MB throughput {:5} kB/s bin_count {}", + ntot / 1024 / 1024, + throughput, + bin_count, + ); + Ok(()) +} diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index 46ef91b..4e184ce 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -5,6 +5,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info, trace, warn}; pub mod cli; +pub mod client; #[cfg(test)] pub mod test; diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 2be94e4..9995c6d 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -1,6 +1,6 @@ use crate::spawn_test_hosts; use bytes::BytesMut; -use chrono::Utc; +use chrono::{DateTime, Utc}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use err::Error; use futures_util::TryStreamExt; @@ -40,12 +40,12 @@ fn get_cached_0() { } async fn get_cached_0_inner() -> Result<(), Error> { - let t1 = chrono::Utc::now(); + let t1 = Utc::now(); let cluster = test_cluster(); let node0 = &cluster.nodes[0]; let hosts = spawn_test_hosts(cluster.clone()); - let beg_date: chrono::DateTime = "1970-01-01T00:20:10.000Z".parse()?; - let end_date: chrono::DateTime = "1970-01-01T00:20:51.000Z".parse()?; + let beg_date: DateTime = "1970-01-01T00:20:10.000Z".parse()?; + let end_date: DateTime = "1970-01-01T00:20:51.000Z".parse()?; let channel_backend = "back"; let channel_name = "wave1"; let bin_count = 4;