Deliver channel status events

This commit is contained in:
Dominik Werder
2022-08-31 17:03:50 +02:00
parent cf9f8dd54b
commit 904faeffa3
10 changed files with 212 additions and 10 deletions

View File

@@ -1,11 +1,12 @@
use crate::ErrConv;
use err::Error;
use futures_util::{Future, FutureExt, Stream};
use futures_util::{Future, FutureExt, Stream, StreamExt};
use items::scalarevents::ScalarEvents;
use items::waveevents::WaveEvents;
use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::query::{ChannelStateEvents, RawEventsQuery};
use netpod::timeunits::DAY;
use netpod::{Channel, Database, NanoRange, ScalarType, ScyllaConfig, Shape};
use scylla::Session as ScySession;
use std::collections::VecDeque;
@@ -573,3 +574,48 @@ pub async fn make_scylla_stream(
)) as _;
Ok(res)
}
pub async fn channel_state_events(
evq: &ChannelStateEvents,
scyco: &ScyllaConfig,
_dbconf: Database,
) -> Result<Vec<(u64, u32)>, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.build()
.await
.err_conv()?;
let scy = Arc::new(scy);
let mut ret = Vec::new();
let div = DAY;
let mut ts_msp = evq.range().beg / div * div;
loop {
let series = (evq
.channel()
.series()
.ok_or(Error::with_msg_no_trace(format!("series id not given"))))?;
let params = (series as i64, ts_msp as i64);
let mut res = scy
.query_iter(
"select ts_lsp, kind from channel_status where series = ? and ts_msp = ?",
params,
)
.await
.err_conv()?;
while let Some(row) = res.next().await {
let row = row.err_conv()?;
let (ts_lsp, kind): (i64, i32) = row.into_typed().err_conv()?;
let ts = ts_msp + ts_lsp as u64;
let kind = kind as u32;
if ts >= evq.range().beg && ts < evq.range().end {
ret.push((ts, kind));
}
}
ts_msp += DAY;
if ts_msp >= evq.range().end {
break;
}
}
Ok(ret)
}

View File

@@ -869,11 +869,13 @@ impl Api1EventsBinaryHandler {
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
.to_owned();
let body_data = hyper::body::to_bytes(body).await?;
let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) {
qu
} else {
error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec()));
return Err(Error::with_msg_no_trace("can not parse query"));
let qu: Api1Query = match serde_json::from_slice(&body_data) {
Ok(qu) => qu,
Err(e) => {
error!("got body_data: {:?}", String::from_utf8_lossy(&body_data[..]));
error!("can not parse: {e}");
return Err(Error::with_msg_no_trace("can not parse query"));
}
};
let span = if qu.log_level == "trace" {
tracing::span!(tracing::Level::TRACE, "log_span_t")

View File

@@ -0,0 +1,62 @@
use crate::bodystream::response;
use crate::err::Error;
use dbconn::events_scylla::channel_state_events;
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::query::ChannelStateEvents;
use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON};
use url::Url;
pub struct ChannelStatusConnectionEvents {}
impl ChannelStatusConnectionEvents {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/channel/status/connection/events" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelStateEvents::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
async fn fetch_data(
&self,
q: &ChannelStateEvents,
node_config: &NodeConfigCached,
) -> Result<Vec<(u64, u32)>, Error> {
let dbconf = &node_config.node_config.cluster.database;
let scyco = node_config
.node_config
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let ret = channel_state_events(q, scyco, dbconf.clone()).await?;
Ok(ret)
}
}

View File

@@ -1,5 +1,6 @@
pub mod api1;
pub mod bodystream;
pub mod channel_status;
pub mod channelconfig;
pub mod download;
pub mod err;
@@ -17,6 +18,7 @@ use crate::bodystream::response;
use crate::err::Error;
use crate::gather::gather_get_json;
use crate::pulsemap::UpdateTask;
use channel_status::ChannelStatusConnectionEvents;
use channelconfig::{chconf_from_binned, ChConf};
use disk::binned::query::PreBinnedQuery;
use future::Future;
@@ -243,6 +245,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = ChannelStatusConnectionEvents::handler(&req) {
h.handle(req, &node_config).await
} else if path == "/api/4/binned" {
if req.method() == Method::GET {
Ok(binned(req, node_config).await?)

View File

@@ -650,7 +650,7 @@ impl MapPulseHistoHttpFunction {
let uri: Uri = s.parse()?;
let fut = hyper::Client::new().get(uri);
let fut = tokio::time::timeout(Duration::from_millis(1000), fut);
futs.push(fut);
futs.push_back(fut);
}
use futures_util::stream::StreamExt;
let mut map = BTreeMap::new();

11
items/src/binnernew.rs Normal file
View File

@@ -0,0 +1,11 @@
pub enum ConnStatus {}
pub struct ConnStatusEvent {
ts: u64,
status: ConnStatus,
}
pub enum ChannelEvents {
Status(ConnStatus),
Data(),
}

View File

@@ -1,4 +1,5 @@
pub mod binnedevents;
pub mod binnernew;
pub mod binsdim0;
pub mod binsdim1;
pub mod eventsitem;

View File

@@ -343,3 +343,81 @@ pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<
};
Ok(ret)
}
#[derive(Clone, Debug)]
pub struct ChannelStateEvents {
channel: Channel,
range: NanoRange,
}
impl ChannelStateEvents {
pub fn new(channel: Channel, range: NanoRange) -> Self {
Self { channel, range }
}
pub fn range(&self) -> &NanoRange {
&self.range
}
pub fn channel(&self) -> &Channel {
&self.channel
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.series = Some(series);
}
pub fn channel_mut(&mut self) -> &mut Channel {
&mut self.channel
}
}
impl HasBackend for ChannelStateEvents {
fn backend(&self) -> &str {
&self.channel.backend
}
}
impl HasTimeout for ChannelStateEvents {
fn timeout(&self) -> Duration {
Duration::from_millis(6000)
}
}
impl FromUrl for ChannelStateEvents {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
let ret = Self {
channel: Channel::from_pairs(&pairs)?,
range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
},
};
let self_name = std::any::type_name::<Self>();
info!("{self_name}::from_url {ret:?}");
Ok(ret)
}
}
impl AppendToUrl for ChannelStateEvents {
fn append_to_url(&self, url: &mut Url) {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
self.channel.append_to_url(url);
let mut g = url.query_pairs_mut();
g.append_pair(
"begDate",
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
);
g.append_pair(
"endDate",
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
}
}

View File

@@ -1,2 +1 @@
pub mod conn;
pub mod scylla;

View File

@@ -1 +0,0 @@