Refactor, replace by generic impl

This commit is contained in:
Dominik Werder
2022-11-09 16:20:23 +01:00
parent 9036160253
commit 8ab96e565e
13 changed files with 94 additions and 135 deletions

View File

@@ -7,7 +7,7 @@ use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use http::{StatusCode, Uri};
use items::frame::decode_frame;
use items::{FrameDecodable, FrameType, FrameTypeStaticSYC, TimeBinnableType};
use items::{FrameDecodable, FrameType, FrameTypeInnerStatic, TimeBinnableType};
use items::{RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::query::CacheUsage;
@@ -33,7 +33,7 @@ pub struct FetchedPreBinned<TBT> {
impl<TBT> FetchedPreBinned<TBT> {
pub fn new(query: &PreBinnedQuery, host: String, port: u16) -> Result<Self, Error>
where
TBT: FrameTypeStaticSYC + TimeBinnableType,
TBT: FrameTypeInnerStatic + TimeBinnableType,
Sitemty<TBT>: FrameDecodable,
{
// TODO should not assume http:
@@ -53,7 +53,7 @@ impl<TBT> FetchedPreBinned<TBT> {
impl<TBT> Stream for FetchedPreBinned<TBT>
where
TBT: FrameTypeStaticSYC + TimeBinnableType,
TBT: FrameTypeInnerStatic + TimeBinnableType,
Sitemty<TBT>: FrameDecodable,
{
type Item = Sitemty<TBT>;

View File

@@ -12,8 +12,8 @@ use futures_core::Stream;
use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps, StringNum};
use items::{
Appendable, Clearable, EventsNodeProcessor, Framable, FrameDecodable, FrameType, PushableIndex,
RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem, TimeBinnableType, TimeBinned,
Appendable, Clearable, EventsNodeProcessor, Framable, FrameDecodable, FrameType, FrameTypeInnerDyn, PushableIndex,
RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, TimeBinned,
};
use netpod::log::*;
use netpod::{AggKind, ByteOrder, ChannelTyped, NodeConfigCached, ScalarType, Shape};
@@ -35,7 +35,7 @@ where
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + Clearable + 'static,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: SitemtyFrameType + TimeBinned,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: FrameTypeInnerDyn + TimeBinned,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: Framable + FrameType + FrameDecodable,
{

View File

@@ -4,8 +4,8 @@ use bytes::{Buf, BytesMut};
use err::Error;
use futures_util::{Stream, StreamExt};
use items::{
Appendable, ByteEstimate, Clearable, FrameTypeStaticSYC, PushableIndex, RangeCompletableItem, SitemtyFrameType,
StatsItem, StreamItem, WithLen, WithTimestamps,
Appendable, ByteEstimate, Clearable, FrameTypeInnerStatic, PushableIndex, RangeCompletableItem, StatsItem,
StreamItem, WithLen, WithTimestamps,
};
use netpod::histo::HistoLog2;
use netpod::log::*;
@@ -528,16 +528,10 @@ impl EventFull {
}
}
impl FrameTypeStaticSYC for EventFull {
impl FrameTypeInnerStatic for EventFull {
const FRAME_TYPE_ID: u32 = items::EVENT_FULL_FRAME_TYPE_ID;
}
impl SitemtyFrameType for EventFull {
fn frame_type_id(&self) -> u32 {
<Self as FrameTypeStaticSYC>::FRAME_TYPE_ID
}
}
impl WithLen for EventFull {
fn len(&self) -> usize {
self.tss.len()

View File

@@ -2,7 +2,7 @@ use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use futures_core::Stream;
use futures_util::StreamExt;
use items::frame::decode_frame;
use items::{FrameTypeStaticSYC, Sitemty, StreamItem};
use items::{FrameTypeInnerStatic, Sitemty, StreamItem};
use netpod::log::*;
use serde::de::DeserializeOwned;
use std::marker::PhantomData;
@@ -37,7 +37,7 @@ where
impl<T, I> Stream for EventsFromFrames<T, I>
where
T: AsyncRead + Unpin,
I: FrameTypeStaticSYC + DeserializeOwned + Unpin,
I: FrameTypeInnerStatic + DeserializeOwned + Unpin,
{
type Item = Sitemty<I>;