Deliver float events from scylla table

This commit is contained in:
Dominik Werder
2022-04-22 22:44:32 +02:00
parent 22b43fe012
commit e7016eda36
23 changed files with 917 additions and 162 deletions

View File

@@ -494,10 +494,7 @@ pub async fn read_data_1(
DbrType::DbrTimeDouble => {
if datafile_header.dbr_count == 1 {
trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble");
let mut evs = ScalarEvents {
tss: vec![],
values: vec![],
};
let mut evs = ScalarEvents::empty();
let n1 = datafile_header.num_samples as usize;
//let n2 = datafile_header.dbr_type.byte_len();
let n2 = 2 + 2 + 4 + 4 + (4) + 8;

View File

@@ -644,23 +644,25 @@ fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, E
match ei {
EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I32(h))) => {
let range: NanoRange = err::todoval();
let (x, y) = h
let (tss, pulses, values) = h
.tss
.into_iter()
.zip(h.pulses.into_iter())
.zip(h.values.into_iter())
.filter_map(|(j, k)| {
if j < range.beg || j >= range.end {
.filter_map(|((t, p), v)| {
if t < range.beg || t >= range.end {
None
} else {
Some((j, k))
Some((t, p, v))
}
})
.fold((vec![], vec![]), |(mut a, mut b), (j, k)| {
.fold((vec![], vec![], vec![]), |(mut a, mut b, mut c), (j, k, l)| {
a.push(j);
b.push(k);
(a, b)
c.push(l);
(a, b, c)
});
let b = ScalarEvents { tss: x, values: y };
let b = ScalarEvents { tss, pulses, values };
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b)));
let ret = Box::new(b);
Ok(ret)

View File

@@ -42,10 +42,7 @@ pub struct PbFileReader {
fn parse_scalar_byte(m: &[u8], year: u32) -> Result<EventsItem, Error> {
let msg = crate::generated::EPICSEvent::ScalarByte::parse_from_bytes(m)
.map_err(|_| Error::with_msg(format!("can not parse pb-type {}", "ScalarByte")))?;
let mut t = ScalarEvents::<i8> {
tss: vec![],
values: vec![],
};
let mut t = ScalarEvents::<i8>::empty();
let yd = Utc.ymd(year as i32, 1, 1).and_hms(0, 0, 0);
let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
let v = msg.get_val().first().map_or(0, |k| *k as i8);
@@ -58,10 +55,7 @@ macro_rules! scalar_parse {
($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m)
.map_err(|e| Error::with_msg(format!("can not parse pb-type {} {:?}", stringify!($pbt), e)))?;
let mut t = ScalarEvents::<$evty> {
tss: vec![],
values: vec![],
};
let mut t = ScalarEvents::<$evty>::empty();
let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0);
let ts =
yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
@@ -77,10 +71,7 @@ macro_rules! wave_parse {
($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m)
.map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?;
let mut t = WaveEvents::<$evty> {
tss: vec![],
vals: vec![],
};
let mut t = WaveEvents::<$evty>::empty();
let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0);
let ts =
yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;

View File

@@ -4,6 +4,9 @@ version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[lib]
path = "src/bitshuffle.rs"
[dependencies]
libc = "0.2.92"

View File

@@ -1,4 +1,4 @@
use libc::size_t;
use libc::{c_int, size_t};
extern "C" {
pub fn bshuf_compress_lz4(
@@ -8,6 +8,7 @@ extern "C" {
elem_size: size_t,
block_size: size_t,
) -> i64;
pub fn bshuf_decompress_lz4(
inp: *const u8,
out: *const u8,
@@ -15,6 +16,13 @@ extern "C" {
elem_size: size_t,
block_size: size_t,
) -> i64;
pub fn LZ4_decompress_safe(
source: *const u8,
dest: *mut u8,
compressedSize: c_int,
maxDecompressedSize: c_int,
) -> c_int;
}
pub fn bitshuffle_compress(
@@ -50,3 +58,13 @@ pub fn bitshuffle_decompress(
}
}
}
/// Decompress a plain (non-bitshuffle) LZ4 block from `inp` into `out`.
///
/// On success returns the number of decompressed bytes written to `out`;
/// on failure returns the negative status code reported by
/// `LZ4_decompress_safe`.
pub fn lz4_decompress(inp: &[u8], out: &mut [u8]) -> Result<usize, isize> {
    let src_len = inp.len() as _;
    let dst_cap = out.len() as _;
    // SAFETY: pointers and lengths come from valid slices; the C side writes
    // at most `dst_cap` bytes because we pass it as the output limit.
    let rc = unsafe { LZ4_decompress_safe(inp.as_ptr(), out.as_mut_ptr(), src_len, dst_cap) };
    if rc < 0 {
        Err(rc as _)
    } else {
        Ok(rc as _)
    }
}

View File

@@ -20,6 +20,7 @@ pub fn make_test_node(id: u32) -> Node {
}),
archiver_appliance: None,
channel_archiver: None,
access_scylla: false,
}
}

View File

@@ -269,8 +269,12 @@ where
}
let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
let val = self.evs.convert(decomp, be)?;
let k =
<<EVS as EventValueFromBytes<NTY, END>>::Batch as EventAppendable>::append_event(ret, ev.tss[i1], val);
let k = <<EVS as EventValueFromBytes<NTY, END>>::Batch as EventAppendable>::append_event(
ret,
ev.tss[i1],
ev.pulses[i1],
val,
);
ret = Some(k);
}
Ok(ret)

View File

@@ -131,6 +131,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
}),
archiver_appliance: None,
channel_archiver: None,
access_scylla: false,
};
ensemble.nodes.push(node);
}

View File

@@ -160,7 +160,12 @@ impl fmt::Debug for Error {
} else {
String::new()
};
write!(fmt, "{}", self.msg)?;
write!(fmt, "msg: {}", self.msg)?;
if let Some(msgs) = self.public_msg() {
for msg in msgs {
write!(fmt, "\npublic: {}", msg)?;
}
}
if !trace_str.is_empty() {
write!(fmt, "\nTrace:\n{}", trace_str)?;
}

View File

@@ -1,13 +1,17 @@
use std::collections::BTreeMap;
use crate::err::Error;
use crate::{response, ToPublicResponse};
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::log::*;
use netpod::{ChannelConfigQuery, FromUrl, ScalarType, Shape};
use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, FromUrl, ScalarType, ScyllaConfig, Shape};
use netpod::{ChannelConfigResponse, NodeConfigCached};
use netpod::{ACCEPT_ALL, APP_JSON};
use scylla::batch::Consistency;
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use serde::{Deserialize, Serialize};
use url::Url;
pub struct ChannelConfigHandler {}
@@ -76,33 +80,64 @@ impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
}
}
/// Look up the channel configuration (scalar type and shape) for `chq` in the
/// Scylla `series_by_channel` table.
///
/// Returns a public "not found" error when no series row exists for the
/// channel, or an internal error when a row can not be decoded into the
/// expected column types.
async fn config_from_scylla(
    chq: ChannelConfigQuery,
    scyco: &ScyllaConfig,
    _node_config: &NodeConfigCached,
) -> Result<ChannelConfigResponse, Error> {
    // Find the "series" id.
    let scy = scylla::SessionBuilder::new()
        .known_nodes(&scyco.hosts)
        .use_keyspace(&scyco.keyspace, true)
        .default_consistency(Consistency::One)
        .build()
        .await
        .err_conv()?;
    let cql = "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
    let res = scy
        .query(cql, (&chq.channel.backend, chq.channel.name()))
        .await
        .err_conv()?;
    let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
    if rows.is_empty() {
        return Err(Error::with_public_msg_no_trace(format!(
            "can not find series for channel {}",
            chq.channel.name()
        )));
    }
    // Fail early if any row can not be decoded; log the ones that can.
    for r in &rows {
        if let Err(e) = r {
            return Err(Error::with_msg_no_trace(format!("error {e:?}")));
        }
        info!("got row {r:?}");
    }
    // All rows are Ok at this point; use the first match.
    let row = rows[0].as_ref().unwrap();
    let scalar_type = ScalarType::from_scylla_i32(row.1)?;
    let shape = Shape::from_scylla_shape_dims(&row.2)?;
    let res = ChannelConfigResponse {
        channel: chq.channel,
        scalar_type,
        byte_order: None,
        shape,
    };
    info!("MADE: {res:?}");
    Ok(res)
}
pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("channel_config");
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
//let pairs = get_url_query_pairs(&url);
let q = ChannelConfigQuery::from_url(&url)?;
info!("channel_config for q {q:?}");
let conf = if q.channel.backend == "scylla" {
// Find the "series" id.
let scy = scylla::SessionBuilder::new()
.known_node("sf-daqbuf-34:8340")
.build()
.await
.err_conv()?;
let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?";
let res = scy.query(cql, ()).await.err_conv()?;
let rows = res.rows_typed_or_empty::<(i32, i32)>();
for r in rows {
let r = r.err_conv()?;
info!("got row {r:?}");
}
let res = ChannelConfigResponse {
channel: q.channel,
scalar_type: ScalarType::F32,
byte_order: None,
shape: Shape::Scalar,
};
res
let scyco = node_config
.node_config
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
config_from_scylla(q, scyco, node_config).await?
} else if let Some(conf) = &node_config.node.channel_archiver {
archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database)
.await?
@@ -116,3 +151,189 @@ pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached)
.body(Body::from(serde_json::to_string(&conf)?))?;
Ok(ret)
}
#[derive(Clone, Debug, Serialize, Deserialize)]
// Histogram of channel configurations found in Scylla: for each scalar type,
// the list of (shape, occurrence count) pairs. Serialized as the JSON
// response of the /api/4/scylla/configs/histo endpoint.
pub struct ConfigsHisto {
scalar_types: Vec<(ScalarType, Vec<(Shape, u32)>)>,
}
pub struct ScyllaConfigsHisto {}
impl ScyllaConfigsHisto {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/configs/histo" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let res = self.make_histo(node_config).await?;
let body = Body::from(serde_json::to_vec(&res)?);
Ok(response(StatusCode::OK).body(body)?)
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
async fn make_histo(&self, node_config: &NodeConfigCached) -> Result<ConfigsHisto, Error> {
let scyco = node_config
.node_config
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.default_consistency(Consistency::One)
.build()
.await
.err_conv()?;
let facility = "scylla";
let res = scy
.query(
"select scalar_type, shape_dims, series from series_by_channel where facility = ? allow filtering",
(facility,),
)
.await
.err_conv()?;
let mut stm = BTreeMap::new();
for row in res.rows_typed_or_empty::<(i32, Vec<i32>, i64)>() {
let (st, dims, _) = row.err_conv()?;
let scalar_type = ScalarType::from_scylla_i32(st)?;
let shape = Shape::from_scylla_shape_dims(&dims)?;
if stm.get_mut(&scalar_type).is_none() {
stm.insert(scalar_type.clone(), BTreeMap::new());
}
let a = stm.get_mut(&scalar_type).unwrap();
if a.get_mut(&shape).is_none() {
a.insert(shape.clone(), 0);
}
*a.get_mut(&shape).unwrap() += 1;
}
let mut stm: Vec<_> = stm
.into_iter()
.map(|(st, m2)| {
let mut g: Vec<_> = m2.into_iter().map(|(sh, c)| (sh, c)).collect();
g.sort_by_key(|x| !x.1);
let n = g.len() as u32;
(st, g, n)
})
.collect();
stm.sort_unstable_by_key(|x| !x.2);
let stm = stm.into_iter().map(|(st, a, _)| (st, a)).collect();
let ret = ConfigsHisto { scalar_types: stm };
Ok(ret)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
// Query parameters for the channels-with-type endpoint: select channels whose
// stored configuration matches both the scalar type and the shape.
pub struct ChannelsWithTypeQuery {
// Parsed from the `scalar_type` URL query parameter via serde.
scalar_type: ScalarType,
// Parsed from the `shape` URL query parameter via `Shape::from_dims_str`.
shape: Shape,
}
impl FromUrl for ChannelsWithTypeQuery {
    /// Extract `scalar_type` and `shape` from the URL query pairs.
    /// Both parameters are required; a missing one yields a public error.
    fn from_url(url: &Url) -> Result<Self, err::Error> {
        let pairs = get_url_query_pairs(url);
        let st_raw = pairs
            .get("scalar_type")
            .ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?;
        // Deserialize the enum through serde by wrapping the raw value in
        // JSON string quotes.
        let scalar_type: ScalarType = serde_json::from_str(&format!("\"{st_raw}\""))?;
        let sh_raw = pairs
            .get("shape")
            .ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?;
        let shape = Shape::from_dims_str(sh_raw)?;
        Ok(Self { scalar_type, shape })
    }
}
#[derive(Clone, Debug, Serialize, Deserialize)]
// JSON response body for the channels-with-type endpoint: the list of
// channels whose stored configuration matched the queried type and shape.
pub struct ChannelListWithType {
channels: Vec<Channel>,
}
/// HTTP handler that lists all channels matching a given scalar type and
/// shape, backed by the Scylla `series_by_channel` table.
pub struct ScyllaChannelsWithType {}

impl ScyllaChannelsWithType {
    /// Route match for `/api/4/scylla/channels/with_type`.
    pub fn handler(req: &Request<Body>) -> Option<Self> {
        match req.uri().path() {
            "/api/4/scylla/channels/with_type" => Some(Self {}),
            _ => None,
        }
    }

    /// Serve the channel list as JSON. Rejects non-GET methods and
    /// non-JSON `Accept` headers, mirroring the other scylla handlers.
    pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
        if req.method() != Method::GET {
            return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
        }
        let accept_def = APP_JSON;
        let accept = req
            .headers()
            .get(http::header::ACCEPT)
            .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
        if accept != APP_JSON && accept != ACCEPT_ALL {
            return Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?);
        }
        let url = Url::parse(&format!("dummy:{}", req.uri()))?;
        let q = ChannelsWithTypeQuery::from_url(&url)?;
        let res = self.get_channels(&q, node_config).await?;
        let body = Body::from(serde_json::to_vec(&res)?);
        Ok(response(StatusCode::OK).body(body)?)
    }

    /// Query `series_by_channel` for all channels with the requested scalar
    /// type and shape dims.
    async fn get_channels(
        &self,
        q: &ChannelsWithTypeQuery,
        node_config: &NodeConfigCached,
    ) -> Result<ChannelListWithType, Error> {
        let scyco = node_config
            .node_config
            .cluster
            .scylla
            .as_ref()
            .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
        let scy = scylla::SessionBuilder::new()
            .known_nodes(&scyco.hosts)
            .use_keyspace(&scyco.keyspace, true)
            .default_consistency(Consistency::One)
            .build()
            .await
            .err_conv()?;
        let facility = "scylla";
        let res = scy
            .query(
                "select channel_name, series from series_by_channel where facility = ? and scalar_type = ? and shape_dims = ? allow filtering",
                (facility, q.scalar_type.to_scylla_i32(), q.shape.to_scylla_vec()),
            )
            .await
            .err_conv()?;
        let mut channels = Vec::new();
        for row in res.rows_typed_or_empty::<(String, i64)>() {
            let (channel_name, _series) = row.err_conv()?;
            channels.push(Channel {
                backend: facility.into(),
                name: channel_name,
            });
        }
        Ok(ChannelListWithType { channels })
    }
}

View File

@@ -71,7 +71,6 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
info!("httpret plain_events_json req: {:?}", req);
let (head, _body) = req.into_parts();
let query = PlainEventsJsonQuery::from_request_head(&head)?;
let op = disk::channelexec::PlainEventsJson::new(
query.channel().clone(),
query.range().clone(),

View File

@@ -224,6 +224,10 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
}
} else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaConfigsHisto::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandler::handler(&req) {
h.handle(req, &node_config).await
} else if path == "/api/4/binned" {

View File

@@ -381,7 +381,7 @@ where
Self: Sized,
{
type Value;
fn append_event(ret: Option<Self>, ts: u64, value: Self::Value) -> Self;
fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self;
}
pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
@@ -481,6 +481,12 @@ pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec<u64>, Vec<u64>) {
(ts_anchor_sec, ts_off_ms, ts_off_ns)
}
/// Split absolute pulse ids into an anchor (the first pulse, or 0 for an
/// empty slice) and the offset of each pulse relative to that anchor.
pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, Vec<u64>) {
    let pulse_anchor = match pulse.first() {
        Some(k) => *k,
        None => 0,
    };
    let pulse_off = pulse.iter().map(|k| *k - pulse_anchor).collect();
    (pulse_anchor, pulse_off)
}
pub trait TimeBinnableTypeAggregator: Send {
type Input: TimeBinnableType;
type Output: TimeBinnableType;

View File

@@ -2,9 +2,9 @@ use crate::numops::NumOps;
use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult};
use crate::waveevents::WaveEvents;
use crate::{
ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, RangeOverlapInfo, ReadPbv,
ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, TimeBins,
WithLen,
pulse_offs_from_abs, ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime,
RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType,
TimeBinnableTypeAggregator, TimeBins, WithLen,
};
use chrono::{TimeZone, Utc};
use err::Error;
@@ -451,6 +451,10 @@ pub struct WaveEventsCollectedResult<NTY> {
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
#[serde(rename = "pulseAnchor")]
pulse_anchor: u64,
#[serde(rename = "pulseOff")]
pulse_off: Vec<u64>,
values: Vec<Vec<NTY>>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
range_complete: bool,
@@ -502,10 +506,13 @@ where
fn result(self) -> Result<Self::Output, Error> {
let tst = ts_offs_from_abs(&self.vals.tss);
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses);
let ret = Self::Output {
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
pulse_anchor,
pulse_off,
values: self.vals.vals,
range_complete: self.range_complete,
timed_out: self.timed_out,

View File

@@ -2,8 +2,8 @@ use crate::minmaxavgbins::MinMaxAvgBins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside,
PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType,
pulse_offs_from_abs, ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside,
Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType,
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
@@ -15,13 +15,40 @@ use tokio::fs::File;
// TODO in this module reduce clones.
// TODO add pulse.
#[derive(Serialize, Deserialize)]
pub struct ScalarEvents<NTY> {
pub tss: Vec<u64>,
pub pulses: Vec<u64>,
pub values: Vec<NTY>,
}
impl<NTY> ScalarEvents<NTY> {
// Append one event; keeps the three column vectors (tss, pulses, values)
// in step.
#[inline(always)]
pub fn push(&mut self, ts: u64, pulse: u64, value: NTY) {
self.tss.push(ts);
self.pulses.push(pulse);
self.values.push(value);
}
// TODO should avoid the copies.
// Append all events from `src` by copying each column slice.
#[inline(always)]
pub fn extend_from_slice(&mut self, src: &Self)
where
NTY: Clone,
{
self.tss.extend_from_slice(&src.tss);
self.pulses.extend_from_slice(&src.pulses);
self.values.extend_from_slice(&src.values);
}
// Clear all three columns in place (capacity retained by Vec::clear).
// NOTE(review): presumably named `clearx` to avoid clashing with the
// `Clearable::clear` trait method, which delegates here — confirm.
#[inline(always)]
pub fn clearx(&mut self) {
self.tss.clear();
self.pulses.clear();
self.values.clear();
}
}
impl<NTY> SitemtyFrameType for ScalarEvents<NTY>
where
NTY: NumOps,
@@ -33,6 +60,7 @@ impl<NTY> ScalarEvents<NTY> {
pub fn empty() -> Self {
Self {
tss: vec![],
pulses: vec![],
values: vec![],
}
}
@@ -148,8 +176,7 @@ where
NTY: NumOps,
{
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.values.push(src.values[ix].clone());
self.push(src.tss[ix], src.pulses[ix], src.values[ix].clone());
}
}
@@ -162,15 +189,13 @@ where
}
fn append(&mut self, src: &Self) {
self.tss.extend_from_slice(&src.tss);
self.values.extend_from_slice(&src.values);
self.extend_from_slice(src);
}
}
impl<NTY> Clearable for ScalarEvents<NTY> {
fn clear(&mut self) {
self.tss.clear();
self.values.clear();
ScalarEvents::<NTY>::clearx(self);
}
}
@@ -234,6 +259,10 @@ pub struct EventValuesCollectorOutput<NTY> {
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
#[serde(rename = "pulseAnchor")]
pulse_anchor: u64,
#[serde(rename = "pulseOff")]
pulse_off: Vec<u64>,
values: Vec<NTY>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
range_complete: bool,
@@ -262,10 +291,13 @@ where
fn result(self) -> Result<Self::Output, Error> {
let tst = ts_offs_from_abs(&self.vals.tss);
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses);
let ret = Self::Output {
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
pulse_anchor,
pulse_off,
values: self.vals.values,
range_complete: self.range_complete,
timed_out: self.timed_out,
@@ -501,10 +533,9 @@ where
{
type Value = NTY;
fn append_event(ret: Option<Self>, ts: u64, value: Self::Value) -> Self {
fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self {
let mut ret = if let Some(ret) = ret { ret } else { Self::empty() };
ret.tss.push(ts);
ret.values.push(value);
ret.push(ts, pulse, value);
ret
}
}

View File

@@ -392,13 +392,11 @@ impl TimeBinnableTypeAggregator for StatsEventsAggregator {
impl EventAppendable for StatsEvents {
type Value = f32;
fn append_event(ret: Option<Self>, ts: u64, _value: Self::Value) -> Self {
let mut ret = if let Some(ret) = ret { ret } else { Self::empty() };
ret.tss.push(ts);
fn append_event(ret: Option<Self>, _ts: u64, _pulse: u64, _value: Self::Value) -> Self {
let ret = if let Some(ret) = ret { ret } else { Self::empty() };
// TODO
error!("TODO statsevents append_event");
err::todo();
ret.pulses.push(42);
ret
}
}

View File

@@ -17,9 +17,18 @@ use tokio::fs::File;
#[derive(Debug, Serialize, Deserialize)]
pub struct WaveEvents<NTY> {
pub tss: Vec<u64>,
pub pulses: Vec<u64>,
pub vals: Vec<Vec<NTY>>,
}
impl<NTY> WaveEvents<NTY> {
pub fn push(&mut self, ts: u64, pulse: u64, value: Vec<NTY>) {
self.tss.push(ts);
self.pulses.push(pulse);
self.vals.push(value);
}
}
impl<NTY> WaveEvents<NTY> {
pub fn shape(&self) -> Result<Shape, Error> {
if let Some(k) = self.vals.first() {
@@ -42,6 +51,7 @@ impl<NTY> WaveEvents<NTY> {
pub fn empty() -> Self {
Self {
tss: vec![],
pulses: vec![],
vals: vec![],
}
}
@@ -319,10 +329,9 @@ where
{
type Value = Vec<NTY>;
fn append_event(ret: Option<Self>, ts: u64, value: Self::Value) -> Self {
fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self {
let mut ret = if let Some(ret) = ret { ret } else { Self::empty() };
ret.tss.push(ts);
ret.vals.push(value);
ret.push(ts, pulse, value);
ret
}
}
@@ -455,6 +464,7 @@ pub struct WavePlainProc<NTY> {
_m1: PhantomData<NTY>,
}
// TODO purpose?
impl<NTY> EventsNodeProcessor for WavePlainProc<NTY>
where
NTY: NumOps,
@@ -472,11 +482,13 @@ where
let n = if n > 5 { 5 } else { n };
WaveEvents {
tss: inp.tss,
pulses: inp.pulses,
vals: inp.vals.iter().map(|k| k[..n].to_vec()).collect(),
}
} else {
WaveEvents {
tss: inp.tss,
pulses: inp.pulses,
vals: inp.vals,
}
}

View File

@@ -41,7 +41,7 @@ pub struct BodyStream {
pub inner: Box<dyn futures_core::Stream<Item = Result<bytes::Bytes, Error>> + Send + Unpin>,
}
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum ScalarType {
U8,
U16,
@@ -96,8 +96,8 @@ impl ScalarType {
I16 => "int16",
I32 => "int32",
I64 => "int64",
F32 => "float",
F64 => "double",
F32 => "float32",
F64 => "float64",
BOOL => "bool",
STRING => "string",
}
@@ -119,6 +119,7 @@ impl ScalarType {
"float32" => F32,
"float64" => F64,
"string" => STRING,
"bool" => BOOL,
_ => {
return Err(Error::with_msg_no_trace(format!(
"from_bsread_str can not understand bsread {}",
@@ -148,6 +149,13 @@ impl ScalarType {
Ok(ret)
}
pub fn from_scylla_i32(k: i32) -> Result<Self, Error> {
if k < 0 || k > u8::MAX as i32 {
return Err(Error::with_public_msg_no_trace(format!("bad scalar type index {k}")));
}
Self::from_dtype_index(k as u8)
}
pub fn bytes(&self) -> u8 {
use ScalarType::*;
match self {
@@ -200,6 +208,10 @@ impl ScalarType {
ScalarType::STRING => "string",
}
}
// Encode this scalar type for storage in Scylla: the numeric type index
// widened to i32. Counterpart of `from_scylla_i32`.
pub fn to_scylla_i32(&self) -> i32 {
self.index() as i32
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -232,6 +244,8 @@ pub struct Node {
pub sf_databuffer: Option<SfDatabuffer>,
pub archiver_appliance: Option<ArchiverAppliance>,
pub channel_archiver: Option<ChannelArchiver>,
#[serde(default)]
pub access_scylla: bool,
}
struct Visit1 {}
@@ -302,6 +316,7 @@ impl Node {
}),
archiver_appliance: None,
channel_archiver: None,
access_scylla: false,
}
}
}
@@ -314,6 +329,12 @@ pub struct Database {
pub pass: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
// Connection settings for the Scylla cluster holding event data.
pub struct ScyllaConfig {
// Contact points passed to `SessionBuilder::known_nodes`.
pub hosts: Vec<String>,
// Keyspace selected via `use_keyspace` on session creation.
pub keyspace: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Cluster {
pub backend: String,
@@ -325,6 +346,7 @@ pub struct Cluster {
pub is_central_storage: bool,
#[serde(rename = "fileIoBufferSize", default)]
pub file_io_buffer_size: FileIoBufferSize,
pub scylla: Option<ScyllaConfig>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -563,7 +585,7 @@ pub struct ChannelConfig {
pub byte_order: ByteOrder,
}
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum Shape {
Scalar,
Wave(u32),
@@ -601,6 +623,7 @@ impl Shape {
}
}
// TODO use simply a list to represent all shapes: empty, or with 1 or 2 entries.
pub fn from_db_jsval(v: &JsVal) -> Result<Shape, Error> {
match v {
JsVal::String(s) => {
@@ -628,6 +651,41 @@ impl Shape {
))),
}
}
pub fn from_dims_str(s: &str) -> Result<Self, Error> {
let a: Vec<u32> = serde_json::from_str(s)?;
if a.len() == 0 {
Ok(Shape::Scalar)
} else if a.len() == 1 {
Ok(Shape::Wave(a[0]))
} else if a.len() == 2 {
Ok(Shape::Image(a[0], a[1]))
} else {
Err(Error::with_public_msg_no_trace("only scalar, 1d and 2d supported"))
}
}
pub fn from_scylla_shape_dims(v: &[i32]) -> Result<Self, Error> {
let res = if v.len() == 0 {
Shape::Scalar
} else if v.len() == 1 {
Shape::Wave(v[0] as u32)
} else if v.len() == 2 {
Shape::Image(v[0] as u32, v[1] as u32)
} else {
return Err(Error::with_public_msg_no_trace(format!("bad shape_dims {v:?}")));
};
Ok(res)
}
pub fn to_scylla_vec(&self) -> Vec<i32> {
use Shape::*;
match self {
Scalar => vec![],
Wave(n) => vec![*n as i32],
Image(n, m) => vec![*n as i32, *m as i32],
}
}
}
pub trait HasShape {
@@ -1636,6 +1694,7 @@ pub fn test_cluster() -> Cluster {
}),
archiver_appliance: None,
channel_archiver: None,
access_scylla: false,
})
.collect();
Cluster {
@@ -1647,6 +1706,7 @@ pub fn test_cluster() -> Cluster {
user: "testingdaq".into(),
pass: "testingdaq".into(),
},
scylla: None,
run_map_pulse_task: false,
is_central_storage: false,
file_io_buffer_size: Default::default(),
@@ -1667,6 +1727,7 @@ pub fn sls_test_cluster() -> Cluster {
channel_archiver: Some(ChannelArchiver {
data_base_paths: vec![test_data_base_path_channel_archiver_sls()],
}),
access_scylla: false,
})
.collect();
Cluster {
@@ -1678,6 +1739,7 @@ pub fn sls_test_cluster() -> Cluster {
user: "testingdaq".into(),
pass: "testingdaq".into(),
},
scylla: None,
run_map_pulse_task: false,
is_central_storage: false,
file_io_buffer_size: Default::default(),
@@ -1698,6 +1760,7 @@ pub fn archapp_test_cluster() -> Cluster {
archiver_appliance: Some(ArchiverAppliance {
data_base_paths: vec![test_data_base_path_archiver_appliance()],
}),
access_scylla: false,
})
.collect();
Cluster {
@@ -1709,6 +1772,7 @@ pub fn archapp_test_cluster() -> Cluster {
user: "testingdaq".into(),
pass: "testingdaq".into(),
},
scylla: None,
run_map_pulse_task: false,
is_central_storage: false,
file_io_buffer_size: Default::default(),

View File

@@ -4,6 +4,9 @@ version = "0.0.1-a.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[lib]
path = "src/nodenet.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@@ -1,4 +1,4 @@
// TODO move these frame-related things out of crate disk. Probably better into `nodenet`
use crate::scylla::make_scylla_stream;
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_core::Stream;
@@ -6,11 +6,10 @@ use futures_util::StreamExt;
use items::frame::{decode_frame, make_term_frame};
use items::{Framable, StreamItem};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{log::*, AggKind};
use netpod::AggKind;
use netpod::{EventQueryJsonStringFrame, NodeConfigCached, PerfOpts};
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
@@ -18,37 +17,6 @@ use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream;
use tracing::Instrument;
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> {
let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
let lis = tokio::net::TcpListener::bind(addr).await?;
@@ -150,59 +118,38 @@ async fn events_conn_handler_inner_try(
};
debug!("--- got query evq {:?}", evq);
// TODO make scylla usage configurable in config
if evq.channel.backend == "scylla" {
// Find the "series" id.
let scy = scylla::SessionBuilder::new()
.known_node("sf-daqbuf-34:8340")
.build()
.await
.err_conv();
let scy = match scy {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?";
let res = scy.query(cql, ()).await.err_conv();
let res = match res {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
let rows = res.rows_typed_or_empty::<(i32, i32)>();
for r in rows {
let r = match r.err_conv() {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
info!("got row {r:?}");
}
error!("TODO scylla fetch continue here");
err::todo();
}
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
if let Some(aa) = &node_config.node.channel_archiver {
match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
}
} else if let Some(aa) = &node_config.node.archiver_appliance {
match archapp_wrap::make_event_pipe(&evq, aa).await {
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> = if evq.channel.backend == "scylla" {
if node_config.node.access_scylla {
let scyco = node_config.node_config.cluster.scylla.as_ref().unwrap();
match make_scylla_stream(&evq, scyco).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
}
} else {
match evq.agg_kind {
AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
},
_ => match disk::raw::conn::make_event_pipe(&evq, node_config).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
},
}
};
Box::pin(futures_util::stream::empty())
}
} else if let Some(aa) = &node_config.node.channel_archiver {
match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
}
} else if let Some(aa) = &node_config.node.archiver_appliance {
match archapp_wrap::make_event_pipe(&evq, aa).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
}
} else {
match evq.agg_kind {
AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
},
_ => match disk::raw::conn::make_event_pipe(&evq, node_config).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
},
}
};
let mut buf_len_histo = HistoLog2::new(5);
while let Some(item) = p1.next().await {
let item = item.make_frame();

View File

@@ -1 +0,0 @@
pub mod conn;

2
nodenet/src/nodenet.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod conn;
pub mod scylla;

440
nodenet/src/scylla.rs Normal file
View File

@@ -0,0 +1,440 @@
use err::Error;
use futures_core::{Future, Stream};
use futures_util::FutureExt;
use items::scalarevents::ScalarEvents;
use items::waveevents::WaveEvents;
use items::{Framable, RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{NanoRange, ScalarType, ScyllaConfig};
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use scylla::Session as ScySession;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
/// Build a boxed future that runs one of the `read_next_values_*` readers
/// for a single `ts_msp` partition and wraps its output as a framable
/// stream item.
macro_rules! impl_read_values_fut {
    ($fname:ident, $self:expr, $ts_msp:expr) => {{
        let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone()).map(|res| {
            // Successful reads become Data items; errors pass through unchanged.
            let item = res.map(|evs| StreamItem::DataItem(RangeCompletableItem::Data(evs)));
            Box::new(item) as Box<dyn Framable + 'static>
        });
        Box::pin(fut) as Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>
    }};
}
/// State for reading event values of one series, partition by partition.
///
/// `ts_msp` holds the remaining "most significant part" timestamp partition
/// keys; `fut` is the in-flight read for the current partition.
struct ReadValues {
    // Series id resolved via `find_series`.
    series: i64,
    // Scalar type of the stored values; selects the concrete reader function.
    scalar_type: ScalarType,
    // Requested time range (nanoseconds).
    range: NanoRange,
    // Remaining ts_msp partition keys, consumed front to back.
    ts_msp: VecDeque<u64>,
    // Future producing the framable item for the current partition.
    fut: Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>,
    scy: Arc<ScySession>,
}
impl ReadValues {
    /// Create a new reader.
    ///
    /// The initial `fut` is a placeholder that panics when polled: callers
    /// must invoke `next()` before polling (ScyllaFramableStream does this
    /// right after construction and only enters ReadValues on success).
    fn new(
        series: i64,
        scalar_type: ScalarType,
        range: NanoRange,
        ts_msp: VecDeque<u64>,
        scy: Arc<ScySession>,
    ) -> Self {
        Self {
            series,
            scalar_type,
            range,
            ts_msp,
            // Placeholder; replaced by `next()` before any poll.
            fut: Box::pin(futures_util::future::lazy(|_| panic!())),
            scy,
        }
    }

    /// Advance to the next ts_msp partition, installing a fresh read future.
    /// Returns false when all partitions are consumed.
    fn next(&mut self) -> bool {
        if let Some(ts_msp) = self.ts_msp.pop_front() {
            self.fut = self.make_fut(ts_msp);
            true
        } else {
            false
        }
    }

    /// Select the reader future matching the stored scalar type.
    fn make_fut(&mut self, ts_msp: u64) -> Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>> {
        // TODO this also needs to differentiate on Shape.
        let fut = match &self.scalar_type {
            ScalarType::F32 => {
                impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp)
            }
            ScalarType::F64 => {
                impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp)
            }
            // Remaining scalar types are not implemented yet.
            _ => err::todoval(),
        };
        fut
    }
}
/// States of the `ScyllaFramableStream` state machine.
/// Progression: New -> FindSeries -> FindMsp -> ReadValues -> Done.
enum FrState {
    // Not started; the first poll kicks off the series lookup.
    New,
    // Resolving (series id, scalar type) for the channel.
    FindSeries(Pin<Box<dyn Future<Output = Result<(i64, ScalarType), Error>> + Send>>),
    // Fetching the ts_msp partition keys for the series in the range.
    FindMsp(Pin<Box<dyn Future<Output = Result<Vec<u64>, Error>> + Send>>),
    // Emitting event data, one partition per stream item.
    ReadValues(ReadValues),
    // Finished, either exhausted or terminated by an error.
    Done,
}
/// Stream of framable event items read from scylla for one channel.
pub struct ScyllaFramableStream {
    state: FrState,
    // Backend name of the query channel; used as the `facility` key
    // in the series_by_channel lookup.
    facility: String,
    channel_name: String,
    range: NanoRange,
    // Filled in once the FindSeries state resolves.
    scalar_type: Option<ScalarType>,
    // Series id; 0 until the FindSeries state resolves.
    series: i64,
    scy: Arc<ScySession>,
}
impl ScyllaFramableStream {
    /// Construct a stream for the given query. No I/O happens here; the
    /// session lookup starts on the first poll.
    pub fn new(evq: &RawEventsQuery, scy: Arc<ScySession>) -> Self {
        let facility = evq.channel.backend.clone();
        let channel_name: String = evq.channel.name().into();
        let range = evq.range.clone();
        Self {
            state: FrState::New,
            facility,
            channel_name,
            range,
            // Resolved later by the FindSeries state.
            scalar_type: None,
            series: 0,
            scy,
        }
    }
}
impl Stream for ScyllaFramableStream {
    type Item = Box<dyn Framable>;

    /// Drive the state machine New -> FindSeries -> FindMsp -> ReadValues -> Done.
    ///
    /// Each ReadValues partition read yields exactly one stream item. A
    /// failure in any lookup emits a single Err item and then terminates
    /// the stream (Done yields None).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        // `loop` + `break match`: states that only transition `continue`;
        // states that produce a value (or are Pending) break out.
        loop {
            break match self.state {
                FrState::New => {
                    let fut = find_series(self.facility.clone(), self.channel_name.clone(), self.scy.clone());
                    let fut = Box::pin(fut);
                    self.state = FrState::FindSeries(fut);
                    continue;
                }
                FrState::FindSeries(ref mut fut) => match fut.poll_unpin(cx) {
                    Ready(Ok((series, scalar_type))) => {
                        info!("ScyllaFramableStream found series {}", series);
                        self.series = series;
                        self.scalar_type = Some(scalar_type);
                        let fut = find_ts_msp(series, self.range.clone(), self.scy.clone());
                        let fut = Box::pin(fut);
                        self.state = FrState::FindMsp(fut);
                        continue;
                    }
                    Ready(Err(e)) => {
                        self.state = FrState::Done;
                        // The item type parameter is irrelevant for an Err
                        // frame; f32 is used as an arbitrary placeholder.
                        Ready(Some(Box::new(
                            Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
                        )))
                    }
                    Pending => Pending,
                },
                FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
                    Ready(Ok(ts_msp)) => {
                        info!("found ts_msp {ts_msp:?}");
                        // TODO get rid of into() for VecDeque
                        let mut st = ReadValues::new(
                            self.series,
                            self.scalar_type.as_ref().unwrap().clone(),
                            self.range.clone(),
                            ts_msp.into(),
                            self.scy.clone(),
                        );
                        // `next()` installs the first partition read; if there
                        // are no partitions at all, the stream is already done.
                        if st.next() {
                            self.state = FrState::ReadValues(st);
                        } else {
                            self.state = FrState::Done;
                        }
                        continue;
                    }
                    Ready(Err(e)) => {
                        self.state = FrState::Done;
                        Ready(Some(Box::new(
                            Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
                        )))
                    }
                    Pending => Pending,
                },
                FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
                    Ready(item) => {
                        // Queue the next partition before yielding the current
                        // item; when exhausted, the next poll returns None.
                        if st.next() {
                        } else {
                            info!("ReadValues exhausted");
                            self.state = FrState::Done;
                        }
                        Ready(Some(item))
                    }
                    Pending => Pending,
                },
                FrState::Done => Ready(None),
            };
        }
    }
}
async fn find_series(facility: String, channel_name: String, scy: Arc<ScySession>) -> Result<(i64, ScalarType), Error> {
info!("find_series");
let res = {
let cql =
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
scy.query(cql, (&facility, &channel_name)).await.err_conv()?
};
let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
if rows.len() > 1 {
error!("Multiple series found for channel, can not return data for ambiguous series");
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
if rows.len() < 1 {
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
let row = rows
.into_iter()
.next()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
.err_conv()?;
info!("make_scylla_stream row {row:?}");
let series = row.0;
let scalar_type = ScalarType::from_scylla_i32(row.1)?;
info!("make_scylla_stream series {series}");
Ok((series, scalar_type))
}
async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc<ScySession>) -> Result<Vec<u64>, Error> {
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let res = scy
.query(cql, (series, range.beg as i64, range.end as i64))
.await
.err_conv()?;
let mut ret = vec![];
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
ret.push(row.0 as u64);
}
info!("found in total {} rows", ret.len());
Ok(ret)
}
/// Generate an async reader for one ts_msp partition of a scalar events
/// table: `$fname` is the generated function name, `$st` the Rust value
/// type, `$scyty` the type the scylla driver decodes the value column as,
/// and `$table_name` the events table to read from.
macro_rules! read_next_scalar_values {
    ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
        async fn $fname(
            series: i64,
            ts_msp: u64,
            range: NanoRange,
            scy: Arc<ScySession>,
        ) -> Result<ScalarEvents<$st>, Error> {
            type ST = $st;
            type SCYTY = $scyty;
            info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
            // TODO add the constraint on range!
            warn!("remove the limit clause, add range check");
            // TODO use typed timestamp..
            // ts_lsp is stored relative to ts_msp, so the upper bound for
            // this partition is range.end - ts_msp.
            let ts_lsp_max = if range.end < ts_msp {
                // We should not be here anyway.
                warn!("range.end < ts_msp");
                0
            } else {
                range.end - ts_msp
            };
            let cql = concat!(
                "select ts_lsp, pulse, value from ",
                $table_name,
                " where series = ? and ts_msp = ? and ts_lsp < ?"
            );
            let res = scy
                .query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
                .await
                .err_conv()?;
            let mut ret = ScalarEvents::<ST>::empty();
            for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
                let row = row.err_conv()?;
                // Reassemble the absolute timestamp from the partition key
                // and the stored least-significant part.
                let ts = ts_msp + row.0 as u64;
                let pulse = row.1 as u64;
                let value = row.2 as ST;
                ret.push(ts, pulse, value);
            }
            info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
            Ok(ret)
        }
    };
}
/// Generate an async reader for one ts_msp partition of a 1d (waveform)
/// events table. Analogous to `read_next_scalar_values!`, but each row's
/// value column holds an array of elements.
macro_rules! read_next_1d_values {
    ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
        async fn $fname(
            series: i64,
            ts_msp: u64,
            range: NanoRange,
            scy: Arc<ScySession>,
        ) -> Result<WaveEvents<$st>, Error> {
            type ST = $st;
            type SCYTY = $scyty;
            info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
            // TODO add the constraint on range!
            warn!("remove the limit clause, add range check");
            // TODO use typed timestamp..
            // ts_lsp is stored relative to ts_msp, so the upper bound for
            // this partition is range.end - ts_msp.
            let ts_lsp_max = if range.end < ts_msp {
                // We should not be here anyway.
                warn!("range.end < ts_msp");
                0
            } else {
                range.end - ts_msp
            };
            let cql = concat!(
                "select ts_lsp, pulse, value from ",
                $table_name,
                " where series = ? and ts_msp = ? and ts_lsp < ?"
            );
            let res = scy
                .query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
                .await
                .err_conv()?;
            // BUGFIX: this previously built a ScalarEvents while declaring a
            // WaveEvents return type, and decoded the value column as a
            // single scalar. Wave tables store one array per event.
            let mut ret = WaveEvents::<ST>::empty();
            for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
                let row = row.err_conv()?;
                let ts = ts_msp + row.0 as u64;
                let pulse = row.1 as u64;
                // NOTE(review): assumes WaveEvents::push takes
                // (ts, pulse, Vec<ST>) analogous to ScalarEvents — confirm.
                let value: Vec<ST> = row.2.into_iter().map(|x| x as ST).collect();
                ret.push(ts, pulse, value);
            }
            info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
            Ok(ret)
        }
    };
}
// Instantiate the concrete per-type readers used by ReadValues::make_fut.
read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
// NOTE(review): not referenced by make_fut yet; wave shapes are still todoval there.
read_next_1d_values!(read_next_values_1d_u16, u16, u16, "events_wave_u16");
/// Open a scylla session and return the framable event stream for `evq`.
pub async fn make_scylla_stream(
    evq: &RawEventsQuery,
    scyco: &ScyllaConfig,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
    info!("make_scylla_stream open scylla connection");
    // A fresh session per request for now.
    let session = scylla::SessionBuilder::new()
        .known_nodes(&scyco.hosts)
        .use_keyspace(&scyco.keyspace, true)
        .build()
        .await
        .err_conv()?;
    let stream = ScyllaFramableStream::new(evq, Arc::new(session));
    Ok(Box::pin(stream) as _)
}
/// Earlier, incomplete variant of `make_scylla_stream`: resolves the series
/// and counts the matching ts_msp partitions, but does not deliver any
/// events — it always returns an empty stream.
///
/// NOTE(review): superseded by `make_scylla_stream` / `ScyllaFramableStream`;
/// consider removing once callers are migrated.
pub async fn make_scylla_stream_2(
    evq: &RawEventsQuery,
    scyco: &ScyllaConfig,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
    // Find the "series" id.
    info!("make_scylla_stream finding series id");
    let scy = scylla::SessionBuilder::new()
        .known_nodes(&scyco.hosts)
        .use_keyspace(&scyco.keyspace, true)
        .build()
        .await
        .err_conv()?;
    let res = {
        let cql =
            "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
        scy.query(cql, (&evq.channel.backend, evq.channel.name()))
            .await
            .err_conv()?
    };
    let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
    if rows.len() > 1 {
        error!("Multiple series found for channel, can not return data for ambiguous series");
        return Err(Error::with_public_msg_no_trace(
            "Multiple series found for channel, can not return data for ambiguous series",
        ));
    }
    if rows.len() < 1 {
        // BUGFIX: this branch previously reported the "multiple series"
        // message; the actual condition is that no series was found.
        return Err(Error::with_public_msg_no_trace(
            "No series found for channel",
        ));
    }
    let row = rows
        .into_iter()
        .next()
        .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
        .err_conv()?;
    info!("make_scylla_stream row {row:?}");
    let series = row.0;
    info!("make_scylla_stream series {series}");
    let _expand = evq.agg_kind.need_expand();
    let range = &evq.range;
    {
        let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
        let res = scy
            .query(cql, (series, range.beg as i64, range.end as i64))
            .await
            .err_conv()?;
        let mut rc = 0;
        for _row in res.rows_or_empty() {
            rc += 1;
        }
        info!("found in total {} rows", rc);
    }
    error!("TODO scylla fetch continue here");
    let res = Box::pin(futures_util::stream::empty());
    Ok(res)
}