diff --git a/Readme.md b/Readme.md index f1e3218..8800b85 100644 --- a/Readme.md +++ b/Readme.md @@ -150,10 +150,7 @@ Example config: The documentation of the currently running service version is served by the service itself: - - -These docs are found in this repository in the directory: - + # Setup Toolchain diff --git a/apidoc/src/SUMMARY.md b/apidoc/src/SUMMARY.md index 369249a..26f0df8 100644 --- a/apidoc/src/SUMMARY.md +++ b/apidoc/src/SUMMARY.md @@ -1,6 +1,7 @@ # Summary [Introduction](intro.md) +- [Backends](backends.md) - [Search Channels](search.md) - [Binned Data](bins.md) - [Event Data](events.md) diff --git a/apidoc/src/backends.md b/apidoc/src/backends.md new file mode 100644 index 0000000..ba74b3c --- /dev/null +++ b/apidoc/src/backends.md @@ -0,0 +1,7 @@ +# Backends + +The list of available backends can be queried: + +```bash +curl "https://data-api.psi.ch/api/4/backend/list" +``` diff --git a/apidoc/src/events.md b/apidoc/src/events.md index 9e76d55..e6303d2 100644 --- a/apidoc/src/events.md +++ b/apidoc/src/events.md @@ -6,10 +6,6 @@ Event data can be fetched like this: curl "https://data-api.psi.ch/api/4/events?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T12:41:00Z&endDate=2024-02-15T12:42:00Z" ``` -Note: if the channel changes data type within the requested date range, then the -server will return values for that data type which covers the requested -date range best. - Parameters: - `backend`: the backend that the channel exists in, e.g. `sf-databuffer`. - `channelName`: the name of the channel. @@ -17,3 +13,74 @@ Parameters: - `endDate`: end of the time range, exclusive. - `allowLargeResult=true` indicates that the client is prepared to accept also larger responses compared to what might be suitable for a typical browser. + +By default, events are returned as a json object. 
+ +Note: if the channel changes data type within the requested date range, then the +server will return values for that data type which covers the requested +date range best. + +Note: if the server decides that the requested dataset gets "too large" then the response will contain +the key `continueAt` which indicates that the response is incomplete and that the caller should +issue another request with `begDate` as given by `continueAt`. + + +## Events as framed JSON stream + +To download larger amounts of JSON data it is recommended to use the `json-framed` content encoding. +Using this encoding, the server can send the requested events as a stream of json objects, where each +json object contains a batch of events. +This content encoding is triggered via the `Accept: application/json-framed` header in the request. + +The returned body looks like: +``` +[JSON-frame] +[JSON-frame] +[JSON-frame] +... etc +``` + +where each `[JSON-frame]` looks like: +``` +[length N of the following JSON object: uint32 little-endian] +[reserved: 12 bytes of zero-padding] +[JSON object: N bytes] +[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0] +``` + + +## Events as framed CBOR stream + +The Concise Binary Object Representation (RFC 8949) can be a more compact option to transfer data. +Usage of the `Accept: application/cbor-framed` header in the request causes the api to return +the data as a stream of CBOR objects. Each CBOR-object will contain a batch of events. + +The http body of the response then looks like this: + +``` +[CBOR-frame] +[CBOR-frame] +[CBOR-frame] +... 
etc +``` + +where each `[CBOR-frame]` looks like: +``` +[length N of the following CBOR object: uint32 little-endian] +[reserved: 12 bytes of zero-padding] +[CBOR object: N bytes] +[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0] +``` + +Most returned CBOR objects are data objects and look like this in equivalent json notation: +```json +{ + "tss": [1, 2, 3, 4], + "values": [42, 60, 55, 20] +} +``` +where `tss` is the array of timestamps and `values` the corresponding array of values. + +Note: "data" CBOR objects are currently identified by the presence of the `tss` key. There can be +other types of CBOR objects, like log or statistics. +The next update will add a type-tag to discriminate them, but for now, look for the key `tss`. diff --git a/crates/bitshuffle/src/bitshuffle.rs b/crates/bitshuffle/src/bitshuffle.rs index 092ed8f..cc4c08d 100644 --- a/crates/bitshuffle/src/bitshuffle.rs +++ b/crates/bitshuffle/src/bitshuffle.rs @@ -68,3 +68,74 @@ pub fn lz4_decompress(inp: &[u8], out: &mut [u8]) -> Result { Ok(ec as _) } } + +#[cfg(test)] +mod _simd { + use std::arch::x86_64::_mm_loadu_si128; + use std::arch::x86_64::_mm_shuffle_epi32; + use std::ptr::null; + + #[test] + fn simd_test() { + if is_x86_feature_detected!("sse") { + eprintln!("have sse 1"); + } + if is_x86_feature_detected!("sse2") { + eprintln!("have sse 2"); + unsafe { simd_sse_2() }; + } + if is_x86_feature_detected!("sse3") { + eprintln!("have sse 3"); + unsafe { simd_sse_3() }; + } + if is_x86_feature_detected!("sse4.1") { + eprintln!("have sse 4.1"); + unsafe { simd_sse_4_1() }; + } + if is_x86_feature_detected!("sse4.2") { + eprintln!("have sse 4.2"); + unsafe { simd_sse_4_2() }; + } + if is_x86_feature_detected!("sse4a") { + eprintln!("have sse 4 a"); + } + if is_x86_feature_detected!("avx") { + eprintln!("have avx 1"); + } + if is_x86_feature_detected!("avx2") { + eprintln!("have avx 2"); + } + } + + #[target_feature(enable = "sse2")] + unsafe fn simd_sse_2() { + // 
_mm_loadu_si128(null()); + eprintln!("sse 2 done"); + } + + #[target_feature(enable = "sse3")] + unsafe fn simd_sse_3() { + // core::arch::asm!(); + // core::arch::global_asm!(); + let a = core::arch::x86_64::_mm256_setzero_si256(); + let b = core::arch::x86_64::_mm256_set_epi32(7, 3, 9, 11, 17, 13, 19, 21); + let x = core::arch::x86_64::_mm256_xor_si256(a, b); + core::arch::x86_64::_mm256_loadu_si256(&x as *const _); + // core::arch::x86_64::vpl!(); + // core::arch::x86_64::vps!(); + // let c = core::arch::x86_64::_mm256_shuffle_i32x4(a, b); + eprintln!("sse 3 done"); + } + + #[target_feature(enable = "sse4.1")] + unsafe fn simd_sse_4_1() { + // _mm_loadu_si128(null()); + eprintln!("sse 4.1 done"); + } + + #[target_feature(enable = "sse4.2")] + unsafe fn simd_sse_4_2() { + // _mm_loadu_si128(null()); + eprintln!("sse 4.2 done"); + } +} diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 7dc9c9e..3c51772 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.0" +version = "0.5.1-aa.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/daqbuffer/src/fetch.rs b/crates/daqbuffer/src/fetch.rs index 6ee9127..62f8fdf 100644 --- a/crates/daqbuffer/src/fetch.rs +++ b/crates/daqbuffer/src/fetch.rs @@ -1,3 +1,5 @@ +use err::thiserror; +use err::ThisError; use futures_util::future; use futures_util::StreamExt; use http::header; @@ -5,35 +7,26 @@ use http::Method; use httpclient::body_empty; use httpclient::connect_client; use httpclient::http; +use httpclient::http::StatusCode; +use httpclient::http_body_util::BodyExt; use httpclient::hyper::Request; use httpclient::IncomingStream; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; -use netpod::APP_CBOR_FRAMES; -use std::fmt; +use netpod::APP_CBOR_FRAMED; use streams::cbor::FramedBytesToSitemtyDynEventsStream; use url::Url; -pub struct Error { - msg: String, -} - -impl fmt::Display for Error { 
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{}", self.msg) - } -} - -impl From for Error -where - T: fmt::Debug, -{ - fn from(value: T) -> Self { - Self { - msg: format!("{value:?}"), - } - } +#[derive(Debug, ThisError)] +pub enum Error { + Url(#[from] url::ParseError), + NoHostname, + HttpBody(#[from] http::Error), + HttpClient(#[from] httpclient::Error), + Hyper(#[from] httpclient::hyper::Error), + #[error("RequestFailed({0})")] + RequestFailed(String), } pub async fn fetch_cbor(url: &str, scalar_type: ScalarType, shape: Shape) -> Result<(), Error> { @@ -42,17 +35,35 @@ pub async fn fetch_cbor(url: &str, scalar_type: ScalarType, shape: Shape) -> Res let req = Request::builder() .method(Method::GET) .uri(url.to_string()) - .header(header::HOST, url.host_str().ok_or_else(|| "NoHostname")?) - .header(header::ACCEPT, APP_CBOR_FRAMES) + .header(header::HOST, url.host_str().ok_or_else(|| Error::NoHostname)?) + .header(header::ACCEPT, APP_CBOR_FRAMED) .body(body_empty())?; debug!("open connection to {:?}", req.uri()); let mut send_req = connect_client(req.uri()).await?; let res = send_req.send_request(req).await?; let (head, body) = res.into_parts(); + if head.status != StatusCode::OK { + let buf = httpclient::read_body_bytes(body).await?; + let s = String::from_utf8_lossy(&buf); + let e = Error::RequestFailed(format!("request failed {:?} {}", head, s)); + return Err(e); + } debug!("fetch_cbor head {head:?}"); let stream = IncomingStream::new(body); let stream = FramedBytesToSitemtyDynEventsStream::new(stream, scalar_type, shape); - let stream = stream.map(|item| info!("{item:?}")); + let stream = stream + .map(|item| { + info!("{item:?}"); + item + }) + .take_while({ + let mut b = true; + move |item| { + let ret = b; + b = b && item.is_ok(); + future::ready(ret) + } + }); stream.for_each(|_| future::ready(())).await; Ok(()) } diff --git a/crates/dbconn/src/channelconfig.rs b/crates/dbconn/src/channelconfig.rs index 410533e..9859e0d 100644 
--- a/crates/dbconn/src/channelconfig.rs +++ b/crates/dbconn/src/channelconfig.rs @@ -58,6 +58,7 @@ pub async fn chconf_best_matching_for_name_and_range( let tsc: DateTime = r.get(0); let series: i64 = r.get(1); let scalar_type: i32 = r.get(2); + // TODO can I get a slice from psql driver? let shape_dims: Vec = r.get(3); let series = series as u64; let _scalar_type = ScalarType::from_scylla_i32(scalar_type)?; @@ -72,12 +73,15 @@ pub async fn chconf_best_matching_for_name_and_range( let ch_conf = chconf_for_series(backend, rows[res].1, ncc).await?; Ok(ch_conf) } else { - let row = res.first().unwrap(); - let name: String = row.get(0); - let series = row.get::<_, i64>(1) as u64; - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(2) as u8)?; + let r = res.first().unwrap(); + let _tsc: DateTime = r.get(0); + let series: i64 = r.get(1); + let scalar_type: i32 = r.get(2); // TODO can I get a slice from psql driver? - let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; + let shape_dims: Vec = r.get(3); + let series = series as u64; + let scalar_type = ScalarType::from_scylla_i32(scalar_type)?; + let shape = Shape::from_scylla_shape_dims(&shape_dims)?; let ret = ChConf::new(backend, series, scalar_type, shape, name); Ok(ret) } diff --git a/crates/dbconn/src/channelinfo.rs b/crates/dbconn/src/channelinfo.rs new file mode 100644 index 0000000..b952c48 --- /dev/null +++ b/crates/dbconn/src/channelinfo.rs @@ -0,0 +1,66 @@ +use err::thiserror; +use err::ThisError; +use netpod::ScalarType; +use netpod::Shape; +use tokio_postgres::Client; + +#[derive(Debug, ThisError)] +pub enum Error { + Pg(#[from] crate::pg::Error), + BadValue, +} + +pub struct ChannelInfo { + pub series: u64, + pub backend: String, + pub name: String, + pub scalar_type: ScalarType, + pub shape: Shape, + pub kind: u16, +} + +pub async fn info_for_series_ids(series_ids: &[u64], pg: &Client) -> Result, Error> { + let (ord, seriess) = series_ids + .iter() + .enumerate() + 
.fold((Vec::new(), Vec::new()), |mut a, x| { + a.0.push(x.0 as i32); + a.1.push(*x.1 as i64); + a + }); + let sql = concat!( + "with q1 as (", + " select * from unnest($1, $2) as inp (ord, series)", + ")", + "select q1.ord, q1.series, t.facility, t.channel, t.scalar_type, t.shape_dims, t.kind", + " from q1", + " join series_by_channel t on t.series = q1.series", + " order by q1.ord", + ); + use crate::pg::Type; + let st = pg.prepare_typed(sql, &[Type::INT4_ARRAY, Type::INT8_ARRAY]).await?; + let rows = pg.query(&st, &[&ord, &seriess]).await?; + let mut ret = Vec::new(); + for row in rows { + let series: i64 = row.get(1); + let backend: String = row.get(2); + let channel: String = row.get(3); + let scalar_type: i32 = row.get(4); + let shape_dims: Vec = row.get(5); + let kind: i16 = row.get(6); + let series = series as u64; + let scalar_type = ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::BadValue)?; + let shape = Shape::from_scylla_shape_dims(&shape_dims).map_err(|_| Error::BadValue)?; + let kind = kind as u16; + let e = ChannelInfo { + series, + backend, + name: channel, + scalar_type, + shape, + kind, + }; + ret.push(e); + } + Ok(ret) +} diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 35782d3..940f842 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -1,9 +1,11 @@ pub mod channelconfig; +pub mod channelinfo; pub mod query; pub mod scan; pub mod search; pub mod pg { + pub use tokio_postgres::types::Type; pub use tokio_postgres::Client; pub use tokio_postgres::Error; pub use tokio_postgres::NoTls; diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml index 91cd5d6..58f046a 100644 --- a/crates/httpclient/Cargo.toml +++ b/crates/httpclient/Cargo.toml @@ -15,7 +15,7 @@ http = "1.0.0" http-body = "1.0.0" http-body-util = "0.1.0" hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] } -hyper-tls = { version = "0.6.0" } +#hyper-tls = { version = "0.6.0" } 
hyper-util = { version = "0.1.1", features = ["full"] } bytes = "1.5.0" async-channel = "1.9.0" diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index aa569b8..091a364 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -194,6 +194,8 @@ pub enum Error { Http, } +impl std::error::Error for Error {} + impl From for Error { fn from(_: std::io::Error) -> Self { Self::IO @@ -324,13 +326,9 @@ pub async fn connect_client(uri: &http::Uri) -> Result, pub async fn read_body_bytes(mut body: hyper::body::Incoming) -> Result { let mut buf = BytesMut::new(); while let Some(x) = body.frame().await { - match x { - Ok(mut x) => { - if let Some(x) = x.data_mut() { - buf.put(x); - } - } - Err(e) => return Err(e.into()), + let mut frame = x?; + if let Some(x) = frame.data_mut() { + buf.put(x); } } Ok(buf.freeze()) diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs index f478a6a..c96ec7d 100644 --- a/crates/httpret/src/api4/accounting.rs +++ b/crates/httpret/src/api4/accounting.rs @@ -20,6 +20,9 @@ use netpod::req_uri_to_url; use netpod::FromUrl; use netpod::NodeConfigCached; use query::api4::AccountingIngestedBytesQuery; +use query::api4::AccountingToplistQuery; +use serde::Deserialize; +use serde::Serialize; pub struct AccountingIngestedBytes {} @@ -73,7 +76,7 @@ impl AccountingIngestedBytes { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::conn::create_scy_session(scyco).await?; - let mut stream = scyllaconn::accounting::AccountingStreamScylla::new(q.range().try_into()?, scy); + let mut stream = scyllaconn::accounting::totals::AccountingStreamScylla::new(q.range().try_into()?, scy); let mut ret = AccountingEvents::empty(); while let Some(item) = stream.next().await { let mut item = item?; @@ -82,3 +85,83 @@ impl AccountingIngestedBytes { Ok(ret) } } + +#[derive(Debug, Serialize, Deserialize)] +pub 
struct Toplist { + toplist: Vec<(String, u64, u64)>, +} + +pub struct AccountingToplistCounts {} + +impl AccountingToplistCounts { + pub fn handler(req: &Requ) -> Option { + if req.uri().path().starts_with("/api/4/accounting/toplist/counts") { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { + if req.method() == Method::GET { + if accepts_json_or_all(req.headers()) { + match self.handle_get(req, ctx, ncc).await { + Ok(x) => Ok(x), + Err(e) => { + error!("{e}"); + let e2 = e.to_public_error(); + let s = serde_json::to_string(&e2)?; + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(s))?) + } + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) + } + } + + async fn handle_get(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { + let url = req_uri_to_url(req.uri())?; + let qu = AccountingToplistQuery::from_url(&url)?; + let res = self.fetch_data(qu, ctx, ncc).await?; + let body = ToJsonBody::from(&res).into_body(); + Ok(response(StatusCode::OK).body(body)?) 
+ } + + async fn fetch_data( + &self, + qu: AccountingToplistQuery, + _ctx: &ReqCtx, + ncc: &NodeConfigCached, + ) -> Result { + let scyco = ncc + .node_config + .cluster + .scylla + .as_ref() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; + let pgconf = &ncc.node_config.cluster.database; + let pg = dbconn::create_connection(&pgconf).await?; + let mut top1 = scyllaconn::accounting::toplist::read_ts(qu.ts().0, scy).await?; + top1.sort_by_bytes(); + let mut ret = Toplist { toplist: Vec::new() }; + let series_ids: Vec<_> = top1.usage().iter().take(qu.limit() as _).map(|x| x.0).collect(); + let infos = dbconn::channelinfo::info_for_series_ids(&series_ids, &pg) + .await + .map_err(Error::from_to_string)?; + let mut it = top1.usage().iter(); + for info in infos { + let h = it.next().ok_or_else(|| Error::with_msg_no_trace("logic error"))?; + if info.series != h.0 { + let e = Error::with_msg_no_trace(format!("mismatch {} != {}", info.series, h.0)); + warn!("{e}"); + return Err(e); + } + ret.toplist.push((info.name, h.1, h.2)); + } + Ok(ret) + } +} diff --git a/crates/httpret/src/api4/docs.rs b/crates/httpret/src/api4/docs.rs index 13f14bf..61e9aea 100644 --- a/crates/httpret/src/api4/docs.rs +++ b/crates/httpret/src/api4/docs.rs @@ -72,6 +72,7 @@ fn extract_all_files() -> Contents { } } +// . 
fn blob() -> &'static [u8] { include_bytes!(concat!("../../../../apidoc/book.cbor")) } @@ -84,7 +85,7 @@ impl DocsHandler { } pub fn handler(req: &Requ) -> Option { - if req.uri().path().starts_with(Self::path_prefix()) { + if req.uri().path().starts_with(Self::path_prefix()) || req.uri().path().starts_with("/api/4/documentation") { Some(Self {}) } else { None @@ -93,6 +94,13 @@ impl DocsHandler { pub async fn handle(&self, req: Requ, _ctx: &ReqCtx) -> Result { let path = req.uri().path(); + if path.starts_with("/api/4/documentation") { + let ret = http::Response::builder() + .status(StatusCode::TEMPORARY_REDIRECT) + .header(http::header::LOCATION, "/api/4/docs/") + .body(body_empty())?; + return Ok(ret); + } if path == "/api/4/docs" { let ret = http::Response::builder() .status(StatusCode::TEMPORARY_REDIRECT) diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 28d1842..ae66812 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -1,7 +1,8 @@ use crate::bodystream::response_err_msg; use crate::channelconfig::chconf_from_events_quorum; use crate::err::Error; -use crate::requests::accepts_cbor_frames; +use crate::requests::accepts_cbor_framed; +use crate::requests::accepts_json_framed; use crate::requests::accepts_json_or_all; use crate::response; use crate::ToPublicResponse; @@ -9,6 +10,7 @@ use bytes::Bytes; use bytes::BytesMut; use futures_util::future; use futures_util::stream; +use futures_util::Stream; use futures_util::StreamExt; use http::Method; use http::StatusCode; @@ -59,8 +61,10 @@ impl EventsHandler { async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { let url = req_uri_to_url(req.uri())?; - if accepts_cbor_frames(req.headers()) { - Ok(plain_events_cbor(url, req, ctx, node_config).await?) + if accepts_cbor_framed(req.headers()) { + Ok(plain_events_cbor_framed(url, req, ctx, node_config).await?) 
+ } else if accepts_json_framed(req.headers()) { + Ok(plain_events_json_framed(url, req, ctx, node_config).await?) } else if accepts_json_or_all(req.headers()) { Ok(plain_events_json(url, req, ctx, node_config).await?) } else { @@ -69,32 +73,62 @@ async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) - } } -async fn plain_events_cbor(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { +async fn plain_events_cbor_framed( + url: Url, + req: Requ, + ctx: &ReqCtx, + ncc: &NodeConfigCached, +) -> Result { let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; let ch_conf = chconf_from_events_quorum(&evq, ctx, ncc) .await? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; - info!("plain_events_cbor chconf_from_events_quorum: {ch_conf:?} {req:?}"); + info!("plain_events_cbor_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); - let stream = streams::plaineventscbor::plain_events_cbor(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; + let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; use future::ready; let stream = stream .flat_map(|x| match x { Ok(y) => { use bytes::BufMut; let buf = y.into_inner(); - let mut b2 = BytesMut::with_capacity(8); + let adv = (buf.len() + 7) / 8 * 8; + let pad = adv - buf.len(); + let mut b2 = BytesMut::with_capacity(16); b2.put_u32_le(buf.len() as u32); - stream::iter([Ok::<_, Error>(b2.freeze()), Ok(buf)]) + b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); + let mut b3 = BytesMut::with_capacity(16); + b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); + stream::iter([Ok::<_, Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())]) + } + Err(e) => { + let e = Error::with_msg_no_trace(e.to_string()); + stream::iter([Err(e), Ok(Bytes::new()), Ok(Bytes::new())]) } - // TODO handle other cases - _ 
=> stream::iter([Ok(Bytes::new()), Ok(Bytes::new())]), }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }); let ret = response(StatusCode::OK).body(body_stream(stream))?; Ok(ret) } +async fn plain_events_json_framed( + url: Url, + req: Requ, + ctx: &ReqCtx, + ncc: &NodeConfigCached, +) -> Result { + let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; + let ch_conf = chconf_from_events_quorum(&evq, ctx, ncc) + .await? + .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; + info!("plain_events_json_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); + let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); + let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; + let stream = bytes_chunks_to_framed(stream); + let ret = response(StatusCode::OK).body(body_stream(stream))?; + Ok(ret) +} + async fn plain_events_json( url: Url, req: Requ, @@ -133,3 +167,32 @@ async fn plain_events_json( info!("{self_name} response created"); Ok(ret) } + +fn bytes_chunks_to_framed(stream: S) -> impl Stream> +where + S: Stream>, + T: Into, +{ + use future::ready; + stream + // TODO unify this map to padded bytes for both json and cbor output + .flat_map(|x| match x { + Ok(y) => { + use bytes::BufMut; + let buf = y.into(); + let adv = (buf.len() + 7) / 8 * 8; + let pad = adv - buf.len(); + let mut b2 = BytesMut::with_capacity(16); + b2.put_u32_le(buf.len() as u32); + b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); + let mut b3 = BytesMut::with_capacity(16); + b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); + stream::iter([Ok::<_, Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())]) + } + Err(e) => { + let e = Error::with_msg_no_trace(e.to_string()); + stream::iter([Err(e), Ok(Bytes::new()), Ok(Bytes::new())]) + } + }) + .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) +} diff --git 
a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 9af3654..dd4e8cc 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -105,6 +105,7 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?; + // tokio::net::TcpSocket::new_v4()?.listen(200)? let listener = TcpListener::bind(bind_addr).await?; loop { let (stream, addr) = if let Ok(x) = listener.accept().await { @@ -323,6 +324,8 @@ async fn http_service_inner( Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { Ok(h.handle(req, &node_config).await?) + } else if let Some(h) = api4::accounting::AccountingToplistCounts::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = api4::accounting::AccountingIngestedBytes::handler(&req) { Ok(h.handle(req, ctx, &node_config).await?) } else if path == "/api/4/prebinned" { diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 716930d..c9aec73 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -190,6 +190,8 @@ async fn proxy_http_service_inner( h.handle(req, ctx, &proxy_config, service_version).await } else if path == "/api/4/backends" { Ok(backends(req, proxy_config).await?) + } else if let Some(h) = api4::backend::BackendListHandler::handler(&req) { + h.handle(req, ctx, &proxy_config).await } else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) { h.handle(req, ctx, &proxy_config).await } else if let Some(h) = api4::events::EventsHandler::handler(&req) { @@ -499,19 +501,31 @@ pub async fn proxy_backend_query( where QT: fmt::Debug + FromUrl + AppendToUrl + HasBackend + HasTimeout, { - let (head, _body) = req.into_parts(); - // TODO will we need some mechanism to modify the necessary url? 
- let url = req_uri_to_url(&head.uri)?; - let query = match QT::from_url(&url) { + let url = req_uri_to_url(req.uri())?; + let mut query = match QT::from_url(&url) { Ok(k) => k, Err(_) => { - let msg = format!("malformed request or missing parameters {head:?}"); + let msg = format!("malformed request or missing parameters {:?}", req.uri()); warn!("{msg}"); return Ok(response_err_msg(StatusCode::BAD_REQUEST, msg)?); } }; - debug!("proxy_backend_query {query:?} {head:?}"); - let query_host = get_query_host_for_backend(&query.backend(), proxy_config)?; + debug!("proxy_backend_query {:?} {:?}", query, req.uri()); + let timeout = query.timeout(); + let timeout_next = timeout.saturating_sub(Duration::from_millis(1000)); + debug!("timeout {timeout:?} timeout_next {timeout_next:?}"); + query.set_timeout(timeout_next); + let query = query; + let backend = query.backend(); + let uri_path = proxy_backend_query_helper_uri_path(req.uri().path(), &url); + debug!("uri_path {uri_path}"); + let query_host = get_query_host_for_backend(backend, proxy_config)?; + let mut url = Url::parse(&format!("{}{}", query_host, uri_path))?; + query.append_to_url(&mut url); + proxy_backend_query_inner(req.headers(), url, timeout, ctx, proxy_config).await +} + +fn proxy_backend_query_helper_uri_path(path: &str, url: &Url) -> String { // TODO remove this special case // SPECIAL CASE: // Since the inner proxy is not yet handling map-pulse requests without backend, @@ -519,7 +533,7 @@ where // Instead, url needs to get parsed and formatted. // In general, the caller of this function should be able to provide a url, or maybe // better a closure so that the url can even depend on backend. 
- let uri_path: String = if url.as_str().contains("/map/pulse/") { + if url.as_str().contains("/map/pulse/") { match MapPulseQuery::from_url(&url) { Ok(qu) => { debug!("qu {qu:?}"); @@ -531,26 +545,30 @@ where } } } else { - head.uri.path().into() - }; - debug!("uri_path {uri_path}"); - let mut url = Url::parse(&format!("{}{}", query_host, uri_path))?; - query.append_to_url(&mut url); + path.into() + } +} - let mut req = Request::builder() +pub async fn proxy_backend_query_inner( + headers: &http::HeaderMap, + url: Url, + timeout: Duration, + ctx: &ReqCtx, + _proxy_config: &ProxyConfig, +) -> Result { + let host = url + .host_str() + .ok_or_else(|| Error::with_msg_no_trace("no host in url"))?; + let mut req2 = Request::builder() .method(http::Method::GET) .uri(url.to_string()) - .header( - header::HOST, - url.host_str() - .ok_or_else(|| Error::with_msg_no_trace("no host in url"))?, - ) + .header(header::HOST, host) .header(X_DAQBUF_REQID, ctx.reqid()); { - let hs = req + let hs = req2 .headers_mut() .ok_or_else(|| Error::with_msg_no_trace("can not set headers"))?; - for (hn, hv) in &head.headers { + for (hn, hv) in headers { if hn == header::HOST { } else if hn == X_DAQBUF_REQID { } else { @@ -558,23 +576,27 @@ where } } } - debug!("send request {req:?}"); - let req = req.body(body_empty())?; + + debug!("send request {req2:?}"); + let req2 = req2.body(body_empty())?; let fut = async move { - debug!("requesting {:?} {:?} {:?}", req.method(), req.uri(), req.headers()); - let mut send_req = httpclient::httpclient::connect_client(req.uri()).await?; - let res = send_req.send_request(req).await?; + debug!( + "requesting {:?} {:?} {:?}", + req2.method(), + req2.uri(), + req2.headers() + ); + let mut send_req = httpclient::httpclient::connect_client(req2.uri()).await?; + let res = send_req.send_request(req2).await?; Ok::<_, Error>(res) }; - let res = tokio::time::timeout(Duration::from_millis(5000), fut) - .await - .map_err(|_| { - let e = 
Error::with_msg_no_trace(format!("timeout trying to make sub request")); - warn!("{e}"); - e - })??; + let res = tokio::time::timeout(timeout, fut).await.map_err(|_| { + let e = Error::with_msg_no_trace(format!("timeout trying to make sub request")); + warn!("{e}"); + e + })??; { use bytes::Bytes; diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 5668d9a..8d5244c 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -1,3 +1,4 @@ +pub mod backend; pub mod caioclookup; pub mod events; diff --git a/crates/httpret/src/proxy/api4/backend.rs b/crates/httpret/src/proxy/api4/backend.rs new file mode 100644 index 0000000..7b7854b --- /dev/null +++ b/crates/httpret/src/proxy/api4/backend.rs @@ -0,0 +1,39 @@ +use crate::bodystream::response; +use crate::err::Error; +use crate::requests::accepts_json_or_all; +use http::Method; +use http::StatusCode; +use httpclient::body_empty; +use httpclient::body_string; +use httpclient::Requ; +use httpclient::StreamResponse; +use netpod::ProxyConfig; +use netpod::ReqCtx; + +pub struct BackendListHandler {} + +impl BackendListHandler { + pub fn handler(req: &Requ) -> Option { + if req.uri().path() == "/api/4/backend/list" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, _node_config: &ProxyConfig) -> Result { + if req.method() == Method::GET { + if accepts_json_or_all(req.headers()) { + let res = serde_json::json!({ + "backends_available": ["sf-databuffer"] + }); + let body = serde_json::to_string(&res)?; + Ok(response(StatusCode::OK).body(body_string(body))?) + } else { + Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) 
+ } + } +} diff --git a/crates/httpret/src/proxy/api4/events.rs b/crates/httpret/src/proxy/api4/events.rs index 963e9c7..f3a6b4e 100644 --- a/crates/httpret/src/proxy/api4/events.rs +++ b/crates/httpret/src/proxy/api4/events.rs @@ -1,8 +1,8 @@ use crate::bodystream::response; -use crate::bodystream::response_err_msg; use crate::err::Error; use crate::proxy::get_query_host_for_backend; -use crate::requests::accepts_cbor_frames; +use crate::requests::accepts_cbor_framed; +use crate::requests::accepts_json_framed; use crate::requests::accepts_json_or_all; use crate::ReqCtx; use http::header; @@ -22,7 +22,8 @@ use netpod::req_uri_to_url; use netpod::FromUrl; use netpod::HasBackend; use netpod::ProxyConfig; -use netpod::APP_CBOR; +use netpod::APP_CBOR_FRAMED; +use netpod::APP_JSON_FRAMED; use query::api4::events::PlainEventsQuery; pub struct EventsHandler {} @@ -44,14 +45,10 @@ impl EventsHandler { if req.method() != Method::GET { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } else { - if accepts_cbor_frames(req.headers()) { - // self.handle_cbor(req, ctx, proxy_config).await - // Ok(crate::proxy::proxy_backend_query::(req, ctx, proxy_config).await?) - warn!("TODO enabe cbor endpoint"); - Ok(response_err_msg( - StatusCode::INTERNAL_SERVER_ERROR, - format!("cbor endpoint currently disabled"), - )?) + if accepts_cbor_framed(req.headers()) { + self.handle_framed(req, APP_CBOR_FRAMED, ctx, proxy_config).await + } else if accepts_json_framed(req.headers()) { + self.handle_framed(req, APP_JSON_FRAMED, ctx, proxy_config).await } else if accepts_json_or_all(req.headers()) { Ok(crate::proxy::proxy_backend_query::(req, ctx, proxy_config).await?) 
} else { @@ -60,26 +57,33 @@ impl EventsHandler { } } - async fn handle_cbor(&self, req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result { + async fn handle_framed( + &self, + req: Requ, + accept: &str, + ctx: &ReqCtx, + proxy_config: &ProxyConfig, + ) -> Result { let (head, _body) = req.into_parts(); let url = req_uri_to_url(&head.uri)?; let pairs = get_url_query_pairs(&url); let evq = PlainEventsQuery::from_pairs(&pairs)?; - info!("{evq:?}"); - // Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) - - // get_random_query_host_for_backend + debug!("{evq:?}"); let query_host = get_query_host_for_backend(evq.backend(), proxy_config)?; - - // TODO this ignores the fragment - let url_str = format!("{}{}", query_host, head.uri.path_and_query().unwrap()); - info!("try to contact {url_str}"); - + let url_str = format!( + "{}{}", + query_host, + head.uri + .path_and_query() + .ok_or_else(|| Error::with_msg_no_trace("uri contains no path"))? + ); + debug!("try to contact {url_str}"); let uri: Uri = url_str.parse()?; + let host = uri.host().ok_or_else(|| Error::with_msg_no_trace("no host in url"))?; let req = Request::builder() .method(Method::GET) - .header(header::HOST, uri.host().unwrap()) - .header(header::ACCEPT, APP_CBOR) + .header(header::HOST, host) + .header(header::ACCEPT, accept) .header(ctx.header_name(), ctx.header_value()) .uri(&uri) .body(body_empty())?; @@ -87,10 +91,10 @@ impl EventsHandler { let res = client.send_request(req).await?; let (head, body) = res.into_parts(); if head.status != StatusCode::OK { - error!("backend returned error: {head:?}"); + warn!("backend returned error: {head:?}"); Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) } else { - info!("backend returned OK"); + debug!("backend returned OK"); Ok(response(StatusCode::OK).body(body_stream(StreamIncoming::new(body)))?) 
} } diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 74926a3..0db75c8 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -845,6 +845,11 @@ impl HasTimeout for MapPulseQuery { fn timeout(&self) -> Duration { MAP_PULSE_QUERY_TIMEOUT } + + fn set_timeout(&mut self, timeout: Duration) { + // TODO + // self.timeout = Some(timeout); + } } impl FromUrl for MapPulseQuery { diff --git a/crates/httpret/src/requests.rs b/crates/httpret/src/requests.rs index cc72127..ef231cb 100644 --- a/crates/httpret/src/requests.rs +++ b/crates/httpret/src/requests.rs @@ -1,30 +1,28 @@ use httpclient::http::header; use httpclient::http::header::HeaderMap; use netpod::ACCEPT_ALL; -use netpod::APP_CBOR_FRAMES; +use netpod::APP_CBOR_FRAMED; use netpod::APP_JSON; +use netpod::APP_JSON_FRAMED; use netpod::APP_OCTET; pub fn accepts_json_or_all(headers: &HeaderMap) -> bool { - let accept_def = APP_JSON; - let accept = headers - .get(header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) + let h = get_accept_or(APP_JSON, headers); + h.contains(APP_JSON) || h.contains(ACCEPT_ALL) } pub fn accepts_octets(headers: &HeaderMap) -> bool { - let accept_def = ""; - let accept = headers - .get(header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - accept.contains(APP_OCTET) + get_accept_or("", headers).contains(APP_OCTET) } -pub fn accepts_cbor_frames(headers: &HeaderMap) -> bool { - let accept_def = ""; - let accept = headers - .get(header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - accept.contains(APP_CBOR_FRAMES) +pub fn accepts_cbor_framed(headers: &HeaderMap) -> bool { + get_accept_or("", headers).contains(APP_CBOR_FRAMED) +} + +pub fn accepts_json_framed(headers: &HeaderMap) -> bool { + get_accept_or("", headers).contains(APP_JSON_FRAMED) +} + +fn get_accept_or<'a>(def: &'a str, headers: &'a HeaderMap) -> 
&'a str { + headers.get(header::ACCEPT).map_or(def, |k| k.to_str().unwrap_or(def)) } diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index 313929c..b0a1c7b 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -149,6 +149,7 @@ pub trait Events: fn pulses(&self) -> &VecDeque; fn frame_type_id(&self) -> u32; fn to_min_max_avg(&mut self) -> Box; + fn to_json_vec_u8(&self) -> Vec; fn to_cbor_vec_u8(&self) -> Vec; } @@ -269,6 +270,10 @@ impl Events for Box { Events::to_min_max_avg(self.as_mut()) } + fn to_json_vec_u8(&self) -> Vec { + Events::to_json_vec_u8(self.as_ref()) + } + fn to_cbor_vec_u8(&self) -> Vec { Events::to_cbor_vec_u8(self.as_ref()) } diff --git a/crates/items_0/src/scalar_ops.rs b/crates/items_0/src/scalar_ops.rs index bbe3c00..433d56d 100644 --- a/crates/items_0/src/scalar_ops.rs +++ b/crates/items_0/src/scalar_ops.rs @@ -169,8 +169,9 @@ fn equal_string(a: &String, b: &String) -> bool { a == b } -fn add_int(a: &mut T, b: &T) { - ops::AddAssign::add_assign(a, todo!()); +fn _add_int(a: &mut T, b: &T) { + let _ = b; + ops::AddAssign::add_assign(a, err::todoval()); } macro_rules! add_int { @@ -193,22 +194,36 @@ macro_rules! add_string { macro_rules! div_int { ($a:expr, $b:expr) => { + // TODO what is this used for? + // TODO for average calculation, the accumulator must be large enough! // Use u64 for all ints, and f32 for all floats. // Therefore, the name "add" is too general. //*$a /= $b; + { + let _ = $a; + let _ = $b; + } }; } macro_rules! div_bool { ($a:expr, $b:expr) => { - // + // TODO what is this used for? + { + let _ = $a; + let _ = $b; + } }; } macro_rules! div_string { ($a:expr, $b:expr) => { - // + // TODO what is this used for? 
+ { + let _ = $a; + let _ = $b; + } }; } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index 4058720..ea3f249 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -915,6 +915,16 @@ impl Events for ChannelEvents { } } + fn to_json_vec_u8(&self) -> Vec { + match self { + ChannelEvents::Events(item) => item.to_json_vec_u8(), + ChannelEvents::Status(item) => { + error!("TODO convert status to json"); + Vec::new() + } + } + } + fn to_cbor_vec_u8(&self) -> Vec { match self { ChannelEvents::Events(item) => item.to_cbor_vec_u8(), diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index fe1dbd0..5fce3de 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -1,6 +1,5 @@ use crate::framable::FrameType; use crate::merger::Mergeable; -use bitshuffle::bitshuffle_decompress; use bytes::BytesMut; use err::thiserror; use err::ThisError; @@ -281,11 +280,12 @@ fn decompress(databuf: &[u8], type_size: u32) -> Result, DecompError> { ele_count_2, ele_count_exp ); - let mut decomp = Vec::with_capacity(type_size as usize * ele_count as usize); + let mut decomp: Vec = Vec::with_capacity(type_size as usize * ele_count as usize); unsafe { decomp.set_len(decomp.capacity()); } - match bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as _, type_size as _, 0) { + // #[cfg(DISABLED)] + match bitshuffle::bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as _, type_size as _, 0) { Ok(c1) => { if 12 + c1 != databuf.len() { Err(DecompError::UnusedBytes) @@ -299,6 +299,7 @@ fn decompress(databuf: &[u8], type_size: u32) -> Result, DecompError> { } Err(_) => Err(DecompError::BitshuffleError), } + // todo!("bitshuffle not available") } impl EventFull { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 24a2f6a..e3d864d 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ 
-965,6 +965,17 @@ impl Events for EventsDim0 { Box::new(dst) } + fn to_json_vec_u8(&self) -> Vec { + let ret = EventsDim0ChunkOutput { + // TODO use &mut to swap the content + tss: self.tss.clone(), + pulses: self.pulses.clone(), + values: self.values.clone(), + scalar_type: STY::scalar_type_name().into(), + }; + serde_json::to_vec(&ret).unwrap() + } + fn to_cbor_vec_u8(&self) -> Vec { let ret = EventsDim0ChunkOutput { // TODO use &mut to swap the content @@ -995,7 +1006,7 @@ impl EventsDim0TimeBinner { } pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { - trace!("{}::new binrange {binrange:?}", Self::type_name()); + trace!("{}::new binrange {:?}", Self::type_name(), binrange); let rng = binrange .range_at(0) .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index ea4b853..8671b31 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -48,22 +48,22 @@ macro_rules! 
trace2 { } #[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim1 { +pub struct EventsDim1 { pub tss: VecDeque, pub pulses: VecDeque, - pub values: VecDeque>, + pub values: VecDeque>, } -impl EventsDim1 { +impl EventsDim1 { #[inline(always)] - pub fn push(&mut self, ts: u64, pulse: u64, value: Vec) { + pub fn push(&mut self, ts: u64, pulse: u64, value: Vec) { self.tss.push_back(ts); self.pulses.push_back(pulse); self.values.push_back(value); } #[inline(always)] - pub fn push_front(&mut self, ts: u64, pulse: u64, value: Vec) { + pub fn push_front(&mut self, ts: u64, pulse: u64, value: Vec) { self.tss.push_front(ts); self.pulses.push_front(pulse); self.values.push_front(value); @@ -78,25 +78,25 @@ impl EventsDim1 { } } -impl AsAnyRef for EventsDim1 +impl AsAnyRef for EventsDim1 where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_ref(&self) -> &dyn Any { self } } -impl AsAnyMut for EventsDim1 +impl AsAnyMut for EventsDim1 where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } -impl Empty for EventsDim1 { +impl Empty for EventsDim1 { fn empty() -> Self { Self { tss: VecDeque::new(), @@ -106,9 +106,9 @@ impl Empty for EventsDim1 { } } -impl fmt::Debug for EventsDim1 +impl fmt::Debug for EventsDim1 where - NTY: fmt::Debug, + STY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { if false { @@ -133,7 +133,7 @@ where } } -impl WithLen for EventsDim1 { +impl WithLen for EventsDim1 { fn len(&self) -> usize { self.tss.len() } @@ -167,13 +167,13 @@ impl HasTimestampDeque for EventsDim1 { items_0::impl_range_overlap_info_events!(EventsDim1); -impl TimeBinnableType for EventsDim1 +impl TimeBinnableType for EventsDim1 where - NTY: ScalarOps, + STY: ScalarOps, { // TODO - type Output = BinsDim0; - type Aggregator = EventsDim1Aggregator; + type Output = BinsDim0; + type Aggregator = EventsDim1Aggregator; fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator 
{ let self_name = std::any::type_name::(); @@ -313,38 +313,38 @@ impl EventsDim1CollectorOutput { } } -impl AsAnyRef for EventsDim1CollectorOutput +impl AsAnyRef for EventsDim1CollectorOutput where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_ref(&self) -> &dyn Any { self } } -impl AsAnyMut for EventsDim1CollectorOutput +impl AsAnyMut for EventsDim1CollectorOutput where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } -impl WithLen for EventsDim1CollectorOutput { +impl WithLen for EventsDim1CollectorOutput { fn len(&self) -> usize { self.values.len() } } -impl ToJsonResult for EventsDim1CollectorOutput { +impl ToJsonResult for EventsDim1CollectorOutput { fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } } -impl Collected for EventsDim1CollectorOutput {} +impl Collected for EventsDim1CollectorOutput {} impl CollectorType for EventsDim1Collector { type Input = EventsDim1; @@ -442,23 +442,23 @@ impl CollectableType for EventsDim1 { } #[derive(Debug)] -pub struct EventsDim1Aggregator { +pub struct EventsDim1Aggregator { range: SeriesRange, count: u64, - min: NTY, - max: NTY, + min: STY, + max: STY, sumc: u64, sum: f32, int_ts: u64, last_seen_ts: u64, - last_seen_val: Option, + last_seen_val: Option, did_min_max: bool, do_time_weight: bool, events_taken_count: u64, events_ignored_count: u64, } -impl Drop for EventsDim1Aggregator { +impl Drop for EventsDim1Aggregator { fn drop(&mut self) { // TODO collect as stats for the request context: trace!( @@ -469,7 +469,7 @@ impl Drop for EventsDim1Aggregator { } } -impl EventsDim1Aggregator { +impl EventsDim1Aggregator { pub fn new(range: SeriesRange, do_time_weight: bool) -> Self { /*let int_ts = range.beg; Self { @@ -491,7 +491,7 @@ impl EventsDim1Aggregator { } // TODO reduce clone.. optimize via more traits to factor the trade-offs? 
- fn apply_min_max(&mut self, val: NTY) { + fn apply_min_max(&mut self, val: STY) { trace!( "apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}", val, @@ -515,7 +515,7 @@ impl EventsDim1Aggregator { } } - fn apply_event_unweight(&mut self, val: NTY) { + fn apply_event_unweight(&mut self, val: STY) { trace!("TODO check again result_reset_unweight"); err::todo(); let vf = val.as_prim_f32_b(); @@ -619,7 +619,7 @@ impl EventsDim1Aggregator { todo!() } - fn result_reset_unweight(&mut self, range: SeriesRange, _expand: bool) -> BinsDim0 { + fn result_reset_unweight(&mut self, range: SeriesRange, _expand: bool) -> BinsDim0 { /*trace!("TODO check again result_reset_unweight"); err::todo(); let (min, max, avg) = if self.sumc > 0 { @@ -650,7 +650,7 @@ impl EventsDim1Aggregator { todo!() } - fn result_reset_time_weight(&mut self, range: SeriesRange, expand: bool) -> BinsDim0 { + fn result_reset_time_weight(&mut self, range: SeriesRange, expand: bool) -> BinsDim0 { // TODO check callsite for correct expand status. 
/*if expand { debug!("result_reset_time_weight calls apply_event_time_weight"); @@ -689,9 +689,9 @@ impl EventsDim1Aggregator { } } -impl TimeBinnableTypeAggregator for EventsDim1Aggregator { - type Input = EventsDim1; - type Output = BinsDim0; +impl TimeBinnableTypeAggregator for EventsDim1Aggregator { + type Input = EventsDim1; + type Output = BinsDim0; fn range(&self) -> &SeriesRange { &self.range @@ -724,9 +724,9 @@ impl TimeBinnableTypeAggregator for EventsDim1Aggregator { } } -impl TimeBinnable for EventsDim1 { +impl TimeBinnable for EventsDim1 { fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - let ret = EventsDim1TimeBinner::::new(binrange, do_time_weight).unwrap(); + let ret = EventsDim1TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } @@ -938,6 +938,17 @@ impl Events for EventsDim1 { Box::new(item) } + fn to_json_vec_u8(&self) -> Vec { + let ret = EventsDim1ChunkOutput { + // TODO use &mut to swap the content + tss: self.tss.clone(), + pulses: self.pulses.clone(), + values: self.values.clone(), + scalar_type: STY::scalar_type_name().into(), + }; + serde_json::to_vec(&ret).unwrap() + } + fn to_cbor_vec_u8(&self) -> Vec { let ret = EventsDim1ChunkOutput { // TODO use &mut to swap the content @@ -953,14 +964,18 @@ impl Events for EventsDim1 { } #[derive(Debug)] -pub struct EventsDim1TimeBinner { +pub struct EventsDim1TimeBinner { edges: VecDeque, - agg: EventsDim1Aggregator, - ready: Option< as TimeBinnableTypeAggregator>::Output>, + agg: EventsDim1Aggregator, + ready: Option< as TimeBinnableTypeAggregator>::Output>, range_complete: bool, } -impl EventsDim1TimeBinner { +impl EventsDim1TimeBinner { + fn type_name() -> &'static str { + any::type_name::() + } + fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { /*if edges.len() < 2 { return Err(Error::with_msg_no_trace(format!("need at least 2 edges"))); @@ -981,6 +996,22 @@ impl EventsDim1TimeBinner { range_complete: false, }; Ok(ret)*/ + 
+ // trace!("{}::new binrange {:?}", Self::type_name(), binrange); + // let rng = binrange + // .range_at(0) + // .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; + // let agg = EventsDim0Aggregator::new(rng, do_time_weight); + // let ret = Self { + // binrange, + // rix: 0, + // rng: Some(agg.range().clone()), + // agg, + // ready: None, + // range_final: false, + // }; + // Ok(ret) + todo!() } @@ -1003,7 +1034,7 @@ impl EventsDim1TimeBinner { } } -impl TimeBinner for EventsDim1TimeBinner { +impl TimeBinner for EventsDim1TimeBinner { fn bins_ready_count(&self) -> usize { match &self.ready { Some(k) => k.len(), @@ -1152,7 +1183,7 @@ impl TimeBinner for EventsDim1TimeBinner { } fn empty(&self) -> Box { - let ret = as TimeBinnableTypeAggregator>::Output::empty(); + let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index 2c63b24..4b16541 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -370,6 +370,10 @@ impl Events for EventsXbinDim0 { Box::new(dst) } + fn to_json_vec_u8(&self) -> Vec { + todo!() + } + fn to_cbor_vec_u8(&self) -> Vec { todo!() } diff --git a/crates/items_2/src/test.rs b/crates/items_2/src/test.rs index 46ee54d..85ae0d0 100644 --- a/crates/items_2/src/test.rs +++ b/crates/items_2/src/test.rs @@ -1,5 +1,7 @@ #[cfg(test)] pub mod eventsdim0; +#[cfg(test)] +pub mod eventsdim1; use crate::binnedcollected::BinnedCollected; use crate::binsdim0::BinsDim0CollectedResult; diff --git a/crates/items_2/src/test/eventsdim1.rs b/crates/items_2/src/test/eventsdim1.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index edc9ec1..ff7d34c 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -44,22 +44,23 @@ use std::task::Poll; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; +use 
std::time::UNIX_EPOCH; use timeunits::*; use url::Url; pub const APP_JSON: &str = "application/json"; pub const APP_JSON_LINES: &str = "application/jsonlines"; pub const APP_OCTET: &str = "application/octet-stream"; -pub const APP_CBOR: &str = "application/cbor"; -pub const APP_CBOR_FRAMES: &str = "application/cbor-frames"; +pub const APP_CBOR_FRAMED: &str = "application/cbor-framed"; +pub const APP_JSON_FRAMED: &str = "application/json-framed"; pub const ACCEPT_ALL: &str = "*/*"; pub const X_DAQBUF_REQID: &str = "x-daqbuffer-request-id"; -pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY; -pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10; -pub const TS_MSP_GRID_SPACING: u64 = 6 * 2; +pub const CONNECTION_STATUS_DIV: DtMs = DtMs::from_ms_u64(1000 * 60 * 60); +// pub const TS_MSP_GRID_UNIT: DtMs = DtMs::from_ms_u64(1000 * 10); +// pub const TS_MSP_GRID_SPACING: u64 = 6 * 2; -pub const EMIT_ACCOUNTING_SNAP: u64 = 60 * 10; +pub const EMIT_ACCOUNTING_SNAP: DtMs = DtMs::from_ms_u64(1000 * 60 * 10); pub const DATETIME_FMT_0MS: &str = "%Y-%m-%dT%H:%M:%SZ"; pub const DATETIME_FMT_3MS: &str = "%Y-%m-%dT%H:%M:%S.%3fZ"; @@ -68,6 +69,31 @@ pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ"; const TEST_BACKEND: &str = "testbackend-00"; +pub struct OnDrop +where + F: FnOnce() -> (), +{ + f: Option, +} + +impl OnDrop +where + F: FnOnce() -> (), +{ + pub fn new(f: F) -> Self { + Self { f: Some(f) } + } +} + +impl Drop for OnDrop +where + F: FnOnce() -> (), +{ + fn drop(&mut self) { + self.f.take().map(|x| x()); + } +} + pub fn is_false(x: T) -> bool where T: std::borrow::Borrow, @@ -1198,7 +1224,7 @@ impl Shape { ))) } else if k == 1 { Ok(Shape::Scalar) - } else if k <= 1024 * 32 { + } else if k <= 1024 * 3000 { Ok(Shape::Wave(k)) } else { Err(Error::with_public_msg_no_trace(format!( @@ -1338,7 +1364,7 @@ where pub ix: [T; 2], } -#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] +#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] pub struct DtNano(u64); 
impl DtNano { @@ -1357,8 +1383,13 @@ impl DtNano { pub fn ms(&self) -> u64 { self.0 / MS } + + pub fn to_i64(&self) -> i64 { + self.0 as i64 + } } +#[cfg(DISABLED)] impl fmt::Debug for DtNano { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let sec = self.0 / SEC; @@ -1412,7 +1443,24 @@ mod dt_nano_serde { } } -#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] +#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] +pub struct DtMs(u64); + +impl DtMs { + pub const fn from_ms_u64(x: u64) -> Self { + Self(x) + } + + pub const fn ms(&self) -> u64 { + self.0 / MS + } + + pub const fn to_i64(&self) -> i64 { + self.0 as i64 + } +} + +#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] pub struct TsNano(pub u64); mod ts_nano_ser { @@ -1469,37 +1517,45 @@ mod ts_nano_ser { } impl TsNano { - pub fn from_ns(ns: u64) -> Self { + pub const fn from_ns(ns: u64) -> Self { Self(ns) } - pub fn from_ms(ns: u64) -> Self { + pub const fn from_ms(ns: u64) -> Self { Self(MS * ns) } - pub fn ns(&self) -> u64 { + pub const fn ns(&self) -> u64 { self.0 } - pub fn ms(&self) -> u64 { + pub const fn ms(&self) -> u64 { self.0 / MS } - pub fn sub(self, v: Self) -> Self { + pub const fn sub(self, v: DtNano) -> Self { Self(self.0 - v.0) } - pub fn add_ns(self, v: u64) -> Self { + pub const fn delta(self, v: Self) -> DtNano { + DtNano(self.0 - v.0) + } + + pub const fn add_ns(self, v: u64) -> Self { Self(self.0 + v) } - pub fn mul(self, v: u64) -> Self { + pub const fn mul(self, v: u64) -> Self { Self(self.0 * v) } - pub fn div(self, v: u64) -> Self { + pub const fn div(self, v: u64) -> Self { Self(self.0 / v) } + + pub const fn to_ts_ms(self) -> TsMs { + TsMs::from_ms_u64(self.ms()) + } } impl fmt::Debug for TsNano { @@ -2294,13 +2350,41 @@ impl ToNanos for DateTime { } } -#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] +#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] pub struct TsMs(pub u64); impl TsMs { - pub fn to_u64(self) -> u64 { + pub const fn from_ms_u64(x: 
u64) -> Self { + Self(x) + } + + pub const fn from_ns_u64(x: u64) -> Self { + Self(x / 1000000) + } + + pub fn from_system_time(st: SystemTime) -> Self { + let tsunix = st.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); + let x = tsunix.as_secs() * 1000 + tsunix.subsec_millis() as u64; + Self::from_ms_u64(x) + } + + pub const fn ms(self) -> u64 { self.0 } + + pub const fn to_u64(self) -> u64 { + self.0 + } + + pub const fn to_i64(self) -> i64 { + self.0 as i64 + } + + pub const fn to_grid_02(self, grid: DtMs) -> (Self, DtMs) { + let msp = TsMs(self.0 / grid.0 * grid.0); + let lsp = DtMs(self.0 - msp.0); + (msp, lsp) + } } impl std::ops::Sub for TsMs { @@ -2713,6 +2797,7 @@ pub trait HasBackend { pub trait HasTimeout { fn timeout(&self) -> Duration; + fn set_timeout(&mut self, timeout: Duration); } pub trait FromUrl: Sized { @@ -2748,7 +2833,12 @@ impl HasBackend for ChannelConfigQuery { impl HasTimeout for ChannelConfigQuery { fn timeout(&self) -> Duration { - Duration::from_millis(2000) + Duration::from_millis(10000) + } + + fn set_timeout(&mut self, timeout: Duration) { + // TODO + // self.timeout = Some(timeout); } } diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index 03724a9..09126f5 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -284,7 +284,12 @@ impl HasBackend for ChannelStateEventsQuery { impl HasTimeout for ChannelStateEventsQuery { fn timeout(&self) -> Duration { - Duration::from_millis(6000) + Duration::from_millis(10000) + } + + fn set_timeout(&mut self, timeout: Duration) { + // TODO + // self.timeout = Some(timeout); } } diff --git a/crates/netpod/src/ttl.rs b/crates/netpod/src/ttl.rs index 49bb38f..d1c5123 100644 --- a/crates/netpod/src/ttl.rs +++ b/crates/netpod/src/ttl.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + #[derive(Debug, Clone)] pub enum RetentionTime { Short, @@ -14,4 +16,33 @@ impl RetentionTime { Long => "lt_", } } + + pub fn ttl_events_d0(&self) -> Duration { + match self { + 
RetentionTime::Short => Duration::from_secs(60 * 60 * 12), + RetentionTime::Medium => Duration::from_secs(60 * 60 * 24 * 100), + RetentionTime::Long => Duration::from_secs(60 * 60 * 24 * 31 * 12 * 11), + } + } + + pub fn ttl_events_d1(&self) -> Duration { + match self { + RetentionTime::Short => Duration::from_secs(60 * 60 * 12), + RetentionTime::Medium => Duration::from_secs(60 * 60 * 24 * 100), + RetentionTime::Long => Duration::from_secs(60 * 60 * 24 * 31 * 12 * 11), + } + } + + pub fn ttl_ts_msp(&self) -> Duration { + let dt = self.ttl_events_d0(); + dt + dt / 30 + } + + pub fn ttl_binned(&self) -> Duration { + self.ttl_events_d0() * 2 + } + + pub fn ttl_channel_status(&self) -> Duration { + self.ttl_binned() + } } diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index ee22543..3395f28 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -1,6 +1,9 @@ pub mod binned; pub mod events; +use chrono::DateTime; +use chrono::TimeZone; +use chrono::Utc; use err::Error; use netpod::get_url_query_pairs; use netpod::range::evrange::SeriesRange; @@ -8,6 +11,9 @@ use netpod::AppendToUrl; use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; +use netpod::ToNanos; +use netpod::TsNano; +use netpod::DATETIME_FMT_6MS; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -34,7 +40,12 @@ impl HasBackend for AccountingIngestedBytesQuery { impl HasTimeout for AccountingIngestedBytesQuery { fn timeout(&self) -> Duration { - Duration::from_millis(5000) + Duration::from_millis(10000) + } + + fn set_timeout(&mut self, timeout: Duration) { + // TODO + // self.timeout = Some(timeout); } } @@ -65,3 +76,77 @@ impl AppendToUrl for AccountingIngestedBytesQuery { self.range.append_to_url(url); } } + +// + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AccountingToplistQuery { + backend: String, + ts: TsNano, + limit: u32, +} + +impl AccountingToplistQuery { + pub fn ts(&self) -> TsNano { + 
self.ts.clone() + } + + pub fn limit(&self) -> u32 { + self.limit + } +} + +impl HasBackend for AccountingToplistQuery { + fn backend(&self) -> &str { + &self.backend + } +} + +impl HasTimeout for AccountingToplistQuery { + fn timeout(&self) -> Duration { + Duration::from_millis(10000) + } + + fn set_timeout(&mut self, timeout: Duration) { + // TODO + // self.timeout = Some(timeout); + } +} + +impl FromUrl for AccountingToplistQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let fn1 = |pairs: &BTreeMap| { + let v = pairs.get("tsDate").ok_or(Error::with_public_msg("missing tsDate"))?; + let w = v.parse::>()?; + Ok::<_, Error>(TsNano(w.to_nanos())) + }; + let ret = Self { + backend: pairs + .get("backend") + .ok_or_else(|| Error::with_msg_no_trace("missing backend"))? + .to_string(), + ts: fn1(pairs)?, + limit: pairs.get("limit").map_or(None, |x| x.parse().ok()).unwrap_or(20), + }; + Ok(ret) + } +} + +impl AppendToUrl for AccountingToplistQuery { + fn append_to_url(&self, url: &mut Url) { + let mut g = url.query_pairs_mut(); + g.append_pair("backend", &self.backend); + g.append_pair( + "ts", + &Utc.timestamp_nanos(self.ts.ns() as i64) + .format(DATETIME_FMT_6MS) + .to_string(), + ); + g.append_pair("limit", &self.limit.to_string()); + } +} diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index 2e87da7..939f6db 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -160,6 +160,10 @@ impl HasTimeout for BinnedQuery { fn timeout(&self) -> Duration { self.timeout_value() } + + fn set_timeout(&mut self, timeout: Duration) { + self.timeout = Some(timeout); + } } impl FromUrl for BinnedQuery { diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index c28ce38..d3983c4 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -208,6 +208,10 @@ 
impl HasTimeout for PlainEventsQuery { fn timeout(&self) -> Duration { self.timeout() } + + fn set_timeout(&mut self, timeout: Duration) { + self.timeout = Some(timeout); + } } impl FromUrl for PlainEventsQuery { diff --git a/crates/scyllaconn/src/accounting.rs b/crates/scyllaconn/src/accounting.rs index d95ca53..3b178ba 100644 --- a/crates/scyllaconn/src/accounting.rs +++ b/crates/scyllaconn/src/accounting.rs @@ -1,233 +1,2 @@ -use crate::errconv::ErrConv; -use err::Error; -use futures_util::Future; -use futures_util::FutureExt; -use futures_util::Stream; -use futures_util::StreamExt; -use items_0::Empty; -use items_0::Extendable; -use items_0::WithLen; -use items_2::accounting::AccountingEvents; -use netpod::log::*; -use netpod::range::evrange::NanoRange; -use netpod::timeunits; -use netpod::EMIT_ACCOUNTING_SNAP; -use scylla::prepared_statement::PreparedStatement; -use scylla::Session as ScySession; -use std::collections::VecDeque; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Context; -use std::task::Poll; - -async fn read_next( - ts_msp: u64, - fwd: bool, - qu: PreparedStatement, - scy: Arc, -) -> Result { - type RowType = (i64, i64, i64); - let mut ret = AccountingEvents::empty(); - let mut tot_count = 0; - let mut tot_bytes = 0; - for part in 0..255_u32 { - let mut res = if fwd { - scy.execute_iter(qu.clone(), (part as i32, ts_msp as i64)) - .await - .err_conv()? 
- .into_typed::() - } else { - return Err(Error::with_msg_no_trace("no backward support")); - }; - while let Some(row) = res.next().await { - let row = row.map_err(Error::from_string)?; - let _ts = ts_msp; - let _series = row.0 as u64; - let count = row.1 as u64; - let bytes = row.2 as u64; - tot_count += count; - tot_bytes += bytes; - } - } - ret.tss.push_back(ts_msp); - ret.count.push_back(tot_count); - ret.bytes.push_back(tot_bytes); - Ok(ret) -} - -struct ReadValues { - #[allow(unused)] - range: NanoRange, - ts_msps: VecDeque, - fwd: bool, - #[allow(unused)] - do_one_before_range: bool, - fut: Pin> + Send>>, - scy: Arc, - qu: PreparedStatement, -} - -impl ReadValues { - fn new( - range: NanoRange, - ts_msps: VecDeque, - fwd: bool, - do_one_before_range: bool, - scy: Arc, - qu: PreparedStatement, - ) -> Self { - let mut ret = Self { - range, - ts_msps, - fwd, - do_one_before_range, - fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( - "future not initialized", - )))), - scy, - qu, - }; - ret.next(); - ret - } - - fn next(&mut self) -> bool { - if let Some(ts_msp) = self.ts_msps.pop_front() { - self.fut = self.make_fut(ts_msp); - true - } else { - false - } - } - - fn make_fut(&mut self, ts_msp: u64) -> Pin> + Send>> { - let fut = read_next(ts_msp, self.fwd, self.qu.clone(), self.scy.clone()); - Box::pin(fut) - } -} - -enum FrState { - New, - Prepare(PrepFut), - Start, - ReadValues(ReadValues), - Done, -} - -type PrepFut = Pin> + Send>>; - -pub struct AccountingStreamScylla { - state: FrState, - range: NanoRange, - scy: Arc, - qu_select: Option, - outbuf: AccountingEvents, - poll_count: u32, -} - -impl AccountingStreamScylla { - pub fn new(range: NanoRange, scy: Arc) -> Self { - Self { - state: FrState::New, - range, - scy, - qu_select: None, - outbuf: AccountingEvents::empty(), - poll_count: 0, - } - } -} - -async fn prep(cql: &str, scy: Arc) -> Result { - scy.prepare(cql) - .await - .map_err(|e| Error::with_msg_no_trace(format!("cql error 
{e}"))) -} - -impl Stream for AccountingStreamScylla { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - // debug!("poll {}", self.poll_count); - self.poll_count += 1; - if self.poll_count > 200000 { - debug!("abort high poll count"); - return Ready(None); - } - let span = tracing::span!(tracing::Level::TRACE, "poll_next"); - let _spg = span.enter(); - loop { - if self.outbuf.len() > 0 { - let item = std::mem::replace(&mut self.outbuf, AccountingEvents::empty()); - break Ready(Some(Ok(item))); - } - break match &mut self.state { - FrState::New => { - let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); - let fut = prep(cql, self.scy.clone()); - let fut: PrepFut = Box::pin(fut); - self.state = FrState::Prepare(fut); - continue; - } - FrState::Prepare(fut) => match fut.poll_unpin(cx) { - Ready(Ok(x)) => { - self.qu_select = Some(x); - self.state = FrState::Start; - continue; - } - Ready(Err(e)) => { - error!("{e}"); - Ready(Some(Err(Error::with_msg_no_trace("cql error")))) - } - Pending => Pending, - }, - FrState::Start => { - let mut ts_msps = VecDeque::new(); - let mut ts = self.range.beg / timeunits::SEC / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; - let ts_e = self.range.end / timeunits::SEC / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; - while ts < ts_e { - if ts_msps.len() >= 100 { - debug!("too large time range requested"); - break; - } - ts_msps.push_back(ts); - ts += EMIT_ACCOUNTING_SNAP; - } - if ts_msps.len() == 0 { - self.state = FrState::Done; - continue; - } else { - let fwd = true; - let do_one_before_range = false; - let st = ReadValues::new( - self.range.clone(), - ts_msps, - fwd, - do_one_before_range, - self.scy.clone(), - self.qu_select.as_ref().unwrap().clone(), - ); - self.state = FrState::ReadValues(st); - continue; - } - } - FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(Ok(mut item)) => { - if !st.next() { - 
self.state = FrState::Done; - } - self.outbuf.extend_from(&mut item); - continue; - } - Ready(Err(e)) => { - error!("{e}"); - Ready(Some(Err(e))) - } - Pending => Pending, - }, - FrState::Done => Ready(None), - }; - } - } -} +pub mod toplist; +pub mod totals; diff --git a/crates/scyllaconn/src/accounting/toplist.rs b/crates/scyllaconn/src/accounting/toplist.rs new file mode 100644 index 0000000..6fd6f29 --- /dev/null +++ b/crates/scyllaconn/src/accounting/toplist.rs @@ -0,0 +1,75 @@ +use crate::errconv::ErrConv; +use err::Error; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::timeunits; +use netpod::EMIT_ACCOUNTING_SNAP; +use scylla::prepared_statement::PreparedStatement; +use scylla::Session as ScySession; +use std::sync::Arc; + +#[derive(Debug)] +pub struct UsageData { + ts: u64, + // (series, count, bytes) + usage: Vec<(u64, u64, u64)>, +} + +impl UsageData { + pub fn new(ts: u64) -> Self { + Self { ts, usage: Vec::new() } + } + + pub fn ts(&self) -> u64 { + self.ts + } + + pub fn usage(&self) -> &[(u64, u64, u64)] { + &self.usage + } + + pub fn sort_by_counts(&mut self) { + self.usage.sort_unstable_by(|a, b| b.1.cmp(&a.1)) + } + + pub fn sort_by_bytes(&mut self) { + self.usage.sort_unstable_by(|a, b| b.2.cmp(&a.2)) + } +} + +pub async fn read_ts(ts: u64, scy: Arc) -> Result { + // TODO toplist::read_ts refactor + info!("TODO toplist::read_ts refactor"); + let snap = EMIT_ACCOUNTING_SNAP.ms() / 1000; + let ts = ts / timeunits::SEC / snap * snap; + let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); + let qu = prep(cql, scy.clone()).await?; + let ret = read_ts_inner(ts, qu, scy).await?; + Ok(ret) +} + +async fn read_ts_inner(ts: u64, qu: PreparedStatement, scy: Arc) -> Result { + type RowType = (i64, i64, i64); + let mut ret = UsageData::new(ts); + for part in 0..255_u32 { + let mut res = scy + .execute_iter(qu.clone(), (part as i32, ts as i64)) + .await + .err_conv()? 
+ .into_typed::(); + while let Some(row) = res.next().await { + let row = row.map_err(Error::from_string)?; + let series = row.0 as u64; + let count = row.1 as u64; + let bytes = row.2 as u64; + ret.usage.push((series, count, bytes)); + } + } + Ok(ret) +} + +async fn prep(cql: &str, scy: Arc) -> Result { + scy.prepare(cql) + .await + .map_err(|e| Error::with_msg_no_trace(format!("cql error {e}"))) +} diff --git a/crates/scyllaconn/src/accounting/totals.rs b/crates/scyllaconn/src/accounting/totals.rs new file mode 100644 index 0000000..d07fc69 --- /dev/null +++ b/crates/scyllaconn/src/accounting/totals.rs @@ -0,0 +1,234 @@ +use crate::errconv::ErrConv; +use err::Error; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::Empty; +use items_0::Extendable; +use items_0::WithLen; +use items_2::accounting::AccountingEvents; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::timeunits; +use netpod::EMIT_ACCOUNTING_SNAP; +use scylla::prepared_statement::PreparedStatement; +use scylla::Session as ScySession; +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +async fn read_next( + ts_msp: u64, + fwd: bool, + qu: PreparedStatement, + scy: Arc, +) -> Result { + type RowType = (i64, i64, i64); + let mut ret = AccountingEvents::empty(); + let mut tot_count = 0; + let mut tot_bytes = 0; + for part in 0..255_u32 { + let mut res = if fwd { + scy.execute_iter(qu.clone(), (part as i32, ts_msp as i64)) + .await + .err_conv()? 
+ .into_typed::() + } else { + return Err(Error::with_msg_no_trace("no backward support")); + }; + while let Some(row) = res.next().await { + let row = row.map_err(Error::from_string)?; + let _ts = ts_msp; + let _series = row.0 as u64; + let count = row.1 as u64; + let bytes = row.2 as u64; + tot_count += count; + tot_bytes += bytes; + } + } + ret.tss.push_back(ts_msp); + ret.count.push_back(tot_count); + ret.bytes.push_back(tot_bytes); + Ok(ret) +} + +struct ReadValues { + #[allow(unused)] + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + #[allow(unused)] + do_one_before_range: bool, + fut: Pin> + Send>>, + scy: Arc, + qu: PreparedStatement, +} + +impl ReadValues { + fn new( + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + do_one_before_range: bool, + scy: Arc, + qu: PreparedStatement, + ) -> Self { + let mut ret = Self { + range, + ts_msps, + fwd, + do_one_before_range, + fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( + "future not initialized", + )))), + scy, + qu, + }; + ret.next(); + ret + } + + fn next(&mut self) -> bool { + if let Some(ts_msp) = self.ts_msps.pop_front() { + self.fut = self.make_fut(ts_msp); + true + } else { + false + } + } + + fn make_fut(&mut self, ts_msp: u64) -> Pin> + Send>> { + let fut = read_next(ts_msp, self.fwd, self.qu.clone(), self.scy.clone()); + Box::pin(fut) + } +} + +enum FrState { + New, + Prepare(PrepFut), + Start, + ReadValues(ReadValues), + Done, +} + +type PrepFut = Pin> + Send>>; + +pub struct AccountingStreamScylla { + state: FrState, + range: NanoRange, + scy: Arc, + qu_select: Option, + outbuf: AccountingEvents, + poll_count: u32, +} + +impl AccountingStreamScylla { + pub fn new(range: NanoRange, scy: Arc) -> Self { + Self { + state: FrState::New, + range, + scy, + qu_select: None, + outbuf: AccountingEvents::empty(), + poll_count: 0, + } + } +} + +async fn prep(cql: &str, scy: Arc) -> Result { + scy.prepare(cql) + .await + .map_err(|e| Error::with_msg_no_trace(format!("cql error 
{e}"))) +} + +impl Stream for AccountingStreamScylla { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + // debug!("poll {}", self.poll_count); + self.poll_count += 1; + if self.poll_count > 200000 { + debug!("abort high poll count"); + return Ready(None); + } + let span = tracing::span!(tracing::Level::TRACE, "poll_next"); + let _spg = span.enter(); + loop { + if self.outbuf.len() > 0 { + let item = std::mem::replace(&mut self.outbuf, AccountingEvents::empty()); + break Ready(Some(Ok(item))); + } + break match &mut self.state { + FrState::New => { + let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); + let fut = prep(cql, self.scy.clone()); + let fut: PrepFut = Box::pin(fut); + self.state = FrState::Prepare(fut); + continue; + } + FrState::Prepare(fut) => match fut.poll_unpin(cx) { + Ready(Ok(x)) => { + self.qu_select = Some(x); + self.state = FrState::Start; + continue; + } + Ready(Err(e)) => { + error!("{e}"); + Ready(Some(Err(Error::with_msg_no_trace("cql error")))) + } + Pending => Pending, + }, + FrState::Start => { + let mut ts_msps = VecDeque::new(); + let snap = EMIT_ACCOUNTING_SNAP.ms() / 1000; + let mut ts = self.range.beg / timeunits::SEC / snap * snap; + let ts_e = self.range.end / timeunits::SEC / snap * snap; + while ts < ts_e { + if ts_msps.len() >= 100 { + debug!("too large time range requested"); + break; + } + ts_msps.push_back(ts); + ts += snap; + } + if ts_msps.len() == 0 { + self.state = FrState::Done; + continue; + } else { + let fwd = true; + let do_one_before_range = false; + let st = ReadValues::new( + self.range.clone(), + ts_msps, + fwd, + do_one_before_range, + self.scy.clone(), + self.qu_select.as_ref().unwrap().clone(), + ); + self.state = FrState::ReadValues(st); + continue; + } + } + FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(mut item)) => { + if !st.next() { + self.state = FrState::Done; + } + 
self.outbuf.extend_from(&mut item); + continue; + } + Ready(Err(e)) => { + error!("{e}"); + Ready(Some(Err(e))) + } + Pending => Pending, + }, + FrState::Done => Ready(None), + }; + } + } +} diff --git a/crates/scyllaconn/src/status.rs b/crates/scyllaconn/src/status.rs index 87de0a8..849d297 100644 --- a/crates/scyllaconn/src/status.rs +++ b/crates/scyllaconn/src/status.rs @@ -205,11 +205,12 @@ impl Stream for StatusStreamScylla { break match self.state { FrState::New => { let mut ts_msps = VecDeque::new(); - let mut ts = self.range.beg / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; + let snap = CONNECTION_STATUS_DIV.ms() * 1000000; + let mut ts = self.range.beg / snap * snap; while ts < self.range.end { debug!("Use ts {ts}"); ts_msps.push_back(ts); - ts += CONNECTION_STATUS_DIV; + ts += snap; } let st = ReadValues::new( self.series, diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 8fdb3ce..8d98fd5 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -19,7 +19,7 @@ crc32fast = "1.3.2" byteorder = "1.4.3" async-channel = "1.8.0" chrono = { version = "0.4.19", features = ["serde"] } -wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"] } +wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true } err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } @@ -30,3 +30,6 @@ httpclient = { path = "../httpclient" } [dev-dependencies] taskrun = { path = "../taskrun" } + +[features] +wasm_transform = ["wasmer"] diff --git a/crates/streams/src/cbor.rs b/crates/streams/src/cbor.rs index 4a456c9..9301c75 100644 --- a/crates/streams/src/cbor.rs +++ b/crates/streams/src/cbor.rs @@ -23,6 +23,9 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; +const FRAME_HEAD_LEN: usize = 16; +const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 80; + trait ErrConv { fn ec(self) -> Result; } @@ -62,6 +65,7 @@ impl From for Bytes 
{ pub type CborStream = Pin> + Send>>; +// TODO move this type decl because it is not specific to cbor pub type SitemtyDynEventsStream = Pin>>, Error>> + Send>>; @@ -143,29 +147,36 @@ impl FramedBytesToSitemtyDynEventsStream { inp, scalar_type, shape, - buf: BytesMut::with_capacity(1024 * 64), + buf: BytesMut::with_capacity(1024 * 256), } } fn try_parse(&mut self) -> Result>>, Error> { // debug!("try_parse {}", self.buf.len()); - if self.buf.len() < 4 { + if self.buf.len() < FRAME_HEAD_LEN { return Ok(None); } let n = u32::from_le_bytes(self.buf[..4].try_into()?); - if n > 1024 * 1024 * 40 { + if n > FRAME_PAYLOAD_MAX { let e = Error::with_msg_no_trace(format!("frame too large {n}")); error!("{e}"); return Err(e); } - if self.buf.len() < 4 + n as usize { + let frame_len = FRAME_HEAD_LEN + n as usize; + let adv = (frame_len + 7) / 8 * 8; + assert!(adv % 8 == 0); + assert!(adv >= frame_len); + assert!(adv < 8 + frame_len); + if self.buf.len() < adv { // debug!("not enough {} {}", n, self.buf.len()); return Ok(None); } - let buf = &self.buf[4..4 + n as usize]; + let buf = &self.buf[FRAME_HEAD_LEN..frame_len]; let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(Error::from_string)?; // debug!("decoded ciborium value {val:?}"); let item = if let Some(map) = val.as_map() { + let keys: Vec<&str> = map.iter().map(|k| k.0.as_text().unwrap_or("(none)")).collect(); + debug!("keys {keys:?}"); if let Some(x) = map.get(0) { if let Some(y) = x.0.as_text() { if y == "rangeFinal" { @@ -196,9 +207,10 @@ impl FramedBytesToSitemtyDynEventsStream { Some(x) } else { let item = decode_cbor_to_box_events(buf, &self.scalar_type, &self.shape)?; + debug!("decoded boxed events len {}", item.len()); Some(StreamItem::DataItem(RangeCompletableItem::Data(item))) }; - self.buf.advance(4 + n as usize); + self.buf.advance(adv); if let Some(x) = item { Ok(Some(Ok(x))) } else { diff --git a/crates/streams/src/firsterr.rs b/crates/streams/src/firsterr.rs index 
e202a00..7d04d37 100644 --- a/crates/streams/src/firsterr.rs +++ b/crates/streams/src/firsterr.rs @@ -2,8 +2,22 @@ use crate::cbor::CborBytes; use futures_util::future; use futures_util::Stream; use futures_util::StreamExt; +use items_0::WithLen; -pub fn non_empty(inp: S) -> impl Stream> +pub fn non_empty(inp: S) -> impl Stream> +where + S: Stream>, + T: WithLen, +{ + inp.filter(|x| { + future::ready(match x { + Ok(x) => x.len() > 0, + Err(_) => true, + }) + }) +} + +pub fn non_empty_nongen(inp: S) -> impl Stream> where S: Stream>, { diff --git a/crates/streams/src/generators.rs b/crates/streams/src/generators.rs index 042bc82..e9c341e 100644 --- a/crates/streams/src/generators.rs +++ b/crates/streams/src/generators.rs @@ -420,3 +420,114 @@ impl Stream for GenerateF64V00 { } } } + +pub struct GenerateWaveI16V00 { + ivl: u64, + ts: u64, + dts: u64, + tsend: u64, + node_ix: u64, + timeout: Option + Send>>>, + do_throttle: bool, + done: bool, + done_range_final: bool, +} + +impl GenerateWaveI16V00 { + pub fn self_name() -> &'static str { + std::any::type_name::() + } + + pub fn new(node_ix: u64, node_count: u64, range: SeriesRange, one_before_range: bool) -> Self { + let range = match range { + SeriesRange::TimeRange(k) => k, + SeriesRange::PulseRange(_) => todo!(), + }; + let ivl = MS * 100; + let dts = ivl * node_count as u64; + let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl; + let tsend = range.end; + debug!( + "{}::new ivl {} dts {} ts {} one_before_range {}", + Self::self_name(), + ivl, + dts, + ts, + one_before_range + ); + Self { + ivl, + ts, + dts, + tsend, + node_ix, + timeout: None, + do_throttle: false, + done: false, + done_range_final: false, + } + } + + fn make_batch(&mut self) -> Sitemty { + type T = i16; + let mut item = EventsDim1::empty(); + let mut ts = self.ts; + loop { + if self.ts >= self.tsend || item.byte_estimate() > 1024 * 20 { + break; + } + let pulse = ts; + let ampl = ((ts / self.ivl) as f32).sin() + 2.; 
+ let mut value = Vec::new(); + let pi = std::f32::consts::PI; + for i in 0..21 { + let x = ((-pi + (2. * pi / 20.) * i as f32).cos() + 1.1) * ampl; + value.push(x as T); + } + if false { + info!( + "v01 node {} made event ts {} pulse {} value {:?}", + self.node_ix, ts, pulse, value + ); + } + item.push(ts, pulse, value); + ts += self.dts; + } + self.ts = ts; + trace!("generated len {}", item.len()); + let w = ChannelEvents::Events(Box::new(item) as _); + let w = sitem_data(w); + w + } +} + +impl Stream for GenerateWaveI16V00 { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.done { + Ready(None) + } else if self.ts >= self.tsend { + self.done = true; + self.done_range_final = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else if !self.do_throttle { + // To use the generator without throttling, use this scope + Ready(Some(self.make_batch())) + } else if let Some(fut) = self.timeout.as_mut() { + match fut.poll_unpin(cx) { + Ready(()) => { + self.timeout = None; + Ready(Some(self.make_batch())) + } + Pending => Pending, + } + } else { + self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + continue; + }; + } + } +} diff --git a/crates/streams/src/json_stream.rs b/crates/streams/src/json_stream.rs new file mode 100644 index 0000000..77b6f18 --- /dev/null +++ b/crates/streams/src/json_stream.rs @@ -0,0 +1,80 @@ +use crate::cbor::SitemtyDynEventsStream; +use bytes::Bytes; +use err::Error; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::StreamItem; +use items_0::WithLen; +use netpod::log::*; +use std::pin::Pin; + +pub struct JsonBytes(Bytes); + +impl JsonBytes { + pub fn into_inner(self) -> Bytes { + self.0 + } + + pub fn len(&self) -> u32 { + self.0.len() as _ + } +} + +impl WithLen for JsonBytes { + fn len(&self) -> usize { + self.len() 
as usize + } +} + +impl From for Bytes { + fn from(value: JsonBytes) -> Self { + value.0 + } +} + +pub type JsonStream = Pin> + Send>>; + +pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stream> { + let stream = stream.map(|x| match x { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(evs) => { + let buf = evs.to_json_vec_u8(); + let bytes = Bytes::from(buf); + let item = JsonBytes(bytes); + Ok(item) + } + RangeCompletableItem::RangeComplete => { + let item = serde_json::json!({ + "rangeFinal": true, + }); + let buf = serde_json::to_vec(&item)?; + let bytes = Bytes::from(buf); + let item = JsonBytes(bytes); + Ok(item) + } + }, + StreamItem::Log(item) => { + info!("{item:?}"); + let item = JsonBytes(Bytes::new()); + Ok(item) + } + StreamItem::Stats(item) => { + info!("{item:?}"); + let item = JsonBytes(Bytes::new()); + Ok(item) + } + }, + Err(e) => { + let item = serde_json::json!({ + "error": e.to_string(), + }); + let buf = serde_json::to_vec(&item)?; + let bytes = Bytes::from(buf); + let item = JsonBytes(bytes); + Ok(item) + } + }); + stream +} diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index d799082..753329c 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -7,6 +7,7 @@ pub mod firsterr; pub mod frames; pub mod generators; pub mod itemclone; +pub mod json_stream; pub mod lenframed; pub mod needminbuffer; pub mod plaineventscbor; diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs index 50621b2..68050d2 100644 --- a/crates/streams/src/plaineventscbor.rs +++ b/crates/streams/src/plaineventscbor.rs @@ -10,12 +10,13 @@ use netpod::ChannelTypeConfigGen; use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; -pub async fn plain_events_cbor( +pub async fn plain_events_cbor_stream( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { + 
trace!("build stream"); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; let stream = events_stream_to_cbor_stream(stream); let stream = non_empty(stream); diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 86867a7..ac919bb 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -1,4 +1,8 @@ use crate::collect::Collect; +use crate::firsterr::non_empty; +use crate::firsterr::only_first_err; +use crate::json_stream::events_stream_to_json_stream; +use crate::json_stream::JsonStream; use crate::plaineventsstream::dyn_events_stream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use err::Error; @@ -51,3 +55,17 @@ pub async fn plain_events_json( info!("plain_events_json json serialized"); Ok(jsval) } + +pub async fn plain_events_json_stream( + evq: &PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, + ctx: &ReqCtx, + open_bytes: OpenBoxedBytesStreamsBox, +) -> Result { + trace!("build stream"); + let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; + let stream = events_stream_to_json_stream(stream); + let stream = non_empty(stream); + let stream = only_first_err(stream); + Ok(Box::pin(stream)) +} diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index b0cdd38..36d25f4 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -26,6 +26,7 @@ pub async fn dyn_events_stream( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { + trace!("dyn_events_stream begin"); let subq = make_sub_query( ch_conf, evq.range().clone(), @@ -78,6 +79,21 @@ pub async fn dyn_events_stream( } } +#[cfg(not(wasm_transform))] +async fn transform_wasm( + stream: INP, + _wasmname: &str, + _ctx: &ReqCtx, +) -> Result>>, Error>> + Send, Error> +where + INP: Stream>>, Error>> + Send + 'static, +{ + let ret: Pin>> + Send>> = Box::pin(stream); + Ok(ret) +} + +#[cfg(DISABLED)] 
+#[cfg(wasm_transform)] async fn transform_wasm( stream: INP, wasmname: &str, diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs index 8830093..297b4f3 100644 --- a/crates/streams/src/test/events.rs +++ b/crates/streams/src/test/events.rs @@ -2,7 +2,7 @@ use crate::cbor::FramedBytesToSitemtyDynEventsStream; use crate::firsterr::only_first_err; use crate::frames::inmem::BoxedBytesStream; use crate::lenframed; -use crate::plaineventscbor::plain_events_cbor; +use crate::plaineventscbor::plain_events_cbor_stream; use crate::tcprawclient::OpenBoxedBytesStreams; use crate::tcprawclient::TEST_BACKEND; use err::Error; @@ -39,7 +39,7 @@ async fn merged_events_inner() -> Result<(), Error> { let evq = PlainEventsQuery::new(channel, range); let open_bytes = StreamOpener::new(); let open_bytes = Box::pin(open_bytes); - let stream = plain_events_cbor(&evq, ch_conf.clone().into(), &ctx, open_bytes) + let stream = plain_events_cbor_stream(&evq, ch_conf.clone().into(), &ctx, open_bytes) .await .unwrap(); let stream = lenframed::length_framed(stream); diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index dbb843d..77410cb 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -77,6 +77,8 @@ async fn timebinnable_stream( }) }); + #[cfg(DISABLED)] + #[cfg(wasm_transform)] let stream = if let Some(wasmname) = wasm1 { debug!("make wasm transform"); use httpclient::url::Url;