Process waveform on event fetch
This commit is contained in:
@@ -529,9 +529,22 @@ fn categorize_index_files(list: &Vec<String>) -> Result<Vec<IndexFile>, Error> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn index_file_path_list(channel: Channel, dbconf: Database) -> Result<Vec<PathBuf>, Error> {
|
||||
let dbc = database_connect(&dbconf).await?;
|
||||
let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
|
||||
let rows = dbc.query(sql, &[&channel.name()]).await?;
|
||||
let mut index_paths = vec![];
|
||||
for row in rows {
|
||||
index_paths.push(row.try_get(0)?);
|
||||
}
|
||||
let list = categorize_index_files(&index_paths)?;
|
||||
let ret = list.into_iter().map(|k| k.path).collect();
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
static INDEX_JSON: Mutex<Option<BTreeMap<String, Vec<String>>>> = Mutex::const_new(None);
|
||||
|
||||
pub async fn index_files_index_ref<P: Into<PathBuf> + Send>(
|
||||
async fn index_files_index_ref<P: Into<PathBuf> + Send>(
|
||||
key: &str,
|
||||
index_files_index_path: P,
|
||||
stats: &StatsChannel,
|
||||
@@ -572,21 +585,9 @@ pub async fn index_files_index_ref<P: Into<PathBuf> + Send>(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn index_file_path_list(channel: Channel, dbconf: Database) -> Result<Vec<PathBuf>, Error> {
|
||||
let dbc = database_connect(&dbconf).await?;
|
||||
let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
|
||||
let rows = dbc.query(sql, &[&channel.name()]).await?;
|
||||
let mut index_paths = vec![];
|
||||
for row in rows {
|
||||
index_paths.push(row.try_get(0)?);
|
||||
}
|
||||
let list = categorize_index_files(&index_paths)?;
|
||||
let ret = list.into_iter().map(|k| k.path).collect();
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
// TODO using the json index is currently no longer needed, but maybe as alternative for tests.
|
||||
async fn _index_file_path_list_old(
|
||||
#[allow(unused)]
|
||||
async fn index_file_path_list_old(
|
||||
channel: Channel,
|
||||
index_files_index_path: PathBuf,
|
||||
stats: &StatsChannel,
|
||||
|
||||
@@ -3,10 +3,11 @@ use crate::archeng::blockstream::BlockStream;
|
||||
use crate::events::{FrameMaker, FrameMakerTrait};
|
||||
use err::Error;
|
||||
use futures_util::{Stream, StreamExt};
|
||||
use items::binnedevents::XBinnedEvents;
|
||||
use items::binnedevents::{SingleBinWaveEvents, XBinnedEvents};
|
||||
use items::eventsitem::EventsItem;
|
||||
use items::plainevents::PlainEvents;
|
||||
use items::{Framable, LogItem, RangeCompletableItem, StreamItem};
|
||||
use items::plainevents::{PlainEvents, WavePlainEvents};
|
||||
use items::waveevents::WaveXBinner;
|
||||
use items::{EventsNodeProcessor, Framable, LogItem, RangeCompletableItem, StreamItem};
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{log::*, AggKind, Shape};
|
||||
use netpod::{ChannelArchiver, ChannelConfigQuery};
|
||||
@@ -28,8 +29,14 @@ pub async fn make_event_pipe(
|
||||
};
|
||||
debug!("Channel config: {:?}", channel_config);
|
||||
let ixpaths = crate::archeng::indexfiles::index_file_path_list(evq.channel.clone(), conf.database.clone()).await?;
|
||||
info!("got categorized ixpaths: {:?}", ixpaths);
|
||||
let ixpath = ixpaths.first().unwrap().clone();
|
||||
debug!("got categorized ixpaths: {:?}", ixpaths);
|
||||
let ixpath = if let Some(x) = ixpaths.first() {
|
||||
x.clone()
|
||||
} else {
|
||||
return Err(Error::with_msg_no_trace("no index file for channel")
|
||||
.mark_bad_request()
|
||||
.add_public_msg(format!("No index file for {}", evq.channel.name)));
|
||||
};
|
||||
use crate::archeng::blockstream::BlockItem;
|
||||
let refs = blockref_stream(
|
||||
evq.channel.clone(),
|
||||
@@ -45,6 +52,8 @@ pub async fn make_event_pipe(
|
||||
},
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
let cfgshape = channel_config.shape.clone();
|
||||
let q_agg_kind = evq.agg_kind.clone();
|
||||
let filtered = RangeFilter::new(blocks, evq.range.clone(), evq.agg_kind.need_expand());
|
||||
let xtrans = match channel_config.shape {
|
||||
Shape::Scalar => match evq.agg_kind {
|
||||
@@ -78,9 +87,105 @@ pub async fn make_event_pipe(
|
||||
AggKind::DimXBinsN(_) => err::todoval(),
|
||||
AggKind::EventBlobs => err::todoval(),
|
||||
},
|
||||
Shape::Wave(_n1) => match evq.agg_kind {
|
||||
AggKind::Plain => Box::pin(filtered) as Pin<Box<dyn Stream<Item = _> + Send>>,
|
||||
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
|
||||
let tr = filtered.map(move |j| match j {
|
||||
Ok(j) => match j {
|
||||
StreamItem::DataItem(j) => match j {
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
|
||||
}
|
||||
RangeCompletableItem::Data(j) => match j {
|
||||
EventsItem::Plain(j) => match j {
|
||||
PlainEvents::Scalar(_) => {
|
||||
warn!("EventsItem::Plain Scalar for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
panic!()
|
||||
}
|
||||
PlainEvents::Wave(j) => {
|
||||
trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
match j {
|
||||
WavePlainEvents::Byte(j) => {
|
||||
let binner =
|
||||
WaveXBinner::<i8>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = SingleBinWaveEvents::Byte(out);
|
||||
let item = XBinnedEvents::SingleBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Short(j) => {
|
||||
let binner =
|
||||
WaveXBinner::<i16>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = SingleBinWaveEvents::Short(out);
|
||||
let item = XBinnedEvents::SingleBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Int(j) => {
|
||||
let binner =
|
||||
WaveXBinner::<i32>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = SingleBinWaveEvents::Int(out);
|
||||
let item = XBinnedEvents::SingleBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Float(j) => {
|
||||
let binner =
|
||||
WaveXBinner::<f32>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = SingleBinWaveEvents::Float(out);
|
||||
let item = XBinnedEvents::SingleBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Double(j) => {
|
||||
let binner =
|
||||
WaveXBinner::<f64>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = SingleBinWaveEvents::Double(out);
|
||||
let item = XBinnedEvents::SingleBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
EventsItem::XBinnedEvents(j) => match j {
|
||||
XBinnedEvents::Scalar(j) => {
|
||||
warn!("XBinnedEvents::Scalar for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
let item = XBinnedEvents::Scalar(j);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
XBinnedEvents::SingleBinWave(j) => {
|
||||
warn!("XBinnedEvents::SingleBinWave for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
let item = XBinnedEvents::SingleBinWave(j);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
XBinnedEvents::MultiBinWave(_) => todo!(),
|
||||
},
|
||||
},
|
||||
},
|
||||
StreamItem::Log(j) => Ok(StreamItem::Log(j)),
|
||||
StreamItem::Stats(j) => Ok(StreamItem::Stats(j)),
|
||||
},
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
Box::pin(tr) as _
|
||||
}
|
||||
AggKind::DimXBinsN(_) => err::todoval(),
|
||||
AggKind::EventBlobs => err::todoval(),
|
||||
},
|
||||
_ => {
|
||||
error!("TODO shape {:?}", channel_config.shape);
|
||||
panic!()
|
||||
let err = Error::with_msg_no_trace(format!("TODO shape {:?}", channel_config.shape))
|
||||
.mark_bad_request()
|
||||
.add_public_msg(format!("can not yet handle shape {:?}", channel_config.shape));
|
||||
Box::pin(futures_util::stream::iter([Err(err)]))
|
||||
}
|
||||
};
|
||||
let mut frame_maker = Box::new(FrameMaker::with_item_type(
|
||||
|
||||
Reference in New Issue
Block a user