diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 8d7235f..4d803be 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -7,7 +7,7 @@ use items::eventvalues::EventValues; use items::{Framable, RangeCompletableItem, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::timeunits::DAY; +use netpod::timeunits::{DAY, SEC}; use netpod::{ArchiverAppliance, Channel, ChannelInfo, ScalarType, Shape}; use serde_json::Value as JsonValue; use std::path::PathBuf; @@ -50,66 +50,85 @@ pub async fn make_event_pipe( //let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32); let (tx, rx) = async_channel::bounded(16); let block1 = async move { + trace!("++++++++++++++++++++++++++++"); + info!("++++++++++++++++++++++++++++"); + info!("start read of {:?}", dir); + + // TODO first collect all matching filenames, then sort, then open files. let mut rd = tokio::fs::read_dir(&dir).await?; while let Some(de) = rd.next_entry().await? { let s = de.file_name().to_string_lossy().into_owned(); if s.starts_with(&prefix) && s.ends_with(".pb") { match parse_data_filename(&s) { Ok(df) => { - let ts0 = Utc.ymd(df.year as i32, df.month, 0).and_hms(0, 0, 0); - let ts1 = ts0.timestamp() as u64 * 1000000000 + ts0.timestamp_subsec_nanos() as u64; - if evq.range.beg < ts1 + DAY * 32 && evq.range.end > ts1 { + info!("parse went ok: {} {}", df.year, df.month); + let ts0 = Utc.ymd(df.year as i32, df.month, 1).and_hms(0, 0, 0); + let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64; + info!("file {} {}", ts1, ts1 + DAY * 27); + info!("range {} {}", evq.range.beg, evq.range.end); + if evq.range.beg < ts1 + DAY * 27 && evq.range.end > ts1 { + info!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); let f1 = File::open(de.path()).await?; info!("opened {:?}", de.path()); let mut pbr = PbFileReader::new(f1).await; pbr.read_header().await?; + info!("✓ read header {:?}", pbr.payload_type()); loop { match pbr.read_msg().await { - Ok(ev) => match ev { - EventsItem::ScalarDouble(h) => { - // - let (x, y) = h - .tss - .into_iter() - .zip(h.values.into_iter()) - .filter_map(|(j, k)| { - if j < evq.range.beg || j >= evq.range.end { - None - } else { - Some((j, k)) - } - }) - .fold((vec![], vec![]), |(mut a, mut b), (j, k)| { - a.push(j); - b.push(k); - (a, b) - }); - let b = EventValues { tss: x, values: y }; - let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b))); - tx.send(Box::new(b) as Box).await?; + Ok(ev) => { + // + info!("read msg from file"); + match ev { + EventsItem::ScalarDouble(h) => { + // + let (x, y) = h + .tss + .into_iter() + .zip(h.values.into_iter()) + .filter_map(|(j, k)| { + if j < evq.range.beg || j >= evq.range.end { + None + } else { + Some((j, k)) + } + }) + .fold((vec![], vec![]), |(mut a, mut b), (j, k)| { + a.push(j); + b.push(k); + (a, b) + }); + let b = EventValues { tss: x, values: y }; + let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b))); + tx.send(Box::new(b) as Box).await?; + } + _ => { + // + error!("case not covered"); + return Err(Error::with_msg_no_trace("todo")); + } } - _ => { - // - error!("case not covered"); - return Err(Error::with_msg_no_trace("todo")); - } - }, - Err(e) => {} + } + Err(e) => { + error!("error while reading msg {:?}", e); + break; + } } } } } - Err(e) => {} + Err(e) => { + error!("bad filename parse {:?}", e); + } } + } else { + info!("prefix {} s {}", prefix, s); } } Ok::<_, Error>(()) }; let block2 = async move { match block1.await { - Ok(_) => { - info!("block1 done ok"); - } + Ok(_) => {} Err(e) => { error!("{:?}", e); } diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index c8adce1..221f31b 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -190,7 +190,7 @@ impl PbFileReader { k += 1; } if k == self.wp { - return Err(Error::with_msg("no header in pb file")); + return Err(Error::with_msg("no nl in pb file")); } Ok(k) } @@ -198,6 +198,10 @@ impl PbFileReader { pub fn channel_name(&self) -> &str { &self.channel_name } + + pub fn payload_type(&self) -> &PayloadType { + &self.payload_type + } } #[derive(Serialize)] diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 12cd7a5..3f79e40 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,4 +1,6 @@ use bytes::{BufMut, Bytes, BytesMut}; +use err::Error; +use items::frame::make_frame; use items::{Appendable, RangeOverlapInfo, SitemtyFrameType}; use netpod::log::*; use netpod::NanoRange; diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 7c8655a..5bdcd2a 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -18,8 +18,9 @@ use items::{ }; use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; -use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use netpod::{ + AggKind, ByteOrder, Channel, ChannelConfigQuery, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape, +}; use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; use std::fmt::Debug; @@ -177,34 +178,21 @@ pub async fn channel_exec( where F: ChannelExecFunction, { - let channel_config = match read_local_config(channel, &node_config.node).await { - Ok(k) => k, - Err(e) => { - if e.msg().contains("ErrorKind::NotFound") { - return Ok(F::empty()); - } else { - return Err(e); - } - } + let q = ChannelConfigQuery { + channel: channel.clone(), + range: range.clone(), }; - match extract_matching_config_entry(range, &channel_config)? { - MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?, - MatchingConfigEntry::None => { - // TODO function needs to provide some default. - err::todoval() - } - MatchingConfigEntry::Entry(entry) => { - let ret = channel_exec_config( - f, - entry.scalar_type.clone(), - entry.byte_order.clone(), - entry.to_shape()?, - agg_kind, - node_config, - )?; - Ok(ret) - } - } + let conf = httpclient::get_channel_config(&q, node_config).await?; + let ret = channel_exec_config( + f, + conf.scalar_type.clone(), + // TODO is the byte order ever important here? + conf.byte_order.unwrap_or(ByteOrder::LE).clone(), + conf.shape.clone(), + agg_kind, + node_config, + )?; + Ok(ret) } pub struct PlainEvents { diff --git a/disk/src/raw/client.rs b/disk/src/raw/client.rs index fb8266e..1547fb0 100644 --- a/disk/src/raw/client.rs +++ b/disk/src/raw/client.rs @@ -27,6 +27,7 @@ where ::Output: Unpin + 'static, Result::Output>>, err::Error>: FrameType, { + netpod::log::info!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw); let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index 2c86582..d831f11 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -1,3 +1,4 @@ +use crate::frame::{make_frame, make_frame_2}; use crate::minmaxavgbins::MinMaxAvgBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; @@ -5,6 +6,7 @@ use crate::{ ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; +use bytes::BytesMut; use err::Error; use netpod::NanoRange; use serde::{Deserialize, Serialize}; diff --git a/items/src/frame.rs b/items/src/frame.rs index 930df40..2a3c665 100644 --- a/items/src/frame.rs +++ b/items/src/frame.rs @@ -40,6 +40,36 @@ where } } +// TODO decide for either make_frame or make_frame_2 +pub fn make_frame_2(item: &FT, fty: u32) -> Result +where + FT: Serialize, +{ + match bincode::serialize(item) { + Ok(enc) => { + if enc.len() > u32::MAX as usize { + return Err(Error::with_msg(format!("too long payload {}", enc.len()))); + } + let mut h = crc32fast::Hasher::new(); + h.update(&enc); + let payload_crc = h.finalize(); + let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(INMEM_FRAME_ENCID); + buf.put_u32_le(fty); + buf.put_u32_le(enc.len() as u32); + buf.put_u32_le(payload_crc); + buf.put(enc.as_ref()); + let mut h = crc32fast::Hasher::new(); + h.update(&buf); + let frame_crc = h.finalize(); + buf.put_u32_le(frame_crc); + Ok(buf) + } + Err(e) => Err(e)?, + } +} + pub fn make_term_frame() -> BytesMut { let mut h = crc32fast::Hasher::new(); h.update(&[]); diff --git a/items/src/lib.rs b/items/src/lib.rs index ce972f6..2187e0b 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -1,4 +1,5 @@ use crate::eventvalues::EventValues; +use crate::frame::make_frame_2; use crate::numops::BoolNum; use bytes::BytesMut; use chrono::{TimeZone, Utc}; @@ -212,134 +213,20 @@ pub trait Framable: Send { fn make_frame(&self) -> Result; } -// TODO need als Framable for those types defined in other crates. +// TODO need also Framable for those types defined in other crates. impl Framable for Sitemty where - T: SitemtyFrameType + Send, + T: SitemtyFrameType + Serialize + Send, { fn typeid(&self) -> u32 { - todo!() + T::FRAME_TYPE_ID } fn make_frame(&self) -> Result { - todo!() + make_frame_2(self, T::FRAME_TYPE_ID) } } -/* - -impl Framable for Sitemty { - fn typeid(&self) -> u32 { - EventQueryJsonStringFrame::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - panic!() - } -} - -impl Framable for Result>, Error> { - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Result>, Error> { - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Result>>, err::Error> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Result>>, err::Error> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} -*/ - pub trait EventsNodeProcessor: Send + Unpin { type Input; type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType;