WIP
This commit is contained in:
@@ -22,10 +22,12 @@ use netpod::FromUrl;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::ReqCtx;
|
||||
use nodenet::client::OpenBoxedBytesViaHttp;
|
||||
use nodenet::scylla::ScyllaEventReadProvider;
|
||||
use query::api4::binned::BinnedQuery;
|
||||
use scyllaconn::bincache::ScyllaCacheReadProvider;
|
||||
use scyllaconn::worker::ScyllaQueue;
|
||||
use std::sync::Arc;
|
||||
use streams::timebin::cached::reader::EventsReadProvider;
|
||||
use streams::timebin::CacheReadProvider;
|
||||
use tracing::Instrument;
|
||||
use url::Url;
|
||||
@@ -156,12 +158,23 @@ async fn binned_json(
|
||||
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
|
||||
let open_bytes = Arc::pin(open_bytes);
|
||||
let cache_read_provider = scyqueue
|
||||
.clone()
|
||||
.map(|qu| ScyllaCacheReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>);
|
||||
let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes, cache_read_provider)
|
||||
.instrument(span1)
|
||||
.await
|
||||
.map_err(|e| Error::BinnedStream(e))?;
|
||||
let events_read_provider = scyqueue
|
||||
.map(|qu| ScyllaEventReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>);
|
||||
let item = streams::timebinnedjson::timebinned_json(
|
||||
query,
|
||||
ch_conf,
|
||||
ctx,
|
||||
open_bytes,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
.instrument(span1)
|
||||
.await
|
||||
.map_err(|e| Error::BinnedStream(e))?;
|
||||
let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -156,6 +156,8 @@ pub trait Events:
|
||||
fn to_json_vec_u8(&self) -> Vec<u8>;
|
||||
fn to_cbor_vec_u8(&self) -> Vec<u8>;
|
||||
fn clear(&mut self);
|
||||
// TODO: can not name EventsDim0 from here, so use trait object for now. Anyway is a workaround.
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events>;
|
||||
}
|
||||
|
||||
impl WithLen for Box<dyn Events> {
|
||||
@@ -290,4 +292,8 @@ impl Events for Box<dyn Events> {
|
||||
fn clear(&mut self) {
|
||||
Events::clear(self.as_mut())
|
||||
}
|
||||
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
|
||||
Events::to_dim0_f32_for_binning(self.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +117,30 @@ macro_rules! on_sitemty_data {
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! try_map_sitemty_data {
|
||||
($item:expr, $ex:expr) => {{
|
||||
use $crate::streamitem::RangeCompletableItem;
|
||||
use $crate::streamitem::StreamItem;
|
||||
match $item {
|
||||
Ok(x) => match x {
|
||||
StreamItem::DataItem(x) => match x {
|
||||
RangeCompletableItem::Data(x) => match $ex(x) {
|
||||
Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
|
||||
Err(e) => Err(e),
|
||||
},
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
|
||||
}
|
||||
},
|
||||
StreamItem::Log(x) => Ok(StreamItem::Log(x)),
|
||||
StreamItem::Stats(x) => Ok(StreamItem::Stats(x)),
|
||||
},
|
||||
Err(x) => Err(x),
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
pub fn sitem_data<X>(x: X) -> Sitemty<X> {
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
|
||||
}
|
||||
|
||||
@@ -109,6 +109,40 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> fmt::Display for BinsDim0<NTY>
|
||||
where
|
||||
NTY: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
let self_name = any::type_name::<Self>();
|
||||
if true {
|
||||
write!(
|
||||
fmt,
|
||||
"{self_name} count {:?} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
|
||||
self.ts1s.len(),
|
||||
self.ts1s.iter().map(|&k| TsNano::from_ns(k)).collect::<Vec<_>>(),
|
||||
self.ts2s.iter().map(|&k| TsNano::from_ns(k)).collect::<Vec<_>>(),
|
||||
self.counts,
|
||||
self.mins,
|
||||
self.maxs,
|
||||
self.avgs,
|
||||
)
|
||||
} else {
|
||||
write!(
|
||||
fmt,
|
||||
"{self_name} count {:?} edges {:?} .. {:?} counts {:?} .. {:?} avgs {:?} .. {:?}",
|
||||
self.ts1s.len(),
|
||||
self.ts1s.front().map(|&k| TsNano::from_ns(k)),
|
||||
self.ts2s.back().map(|&k| TsNano::from_ns(k)),
|
||||
self.counts.front(),
|
||||
self.counts.back(),
|
||||
self.avgs.front(),
|
||||
self.avgs.back(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> BinsDim0<NTY> {
|
||||
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
|
||||
self.ts1s.push_back(ts1);
|
||||
@@ -317,13 +351,16 @@ where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
ts1now: TsNano,
|
||||
ts2now: TsNano,
|
||||
binrange: BinnedRange<TsNano>,
|
||||
do_time_weight: bool,
|
||||
emit_empty_bins: bool,
|
||||
range_complete: bool,
|
||||
buf: <Self as TimeBinnerTy>::Output,
|
||||
out: <Self as TimeBinnerTy>::Output,
|
||||
bins_ready_count: usize,
|
||||
cnt: u64,
|
||||
min: STY,
|
||||
max: STY,
|
||||
avg: f64,
|
||||
}
|
||||
|
||||
impl<STY> BinsDim0TimeBinnerTy<STY>
|
||||
@@ -333,18 +370,31 @@ where
|
||||
pub fn new(binrange: BinnedRange<TsNano>, do_time_weight: bool, emit_empty_bins: bool) -> Self {
|
||||
// let ts1now = TsNano::from_ns(binrange.bin_off * binrange.bin_len.ns());
|
||||
// let ts2 = ts1.add_dt_nano(binrange.bin_len.to_dt_nano());
|
||||
let ts1now = TsNano::from_ns(binrange.full_range().beg());
|
||||
let ts2now = ts1now.add_dt_nano(binrange.bin_len.to_dt_nano());
|
||||
let buf = <Self as TimeBinnerTy>::Output::empty();
|
||||
Self {
|
||||
ts1now: TsNano::from_ns(binrange.full_range().beg()),
|
||||
ts1now,
|
||||
ts2now,
|
||||
binrange,
|
||||
do_time_weight,
|
||||
emit_empty_bins,
|
||||
range_complete: false,
|
||||
buf,
|
||||
out: <Self as TimeBinnerTy>::Output::empty(),
|
||||
bins_ready_count: 0,
|
||||
cnt: 0,
|
||||
min: STY::zero_b(),
|
||||
max: STY::zero_b(),
|
||||
avg: 0.,
|
||||
}
|
||||
}
|
||||
|
||||
// used internally for the aggregation
|
||||
fn reset_agg(&mut self) {
|
||||
self.cnt = 0;
|
||||
self.min = STY::zero_b();
|
||||
self.max = STY::zero_b();
|
||||
self.avg = 0.;
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> TimeBinnerTy for BinsDim0TimeBinnerTy<STY>
|
||||
@@ -355,8 +405,77 @@ where
|
||||
type Output = BinsDim0<STY>;
|
||||
|
||||
fn ingest(&mut self, item: &mut Self::Input) {
|
||||
// item.ts1s;
|
||||
todo!("TimeBinnerTy::ingest")
|
||||
let mut count_before = 0;
|
||||
for (((((&ts1, &ts2), &cnt), min), max), &avg) in item
|
||||
.ts1s
|
||||
.iter()
|
||||
.zip(&item.ts2s)
|
||||
.zip(&item.counts)
|
||||
.zip(&item.mins)
|
||||
.zip(&item.maxs)
|
||||
.zip(&item.avgs)
|
||||
{
|
||||
if ts1 < self.ts1now.ns() {
|
||||
// warn!("encountered bin from time before {} {}", ts1, self.ts1now.ns());
|
||||
count_before += 1;
|
||||
continue;
|
||||
} else {
|
||||
if ts2 > self.ts2now.ns() {
|
||||
if ts2 - ts1 > self.ts2now.ns() - self.ts1now.ns() {
|
||||
panic!("incoming bin len too large");
|
||||
} else if ts1 < self.ts2now.ns() {
|
||||
panic!("encountered unaligned input bin");
|
||||
} else {
|
||||
let mut i = 0;
|
||||
while ts1 >= self.ts2now.ns() {
|
||||
self.cycle();
|
||||
i += 1;
|
||||
if i > 50000 {
|
||||
panic!("cycle forward too many iterations");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// ok, we're still inside the current bin
|
||||
}
|
||||
}
|
||||
if cnt == 0 {
|
||||
// ignore input bin, it does not contain any valid information.
|
||||
} else {
|
||||
if self.cnt == 0 {
|
||||
self.cnt = cnt;
|
||||
self.min = min.clone();
|
||||
self.max = max.clone();
|
||||
if self.do_time_weight {
|
||||
let f = (ts2 - ts1) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64;
|
||||
self.avg = avg as f64 * f;
|
||||
} else {
|
||||
panic!("TODO non-time-weighted binning to be impl");
|
||||
}
|
||||
} else {
|
||||
self.cnt += cnt;
|
||||
if *min < self.min {
|
||||
self.min = min.clone();
|
||||
}
|
||||
if *max > self.max {
|
||||
self.max = max.clone();
|
||||
}
|
||||
if self.do_time_weight {
|
||||
let f = (ts2 - ts1) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64;
|
||||
self.avg += avg as f64 * f;
|
||||
} else {
|
||||
panic!("TODO non-time-weighted binning to be impl");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if count_before != 0 {
|
||||
warn!(
|
||||
"----- seen {} / {} input bins from time before",
|
||||
count_before,
|
||||
item.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn set_range_complete(&mut self) {
|
||||
@@ -364,27 +483,51 @@ where
|
||||
}
|
||||
|
||||
fn bins_ready_count(&self) -> usize {
|
||||
self.bins_ready_count
|
||||
self.out.len()
|
||||
}
|
||||
|
||||
fn bins_ready(&mut self) -> Option<Self::Output> {
|
||||
todo!("TimeBinnerTy::bins_ready")
|
||||
if self.out.len() != 0 {
|
||||
let ret = core::mem::replace(&mut self.out, BinsDim0::empty());
|
||||
Some(ret)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn push_in_progress(&mut self, push_empty: bool) {
|
||||
todo!("TimeBinnerTy::push_in_progress")
|
||||
if self.cnt == 0 && !push_empty {
|
||||
self.reset_agg();
|
||||
} else {
|
||||
self.out.ts1s.push_back(self.ts1now.ns());
|
||||
self.out.ts2s.push_back(self.ts2now.ns());
|
||||
self.out.counts.push_back(self.cnt);
|
||||
self.out.mins.push_back(self.min.clone());
|
||||
self.out.maxs.push_back(self.max.clone());
|
||||
self.out.avgs.push_back(self.avg as f32);
|
||||
self.reset_agg();
|
||||
}
|
||||
}
|
||||
|
||||
fn cycle(&mut self) {
|
||||
todo!("TimeBinnerTy::cycle")
|
||||
self.push_in_progress(true);
|
||||
self.ts1now = self.ts1now.add_dt_nano(self.binrange.bin_len.to_dt_nano());
|
||||
self.ts2now = self.ts2now.add_dt_nano(self.binrange.bin_len.to_dt_nano());
|
||||
}
|
||||
|
||||
fn empty(&self) -> Option<Self::Output> {
|
||||
todo!("TimeBinnerTy::empty")
|
||||
Some(<Self as TimeBinnerTy>::Output::empty())
|
||||
}
|
||||
|
||||
fn append_empty_until_end(&mut self) {
|
||||
todo!("TimeBinnerTy::append_empty_until_end")
|
||||
let mut i = 0;
|
||||
while self.ts2now.ns() < self.binrange.full_range().end() {
|
||||
self.cycle();
|
||||
i += 1;
|
||||
if i > 100000 {
|
||||
panic!("append_empty_until_end too many iterations");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1012,6 +1012,14 @@ impl Events for ChannelEvents {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
|
||||
use ChannelEvents::*;
|
||||
match self {
|
||||
Events(x) => x.to_dim0_f32_for_binning(),
|
||||
Status(x) => panic!("ChannelEvents::to_dim0_f32_for_binning"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Collectable for ChannelEvents {
|
||||
|
||||
@@ -189,8 +189,8 @@ where
|
||||
"{} {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}",
|
||||
self.type_name(),
|
||||
self.tss.len(),
|
||||
self.tss.front().map(|x| x / SEC),
|
||||
self.tss.back().map(|x| x / SEC),
|
||||
self.tss.front().map(|&x| TsNano::from_ns(x)),
|
||||
self.tss.back().map(|&x| TsNano::from_ns(x)),
|
||||
self.values.front(),
|
||||
self.values.back(),
|
||||
)
|
||||
@@ -1068,6 +1068,14 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
|
||||
self.pulses.clear();
|
||||
self.values.clear();
|
||||
}
|
||||
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
|
||||
let mut ret = EventsDim0::empty();
|
||||
for (&ts, val) in self.tss.iter().zip(self.values.iter()) {
|
||||
ret.push(ts, 0, val.as_prim_f32_b());
|
||||
}
|
||||
Box::new(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -495,4 +495,8 @@ impl Events for EventsDim0Enum {
|
||||
fn clear(&mut self) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
|
||||
todo!("{}::to_dim0_f32_for_binning", self.type_name())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -989,6 +989,10 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
|
||||
self.pulses.clear();
|
||||
self.values.clear();
|
||||
}
|
||||
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
|
||||
todo!("{}::to_dim0_f32_for_binning", self.type_name())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -377,6 +377,10 @@ impl<STY: ScalarOps> Events for EventsXbinDim0<STY> {
|
||||
self.maxs.clear();
|
||||
self.avgs.clear();
|
||||
}
|
||||
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
|
||||
todo!("{}::to_dim0_f32_for_binning", self.type_name())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -1897,14 +1897,12 @@ impl TsNano {
|
||||
}
|
||||
|
||||
impl fmt::Debug for TsNano {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
let ts = Utc
|
||||
.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32)
|
||||
.earliest()
|
||||
.unwrap_or(Default::default());
|
||||
f.debug_struct("TsNano")
|
||||
.field("ts", &ts.format(DATETIME_FMT_3MS).to_string())
|
||||
.finish()
|
||||
write!(fmt, "TsNano {{ {} }}", ts.format(DATETIME_FMT_3MS))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2380,22 +2378,31 @@ where
|
||||
pub bin_cnt: u64,
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for BinnedRange<T>
|
||||
where
|
||||
T: Dim0Index,
|
||||
{
|
||||
impl fmt::Debug for BinnedRange<TsNano> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("BinnedRange")
|
||||
.field("bin_len", &self.bin_len)
|
||||
.field("bin_off", &self.bin_off)
|
||||
.field("bin_cnt", &self.bin_cnt)
|
||||
.finish()
|
||||
let beg = self.bin_len.times(self.bin_off);
|
||||
let end = self.bin_len.times(self.bin_off + self.bin_cnt);
|
||||
write!(fmt, "BinnedRange {{ {}, {}, {} }}", beg, end, self.bin_len.to_dt_ms())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for BinnedRange<PulseId> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "BinnedRange<PulseId> {{ .. }}")
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for BinnedRange<TsNano> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Debug::fmt(self, fmt)
|
||||
}
|
||||
}
|
||||
|
||||
impl BinnedRange<TsNano> {
|
||||
pub fn to_nano_range(&self) -> NanoRange {
|
||||
self.full_range()
|
||||
let beg = self.bin_len.times(self.bin_off).as_u64();
|
||||
let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64();
|
||||
NanoRange { beg, end }
|
||||
}
|
||||
|
||||
pub fn from_nano_range(range: NanoRange, bin_len: DtMs) -> Self {
|
||||
@@ -2407,6 +2414,14 @@ impl BinnedRange<TsNano> {
|
||||
bin_cnt: off2 - off1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn nano_beg(&self) -> TsNano {
|
||||
self.bin_len.times(self.bin_off)
|
||||
}
|
||||
|
||||
pub fn nano_end(&self) -> TsNano {
|
||||
self.bin_len.times(self.bin_off + self.bin_cnt)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> BinnedRange<T>
|
||||
|
||||
@@ -74,6 +74,10 @@ impl NanoRange {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_ns_u64(beg: u64, end: u64) -> Self {
|
||||
Self { beg, end }
|
||||
}
|
||||
|
||||
pub fn delta(&self) -> u64 {
|
||||
self.end - self.beg
|
||||
}
|
||||
@@ -113,7 +117,7 @@ pub struct PulseRange {
|
||||
pub end: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum SeriesRange {
|
||||
TimeRange(NanoRange),
|
||||
PulseRange(PulseRange),
|
||||
@@ -163,6 +167,15 @@ impl SeriesRange {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SeriesRange {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
SeriesRange::TimeRange(range) => write!(fmt, "SeriesRange::TimeRange {{ {} }}", range),
|
||||
SeriesRange::PulseRange(_) => write!(fmt, "SeriesRange::PulseRange {{ .. }}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NanoRange> for SeriesRange {
|
||||
fn from(k: NanoRange) -> Self {
|
||||
Self::TimeRange(k)
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::TryStreamExt;
|
||||
use items_0::on_sitemty_data;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::try_map_sitemty_data;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::ChConf;
|
||||
@@ -16,6 +20,9 @@ use scyllaconn::events2::mergert;
|
||||
use scyllaconn::worker::ScyllaQueue;
|
||||
use scyllaconn::SeriesId;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use streams::timebin::cached::reader::EventsReadProvider;
|
||||
use taskrun::tokio;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
@@ -129,3 +136,70 @@ pub async fn scylla_channel_event_stream(
|
||||
});
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
|
||||
struct ScyllaEventsReadStream {
|
||||
fut1: Option<
|
||||
Pin<Box<dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error>> + Send>>,
|
||||
>,
|
||||
stream: Option<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>>,
|
||||
}
|
||||
|
||||
impl Stream for ScyllaEventsReadStream {
|
||||
type Item = Sitemty<ChannelEvents>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break if let Some(fut) = self.fut1.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(Ok(x)) => {
|
||||
self.fut1 = None;
|
||||
self.stream = Some(x);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if let Some(fut) = self.stream.as_mut() {
|
||||
match fut.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => {
|
||||
let x = try_map_sitemty_data!(x, |x| match x {
|
||||
ChannelEvents::Events(x) => {
|
||||
let x = x.to_dim0_f32_for_binning();
|
||||
Ok(ChannelEvents::Events(x))
|
||||
}
|
||||
ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)),
|
||||
});
|
||||
Ready(Some(x))
|
||||
}
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
} else {
|
||||
Ready(None)
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ScyllaEventReadProvider {
|
||||
scyqueue: ScyllaQueue,
|
||||
}
|
||||
|
||||
impl ScyllaEventReadProvider {
|
||||
pub fn new(scyqueue: ScyllaQueue) -> Self {
|
||||
Self { scyqueue }
|
||||
}
|
||||
}
|
||||
|
||||
impl EventsReadProvider for ScyllaEventReadProvider {
|
||||
fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> streams::timebin::cached::reader::EventsReading {
|
||||
let scyqueue = self.scyqueue.clone();
|
||||
let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, chconf, &scyqueue).await };
|
||||
let stream = ScyllaEventsReadStream {
|
||||
fut1: Some(Box::pin(fut1)),
|
||||
stream: None,
|
||||
};
|
||||
streams::timebin::cached::reader::EventsReading::new(Box::pin(stream))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,20 +7,24 @@ use futures_util::Future;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::timebin::TimeBinned;
|
||||
use items_0::Empty;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::query::CacheUsage;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::AggKind;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::ChannelTyped;
|
||||
use netpod::Dim0Kind;
|
||||
use netpod::DtMs;
|
||||
use netpod::PreBinnedPatchCoord;
|
||||
use netpod::PreBinnedPatchCoordEnum;
|
||||
use netpod::PreBinnedPatchRange;
|
||||
use netpod::PreBinnedPatchRangeEnum;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use query::transform::TransformQuery;
|
||||
use scylla::Session as ScySession;
|
||||
use std::collections::VecDeque;
|
||||
@@ -532,8 +536,88 @@ impl ScyllaCacheReadProvider {
|
||||
}
|
||||
|
||||
impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider {
|
||||
fn read(&self) -> streams::timebin::cached::reader::Reading {
|
||||
fn read(&self, series: u64, range: BinnedRange<TsNano>) -> streams::timebin::cached::reader::CacheReading {
|
||||
warn!("impl CacheReadProvider for ScyllaCacheReadProvider");
|
||||
todo!("impl CacheReadProvider for ScyllaCacheReadProvider")
|
||||
}
|
||||
|
||||
fn write(&self, series: u64, bins: BinsDim0<f32>) -> streams::timebin::cached::reader::CacheWriting {
|
||||
let scyqueue = self.scyqueue.clone();
|
||||
let fut = async move { scyqueue.write_cache_f32(series, bins).await };
|
||||
streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn worker_write(
|
||||
series: u64,
|
||||
bins: BinsDim0<f32>,
|
||||
scy: &ScySession,
|
||||
) -> Result<(), streams::timebin::cached::reader::Error> {
|
||||
let mut msp_last = u64::MAX;
|
||||
for (((((&ts1, &ts2), &cnt), &min), &max), &avg) in bins
|
||||
.ts1s
|
||||
.iter()
|
||||
.zip(bins.ts2s.iter())
|
||||
.zip(bins.counts.iter())
|
||||
.zip(bins.mins.iter())
|
||||
.zip(bins.maxs.iter())
|
||||
.zip(bins.avgs.iter())
|
||||
{
|
||||
let bin_len = DtMs::from_ms_u64((ts2 - ts1) / 1000000);
|
||||
let part_len = DtMs::from_ms_u64(bin_len.ms() * 1000);
|
||||
let div = part_len.ns();
|
||||
let msp = ts1 / div;
|
||||
let off = (ts1 - msp * div) / bin_len.ns();
|
||||
let params = (
|
||||
series as i64,
|
||||
bin_len.ms() as i32,
|
||||
msp as i64,
|
||||
off as i32,
|
||||
cnt as i64,
|
||||
min,
|
||||
max,
|
||||
avg,
|
||||
);
|
||||
eprintln!("cache write {:?}", params);
|
||||
scy.query(
|
||||
"insert into sf_st.st_binned_scalar_f32 (series, bin_len_ms, ts_msp, off, count, min, max, avg) values (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
params,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn worker_read(
|
||||
series: u64,
|
||||
range: BinnedRange<TsNano>,
|
||||
scy: &ScySession,
|
||||
) -> Result<BinsDim0<f32>, streams::timebin::cached::reader::Error> {
|
||||
let bin_len: DtMs = todo!();
|
||||
let part_len = DtMs::from_ms_u64(bin_len.ms() * 1000);
|
||||
let div = part_len.ns();
|
||||
let msp: u64 = 0;
|
||||
let offs: core::ops::Range<u32> = todo!();
|
||||
let cql = "select off, count, min, max, avg from sf_st.st_binned_scalar_f32 where series = ? and bin_len_ms = ? and ts_msp = ? and off >= ? and off < ?";
|
||||
let params = (
|
||||
series as i64,
|
||||
bin_len.ms() as i32,
|
||||
msp as i64,
|
||||
offs.start as i32,
|
||||
offs.end as i32,
|
||||
);
|
||||
let res = scy
|
||||
.query_iter(cql, params)
|
||||
.await
|
||||
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
|
||||
let it = res.into_typed::<(i32, i64, f32, f32, f32)>();
|
||||
let mut bins = BinsDim0::empty();
|
||||
while let Some(x) = it.next().await {
|
||||
let row = x.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
|
||||
let off = row.0 as u64;
|
||||
// TODO push bins
|
||||
todo!("push bins");
|
||||
}
|
||||
Ok(bins)
|
||||
}
|
||||
|
||||
@@ -25,22 +25,13 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_fetch {
|
||||
($($arg:tt)*) => {
|
||||
if true {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_emit {
|
||||
($($arg:tt)*) => {
|
||||
if true {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
macro_rules! tracer_poll_enter {
|
||||
($self:expr) => {
|
||||
@@ -158,7 +149,7 @@ pub struct MergeRts {
|
||||
|
||||
impl MergeRts {
|
||||
pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self {
|
||||
info!("MergeRts readopts {readopts:?}");
|
||||
trace_init!("MergeRts readopts {readopts:?}");
|
||||
Self {
|
||||
ch_conf,
|
||||
range_mt: range.clone(),
|
||||
|
||||
@@ -118,7 +118,6 @@ impl MspStreamRt {
|
||||
async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await }
|
||||
};
|
||||
let do_trace_detail = netpod::TRACE_SERIES_ID.contains(&series.id());
|
||||
info!("------------------------------------- TEST INFO");
|
||||
trace_emit!(do_trace_detail, "------------------------------------- TEST TRACE");
|
||||
Self {
|
||||
rt,
|
||||
|
||||
@@ -2,7 +2,7 @@ use core::fmt;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::TsNano;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct ScyllaSeriesRange {
|
||||
beg: u64,
|
||||
end: u64,
|
||||
@@ -29,6 +29,17 @@ impl ScyllaSeriesRange {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for ScyllaSeriesRange {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
fmt,
|
||||
"ScyllaSeriesRange {{ beg: {}, end: {} }}",
|
||||
TsNano::from_ns(self.beg),
|
||||
TsNano::from_ns(self.end)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&SeriesRange> for ScyllaSeriesRange {
|
||||
fn from(value: &SeriesRange) -> Self {
|
||||
match value {
|
||||
|
||||
@@ -7,6 +7,7 @@ use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Future;
|
||||
use items_0::Events;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use netpod::log::*;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::ScyllaConfig;
|
||||
@@ -29,6 +30,7 @@ pub enum Error {
|
||||
Join,
|
||||
Toplist(#[from] crate::accounting::toplist::Error),
|
||||
MissingKeyspaceConfig,
|
||||
CacheWriteF32(#[from] streams::timebin::cached::reader::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -47,6 +49,11 @@ enum Job {
|
||||
TsMs,
|
||||
Sender<Result<crate::accounting::toplist::UsageData, crate::accounting::toplist::Error>>,
|
||||
),
|
||||
WriteCacheF32(
|
||||
u64,
|
||||
BinsDim0<f32>,
|
||||
Sender<Result<(), streams::timebin::cached::reader::Error>>,
|
||||
),
|
||||
}
|
||||
|
||||
struct ReadNextValues {
|
||||
@@ -117,6 +124,24 @@ impl ScyllaQueue {
|
||||
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn write_cache_f32(
|
||||
&self,
|
||||
series: u64,
|
||||
bins: BinsDim0<f32>,
|
||||
) -> Result<(), streams::timebin::cached::reader::Error> {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let job = Job::WriteCacheF32(series, bins, tx);
|
||||
self.tx
|
||||
.send(job)
|
||||
.await
|
||||
.map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?;
|
||||
let res = rx
|
||||
.recv()
|
||||
.await
|
||||
.map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??;
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -191,6 +216,12 @@ impl ScyllaWorker {
|
||||
// TODO count for stats
|
||||
}
|
||||
}
|
||||
Job::WriteCacheF32(series, bins, tx) => {
|
||||
let res = super::bincache::worker_write(series, bins, &scy).await;
|
||||
if tx.send(res).await.is_err() {
|
||||
// TODO count for stats
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("scylla worker finished");
|
||||
|
||||
@@ -19,10 +19,7 @@ use tokio::io::AsyncRead;
|
||||
pub type BoxedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 {
|
||||
($($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (trace!($($arg)*));
|
||||
}
|
||||
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); }
|
||||
|
||||
impl err::ToErr for crate::slidebuf::Error {
|
||||
fn to_err(self) -> Error {
|
||||
@@ -104,7 +101,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_upstream(&mut self, cx: &mut Context) -> Poll<Result<usize, Error>> {
|
||||
fn poll_upstream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<usize, Error>> {
|
||||
trace2!("poll_upstream");
|
||||
use Poll::*;
|
||||
// use tokio::io::AsyncRead;
|
||||
@@ -112,7 +109,6 @@ where
|
||||
// let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min.saturating_sub(self.buf.len()))?);
|
||||
let inp = &mut self.inp;
|
||||
pin_mut!(inp);
|
||||
trace!("poll_upstream");
|
||||
match inp.poll_next(cx) {
|
||||
Ready(Some(Ok(x))) => match self.buf.available_writable_area(x.len()) {
|
||||
Ok(dst) => {
|
||||
@@ -254,7 +250,7 @@ where
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match self.poll_upstream(cx) {
|
||||
match self.as_mut().poll_upstream(cx) {
|
||||
Ready(Ok(n1)) => {
|
||||
if n1 == 0 {
|
||||
self.done = true;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
mod basic;
|
||||
pub mod cached;
|
||||
pub mod fromevents;
|
||||
mod fromlayers;
|
||||
mod gapfill;
|
||||
mod grid;
|
||||
|
||||
@@ -17,31 +17,16 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 {
|
||||
($($arg:tt)*) => {
|
||||
if false {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
macro_rules! debug_first { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace3 {
|
||||
($($arg:tt)*) => {
|
||||
if false {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace4 {
|
||||
($($arg:tt)*) => {
|
||||
if false {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
type SitemtyStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
|
||||
|
||||
@@ -115,7 +100,7 @@ where
|
||||
self.process_item(item);
|
||||
let mut do_emit = false;
|
||||
if self.done_first_input == false {
|
||||
debug!(
|
||||
debug_first!(
|
||||
"emit container after the first input len {} binner {}",
|
||||
item_len,
|
||||
self.binner.is_some()
|
||||
@@ -191,13 +176,13 @@ where
|
||||
trace2!("================= handle_none");
|
||||
let self_range_final = self.range_final;
|
||||
if let Some(binner) = self.binner.as_mut() {
|
||||
trace!("bins ready count before finish {}", binner.bins_ready_count());
|
||||
trace2!("bins ready count before finish {}", binner.bins_ready_count());
|
||||
// TODO rework the finish logic
|
||||
if self_range_final {
|
||||
binner.set_range_complete();
|
||||
}
|
||||
binner.push_in_progress(false);
|
||||
trace!("bins ready count after finish {}", binner.bins_ready_count());
|
||||
trace2!("bins ready count after finish {}", binner.bins_ready_count());
|
||||
if let Some(bins) = binner.bins_ready() {
|
||||
self.done_data = true;
|
||||
Ok(Break(Ready(sitem_data(bins))))
|
||||
@@ -260,6 +245,7 @@ where
|
||||
} else if self.done_data {
|
||||
self.done = true;
|
||||
if self.range_final {
|
||||
info!("TimeBinnedStream EMIT RANGE FINAL");
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
} else {
|
||||
continue;
|
||||
|
||||
@@ -1,23 +1,71 @@
|
||||
use crate as streams;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::timebin::TimeBinnable;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::ChConf;
|
||||
use netpod::DtMs;
|
||||
use netpod::TsNano;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
pub struct Reading {
|
||||
fut: Pin<Box<dyn Future<Output = Result<BinsDim0<f32>, Box<dyn std::error::Error>>> + Send>>,
|
||||
pub struct EventsReading {
|
||||
stream: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
|
||||
}
|
||||
|
||||
impl Future for Reading {
|
||||
type Output = Result<BinsDim0<f32>, Box<dyn std::error::Error>>;
|
||||
impl EventsReading {
|
||||
pub fn new(stream: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>) -> Self {
|
||||
Self { stream }
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for EventsReading {
|
||||
type Item = Sitemty<ChannelEvents>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
self.stream.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait EventsReadProvider: Send + Sync {
|
||||
fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> EventsReading;
|
||||
}
|
||||
|
||||
pub struct CacheReading {
|
||||
fut: Pin<Box<dyn Future<Output = Result<BinsDim0<f32>, Box<dyn std::error::Error + Send>>> + Send>>,
|
||||
}
|
||||
|
||||
impl Future for CacheReading {
|
||||
type Output = Result<BinsDim0<f32>, Box<dyn std::error::Error + Send>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
self.fut.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CacheWriting {
|
||||
fut: Pin<Box<dyn Future<Output = Result<(), streams::timebin::cached::reader::Error>> + Send>>,
|
||||
}
|
||||
|
||||
impl CacheWriting {
|
||||
pub fn new(fut: Pin<Box<dyn Future<Output = Result<(), streams::timebin::cached::reader::Error>> + Send>>) -> Self {
|
||||
Self { fut }
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for CacheWriting {
|
||||
type Output = Result<(), streams::timebin::cached::reader::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
self.fut.poll_unpin(cx)
|
||||
@@ -25,13 +73,17 @@ impl Future for Reading {
|
||||
}
|
||||
|
||||
pub trait CacheReadProvider: Send + Sync {
|
||||
fn read(&self) -> Reading;
|
||||
fn read(&self, series: u64, range: BinnedRange<TsNano>) -> CacheReading;
|
||||
fn write(&self, series: u64, bins: BinsDim0<f32>) -> CacheWriting;
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "BinCachedReader")]
|
||||
pub enum Error {
|
||||
TodoImpl,
|
||||
ChannelSend,
|
||||
ChannelRecv,
|
||||
Scylla(String),
|
||||
}
|
||||
|
||||
pub struct CachedReader {
|
||||
@@ -55,6 +107,13 @@ impl Stream for CachedReader {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
// TODO
|
||||
// Must split over different msp (because pkey).
|
||||
// If we choose the partitioning length low enough, no need to issue multiple queries.
|
||||
// Change the worker interface:
|
||||
// We should already compute here the msp and off because we must here implement the loop logic.
|
||||
// Therefore worker interface should not accept BinnedRange, but msp and off range.
|
||||
error!("TODO CachedReader impl split reads over known ranges");
|
||||
// Ready(Some(Err(Error::TodoImpl)))
|
||||
Ready(None)
|
||||
}
|
||||
|
||||
80
crates/streams/src/timebin/fromevents.rs
Normal file
80
crates/streams/src/timebin/fromevents.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use super::cached::reader::EventsReadProvider;
|
||||
use super::cached::reader::EventsReading;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::ChConf;
|
||||
use netpod::TsNano;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "ReadingBinnedFromEvents")]
|
||||
pub enum Error {}
|
||||
|
||||
pub struct BinnedFromEvents {
|
||||
stream: Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>,
|
||||
}
|
||||
|
||||
impl BinnedFromEvents {
|
||||
pub fn new(
|
||||
range: BinnedRange<TsNano>,
|
||||
evq: EventsSubQuery,
|
||||
chconf: ChConf,
|
||||
do_time_weight: bool,
|
||||
read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<Self, Error> {
|
||||
if !evq.range().is_time() {
|
||||
panic!();
|
||||
}
|
||||
let stream = read_provider.read(evq, chconf);
|
||||
let stream = Box::pin(stream);
|
||||
let stream = super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight);
|
||||
let stream = stream.map(|item| match item {
|
||||
Ok(x) => match x {
|
||||
StreamItem::DataItem(x) => match x {
|
||||
RangeCompletableItem::Data(mut x) => {
|
||||
// TODO need a typed time binner
|
||||
if let Some(x) = x.as_any_mut().downcast_mut::<BinsDim0<f32>>() {
|
||||
let y = x.clone();
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(y)))
|
||||
} else {
|
||||
Err(::err::Error::with_msg_no_trace(
|
||||
"GapFill expects incoming BinsDim0<f32>",
|
||||
))
|
||||
}
|
||||
}
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
info!("BinnedFromEvents sees range final");
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
|
||||
}
|
||||
},
|
||||
StreamItem::Log(x) => Ok(StreamItem::Log(x)),
|
||||
StreamItem::Stats(x) => Ok(StreamItem::Stats(x)),
|
||||
},
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
let ret = Self {
|
||||
stream: Box::pin(stream),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for BinnedFromEvents {
|
||||
type Item = Sitemty<BinsDim0<f32>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
self.stream.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::cached::reader::CacheReadProvider;
|
||||
use super::cached::reader::EventsReadProvider;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use crate::timebin::fromevents::BinnedFromEvents;
|
||||
use crate::timebin::grid::find_next_finer_bin_len;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
@@ -13,12 +15,17 @@ use items_0::streamitem::StreamItem;
|
||||
use items_0::timebin::TimeBinnableTy;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::BinnedRangeEnum;
|
||||
use netpod::ChConf;
|
||||
use netpod::ChannelTypeConfigGen;
|
||||
use netpod::DtMs;
|
||||
use netpod::ReqCtx;
|
||||
use netpod::SeriesKind;
|
||||
use netpod::TsNano;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use query::api4::events::EventsSubQuerySelect;
|
||||
use query::api4::events::EventsSubQuerySettings;
|
||||
use query::transform::TransformQuery;
|
||||
use std::pin::Pin;
|
||||
@@ -29,8 +36,9 @@ use std::task::Poll;
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "TimeBinnedFromLayers")]
|
||||
pub enum Error {
|
||||
Logic,
|
||||
GapFill(#[from] super::gapfill::Error),
|
||||
BinnedFromEvents(#[from] super::fromevents::Error),
|
||||
SfDatabufferNotSupported,
|
||||
}
|
||||
|
||||
type BoxedInput = Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>;
|
||||
@@ -61,7 +69,8 @@ impl TimeBinnedFromLayers {
|
||||
range: BinnedRange<TsNano>,
|
||||
do_time_weight: bool,
|
||||
bin_len_layers: Vec<DtMs>,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider + Send>,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<Self, Error> {
|
||||
info!(
|
||||
"{}::new {:?} {:?} {:?}",
|
||||
@@ -72,19 +81,20 @@ impl TimeBinnedFromLayers {
|
||||
);
|
||||
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
|
||||
if bin_len_layers.contains(&bin_len) {
|
||||
info!("{}::new bin_len in layers", Self::type_name());
|
||||
info!("{}::new bin_len in layers {:?}", Self::type_name(), range);
|
||||
let inp = super::gapfill::GapFill::new(
|
||||
"FromLayers".into(),
|
||||
ch_conf.clone(),
|
||||
transform_query.clone(),
|
||||
sub.clone(),
|
||||
log_level.clone(),
|
||||
ctx.clone(),
|
||||
open_bytes.clone(),
|
||||
series,
|
||||
range,
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
cache_read_provider,
|
||||
events_read_provider.clone(),
|
||||
)?;
|
||||
let ret = Self {
|
||||
ch_conf,
|
||||
@@ -99,22 +109,26 @@ impl TimeBinnedFromLayers {
|
||||
} else {
|
||||
match find_next_finer_bin_len(bin_len, &bin_len_layers) {
|
||||
Some(finer) => {
|
||||
// TODO
|
||||
// produce from binned sub-stream with additional binner.
|
||||
let range = BinnedRange::from_nano_range(range.to_nano_range(), finer);
|
||||
warn!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range);
|
||||
let range_finer = BinnedRange::from_nano_range(range.to_nano_range(), finer);
|
||||
warn!(
|
||||
"{}::new next finer from bins {:?} {:?}",
|
||||
Self::type_name(),
|
||||
finer,
|
||||
range_finer
|
||||
);
|
||||
let inp = super::gapfill::GapFill::new(
|
||||
"FromLayers".into(),
|
||||
ch_conf.clone(),
|
||||
transform_query.clone(),
|
||||
sub.clone(),
|
||||
log_level.clone(),
|
||||
ctx.clone(),
|
||||
open_bytes.clone(),
|
||||
series,
|
||||
range.clone(),
|
||||
range_finer.clone(),
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
cache_read_provider,
|
||||
events_read_provider.clone(),
|
||||
)?;
|
||||
let inp = super::basic::TimeBinnedStream::new(
|
||||
Box::pin(inp),
|
||||
@@ -133,10 +147,39 @@ impl TimeBinnedFromLayers {
|
||||
Ok(ret)
|
||||
}
|
||||
None => {
|
||||
warn!("{}::new NO next finer", Self::type_name());
|
||||
// TODO
|
||||
// produce from events
|
||||
todo!()
|
||||
warn!("{}::new next finer from events", Self::type_name());
|
||||
let series_range = SeriesRange::TimeRange(range.to_nano_range());
|
||||
let one_before_range = true;
|
||||
let select = EventsSubQuerySelect::new(
|
||||
ch_conf.clone(),
|
||||
series_range,
|
||||
one_before_range,
|
||||
transform_query.clone(),
|
||||
);
|
||||
let evq = EventsSubQuery::from_parts(select, sub.clone(), ctx.reqid().into(), log_level.clone());
|
||||
match &ch_conf {
|
||||
ChannelTypeConfigGen::Scylla(chconf) => {
|
||||
let inp = BinnedFromEvents::new(
|
||||
range,
|
||||
evq,
|
||||
chconf.clone(),
|
||||
do_time_weight,
|
||||
events_read_provider,
|
||||
)?;
|
||||
let ret = Self {
|
||||
ch_conf,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
inp: Box::pin(inp),
|
||||
};
|
||||
warn!("{}::new setup from events", Self::type_name());
|
||||
Ok(ret)
|
||||
}
|
||||
ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,20 +1,27 @@
|
||||
use super::cached::reader::CacheReadProvider;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use super::cached::reader::EventsReadProvider;
|
||||
use crate::timebin::fromevents::BinnedFromEvents;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::Empty;
|
||||
use items_0::WithLen;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::ChannelTypeConfigGen;
|
||||
use netpod::DtMs;
|
||||
use netpod::ReqCtx;
|
||||
use netpod::TsNano;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use query::api4::events::EventsSubQuerySelect;
|
||||
use query::api4::events::EventsSubQuerySettings;
|
||||
use query::transform::TransformQuery;
|
||||
use std::pin::Pin;
|
||||
@@ -22,11 +29,27 @@ use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! debug_init { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! debug_setup { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! debug_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "BinCachedGapFill")]
|
||||
pub enum Error {
|
||||
CacheReader(#[from] super::cached::reader::Error),
|
||||
GapFromFiner,
|
||||
#[error("InputBeforeRange({0}, {1})")]
|
||||
InputBeforeRange(NanoRange, BinnedRange<TsNano>),
|
||||
SfDatabufferNotSupported,
|
||||
EventsReader(#[from] super::fromevents::Error),
|
||||
}
|
||||
|
||||
type INP = Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>;
|
||||
@@ -34,41 +57,51 @@ type INP = Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>;
|
||||
// Try to read from cache for the given bin len.
|
||||
// For gaps in the stream, construct an alternative input from finer bin len with a binner.
|
||||
pub struct GapFill {
|
||||
dbgname: String,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
transform_query: TransformQuery,
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
series: u64,
|
||||
range: BinnedRange<TsNano>,
|
||||
do_time_weight: bool,
|
||||
bin_len_layers: Vec<DtMs>,
|
||||
inp: INP,
|
||||
inp: Option<INP>,
|
||||
inp_range_final: bool,
|
||||
inp_buf: Option<BinsDim0<f32>>,
|
||||
inp_finer: Option<INP>,
|
||||
inp_finer_range_final: bool,
|
||||
inp_finer_range_final_cnt: u32,
|
||||
inp_finer_range_final_max: u32,
|
||||
inp_finer_fills_gap: bool,
|
||||
last_bin_ts2: Option<TsNano>,
|
||||
exp_finer_range: NanoRange,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
bins_for_cache_write: BinsDim0<f32>,
|
||||
done: bool,
|
||||
cache_writing: Option<super::cached::reader::CacheWriting>,
|
||||
}
|
||||
|
||||
impl GapFill {
|
||||
// bin_len of the given range must be a cacheable bin_len.
|
||||
pub fn new(
|
||||
dbgname_parent: String,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
transform_query: TransformQuery,
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
|
||||
series: u64,
|
||||
range: BinnedRange<TsNano>,
|
||||
do_time_weight: bool,
|
||||
bin_len_layers: Vec<DtMs>,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<Self, Error> {
|
||||
// super::fromlayers::TimeBinnedFromLayers::new(series, range, do_time_weight, bin_len_layers)?;
|
||||
let dbgname = format!("{}--[{}]", dbgname_parent, range);
|
||||
debug_init!("new dbgname {}", dbgname);
|
||||
let inp = super::cached::reader::CachedReader::new(
|
||||
series,
|
||||
range.bin_len.to_dt_ms(),
|
||||
@@ -80,28 +113,38 @@ impl GapFill {
|
||||
Err(e) => Err(::err::Error::from_string(e)),
|
||||
});
|
||||
let ret = Self {
|
||||
dbgname,
|
||||
ch_conf,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
series,
|
||||
range,
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
inp: Box::pin(inp),
|
||||
inp: Some(Box::pin(inp)),
|
||||
inp_range_final: false,
|
||||
inp_buf: None,
|
||||
inp_finer: None,
|
||||
inp_finer_range_final: false,
|
||||
inp_finer_range_final_cnt: 0,
|
||||
inp_finer_range_final_max: 0,
|
||||
inp_finer_fills_gap: false,
|
||||
last_bin_ts2: None,
|
||||
// TODO just dummy:
|
||||
exp_finer_range: NanoRange { beg: 0, end: 0 },
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
bins_for_cache_write: BinsDim0::empty(),
|
||||
done: false,
|
||||
cache_writing: None,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn handle_bins_finer(mut self: Pin<&mut Self>, bins: BinsDim0<f32>) -> Result<BinsDim0<f32>, Error> {
|
||||
trace_handle!("{} handle_bins_finer {}", self.dbgname, bins);
|
||||
for (&ts1, &ts2) in bins.ts1s.iter().zip(&bins.ts2s) {
|
||||
if let Some(last) = self.last_bin_ts2 {
|
||||
if ts1 != last.ns() {
|
||||
@@ -110,25 +153,37 @@ impl GapFill {
|
||||
}
|
||||
self.last_bin_ts2 = Some(TsNano::from_ns(ts2));
|
||||
}
|
||||
|
||||
// TODO keep bins from finer source.
|
||||
// Only write bins to cache if we receive another
|
||||
|
||||
if bins.len() != 0 {
|
||||
bins.clone().drain_into(&mut self.bins_for_cache_write, 0..bins.len());
|
||||
}
|
||||
self.cache_write_intermediate()?;
|
||||
// TODO make sure that input does not send "made-up" empty future bins.
|
||||
// On the other hand, if the request is over past range, but the channel was silent ever since?
|
||||
// Then we should in principle know that from is-alive status checking.
|
||||
// So, until then, allow made-up bins?
|
||||
// Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way.
|
||||
|
||||
Ok(bins)
|
||||
}
|
||||
|
||||
fn handle_bins(mut self: Pin<&mut Self>, bins: BinsDim0<f32>) -> Result<BinsDim0<f32>, Error> {
|
||||
trace_handle!("{} handle_bins {}", self.dbgname, bins);
|
||||
// TODO could use an interface to iterate over opaque bin items that only expose
|
||||
// edge and count information with all remaining values opaque.
|
||||
for (i, (&ts1, &ts2)) in bins.ts1s.iter().zip(&bins.ts2s).enumerate() {
|
||||
if ts1 < self.range.nano_beg().ns() {
|
||||
return Err(Error::InputBeforeRange(
|
||||
NanoRange::from_ns_u64(ts1, ts2),
|
||||
self.range.clone(),
|
||||
));
|
||||
}
|
||||
if let Some(last) = self.last_bin_ts2 {
|
||||
if ts1 != last.ns() {
|
||||
trace_handle!(
|
||||
"{} detect a gap ------------- SETUP SUB STREAM ts1 {} last {}",
|
||||
self.dbgname,
|
||||
ts1,
|
||||
last
|
||||
);
|
||||
let mut ret = <BinsDim0<f32> as items_0::Empty>::empty();
|
||||
let mut bins = bins;
|
||||
bins.drain_into(&mut ret, 0..i);
|
||||
@@ -137,7 +192,7 @@ impl GapFill {
|
||||
beg: last.ns(),
|
||||
end: ts1,
|
||||
};
|
||||
self.setup_inp_finer(range)?;
|
||||
self.setup_inp_finer(range, true)?;
|
||||
return Ok(ret);
|
||||
}
|
||||
}
|
||||
@@ -146,25 +201,35 @@ impl GapFill {
|
||||
Ok(bins)
|
||||
}
|
||||
|
||||
fn setup_inp_finer(mut self: Pin<&mut Self>, range: NanoRange) -> Result<(), Error> {
|
||||
// Set up range to fill from finer.
|
||||
fn setup_inp_finer(mut self: Pin<&mut Self>, range: NanoRange, inp_finer_fills_gap: bool) -> Result<(), Error> {
|
||||
self.inp_finer_range_final = false;
|
||||
self.inp_finer_range_final_max += 1;
|
||||
self.inp_finer_fills_gap = inp_finer_fills_gap;
|
||||
self.exp_finer_range = range.clone();
|
||||
if let Some(bin_len_finer) =
|
||||
super::grid::find_next_finer_bin_len(self.range.bin_len.to_dt_ms(), &self.bin_len_layers)
|
||||
{
|
||||
debug_setup!(
|
||||
"{} setup_inp_finer next finer from bins {} {} from {}",
|
||||
self.dbgname,
|
||||
range,
|
||||
bin_len_finer,
|
||||
self.range.bin_len.to_dt_ms()
|
||||
);
|
||||
let range_finer = BinnedRange::from_nano_range(range, bin_len_finer);
|
||||
let inp_finer = GapFill::new(
|
||||
self.dbgname.clone(),
|
||||
self.ch_conf.clone(),
|
||||
self.transform_query.clone(),
|
||||
self.sub.clone(),
|
||||
self.log_level.clone(),
|
||||
self.ctx.clone(),
|
||||
self.open_bytes.clone(),
|
||||
self.series,
|
||||
range_finer.clone(),
|
||||
self.do_time_weight,
|
||||
self.bin_len_layers.clone(),
|
||||
self.cache_read_provider.clone(),
|
||||
self.events_read_provider.clone(),
|
||||
)?;
|
||||
let stream = Box::pin(inp_finer);
|
||||
let do_time_weight = self.do_time_weight;
|
||||
@@ -173,51 +238,77 @@ impl GapFill {
|
||||
super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight);
|
||||
self.inp_finer = Some(Box::pin(stream));
|
||||
} else {
|
||||
let do_time_weight = self.do_time_weight;
|
||||
debug_setup!("{} setup_inp_finer next finer from events {}", self.dbgname, range);
|
||||
let series_range = SeriesRange::TimeRange(range.clone());
|
||||
let one_before_range = true;
|
||||
let range = BinnedRange::from_nano_range(range, self.range.bin_len.to_dt_ms());
|
||||
let stream = crate::timebinnedjson::TimeBinnableStream::new(
|
||||
range.full_range(),
|
||||
one_before_range,
|
||||
let select = EventsSubQuerySelect::new(
|
||||
self.ch_conf.clone(),
|
||||
series_range,
|
||||
one_before_range,
|
||||
self.transform_query.clone(),
|
||||
self.sub.clone(),
|
||||
self.log_level.clone(),
|
||||
self.ctx.clone(),
|
||||
self.open_bytes.clone(),
|
||||
);
|
||||
// let stream: Pin<Box<dyn items_0::transform::TimeBinnableStreamTrait>> = stream;
|
||||
let stream = Box::pin(stream);
|
||||
// TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning.
|
||||
let stream =
|
||||
super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight);
|
||||
let stream = stream.map(|item| match item {
|
||||
Ok(x) => match x {
|
||||
StreamItem::DataItem(x) => match x {
|
||||
RangeCompletableItem::Data(mut x) => {
|
||||
// TODO need a typed time binner
|
||||
if let Some(x) = x.as_any_mut().downcast_mut::<BinsDim0<f32>>() {
|
||||
let y = x.clone();
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(y)))
|
||||
} else {
|
||||
Err(::err::Error::with_msg_no_trace(
|
||||
"GapFill expects incoming BinsDim0<f32>",
|
||||
))
|
||||
}
|
||||
}
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
|
||||
}
|
||||
},
|
||||
StreamItem::Log(x) => Ok(StreamItem::Log(x)),
|
||||
StreamItem::Stats(x) => Ok(StreamItem::Stats(x)),
|
||||
},
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
// let stream: Pin<
|
||||
// Box<dyn Stream<Item = Sitemty<Box<dyn items_0::timebin::TimeBinned>>> + Send>,
|
||||
// > = Box::pin(stream);
|
||||
self.inp_finer = Some(Box::pin(stream));
|
||||
let evq = EventsSubQuery::from_parts(
|
||||
select,
|
||||
self.sub.clone(),
|
||||
self.ctx.reqid().into(),
|
||||
self.log_level.clone(),
|
||||
);
|
||||
match &self.ch_conf {
|
||||
ChannelTypeConfigGen::Scylla(chconf) => {
|
||||
let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms());
|
||||
let inp = BinnedFromEvents::new(
|
||||
range,
|
||||
evq,
|
||||
chconf.clone(),
|
||||
self.do_time_weight,
|
||||
self.events_read_provider.clone(),
|
||||
)?;
|
||||
self.inp_finer = Some(Box::pin(inp));
|
||||
}
|
||||
ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cache_write(mut self: Pin<&mut Self>, bins: BinsDim0<f32>) -> Result<(), Error> {
|
||||
self.cache_writing = Some(self.cache_read_provider.write(self.series, bins));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cache_write_on_end(mut self: Pin<&mut Self>) -> Result<(), Error> {
|
||||
if self.inp_finer_fills_gap {
|
||||
// TODO can consider all incoming bins as final by assumption.
|
||||
}
|
||||
let aa = &self.bins_for_cache_write;
|
||||
if aa.len() >= 2 {
|
||||
for (i, (&c1, &c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() {
|
||||
if c1 != 0 {
|
||||
let n = aa.len() - (1 + i);
|
||||
debug_cache!("{} cache_write_on_end consider {} for write", self.dbgname, n);
|
||||
let mut bins_write = BinsDim0::empty();
|
||||
self.bins_for_cache_write.drain_into(&mut bins_write, 0..n);
|
||||
self.cache_write(bins_write)?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cache_write_intermediate(mut self: Pin<&mut Self>) -> Result<(), Error> {
|
||||
let aa = &self.bins_for_cache_write;
|
||||
if aa.len() >= 2 {
|
||||
for (i, (&c1, &c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() {
|
||||
if c1 != 0 {
|
||||
let n = aa.len() - (1 + i);
|
||||
debug_cache!("{} cache_write_intermediate consider {} for write", self.dbgname, n);
|
||||
let mut bins_write = BinsDim0::empty();
|
||||
self.bins_for_cache_write.drain_into(&mut bins_write, 0..n);
|
||||
self.cache_write(bins_write)?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -229,7 +320,21 @@ impl Stream for GapFill {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break if let Some(inp_finer) = self.inp_finer.as_mut() {
|
||||
break if self.done {
|
||||
Ready(None)
|
||||
} else if let Some(fut) = self.cache_writing.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(Ok(())) => {
|
||||
self.cache_writing = None;
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
self.cache_writing = None;
|
||||
Ready(Some(Err(::err::Error::from_string(e))))
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if let Some(inp_finer) = self.inp_finer.as_mut() {
|
||||
// TODO
|
||||
// detect also gaps here: if gap from finer, then error.
|
||||
// on CacheUsage Use or Rereate:
|
||||
@@ -243,26 +348,48 @@ impl Stream for GapFill {
|
||||
}
|
||||
}
|
||||
StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
trace_handle!("{} RECV RANGE FINAL", self.dbgname);
|
||||
self.inp_finer_range_final = true;
|
||||
self.inp_finer_range_final_cnt += 1;
|
||||
match self.as_mut().cache_write_on_end() {
|
||||
Ok(()) => continue,
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
}
|
||||
}
|
||||
StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))),
|
||||
StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))),
|
||||
},
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Ready(None) => {
|
||||
trace_handle!(
|
||||
"{} inp_finer Ready(None) last_bin_ts2 {:?}",
|
||||
self.dbgname,
|
||||
self.last_bin_ts2
|
||||
);
|
||||
self.inp_finer = None;
|
||||
if let Some(j) = self.last_bin_ts2 {
|
||||
if j.ns() != self.exp_finer_range.end() {
|
||||
trace_handle!(
|
||||
"{} inp_finer Ready(None) last_bin_ts2 {:?} exp_finer_range {:?}",
|
||||
self.dbgname,
|
||||
self.last_bin_ts2,
|
||||
self.exp_finer_range
|
||||
);
|
||||
Ready(Some(Err(::err::Error::from_string(
|
||||
"finer input didn't deliver to the end",
|
||||
))))
|
||||
} else {
|
||||
self.last_bin_ts2 = None;
|
||||
self.exp_finer_range = NanoRange { beg: 0, end: 0 };
|
||||
self.inp_finer = None;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
Ready(Some(Err(::err::Error::from_string("finer input delivered nothing"))))
|
||||
error!(
|
||||
"{} inp_finer Ready(None) last_bin_ts2 {:?}",
|
||||
self.dbgname, self.last_bin_ts2
|
||||
);
|
||||
Ready(Some(Err(::err::Error::from_string(
|
||||
"finer input delivered nothing, received nothing at all so far",
|
||||
))))
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
@@ -272,46 +399,52 @@ impl Stream for GapFill {
|
||||
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
}
|
||||
} else {
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
} else if let Some(inp) = self.inp.as_mut() {
|
||||
match inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
StreamItem::DataItem(RangeCompletableItem::Data(x)) => match self.as_mut().handle_bins(x) {
|
||||
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
},
|
||||
StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
self.inp_range_final = true;
|
||||
continue;
|
||||
}
|
||||
StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))),
|
||||
StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))),
|
||||
},
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Ready(None) => {
|
||||
self.inp = None;
|
||||
// TODO assert that we have emitted up to the requested range.
|
||||
// If not, request the remaining range from "finer" input.
|
||||
if let Some(j) = self.last_bin_ts2 {
|
||||
if j.ns() != self.exp_finer_range.end() {
|
||||
if j != self.range.nano_end() {
|
||||
let range = NanoRange {
|
||||
beg: j.ns(),
|
||||
end: self.range.full_range().end(),
|
||||
};
|
||||
match self.as_mut().setup_inp_finer(range) {
|
||||
warn!(
|
||||
"----- RECEIVED SOMETHING, BUT NOT ALL, setup rest from finer {} {} {}",
|
||||
self.range, j, range
|
||||
);
|
||||
match self.as_mut().setup_inp_finer(range, false) {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
}
|
||||
} else {
|
||||
// self.last_bin_ts2 = None;
|
||||
// self.exp_finer_range = NanoRange { beg: 0, end: 0 };
|
||||
// self.inp_finer = None;
|
||||
// continue;
|
||||
info!("----- RECEIVED EVERYTHING");
|
||||
Ready(None)
|
||||
}
|
||||
} else {
|
||||
warn!("----- NOTHING IN CACHE, SETUP FULL FROM FINER");
|
||||
let range = self.range.full_range();
|
||||
match self.as_mut().setup_inp_finer(range) {
|
||||
let range = self.range.to_nano_range();
|
||||
warn!(
|
||||
"----- RECEIVED NOTHING SO FAR AT ALL, setup full range from finer {} {}",
|
||||
self.range, range
|
||||
);
|
||||
match self.as_mut().setup_inp_finer(range, false) {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
@@ -321,24 +454,16 @@ impl Stream for GapFill {
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else {
|
||||
self.done = true;
|
||||
if self.inp_finer_range_final_cnt == self.inp_finer_range_final_max {
|
||||
trace_handle!("{} RANGE FINAL ALL", self.dbgname);
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
} else {
|
||||
trace_handle!("{} SUBSTREAMS NOT FINAL", self.dbgname);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
// When do we detect a gap:
|
||||
// - when the current item poses a gap to the last.
|
||||
// - when we see EOS before the requested range is filled.
|
||||
// Requirements:
|
||||
// Must always request fully cache-aligned ranges.
|
||||
// Must remember where the last bin ended.
|
||||
|
||||
// When a gap is detected:
|
||||
// - buffer the current item, if there is one (can also be EOS).
|
||||
// - create a new producer of bin:
|
||||
// - GapFillwith finer range? FromFiner(series, bin_len, range) ?
|
||||
// - TimeBinnedFromLayers for a bin_len in layers would also go directly into GapFill.
|
||||
// what does FromFiner bring to the table?
|
||||
// It does not attempt to read the given bin-len from a cache, because we just did attempt that.
|
||||
// It still requires that bin-len is cacheable. (NO! it must work with the layering that I passed!)
|
||||
// Then it finds the next cacheable
|
||||
// Ready(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::rangefilter2::RangeFilter2;
|
||||
use crate::tcprawclient::container_stream_from_bytes_stream;
|
||||
use crate::tcprawclient::make_sub_query;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use crate::timebin::cached::reader::EventsReadProvider;
|
||||
use crate::timebin::CacheReadProvider;
|
||||
use crate::timebin::TimeBinnedStream;
|
||||
use crate::transform::build_merged_event_transform;
|
||||
@@ -310,10 +311,11 @@ async fn timebinned_stream(
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Option<Arc<dyn CacheReadProvider>>,
|
||||
events_read_provider: Option<Arc<dyn EventsReadProvider>>,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
|
||||
use netpod::query::CacheUsage;
|
||||
match (query.cache_usage(), cache_read_provider) {
|
||||
(CacheUsage::Use | CacheUsage::Recreate, Some(cache_read_provider)) => {
|
||||
match (query.cache_usage(), cache_read_provider, events_read_provider) {
|
||||
(CacheUsage::Use | CacheUsage::Recreate, Some(cache_read_provider), Some(events_read_provider)) => {
|
||||
let series = if let Some(x) = query.channel().series() {
|
||||
x
|
||||
} else {
|
||||
@@ -351,6 +353,7 @@ async fn timebinned_stream(
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
.map_err(Error::from_string)?;
|
||||
let stream = stream.map(|item| {
|
||||
@@ -408,6 +411,7 @@ pub async fn timebinned_json(
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Option<Arc<dyn CacheReadProvider>>,
|
||||
events_read_provider: Option<Arc<dyn EventsReadProvider>>,
|
||||
) -> Result<JsonValue, Error> {
|
||||
let deadline = Instant::now() + query.timeout_content().unwrap_or(Duration::from_millis(5000));
|
||||
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
|
||||
@@ -421,6 +425,7 @@ pub async fn timebinned_json(
|
||||
ctx,
|
||||
open_bytes,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
.await?;
|
||||
let stream = timebinned_to_collectable(stream);
|
||||
|
||||
@@ -230,6 +230,15 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for e in &tracing_trace {
|
||||
tmp1.clear();
|
||||
tmp1.push_str(e);
|
||||
tmp1.push_str("::");
|
||||
if meta.target() == &tmp1[..tmp1.len() - 2] || meta.target().starts_with(&tmp1) {
|
||||
target_match = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if target_match {
|
||||
let mut sr = ctx.lookup_current();
|
||||
let mut allow = false;
|
||||
|
||||
Reference in New Issue
Block a user