This commit is contained in:
Dominik Werder
2024-09-18 12:12:53 +02:00
parent ab6b0322c9
commit e4f8ad1e91
25 changed files with 520 additions and 289 deletions

View File

@@ -64,14 +64,15 @@ fn time_bin_00() -> Result<(), Error> {
let bins = BinsDim0::empty();
d.push_back(bins);
let mut bins = BinsDim0::empty();
bins.push(SEC * 0, SEC * 1, 0, 0.0, 0.0, 0.0);
bins.push(SEC * 1, SEC * 2, 2, 0.0535830, 100.0589, 50.05624);
bins.push(SEC * 2, SEC * 3, 2, 200.06143, 300.07645, 250.06894);
bins.push(SEC * 3, SEC * 4, 2, 400.08554, 500.05222, 450.06888);
bins.push(SEC * 4, SEC * 5, 2, 600.0025, 700.09094, 650.04675);
// Currently can not construct bins without minmaxlst
// bins.push(SEC * 0, SEC * 1, 0, 0.0, 0.0, 0.0);
bins.push(SEC * 1, SEC * 2, 2, 0.0535830, 100.0589, 50.05624, 100.0589);
bins.push(SEC * 2, SEC * 3, 2, 200.06143, 300.07645, 250.06894, 300.07645);
bins.push(SEC * 3, SEC * 4, 2, 400.08554, 500.05222, 450.06888, 500.05222);
bins.push(SEC * 4, SEC * 5, 2, 600.0025, 700.09094, 650.04675, 700.09094);
d.push_back(bins);
let mut bins = BinsDim0::empty();
bins.push(SEC * 5, SEC * 6, 2, 800.0619, 900.02844, 850.04517);
bins.push(SEC * 5, SEC * 6, 2, 800.0619, 900.02844, 850.04517, 900.02844);
d.push_back(bins);
d
};
@@ -342,6 +343,7 @@ fn timebin_multi_stage_00() -> Result<(), Error> {
20 + 2 * i as i32,
21 + 2 * i as i32,
20.5 + 2. * i as f32,
21 + 2 * i as i32,
);
}
bins
@@ -356,6 +358,7 @@ fn timebin_multi_stage_00() -> Result<(), Error> {
20 + 4 * i as i32,
23 + 4 * i as i32,
21.5 + 4. * i as f32,
23 + 4 * i as i32,
);
}
bins

View File

@@ -7,6 +7,7 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::binsdim0::BinsDim0;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::ChConf;
@@ -40,6 +41,16 @@ impl BinnedFromEvents {
panic!();
}
let stream = read_provider.read(evq, chconf);
let stream = stream.map(|x| {
let x = items_0::try_map_sitemty_data!(x, |x| match x {
ChannelEvents::Events(x) => {
let x = x.to_dim0_f32_for_binning();
Ok(ChannelEvents::Events(x))
}
ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)),
});
x
});
let stream = Box::pin(stream);
let stream = super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight);
let stream = stream.map(|item| match item {

View File

@@ -40,6 +40,8 @@ pub enum Error {
GapFill(#[from] super::gapfill::Error),
BinnedFromEvents(#[from] super::fromevents::Error),
SfDatabufferNotSupported,
#[error("FinerGridMismatch({0}, {1})")]
FinerGridMismatch(DtMs, DtMs),
}
type BoxedInput = Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>;
@@ -75,7 +77,7 @@ impl TimeBinnedFromLayers {
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Self, Error> {
info!(
debug!(
"{}::new {:?} {:?} {:?}",
Self::type_name(),
series,
@@ -84,7 +86,7 @@ impl TimeBinnedFromLayers {
);
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if bin_len_layers.contains(&bin_len) {
info!("{}::new bin_len in layers {:?}", Self::type_name(), range);
debug!("{}::new bin_len in layers {:?}", Self::type_name(), range);
let inp = super::gapfill::GapFill::new(
"FromLayers".into(),
ch_conf.clone(),
@@ -114,8 +116,11 @@ impl TimeBinnedFromLayers {
} else {
match find_next_finer_bin_len(bin_len, &bin_len_layers) {
Some(finer) => {
if bin_len.ms() % finer.ms() != 0 {
return Err(Error::FinerGridMismatch(bin_len, finer));
}
let range_finer = BinnedRange::from_nano_range(range.to_nano_range(), finer);
warn!(
debug!(
"{}::new next finer from bins {:?} {:?}",
Self::type_name(),
finer,
@@ -154,7 +159,7 @@ impl TimeBinnedFromLayers {
Ok(ret)
}
None => {
warn!("{}::new next finer from events", Self::type_name());
debug!("{}::new next finer from events", Self::type_name());
let series_range = SeriesRange::TimeRange(range.to_nano_range());
let one_before_range = true;
let select = EventsSubQuerySelect::new(
@@ -183,7 +188,7 @@ impl TimeBinnedFromLayers {
open_bytes,
inp: Box::pin(inp),
};
warn!("{}::new setup from events", Self::type_name());
debug!("{}::new setup from events", Self::type_name());
Ok(ret)
}
ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported),

View File

@@ -305,7 +305,7 @@ impl GapFill {
}
let aa = &self.bins_for_cache_write;
if aa.len() >= 2 {
for (i, (&c1, &_c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() {
for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() {
if c1 != 0 {
let n = aa.len() - (1 + i);
debug_cache!("{} cache_write_on_end consider {} for write", self.dbgname, n);
@@ -322,7 +322,7 @@ impl GapFill {
fn cache_write_intermediate(mut self: Pin<&mut Self>) -> Result<(), Error> {
let aa = &self.bins_for_cache_write;
if aa.len() >= 2 {
for (i, (&c1, &_c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() {
for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() {
if c1 != 0 {
let n = aa.len() - (1 + i);
debug_cache!("{} cache_write_intermediate consider {} for write", self.dbgname, n);
@@ -462,9 +462,9 @@ impl Stream for GapFill {
beg: j.ns(),
end: self.range.full_range().end(),
};
warn!(
"----- RECEIVED SOMETHING, BUT NOT ALL, setup rest from finer {} {} {}",
self.range, j, range
debug!(
"{} received something but not all, setup rest from finer {} {} {}",
self.dbgname, self.range, j, range
);
match self.as_mut().setup_inp_finer(range, false) {
Ok(()) => {
@@ -473,14 +473,14 @@ impl Stream for GapFill {
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
}
} else {
info!("----- RECEIVED EVERYTHING");
debug!("{} received everything", self.dbgname);
Ready(None)
}
} else {
let range = self.range.to_nano_range();
warn!(
"----- RECEIVED NOTHING SO FAR AT ALL, setup full range from finer {} {}",
self.range, range
debug!(
"{} received nothing at all, setup full range from finer {} {}",
self.dbgname, self.range, range
);
match self.as_mut().setup_inp_finer(range, false) {
Ok(()) => {
@@ -495,10 +495,10 @@ impl Stream for GapFill {
} else {
self.done = true;
if self.inp_finer_range_final_cnt == self.inp_finer_range_final_max {
trace_handle!("{} RANGE FINAL ALL", self.dbgname);
trace_handle!("{} range final all", self.dbgname);
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
trace_handle!("{} SUBSTREAMS NOT FINAL", self.dbgname);
trace_handle!("{} substreams not final", self.dbgname);
continue;
}
};

View File

@@ -52,7 +52,6 @@ fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl
stream
}
// TODO factor out, it is use now also from GapFill.
pub async fn timebinnable_stream(
range: NanoRange,
one_before_range: bool,
@@ -92,6 +91,7 @@ pub async fn timebinnable_stream(
on_sitemty_data!(k, |k| {
let k: Box<dyn Events> = Box::new(k);
// trace!("got len {}", k.len());
let k = k.to_dim0_f32_for_binning();
let k = tr.0.transform(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
@@ -316,19 +316,24 @@ async fn timebinned_stream(
events_read_provider: Option<Arc<dyn EventsReadProvider>>,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
use netpod::query::CacheUsage;
match (query.cache_usage(), cache_read_provider, events_read_provider) {
(CacheUsage::Use | CacheUsage::Recreate, Some(cache_read_provider), Some(events_read_provider)) => {
let series = if let Some(x) = query.channel().series() {
x
} else {
return Err(Error::with_msg_no_trace(
"cached time binned only available given a series id",
));
};
info!("--- CACHING PATH ---");
info!("{query:?}");
info!("subgrids {:?}", query.subgrids());
let range = binned_range.binned_range_time().to_nano_range();
let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Use);
match (
ch_conf.series(),
cache_usage.clone(),
cache_read_provider,
events_read_provider,
) {
(
Some(series),
CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore,
Some(cache_read_provider),
Some(events_read_provider),
) => {
debug!(
"timebinned_stream caching {:?} subgrids {:?}",
query,
query.subgrids()
);
let do_time_weight = true;
let bin_len_layers = if let Some(subgrids) = query.subgrids() {
subgrids
@@ -336,16 +341,11 @@ async fn timebinned_stream(
.map(|&x| DtMs::from_ms_u64(1000 * x.as_secs()))
.collect()
} else {
vec![
DtMs::from_ms_u64(1000 * 10),
DtMs::from_ms_u64(1000 * 60 * 60),
// DtMs::from_ms_u64(1000 * 60 * 60 * 12),
// DtMs::from_ms_u64(1000 * 10),
]
netpod::time_bin_len_cache_opts().to_vec()
};
let stream = crate::timebin::TimeBinnedFromLayers::new(
ch_conf,
query.cache_usage(),
cache_usage,
query.transform().clone(),
EventsSubQuerySettings::from(&query),
query.log_level().into(),
@@ -369,10 +369,8 @@ async fn timebinned_stream(
}
_ => {
let range = binned_range.binned_range_time().to_nano_range();
let do_time_weight = true;
let one_before_range = true;
let stream = timebinnable_stream(
range,
one_before_range,
@@ -419,7 +417,7 @@ pub async fn timebinned_json(
let deadline = Instant::now()
+ query
.timeout_content()
.unwrap_or(Duration::from_millis(5000))
.unwrap_or(Duration::from_millis(3000))
.min(Duration::from_millis(5000))
.max(Duration::from_millis(200));
let binned_range = query.covering_range()?;
@@ -439,18 +437,19 @@ pub async fn timebinned_json(
let stream = timebinned_to_collectable(stream);
let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range));
let collected: BoxFuture<_> = Box::pin(collected);
let collected = collected.await?;
info!("timebinned_json collected type_name {:?}", collected.type_name());
let collected = if let Some(bins) = collected
let collres = collected.await?;
info!("timebinned_json collected type_name {:?}", collres.type_name());
let collres = if let Some(bins) = collres
.as_any_ref()
.downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
{
info!("MATCHED");
bins.boxed_collected_with_enum_fix()
warn!("unexpected binned enum");
// bins.boxed_collected_with_enum_fix()
collres
} else {
collected
collres
};
let jsval = serde_json::to_value(&collected)?;
let jsval = serde_json::to_value(&collres)?;
Ok(jsval)
}
@@ -461,8 +460,9 @@ fn take_collector_result(coll: &mut Box<dyn items_0::collect_s::Collector>) -> O
.as_any_ref()
.downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
{
info!("MATCHED ENUM");
bins.boxed_collected_with_enum_fix()
warn!("unexpected binned enum");
// bins.boxed_collected_with_enum_fix()
collres
} else {
collres
};