diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 0faae02..7290494 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -138,7 +138,7 @@ pub async fn make_event_pipe( } } let range = &evq.range; - let channel_config = match read_local_config(&evq.channel, &node_config.node).await { + let channel_config = match read_local_config(evq.channel.clone(), node_config.node.clone()).await { Ok(k) => k, Err(e) => { if e.msg().contains("ErrorKind::NotFound") { @@ -204,7 +204,7 @@ pub async fn make_event_blobs_pipe( } } let range = &evq.range; - let channel_config = match read_local_config(&evq.channel, &node_config.node).await { + let channel_config = match read_local_config(evq.channel.clone(), node_config.node.clone()).await { Ok(k) => k, Err(e) => { if e.msg().contains("ErrorKind::NotFound") { diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 4992f3e..ed36286 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -2,6 +2,8 @@ use crate::gather::{gather_get_json_generic, SubRes}; use crate::{response, BodyStream}; use bytes::{BufMut, BytesMut}; use err::Error; +use futures_core::Stream; +use futures_util::{FutureExt, StreamExt}; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use items::{RangeCompletableItem, StreamItem}; @@ -9,11 +11,12 @@ use itertools::Itertools; use netpod::query::RawEventsQuery; use netpod::{log::*, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; -use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use parse::channelconfig::{extract_matching_config_entry, read_local_config, Config, MatchingConfigEntry}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::future::Future; use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use url::Url; @@ -492,6 +495,188 @@ pub struct Api1ChannelHeader { compression: Option, } +pub struct DataApiPython3DataStream { + range: NanoRange, + channels: Vec, + node_config: NodeConfigCached, + chan_ix: usize, + chan_stream: Option> + Send>>>, + config_fut: Option> + Send>>>, + data_done: bool, + completed: bool, +} + +impl DataApiPython3DataStream { + pub fn new(range: NanoRange, channels: Vec, node_config: NodeConfigCached) -> Self { + Self { + range, + channels, + node_config, + chan_ix: 0, + chan_stream: None, + config_fut: None, + data_done: false, + completed: false, + } + } +} + +impl Stream for DataApiPython3DataStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.completed { + panic!("poll on completed") + } else if self.data_done { + self.completed = true; + Ready(None) + } else { + if let Some(stream) = &mut self.chan_stream { + match stream.poll_next_unpin(cx) { + Ready(k) => match k { + Some(k) => match k { + Ok(k) => Ready(Some(Ok(k))), + Err(e) => { + self.data_done = true; + Ready(Some(Err(e))) + } + }, + None => { + self.chan_stream = None; + continue; + } + }, + Pending => Pending, + } + } else if let Some(fut) = &mut self.config_fut { + match fut.poll_unpin(cx) { + Ready(Ok(config)) => { + self.config_fut = None; + let entry_res = match extract_matching_config_entry(&self.range, &config) { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let entry = match entry_res { + MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => { + return Err(Error::with_msg("multiple config entries found"))? + } + MatchingConfigEntry::Entry(entry) => entry.clone(), + }; + warn!("found channel_config {:?}", entry); + let evq = RawEventsQuery { + channel: self.channels[self.chan_ix - 1].clone(), + range: self.range.clone(), + agg_kind: netpod::AggKind::EventBlobs, + disk_io_buffer_size: 1024 * 4, + }; + let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 }; + let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new( + evq, + perf_opts, + self.node_config.node_config.cluster.clone(), + ); + let s = s.map({ + let mut header_out = false; + let mut count_events = 0; + let channel = self.channels[self.chan_ix - 1].clone(); + move |b| { + let ret = match b { + Ok(b) => { + let f = match b { + StreamItem::DataItem(RangeCompletableItem::Data(b)) => { + let mut d = BytesMut::new(); + for i1 in 0..b.tss.len() { + if count_events < 6 { + info!( + "deco len {:?} BE {} scalar-type {:?} shape {:?}", + b.decomps[i1].as_ref().map(|x| x.len()), + b.be[i1], + b.scalar_types[i1], + b.shapes[i1] + ); + } + if !header_out { + let head = Api1ChannelHeader { + name: channel.name.clone(), + ty: scalar_type_to_api3proto(&b.scalar_types[i1]) + .into(), + byte_order: if b.be[i1] { + "BIG_ENDIAN".into() + } else { + "LITTLE_ENDIAN".into() + }, + // The shape is inconsistent on the events. + // Seems like the config is to be trusted in this case. + shape: shape_to_api3proto(&entry.shape), + compression: None, + }; + let h = serde_json::to_string(&head)?; + info!("sending channel header {}", h); + let l1 = 1 + h.as_bytes().len() as u32; + d.put_u32(l1); + d.put_u8(0); + d.extend_from_slice(h.as_bytes()); + d.put_u32(l1); + header_out = true; + } + { + if let Some(deco) = &b.decomps[i1] { + let l1 = 17 + deco.len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&deco); + d.put_u32(l1); + } + } + count_events += 1; + } + d + } + _ => BytesMut::new(), + }; + Ok(f) + } + Err(e) => Err(e), + }; + ret + } + }); + //let _ = Box::new(s) as Box> + Unpin>; + self.chan_stream = Some(Box::pin(s)); + continue; + } + Ready(Err(e)) => { + self.config_fut = None; + self.data_done = true; + error!("api1_binary_events error {:?}", e); + Ready(Some(Err(Error::with_msg_no_trace("can not parse channel config")))) + } + Pending => Pending, + } + } else { + if self.chan_ix >= self.channels.len() { + self.data_done = true; + continue; + } else { + let channel = self.channels[self.chan_ix].clone(); + self.chan_ix += 1; + self.config_fut = Some(Box::pin(read_local_config( + channel.clone(), + self.node_config.node.clone(), + ))); + continue; + } + } + }; + } + } +} + pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { info!("api1_binary_events headers: {:?}", req.headers()); let accept_def = ""; @@ -502,6 +687,7 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach .to_owned(); let (_head, body) = req.into_parts(); let body_data = hyper::body::to_bytes(body).await?; + info!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec())); let qu: Api1Query = serde_json::from_slice(&body_data)?; info!("got Api1Query: {:?}", qu); let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date); @@ -524,14 +710,29 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach beg: beg_ns, end: end_ns, }; + let backend = "sf-databuffer"; + let chans = qu + .channels + .iter() + .map(|x| Channel { + backend: backend.into(), + name: x.clone(), + }) + .collect(); + if true { + let s = DataApiPython3DataStream::new(range.clone(), chans, node_config.clone()); + let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy"); + let ret = ret.body(BodyStream::wrapped(s, format!("plain_events")))?; + return Ok(ret); + } // TODO to server multiple channels, I need to wrap the loop over channels in a Stream itself. let channel = qu.channels[0].clone(); let channel = Channel { - backend: "sf-databuffer".into(), + backend: backend.into(), name: channel, }; let channel_config = { - let channel_config = match read_local_config(&channel, &node_config.node).await { + let channel_config = match read_local_config(channel.clone(), node_config.node.clone()).await { Ok(k) => k, Err(e) => { error!("api1_binary_events error {:?}", e); @@ -562,7 +763,6 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach perf_opts, node_config.node_config.cluster.clone(), ); - use futures_util::StreamExt; let s = s.map({ let mut header_out = false; let mut count_events = 0; @@ -621,10 +821,7 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach } d } - _ => { - // - BytesMut::new() - } + _ => BytesMut::new(), }; Ok(f) } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index bcffcd5..604ee91 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -20,6 +20,7 @@ use netpod::{ use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; +use pulsemap::MapPulseHttpFunction; use serde::{Deserialize, Serialize}; use std::{future, net, panic, pin, task}; use task::{Context, Poll}; @@ -30,6 +31,7 @@ use url::Url; pub mod api1; pub mod gather; pub mod proxy; +pub mod pulsemap; pub mod search; fn proxy_mark() -> &'static str { @@ -246,6 +248,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) } + } else if MapPulseHttpFunction::path_matches(path) { + MapPulseHttpFunction::handle(req, &node_config) } else if path.starts_with("/api/1/requestStatus/") { info!("{}", path); Ok(response(StatusCode::OK).body(Body::from("{}"))?) diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs new file mode 100644 index 0000000..cbc93bf --- /dev/null +++ b/httpret/src/pulsemap.rs @@ -0,0 +1,55 @@ +use crate::response; +use err::Error; +use http::{Method, StatusCode}; +use hyper::{Body, Request, Response}; +use netpod::NodeConfigCached; + +pub struct MapPulseHisto { + _pulse: u64, + _tss: Vec, + _counts: Vec, +} + +const MAP_PULSE_HISTO_URL_PREFIX: &'static str = "/api/1/map/pulse/histo/"; +const MAP_PULSE_URL_PREFIX: &'static str = "/api/1/map/pulse/"; + +fn _make_tables() -> Result<(), Error> { + let _sql = "create table if not exists map_pulse_channels (name text, tbmax int)"; + let _sql = "create table if not exists map_pulse_files (channel text not null, split int not null, timebin int not null, closed int not null default 0, pulse_min int8 not null, pulse_max int8 not null)"; + let _sql = "create unique index if not exists map_pulse_files_ix1 on map_pulse_files (channel, split, timebin)"; + err::todoval() +} + +pub struct MapPulseHistoHttpFunction {} + +impl MapPulseHistoHttpFunction { + pub fn path_matches(path: &str) -> bool { + path.starts_with(MAP_PULSE_HISTO_URL_PREFIX) + } + + pub fn handle(req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let urls = format!("{}", req.uri()); + let _pulse: u64 = urls[MAP_PULSE_HISTO_URL_PREFIX.len()..].parse()?; + Ok(response(StatusCode::NOT_IMPLEMENTED).body(Body::empty())?) + } +} + +pub struct MapPulseHttpFunction {} + +impl MapPulseHttpFunction { + pub fn path_matches(path: &str) -> bool { + path.starts_with(MAP_PULSE_URL_PREFIX) + } + + pub fn handle(req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let urls = format!("{}", req.uri()); + let _pulse: u64 = urls[MAP_PULSE_URL_PREFIX.len()..].parse()?; + Ok(response(StatusCode::NOT_IMPLEMENTED).body(Body::empty())?) + } +} diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index e950fa2..bf67919 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -256,7 +256,7 @@ pub fn parse_config(inp: &[u8]) -> NRes { } pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result { - let conf = read_local_config(&q.channel, node).await?; + let conf = read_local_config(q.channel.clone(), node.clone()).await?; let entry_res = extract_matching_config_entry(&q.range, &conf)?; let entry = match entry_res { MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")), @@ -272,7 +272,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result Result { +// TODO can I take parameters as ref, even when used in custom streams? +pub async fn read_local_config(channel: Channel, node: Node) -> Result { let path = node .data_base_path .join("config")