Refactor and prepare for Scylla-based bin caching

This commit is contained in:
Dominik Werder
2022-07-06 15:51:05 +02:00
parent 66215f583f
commit d0a7240934
29 changed files with 1647 additions and 542 deletions

View File

@@ -20,10 +20,19 @@ use std::time::{Duration, Instant};
use url::Url;
/// Resolved configuration of a single channel as looked up from the database.
pub struct ChConf {
/// Unique id of the data series backing this channel.
pub series: u64,
/// Data type of a single element of an event.
pub scalar_type: ScalarType,
/// Shape of each event (scalar, waveform dimensions, ...).
pub shape: Shape,
}
/// It is an unsolved question as to how we want to uniquely address channels.
/// Currently, the usual (backend, channelname) pair works in 99% of the cases, but the
/// edge-cases are not solved. At the same time, it is desirable to avoid complicating
/// things for users.
/// Current state:
/// If the series id is given, we take that.
/// Otherwise we try to uniquely identify the series id from the given information.
/// In the future, we can even try to involve time range information for that, but backends like
/// old archivers and sf databuffer do not support such lookup.
pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Result<ChConf, Error> {
if channel.backend != ncc.node_config.cluster.backend {
warn!(
@@ -31,11 +40,6 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
channel.backend, ncc.node_config.cluster.backend
);
}
// This requires the series id.
let series = channel.series.ok_or_else(|| {
Error::with_msg_no_trace(format!("needs a series id {:?}", channel))
.add_public_msg(format!("series id of channel not supplied"))
})?;
// TODO use a common already running worker pool for these queries:
let dbconf = &ncc.node_config.cluster.database;
let dburl = format!(
@@ -46,28 +50,59 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
.await
.err_conv()?;
tokio::spawn(pgconn);
let res = pgclient
.query(
"select scalar_type, shape_dims from series_by_channel where series = $1",
&[&(series as i64)],
)
.await
.err_conv()?;
if res.len() == 0 {
warn!("can not find channel information for series {series}");
let e = Error::with_public_msg_no_trace(format!("can not find channel information for series {series}"));
Err(e)
} else if res.len() > 1 {
error!("multiple channel information for series {series}");
let e = Error::with_public_msg_no_trace(format!("can not find channel information for series {series}"));
Err(e)
if let Some(series) = channel.series() {
let res = pgclient
.query(
"select scalar_type, shape_dims from series_by_channel where series = $1",
&[&(series as i64)],
)
.await
.err_conv()?;
if res.len() < 1 {
warn!("can not find channel information for series {series} given through {channel:?}");
let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}"));
Err(e)
} else {
let row = res.first().unwrap();
let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(0) as u8)?;
// TODO can I get a slice from psql driver?
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(1))?;
let ret = ChConf {
series,
scalar_type,
shape,
};
Ok(ret)
}
} else {
let row = res.first().unwrap();
let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(0) as u8)?;
// TODO can I get a slice from psql driver?
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(1))?;
let ret = ChConf { scalar_type, shape };
Ok(ret)
let res = pgclient
.query(
"select series, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2",
&[&channel.backend(), &channel.name()],
)
.await
.err_conv()?;
if res.len() < 1 {
warn!("can not find channel information for {channel:?}");
let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}"));
Err(e)
} else if res.len() > 1 {
warn!("ambigious channel {channel:?}");
let e = Error::with_public_msg_no_trace(format!("ambigious channel {channel:?}"));
Err(e)
} else {
let row = res.first().unwrap();
let series = row.get::<_, i64>(0) as u64;
let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?;
// TODO can I get a slice from psql driver?
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(2))?;
let ret = ChConf {
series,
scalar_type,
shape,
};
Ok(ret)
}
}
}
@@ -81,6 +116,10 @@ pub async fn chconf_from_events_json(q: &PlainEventsQuery, ncc: &NodeConfigCache
pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result<ChConf, Error> {
let ret = ChConf {
series: q
.channel()
.series()
.expect("PreBinnedQuery is expected to contain the series id"),
scalar_type: q.scalar_type().clone(),
shape: q.shape().clone(),
};
@@ -330,6 +369,10 @@ pub struct ChannelsWithTypeQuery {
impl FromUrl for ChannelsWithTypeQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
    // Extract the query-string pairs and delegate to the pair-based constructor.
    Self::from_pairs(&get_url_query_pairs(url))
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let s = pairs
.get("scalar_type")
.ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?;
@@ -440,6 +483,10 @@ fn bool_false(x: &bool) -> bool {
impl FromUrl for ScyllaChannelEventSeriesIdQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
    // Extract the query-string pairs and delegate to the pair-based constructor.
    Self::from_pairs(&get_url_query_pairs(url))
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let facility = pairs
.get("facility")
.ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))?
@@ -624,6 +671,10 @@ pub struct ScyllaChannelsActiveQuery {
impl FromUrl for ScyllaChannelsActiveQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
    // Extract the query-string pairs and delegate to the pair-based constructor.
    Self::from_pairs(&get_url_query_pairs(url))
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let s = pairs
.get("tsedge")
.ok_or_else(|| Error::with_public_msg_no_trace("missing tsedge"))?;
@@ -731,6 +782,10 @@ pub struct ChannelFromSeriesQuery {
impl FromUrl for ChannelFromSeriesQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
    // Extract the query-string pairs and delegate to the pair-based constructor.
    Self::from_pairs(&get_url_query_pairs(url))
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let s = pairs
.get("seriesId")
.ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?;
@@ -856,6 +911,10 @@ pub struct IocForChannelQuery {
impl FromUrl for IocForChannelQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
    // Extract the query-string pairs and delegate to the pair-based constructor.
    Self::from_pairs(&get_url_query_pairs(url))
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let facility = pairs
.get("facility")
.ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))?
@@ -945,6 +1004,10 @@ pub struct ScyllaSeriesTsMspQuery {
impl FromUrl for ScyllaSeriesTsMspQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
    // Extract the query-string pairs and delegate to the pair-based constructor.
    Self::from_pairs(&get_url_query_pairs(url))
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let s = pairs
.get("seriesId")
.ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?;
@@ -1029,3 +1092,74 @@ impl ScyllaSeriesTsMsp {
Ok(ret)
}
}
/// One entry of an ambiguous channel group: a series whose channel name is
/// shared with at least one other series.
/// NOTE(review): "ambigious" is a misspelling of "ambiguous", but the type name
/// feeds the public JSON schema via Serialize, so renaming would be a breaking change.
#[derive(Serialize)]
pub struct AmbigiousChannel {
// Series id (converted from the database's i64).
series: u64,
// Channel name shared by multiple series.
name: String,
scalar_type: ScalarType,
shape: Shape,
}
/// JSON response body for the ambiguous-channel-names endpoint.
#[derive(Serialize)]
pub struct AmbigiousChannelNamesResponse {
// NOTE(review): field name "ambigious" (sic) is part of the public JSON output.
ambigious: Vec<AmbigiousChannel>,
}
/// Handler for `GET /api/4/channels/ambigious`: lists channel names that map to
/// more than one series id.
/// NOTE(review): the misspelled path segment "ambigious" is public API; fixing the
/// spelling would break existing clients.
pub struct AmbigiousChannelNames {}
impl AmbigiousChannelNames {
/// Returns a handler instance when the request path matches this endpoint, else None.
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/channels/ambigious" {
Some(Self {})
} else {
None
}
}
/// Serves the request. GET with `Accept: application/json` (or `*/*`) returns the
/// JSON list; any other Accept value yields 400, any other method yields 405.
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
// Fall back to JSON when the Accept header is absent or not valid UTF-8.
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
match self.process(node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
// NOTE(review): Debug-formatting public_msg() will include its wrapper
// (e.g. `Some("...")` if it returns an Option) in the response body — confirm
// this is the intended client-facing format.
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
/// Queries the database for series whose channel name is shared by other series.
/// NOTE(review): the self-join emits each member of an ambiguous group once per
/// conflicting partner, so the result can contain repeated entries for the same
/// series; dedup here or in SQL (e.g. `select distinct`) if that is unwanted.
async fn process(&self, node_config: &NodeConfigCached) -> Result<AmbigiousChannelNamesResponse, Error> {
let dbconf = &node_config.node_config.cluster.database;
let pg_client = create_connection(dbconf).await?;
let rows = pg_client
.query(
"select t2.series, t2.channel, t2.scalar_type, t2.shape_dims, t2.agg_kind from series_by_channel t1, series_by_channel t2 where t2.channel = t1.channel and t2.series != t1.series",
&[],
)
.await?;
let mut ret = AmbigiousChannelNamesResponse { ambigious: Vec::new() };
for row in rows {
// Column order matches the select list above; column 4 (agg_kind) is
// selected but currently never read.
let g = AmbigiousChannel {
series: row.get::<_, i64>(0) as u64,
name: row.get(1),
scalar_type: ScalarType::from_scylla_i32(row.get(2))?,
shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3))?,
};
ret.ambigious.push(g);
}
Ok(ret)
}
}

View File

@@ -15,6 +15,10 @@ pub struct DownloadQuery {
impl FromUrl for DownloadQuery {
fn from_url(url: &Url) -> Result<Self, ::err::Error> {
    // Extract the query-string pairs and delegate to the pair-based constructor.
    Self::from_pairs(&get_url_query_pairs(url))
}
fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, err::Error> {
let read_sys = pairs
.get("ReadSys")
.map(|x| x as &str)

View File

@@ -6,7 +6,7 @@ use futures_util::{StreamExt, TryStreamExt};
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::log::*;
use netpod::{AggKind, NodeConfigCached};
use netpod::{AggKind, FromUrl, NodeConfigCached};
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
use url::Url;
@@ -54,6 +54,13 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let query = PlainEventsQuery::from_url(&url)?;
let chconf = chconf_from_events_binary(&query, node_config).await?;
// Update the series id on the query, since requests are not yet required to carry a unique series identifier.
let mut query = query;
query.set_series_id(chconf.series);
let query = query;
// ---
let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone());
let s = disk::channelexec::channel_exec(
op,
@@ -78,6 +85,13 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
let (head, _body) = req.into_parts();
let query = PlainEventsQuery::from_request_head(&head)?;
let chconf = chconf_from_events_json(&query, node_config).await?;
// Update the series id on the query, since requests are not yet required to carry a unique series identifier.
let mut query = query;
query.set_series_id(chconf.series);
let query = query;
// ---
let op = disk::channelexec::PlainEventsJson::new(
// TODO pass only the query, not channel, range again:
query.clone(),

View File

@@ -29,6 +29,7 @@ use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::AggKind;
use netpod::Channel;
use netpod::FromUrl;
use netpod::NanoRange;
use netpod::NodeConfigCached;
use netpod::PerfOpts;

View File

@@ -29,7 +29,7 @@ use net::SocketAddr;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::timeunits::SEC;
use netpod::{channel_from_pairs, get_url_query_pairs};
use netpod::{get_url_query_pairs, Channel};
use netpod::{FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance};
use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET};
use nodenet::conn::events_service;
@@ -235,6 +235,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ChannelFromSeries::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandler::handler(&req) {
h.handle(req, &node_config).await
} else if path == "/api/4/binned" {
@@ -424,6 +426,11 @@ async fn binned_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Res
e.add_public_msg(msg)
})?;
let chconf = chconf_from_binned(&query, node_config).await?;
// Update the series id on the query, since requests are not yet required to carry a unique series identifier.
let mut query = query;
query.set_series_id(chconf.series);
let query = query;
// ---
let desc = format!("binned-BEG-{}-END-{}", query.range().beg / SEC, query.range().end / SEC);
let span1 = span!(Level::INFO, "httpret::binned", desc = &desc.as_str());
span1.in_scope(|| {
@@ -874,7 +881,7 @@ pub async fn archapp_scan_files_insert(
pub async fn archapp_channel_info(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);
let channel = channel_from_pairs(&pairs)?;
let channel = Channel::from_pairs(&pairs)?;
match archapp_wrap::channel_info(&channel, node_config).await {
Ok(res) => {
let buf = serde_json::to_vec(&res)?;

View File

@@ -465,6 +465,12 @@ impl FromUrl for MapPulseQuery {
let ret = Self { backend, pulse };
Ok(ret)
}
/// MapPulseQuery encodes its parameters in the URL path rather than in the query
/// string, so it cannot be reconstructed from key/value pairs alone; always fails.
fn from_pairs(_pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
    // Fixed the previously garbled message "can not only construct from pairs".
    Err(err::Error::with_msg_no_trace(format!(
        "MapPulseQuery can not be constructed from query pairs alone"
    )))
}
}
impl AppendToUrl for MapPulseQuery {