diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs
index ee87473..235f279 100644
--- a/archapp/src/archeng/datablock.rs
+++ b/archapp/src/archeng/datablock.rs
@@ -494,10 +494,7 @@ pub async fn read_data_1(
         DbrType::DbrTimeDouble => {
             if datafile_header.dbr_count == 1 {
                 trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble");
-                let mut evs = ScalarEvents {
-                    tss: vec![],
-                    values: vec![],
-                };
+                let mut evs = ScalarEvents::empty();
                 let n1 = datafile_header.num_samples as usize;
                 //let n2 = datafile_header.dbr_type.byte_len();
                 let n2 = 2 + 2 + 4 + 4 + (4) + 8;
diff --git a/archapp/src/events.rs b/archapp/src/events.rs
index 1ea712d..f4fbcb2 100644
--- a/archapp/src/events.rs
+++ b/archapp/src/events.rs
@@ -644,23 +644,25 @@ fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable>, E
     match ei {
         EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I32(h))) => {
             let range: NanoRange = err::todoval();
-            let (x, y) = h
+            let (tss, pulses, values) = h
                 .tss
                 .into_iter()
+                .zip(h.pulses.into_iter())
                 .zip(h.values.into_iter())
-                .filter_map(|(j, k)| {
-                    if j < range.beg || j >= range.end {
+                .filter_map(|((t, p), v)| {
+                    if t < range.beg || t >= range.end {
                         None
                     } else {
-                        Some((j, k))
+                        Some((t, p, v))
                     }
                 })
-                .fold((vec![], vec![]), |(mut a, mut b), (j, k)| {
+                .fold((vec![], vec![], vec![]), |(mut a, mut b, mut c), (j, k, l)| {
                     a.push(j);
                     b.push(k);
-                    (a, b)
+                    c.push(l);
+                    (a, b, c)
                 });
-            let b = ScalarEvents { tss: x, values: y };
+            let b = ScalarEvents { tss, pulses, values };
             let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b)));
             let ret = Box::new(b);
             Ok(ret)
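The reworked `events_item_to_framable` arm threads pulse ids through the same iterator chain as timestamps and values. As a standalone illustration of that zip/filter_map/fold shape (toy data and range bounds, not part of the patch):

```rust
// Sketch of the triple-unzip pattern used in events_item_to_framable;
// the inputs here are made up for illustration.
fn main() {
    let tss: Vec<u64> = vec![10, 20, 30];
    let pulses: Vec<u64> = vec![1, 2, 3];
    let values: Vec<i32> = vec![100, 200, 300];
    let (beg, end) = (15u64, 35u64);
    let (tss, pulses, values) = tss
        .into_iter()
        .zip(pulses.into_iter())
        .zip(values.into_iter())
        // The nested zip yields ((ts, pulse), value); destructure accordingly.
        .filter_map(|((t, p), v)| if t < beg || t >= end { None } else { Some((t, p, v)) })
        // Unzip the surviving triples into three parallel vectors.
        .fold((vec![], vec![], vec![]), |(mut a, mut b, mut c), (t, p, v)| {
            a.push(t);
            b.push(p);
            c.push(v);
            (a, b, c)
        });
    assert_eq!(tss, vec![20, 30]);
    assert_eq!(pulses, vec![2, 3]);
    assert_eq!(values, vec![200, 300]);
}
```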
diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs
index 5ccbed6..28f8b0d 100644
--- a/archapp/src/parse.rs
+++ b/archapp/src/parse.rs
@@ -42,10 +42,7 @@ pub struct PbFileReader {
 fn parse_scalar_byte(m: &[u8], year: u32) -> Result {
     let msg = crate::generated::EPICSEvent::ScalarByte::parse_from_bytes(m)
         .map_err(|_| Error::with_msg(format!("can not parse pb-type {}", "ScalarByte")))?;
-    let mut t = ScalarEvents::<i8> {
-        tss: vec![],
-        values: vec![],
-    };
+    let mut t = ScalarEvents::<i8>::empty();
     let yd = Utc.ymd(year as i32, 1, 1).and_hms(0, 0, 0);
     let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
     let v = msg.get_val().first().map_or(0, |k| *k as i8);
@@ -58,10 +55,7 @@ macro_rules! scalar_parse {
     ($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
         let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m)
             .map_err(|e| Error::with_msg(format!("can not parse pb-type {} {:?}", stringify!($pbt), e)))?;
-        let mut t = ScalarEvents::<$evty> {
-            tss: vec![],
-            values: vec![],
-        };
+        let mut t = ScalarEvents::<$evty>::empty();
         let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0);
         let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
@@ -77,10 +71,7 @@ macro_rules! wave_parse {
     ($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
         let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m)
             .map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?;
-        let mut t = WaveEvents::<$evty> {
-            tss: vec![],
-            vals: vec![],
-        };
+        let mut t = WaveEvents::<$evty>::empty();
         let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0);
         let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
diff --git a/bitshuffle/Cargo.toml b/bitshuffle/Cargo.toml
index b359472..3314757 100644
--- a/bitshuffle/Cargo.toml
+++ b/bitshuffle/Cargo.toml
@@ -4,6 +4,9 @@ version = "0.0.1-a.0"
 authors = ["Dominik Werder "]
 edition = "2018"
 
+[lib]
+path = "src/bitshuffle.rs"
+
 [dependencies]
 libc = "0.2.92"
 
diff --git a/bitshuffle/src/lib.rs b/bitshuffle/src/bitshuffle.rs
similarity index 69%
rename from bitshuffle/src/lib.rs
rename to bitshuffle/src/bitshuffle.rs
index 05edd6d..092ed8f 100644
--- a/bitshuffle/src/lib.rs
+++ b/bitshuffle/src/bitshuffle.rs
@@ -1,4 +1,4 @@
-use libc::size_t;
+use libc::{c_int, size_t};
 
 extern "C" {
     pub fn bshuf_compress_lz4(
@@ -8,6 +8,7 @@ extern "C" {
         elem_size: size_t,
         block_size: size_t,
     ) -> i64;
+
     pub fn bshuf_decompress_lz4(
         inp: *const u8,
         out: *const u8,
@@ -15,6 +16,13 @@ extern "C" {
         elem_size: size_t,
         block_size: size_t,
     ) -> i64;
+
+    pub fn LZ4_decompress_safe(
+        source: *const u8,
+        dest: *mut u8,
+        compressedSize: c_int,
+        maxDecompressedSize: c_int,
+    ) -> c_int;
 }
 
 pub fn bitshuffle_compress(
@@ -50,3 +58,13 @@ pub fn bitshuffle_decompress(
         }
     }
 }
+
+pub fn lz4_decompress(inp: &[u8], out: &mut [u8]) -> Result {
+    let max_out = out.len() as _;
+    let ec = unsafe { LZ4_decompress_safe(inp.as_ptr(), out.as_mut_ptr(), inp.len() as _, max_out) };
+    if ec < 0 {
+        Err(ec as _)
+    } else {
+        Ok(ec as _)
+    }
+}
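The new `lz4_decompress` wrapper expects the caller to preallocate the output buffer: `LZ4_decompress_safe` writes at most `out.len()` bytes and returns the decompressed length, or a negative code on error, which the wrapper maps to `Ok`/`Err`. A minimal usage sketch — the stripped generics in the patch leave the exact `Result` parameters unclear, so this assumes `Ok` carries the byte count as `usize` and `Err` an integral error code:

```rust
// Hypothetical caller; `compressed` and `expected_len` come from elsewhere
// (e.g. a blob header that records the uncompressed size).
fn decompress_blob(compressed: &[u8], expected_len: usize) -> Result<Vec<u8>, String> {
    let mut out = vec![0u8; expected_len];
    match bitshuffle::lz4_decompress(compressed, &mut out) {
        Ok(n) => {
            // n is the number of bytes actually written.
            out.truncate(n);
            Ok(out)
        }
        Err(ec) => Err(format!("LZ4_decompress_safe failed with code {ec}")),
    }
}
```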
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index 05bdcf9..eb4fc55 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -20,6 +20,7 @@ pub fn make_test_node(id: u32) -> Node {
         }),
         archiver_appliance: None,
        channel_archiver: None,
+        access_scylla: false,
     }
 }
 
diff --git a/disk/src/decode.rs b/disk/src/decode.rs
index b0752e1..13fec46 100644
--- a/disk/src/decode.rs
+++ b/disk/src/decode.rs
@@ -269,8 +269,12 @@ where
             }
             let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
             let val = self.evs.convert(decomp, be)?;
-            let k =
-                <>::Batch as EventAppendable>::append_event(ret, ev.tss[i1], val);
+            let k = <>::Batch as EventAppendable>::append_event(
+                ret,
+                ev.tss[i1],
+                ev.pulses[i1],
+                val,
+            );
             ret = Some(k);
         }
         Ok(ret)
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 6aed8f6..da3cd39 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -131,6 +131,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
             }),
             archiver_appliance: None,
             channel_archiver: None,
+            access_scylla: false,
         };
         ensemble.nodes.push(node);
     }
diff --git a/err/src/lib.rs b/err/src/lib.rs
index 1b8f6c1..759c5e8 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -160,7 +160,12 @@ impl fmt::Debug for Error {
         } else {
             String::new()
         };
-        write!(fmt, "{}", self.msg)?;
+        write!(fmt, "msg: {}", self.msg)?;
+        if let Some(msgs) = self.public_msg() {
+            for msg in msgs {
+                write!(fmt, "\npublic: {}", msg)?;
+            }
+        }
         if !trace_str.is_empty() {
             write!(fmt, "\nTrace:\n{}", trace_str)?;
         }
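With the `Debug` change above, public messages are no longer dropped from error output. A hedged sketch of what this buys (assuming `public_msg` yields the messages attached via `with_public_msg_no_trace`, as used throughout this patch):

```rust
// Debug-printing an error with a public message now shows both parts,
// roughly: "msg: ...\npublic: No Scylla configured".
fn demo() {
    let e = err::Error::with_public_msg_no_trace("No Scylla configured");
    println!("{:?}", e);
}
```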
diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs
index bb404b7..3e01d8e 100644
--- a/httpret/src/channelconfig.rs
+++ b/httpret/src/channelconfig.rs
@@ -1,13 +1,17 @@
+use std::collections::BTreeMap;
+
 use crate::err::Error;
 use crate::{response, ToPublicResponse};
 use http::{Method, Request, Response, StatusCode};
 use hyper::Body;
 use netpod::log::*;
-use netpod::{ChannelConfigQuery, FromUrl, ScalarType, Shape};
+use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, FromUrl, ScalarType, ScyllaConfig, Shape};
 use netpod::{ChannelConfigResponse, NodeConfigCached};
 use netpod::{ACCEPT_ALL, APP_JSON};
+use scylla::batch::Consistency;
 use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
 use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
+use serde::{Deserialize, Serialize};
 use url::Url;
 
 pub struct ChannelConfigHandler {}
@@ -76,33 +80,64 @@ impl ErrConv for Result {
     }
 }
 
+async fn config_from_scylla(
+    chq: ChannelConfigQuery,
+    scyco: &ScyllaConfig,
+    _node_config: &NodeConfigCached,
+) -> Result<ChannelConfigResponse, Error> {
+    // Find the "series" id.
+    let scy = scylla::SessionBuilder::new()
+        .known_nodes(&scyco.hosts)
+        .use_keyspace(&scyco.keyspace, true)
+        .default_consistency(Consistency::One)
+        .build()
+        .await
+        .err_conv()?;
+    let cql = "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
+    let res = scy
+        .query(cql, (&chq.channel.backend, chq.channel.name()))
+        .await
+        .err_conv()?;
+    let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
+    if rows.len() == 0 {
+        return Err(Error::with_public_msg_no_trace(format!(
+            "can not find series for channel {}",
+            chq.channel.name()
+        )));
+    } else {
+        for r in &rows {
+            if let Err(e) = r {
+                return Err(Error::with_msg_no_trace(format!("error {e:?}")));
+            }
+            info!("got row {r:?}");
+        }
+        let row = rows[0].as_ref().unwrap();
+        let scalar_type = ScalarType::from_scylla_i32(row.1)?;
+        let shape = Shape::from_scylla_shape_dims(&row.2)?;
+        let res = ChannelConfigResponse {
+            channel: chq.channel,
+            scalar_type,
+            byte_order: None,
+            shape,
+        };
+        info!("MADE: {res:?}");
+        Ok(res)
+    }
+}
+
 pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     info!("channel_config");
     let url = Url::parse(&format!("dummy:{}", req.uri()))?;
-    //let pairs = get_url_query_pairs(&url);
     let q = ChannelConfigQuery::from_url(&url)?;
     info!("channel_config for q {q:?}");
     let conf = if q.channel.backend == "scylla" {
-        // Find the "series" id.
-        let scy = scylla::SessionBuilder::new()
-            .known_node("sf-daqbuf-34:8340")
-            .build()
-            .await
-            .err_conv()?;
-        let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?";
-        let res = scy.query(cql, ()).await.err_conv()?;
-        let rows = res.rows_typed_or_empty::<(i32, i32)>();
-        for r in rows {
-            let r = r.err_conv()?;
-            info!("got row {r:?}");
-        }
-        let res = ChannelConfigResponse {
-            channel: q.channel,
-            scalar_type: ScalarType::F32,
-            byte_order: None,
-            shape: Shape::Scalar,
-        };
-        res
+        let scyco = node_config
+            .node_config
+            .cluster
+            .scylla
+            .as_ref()
+            .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
+        config_from_scylla(q, scyco, node_config).await?
     } else if let Some(conf) = &node_config.node.channel_archiver {
         archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database)
             .await?
@@ -116,3 +151,189 @@ pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached)
         .body(Body::from(serde_json::to_string(&conf)?))?;
     Ok(ret)
 }
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ConfigsHisto {
+    scalar_types: Vec<(ScalarType, Vec<(Shape, u32)>)>,
+}
+
+pub struct ScyllaConfigsHisto {}
+
+impl ScyllaConfigsHisto {
+    pub fn handler(req: &Request<Body>) -> Option<Self> {
+        if req.uri().path() == "/api/4/scylla/configs/histo" {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+        if req.method() == Method::GET {
+            let accept_def = APP_JSON;
+            let accept = req
+                .headers()
+                .get(http::header::ACCEPT)
+                .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
+            if accept == APP_JSON || accept == ACCEPT_ALL {
+                let res = self.make_histo(node_config).await?;
+                let body = Body::from(serde_json::to_vec(&res)?);
+                Ok(response(StatusCode::OK).body(body)?)
+            } else {
+                Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
+            }
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    }
+
+    async fn make_histo(&self, node_config: &NodeConfigCached) -> Result<ConfigsHisto, Error> {
+        let scyco = node_config
+            .node_config
+            .cluster
+            .scylla
+            .as_ref()
+            .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
+        let scy = scylla::SessionBuilder::new()
+            .known_nodes(&scyco.hosts)
+            .use_keyspace(&scyco.keyspace, true)
+            .default_consistency(Consistency::One)
+            .build()
+            .await
+            .err_conv()?;
+        let facility = "scylla";
+        let res = scy
+            .query(
+                "select scalar_type, shape_dims, series from series_by_channel where facility = ? allow filtering",
+                (facility,),
+            )
+            .await
+            .err_conv()?;
+        let mut stm = BTreeMap::new();
+        for row in res.rows_typed_or_empty::<(i32, Vec<i32>, i64)>() {
+            let (st, dims, _) = row.err_conv()?;
+            let scalar_type = ScalarType::from_scylla_i32(st)?;
+            let shape = Shape::from_scylla_shape_dims(&dims)?;
+            if stm.get_mut(&scalar_type).is_none() {
+                stm.insert(scalar_type.clone(), BTreeMap::new());
+            }
+            let a = stm.get_mut(&scalar_type).unwrap();
+            if a.get_mut(&shape).is_none() {
+                a.insert(shape.clone(), 0);
+            }
+            *a.get_mut(&shape).unwrap() += 1;
+        }
+        let mut stm: Vec<_> = stm
+            .into_iter()
+            .map(|(st, m2)| {
+                let mut g: Vec<_> = m2.into_iter().map(|(sh, c)| (sh, c)).collect();
+                g.sort_by_key(|x| !x.1);
+                let n = g.len() as u32;
+                (st, g, n)
+            })
+            .collect();
+        stm.sort_unstable_by_key(|x| !x.2);
+        let stm = stm.into_iter().map(|(st, a, _)| (st, a)).collect();
+        let ret = ConfigsHisto { scalar_types: stm };
+        Ok(ret)
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ChannelsWithTypeQuery {
+    scalar_type: ScalarType,
+    shape: Shape,
+}
+
+impl FromUrl for ChannelsWithTypeQuery {
+    fn from_url(url: &Url) -> Result<Self, Error> {
+        let pairs = get_url_query_pairs(url);
+        let s = pairs
+            .get("scalar_type")
+            .ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?;
+        //let scalar_type = ScalarType::from_bsread_str(s)?;
+        let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?;
+        let s = pairs
+            .get("shape")
+            .ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?;
+        let shape = Shape::from_dims_str(s)?;
+        Ok(Self { scalar_type, shape })
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ChannelListWithType {
+    channels: Vec<Channel>,
+}
+
+pub struct ScyllaChannelsWithType {}
+
+impl ScyllaChannelsWithType {
+    pub fn handler(req: &Request<Body>) -> Option<Self> {
+        if req.uri().path() == "/api/4/scylla/channels/with_type" {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+        if req.method() == Method::GET {
+            let accept_def = APP_JSON;
+            let accept = req
+                .headers()
+                .get(http::header::ACCEPT)
+                .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
+            if accept == APP_JSON || accept == ACCEPT_ALL {
+                let url = Url::parse(&format!("dummy:{}", req.uri()))?;
+                let q = ChannelsWithTypeQuery::from_url(&url)?;
+                let res = self.get_channels(&q, node_config).await?;
+                let body = Body::from(serde_json::to_vec(&res)?);
+                Ok(response(StatusCode::OK).body(body)?)
+            } else {
+                Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
+            }
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    }
+
+    async fn get_channels(
+        &self,
+        q: &ChannelsWithTypeQuery,
+        node_config: &NodeConfigCached,
+    ) -> Result<ChannelListWithType, Error> {
+        let scyco = node_config
+            .node_config
+            .cluster
+            .scylla
+            .as_ref()
+            .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
+        let scy = scylla::SessionBuilder::new()
+            .known_nodes(&scyco.hosts)
+            .use_keyspace(&scyco.keyspace, true)
+            .default_consistency(Consistency::One)
+            .build()
+            .await
+            .err_conv()?;
+        let facility = "scylla";
+        let res = scy
+            .query(
+                "select channel_name, series from series_by_channel where facility = ? and scalar_type = ? and shape_dims = ? allow filtering",
+                (facility, q.scalar_type.to_scylla_i32(), q.shape.to_scylla_vec()),
+            )
+            .await
+            .err_conv()?;
+        let mut list = Vec::new();
+        for row in res.rows_typed_or_empty::<(String, i64)>() {
+            let (channel_name, _series) = row.err_conv()?;
+            let ch = Channel {
+                backend: facility.into(),
+                name: channel_name,
+            };
+            list.push(ch);
+        }
+        let ret = ChannelListWithType { channels: list };
+        Ok(ret)
+    }
+}
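`make_histo` orders the type and shape buckets largest-first by sorting on the bitwise complement of the unsigned count. A standalone illustration of that trick:

```rust
// Sorting by !key on an unsigned integer reverses the order, i.e. sorts
// descending without a custom comparator; toy data for illustration.
fn main() {
    let mut g: Vec<(&str, u32)> = vec![("scalar", 2), ("wave", 7), ("image", 5)];
    g.sort_by_key(|x| !x.1);
    assert_eq!(g, vec![("wave", 7), ("image", 5), ("scalar", 2)]);
}
```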
diff --git a/httpret/src/events.rs b/httpret/src/events.rs
index eab86fd..c8cba14 100644
--- a/httpret/src/events.rs
+++ b/httpret/src/events.rs
@@ -71,7 +71,6 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
     info!("httpret plain_events_json req: {:?}", req);
     let (head, _body) = req.into_parts();
     let query = PlainEventsJsonQuery::from_request_head(&head)?;
-
     let op = disk::channelexec::PlainEventsJson::new(
         query.channel().clone(),
         query.range().clone(),
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 1ac3d88..e532fe7 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -224,6 +224,10 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         }
     } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
         h.handle(req, &node_config).await
+    } else if let Some(h) = channelconfig::ScyllaConfigsHisto::handler(&req) {
+        h.handle(req, &node_config).await
+    } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) {
+        h.handle(req, &node_config).await
     } else if let Some(h) = events::EventsHandler::handler(&req) {
         h.handle(req, &node_config).await
     } else if path == "/api/4/binned" {
diff --git a/items/src/lib.rs b/items/src/lib.rs
index e48844a..546aefe 100644
--- a/items/src/lib.rs
+++ b/items/src/lib.rs
@@ -381,7 +381,7 @@ where
     Self: Sized,
 {
     type Value;
-    fn append_event(ret: Option<Self>, ts: u64, value: Self::Value) -> Self;
+    fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self;
 }
 
 pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
@@ -481,6 +481,12 @@ pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec<u64>, Vec<u64>) {
     (ts_anchor_sec, ts_off_ms, ts_off_ns)
 }
 
+pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, Vec<u64>) {
+    let pulse_anchor = pulse.first().map_or(0, |k| *k);
+    let pulse_off: Vec<_> = pulse.iter().map(|k| *k - pulse_anchor).collect();
+    (pulse_anchor, pulse_off)
+}
+
 pub trait TimeBinnableTypeAggregator: Send {
     type Input: TimeBinnableType;
     type Output: TimeBinnableType;
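`pulse_offs_from_abs` mirrors `ts_offs_from_abs`: it anchors the series at the first pulse id and emits per-event offsets, which keeps the collected JSON payload compact. A quick check of its behavior, directly from the body shown above:

```rust
fn main() {
    // The first pulse becomes the anchor; offsets are relative to it.
    let (anchor, offs) = items::pulse_offs_from_abs(&[1000, 1001, 1005]);
    assert_eq!(anchor, 1000);
    assert_eq!(offs, vec![0, 1, 5]);
    // An empty slice yields anchor 0 and no offsets.
    let (anchor, offs) = items::pulse_offs_from_abs(&[]);
    assert_eq!(anchor, 0);
    assert!(offs.is_empty());
}
```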
diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs
index 42bf309..d4ae154 100644
--- a/items/src/minmaxavgdim1bins.rs
+++ b/items/src/minmaxavgdim1bins.rs
@@ -2,9 +2,9 @@ use crate::numops::NumOps;
 use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult};
 use crate::waveevents::WaveEvents;
 use crate::{
-    ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, RangeOverlapInfo, ReadPbv,
-    ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, TimeBins,
-    WithLen,
+    pulse_offs_from_abs, ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime,
+    RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType,
+    TimeBinnableTypeAggregator, TimeBins, WithLen,
 };
 use chrono::{TimeZone, Utc};
 use err::Error;
@@ -451,6 +451,10 @@ pub struct WaveEventsCollectedResult<NTY> {
     ts_off_ms: Vec<u64>,
     #[serde(rename = "tsNs")]
     ts_off_ns: Vec<u64>,
+    #[serde(rename = "pulseAnchor")]
+    pulse_anchor: u64,
+    #[serde(rename = "pulseOff")]
+    pulse_off: Vec<u64>,
     values: Vec<Vec<NTY>>,
     #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
     range_complete: bool,
@@ -502,10 +506,13 @@ where
 
     fn result(self) -> Result<Self::Output, Error> {
         let tst = ts_offs_from_abs(&self.vals.tss);
+        let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses);
         let ret = Self::Output {
             ts_anchor_sec: tst.0,
             ts_off_ms: tst.1,
             ts_off_ns: tst.2,
+            pulse_anchor,
+            pulse_off,
             values: self.vals.vals,
             range_complete: self.range_complete,
             timed_out: self.timed_out,
diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs
index eace776..abdefaf 100644
--- a/items/src/scalarevents.rs
+++ b/items/src/scalarevents.rs
@@ -2,8 +2,8 @@ use crate::minmaxavgbins::MinMaxAvgBins;
 use crate::numops::NumOps;
 use crate::streams::{Collectable, Collector};
 use crate::{
-    ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside,
-    PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType,
+    pulse_offs_from_abs, ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside,
+    Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType,
     TimeBinnableTypeAggregator, WithLen, WithTimestamps,
 };
 use err::Error;
@@ -15,13 +15,40 @@ use tokio::fs::File;
 
 // TODO in this module reduce clones.
 
-// TODO add pulse.
 #[derive(Serialize, Deserialize)]
 pub struct ScalarEvents<NTY> {
     pub tss: Vec<u64>,
+    pub pulses: Vec<u64>,
     pub values: Vec<NTY>,
 }
 
+impl<NTY> ScalarEvents<NTY> {
+    #[inline(always)]
+    pub fn push(&mut self, ts: u64, pulse: u64, value: NTY) {
+        self.tss.push(ts);
+        self.pulses.push(pulse);
+        self.values.push(value);
+    }
+
+    // TODO should avoid the copies.
+    #[inline(always)]
+    pub fn extend_from_slice(&mut self, src: &Self)
+    where
+        NTY: Clone,
+    {
+        self.tss.extend_from_slice(&src.tss);
+        self.pulses.extend_from_slice(&src.pulses);
+        self.values.extend_from_slice(&src.values);
+    }
+
+    #[inline(always)]
+    pub fn clearx(&mut self) {
+        self.tss.clear();
+        self.pulses.clear();
+        self.values.clear();
+    }
+}
+
 impl<NTY> SitemtyFrameType for ScalarEvents<NTY>
 where
     NTY: NumOps,
@@ -33,6 +60,7 @@ impl<NTY> ScalarEvents<NTY> {
     pub fn empty() -> Self {
         Self {
             tss: vec![],
+            pulses: vec![],
             values: vec![],
         }
     }
@@ -148,8 +176,7 @@ where
     NTY: NumOps,
 {
     fn push_index(&mut self, src: &Self, ix: usize) {
-        self.tss.push(src.tss[ix]);
-        self.values.push(src.values[ix].clone());
+        self.push(src.tss[ix], src.pulses[ix], src.values[ix].clone());
     }
 }
 
@@ -162,15 +189,13 @@ where
     }
 
     fn append(&mut self, src: &Self) {
-        self.tss.extend_from_slice(&src.tss);
-        self.values.extend_from_slice(&src.values);
+        self.extend_from_slice(src);
     }
 }
 
 impl<NTY> Clearable for ScalarEvents<NTY> {
     fn clear(&mut self) {
-        self.tss.clear();
-        self.values.clear();
+        ScalarEvents::<NTY>::clearx(self);
     }
 }
@@ -234,6 +259,10 @@ pub struct EventValuesCollectorOutput<NTY> {
     ts_off_ms: Vec<u64>,
     #[serde(rename = "tsNs")]
     ts_off_ns: Vec<u64>,
+    #[serde(rename = "pulseAnchor")]
+    pulse_anchor: u64,
+    #[serde(rename = "pulseOff")]
+    pulse_off: Vec<u64>,
     values: Vec<NTY>,
     #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
     range_complete: bool,
@@ -262,10 +291,13 @@ where
 
     fn result(self) -> Result<Self::Output, Error> {
         let tst = ts_offs_from_abs(&self.vals.tss);
+        let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses);
         let ret = Self::Output {
             ts_anchor_sec: tst.0,
             ts_off_ms: tst.1,
             ts_off_ns: tst.2,
+            pulse_anchor,
+            pulse_off,
             values: self.vals.values,
             range_complete: self.range_complete,
             timed_out: self.timed_out,
@@ -501,10 +533,9 @@ where
 {
     type Value = NTY;
 
-    fn append_event(ret: Option<Self>, ts: u64, value: Self::Value) -> Self {
+    fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self {
         let mut ret = if let Some(ret) = ret { ret } else { Self::empty() };
-        ret.tss.push(ts);
-        ret.values.push(value);
+        ret.push(ts, pulse, value);
         ret
     }
 }
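The new `push`/`extend_from_slice`/`clearx` helpers keep the three parallel vectors in step, so call sites no longer update `tss` and `values` separately (and cannot forget `pulses`). A usage sketch against the API as added above:

```rust
fn main() {
    use items::scalarevents::ScalarEvents;
    let mut evs = ScalarEvents::<f64>::empty();
    evs.push(1_000_000_000, 42, 3.25);
    evs.push(2_000_000_000, 43, 3.5);
    let mut all = ScalarEvents::empty();
    all.extend_from_slice(&evs); // copies tss, pulses and values together
    assert_eq!(all.tss.len(), all.pulses.len());
    all.clearx(); // clears all three vectors
    assert!(all.tss.is_empty() && all.pulses.is_empty() && all.values.is_empty());
}
```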
diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs
index dfddb96..31d7fc4 100644
--- a/items/src/statsevents.rs
+++ b/items/src/statsevents.rs
@@ -392,13 +392,11 @@ impl TimeBinnableTypeAggregator for StatsEventsAggregator {
 impl EventAppendable for StatsEvents {
     type Value = f32;
 
-    fn append_event(ret: Option<Self>, ts: u64, _value: Self::Value) -> Self {
-        let mut ret = if let Some(ret) = ret { ret } else { Self::empty() };
-        ret.tss.push(ts);
+    fn append_event(ret: Option<Self>, _ts: u64, _pulse: u64, _value: Self::Value) -> Self {
+        let ret = if let Some(ret) = ret { ret } else { Self::empty() };
         // TODO
         error!("TODO statsevents append_event");
         err::todo();
-        ret.pulses.push(42);
         ret
     }
 }
diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs
index b3d6d5d..d0b83f3 100644
--- a/items/src/waveevents.rs
+++ b/items/src/waveevents.rs
@@ -17,9 +17,18 @@ use tokio::fs::File;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct WaveEvents<NTY> {
     pub tss: Vec<u64>,
+    pub pulses: Vec<u64>,
     pub vals: Vec<Vec<NTY>>,
 }
 
+impl<NTY> WaveEvents<NTY> {
+    pub fn push(&mut self, ts: u64, pulse: u64, value: Vec<NTY>) {
+        self.tss.push(ts);
+        self.pulses.push(pulse);
+        self.vals.push(value);
+    }
+}
+
 impl<NTY> WaveEvents<NTY> {
     pub fn shape(&self) -> Result {
         if let Some(k) = self.vals.first() {
@@ -42,6 +51,7 @@ impl<NTY> WaveEvents<NTY> {
     pub fn empty() -> Self {
         Self {
             tss: vec![],
+            pulses: vec![],
             vals: vec![],
         }
     }
@@ -319,10 +329,9 @@ where
 {
     type Value = Vec<NTY>;
 
-    fn append_event(ret: Option<Self>, ts: u64, value: Self::Value) -> Self {
+    fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self {
         let mut ret = if let Some(ret) = ret { ret } else { Self::empty() };
-        ret.tss.push(ts);
-        ret.vals.push(value);
+        ret.push(ts, pulse, value);
         ret
     }
 }
@@ -455,6 +464,7 @@ pub struct WavePlainProc<NTY> {
     _m1: PhantomData<NTY>,
 }
 
+// TODO purpose?
 impl<NTY> EventsNodeProcessor for WavePlainProc<NTY>
 where
     NTY: NumOps,
@@ -472,11 +482,13 @@ where
             let n = if n > 5 { 5 } else { n };
             WaveEvents {
                 tss: inp.tss,
+                pulses: inp.pulses,
                 vals: inp.vals.iter().map(|k| k[..n].to_vec()).collect(),
             }
         } else {
             WaveEvents {
                 tss: inp.tss,
+                pulses: inp.pulses,
                 vals: inp.vals,
             }
         }
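All `EventAppendable` impls now take the pulse id, and the accumulator is threaded through as an `Option` that is created on first use. A hedged sketch of the calling convention, using made-up events and assuming `f32` satisfies the `NumOps` bound on the `WaveEvents` impl:

```rust
fn main() {
    use items::waveevents::WaveEvents;
    use items::EventAppendable;
    let mut acc: Option<WaveEvents<f32>> = None;
    for (ts, pulse) in [(100u64, 1u64), (200, 2)] {
        // append_event creates the container on the first call.
        acc = Some(WaveEvents::append_event(acc, ts, pulse, vec![0.5, 1.5]));
    }
    let evs = acc.unwrap();
    assert_eq!(evs.tss, vec![100, 200]);
    assert_eq!(evs.pulses, vec![1, 2]);
}
```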
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 1ca5fa4..98bde2f 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -41,7 +41,7 @@ pub struct BodyStream {
     pub inner: Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>,
 }
 
-#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
 pub enum ScalarType {
     U8,
     U16,
@@ -96,8 +96,8 @@ impl ScalarType {
             I16 => "int16",
             I32 => "int32",
             I64 => "int64",
-            F32 => "float",
-            F64 => "double",
+            F32 => "float32",
+            F64 => "float64",
             BOOL => "bool",
             STRING => "string",
         }
@@ -119,6 +119,7 @@ impl ScalarType {
             "float32" => F32,
             "float64" => F64,
             "string" => STRING,
+            "bool" => BOOL,
             _ => {
                 return Err(Error::with_msg_no_trace(format!(
                     "from_bsread_str can not understand bsread {}",
@@ -148,6 +149,13 @@ impl ScalarType {
         Ok(ret)
     }
 
+    pub fn from_scylla_i32(k: i32) -> Result<Self, Error> {
+        if k < 0 || k > u8::MAX as i32 {
+            return Err(Error::with_public_msg_no_trace(format!("bad scalar type index {k}")));
+        }
+        Self::from_dtype_index(k as u8)
+    }
+
     pub fn bytes(&self) -> u8 {
         use ScalarType::*;
         match self {
@@ -200,6 +208,10 @@ impl ScalarType {
             ScalarType::STRING => "string",
         }
     }
+
+    pub fn to_scylla_i32(&self) -> i32 {
+        self.index() as i32
+    }
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -232,6 +244,8 @@ pub struct Node {
     pub sf_databuffer: Option<SfDatabuffer>,
     pub archiver_appliance: Option<ArchiverAppliance>,
     pub channel_archiver: Option<ChannelArchiver>,
+    #[serde(default)]
+    pub access_scylla: bool,
 }
 
 struct Visit1 {}
 
@@ -302,6 +316,7 @@ impl Node {
             }),
             archiver_appliance: None,
             channel_archiver: None,
+            access_scylla: false,
         }
     }
 }
@@ -314,6 +329,12 @@ pub struct Database {
     pub pass: String,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ScyllaConfig {
+    pub hosts: Vec<String>,
+    pub keyspace: String,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Cluster {
     pub backend: String,
@@ -325,6 +346,7 @@ pub struct Cluster {
     pub is_central_storage: bool,
     #[serde(rename = "fileIoBufferSize", default)]
     pub file_io_buffer_size: FileIoBufferSize,
+    pub scylla: Option<ScyllaConfig>,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -563,7 +585,7 @@ pub struct ChannelConfig {
     pub byte_order: ByteOrder,
 }
 
-#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
 pub enum Shape {
     Scalar,
     Wave(u32),
@@ -601,6 +623,7 @@ impl Shape {
         }
     }
 
+    // TODO use simply a list to represent all shapes: empty, or with 1 or 2 entries.
     pub fn from_db_jsval(v: &JsVal) -> Result<Shape, Error> {
         match v {
             JsVal::String(s) => {
@@ -628,6 +651,41 @@ impl Shape {
             ))),
         }
     }
+
+    pub fn from_dims_str(s: &str) -> Result<Shape, Error> {
+        let a: Vec<u32> = serde_json::from_str(s)?;
+        if a.len() == 0 {
+            Ok(Shape::Scalar)
+        } else if a.len() == 1 {
+            Ok(Shape::Wave(a[0]))
+        } else if a.len() == 2 {
+            Ok(Shape::Image(a[0], a[1]))
+        } else {
+            Err(Error::with_public_msg_no_trace("only scalar, 1d and 2d supported"))
+        }
+    }
+
+    pub fn from_scylla_shape_dims(v: &[i32]) -> Result<Shape, Error> {
+        let res = if v.len() == 0 {
+            Shape::Scalar
+        } else if v.len() == 1 {
+            Shape::Wave(v[0] as u32)
+        } else if v.len() == 2 {
+            Shape::Image(v[0] as u32, v[1] as u32)
+        } else {
+            return Err(Error::with_public_msg_no_trace(format!("bad shape_dims {v:?}")));
+        };
+        Ok(res)
+    }
+
+    pub fn to_scylla_vec(&self) -> Vec<i32> {
+        use Shape::*;
+        match self {
+            Scalar => vec![],
+            Wave(n) => vec![*n as i32],
+            Image(n, m) => vec![*n as i32, *m as i32],
+        }
+    }
 }
 
 pub trait HasShape {
@@ -1636,6 +1694,7 @@ pub fn test_cluster() -> Cluster {
             }),
             archiver_appliance: None,
             channel_archiver: None,
+            access_scylla: false,
         })
         .collect();
     Cluster {
@@ -1647,6 +1706,7 @@ pub fn test_cluster() -> Cluster {
             user: "testingdaq".into(),
             pass: "testingdaq".into(),
         },
+        scylla: None,
         run_map_pulse_task: false,
         is_central_storage: false,
         file_io_buffer_size: Default::default(),
@@ -1667,6 +1727,7 @@ pub fn sls_test_cluster() -> Cluster {
             channel_archiver: Some(ChannelArchiver {
                 data_base_paths: vec![test_data_base_path_channel_archiver_sls()],
             }),
+            access_scylla: false,
         })
         .collect();
     Cluster {
@@ -1678,6 +1739,7 @@ pub fn sls_test_cluster() -> Cluster {
             user: "testingdaq".into(),
             pass: "testingdaq".into(),
         },
+        scylla: None,
         run_map_pulse_task: false,
         is_central_storage: false,
         file_io_buffer_size: Default::default(),
@@ -1698,6 +1760,7 @@ pub fn archapp_test_cluster() -> Cluster {
             archiver_appliance: Some(ArchiverAppliance {
                 data_base_paths: vec![test_data_base_path_archiver_appliance()],
             }),
+            access_scylla: false,
        })
        .collect();
    Cluster {
@@ -1709,6 +1772,7 @@ pub fn archapp_test_cluster() -> Cluster {
             user: "testingdaq".into(),
             pass: "testingdaq".into(),
         },
+        scylla: None,
         run_map_pulse_task: false,
         is_central_storage: false,
         file_io_buffer_size: Default::default(),
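The Scylla mappings are designed to round-trip: `shape_dims` is stored as a plain list of ints (empty for scalars) and `scalar_type` as the same index byte used elsewhere. A sketch exercising only the functions added above (and assuming `from_dtype_index` inverts `index()`, which `to_scylla_i32` is built on):

```rust
fn demo() -> Result<(), err::Error> {
    use netpod::{ScalarType, Shape};
    let shape = Shape::Image(1024, 512);
    let dims = shape.to_scylla_vec(); // vec![1024, 512]
    assert_eq!(Shape::from_scylla_shape_dims(&dims)?, shape);
    // from_dims_str parses the JSON list form used in the new URL parameter.
    assert_eq!(Shape::from_dims_str("[]")?, Shape::Scalar);
    assert_eq!(Shape::from_dims_str("[8]")?, Shape::Wave(8));
    let st = ScalarType::F64;
    assert_eq!(ScalarType::from_scylla_i32(st.to_scylla_i32())?, st);
    Ok(())
}
```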
diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml
index 2ca8ca4..3b4d8c9 100644
--- a/nodenet/Cargo.toml
+++ b/nodenet/Cargo.toml
@@ -4,6 +4,9 @@ version = "0.0.1-a.1"
 authors = ["Dominik Werder "]
 edition = "2018"
 
+[lib]
+path = "src/nodenet.rs"
+
 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
index 5caee2f..d47f82e 100644
--- a/nodenet/src/conn.rs
+++ b/nodenet/src/conn.rs
@@ -1,4 +1,4 @@
-// TODO move these frame-related things out of crate disk. Probably better into `nodenet`
+use crate::scylla::make_scylla_stream;
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use err::Error;
 use futures_core::Stream;
@@ -6,11 +6,10 @@ use futures_util::StreamExt;
 use items::frame::{decode_frame, make_term_frame};
 use items::{Framable, StreamItem};
 use netpod::histo::HistoLog2;
+use netpod::log::*;
 use netpod::query::RawEventsQuery;
-use netpod::{log::*, AggKind};
+use netpod::AggKind;
 use netpod::{EventQueryJsonStringFrame, NodeConfigCached, PerfOpts};
-use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
-use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
 use std::net::SocketAddr;
 use std::pin::Pin;
 use tokio::io::AsyncWriteExt;
@@ -18,37 +17,6 @@ use tokio::net::tcp::OwnedWriteHalf;
 use tokio::net::TcpStream;
 use tracing::Instrument;
 
-trait ErrConv<T> {
-    fn err_conv(self) -> Result<T, Error>;
-}
-
-impl<T> ErrConv<T> for Result<T, ScyQueryError> {
-    fn err_conv(self) -> Result<T, Error> {
-        match self {
-            Ok(k) => Ok(k),
-            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
-        }
-    }
-}
-
-impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
-    fn err_conv(self) -> Result<T, Error> {
-        match self {
-            Ok(k) => Ok(k),
-            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
-        }
-    }
-}
-
-impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
-    fn err_conv(self) -> Result<T, Error> {
-        match self {
-            Ok(k) => Ok(k),
-            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
-        }
-    }
-}
-
 pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> {
     let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
     let lis = tokio::net::TcpListener::bind(addr).await?;
@@ -150,59 +118,38 @@ async fn events_conn_handler_inner_try(
     };
     debug!("--- got query evq {:?}", evq);
 
-    // TODO make scylla usage configurable in config
-    if evq.channel.backend == "scylla" {
-        // Find the "series" id.
-        let scy = scylla::SessionBuilder::new()
-            .known_node("sf-daqbuf-34:8340")
-            .build()
-            .await
-            .err_conv();
-        let scy = match scy {
-            Ok(k) => k,
-            Err(e) => return Err((e, netout))?,
-        };
-        let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?";
-        let res = scy.query(cql, ()).await.err_conv();
-        let res = match res {
-            Ok(k) => k,
-            Err(e) => return Err((e, netout))?,
-        };
-        let rows = res.rows_typed_or_empty::<(i32, i32)>();
-        for r in rows {
-            let r = match r.err_conv() {
-                Ok(k) => k,
-                Err(e) => return Err((e, netout))?,
-            };
-            info!("got row {r:?}");
-        }
-        error!("TODO scylla fetch continue here");
-        err::todo();
-    }
-
-    let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
-        if let Some(aa) = &node_config.node.channel_archiver {
-            match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await {
-                Ok(j) => j,
-                Err(e) => return Err((e, netout))?,
-            }
-        } else if let Some(aa) = &node_config.node.archiver_appliance {
-            match archapp_wrap::make_event_pipe(&evq, aa).await {
+    let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> = if evq.channel.backend == "scylla" {
+        if node_config.node.access_scylla {
+            let scyco = node_config.node_config.cluster.scylla.as_ref().unwrap();
+            match make_scylla_stream(&evq, scyco).await {
                 Ok(j) => j,
                 Err(e) => return Err((e, netout))?,
             }
         } else {
-            match evq.agg_kind {
-                AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
-                    Ok(j) => j,
-                    Err(e) => return Err((e, netout))?,
-                },
-                _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await {
-                    Ok(j) => j,
-                    Err(e) => return Err((e, netout))?,
-                },
-            }
-        };
+            Box::pin(futures_util::stream::empty())
+        }
+    } else if let Some(aa) = &node_config.node.channel_archiver {
+        match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await {
+            Ok(j) => j,
+            Err(e) => return Err((e, netout))?,
+        }
+    } else if let Some(aa) = &node_config.node.archiver_appliance {
+        match archapp_wrap::make_event_pipe(&evq, aa).await {
+            Ok(j) => j,
+            Err(e) => return Err((e, netout))?,
+        }
+    } else {
+        match evq.agg_kind {
+            AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
+                Ok(j) => j,
+                Err(e) => return Err((e, netout))?,
+            },
+            _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await {
+                Ok(j) => j,
+                Err(e) => return Err((e, netout))?,
+            },
+        }
+    };
     let mut buf_len_histo = HistoLog2::new(5);
     while let Some(item) = p1.next().await {
         let item = item.make_frame();
diff --git a/nodenet/src/lib.rs b/nodenet/src/lib.rs
deleted file mode 100644
index 80ac2a1..0000000
--- a/nodenet/src/lib.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod conn;
diff --git a/nodenet/src/nodenet.rs b/nodenet/src/nodenet.rs
new file mode 100644
index 0000000..856472b
--- /dev/null
+++ b/nodenet/src/nodenet.rs
@@ -0,0 +1,2 @@
+pub mod conn;
+pub mod scylla;
diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs
new file mode 100644
index 0000000..3d2c00b
--- /dev/null
+++ b/nodenet/src/scylla.rs
@@ -0,0 +1,440 @@
+use err::Error;
+use futures_core::{Future, Stream};
+use futures_util::FutureExt;
+use items::scalarevents::ScalarEvents;
+use items::waveevents::WaveEvents;
+use items::{Framable, RangeCompletableItem, StreamItem};
+use netpod::log::*;
+use netpod::query::RawEventsQuery;
+use netpod::{NanoRange, ScalarType, ScyllaConfig};
+use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
+use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
+use scylla::Session as ScySession;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+trait ErrConv<T> {
+    fn err_conv(self) -> Result<T, Error>;
+}
+
+impl<T> ErrConv<T> for Result<T, ScyQueryError> {
+    fn err_conv(self) -> Result<T, Error> {
+        match self {
+            Ok(k) => Ok(k),
+            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
+        }
+    }
+}
+
+impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
+    fn err_conv(self) -> Result<T, Error> {
+        match self {
+            Ok(k) => Ok(k),
+            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
+        }
+    }
+}
+
+impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
+    fn err_conv(self) -> Result<T, Error> {
+        match self {
+            Ok(k) => Ok(k),
+            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
+        }
+    }
+}
+
+macro_rules! impl_read_values_fut {
+    ($fname:ident, $self:expr, $ts_msp:expr) => {{
+        let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone());
+        let fut = fut.map(|x| {
+            let x2 = match x {
+                Ok(k) => {
+                    //
+                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
+                }
+                Err(e) => {
+                    //
+                    Err(e)
+                }
+            };
+            //Box::new(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) as Box<dyn Framable>});
+            let ret = Box::new(x2) as Box<dyn Framable>;
+            ret
+        });
+        let fut = Box::pin(fut) as Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>;
+        fut
+    }};
+}
+
+struct ReadValues {
+    series: i64,
+    scalar_type: ScalarType,
+    range: NanoRange,
+    ts_msp: VecDeque<u64>,
+    fut: Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>,
+    scy: Arc<ScySession>,
+}
+
+impl ReadValues {
+    fn new(
+        series: i64,
+        scalar_type: ScalarType,
+        range: NanoRange,
+        ts_msp: VecDeque<u64>,
+        scy: Arc<ScySession>,
+    ) -> Self {
+        Self {
+            series,
+            scalar_type,
+            range,
+            ts_msp,
+            fut: Box::pin(futures_util::future::lazy(|_| panic!())),
+            scy,
+        }
+    }
+
+    fn next(&mut self) -> bool {
+        if let Some(ts_msp) = self.ts_msp.pop_front() {
+            self.fut = self.make_fut(ts_msp);
+            true
+        } else {
+            false
+        }
+    }
+
+    fn make_fut(&mut self, ts_msp: u64) -> Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>> {
+        // TODO this also needs to differentiate on Shape.
+        let fut = match &self.scalar_type {
+            ScalarType::F32 => {
+                impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp)
+            }
+            ScalarType::F64 => {
+                impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp)
+            }
+            _ => err::todoval(),
+        };
+        fut
+    }
+}
+
+enum FrState {
+    New,
+    FindSeries(Pin<Box<dyn Future<Output = Result<(i64, ScalarType), Error>> + Send>>),
+    FindMsp(Pin<Box<dyn Future<Output = Result<Vec<u64>, Error>> + Send>>),
+    ReadValues(ReadValues),
+    Done,
+}
+
+pub struct ScyllaFramableStream {
+    state: FrState,
+    facility: String,
+    channel_name: String,
+    range: NanoRange,
+    scalar_type: Option<ScalarType>,
+    series: i64,
+    scy: Arc<ScySession>,
+}
+
+impl ScyllaFramableStream {
+    pub fn new(evq: &RawEventsQuery, scy: Arc<ScySession>) -> Self {
+        Self {
+            state: FrState::New,
+            facility: evq.channel.backend.clone(),
+            channel_name: evq.channel.name().into(),
+            range: evq.range.clone(),
+            scalar_type: None,
+            series: 0,
+            scy,
+        }
+    }
+}
+
+impl Stream for ScyllaFramableStream {
+    type Item = Box<dyn Framable>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        loop {
+            break match self.state {
+                FrState::New => {
+                    let fut = find_series(self.facility.clone(), self.channel_name.clone(), self.scy.clone());
+                    let fut = Box::pin(fut);
+                    self.state = FrState::FindSeries(fut);
+                    continue;
+                }
+                FrState::FindSeries(ref mut fut) => match fut.poll_unpin(cx) {
+                    Ready(Ok((series, scalar_type))) => {
+                        info!("ScyllaFramableStream found series {}", series);
+                        self.series = series;
+                        self.scalar_type = Some(scalar_type);
+                        let fut = find_ts_msp(series, self.range.clone(), self.scy.clone());
+                        let fut = Box::pin(fut);
+                        self.state = FrState::FindMsp(fut);
+                        continue;
+                    }
+                    Ready(Err(e)) => {
+                        self.state = FrState::Done;
+                        Ready(Some(Box::new(
+                            Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
+                        )))
+                    }
+                    Pending => Pending,
+                },
+                FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
+                    Ready(Ok(ts_msp)) => {
+                        info!("found ts_msp {ts_msp:?}");
+                        // TODO get rid of into() for VecDeque
+                        let mut st = ReadValues::new(
+                            self.series,
+                            self.scalar_type.as_ref().unwrap().clone(),
+                            self.range.clone(),
+                            ts_msp.into(),
+                            self.scy.clone(),
+                        );
+                        if st.next() {
+                            self.state = FrState::ReadValues(st);
+                        } else {
+                            self.state = FrState::Done;
+                        }
+                        continue;
+                    }
+                    Ready(Err(e)) => {
+                        self.state = FrState::Done;
+                        Ready(Some(Box::new(
+                            Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
+                        )))
+                    }
+                    Pending => Pending,
+                },
+                FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
+                    Ready(item) => {
+                        if st.next() {
+                        } else {
+                            info!("ReadValues exhausted");
+                            self.state = FrState::Done;
+                        }
+                        Ready(Some(item))
+                    }
+                    Pending => Pending,
+                },
+                FrState::Done => Ready(None),
+            };
+        }
+    }
+}
+
+async fn find_series(facility: String, channel_name: String, scy: Arc<ScySession>) -> Result<(i64, ScalarType), Error> {
+    info!("find_series");
+    let res = {
+        let cql =
+            "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
+        scy.query(cql, (&facility, &channel_name)).await.err_conv()?
+    };
+    let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
+    if rows.len() > 1 {
+        error!("Multiple series found for channel, can not return data for ambiguous series");
+        return Err(Error::with_public_msg_no_trace(
+            "Multiple series found for channel, can not return data for ambiguous series",
+        ));
+    }
+    if rows.len() < 1 {
+        return Err(Error::with_public_msg_no_trace(
+            "can not find series for channel",
+        ));
+    }
+    let row = rows
+        .into_iter()
+        .next()
+        .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
+        .err_conv()?;
+    info!("make_scylla_stream row {row:?}");
+    let series = row.0;
+    let scalar_type = ScalarType::from_scylla_i32(row.1)?;
+    info!("make_scylla_stream series {series}");
+    Ok((series, scalar_type))
+}
+
+async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc<ScySession>) -> Result<Vec<u64>, Error> {
+    let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
+    let res = scy
+        .query(cql, (series, range.beg as i64, range.end as i64))
+        .await
+        .err_conv()?;
+    let mut ret = vec![];
+    for row in res.rows_typed_or_empty::<(i64,)>() {
+        let row = row.err_conv()?;
+        ret.push(row.0 as u64);
+    }
+    info!("found in total {} rows", ret.len());
+    Ok(ret)
+}
+
+macro_rules! read_next_scalar_values {
+    ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
+        async fn $fname(
+            series: i64,
+            ts_msp: u64,
+            range: NanoRange,
+            scy: Arc<ScySession>,
+        ) -> Result<ScalarEvents<$st>, Error> {
+            type ST = $st;
+            type SCYTY = $scyty;
+            info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
+            // TODO add the constraint on range!
+            warn!("remove the limit clause, add range check");
+            // TODO use typed timestamp..
+            let ts_lsp_max = if range.end < ts_msp {
+                // We should not be here anyway.
+                warn!("range.end < ts_msp");
+                0
+            } else {
+                range.end - ts_msp
+            };
+            let cql = concat!(
+                "select ts_lsp, pulse, value from ",
+                $table_name,
+                " where series = ? and ts_msp = ? and ts_lsp < ?"
+            );
+            let res = scy
+                .query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
+                .await
+                .err_conv()?;
+            let mut ret = ScalarEvents::<ST>::empty();
+            for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
+                let row = row.err_conv()?;
+                let ts = ts_msp + row.0 as u64;
+                let pulse = row.1 as u64;
+                let value = row.2 as ST;
+                ret.push(ts, pulse, value);
+            }
+            info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
+            Ok(ret)
+        }
+    };
+}
+
+macro_rules! read_next_1d_values {
+    ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
+        async fn $fname(
+            series: i64,
+            ts_msp: u64,
+            range: NanoRange,
+            scy: Arc<ScySession>,
+        ) -> Result<ScalarEvents<$st>, Error> {
+            type ST = $st;
+            type SCYTY = $scyty;
+            info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
+            // TODO add the constraint on range!
+            warn!("remove the limit clause, add range check");
+            // TODO use typed timestamp..
+            let ts_lsp_max = if range.end < ts_msp {
+                // We should not be here anyway.
+                warn!("range.end < ts_msp");
+                0
+            } else {
+                range.end - ts_msp
+            };
+            let cql = concat!(
+                "select ts_lsp, pulse, value from ",
+                $table_name,
+                " where series = ? and ts_msp = ? and ts_lsp < ?"
+            );
+            let res = scy
+                .query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
+                .await
+                .err_conv()?;
+            let mut ret = ScalarEvents::<ST>::empty();
+            for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
+                let row = row.err_conv()?;
+                let ts = ts_msp + row.0 as u64;
+                let pulse = row.1 as u64;
+                let value = row.2 as ST;
+                ret.push(ts, pulse, value);
+            }
+            info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
+            Ok(ret)
+        }
+    };
+}
+
+read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
+read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
+
+read_next_1d_values!(read_next_values_1d_u16, u16, u16, "events_wave_u16");
+
+pub async fn make_scylla_stream(
+    evq: &RawEventsQuery,
+    scyco: &ScyllaConfig,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+    info!("make_scylla_stream open scylla connection");
+    let scy = scylla::SessionBuilder::new()
+        .known_nodes(&scyco.hosts)
+        .use_keyspace(&scyco.keyspace, true)
+        .build()
+        .await
+        .err_conv()?;
+    let scy = Arc::new(scy);
+    let res = Box::pin(ScyllaFramableStream::new(evq, scy)) as _;
+    Ok(res)
+}
+
+pub async fn make_scylla_stream_2(
+    evq: &RawEventsQuery,
+    scyco: &ScyllaConfig,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+    // Find the "series" id.
+    info!("make_scylla_stream finding series id");
+    let scy = scylla::SessionBuilder::new()
+        .known_nodes(&scyco.hosts)
+        .use_keyspace(&scyco.keyspace, true)
+        .build()
+        .await
+        .err_conv()?;
+    let res = {
+        let cql =
+            "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
+        scy.query(cql, (&evq.channel.backend, evq.channel.name()))
+            .await
+            .err_conv()?
+    };
+    let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
+    if rows.len() > 1 {
+        error!("Multiple series found for channel, can not return data for ambiguous series");
+        return Err(Error::with_public_msg_no_trace(
+            "Multiple series found for channel, can not return data for ambiguous series",
+        ));
+    }
+    if rows.len() < 1 {
+        return Err(Error::with_public_msg_no_trace(
+            "can not find series for channel",
+        ));
+    }
+    let row = rows
+        .into_iter()
+        .next()
+        .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
+        .err_conv()?;
+    info!("make_scylla_stream row {row:?}");
+    let series = row.0;
+    info!("make_scylla_stream series {series}");
+    let _expand = evq.agg_kind.need_expand();
+    let range = &evq.range;
+    {
+        let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
+        let res = scy
+            .query(cql, (series, range.beg as i64, range.end as i64))
+            .await
+            .err_conv()?;
+        let mut rc = 0;
+        for _row in res.rows_or_empty() {
+            rc += 1;
+        }
+        info!("found in total {} rows", rc);
+    }
+    error!("TODO scylla fetch continue here");
+    let res = Box::pin(futures_util::stream::empty());
+    Ok(res)
+}
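For context, a hedged sketch of how the new stream is meant to be consumed, mirroring the loop in `events_conn_handler_inner_try`: each `ts_msp` partition found by `find_ts_msp` becomes one future created by `make_fut`, and the `Stream` yields one boxed `Framable` per completed future.

```rust
use futures_util::StreamExt;
use items::Framable;

// Hypothetical driver; in the patch the equivalent loop lives in
// nodenet/src/conn.rs and writes the frames to a TcpStream.
async fn drain(evq: &netpod::query::RawEventsQuery, scyco: &netpod::ScyllaConfig) -> Result<(), err::Error> {
    let mut stream = nodenet::scylla::make_scylla_stream(evq, scyco).await?;
    while let Some(item) = stream.next().await {
        let _frame = item.make_frame(); // serialize for the wire
    }
    Ok(())
}
```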