Refactor AsAny handling

This commit is contained in:
Dominik Werder
2022-12-12 15:53:53 +01:00
parent 87dde4712e
commit e81337c22f
29 changed files with 524 additions and 269 deletions

View File

@@ -18,6 +18,7 @@ use crate::{TimeBinned, TimeBinnerDyn, TimeBins};
use chrono::{TimeZone, Utc};
use err::Error;
use items_0::subfr::SubFrId;
use items_0::AsAnyRef;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{NanoRange, Shape};
@@ -74,6 +75,15 @@ where
}
}
impl<NTY> AsAnyRef for MinMaxAvgDim0Bins<NTY>
where
NTY: NumOps,
{
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl<NTY> MinMaxAvgDim0Bins<NTY> {
pub fn empty() -> Self {
Self {
@@ -458,10 +468,6 @@ impl<NTY: NumOps + 'static> TimeBinnableDyn for MinMaxAvgDim0Bins<NTY> {
let ret = MinMaxAvgDim0BinsTimeBinner::<NTY>::new(edges.into(), do_time_weight);
Box::new(ret)
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
pub struct MinMaxAvgDim0BinsTimeBinner<NTY: NumOps> {
@@ -547,13 +553,13 @@ impl<NTY: NumOps + 'static> TimeBinnerDyn for MinMaxAvgDim0BinsTimeBinner<NTY> {
self.agg.as_mut().unwrap()
};
if let Some(item) = item
.as_any()
.as_any_ref()
// TODO make statically sure that we attempt to cast to the correct type here:
.downcast_ref::<<MinMaxAvgDim0BinsAggregator<NTY> as TimeBinnableTypeAggregator>::Input>()
{
agg.ingest(item);
} else {
let tyid_item = std::any::Any::type_id(item.as_any());
let tyid_item = std::any::Any::type_id(item.as_any_ref());
error!("not correct item type {:?}", tyid_item);
};
if item.ends_after(agg.range().clone()) {

View File

@@ -18,11 +18,13 @@ use crate::{Fits, FitsInside, NewEmpty, ReadPbv, Sitemty, TimeBinned, WithLen};
use chrono::{TimeZone, Utc};
use err::Error;
use items_0::subfr::SubFrId;
use items_0::AsAnyRef;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{NanoRange, Shape};
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
use tokio::fs::File;
@@ -72,6 +74,15 @@ where
}
}
impl<NTY> AsAnyRef for MinMaxAvgDim1Bins<NTY>
where
NTY: NumOps,
{
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl<NTY> MinMaxAvgDim1Bins<NTY> {
pub fn empty() -> Self {
Self {

View File

@@ -7,6 +7,7 @@ use bincode::config::{WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing};
use bincode::DefaultOptions;
use bytes::{BufMut, BytesMut};
use err::Error;
use items_0::bincode;
#[allow(unused)]
use netpod::log::*;
use serde::Serialize;
@@ -86,7 +87,7 @@ pub fn encode_to_vec<S>(item: S) -> Result<Vec<u8>, Error>
where
S: Serialize,
{
if true {
if false {
serde_json::to_vec(&item).map_err(|e| e.into())
} else {
bincode_to_vec(&item)
@@ -97,7 +98,7 @@ pub fn decode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
if true {
if false {
serde_json::from_slice(buf).map_err(|e| e.into())
} else {
bincode_from_slice(buf)
@@ -112,10 +113,10 @@ where
let mut out = Vec::new();
//let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map();
//let writer = ciborium::ser::into_writer(&item, &mut out).unwrap();
//let mut ser = bincode_ser(&mut out);
//let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
let mut ser = serde_json::Serializer::new(&mut out);
let mut ser = bincode_ser(&mut out);
let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
//let mut ser = serde_json::Serializer::new(&mut out);
//let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
match item.erased_serialize(&mut ser2) {
Ok(_) => {
let enc = out;
@@ -333,7 +334,10 @@ where
)))
} else {
match decode_from_slice(frame.buf()) {
Ok(item) => Ok(item),
Ok(item) => {
info!("decode_from_slice {} success", std::any::type_name::<T>());
Ok(item)
}
Err(e) => {
error!("decode_frame T = {}", std::any::type_name::<T>());
error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid());

View File

@@ -19,6 +19,7 @@ use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use err::Error;
use frame::{make_error_frame, make_log_frame, make_range_complete_frame, make_stats_frame};
use items_0::AsAnyRef;
#[allow(unused)]
use netpod::log::*;
use netpod::timeunits::{MS, SEC};
@@ -508,32 +509,23 @@ pub trait TimeBinnableType:
// TODO should not require Sync!
// TODO SitemtyFrameType is already supertrait of FramableInner.
pub trait TimeBinnableDyn:
std::fmt::Debug
fmt::Debug
+ FramableInner
+ FrameType
+ FrameTypeInnerDyn
+ WithLen
+ RangeOverlapInfo
+ Any
+ AsAnyRef
+ Sync
+ Send
+ 'static
{
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinnerDyn>;
fn as_any(&self) -> &dyn Any;
}
pub trait TimeBinnableDynStub:
std::fmt::Debug
+ FramableInner
+ FrameType
+ FrameTypeInnerDyn
+ WithLen
+ RangeOverlapInfo
+ Any
+ Sync
+ Send
+ 'static
fmt::Debug + FramableInner + FrameType + FrameTypeInnerDyn + WithLen + RangeOverlapInfo + Any + AsAnyRef + Sync + Send + 'static
{
}
@@ -546,10 +538,6 @@ where
error!("TODO impl time_binner_new for T {}", std::any::type_name::<T>());
err::todoval()
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
// TODO maybe this is no longer needed:
@@ -606,10 +594,6 @@ impl TimeBinnableDyn for Box<dyn TimeBinned> {
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinnerDyn> {
self.as_time_binnable_dyn().time_binner_new(edges, do_time_weight)
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
// TODO should get I/O and tokio dependence out of this crate

View File

@@ -8,6 +8,7 @@ use crate::{
TimeBinnableTypeAggregator, TimeBinnerDyn, WithLen, WithTimestamps,
};
use err::Error;
use items_0::AsAnyRef;
use netpod::log::*;
use netpod::{NanoRange, Shape};
use serde::{Deserialize, Serialize};
@@ -93,6 +94,15 @@ where
}
}
impl<NTY> AsAnyRef for ScalarEvents<NTY>
where
NTY: NumOps,
{
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl<NTY> WithLen for ScalarEvents<NTY>
where
NTY: NumOps,
@@ -601,10 +611,6 @@ impl<NTY: NumOps + 'static> TimeBinnableDyn for ScalarEvents<NTY> {
let ret = ScalarEventsTimeBinner::<NTY>::new(edges.into(), do_time_weight);
Box::new(ret)
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
impl<NTY: NumOps + 'static> EventsDyn for ScalarEvents<NTY> {
@@ -738,7 +744,7 @@ impl<NTY: NumOps + 'static> TimeBinnerDyn for ScalarEventsTimeBinner<NTY> {
self.agg.as_mut().unwrap()
};
if let Some(item) = item
.as_any()
.as_any_ref()
// TODO make statically sure that we attempt to cast to the correct type here:
.downcast_ref::<<EventValuesAggregator<NTY> as TimeBinnableTypeAggregator>::Input>()
{

View File

@@ -9,6 +9,7 @@ use crate::{
};
use err::Error;
use items_0::subfr::SubFrId;
use items_0::AsAnyRef;
use netpod::log::*;
use netpod::{x_bin_count, AggKind, NanoRange, Shape};
use serde::{Deserialize, Serialize};
@@ -67,6 +68,12 @@ where
}
}
impl<NTY> AsAnyRef for WaveEvents<NTY> where NTY:NumOps {
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl<NTY> WithLen for WaveEvents<NTY> {
fn len(&self) -> usize {
self.tss.len()