diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 001e89d..33b0058 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -1,4 +1,5 @@ pub mod channelconfig; +pub mod query; pub mod scan; pub mod search; diff --git a/dbconn/src/query.rs b/dbconn/src/query.rs new file mode 100644 index 0000000..369949d --- /dev/null +++ b/dbconn/src/query.rs @@ -0,0 +1,33 @@ +use crate::create_connection; +use crate::ErrConv; +use err::Error; +use netpod::Channel; +use netpod::NodeConfigCached; + +// For sf-databuffer backend, given a Channel, try to complete the information if only id is given. +pub async fn sf_databuffer_fetch_channel_by_series(channel: Channel, ncc: &NodeConfigCached) -> Result { + // TODO should not be needed at some point. + if channel.backend().is_empty() || channel.name().is_empty() { + let series = channel + .series() + .ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64; + let pgcon = create_connection(&ncc.node_config.cluster.database).await?; + let mut rows = pgcon + .query("select name from channels where rowid = $1", &[&series]) + .await + .err_conv()?; + if let Some(row) = rows.pop() { + let name: String = row.get(0); + let channel = Channel { + series: channel.series, + backend: ncc.node_config.cluster.backend.clone(), + name, + }; + Ok(channel) + } else { + Err(Error::with_msg_no_trace("can not find series")) + } + } else { + Ok(channel) + } +} diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index a209f8c..ed906c9 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -70,6 +70,7 @@ pub async fn make_event_pipe( let channel_config = match channel_config { Ok(x) => x, Err(e) => { + error!("make_event_pipe can not find config"); if e.msg().contains("ErrorKind::NotFound") { warn!("{e}"); let s = futures_util::stream::empty(); diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index fae087c..a3f0b6e 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -1,3 +1,4 @@ +use dbconn::query::sf_databuffer_fetch_channel_by_series; use err::Error; use netpod::log::*; use netpod::range::evrange::NanoRange; @@ -69,6 +70,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig Ok(ret) } else if ncc.node.sf_databuffer.is_some() { info!("try to get ChConf for sf-databuffer type backend"); + let channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?; let c1 = disk::channelconfig::config(range, channel, ncc).await?; let ret = ChConf { backend: c1.channel.backend, diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 825c2af..4f1349e 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -335,9 +335,11 @@ pub async fn read_local_config(channel: Channel, node: Node) -> Result k, Err(e) => match e.kind() { ErrorKind::NotFound => { + let bt = err::bt::Backtrace::new(); + netpod::log::error!("{bt:?}"); return Err(Error::with_public_msg(format!( "databuffer channel config file not found for channel {channel:?} at {path:?}" - ))) + ))); } ErrorKind::PermissionDenied => { return Err(Error::with_public_msg(format!( diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 66f8dcf..c50c926 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -5,7 +5,6 @@ use futures_util::stream; use futures_util::StreamExt; use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; -use items_0::Empty; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use netpod::log::*; @@ -16,14 +15,10 @@ use serde_json::Value as JsonValue; use std::time::Duration; use std::time::Instant; -pub async fn plain_events_json( - query: &PlainEventsQuery, - chconf: &ChConf, - cluster: &Cluster, -) -> Result { - if query.channel().name() == "wasm-test-01" { +pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster: &Cluster) -> Result { + if evq.channel().name() == "wasm-test-01" { use wasmer::Value; - let wasm = query.channel().name().as_bytes(); + let wasm = evq.channel().name().as_bytes(); let mut store = wasmer::Store::default(); let module = wasmer::Module::new(&store, wasm).unwrap(); let import_object = wasmer::imports! {}; @@ -33,14 +28,14 @@ pub async fn plain_events_json( assert_eq!(result[0], Value::I32(43)); } // TODO remove magic constant - let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000); - let events_max = query.events_max(); - let evquery = query.clone(); + let deadline = Instant::now() + evq.timeout() + Duration::from_millis(1000); + let events_max = evq.events_max(); + let evquery = evq.clone(); info!("plain_events_json evquery {:?}", evquery); //let ev_agg_kind = evquery.agg_kind().as_ref().map_or(AggKind::Plain, |x| x.clone()); //info!("plain_events_json ev_agg_kind {:?}", ev_agg_kind); warn!("TODO feed through transform chain"); - let empty = if query.transform().is_pulse_id_diff() { + let empty = if evq.transform().is_pulse_id_diff() { use items_0::Empty; Box::new(items_2::eventsdim0::EventsDim0::::empty()) } else { @@ -60,13 +55,14 @@ pub async fn plain_events_json( use futures_util::Stream; use items_0::streamitem::Sitemty; use std::pin::Pin; - let stream: Pin> + Send>> = if query.transform().is_pulse_id_diff() { + let stream: Pin> + Send>> = if evq.transform().is_pulse_id_diff() { Box::pin(stream.map(|item| { let mut pulse_last = None; on_sitemty_data!(item, move |item| { use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; use items_0::Appendable; + use items_0::Empty; let x = match item { ChannelEvents::Events(item) => { let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item); @@ -95,13 +91,13 @@ pub async fn plain_events_json( //info!("item after merge: {item:?}"); item }); - let stream = RangeFilter2::new(stream, query.range().try_into()?, evquery.one_before_range()); + let stream = RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range()); let stream = stream.map(|item| { //info!("item after rangefilter: {item:?}"); item }); let stream = stream::iter([empty]).chain(stream); - let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?; + let collected = crate::collect::collect(stream, deadline, events_max, Some(evq.range().clone()), None).await?; let jsval = serde_json::to_value(&collected)?; Ok(jsval) }