diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index 3e417f5..a78f740 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -10,7 +10,7 @@ pub mod indexfiles; pub mod indextree; pub mod pipe; -use self::indexfiles::list_index_files; +use self::indexfiles::{database_connect, list_index_files}; use self::indextree::channel_list; use crate::timed::Timed; use crate::wrap_task; @@ -22,7 +22,9 @@ use items::{StreamItem, WithLen}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse}; +use netpod::{ScalarType, Shape}; use serde::Serialize; +use serde_json::Value as JsVal; use std::convert::TryInto; const EPICS_EPOCH_OFFSET: u64 = 631152000 * SEC; @@ -161,6 +163,47 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver Result { + let dbc = database_connect(&conf.database).await?; + let sql = "select config from channels where name = $1"; + let rows = dbc.query(sql, &[&q.channel.name()]).await?; + if let Some(row) = rows.first() { + let cfg: JsVal = row.try_get(0)?; + let val = cfg + .get("shape") + .ok_or_else(|| Error::with_msg_no_trace("shape not found on config"))?; + let shape = Shape::from_db_jsval(val)?; + let val = cfg + .get("scalarType") + .ok_or_else(|| Error::with_msg_no_trace("no scalarType in db"))?; + let s = if let JsVal::String(s) = val { + s + } else { + return Err(Error::with_msg_no_trace(format!( + "channel_config_from_db bad scalar type {:?}", + cfg + ))); + }; + let scalar_type = ScalarType::from_archeng_db_str(s)?; + let ret = ChannelConfigResponse { + channel: q.channel.clone(), + scalar_type, + // TODO.. only binary endpoint would care. + byte_order: None, + shape, + }; + Ok(ret) + } else { + Err(Error::with_msg_no_trace(format!( + "can not find config for {}", + q.channel.name() + ))) + } +} + pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> Result { let _timed = Timed::new("channel_config"); let mut type_info = None; diff --git a/archapp/src/archeng/blockrefstream.rs b/archapp/src/archeng/blockrefstream.rs index f778361..4ff2462 100644 --- a/archapp/src/archeng/blockrefstream.rs +++ b/archapp/src/archeng/blockrefstream.rs @@ -89,14 +89,20 @@ impl BlockrefStream { } SelectIndexFile => { let dbc = database_connect(&self.conf.database).await?; - let sql = "select path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index"; + let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index"; let rows = dbc.query(sql, &[&self.channel.name()]).await?; for row in rows { - self.paths.push_back(row.try_get(0)?); + let p: String = row.try_get(0)?; + if self.paths.is_empty() && (p.contains("_ST/") || p.contains("_SH/")) { + self.paths.push_back(p); + } } if self.paths.len() == 0 { self.steps = Done; - Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("NOMOREPATHS"))), self))) + Ok(Some(( + BlockrefItem::JsVal(JsVal::String(format!("NOPATHSFROMDB"))), + self, + ))) } else { self.steps = SetupNextPath; Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("DBQUERY"))), self))) @@ -122,7 +128,7 @@ impl BlockrefStream { }; Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("NEXTPATH"))), self))) } else { - self.steps = SelectIndexFile; + self.steps = Done; Ok(Some(( BlockrefItem::JsVal(JsVal::String(format!("PATHQUEUEEMPTY"))), self, diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs index f5ef95d..0c2c1b3 100644 --- a/archapp/src/archeng/blockstream.rs +++ b/archapp/src/archeng/blockstream.rs @@ -9,7 +9,7 @@ use futures_util::stream::FuturesOrdered; use futures_util::StreamExt; use items::eventsitem::EventsItem; use items::{WithLen, WithTimestamps}; -use netpod::{log::*, NanoRange}; +use netpod::{log::*, NanoRange, Nanos}; use serde::Serialize; use serde_json::Value as JsVal; use std::collections::{BTreeMap, VecDeque}; @@ -58,9 +58,9 @@ struct Reader { impl Reader {} struct FutAItem { - #[allow(unused)] fname: String, path: PathBuf, + dpos: DataheaderPos, dfnotfound: bool, reader: Option, bytes_read: u64, @@ -101,6 +101,8 @@ pub struct BlockStream { last_dfname: String, last_dfhpos: DataheaderPos, ts_max: u64, + data_done: bool, + raco: bool, done: bool, complete: bool, acc: StatsAcc, @@ -115,6 +117,7 @@ impl BlockStream { where S: Stream> + Unpin, { + debug!("new BlockStream"); Self { inp, inp_done: false, @@ -126,6 +129,8 @@ impl BlockStream { last_dfname: String::new(), last_dfhpos: DataheaderPos(u64::MAX), ts_max: 0, + data_done: false, + raco: false, done: false, complete: false, acc: StatsAcc::new(), @@ -159,6 +164,14 @@ where } else if self.done { self.complete = true; Ready(None) + } else if self.data_done { + self.done = true; + if self.raco { + // currently handled downstream + continue; + } else { + continue; + } } else { let item1 = if self.inp_done { Int::Done @@ -201,7 +214,7 @@ where Some(reader) } else { let stats = StatsChannel::dummy(); - debug!("open new reader file {:?}", dpath); + trace!("open new reader file {:?}", dpath); match open_read(dpath.clone(), &stats).await { Ok(file) => { // @@ -217,7 +230,8 @@ where if let Some(mut reader) = reader { let rp1 = reader.rb.bytes_read(); let dfheader = - read_datafile_header2(&mut reader.rb, pos).await?; + read_datafile_header2(&mut reader.rb, pos.clone()) + .await?; // TODO handle expand let expand = false; let data = @@ -234,6 +248,7 @@ where let ret = FutAItem { fname, path: dpath, + dpos: pos, dfnotfound: false, reader: Some(reader), bytes_read, @@ -245,6 +260,7 @@ where let ret = FutAItem { fname, path: dpath, + dpos: pos, dfnotfound: true, reader: None, bytes_read: 0, @@ -285,23 +301,62 @@ where } else { match self.block_reads.poll_next_unpin(cx) { Ready(Some(Ok(item))) => { + let mut item = item; + item.events = if let Some(ev) = item.events { + if ev.len() > 0 { + if ev.ts(ev.len() - 1) > self.range.end { + debug!(". . . . ===== DATA DONE ----------------------"); + self.raco = true; + self.data_done = true; + } + } + if ev.len() == 1 { + debug!("From {} {:?} {}", item.fname, item.path, item.dpos.0); + debug!("See 1 event {:?}", Nanos::from_ns(ev.ts(0))); + } else if ev.len() > 1 { + debug!("From {} {:?} {}", item.fname, item.path, item.dpos.0); + debug!( + "See {} events {:?} to {:?}", + ev.len(), + Nanos::from_ns(ev.ts(0)), + Nanos::from_ns(ev.ts(ev.len() - 1)) + ); + } + let mut contains_unordered = false; + for i in 0..ev.len() { + let ts = ev.ts(i); + debug!("\nSEE EVENT {:?}", Nanos::from_ns(ts)); + if ts < self.ts_max { + contains_unordered = true; + if true { + let msg = format!( + "unordered event in item at {} ts {:?} ts_max {:?}", + i, + Nanos::from_ns(ts), + Nanos::from_ns(self.ts_max) + ); + error!("{}", msg); + self.done = true; + return Ready(Some(Err(Error::with_msg_no_trace(msg)))); + } + } + self.ts_max = ts; + } + if contains_unordered { + Some(ev) + } else { + Some(ev) + } + } else { + None + }; + let item = item; if item.dfnotfound { - self.dfnotfound.insert(item.path, true); + self.dfnotfound.insert(item.path.clone(), true); } if let Some(reader) = item.reader { self.readers.push_back(reader); } - if let Some(ev) = &item.events { - for i in 0..ev.len() { - let ts = ev.ts(i); - if ts < self.ts_max { - let msg = format!("unordered event: {} {}", ts, self.ts_max); - error!("{}", msg); - self.done = true; - return Ready(Some(Err(Error::with_msg_no_trace(msg)))); - } - } - } self.acc.add(item.events_read, item.bytes_read); if false { let item = JsVal::String(format!( diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index 6d29806..5490415 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -1,17 +1,92 @@ +use crate::archeng::blockrefstream::blockref_stream; +use crate::archeng::blockstream::BlockStream; use crate::archeng::datablockstream::DatablockStream; use crate::events::{FrameMaker, FrameMakerTrait}; use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; -use items::Framable; -use netpod::ChannelConfigQuery; -use netpod::{query::RawEventsQuery, ChannelArchiver}; +use futures_util::{Stream, StreamExt}; +use items::binnedevents::XBinnedEvents; +use items::eventsitem::EventsItem; +use items::plainevents::PlainEvents; +use items::{Framable, LogItem, RangeCompletableItem, StreamItem}; +use netpod::query::RawEventsQuery; +use netpod::{log::*, AggKind, Shape}; +use netpod::{ChannelArchiver, ChannelConfigQuery}; use std::pin::Pin; use streams::rangefilter::RangeFilter; pub async fn make_event_pipe( evq: &RawEventsQuery, - conf: &ChannelArchiver, + conf: ChannelArchiver, +) -> Result> + Send>>, Error> { + debug!("make_event_pipe {:?}", evq); + let channel_config = { + let q = ChannelConfigQuery { + channel: evq.channel.clone(), + range: evq.range.clone(), + }; + crate::archeng::channel_config_from_db(&q, &conf).await? + }; + debug!("Channel config: {:?}", channel_config); + use crate::archeng::blockstream::BlockItem; + let refs = blockref_stream(evq.channel.clone(), evq.range.clone(), conf.clone()); + let blocks = BlockStream::new(Box::pin(refs), evq.range.clone(), 1); + let blocks = blocks.map(|k| match k { + Ok(item) => match item { + BlockItem::EventsItem(item) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))), + BlockItem::JsVal(jsval) => Ok(StreamItem::Log(LogItem::quick(Level::DEBUG, format!("{:?}", jsval)))), + }, + Err(e) => Err(e), + }); + let filtered = RangeFilter::new(blocks, evq.range.clone(), evq.agg_kind.need_expand()); + let xtrans = match channel_config.shape { + Shape::Scalar => match evq.agg_kind { + AggKind::Plain => Box::pin(filtered) as Pin + Send>>, + AggKind::TimeWeightedScalar | AggKind::DimXBins1 => { + let tr = filtered.map(|j| match j { + Ok(j) => match j { + StreamItem::DataItem(j) => match j { + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + RangeCompletableItem::Data(j) => match j { + EventsItem::Plain(j) => match j { + PlainEvents::Scalar(j) => { + let item = XBinnedEvents::Scalar(j); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + PlainEvents::Wave(_) => panic!(), + }, + EventsItem::XBinnedEvents(_) => panic!(), + }, + }, + StreamItem::Log(j) => Ok(StreamItem::Log(j)), + StreamItem::Stats(j) => Ok(StreamItem::Stats(j)), + }, + Err(e) => Err(e), + }); + Box::pin(tr) as _ + } + AggKind::DimXBinsN(_) => err::todoval(), + AggKind::EventBlobs => err::todoval(), + }, + _ => { + error!("TODO shape {:?}", channel_config.shape); + panic!() + } + }; + let mut frame_maker = Box::new(FrameMaker::with_item_type( + channel_config.scalar_type.clone(), + channel_config.shape.clone(), + evq.agg_kind.clone(), + )) as Box; + let ret = xtrans.map(move |j| frame_maker.make_frame(j)); + Ok(Box::pin(ret)) +} + +pub async fn make_event_pipe1( + evq: &RawEventsQuery, + conf: ChannelArchiver, ) -> Result> + Send>>, Error> { let range = evq.range.clone(); let channel = evq.channel.clone(); @@ -25,7 +100,7 @@ pub async fn make_event_pipe( channel: channel.clone(), range: range.clone(), }; - crate::archeng::channel_config(&q, conf).await? + crate::archeng::channel_config_from_db(&q, &conf).await? }; let data = DatablockStream::for_channel_range( diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 23d1efd..997e671 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -138,7 +138,10 @@ macro_rules! arm2 { _ => panic!(), }, - _ => err::todoval(), + _ => { + error!("unexpected arm2 case"); + err::todoval() + } }, }, StreamItem::Log(k) => Ok(StreamItem::Log(k)), @@ -223,6 +226,7 @@ macro_rules! arm1 { }, Shape::Image(..) => { // There should be no images on archiver. + warn!("TODO for {:?}", $shape); err::todoval() } } @@ -240,7 +244,10 @@ impl FrameMakerTrait for FrameMaker { ScalarType::I32 => arm1!(item, i32, Int, shape, agg_kind), ScalarType::F32 => arm1!(item, f32, Float, shape, agg_kind), ScalarType::F64 => arm1!(item, f64, Double, shape, agg_kind), - _ => err::todoval(), + _ => { + warn!("TODO for scalar_type {:?}", scalar_type); + err::todoval() + } } } } @@ -273,14 +280,13 @@ pub async fn make_single_event_pipe( base_path: PathBuf, ) -> Result> + Send>>, Error> { // TODO must apply the proper x-binning depending on the requested AggKind. - info!("make_event_pipe {:?}", evq); + debug!("make_single_event_pipe {:?}", evq); let evq = evq.clone(); let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, base_path)?; //let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32); let (tx, rx) = async_channel::bounded(16); let block1 = async move { - trace!("++++++++++++++++++++++++++++"); - info!("start read of {:?}", dir); + debug!("start read of {:?}", dir); // TODO first collect all matching filenames, then sort, then open files. // TODO if dir does not exist, should notify client but not log as error. @@ -299,13 +305,13 @@ pub async fn make_single_event_pipe( if s.starts_with(&prefix) && s.ends_with(".pb") { match parse_data_filename(&s) { Ok(df) => { - info!("parse went ok: {} {}", df.year, df.month); + debug!("parse went ok: {} {}", df.year, df.month); let ts0 = Utc.ymd(df.year as i32, df.month, 1).and_hms(0, 0, 0); let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64; - info!("file {} {}", ts1, ts1 + DAY * 27); - info!("range {} {}", evq.range.beg, evq.range.end); + debug!("file {} {}", ts1, ts1 + DAY * 27); + debug!("range {} {}", evq.range.beg, evq.range.end); if evq.range.beg < ts1 + DAY * 27 && evq.range.end > ts1 { - info!("•••••••••••••••••••••••••• file matches requested range"); + debug!("•••••••••••••••••••••••••• file matches requested range"); let f1 = File::open(de.path()).await?; info!("opened {:?}", de.path()); @@ -322,7 +328,7 @@ pub async fn make_single_event_pipe( f1.seek(SeekFrom::Start(0)).await?; let mut pbr = PbFileReader::new(f1).await; pbr.read_header().await?; - info!("✓ read header {:?}", pbr.payload_type()); + debug!("✓ read header {:?}", pbr.payload_type()); pbr.file().seek(SeekFrom::Start(pos1)).await?; pbr.reset_io(pos1); @@ -347,7 +353,7 @@ pub async fn make_single_event_pipe( } } Ok(None) => { - info!("reached end of file"); + debug!("reached end of file"); break; } Err(e) => { @@ -363,7 +369,7 @@ pub async fn make_single_event_pipe( } } } else { - info!("prefix {} s {}", prefix, s); + debug!("prefix {} s {}", prefix, s); } } Ok::<_, Error>(()) diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 13b7565..7d5764b 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -105,7 +105,7 @@ pub async fn search_channel_archeng( if k == "Scalar" { vec![] } else { - return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); } } JsVal::Object(k) => match k.get("Wave") { @@ -114,15 +114,15 @@ pub async fn search_channel_archeng( vec![k.as_i64().unwrap_or(u32::MAX as i64) as u32] } _ => { - return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); } }, None => { - return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); } }, _ => { - return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + return Err(Error::with_msg_no_trace(format!("search_channel_archeng can not understand {:?}", config))); } }, None => vec![], diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 1d76f8b..8c24ba8 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -126,7 +126,12 @@ where let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); + let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()).map(|k| { + info!( + "setup_merged_from_remotes, MergedFromRemotes yields {:?}", + show_event_basic_info(&k) + ); + }); let ret = TBinnerStream::<_, ::Output>::new( s, range, diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 0fff4ea..4b78d9e 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -176,7 +176,10 @@ where let path = cfd.path(&node_config); let enc = serde_cbor::to_vec(&values)?; let ts1 = Instant::now(); - tokio::fs::create_dir_all(path.parent().unwrap()).await?; + tokio::fs::create_dir_all(path.parent().unwrap()).await.map_err(|e| { + error!("can not create cache directory {:?}", path.parent()); + e + })?; let now = Utc::now(); let mut h = crc32fast::Hasher::new(); h.update(&now.timestamp_nanos().to_le_bytes()); diff --git a/disk/src/raw/eventsfromframes.rs b/disk/src/raw/eventsfromframes.rs index 3177fe3..38d98fe 100644 --- a/disk/src/raw/eventsfromframes.rs +++ b/disk/src/raw/eventsfromframes.rs @@ -64,10 +64,7 @@ where } }, Err(e) => { - error!( - "EventsFromFrames ~~~~~~~~ ERROR on frame payload {}", - frame.buf().len(), - ); + error!("frame payload len {} tyid {} {}", frame.buf().len(), frame.tyid(), e); self.errored = true; Ready(Some(Err(e))) } diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 6ea8d2f..2ef45bb 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -178,6 +178,16 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/version" { + if req.method() == Method::GET { + let ret = serde_json::json!({ + //"data_api_version": "4.0.0-beta", + "data_api_version_major": 4, + }); + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else if path == "/api/4/search/channel" { if req.method() == Method::GET { Ok(search::channel_search(req, &node_config).await?) @@ -419,6 +429,13 @@ where } async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + match binned_inner(req, node_config).await { + Ok(ret) => Ok(ret), + Err(e) => Ok(response(StatusCode::BAD_REQUEST).body(Body::from(e.msg().to_string()))?), + } +} + +async fn binned_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let url = Url::parse(&format!("dummy:{}", head.uri))?; let query = BinnedQuery::from_url(&url)?; @@ -465,6 +482,13 @@ async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Resu } async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + match prebinned_inner(req, node_config).await { + Ok(ret) => Ok(ret), + Err(e) => Ok(response(StatusCode::BAD_REQUEST).body(Body::from(e.msg().to_string()))?), + } +} + +async fn prebinned_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let query = PreBinnedQuery::from_request(&head)?; let desc = format!( @@ -492,30 +516,23 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result } async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - debug!("httpret plain_events headers: {:?}", req.headers()); + match plain_events_inner(req, node_config).await { + Ok(ret) => Ok(ret), + Err(e) => Ok(response(StatusCode::BAD_REQUEST).body(Body::from(e.msg().to_string()))?), + } +} + +async fn plain_events_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + debug!("httpret plain_events_inner headers: {:?}", req.headers()); let accept_def = ""; let accept = req .headers() .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON { - let ret = match plain_events_json(req, node_config).await { - Ok(ret) => ret, - Err(e) => { - error!("{}", e); - response_err(StatusCode::BAD_REQUEST, e.msg())? - } - }; - Ok(ret) + Ok(plain_events_json(req, node_config).await?) } else if accept == APP_OCTET { - let ret = match plain_events_binary(req, node_config).await { - Ok(ret) => ret, - Err(e) => { - error!("{}", e); - response_err(StatusCode::BAD_REQUEST, e.msg())? - } - }; - Ok(ret) + Ok(plain_events_binary(req, node_config).await?) } else { let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("unsupported Accept: {:?}", accept))?; Ok(ret) @@ -699,7 +716,7 @@ pub async fn channel_config(req: Request, node_config: &NodeConfigCached) //let pairs = get_url_query_pairs(&url); let q = ChannelConfigQuery::from_url(&url)?; let conf = if let Some(conf) = &node_config.node.channel_archiver { - archapp_wrap::archapp::archeng::channel_config(&q, conf).await? + archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf).await? } else if let Some(conf) = &node_config.node.archiver_appliance { archapp_wrap::channel_config(&q, conf).await? } else { diff --git a/items/src/binnedevents.rs b/items/src/binnedevents.rs index b56cccb..bc0d157 100644 --- a/items/src/binnedevents.rs +++ b/items/src/binnedevents.rs @@ -2,13 +2,14 @@ use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; +use serde::{Deserialize, Serialize}; use crate::{ eventsitem::EventsItem, plainevents::{PlainEvents, ScalarPlainEvents}, }; -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum SingleBinWaveEvents { Byte(XBinnedScalarEvents), Short(XBinnedScalarEvents), @@ -172,7 +173,7 @@ impl HasScalarType for SingleBinWaveEvents { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum MultiBinWaveEvents { Byte(XBinnedWaveEvents), Short(XBinnedWaveEvents), @@ -336,7 +337,7 @@ impl HasScalarType for MultiBinWaveEvents { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum XBinnedEvents { Scalar(ScalarPlainEvents), SingleBinWave(SingleBinWaveEvents), diff --git a/items/src/eventsitem.rs b/items/src/eventsitem.rs index b4e1308..dcc9ce1 100644 --- a/items/src/eventsitem.rs +++ b/items/src/eventsitem.rs @@ -1,14 +1,19 @@ use crate::binnedevents::XBinnedEvents; use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; -use crate::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps}; +use crate::{Appendable, Clearable, PushableIndex, SitemtyFrameType, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; +use serde::{Deserialize, Serialize}; -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum EventsItem { Plain(PlainEvents), XBinnedEvents(XBinnedEvents), } +impl SitemtyFrameType for EventsItem { + const FRAME_TYPE_ID: u32 = crate::EVENTS_ITEM_FRAME_TYPE_ID; +} + impl EventsItem { pub fn is_wave(&self) -> bool { use EventsItem::*; diff --git a/items/src/lib.rs b/items/src/lib.rs index e304334..f5c1269 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -40,6 +40,7 @@ pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x900; pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0xb00; pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; +pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; pub fn bool_is_false(j: &bool) -> bool { *j == false diff --git a/items/src/plainevents.rs b/items/src/plainevents.rs index df832df..9d68c06 100644 --- a/items/src/plainevents.rs +++ b/items/src/plainevents.rs @@ -5,8 +5,9 @@ use crate::waveevents::{WaveEvents, WaveXBinner}; use crate::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, WithLen, WithTimestamps}; use err::Error; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; +use serde::{Deserialize, Serialize}; -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum ScalarPlainEvents { Byte(EventValues), Short(EventValues), @@ -151,7 +152,7 @@ impl HasScalarType for ScalarPlainEvents { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum WavePlainEvents { Byte(WaveEvents), Short(WaveEvents), @@ -346,7 +347,7 @@ impl HasScalarType for WavePlainEvents { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum PlainEvents { Scalar(ScalarPlainEvents), Wave(WavePlainEvents), diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index d76a893..ab9e5bd 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -96,7 +96,31 @@ impl ScalarType { "int64" => I64, "float" => F32, "double" => F64, - _ => return Err(Error::with_msg_no_trace(format!("can not understand bsread {}", s))), + _ => { + return Err(Error::with_msg_no_trace(format!( + "from_bsread_str can not understand bsread {}", + s + ))) + } + }; + Ok(ret) + } + + pub fn from_archeng_db_str(s: &str) -> Result { + use ScalarType::*; + let ret = match s { + "I8" => I8, + "I16" => I16, + "I32" => I32, + "I64" => I64, + "F32" => F32, + "F64" => F64, + _ => { + return Err(Error::with_msg_no_trace(format!( + "from_archeng_db_str can not understand {}", + s + ))) + } }; Ok(ret) } @@ -394,7 +418,10 @@ impl ByteOrder { match s { "little" => Ok(ByteOrder::LE), "big" => Ok(ByteOrder::BE), - _ => Err(Error::with_msg_no_trace(format!("can not understand {}", s))), + _ => Err(Error::with_msg_no_trace(format!( + "ByteOrder::from_bsread_str can not understand {}", + s + ))), } } @@ -450,13 +477,44 @@ impl Shape { JsVal::Number(v) => match v.as_u64() { Some(0) | Some(1) => Ok(Shape::Scalar), Some(v) => Ok(Shape::Wave(v as u32)), - None => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + None => Err(Error::with_msg_no_trace(format!( + "Shape from_bsread_jsval can not understand {:?}", + v + ))), }, - _ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + _ => Err(Error::with_msg_no_trace(format!( + "Shape from_bsread_jsval can not understand {:?}", + v + ))), }, - _ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + _ => Err(Error::with_msg_no_trace(format!( + "Shape from_bsread_jsval can not understand {:?}", + v + ))), }, - _ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + _ => Err(Error::with_msg_no_trace(format!( + "Shape from_bsread_jsval can not understand {:?}", + v + ))), + } + } + + pub fn from_db_jsval(v: &JsVal) -> Result { + match v { + JsVal::String(s) => { + if s == "Scalar" { + Ok(Shape::Scalar) + } else { + Err(Error::with_msg_no_trace(format!( + "Shape from_db_jsval can not understand {:?}", + v + ))) + } + } + _ => Err(Error::with_msg_no_trace(format!( + "Shape from_db_jsval can not understand {:?}", + v + ))), } } } @@ -474,15 +532,45 @@ pub mod timeunits { pub const DAY: u64 = HOUR * 24; } -const BIN_T_LEN_OPTIONS_0: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; +//const BIN_T_LEN_OPTIONS_0: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; -const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; +//const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; -const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; +//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; // Maybe alternative for GLS: //const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [HOUR * 4, DAY * 4, DAY * 16, DAY * 32]; -const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32]; +//const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32]; + +const BIN_T_LEN_OPTIONS_0: [u64; 2] = [ + // + //SEC, + //MIN * 10, + HOUR * 2, + DAY, +]; + +const PATCH_T_LEN_KEY: [u64; 2] = [ + // + //SEC, + //MIN * 10, + HOUR * 2, + DAY, +]; +const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 2] = [ + // + //MIN * 60, + //HOUR * 4, + DAY * 8, + DAY * 32, +]; +const PATCH_T_LEN_OPTIONS_WAVE: [u64; 2] = [ + // + //MIN * 10, + //HOUR * 2, + DAY * 8, + DAY * 32, +]; const BIN_THRESHOLDS: [u64; 31] = [ 2, @@ -622,6 +710,9 @@ impl PreBinnedPatchRange { if t <= bs { let bin_t_len = t; let patch_t_len = get_patch_t_len(bin_t_len); + if !PreBinnedPatchGridSpec::is_valid_bin_t_len(bin_t_len) { + return Err(Error::with_msg_no_trace(format!("not a valid bin_t_len {}", bin_t_len))); + } let grid_spec = PreBinnedPatchGridSpec { bin_t_len, patch_t_len }; let pl = patch_t_len; let ts1 = range.beg / pl * pl; diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index d910e03..a115598 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -119,7 +119,7 @@ async fn events_conn_handler_inner_try( let mut p1: Pin> + Send>> = if let Some(aa) = &node_config.node.channel_archiver { - match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, aa).await { + match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, aa.clone()).await { Ok(j) => j, Err(e) => return Err((e, netout))?, } diff --git a/streams/src/rangefilter.rs b/streams/src/rangefilter.rs index fb61ff4..f8a1226 100644 --- a/streams/src/rangefilter.rs +++ b/streams/src/rangefilter.rs @@ -5,10 +5,15 @@ use items::StatsItem; use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps}; use netpod::{log::*, RangeFilterStats}; use netpod::{NanoRange, Nanos}; +use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct RangeFilter { +pub struct RangeFilter +where + S: Stream> + Unpin, + ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, +{ inp: S, range: NanoRange, range_str: String, @@ -23,11 +28,15 @@ pub struct RangeFilter { raco_done: bool, done: bool, complete: bool, + items_with_pre: usize, + items_with_post: usize, + items_with_unordered: usize, } impl RangeFilter where - ITY: Appendable, + S: Stream> + Unpin, + ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, { pub fn new(inp: S, range: NanoRange, expand: bool) -> Self { trace!("RangeFilter::new range: {:?} expand: {:?}", range, expand); @@ -46,6 +55,9 @@ where raco_done: false, done: false, complete: false, + items_with_pre: 0, + items_with_post: 0, + items_with_unordered: 0, } } } @@ -79,70 +91,88 @@ where match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => match item { Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { + let mut contains_pre = false; + let mut contains_post = false; + let mut contains_unordered = false; let mut ret = item.empty_like_self(); for i1 in 0..item.len() { let ts = item.ts(i1); if ts < self.ts_max { - self.done = true; - let msg = format!( - "unordered event i1 {} / {} ts {:?} ts_max {:?}", - i1, - item.len(), - Nanos::from_ns(ts), - Nanos::from_ns(self.ts_max) - ); - error!("{}", msg); - return Ready(Some(Err(Error::with_msg(msg)))); - } - self.ts_max = ts; - if ts < self.range.beg { - if self.expand { - let mut prerange = if let Some(prerange) = self.prerange.take() { - prerange - } else { - item.empty_like_self() - }; - prerange.clear(); - prerange.push_index(&item, i1); - self.prerange = Some(prerange); - self.have_pre = true; + contains_unordered = true; + if false { + self.done = true; + let msg = format!( + "unordered event i1 {} / {} ts {:?} ts_max {:?}", + i1, + item.len(), + Nanos::from_ns(ts), + Nanos::from_ns(self.ts_max) + ); + error!("{}", msg); + return Ready(Some(Err(Error::with_msg(msg)))); } - } else if ts >= self.range.end { - self.have_range_complete = true; - if self.expand { - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { + } else { + self.ts_max = ts; + if ts < self.range.beg { + contains_pre = true; + if self.expand { + let mut prerange = if let Some(prerange) = self.prerange.take() { prerange } else { - panic!() + item.empty_like_self() }; - ret.push_index(prerange, 0); prerange.clear(); - self.have_pre = false; + prerange.push_index(&item, i1); + self.prerange = Some(prerange); + self.have_pre = true; } - if !self.emitted_post { - self.emitted_post = true; - ret.push_index(&item, i1); + } else if ts >= self.range.end { + contains_post = true; + self.have_range_complete = true; + if self.expand { + if self.have_pre { + let prerange = if let Some(prerange) = &mut self.prerange { + prerange + } else { + panic!() + }; + ret.push_index(prerange, 0); + prerange.clear(); + self.have_pre = false; + } + if !self.emitted_post { + self.emitted_post = true; + ret.push_index(&item, i1); + //self.data_done = true; + } + } else { //self.data_done = true; } } else { - //self.data_done = true; - } - } else { - if self.expand { - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { - prerange - } else { - panic!() - }; - ret.push_index(prerange, 0); - prerange.clear(); - self.have_pre = false; + if self.expand { + if self.have_pre { + let prerange = if let Some(prerange) = &mut self.prerange { + prerange + } else { + panic!() + }; + ret.push_index(prerange, 0); + prerange.clear(); + self.have_pre = false; + } } + ret.push_index(&item, i1); } - ret.push_index(&item, i1); - }; + } + } + if contains_pre { + self.items_with_pre += 1; + } + if contains_post { + self.items_with_post += 1; + } + if contains_unordered { + self.items_with_unordered += 1; } Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } @@ -189,3 +219,27 @@ where span1.in_scope(|| Self::poll_next(self, cx)) } } + +impl fmt::Debug for RangeFilter +where + S: Stream> + Unpin, + ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("RangeFilter") + .field("items_with_pre", &self.items_with_pre) + .field("items_with_post", &self.items_with_post) + .field("items_with_unordered", &self.items_with_unordered) + .finish() + } +} + +impl Drop for RangeFilter +where + S: Stream> + Unpin, + ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, +{ + fn drop(&mut self) { + debug!("Drop {:?}", self); + } +} diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index f058d69..d3c1f23 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -95,10 +95,12 @@ pub fn tracing_init() { "archapp::archeng=info", "archapp::archeng::datablockstream=info", "archapp::archeng::indextree=info", - "archapp::archeng::blockstream=info", + "archapp::archeng::blockstream=debug", "archapp::archeng::ringbuf=info", "archapp::archeng::backreadbuf=info", + "archapp::archeng::pipe=trace", "archapp::storagemerge=info", + "streams::rangefilter=debug", "daqbuffer::test=trace", ] .join(","), @@ -137,10 +139,10 @@ pub fn test_cluster() -> netpod::Cluster { netpod::Cluster { nodes, database: netpod::Database { - name: "daqbuffer".into(), host: "localhost".into(), - user: "daqbuffer".into(), - pass: "daqbuffer".into(), + name: "testingdaq".into(), + user: "testingdaq".into(), + pass: "testingdaq".into(), }, run_map_pulse_task: false, is_central_storage: false,