Let channel search consider backend

This commit is contained in:
Dominik Werder
2024-01-25 23:52:04 +01:00
parent b5ce2dd743
commit a0490abe0c
7 changed files with 99 additions and 16 deletions

View File

@@ -132,7 +132,7 @@ async fn go() -> Result<(), Error> {
Shape::from_dims_str(&opts.shape).unwrap(),
)
.await
.map_err(|_| Error::with_msg_no_trace("error"))
.map_err(|e| Error::with_msg_no_trace(format!("got error: {e}")))
.unwrap();
}
},

View File

@@ -2,7 +2,6 @@ use futures_util::future;
use futures_util::StreamExt;
use http::header;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::connect_client;
use httpclient::http;
@@ -11,34 +10,46 @@ use httpclient::IncomingStream;
use netpod::log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::APP_CBOR_FRAMES;
use std::fmt;
use streams::cbor::FramedBytesToSitemtyDynEventsStream;
use url::Url;
pub struct Error {}
pub struct Error {
msg: String,
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.msg)
}
}
impl<T> From<T> for Error
where
T: fmt::Debug,
{
fn from(_value: T) -> Self {
Self {}
fn from(value: T) -> Self {
Self {
msg: format!("{value:?}"),
}
}
}
pub async fn fetch_cbor(url: &str, scalar_type: ScalarType, shape: Shape) -> Result<(), Error> {
let url: Url = url.parse().unwrap();
let accept = "application/cbor";
let url: Url = url.parse()?;
debug!("parsed url: {url:?}");
let req = Request::builder()
.method(Method::GET)
.uri(url.to_string())
.header(header::HOST, url.host_str().ok_or_else(|| "NoHostname")?)
.header(header::ACCEPT, accept)
.header(header::ACCEPT, APP_CBOR_FRAMES)
.body(body_empty())?;
debug!("open connection to {:?}", req.uri());
let mut send_req = connect_client(req.uri()).await?;
let res = send_req.send_request(req).await?;
let (head, body) = res.into_parts();
debug!("http_get head {head:?}");
debug!("fetch_cbor head {head:?}");
let stream = IncomingStream::new(body);
let stream = FramedBytesToSitemtyDynEventsStream::new(stream, scalar_type, shape);
let stream = stream.map(|item| info!("{item:?}"));

View File

@@ -95,19 +95,28 @@ pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database)
} else {
"scalar_type != 14"
};
let (cb1, cb2) = if let Some(x) = &query.backend {
(false, x.as_str())
} else {
(true, "")
};
let sql = format!(
concat!(
"select",
" series, facility, channel, scalar_type, shape_dims",
" from series_by_channel",
" where channel ~* $1",
" and ($2 or facility = $3)",
" and {}",
" limit 400000",
),
cond_status
);
let pgclient = crate::create_connection(pgconf).await?;
let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;
let rows = pgclient
.query(sql.as_str(), &[&query.name_regex, &cb1, &cb2])
.await
.err_conv()?;
let mut res = Vec::new();
for row in rows {
let series: i64 = row.get(0);

View File

@@ -192,7 +192,8 @@ impl From<std::convert::Infallible> for BodyError {
#[derive(Debug)]
pub enum Error {
BadUrl,
NoHostInUrl,
NoPortInUrl,
Connection,
IO,
Http,
@@ -237,7 +238,7 @@ pub async fn http_get(url: Url, accept: &str, ctx: &ReqCtx) -> Result<HttpRespon
let req = Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?)
.header(header::HOST, url.host_str().ok_or_else(|| Error::NoHostInUrl)?)
.header(header::ACCEPT, accept)
.header("daqbuf-reqid", ctx.reqid())
.body(body_empty())?;
@@ -267,7 +268,7 @@ pub async fn http_post(url: Url, accept: &str, body: String, ctx: &ReqCtx) -> Re
let req = Request::builder()
.method(http::Method::POST)
.uri(url.to_string())
.header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?)
.header(header::HOST, url.host_str().ok_or_else(|| Error::NoHostInUrl)?)
.header(header::CONTENT_TYPE, APP_JSON)
.header(header::ACCEPT, accept)
.header("daqbuf-reqid", ctx.reqid())
@@ -301,8 +302,16 @@ pub async fn http_post(url: Url, accept: &str, body: String, ctx: &ReqCtx) -> Re
}
pub async fn connect_client(uri: &http::Uri) -> Result<SendRequest<StreamBody>, Error> {
let host = uri.host().ok_or_else(|| Error::BadUrl)?;
let port = uri.port_u16().ok_or_else(|| Error::BadUrl)?;
let scheme = uri.scheme_str().unwrap_or("http");
let host = uri.host().ok_or_else(|| Error::NoHostInUrl)?;
let port = uri.port_u16().unwrap_or_else(|| {
// NOTE known issue: url::Url will "forget" the port if it was the default port.
if scheme == "https" {
443
} else {
80
}
});
let stream = TcpStream::connect(format!("{host}:{port}")).await?;
#[cfg(DISABLED)]
{

View File

@@ -965,7 +965,7 @@ impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
any::type_name::<Self>()
}
fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result<Self, Error> {
pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result<Self, Error> {
trace!("{}::new binrange {binrange:?}", Self::type_name());
let rng = binrange
.range_at(0)

View File

@@ -50,6 +50,7 @@ pub const APP_JSON: &str = "application/json";
pub const APP_JSON_LINES: &str = "application/jsonlines";
pub const APP_OCTET: &str = "application/octet-stream";
pub const APP_CBOR: &str = "application/cbor";
pub const APP_CBOR_FRAMES: &str = "application/cbor-frames";
pub const ACCEPT_ALL: &str = "*/*";
pub const X_DAQBUF_REQID: &str = "x-daqbuffer-request-id";
@@ -325,6 +326,20 @@ impl ScalarType {
Ok(ret)
}
pub fn to_ca_id(&self) -> Result<u16, Error> {
use ScalarType::*;
let ret = match self {
I8 => 4,
I16 => 1,
I32 => 5,
F32 => 2,
F64 => 6,
STRING => 0,
_ => return Err(Error::with_msg_no_trace(format!("can not represent {self:?} as CA id"))),
};
Ok(ret)
}
pub fn from_archeng_db_str(s: &str) -> Result<Self, Error> {
use ScalarType::*;
let ret = match s {
@@ -1164,6 +1179,20 @@ impl Shape {
}
}
pub fn to_ca_count(&self) -> Result<u32, Error> {
use Shape::*;
let res = match self {
Scalar => 1,
Wave(n) => *n as u32,
_ => {
return Err(Error::with_msg_no_trace(format!(
"can not represent {self:?} as CA count"
)))
}
};
Ok(res)
}
pub fn to_scylla_vec(&self) -> Vec<i32> {
use Shape::*;
match self {
@@ -1418,6 +1447,22 @@ impl TsNano {
pub fn ms(&self) -> u64 {
self.0 / MS
}
pub fn sub(self, v: Self) -> Self {
Self(self.0 - v.0)
}
pub fn add_ns(self, v: u64) -> Self {
Self(self.0 + v)
}
pub fn mul(self, v: u64) -> Self {
Self(self.0 * v)
}
pub fn div(self, v: u64) -> Self {
Self(self.0 / v)
}
}
impl fmt::Debug for TsNano {

View File

@@ -3,6 +3,7 @@ pub mod formatter;
pub use tokio;
use crate::log::*;
use console_subscriber::ConsoleLayer;
use err::Error;
use std::fmt;
use std::future::Future;
@@ -114,6 +115,14 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
);
if let TracingMode::Console = mode {
// Only async console
// let console_layer = console_subscriber::spawn();
// let console_layer = ConsoleLayer::builder().with_default_env().init();
let console_layer = ConsoleLayer::builder().spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(tracing_subscriber::fmt::layer().with_ansi(false))
// .with(other_layer)
.init();
console_subscriber::init();
} else {
// #[cfg(DISABLED)]