Deliver enum channel as both numeric and stringified

This commit is contained in:
Dominik Werder
2024-09-03 16:33:40 +02:00
parent 4cc0f65a0b
commit 55b3bf4acd
21 changed files with 712 additions and 204 deletions

View File

@@ -5,3 +5,4 @@
- [Search Channels](search.md)
- [Binned Data](bins.md)
- [Event Data](events.md)
- [Map Pulse to Timestamp](pulsemap.md)

View File

@@ -29,7 +29,7 @@ issue another request with `begDate` as given by `continueAt`.
## Events as framed JSON stream
To download larger amounts of JSON data it recommended to use the `json-framed` content encoding.
To download larger amounts data as JSON it is recommended to use the `json-framed` content encoding.
Using this encoding, the server can send the requested events as a stream of json objects, where each
json object contains a batch of events.
This content encoding is triggered via the `Accept: application/json-framed` header in the request.
@@ -44,10 +44,10 @@ The returned body looks like:
where each `[JSON-frame]` looks like:
```
[length N of the following JSON object: uint32 little-endian]
[reserved: 12 bytes of zero-padding]
[number of bytes N of the following json-encoded data, as ASCII-encoded number]
[newline]
[JSON object: N bytes]
[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0]
[newline]
```
Note: "data" objects are currently identified by the presence of the `tss` key.

View File

@@ -0,0 +1,12 @@
[package]
name = "daqbuf-redis"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
redis = { version = "0.26.1", features = [] }

View File

@@ -0,0 +1 @@

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.3-aa.0"
version = "0.5.3-aa.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -81,29 +81,14 @@ pub fn internal_error() -> http::Response<StreamBody> {
}
pub fn error_response(msg: String, reqid: impl AsRef<str>) -> http::Response<StreamBody> {
let status = StatusCode::INTERNAL_SERVER_ERROR;
let js = serde_json::json!({
"message": msg.to_string(),
"requestid": reqid.as_ref(),
});
if let Ok(body) = serde_json::to_string_pretty(&js) {
match Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(body_string(body))
{
Ok(res) => res,
Err(e) => {
error!("can not generate http error response {e}");
internal_error()
}
}
} else {
internal_error()
}
error_status_response(StatusCode::INTERNAL_SERVER_ERROR, msg, reqid)
}
pub fn not_found_response(msg: String, reqid: impl AsRef<str>) -> http::Response<StreamBody> {
let status = StatusCode::NOT_FOUND;
error_status_response(StatusCode::NOT_FOUND, msg, reqid)
}
pub fn error_status_response(status: StatusCode, msg: String, reqid: impl AsRef<str>) -> http::Response<StreamBody> {
let js = serde_json::json!({
"message": msg.to_string(),
"requestid": reqid.as_ref(),

View File

@@ -41,4 +41,5 @@ nodenet = { path = "../nodenet" }
commonio = { path = "../commonio" }
taskrun = { path = "../taskrun" }
scyllaconn = { path = "../scyllaconn" }
daqbuf-redis = { path = "../daqbuf-redis" }
httpclient = { path = "../httpclient" }

View File

@@ -19,9 +19,9 @@ use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::error_response;
use httpclient::not_found_response;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamBody;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log::*;
@@ -50,6 +50,44 @@ pub enum Error {
EventsJson(#[from] streams::plaineventsjson::Error),
}
impl Error {
pub fn user_message(&self) -> String {
match self {
Error::ChannelNotFound => format!("channel not found"),
_ => self.to_string(),
}
}
pub fn status_code(&self) -> StatusCode {
match self {
Error::ChannelNotFound => StatusCode::NOT_FOUND,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub fn response(&self, reqid: &str) -> http::Response<StreamBody> {
let js = serde_json::json!({
"message": self.user_message(),
"requestid": reqid,
});
if let Ok(body) = serde_json::to_string_pretty(&js) {
match http::Response::builder()
.status(self.status_code())
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(httpclient::body_string(body))
{
Ok(res) => res,
Err(e) => {
error!("can not generate http error response {e}");
httpclient::internal_error()
}
}
} else {
httpclient::internal_error()
}
}
}
impl From<crate::channelconfig::Error> for Error {
fn from(value: crate::channelconfig::Error) -> Self {
use crate::channelconfig::Error::*;
@@ -100,16 +138,7 @@ impl EventsHandler {
.await
{
Ok(ret) => Ok(ret),
Err(e) => match e {
Error::ChannelNotFound => {
let res = not_found_response("channel not found".into(), ctx.reqid());
Ok(res)
}
_ => {
error!("EventsHandler sees: {e}");
Ok(error_response(e.public_message(), ctx.reqid()))
}
},
Err(e) => Ok(e.response(ctx.reqid())),
}
}
}

View File

@@ -1,5 +1,4 @@
use crate::err::Error;
use crate::RetrievalError;
use err::ToPublicError;
use http::Response;
use http::StatusCode;

View File

@@ -1,5 +1,6 @@
use crate::bodystream::response;
use crate::err::Error;
use crate::requests::accepts_json_or_all;
use crate::ReqCtx;
use crate::ServiceSharedResources;
use futures_util::StreamExt;
@@ -7,6 +8,8 @@ use http::Method;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_string;
use httpclient::error_response;
use httpclient::error_status_response;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
@@ -113,35 +116,36 @@ impl ChannelStatusEventsHandler {
pub async fn handle(
&self,
req: Requ,
_ctx: &ReqCtx,
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
let url = req_uri_to_url(req.uri())?;
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, shared_res, ncc).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => {
error!("{e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(body_string(format!("{:?}", e.public_msg())))?)
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
if req.method() != Method::GET {
Ok(error_status_response(
StatusCode::METHOD_NOT_ALLOWED,
"expect a GET request".into(),
ctx.reqid(),
))
} else if !accepts_json_or_all(req.headers()) {
Ok(error_status_response(
StatusCode::NOT_ACCEPTABLE,
"server can only deliver json".into(),
ctx.reqid(),
))
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
let url = req_uri_to_url(req.uri())?;
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, shared_res, ncc).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => {
error!("{e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(body_string(format!("{:?}", e.public_msg())))?)
}
}
}
}

View File

@@ -55,6 +55,7 @@ use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use url::Url;
static USE_CACHE: bool = false;
static CACHE: Cache<u64, u64> = Cache::new();
pub struct MapPulseHisto {
@@ -1274,51 +1275,69 @@ impl MapPulseHttpFunction {
}
trace!("MapPulseHttpFunction handle uri: {:?}", req.uri());
let pulse = extract_path_number_after_prefix(&req, Self::prefix())?;
match CACHE.portal(pulse) {
CachePortal::Fresh => {
let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
if USE_CACHE {
match CACHE.portal(pulse) {
CachePortal::Fresh => {
let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
}
}
if max > 0 {
let val = histo.tss[i1];
CACHE.set_value(pulse, val);
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(body_empty())?)
}
}
if max > 0 {
let val = histo.tss[i1];
CACHE.set_value(pulse, val);
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(body_empty())?)
}
}
CachePortal::Existing(rx) => {
trace!("waiting for already running pulse map pulse {pulse}");
match rx.recv().await {
Ok(_) => {
error!("should never recv from existing operation pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
CachePortal::Existing(rx) => {
trace!("waiting for already running pulse map pulse {pulse}");
match rx.recv().await {
Ok(_) => {
error!("should never recv from existing operation pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
Err(_e) => match CACHE.portal(pulse) {
CachePortal::Known(ts) => {
info!("pulse {pulse} known from cache ts {ts}");
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?)
}
CachePortal::Fresh => {
error!("pulse {pulse} woken up, but fresh");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
CachePortal::Existing(..) => {
error!("pulse {pulse} woken up, but existing");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
},
}
Err(_e) => match CACHE.portal(pulse) {
CachePortal::Known(ts) => {
info!("pulse {pulse} known from cache ts {ts}");
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?)
}
CachePortal::Fresh => {
error!("pulse {pulse} woken up, but fresh");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
CachePortal::Existing(..) => {
error!("pulse {pulse} woken up, but existing");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
},
}
CachePortal::Known(ts) => {
info!("pulse {pulse} in cache ts {ts}");
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?)
}
}
CachePortal::Known(ts) => {
info!("pulse {pulse} in cache ts {ts}");
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?)
} else {
let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
}
}
if max > 0 {
let val = histo.tss[i1];
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(body_empty())?)
}
}
}
@@ -1342,58 +1361,80 @@ impl Api4MapPulseHttpFunction {
pub async fn find_timestamp(q: MapPulseQuery, ncc: &NodeConfigCached) -> Result<Option<u64>, Error> {
use crate::cache::CachePortal;
let pulse = q.pulse;
let res = match CACHE.portal(pulse) {
CachePortal::Fresh => {
trace!("value not yet in cache pulse {pulse}");
let histo = MapPulseHistoHttpFunction::histo(pulse, ncc).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
let res = if USE_CACHE {
match CACHE.portal(pulse) {
CachePortal::Fresh => {
trace!("value not yet in cache pulse {pulse}");
let histo = MapPulseHistoHttpFunction::histo(pulse, ncc).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
}
}
if histo.tss.len() > 1 {
warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo);
}
if max > 0 {
let val = histo.tss[i1];
CACHE.set_value(pulse, val);
Ok(Some(val))
} else {
Ok(None)
}
}
if histo.tss.len() > 1 {
warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo);
}
if max > 0 {
let val = histo.tss[i1];
CACHE.set_value(pulse, val);
Ok(Some(val))
} else {
Ok(None)
}
}
CachePortal::Existing(rx) => {
trace!("waiting for already running pulse map pulse {pulse}");
match rx.recv().await {
Ok(_) => {
error!("should never recv from existing operation pulse {pulse}");
Err(Error::with_msg_no_trace("map pulse error"))
}
Err(_e) => {
trace!("woken up while value wait pulse {pulse}");
match CACHE.portal(pulse) {
CachePortal::Known(val) => {
trace!("good, value after wakeup pulse {pulse}");
Ok(Some(val))
}
CachePortal::Fresh => {
error!("woken up, but portal fresh pulse {pulse}");
Err(Error::with_msg_no_trace("map pulse error"))
}
CachePortal::Existing(..) => {
error!("woken up, but portal existing pulse {pulse}");
Err(Error::with_msg_no_trace("map pulse error"))
CachePortal::Existing(rx) => {
trace!("waiting for already running pulse map pulse {pulse}");
match rx.recv().await {
Ok(_) => {
error!("should never recv from existing operation pulse {pulse}");
Err(Error::with_msg_no_trace("map pulse error"))
}
Err(_e) => {
trace!("woken up while value wait pulse {pulse}");
match CACHE.portal(pulse) {
CachePortal::Known(val) => {
trace!("good, value after wakeup pulse {pulse}");
Ok(Some(val))
}
CachePortal::Fresh => {
error!("woken up, but portal fresh pulse {pulse}");
Err(Error::with_msg_no_trace("map pulse error"))
}
CachePortal::Existing(..) => {
error!("woken up, but portal existing pulse {pulse}");
Err(Error::with_msg_no_trace("map pulse error"))
}
}
}
}
}
CachePortal::Known(val) => {
trace!("value already in cache pulse {pulse} ts {val}");
Ok(Some(val))
}
}
CachePortal::Known(val) => {
trace!("value already in cache pulse {pulse} ts {val}");
} else {
trace!("value not yet in cache pulse {pulse}");
let histo = MapPulseHistoHttpFunction::histo(pulse, ncc).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
}
}
if histo.tss.len() > 1 {
warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo);
}
if max > 0 {
let val = histo.tss[i1];
Ok(Some(val))
} else {
Ok(None)
}
};
res

View File

@@ -83,6 +83,231 @@ macro_rules! trace_binning {
};
}
#[derive(Debug)]
pub struct EventsDim0EnumCollector {
vals: EventsDim0Enum,
range_final: bool,
timed_out: bool,
needs_continue_at: bool,
}
impl EventsDim0EnumCollector {
pub fn new() -> Self {
Self {
vals: EventsDim0Enum::new(),
range_final: false,
timed_out: false,
needs_continue_at: false,
}
}
}
impl TypeName for EventsDim0EnumCollector {
fn type_name(&self) -> String {
"EventsDim0EnumCollector".into()
}
}
impl WithLen for EventsDim0EnumCollector {
fn len(&self) -> usize {
self.vals.tss.len()
}
}
impl ByteEstimate for EventsDim0EnumCollector {
fn byte_estimate(&self) -> u64 {
// TODO does it need to be more accurate?
30 * self.len() as u64
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EventsDim0EnumCollectorOutput {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: VecDeque<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: VecDeque<u64>,
#[serde(rename = "values")]
vals: VecDeque<u16>,
#[serde(rename = "valuestrings")]
valstrs: VecDeque<String>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
}
impl WithLen for EventsDim0EnumCollectorOutput {
fn len(&self) -> usize {
todo!()
}
}
impl AsAnyRef for EventsDim0EnumCollectorOutput {
fn as_any_ref(&self) -> &dyn Any {
todo!()
}
}
impl AsAnyMut for EventsDim0EnumCollectorOutput {
fn as_any_mut(&mut self) -> &mut dyn Any {
todo!()
}
}
impl ToJsonResult for EventsDim0EnumCollectorOutput {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
todo!()
}
}
impl Collected for EventsDim0EnumCollectorOutput {}
impl CollectorType for EventsDim0EnumCollector {
type Input = EventsDim0Enum;
type Output = EventsDim0EnumCollectorOutput;
fn ingest(&mut self, src: &mut EventsDim0Enum) {
self.vals.tss.append(&mut src.tss);
self.vals.values.append(&mut src.values);
self.vals.valuestrs.append(&mut src.valuestrs);
}
fn set_range_complete(&mut self) {
self.range_final = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
self.needs_continue_at = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
&mut self,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<EventsDim0EnumCollectorOutput, Error> {
debug!(
"{} result() needs_continue_at {}",
self.type_name(),
self.needs_continue_at
);
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
// The amount of the delta must take into account what kind of timestamp precision the client
// can parse and handle.
let vals = &mut self.vals;
let continue_at = if self.needs_continue_at {
if let Some(ts) = vals.tss.back() {
let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS));
x
} else {
if let Some(range) = &range {
match range {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)),
SeriesRange::PulseRange(_) => {
error!("TODO emit create continueAt for pulse range");
Some(IsoDateTime::from_u64(0))
}
}
} else {
Some(IsoDateTime::from_u64(0))
}
}
} else {
None
};
let tss_sl = vals.tss.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
let valixs = mem::replace(&mut vals.values, VecDeque::new());
let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new());
let vals = valixs;
if ts_off_ms.len() != ts_off_ns.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
}
if ts_off_ms.len() != vals.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
}
if ts_off_ms.len() != valstrs.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
}
let ret = Self::Output {
ts_anchor_sec,
ts_off_ms,
ts_off_ns,
vals,
valstrs,
range_final: self.range_final,
timed_out: self.timed_out,
continue_at,
};
Ok(ret)
}
}
// Experiment with having this special case for enums
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsDim0Enum {
pub tss: VecDeque<u64>,
pub values: VecDeque<u16>,
pub valuestrs: VecDeque<String>,
}
impl EventsDim0Enum {
pub fn new() -> Self {
Self {
tss: VecDeque::new(),
values: VecDeque::new(),
valuestrs: VecDeque::new(),
}
}
pub fn push_back(&mut self, ts: u64, value: u16, valuestr: String) {
self.tss.push_back(ts);
self.values.push_back(value);
self.valuestrs.push_back(valuestr);
}
}
impl TypeName for EventsDim0Enum {
fn type_name(&self) -> String {
"EventsDim0Enum".into()
}
}
impl AsAnyRef for EventsDim0Enum {
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl AsAnyMut for EventsDim0Enum {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl WithLen for EventsDim0Enum {
fn len(&self) -> usize {
self.tss.len()
}
}
impl Collectable for EventsDim0Enum {
fn new_collector(&self) -> Box<dyn Collector> {
Box::new(EventsDim0EnumCollector::new())
}
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsDim0NoPulse<STY> {
pub tss: VecDeque<u64>,
@@ -131,6 +356,14 @@ impl<STY> EventsDim0<STY> {
pub fn tss(&self) -> &VecDeque<u64> {
&self.tss
}
// only for testing at the moment
pub fn private_values_ref(&self) -> &VecDeque<STY> {
&self.values
}
pub fn private_values_mut(&mut self) -> &mut VecDeque<STY> {
&mut self.values
}
}
impl<STY> AsAnyRef for EventsDim0<STY>

View File

@@ -0,0 +1,109 @@
#[derive(Debug, Clone)]
pub enum ChannelStatusClosedReason {
ShutdownCommand,
ChannelRemove,
ProtocolError,
FrequencyQuota,
BandwidthQuota,
InternalError,
IocTimeout,
NoProtocol,
ProtocolDone,
ConnectFail,
IoError,
}
#[derive(Debug, Clone)]
pub enum ChannelStatus {
AssignedToAddress,
Opened,
Closed(ChannelStatusClosedReason),
Pong,
MonitoringSilenceReadStart,
MonitoringSilenceReadTimeout,
MonitoringSilenceReadUnchanged,
HaveStatusId,
HaveAddress,
}
impl ChannelStatus {
pub fn to_kind(&self) -> u32 {
use ChannelStatus::*;
use ChannelStatusClosedReason::*;
match self {
AssignedToAddress => 24,
Opened => 1,
Closed(x) => match x {
ShutdownCommand => 2,
ChannelRemove => 3,
ProtocolError => 4,
FrequencyQuota => 5,
BandwidthQuota => 6,
InternalError => 7,
IocTimeout => 8,
NoProtocol => 9,
ProtocolDone => 10,
ConnectFail => 11,
IoError => 12,
},
Pong => 25,
MonitoringSilenceReadStart => 26,
MonitoringSilenceReadTimeout => 27,
MonitoringSilenceReadUnchanged => 28,
HaveStatusId => 29,
HaveAddress => 30,
}
}
pub fn from_kind(kind: u32) -> Result<Self, err::Error> {
use ChannelStatus::*;
use ChannelStatusClosedReason::*;
let ret = match kind {
1 => Opened,
2 => Closed(ShutdownCommand),
3 => Closed(ChannelRemove),
4 => Closed(ProtocolError),
5 => Closed(FrequencyQuota),
6 => Closed(BandwidthQuota),
7 => Closed(InternalError),
8 => Closed(IocTimeout),
9 => Closed(NoProtocol),
10 => Closed(ProtocolDone),
11 => Closed(ConnectFail),
12 => Closed(IoError),
24 => AssignedToAddress,
25 => Pong,
26 => MonitoringSilenceReadStart,
27 => MonitoringSilenceReadTimeout,
28 => MonitoringSilenceReadUnchanged,
29 => HaveStatusId,
30 => HaveAddress,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
"unknown ChannelStatus kind {kind}"
)));
}
};
Ok(ret)
}
pub fn to_u64(&self) -> u64 {
self.to_kind() as u64
}
pub fn to_user_variant_string(&self) -> String {
use ChannelStatus::*;
let ret = match self {
AssignedToAddress => "Located",
Opened => "Opened",
Closed(_) => "Closed",
Pong => "Pongg",
MonitoringSilenceReadStart => "MSRS",
MonitoringSilenceReadTimeout => "MSRT",
MonitoringSilenceReadUnchanged => "MSRU",
HaveStatusId => "HaveStatusId",
HaveAddress => "HaveAddress",
};
ret.into()
}
}

View File

@@ -1,3 +1,4 @@
pub mod channelstatus;
pub mod hex;
pub mod histo;
pub mod query;
@@ -121,10 +122,11 @@ pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ";
const TEST_BACKEND: &str = "testbackend-00";
#[allow(non_upper_case_globals)]
pub const trigger: [&'static str; 2] = [
pub const trigger: [&'static str; 0] = [
//
"S30CB05-VMCP-A010:PRESSURE",
"ATSRF-CAV:TUN-DETUNING-REL-ACT",
// "S30CB05-VMCP-A010:PRESSURE",
// "ATSRF-CAV:TUN-DETUNING-REL-ACT",
// "S30CB14-KBOC-HPPI1:PI-OUT",
];
pub const TRACE_SERIES_ID: [u64; 1] = [
@@ -598,6 +600,10 @@ impl<const N: usize> StringFix<N> {
len: 0,
}
}
pub fn string(&self) -> String {
self.data[..self.len as usize].iter().map(|x| *x).collect()
}
}
impl<const N: usize, T> From<T> for StringFix<N>
@@ -634,7 +640,7 @@ mod string_fix_impl_serde {
where
S: serde::Serializer,
{
ser.serialize_unit()
ser.serialize_str(todo!("StringFix Serialize"))
}
}
@@ -643,7 +649,8 @@ mod string_fix_impl_serde {
where
D: serde::Deserializer<'de>,
{
de.deserialize_unit(Vis::<N>)
todo!("StringFix Deserialize")
// de.deserialize_unit(Vis::<N>)
}
}
@@ -660,7 +667,8 @@ mod string_fix_impl_serde {
where
E: serde::de::Error,
{
Ok(Self::Value::new())
todo!("StringFix Visitor")
// Ok(Self::Value::new())
}
}
}
@@ -668,12 +676,20 @@ mod string_fix_impl_serde {
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, PartialEq)]
pub struct EnumVariant {
ix: u16,
name: StringFix<26>,
name: String,
}
impl EnumVariant {
pub fn new(ix: u16, name: StringFix<26>) -> Self {
Self { ix, name }
pub fn new(ix: u16, name: impl Into<String>) -> Self {
Self { ix, name: name.into() }
}
pub fn ix(&self) -> u16 {
self.ix
}
pub fn name_string(&self) -> String {
self.name.clone()
}
}
@@ -681,7 +697,7 @@ impl Default for EnumVariant {
fn default() -> Self {
Self {
ix: u16::MAX,
name: StringFix::new(),
name: String::new(),
}
}
}
@@ -1852,7 +1868,7 @@ impl TsNano {
pub fn from_system_time(st: SystemTime) -> Self {
let tsunix = st.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
let x = tsunix.as_secs() * 1000000000 + tsunix.subsec_nanos() as u64;
let x = tsunix.as_secs() * 1_000_000_000 + tsunix.subsec_nanos() as u64;
Self::from_ns(x)
}
@@ -2430,8 +2446,12 @@ impl BinnedRangeEnum {
if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?;
}
if min_bin_count > 20000 {
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
let bin_count_max = i32::MAX as u32;
if min_bin_count > bin_count_max {
Err(Error::with_msg(format!(
"min_bin_count > {}: {}",
bin_count_max, min_bin_count
)))?;
}
let du = b.sub(&a);
let max_bin_len = du.div_n(min_bin_count as u64);

View File

@@ -5,14 +5,12 @@ use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME;
use items_2::channelevents::ChannelEvents;
use items_2::empty::empty_events_dyn_ev;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::decode_frame;
@@ -115,11 +113,8 @@ async fn make_channel_events_stream(
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?;
let empty = sitem_data(ChannelEvents::Events(empty));
let stream = make_channel_events_stream_data(subq, reqctx, scyqueue, ncc).await?;
let ret = futures_util::stream::iter([empty]).chain(stream);
let ret = Box::pin(ret);
let ret = Box::pin(stream);
Ok(ret)
}

View File

@@ -9,6 +9,7 @@ use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ChConf;
use netpod::SeriesKind;
use query::api4::events::EventsSubQuery;
use scyllaconn::events2::events::EventReadOpts;
use scyllaconn::events2::mergert;
@@ -40,7 +41,7 @@ pub async fn scylla_channel_event_stream(
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
let x = scyllaconn::events2::events::EventsStreamRt::new(
rt,
chconf,
chconf.clone(),
evq.range().into(),
readopts,
scyqueue.clone(),
@@ -48,10 +49,45 @@ pub async fn scylla_channel_event_stream(
.map_err(|e| scyllaconn::events2::mergert::Error::from(e));
Box::pin(x)
} else {
let x = scyllaconn::events2::mergert::MergeRts::new(chconf, evq.range().into(), readopts, scyqueue.clone());
let x =
scyllaconn::events2::mergert::MergeRts::new(chconf.clone(), evq.range().into(), readopts, scyqueue.clone());
Box::pin(x)
};
let stream = stream
.map(move |item| match item {
Ok(k) => match k {
ChannelEvents::Events(mut k) => {
if let SeriesKind::ChannelStatus = chconf.kind() {
use items_0::Empty;
type C1 = items_2::eventsdim0::EventsDim0<u64>;
type C2 = items_2::eventsdim0::EventsDim0<String>;
if let Some(j) = k.as_any_mut().downcast_mut::<C1>() {
let mut g = C2::empty();
let tss = j.tss();
let vals = j.private_values_ref();
for (&ts, &val) in tss.iter().zip(vals.iter()) {
use netpod::channelstatus as cs2;
let val = match cs2::ChannelStatus::from_kind(val as _) {
Ok(x) => x.to_user_variant_string(),
Err(_) => format!("{}", val),
};
if val.len() != 0 {
g.push_back(ts, 0, val);
}
}
Ok(ChannelEvents::Events(Box::new(g)))
// Ok(ChannelEvents::Events(k))
} else {
Ok(ChannelEvents::Events(k))
}
} else {
Ok(ChannelEvents::Events(k))
}
}
ChannelEvents::Status(k) => Ok(ChannelEvents::Status(k)),
},
_ => item,
})
.map(move |item| match &item {
Ok(k) => match k {
ChannelEvents::Events(k) => {

View File

@@ -192,11 +192,6 @@ impl FromUrl for BinnedQuery {
channel: SfDbChannel::from_pairs(&pairs)?,
range: SeriesRange::from_pairs(pairs)?,
bin_count: pairs.get("binCount").map_or(None, |x| x.parse().ok()).unwrap_or(10),
// bin_count: pairs
// .get("binCount")
// .ok_or_else(|| Error::with_msg_no_trace("missing binCount"))?
// .parse()
// .map_err(|e| Error::with_msg_no_trace(format!("can not parse binCount {:?}", e)))?,
transform: TransformQuery::from_pairs(pairs)?,
cache_usage: CacheUsage::from_pairs(&pairs)?,
buf_len_disk_io: pairs

View File

@@ -26,6 +26,15 @@ use std::pin::Pin;
use std::sync::Arc;
use tracing::Instrument;
#[allow(unused)]
macro_rules! trace_fetch {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaReadEvents")]
pub enum Error {
@@ -535,22 +544,18 @@ fn convert_rows_enum(
last_before: &mut Option<(TsNano, EnumVariant)>,
) -> Result<<EnumVariant as ValTy>::Container, Error> {
let mut ret = <EnumVariant as ValTy>::Container::empty();
trace_fetch!("convert_rows_enum {}", <EnumVariant as ValTy>::st_name());
for row in rows {
let (ts, value) = if with_values {
if EnumVariant::is_valueblob() {
if true {
return Err(Error::Logic);
}
let row: (i64, Vec<u8>) = row.into_typed()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = ValTy::from_valueblob(row.1);
(ts, value)
return Err(Error::Logic);
} else {
let row: (i64, i16, String) = row.into_typed()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let val = row.1 as u16;
let valstr = row.2;
let value = EnumVariant::new(val, valstr.into());
let value = EnumVariant::new(val, valstr);
info!("read enum variant {:?} {:?}", value, value.name_string());
(ts, value)
}
} else {
@@ -582,5 +587,6 @@ fn convert_rows_enum(
}
}
}
trace_fetch!("convert_rows_enum return {:?}", ret);
Ok(ret)
}

View File

@@ -19,7 +19,6 @@ use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use netpod::TsNano;
use series::SeriesId;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -107,12 +106,10 @@ enum ReadingState {
}
struct ReadingBck {
scyqueue: ScyllaQueue,
reading_state: ReadingState,
}
struct ReadingFwd {
scyqueue: ScyllaQueue,
reading_state: ReadingState,
}
@@ -136,6 +133,7 @@ pub struct EventsStreamRt {
msp_buf: VecDeque<TsMs>,
msp_buf_bck: VecDeque<TsMs>,
out: VecDeque<Box<dyn Events>>,
out_cnt: u64,
ts_seen_max: u64,
}
@@ -162,6 +160,7 @@ impl EventsStreamRt {
msp_buf: VecDeque::new(),
msp_buf_bck: VecDeque::new(),
out: VecDeque::new(),
out_cnt: 0,
ts_seen_max: 0,
}
}
@@ -218,14 +217,7 @@ impl EventsStreamRt {
ScalarType::F64 => read_next_values::<f64>(opts).await,
ScalarType::BOOL => read_next_values::<bool>(opts).await,
ScalarType::STRING => read_next_values::<String>(opts).await,
ScalarType::Enum => {
trace_fetch!(
"make_read_events_fut {:?} {:?} ------------- good",
shape,
scalar_type
);
read_next_values::<EnumVariant>(opts).await
}
ScalarType::Enum => read_next_values::<EnumVariant>(opts).await,
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => read_next_values::<Vec<u8>>(opts).await,
@@ -284,7 +276,6 @@ impl EventsStreamRt {
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, true, scyqueue);
self.state = State::ReadingBck(ReadingBck {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchEvents(FetchEvents { fut }),
});
} else {
@@ -306,14 +297,12 @@ impl EventsStreamRt {
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, false, scyqueue);
self.state = State::ReadingFwd(ReadingFwd {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchEvents(FetchEvents { fut }),
});
} else {
trace_fetch!("setup_fwd_read no msp");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingFwd(ReadingFwd {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
@@ -393,6 +382,7 @@ impl Stream for EventsStreamRt {
}
}
trace_emit!("deliver item {}", item.output_info());
self.out_cnt += item.len() as u64;
break Ready(Some(Ok(ChannelEvents::Events(item))));
}
break match &mut self.state {
@@ -401,14 +391,12 @@ impl Stream for EventsStreamRt {
trace_fetch!("State::Begin Bck");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingBck(ReadingBck {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
} else {
trace_fetch!("State::Begin Fwd");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingFwd(ReadingFwd {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
@@ -424,7 +412,6 @@ impl Stream for EventsStreamRt {
} else {
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingBck(ReadingBck {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
@@ -507,7 +494,24 @@ impl Stream for EventsStreamRt {
},
State::InputDone => {
if self.out.len() == 0 {
Ready(None)
self.state = State::Done;
if self.out_cnt == 0 {
let d =
items_2::empty::empty_events_dyn_ev(self.ch_conf.scalar_type(), self.ch_conf.shape());
match d {
Ok(empty) => {
// let empty = items_0::streamitem::sitem_data(ChannelEvents::Events(empty));
let item = items_2::channelevents::ChannelEvents::Events(empty);
Ready(Some(Ok(item)))
}
Err(_) => {
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
}
}
} else {
continue;
}
} else {
continue;
}

View File

@@ -39,9 +39,39 @@ pub async fn plain_events_json(
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
let stream = stream.map(move |k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn Collectable> = Box::new(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
on_sitemty_data!(k, |mut k: Box<dyn items_0::Events>| {
if let Some(j) = k.as_any_mut().downcast_mut::<items_2::channelevents::ChannelEvents>() {
use items_0::AsAnyMut;
match j {
items_2::channelevents::ChannelEvents::Events(m) => {
if let Some(g) = m
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<netpod::EnumVariant>>()
{
trace!("consider container EnumVariant");
let mut out = items_2::eventsdim0::EventsDim0Enum::new();
for (&ts, val) in g.tss.iter().zip(g.values.iter()) {
out.push_back(ts, val.ix(), val.name_string());
}
let k: Box<dyn Collectable> = Box::new(out);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
} else {
trace!("consider container channel events other events {}", k.type_name());
let k: Box<dyn Collectable> = Box::new(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
}
items_2::channelevents::ChannelEvents::Status(_) => {
trace!("consider container channel events status {}", k.type_name());
let k: Box<dyn Collectable> = Box::new(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
}
} else {
trace!("consider container else {}", k.type_name());
let k: Box<dyn Collectable> = Box::new(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
})
});

View File

@@ -191,10 +191,14 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
let tracing_trace = collect_env_list("TRACING_TRACE");
// let tracing_trace_always = collect_env_list("TRACING_TRACE_ALWAYS");
let filter_3 = tracing_subscriber::filter::DynFilterFn::new(move |meta, ctx| {
let mut tmp1 = String::with_capacity(128);
if *meta.level() >= tracing::Level::TRACE {
let mut target_match = false;
for e in &tracing_trace {
if meta.target().starts_with(e) {
tmp1.clear();
tmp1.push_str(e);
tmp1.push_str("::");
if meta.target() == &tmp1[..tmp1.len() - 2] || meta.target().starts_with(&tmp1) {
target_match = true;
break;
}
@@ -218,7 +222,10 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
} else if *meta.level() >= tracing::Level::DEBUG {
let mut target_match = false;
for e in &tracing_debug {
if meta.target().starts_with(e) {
tmp1.clear();
tmp1.push_str(e);
tmp1.push_str("::");
if meta.target() == &tmp1[..tmp1.len() - 2] || meta.target().starts_with(&tmp1) {
target_match = true;
break;
}