From 4d7ec670101241b92a790291b2efdfd8e91d9281 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 22 Oct 2021 17:45:13 +0200
Subject: [PATCH] Enable fetch of channel info and events

---
 archapp/src/archeng.rs                 | 108 ++++++++++++++++++++---
 archapp/src/archeng/datablockstream.rs |  46 +++++-----
 archapp/src/archeng/datastream.rs      |   4 +-
 archapp/src/archeng/pipe.rs            |  38 ++++++++
 archapp/src/events.rs                  | 117 +++++++++++++++++++++----
 archapp/src/lib.rs                     |  50 +++++++++--
 disk/src/binned.rs                     |   2 +-
 httpclient/src/lib.rs                  |   2 +-
 httpret/src/channelarchiver.rs         |   1 -
 httpret/src/lib.rs                     |  45 +++++++---
 items/src/waveevents.rs                |  11 +++
 netpod/src/lib.rs                      |   1 +
 nodenet/src/conn.rs                    |   7 +-
 taskrun/src/append.rs                  |  14 +--
 taskrun/src/lib.rs                     |   2 +-
 15 files changed, 363 insertions(+), 85 deletions(-)
 create mode 100644 archapp/src/archeng/pipe.rs

diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs
index 96ee144..506a6a6 100644
--- a/archapp/src/archeng.rs
+++ b/archapp/src/archeng.rs
@@ -1,5 +1,6 @@
 pub mod datablockstream;
 pub mod datastream;
+pub mod pipe;
 
 use crate::{EventsItem, PlainEvents, ScalarPlainEvents};
 use async_channel::{Receiver, Sender};
@@ -7,8 +8,9 @@ use err::Error;
 use futures_core::Future;
 use futures_util::StreamExt;
 use items::eventvalues::EventValues;
+use items::{RangeCompletableItem, StreamItem};
 use netpod::timeunits::SEC;
-use netpod::{log::*, ChannelArchiver, DataHeaderPos, FilePos, Nanos};
+use netpod::{log::*, ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DataHeaderPos, FilePos, Nanos};
 use std::convert::TryInto;
 use std::io::{self, SeekFrom};
 use std::path::PathBuf;
@@ -48,7 +50,9 @@ pub async fn open_read(path: PathBuf) -> io::Result<File> {
     let res = OpenOptions::new().read(true).open(path).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    //info!("timed open_read dt: {:.3} ms", dt);
+    if false {
+        info!("timed open_read dt: {:.3} ms", dt);
+    }
     res
 }
 
@@ -57,7 +61,9 @@ async fn seek(file: &mut File, pos: SeekFrom) -> io::Result<u64> {
     let ts1 = Instant::now();
     let res = file.seek(pos).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    //info!("timed seek dt: {:.3} ms", dt);
+    if false {
+        info!("timed seek dt: {:.3} ms", dt);
+    }
     res
 }
@@ -66,7 +72,9 @@ async fn read(file: &mut File, buf: &mut [u8]) -> io::Result<usize> {
     let ts1 = Instant::now();
     let res = file.read(buf).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    //info!("timed read dt: {:.3} ms res: {:?}", dt, res);
+    if false {
+        info!("timed read dt: {:.3} ms res: {:?}", dt, res);
+    }
     res
 }
@@ -75,7 +83,9 @@ async fn read_exact(file: &mut File, buf: &mut [u8]) -> io::Result<usize> {
     let ts1 = Instant::now();
     let res = file.read_exact(buf).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    //info!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
+    if false {
+        info!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
+    }
     res
 }
@@ -232,7 +242,7 @@ fn read_string(buf: &[u8]) -> Result<String, Error> {
         .iter()
         .map(|k| *k)
         .enumerate()
-        .take_while(|&(i, k)| k != 0)
+        .take_while(|&(_, k)| k != 0)
         .last()
         .map(|(i, _)| i);
     let ret = match imax {
@@ -481,12 +491,12 @@ pub async fn search_record(
             //info!("looking at record i {}", i);
             if rec.ts2.ns > beg.ns {
                 if node.is_leaf {
-                    info!("found leaf match at {} / {}", i, nr);
+                    trace!("found leaf match at {} / {}", i, nr);
                     let ret = RTreeNodeAtRecord { node, rix: i };
                     let stats = TreeSearchStats::new(ts1, node_reads);
                     return Ok((Some(ret), stats));
                 } else {
-                    info!("found non-leaf match at {} / {}", i, nr);
+                    trace!("found non-leaf match at {} / {}", i, nr);
                     let pos = FilePos { pos: rec.child_or_id };
                     node = read_rtree_node(file, pos, rtree_m).await?;
                     node_reads += 1;
@@ -668,9 +678,12 @@ pub async fn channel_list(index_path: PathBuf) -> Result<Vec<String>, Error> {
         } else if basics.version == 3 {
             &hver3
         } else {
-            panic!();
+            return Err(Error::with_msg_no_trace(format!(
+                "unexpected version {}",
+                basics.version
+            )));
         };
-        for (i, name_hash_entry) in basics.name_hash_entries.iter().enumerate() {
+        for (_i, name_hash_entry) in basics.name_hash_entries.iter().enumerate() {
             if name_hash_entry.named_hash_channel_entry_pos != 0 {
                 let pos = FilePos {
                     pos: name_hash_entry.named_hash_channel_entry_pos,
@@ -684,7 +697,8 @@ pub async fn channel_list(index_path: PathBuf) -> Result<Vec<String>, Error> {
     Ok(ret)
 }
 
-async fn datarange_stream_fill(channel_name: &str, tx: Sender<Datarange>) {
+#[allow(dead_code)]
+async fn datarange_stream_fill(_channel_name: &str, _tx: Sender<Datarange>) {
     // Search the first relevant leaf node.
     // Pipe all ranges from there, and continue with nodes.
     // Issue: can not stop because I don't look into the files.
@@ -694,8 +708,8 @@ async fn datarange_stream_fill(channel_name: &str, tx: Sender<Datarange>) {
 // Should contain enough information to allow one to open and position a relevant datafile.
 pub struct Datarange {}
 
-pub fn datarange_stream(channel_name: &str) -> Result<Receiver<Datarange>, Error> {
-    let (tx, rx) = async_channel::bounded(4);
+pub fn datarange_stream(_channel_name: &str) -> Result<Receiver<Datarange>, Error> {
+    let (_tx, rx) = async_channel::bounded(4);
     let task = async {};
     taskrun::spawn(task);
     Ok(rx)
@@ -761,6 +775,7 @@ impl DbrType {
         Ok(res)
     }
 
+    #[allow(dead_code)]
     fn byte_len(&self) -> usize {
         use DbrType::*;
         match self {
@@ -999,6 +1014,73 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver<Result<String, Error>> {
 }
 
+pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> Result<ChannelConfigResponse, Error> {
+    let mut type_info = None;
+    let mut stream = datablockstream::DatablockStream::for_channel_range(
+        q.range.clone(),
+        q.channel.clone(),
+        conf.data_base_paths.clone().into(),
+        true,
+    );
+    while let Some(item) = stream.next().await {
+        match item {
+            Ok(k) => match k {
+                StreamItem::DataItem(k) => match k {
+                    RangeCompletableItem::RangeComplete => (),
+                    RangeCompletableItem::Data(k) => {
+                        type_info = Some(k.type_info());
+                        break;
+                    }
+                },
+                StreamItem::Log(_) => (),
+                StreamItem::Stats(_) => (),
+            },
+            Err(e) => {
+                error!("{}", e);
+                ()
+            }
+        }
+    }
+    if type_info.is_none() {
+        let mut stream = datablockstream::DatablockStream::for_channel_range(
+            q.range.clone(),
+            q.channel.clone(),
+            conf.data_base_paths.clone().into(),
+            false,
+        );
+        while let Some(item) = stream.next().await {
+            match item {
+                Ok(k) => match k {
+                    StreamItem::DataItem(k) => match k {
+                        RangeCompletableItem::RangeComplete => (),
+                        RangeCompletableItem::Data(k) => {
+                            type_info = Some(k.type_info());
+                            break;
+                        }
+                    },
+                    StreamItem::Log(_) => (),
+                    StreamItem::Stats(_) => (),
+                },
+                Err(e) => {
+                    error!("{}", e);
+                    ()
+                }
+            }
+        }
+    }
+    if let Some(type_info) = type_info {
+        let ret = ChannelConfigResponse {
+            channel: q.channel.clone(),
+            scalar_type: type_info.0,
+            byte_order: None,
+            shape: type_info.1,
+        };
+        Ok(ret)
+    } else {
+        Err(Error::with_msg_no_trace("can not get channel type info"))
+    }
+}
+
 #[cfg(test)]
 mod test {
     // TODO move RangeFilter to a different crate (items?)
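Note on the channel_config function added to archapp/src/archeng.rs above: the channel archiver keeps no static type record per channel, so the function discovers the (ScalarType, Shape) pair by opening a DatablockStream over the queried range (first in expand mode, then in a plain second pass) and taking type_info() from the first data item that arrives. The sketch below is a minimal, self-contained illustration of that "probe the first data item" idiom; Item, probe_type_info and the other names are invented for the example and are not part of this codebase.

    // Illustrative stand-ins for the crate's StreamItem/RangeCompletableItem nesting.
    #[derive(Debug, Clone, PartialEq)]
    enum ScalarType {
        I8,
        F64,
    }

    #[derive(Debug, Clone, PartialEq)]
    enum Shape {
        Scalar,
        Wave(u32),
    }

    enum Item {
        Log,
        RangeComplete,
        Data(ScalarType, Shape),
    }

    // Scan items until the first data item and return its type info, the way
    // channel_config derives the channel type from the first event block.
    fn probe_type_info<I>(items: I) -> Option<(ScalarType, Shape)>
    where
        I: IntoIterator<Item = Result<Item, String>>,
    {
        for item in items {
            match item {
                Ok(Item::Data(st, sh)) => return Some((st, sh)),
                // Logs, stats and range markers carry no type info; skip them.
                Ok(_) => (),
                // The patched code logs stream errors and keeps scanning, too.
                Err(e) => eprintln!("{}", e),
            }
        }
        None
    }

    fn main() {
        let stream = vec![
            Ok(Item::Log),
            Ok(Item::RangeComplete),
            Err("damaged block".to_string()),
            Ok(Item::Data(ScalarType::F64, Shape::Wave(2048))),
        ];
        assert_eq!(
            probe_type_info(stream),
            Some((ScalarType::F64, Shape::Wave(2048)))
        );
    }

The fallback to a second, non-expand pass mirrors the two loops above: if the expand-mode stream yields no data item at all, the plain stream still gets a chance before the lookup gives up.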
diff --git a/archapp/src/archeng/datablockstream.rs b/archapp/src/archeng/datablockstream.rs
index 204dde8..77416c5 100644
--- a/archapp/src/archeng/datablockstream.rs
+++ b/archapp/src/archeng/datablockstream.rs
@@ -70,36 +70,40 @@ async fn datablock_stream_inner(
             search_record(&mut index_file, basics.rtree_m, basics.rtree_start_pos, search_ts).await?;
         if let Some(nrec) = res {
             let rec = nrec.rec();
-            info!("found record: {:?}", rec);
+            trace!("found record: {:?}", rec);
             let pos = FilePos { pos: rec.child_or_id };
             // TODO rename Datablock? → IndexNodeDatablock
-            info!("\n\nREAD Datablock FROM {:?}\n", pos);
+            trace!("READ Datablock FROM {:?}\n", pos);
             let datablock = read_index_datablockref(&mut index_file, pos).await?;
-            info!("\nDatablock: {:?}\n", datablock);
+            trace!("Datablock: {:?}\n", datablock);
             let data_path = index_path.parent().unwrap().join(datablock.file_name());
             if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos {
-                warn!("SKIP BECAUSE ITS THE SAME BLOCK");
+                debug!("skipping because it is the same block");
             } else {
-                info!("try to open data_path: {:?}", data_path);
-                let mut data_file = open_read(data_path.clone()).await?;
-                let datafile_header =
-                    read_datafile_header(&mut data_file, datablock.data_header_pos()).await?;
-                info!("datafile_header -------------- HEADER\n{:?}", datafile_header);
-                let events = read_data_1(&mut data_file, &datafile_header).await?;
-                info!("Was able to read data: {} events", events.len());
-                let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
-                tx.send(item).await?;
+                trace!("try to open data_path: {:?}", data_path);
+                match open_read(data_path.clone()).await {
+                    Ok(mut data_file) => {
+                        let datafile_header =
+                            read_datafile_header(&mut data_file, datablock.data_header_pos()).await?;
+                        trace!("datafile_header -------------- HEADER\n{:?}", datafile_header);
+                        let events = read_data_1(&mut data_file, &datafile_header).await?;
+                        trace!("Was able to read data: {} events", events.len());
+                        let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
+                        tx.send(item).await?;
+                    }
+                    Err(e) => {
+                        // That's fine. The index mentions lots of datafiles which got purged already.
+                        trace!("can not find file mentioned in index: {:?} {}", data_path, e);
+                    }
+                };
             }
             if datablock.next != 0 {
-                error!("datablock.next != 0: {:?}", datablock);
+                warn!("MAYBE TODO? datablock.next != 0: {:?}", datablock);
             }
             last_data_file_path = data_path;
             last_data_file_pos = datablock.data_header_pos();
-            if expand {
-                err::todo()
-            } else {
-                search_ts.ns = rec.ts2.ns;
-            };
+            // TODO anything special to do in expand mode?
+            search_ts.ns = rec.ts2.ns;
         } else {
             warn!("nothing found, break");
             break;
@@ -107,7 +111,7 @@ async fn datablock_stream_inner(
             }
         }
     } else {
-        info!("can not find index file at {:?}", index_path);
+        warn!("can not find index file at {:?}", index_path);
     }
     Ok(())
 }
@@ -118,7 +122,7 @@ pub struct DatablockStream {
     channel: Channel,
     base_dirs: VecDeque<PathBuf>,
     expand: bool,
-    fut: Pin<Box<dyn Future<Output = Result<(), Error>>>>,
+    fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
     rx: Receiver<Sitemty<EventsItem>>,
     done: bool,
     complete: bool,
diff --git a/archapp/src/archeng/datastream.rs b/archapp/src/archeng/datastream.rs
index 29b8e88..7b5d56f 100644
--- a/archapp/src/archeng/datastream.rs
+++ b/archapp/src/archeng/datastream.rs
@@ -9,7 +9,9 @@ pub struct DataStream {}
 impl Stream for DataStream {
     type Item = Sitemty<EventsItem>;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let _ = self;
+        let _ = cx;
         todo!()
     }
 }
diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs
new file mode 100644
index 0000000..5368f35
--- /dev/null
+++ b/archapp/src/archeng/pipe.rs
@@ -0,0 +1,38 @@
+use crate::archeng::datablockstream::DatablockStream;
+use crate::events::{FrameMaker, FrameMakerTrait};
+use err::Error;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use items::Framable;
+use netpod::{query::RawEventsQuery, ChannelArchiver};
+use std::pin::Pin;
+use streams::rangefilter::RangeFilter;
+
+pub async fn make_event_pipe(
+    evq: &RawEventsQuery,
+    conf: &ChannelArchiver,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+    // In order to extract something from the channel, need to look up first the type of the channel.
+    //let ci = channel_info(&evq.channel, aa).await?;
+    /*let mut inps = vec![];
+    for p1 in &aa.data_base_paths {
+        let p2 = p1.clone();
+        let p3 = make_single_event_pipe(evq, p2).await?;
+        inps.push(p3);
+    }
+    let sm = StorageMerge {
+        inprng: inps.len() - 1,
+        current_inp_item: (0..inps.len()).into_iter().map(|_| None).collect(),
+        completed_inps: vec![false; inps.len()],
+        inps,
+    };*/
+    let range = evq.range.clone();
+    let channel = evq.channel.clone();
+    let expand = evq.agg_kind.need_expand();
+    let data = DatablockStream::for_channel_range(range.clone(), channel, conf.data_base_paths.clone().into(), expand);
+    let filtered = RangeFilter::new(data, range, expand);
+    let stream = filtered;
+    let mut frame_maker = Box::new(FrameMaker::untyped(evq.agg_kind.clone())) as Box<dyn FrameMakerTrait>;
+    let ret = stream.map(move |j| frame_maker.make_frame(j));
+    Ok(Box::pin(ret))
+}
diff --git a/archapp/src/events.rs b/archapp/src/events.rs
index 291c8d4..9287728 100644
--- a/archapp/src/events.rs
+++ b/archapp/src/events.rs
@@ -163,13 +163,13 @@ impl Stream for StorageMerge {
     }
 }
 
-trait FrameMakerTrait: Send {
-    fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable>;
+pub trait FrameMakerTrait: Send {
+    fn make_frame(&mut self, ei: Sitemty<EventsItem>) -> Box<dyn Framable>;
 }
 
-struct FrameMaker {
-    scalar_type: ScalarType,
-    shape: Shape,
+pub struct FrameMaker {
+    scalar_type: Option<ScalarType>,
+    shape: Option<Shape>,
     agg_kind: AggKind,
 }
 
@@ -181,6 +181,22 @@ impl FrameMaker {
     {
         err::todoval()
     }
+
+    pub fn with_item_type(scalar_type: ScalarType, shape: Shape, agg_kind: AggKind) -> Self {
+        Self {
+            scalar_type: Some(scalar_type),
+            shape: Some(shape),
+            agg_kind: agg_kind,
+        }
+    }
+
+    pub fn untyped(agg_kind: AggKind) -> Self {
+        Self {
+            scalar_type: None,
+            shape: None,
+            agg_kind,
+        }
+    }
 }
 
 #[allow(unused_macros)]
@@ -334,18 +350,83 @@ macro_rules! arm1 {
 }
 
 impl FrameMakerTrait for FrameMaker {
-    fn make_frame(&self, item: Sitemty<EventsItem>) -> Box<dyn Framable> {
+    fn make_frame(&mut self, item: Sitemty<EventsItem>) -> Box<dyn Framable> {
         // Take from `self` the expected inner type.
         // If `ei` is not some data, then I can't dynamically determine the expected T of Sitemty.
         // Therefore, I need to decide that based on given parameters.
         // see also channel_info in this mod.
-        match self.scalar_type {
-            ScalarType::I8 => arm1!(item, i8, Byte, self.shape, self.agg_kind),
-            ScalarType::I16 => arm1!(item, i16, Short, self.shape, self.agg_kind),
-            ScalarType::I32 => arm1!(item, i32, Int, self.shape, self.agg_kind),
-            ScalarType::F32 => arm1!(item, f32, Float, self.shape, self.agg_kind),
-            ScalarType::F64 => arm1!(item, f64, Double, self.shape, self.agg_kind),
-            _ => err::todoval(),
+        if self.scalar_type.is_none() || self.shape.is_none() {
+            //let scalar_type = &ScalarType::I8;
+            //let shape = &Shape::Scalar;
+            //let agg_kind = &self.agg_kind;
+            let (scalar_type, shape) = match &item {
+                Ok(k) => match k {
+                    StreamItem::DataItem(k) => match k {
+                        RangeCompletableItem::RangeComplete => (ScalarType::I8, Shape::Scalar),
+                        RangeCompletableItem::Data(k) => match k {
+                            EventsItem::Plain(k) => match k {
+                                PlainEvents::Scalar(k) => match k {
+                                    ScalarPlainEvents::Byte(_) => (ScalarType::I8, Shape::Scalar),
+                                    ScalarPlainEvents::Short(_) => (ScalarType::I16, Shape::Scalar),
+                                    ScalarPlainEvents::Int(_) => (ScalarType::I32, Shape::Scalar),
+                                    ScalarPlainEvents::Float(_) => (ScalarType::F32, Shape::Scalar),
+                                    ScalarPlainEvents::Double(_) => (ScalarType::F64, Shape::Scalar),
+                                },
+                                PlainEvents::Wave(k) => match k {
+                                    WavePlainEvents::Byte(k) => (ScalarType::I8, Shape::Wave(k.vals[0].len() as u32)),
+                                    WavePlainEvents::Short(k) => (ScalarType::I16, Shape::Wave(k.vals[0].len() as u32)),
+                                    WavePlainEvents::Int(k) => (ScalarType::I32, Shape::Wave(k.vals[0].len() as u32)),
+                                    WavePlainEvents::Float(k) => (ScalarType::F32, Shape::Wave(k.vals[0].len() as u32)),
+                                    WavePlainEvents::Double(k) => {
+                                        (ScalarType::F64, Shape::Wave(k.vals[0].len() as u32))
+                                    }
+                                },
+                            },
+                            EventsItem::XBinnedEvents(k) => match k {
+                                XBinnedEvents::Scalar(k) => match k {
+                                    ScalarPlainEvents::Byte(_) => (ScalarType::I8, Shape::Scalar),
+                                    ScalarPlainEvents::Short(_) => (ScalarType::I16, Shape::Scalar),
+                                    ScalarPlainEvents::Int(_) => (ScalarType::I32, Shape::Scalar),
+                                    ScalarPlainEvents::Float(_) => (ScalarType::F32, Shape::Scalar),
+                                    ScalarPlainEvents::Double(_) => (ScalarType::F64, Shape::Scalar),
+                                },
+                                XBinnedEvents::SingleBinWave(k) => match k {
+                                    SingleBinWaveEvents::Byte(_) => todo!(),
+                                    SingleBinWaveEvents::Short(_) => todo!(),
+                                    SingleBinWaveEvents::Int(_) => todo!(),
+                                    SingleBinWaveEvents::Float(_) => todo!(),
+                                    SingleBinWaveEvents::Double(_) => todo!(),
+                                },
+                                XBinnedEvents::MultiBinWave(k) => match k {
+                                    MultiBinWaveEvents::Byte(_) => todo!(),
+                                    MultiBinWaveEvents::Short(_) => todo!(),
+                                    MultiBinWaveEvents::Int(_) => todo!(),
+                                    MultiBinWaveEvents::Float(_) => todo!(),
+                                    MultiBinWaveEvents::Double(_) => todo!(),
+                                },
+                            },
+                        },
+                    },
+                    StreamItem::Log(_) => (ScalarType::I8, Shape::Scalar),
+                    StreamItem::Stats(_) => (ScalarType::I8, Shape::Scalar),
+                },
+                Err(_) => (ScalarType::I8, Shape::Scalar),
+            };
+            self.scalar_type = Some(scalar_type);
+            self.shape = Some(shape);
+        }
+        {
+            let scalar_type = self.scalar_type.as_ref().unwrap();
+            let shape = self.shape.as_ref().unwrap();
+            let agg_kind = &self.agg_kind;
+            match scalar_type {
+                ScalarType::I8 => arm1!(item, i8, Byte, shape, agg_kind),
+                ScalarType::I16 => arm1!(item, i16, Short, shape, agg_kind),
+                ScalarType::I32 => arm1!(item, i32, Int, shape, agg_kind),
+                ScalarType::F32 => arm1!(item, f32, Float, shape, agg_kind),
+                ScalarType::F64 => arm1!(item, f64, Double, shape, agg_kind),
+                _ => err::todoval(),
+            }
         }
     }
 }
@@ -367,11 +448,11 @@ pub async fn make_event_pipe(
         completed_inps: vec![false; inps.len()],
         inps,
     };
-    let frame_maker = Box::new(FrameMaker {
-        scalar_type: ci.scalar_type.clone(),
-        shape: ci.shape.clone(),
-        agg_kind: evq.agg_kind.clone(),
-    }) as Box<dyn FrameMakerTrait>;
+    let mut frame_maker = Box::new(FrameMaker::with_item_type(
+        ci.scalar_type.clone(),
+        ci.shape.clone(),
+        evq.agg_kind.clone(),
+    )) as Box<dyn FrameMakerTrait>;
     let ret = sm.map(move |j| frame_maker.make_frame(j));
     Ok(Box::pin(ret))
 }
diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs
index d3578c0..4ede65a 100644
--- a/archapp/src/lib.rs
+++ b/archapp/src/lib.rs
@@ -15,7 +15,7 @@ use items::waveevents::{WaveEvents, WaveXBinner};
 use items::xbinnedscalarevents::XBinnedScalarEvents;
 use items::xbinnedwaveevents::XBinnedWaveEvents;
 use items::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, SitemtyFrameType, WithLen, WithTimestamps};
-use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
+use netpod::{AggKind, ByteOrder, HasScalarType, HasShape, ScalarType, Shape};
 
 #[cfg(not(feature = "devread"))]
 pub use parsestub as parse;
@@ -202,6 +202,18 @@ pub enum WavePlainEvents {
     Double(WaveEvents<f64>),
 }
 
+impl WavePlainEvents {
+    pub fn shape(&self) -> Result<Shape, Error> {
+        match self {
+            WavePlainEvents::Byte(k) => k.shape(),
+            WavePlainEvents::Short(k) => k.shape(),
+            WavePlainEvents::Int(k) => k.shape(),
+            WavePlainEvents::Float(k) => k.shape(),
+            WavePlainEvents::Double(k) => k.shape(),
+        }
+    }
+}
+
 fn _tmp1() {
     let _ev = EventValues::<f64> {
         tss: vec![],
@@ -250,7 +262,7 @@ impl WavePlainEvents {
 
     fn x_aggregate(self, ak: &AggKind) -> EventsItem {
         use WavePlainEvents::*;
-        let shape = self.shape();
+        let shape = self.shape().unwrap();
         match self {
             Byte(k) => wagg1!(k, ak, shape, Byte),
             Short(k) => wagg1!(k, ak, shape, Short),
@@ -365,14 +377,15 @@ impl WithTimestamps for WavePlainEvents {
 
 impl HasShape for WavePlainEvents {
     fn shape(&self) -> Shape {
-        use WavePlainEvents::*;
+        /*use WavePlainEvents::*;
         match self {
             Byte(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
             Short(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
             Int(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
             Float(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
             Double(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
-        }
+        }*/
+        self.shape().unwrap()
     }
 }
 
@@ -946,8 +959,8 @@ impl HasShape for PlainEvents {
     fn shape(&self) -> Shape {
         use PlainEvents::*;
         match self {
-            Scalar(h) => h.shape(),
-            Wave(h) => h.shape(),
+            Scalar(h) => HasShape::shape(h),
+            Wave(h) => HasShape::shape(h),
         }
     }
 }
@@ -998,6 +1011,31 @@ impl EventsItem {
             XBinnedEvents(k) => k.x_aggregate(ak),
         }
     }
+
+    pub fn type_info(&self) -> (ScalarType, Shape) {
+        match self {
+            EventsItem::Plain(k) => match k {
+                PlainEvents::Scalar(k) => match k {
+                    ScalarPlainEvents::Byte(_) => (ScalarType::I8, Shape::Scalar),
+                    ScalarPlainEvents::Short(_) => (ScalarType::I16, Shape::Scalar),
+                    ScalarPlainEvents::Int(_) => (ScalarType::I32, Shape::Scalar),
+                    ScalarPlainEvents::Float(_) => (ScalarType::F32, Shape::Scalar),
+                    ScalarPlainEvents::Double(_) => (ScalarType::F64, Shape::Scalar),
+                },
+                PlainEvents::Wave(k) => match k {
+                    // TODO
+                    // Inherent issue for the non-static-type backends:
+                    // there is a chance that we can't determine the shape here.
+                    WavePlainEvents::Byte(k) => (ScalarType::I8, k.shape().unwrap()),
+                    WavePlainEvents::Short(k) => (ScalarType::I16, k.shape().unwrap()),
+                    WavePlainEvents::Int(k) => (ScalarType::I32, k.shape().unwrap()),
+                    WavePlainEvents::Float(k) => (ScalarType::F32, k.shape().unwrap()),
+                    WavePlainEvents::Double(k) => (ScalarType::F64, k.shape().unwrap()),
+                },
+            },
+            EventsItem::XBinnedEvents(k) => panic!(),
+        }
+    }
 }
 
 impl WithLen for EventsItem {
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index e63167a..b754bb7 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -14,7 +14,7 @@ use items::numops::NumOps;
 use items::streams::{Collectable, Collector};
 use items::{
     Clearable, EventsNodeProcessor, FilterFittingInside, Framable, FrameType, PushableIndex, RangeCompletableItem,
-    Sitemty, StreamItem, TimeBinnableType, WithLen, WithTimestamps,
+    Sitemty, StreamItem, TimeBinnableType, WithLen,
 };
 use netpod::log::*;
 use netpod::query::RawEventsQuery;
diff --git a/httpclient/src/lib.rs b/httpclient/src/lib.rs
index 63fbc80..379767e 100644
--- a/httpclient/src/lib.rs
+++ b/httpclient/src/lib.rs
@@ -9,7 +9,7 @@ pub async fn get_channel_config(
 ) -> Result<ChannelConfigResponse, Error> {
     let mut url = Url::parse(&format!(
         "http://{}:{}/api/4/channel/config",
-        "localhost", node_config.node.port
+        node_config.node.host, node_config.node.port
     ))?;
     q.append_to_url(&mut url);
     let req = hyper::Request::builder()
diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs
index ea06182..91e8ad9 100644
--- a/httpret/src/channelarchiver.rs
+++ b/httpret/src/channelarchiver.rs
@@ -104,7 +104,6 @@ impl ListChannelsHttpFunction {
                     Some((Ok::<_, Error>(x), st))
                 }
                 Err(e) => {
-                    //Some((Err(e), st))
                     error!("{:?}", e);
                     None
                 }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 56a757d..0520401 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -111,7 +111,15 @@ pub fn response_err<T>(status: StatusCode, msg: T) -> Result<Response<Body>, Err
 where
     T: AsRef<str>,
 {
-    let msg = format!("Error:\n\n{}\n\nDocumentation:\nhttps://data-api.psi.ch/api/1/documentation/\nhttps://data-api.psi.ch/api/4/documentation/", msg.as_ref());
+    let msg = format!(
+        concat!(
+            "Error:\n{}\n",
+            "\nDocumentation pages API 1 and 4:",
+            "\nhttps://data-api.psi.ch/api/1/documentation/",
+            "\nhttps://data-api.psi.ch/api/4/documentation/",
+        ),
+        msg.as_ref()
+    );
     let ret = response(status).body(Body::from(msg))?;
     Ok(ret)
 }
@@ -478,19 +486,26 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
         .get(http::header::ACCEPT)
         .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
     if accept == APP_JSON {
-        Ok(plain_events_json(req, node_config).await.map_err(|e| {
-            error!("{:?}", e);
-            e
-        })?)
+        let ret = match plain_events_json(req, node_config).await {
+            Ok(ret) => ret,
+            Err(e) => {
+                error!("{}", e);
+                response_err(StatusCode::BAD_REQUEST, e.msg())?
+            }
+        };
+        Ok(ret)
     } else if accept == APP_OCTET {
-        Ok(plain_events_binary(req, node_config).await.map_err(|e| {
-            error!("{:?}", e);
-            e
-        })?)
+        let ret = match plain_events_binary(req, node_config).await {
+            Ok(ret) => ret,
+            Err(e) => {
+                error!("{}", e);
+                response_err(StatusCode::BAD_REQUEST, e.msg())?
+            }
+        };
+        Ok(ret)
     } else {
-        let e = Error::with_msg(format!("unexpected Accept: {:?}", accept));
-        error!("{:?}", e);
-        Err(e)
+        let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("unsupported Accept: {:?}", accept))?;
+        Ok(ret)
     }
 }
 
@@ -672,8 +687,10 @@ pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached)
     //let pairs = get_url_query_pairs(&url);
     let q = ChannelConfigQuery::from_url(&url)?;
     info!("ChannelConfigQuery {:?}", q);
-    let conf = if let Some(aa) = &node_config.node.archiver_appliance {
-        archapp_wrap::channel_config(&q, aa).await?
+    let conf = if let Some(conf) = &node_config.node.channel_archiver {
+        archapp_wrap::archapp::archeng::channel_config(&q, conf).await?
+    } else if let Some(conf) = &node_config.node.archiver_appliance {
+        archapp_wrap::channel_config(&q, conf).await?
     } else {
         parse::channelconfig::channel_config(&q, &node_config.node).await?
     };
diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs
index f9eb38a..f0cdeda 100644
--- a/items/src/waveevents.rs
+++ b/items/src/waveevents.rs
@@ -20,6 +20,17 @@ pub struct WaveEvents<NTY> {
     pub vals: Vec<Vec<NTY>>,
 }
 
+impl<NTY> WaveEvents<NTY> {
+    pub fn shape(&self) -> Result<Shape, Error> {
+        if let Some(k) = self.vals.first() {
+            let ret = Shape::Wave(k.len() as u32);
+            Ok(ret)
+        } else {
+            Err(Error::with_msg_no_trace("WaveEvents is empty, can not determine Shape"))
+        }
+    }
+}
+
 impl<NTY> SitemtyFrameType for WaveEvents<NTY>
 where
     NTY: SubFrId,
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index c08a099..a1eda85 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -1184,6 +1184,7 @@ pub struct ChannelConfigResponse {
     pub channel: Channel,
     #[serde(rename = "scalarType")]
     pub scalar_type: ScalarType,
+    #[serde(rename = "byteOrder")]
     pub byte_order: Option<ByteOrder>,
     pub shape: Shape,
 }
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
index e504783..184f246 100644
--- a/nodenet/src/conn.rs
+++ b/nodenet/src/conn.rs
@@ -118,7 +118,12 @@ async fn events_conn_handler_inner_try(
     info!("--- nodenet::conn got query -------------------\nevq {:?}", evq);
 
     let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
-        if let Some(aa) = &node_config.node.archiver_appliance {
+        if let Some(aa) = &node_config.node.channel_archiver {
+            match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, aa).await {
+                Ok(j) => j,
+                Err(e) => return Err((e, netout))?,
+            }
+        } else if let Some(aa) = &node_config.node.archiver_appliance {
             match archapp_wrap::make_event_pipe(&evq, aa).await {
                 Ok(j) => j,
                 Err(e) => return Err((e, netout))?,
diff --git a/taskrun/src/append.rs b/taskrun/src/append.rs
index 2a302e2..2a3d2ee 100644
--- a/taskrun/src/append.rs
+++ b/taskrun/src/append.rs
@@ -164,14 +164,14 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<
         // Get some more data.
        let b = buf.writable();
         if false {
-            write!(&mut fout, "{} writable bytes\n", b.len())?;
+            write!(&mut fout, "[APPEND-WRITABLE] {} writable bytes\n", b.len())?;
         }
         let n1 = stdin.read(b)?;
         buf.inc_wp(n1);
         if false {
             write!(
                 &mut fout,
-                "{} bytes read from stdin, total readable {} bytes\n",
+                "[APPEND-INFO] {} bytes read from stdin, total readable {} bytes\n",
                 n1,
                 buf.readable().len()
             )?;
@@ -179,7 +179,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<
         match parse_lines(buf.readable()) {
             Ok((lines, n2)) => {
                 if false {
-                    write!(&mut fout, "parsed {} lines\n", lines.len())?;
+                    write!(&mut fout, "[APPEND-PARSED-LINES]: {}\n", lines.len())?;
                 }
                 for line in lines {
                     let j = line.as_bytes();
@@ -190,7 +190,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<
                 buf.advance(n2);
             }
             Err(e) => {
-                write!(&mut fout, "parse error: {:?}\n", e)?;
+                write!(&mut fout, "[APPEND-PARSE-ERROR]: {:?}\n", e)?;
                 return Ok(());
             }
         }
@@ -212,15 +212,15 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<
     }
     w.sort_by(|a, b| std::cmp::Ord::cmp(a, b));
     for q in &w {
-        write!(&mut fout, "file:::: {}\n", q.0.to_string_lossy())?;
+        write!(&mut fout, "[APPEND-SEES-FILE] {}\n", q.0.to_string_lossy())?;
     }
     let mut lentot = w.iter().map(|g| g.1).fold(0, |a, x| a + x);
-    write!(&mut fout, "lentot: {}\n", lentot)?;
+    write!(&mut fout, "[APPEND-LENTOT] {}\n", lentot)?;
     for q in w {
         if lentot <= MAX_TOTAL_SIZE as u64 {
             break;
         }
-        write!(&mut fout, "REMOVE {} {}\n", q.1, q.0.to_string_lossy())?;
+        write!(&mut fout, "[APPEND-REMOVE] {} {}\n", q.1, q.0.to_string_lossy())?;
         fs::remove_file(q.0)?;
         if q.1 < lentot {
             lentot -= q.1;
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index db497cd..73c8ebb 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -90,7 +90,7 @@ pub fn tracing_init() {
         .with_thread_names(true)
         //.with_max_level(tracing::Level::INFO)
         .with_env_filter(tracing_subscriber::EnvFilter::new(
-            ["info", "archapp::archeng=trace", "daqbuffer::test=trace"].join(","),
+            ["info", "archapp::archeng=info", "daqbuffer::test=trace"].join(","),
         ))
         .init();
     *g = 1;
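A final note on the FrameMaker change in archapp/src/events.rs: with_item_type keeps the old behavior (type known up front from channel info), while untyped defers the decision until make_frame sees its first item and then caches the inferred pair in the two Option fields, so every subsequent frame is emitted under the same type. A compact sketch of that lazy fill-in follows; FrameMaker is reduced here to a single cached Shape and the Event type is invented for the example.

    #[derive(Debug)]
    enum Shape {
        Scalar,
        Wave(u32),
    }

    struct Event {
        // Some(n) models a waveform of length n, None models a scalar event.
        wave_len: Option<u32>,
    }

    struct FrameMaker {
        // None until the first item reveals the shape (the "untyped" mode).
        shape: Option<Shape>,
    }

    impl FrameMaker {
        fn untyped() -> Self {
            Self { shape: None }
        }

        // The first call infers and caches the shape; later calls reuse it,
        // like the lazy branch the patch adds in front of the arm1! dispatch.
        fn make_frame(&mut self, ev: &Event) -> String {
            if self.shape.is_none() {
                let inferred = match ev.wave_len {
                    Some(n) => Shape::Wave(n),
                    None => Shape::Scalar,
                };
                self.shape = Some(inferred);
            }
            format!("frame with shape {:?}", self.shape.as_ref().unwrap())
        }
    }

    fn main() {
        let mut fm = FrameMaker::untyped();
        println!("{}", fm.make_frame(&Event { wave_len: Some(1024) }));
        // The cached shape wins from now on, even if a later item disagrees.
        println!("{}", fm.make_frame(&Event { wave_len: None }));
    }

This is also why make_frame now takes &mut self and why the new pipe in archapp/src/archeng/pipe.rs declares its frame_maker binding as mut: producing the first frame mutates the maker's cached type.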