Basic binner for new items
This commit is contained in:
@@ -3,7 +3,7 @@ use crate::err::Error;
|
||||
use dbconn::events_scylla::channel_state_events;
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::Body;
|
||||
use netpod::query::ChannelStateEvents;
|
||||
use netpod::query::ChannelStateEventsQuery;
|
||||
use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON};
|
||||
use url::Url;
|
||||
|
||||
@@ -27,7 +27,7 @@ impl ChannelStatusConnectionEvents {
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept == APP_JSON || accept == ACCEPT_ALL {
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let q = ChannelStateEvents::from_url(&url)?;
|
||||
let q = ChannelStateEventsQuery::from_url(&url)?;
|
||||
match self.fetch_data(&q, node_config).await {
|
||||
Ok(k) => {
|
||||
let body = Body::from(serde_json::to_vec(&k)?);
|
||||
@@ -46,7 +46,7 @@ impl ChannelStatusConnectionEvents {
|
||||
|
||||
async fn fetch_data(
|
||||
&self,
|
||||
q: &ChannelStateEvents,
|
||||
q: &ChannelStateEventsQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Vec<(u64, u32)>, Error> {
|
||||
let dbconf = &node_config.node_config.cluster.database;
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json};
|
||||
use crate::err::Error;
|
||||
use crate::{response, response_err, BodyStream, ToPublicResponse};
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use futures_util::{Stream, StreamExt, TryStreamExt};
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::Body;
|
||||
use items_2::ChannelEvents;
|
||||
use items_2::{binned_collected, ChannelEvents, ChannelEventsMerger};
|
||||
use netpod::log::*;
|
||||
use netpod::query::PlainEventsQuery;
|
||||
use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery};
|
||||
use netpod::{AggKind, FromUrl, NodeConfigCached};
|
||||
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
|
||||
use scyllaconn::create_scy_session;
|
||||
use scyllaconn::errconv::ErrConv;
|
||||
use scyllaconn::events::make_scylla_stream;
|
||||
use scyllaconn::events::{channel_state_events, make_scylla_stream};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use url::Url;
|
||||
|
||||
pub struct EventsHandler {}
|
||||
@@ -226,3 +226,90 @@ impl EventsHandlerScylla {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BinnedHandlerScylla {}
|
||||
|
||||
impl BinnedHandlerScylla {
|
||||
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||
if req.uri().path() == "/api/4/scylla/binned" {
|
||||
Some(Self {})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
if req.method() != Method::GET {
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
match self.fetch(req, node_config).await {
|
||||
Ok(ret) => Ok(ret),
|
||||
Err(e) => Ok(e.to_public_response()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("BinnedHandlerScylla req: {:?}", req);
|
||||
let accept_def = APP_JSON;
|
||||
let accept = req
|
||||
.headers()
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
|
||||
Ok(self.gather(req, node_config).await?)
|
||||
} else {
|
||||
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
async fn gather(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
let (head, _body) = req.into_parts();
|
||||
warn!("TODO BinnedQuery needs to take AggKind");
|
||||
let s1 = format!("dummy:{}", head.uri);
|
||||
let url = Url::parse(&s1)?;
|
||||
let evq = BinnedQuery::from_url(&url)?;
|
||||
let pgclient = {
|
||||
// TODO use common connection/pool:
|
||||
info!("--------------- open postgres connection");
|
||||
let pgconf = &node_config.node_config.cluster.database;
|
||||
let u = {
|
||||
let d = &pgconf;
|
||||
format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
|
||||
};
|
||||
let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
|
||||
tokio::spawn(pgconn);
|
||||
let pgclient = Arc::new(pgclient);
|
||||
pgclient
|
||||
};
|
||||
if let Some(scyco) = &node_config.node_config.cluster.scylla {
|
||||
let scy = create_scy_session(scyco).await?;
|
||||
let mut query2 = PlainEventsQuery::new(evq.channel().clone(), evq.range().clone(), 0, None, false);
|
||||
query2.set_timeout(evq.timeout());
|
||||
let query2 = query2;
|
||||
let stream = make_scylla_stream(&query2, scy.clone(), &pgclient, false).await?;
|
||||
let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), evq.range().clone());
|
||||
let state_stream = channel_state_events(&query3, scy.clone()).await?;
|
||||
// TODO let the stream itself use the items_2 error, do not convert here.
|
||||
let data_stream = Box::pin(stream.map_err(|e| items_2::Error::from(format!("{e}"))));
|
||||
let state_stream = Box::pin(state_stream.map_err(|e| items_2::Error::from(format!("{e}"))));
|
||||
let merged_stream = ChannelEventsMerger::new(data_stream, state_stream);
|
||||
let merged_stream = Box::pin(merged_stream) as Pin<Box<dyn Stream<Item = _> + Send>>;
|
||||
let binned_collected = binned_collected(merged_stream)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
|
||||
let res = binned_collected.to_json_result()?;
|
||||
let res = res.to_json_bytes()?;
|
||||
let ret = response(StatusCode::OK).body(Body::from(res))?;
|
||||
{
|
||||
let _ret = response(StatusCode::OK).body(BodyStream::wrapped(
|
||||
futures_util::stream::iter([Ok(Vec::new())]),
|
||||
format!("gather"),
|
||||
))?;
|
||||
}
|
||||
Ok(ret)
|
||||
} else {
|
||||
return Err(Error::with_public_msg(format!("no scylla configured")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,6 +247,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = events::EventsHandlerScylla::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = events::BinnedHandlerScylla::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = ChannelStatusConnectionEvents::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if path == "/api/4/binned" {
|
||||
|
||||
Reference in New Issue
Block a user