Refactor series lookup

Dominik Werder
2024-05-16 23:33:34 +02:00
parent 82455a2b16
commit 6224df534a
41 changed files with 762 additions and 562 deletions

View File

@@ -64,7 +64,7 @@ async fn go() -> Result<(), Error> {
};
match opts.subcmd {
SubCmd::Retrieval(subcmd) => {
info!("daqbuffer version {}", clap::crate_version!());
info!("daqbuffer version {} +0002", clap::crate_version!());
info!(" service_version {}", service_version);
if false {
#[allow(non_snake_case)]

View File

@@ -5,11 +5,11 @@ use err::Error;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ChConf;
use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use std::time::Duration;
use tokio_postgres::Client;
/// It is an open question how we want to uniquely address channels.
/// Currently, the usual (backend, channelname) pair works in 99% of cases, but the edge-cases
@@ -19,13 +19,14 @@ use std::time::Duration;
/// Otherwise we try to uniquely identify the series id from the given information.
/// In the future, we can even try to involve time range information for that, but backends like
/// old archivers and sf databuffer do not support such a lookup.
pub async fn chconf_best_matching_for_name_and_range(
pub(super) async fn chconf_best_matching_for_name_and_range(
backend: &str,
name: &str,
range: NanoRange,
ncc: &NodeConfigCached,
pg: &Client,
) -> Result<ChConf, Error> {
debug!("chconf_best_matching_for_name_and_range {backend} {name} {range:?}");
#[cfg(DISABLED)]
if ncc.node_config.cluster.scylla.is_none() {
let e = Error::with_msg_no_trace(format!(
"chconf_best_matching_for_name_and_range but not a scylla backend"
@@ -33,21 +34,20 @@ pub async fn chconf_best_matching_for_name_and_range(
error!("{e}");
return Err(e);
};
#[cfg(DISABLED)]
if backend != ncc.node_config.cluster.backend {
warn!(
"mismatched backend {} vs {}",
backend, ncc.node_config.cluster.backend
);
}
let dbconf = &ncc.node_config.cluster.database;
let pgclient = crate::create_connection(dbconf).await?;
let sql = concat!(
"select unnest(tscs) as tsc, series, scalar_type, shape_dims",
" from series_by_channel",
" where kind = 2 and facility = $1 and channel = $2",
" order by tsc",
);
let res = pgclient.query(sql, &[&backend, &name]).await.err_conv()?;
let res = pg.query(sql, &[&backend, &name]).await.err_conv()?;
if res.len() == 0 {
let e = Error::with_public_msg_no_trace(format!("can not find channel information for {name}"));
warn!("{e}");
@@ -70,7 +70,7 @@ pub async fn chconf_best_matching_for_name_and_range(
let tsmss: Vec<_> = rows.iter().map(|x| x.0.clone()).collect();
let range = (TsMs(range.beg / 1000), TsMs(range.end / 1000));
let res = decide_best_matching_index(range, &tsmss)?;
let ch_conf = chconf_for_series(backend, rows[res].1, ncc).await?;
let ch_conf = chconf_for_series(backend, rows[res].1, pg).await?;
Ok(ch_conf)
} else {
let r = res.first().unwrap();
@@ -191,10 +191,8 @@ fn test_decide_best_matching_index_after_01() {
assert_eq!(i, 0);
}
pub async fn chconf_for_series(backend: &str, series: u64, ncc: &NodeConfigCached) -> Result<ChConf, Error> {
let dbconf = &ncc.node_config.cluster.database;
let pgclient = crate::create_connection(dbconf).await?;
let res = pgclient
pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) -> Result<ChConf, Error> {
let res = pg
.query(
"select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2",
&[&backend, &(series as i64)],
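
Both lookup functions now borrow a shared tokio_postgres::Client instead of opening a connection per call, and pub(super) restricts them to this crate. A minimal caller sketch, hedged: the argument shapes follow the worker calls added later in this commit, and dbconf, backend, name, range, and series are placeholders:

    let (pg, _pgjh) = crate::create_connection(dbconf).await?;
    let by_range = chconf_best_matching_for_name_and_range(backend, name, range, &pg).await?;
    let by_series = chconf_for_series(backend, series, &pg).await?;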

View File

@@ -3,6 +3,7 @@ pub mod channelinfo;
pub mod query;
pub mod scan;
pub mod search;
pub mod worker;
pub mod pg {
pub use tokio_postgres::types::Type;
@@ -28,6 +29,7 @@ use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
use taskrun::tokio;
use tokio::task::JoinHandle;
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
@@ -63,27 +65,28 @@ pub async fn delay_io_medium() {
delay_us(2000).await;
}
pub async fn create_connection(db_config: &Database) -> Result<PgClient, Error> {
pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle<Result<(), Error>>), Error> {
warn!("create_connection\n\n CREATING CONNECTION\n\n");
// TODO use a common, already-running worker pool for these queries.
let d = db_config;
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
let (cl, conn) = tokio_postgres::connect(&uri, NoTls)
.await
.map_err(|e| format!("Can not connect to database: {e:?}"))
//.errconv()
?;
// TODO monitor connection drop.
let _cjh = tokio::spawn(async move {
if let Err(e) = conn.await {
error!("connection error: {}", e);
.map_err(|e| format!("Can not connect to database: {e}"))?;
let jh = tokio::spawn(async move {
match conn.await {
Ok(()) => Ok(()),
Err(e) => {
error!("connection error: {}", e);
Err(Error::from_string(e))
}
}
Ok::<_, Error>(())
});
Ok(cl)
Ok((cl, jh))
}
pub async fn channel_exists(channel_name: &str, node_config: &NodeConfigCached) -> Result<bool, Error> {
let cl = create_connection(&node_config.node_config.cluster.database).await?;
let (cl, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let rows = cl
.query("select rowid from channels where name = $1::text", &[&channel_name])
.await
@@ -101,7 +104,7 @@ pub async fn channel_exists(channel_name: &str, node_config: &NodeConfigCached)
}
pub async fn database_size(node_config: &NodeConfigCached) -> Result<u64, Error> {
let cl = create_connection(&node_config.node_config.cluster.database).await?;
let (cl, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let rows = cl
.query(
"select pg_database_size($1::text)",
@@ -129,7 +132,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, E
"ORDER BY pg_total_relation_size(C.oid) DESC LIMIT 20",
);
let sql = sql.as_str();
let cl = create_connection(&node_config.node_config.cluster.database).await?;
let (cl, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let rows = cl.query(sql, &[]).await.err_conv()?;
let mut sizes = TableSizes { sizes: Vec::new() };
sizes.sizes.push((format!("table"), format!("size")));
@@ -141,7 +144,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, E
pub async fn random_channel(node_config: &NodeConfigCached) -> Result<String, Error> {
let sql = "select name from channels order by rowid limit 1 offset (random() * (select count(rowid) from channels))::bigint";
let cl = create_connection(&node_config.node_config.cluster.database).await?;
let (cl, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let rows = cl.query(sql, &[]).await.err_conv()?;
if rows.len() == 0 {
Err(Error::with_msg("can not get random channel"))?;
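
create_connection now returns the connection task's JoinHandle alongside the client. Callers that don't care bind it as _pgjh (as above); a caller that wants connection errors surfaced at shutdown could await it instead. A sketch under that assumption ("select 1" is a placeholder query):

    let (cl, pgjh) = create_connection(&db_config).await?;
    let _rows = cl.query("select 1", &[]).await.err_conv()?;
    drop(cl); // dropping the client lets the spawned connection task finish
    // the handle yields Result<Result<(), Error>, JoinError>
    pgjh.await.map_err(|_| Error::with_msg_no_trace("join failed"))??;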

View File

@@ -6,7 +6,6 @@ use netpod::NodeConfigCached;
use netpod::SfDbChannel;
// For the sf-databuffer backend, given a Channel, try to complete the information if only the id is given.
#[allow(unused)]
async fn sf_databuffer_fetch_channel_by_series(
channel: SfDbChannel,
ncc: &NodeConfigCached,
@@ -24,7 +23,7 @@ async fn sf_databuffer_fetch_channel_by_series(
let series = channel
.series()
.ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64;
let pgcon = create_connection(&ncc.node_config.cluster.database).await?;
let (pgcon, _pgjh) = create_connection(&ncc.node_config.cluster.database).await?;
let mut rows = pgcon
.query("select name from channels where rowid = $1", &[&series])
.await

View File

@@ -30,8 +30,6 @@ use tokio::fs::DirEntry;
use tokio::fs::ReadDir;
use tokio_postgres::Client;
mod updatechannelnames;
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeDiskIdent {
pub rowid: i64,
@@ -201,7 +199,7 @@ async fn update_db_with_channel_names_inner(
node_config: NodeConfigCached,
db_config: Database,
) -> Result<(), Error> {
let dbc = create_connection(&db_config).await?;
let (dbc, _pgjh) = create_connection(&db_config).await?;
info!("update_db_with_channel_names connection done");
let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
info!("update_db_with_channel_names get_node_disk_ident done");
@@ -335,7 +333,7 @@ async fn update_db_with_all_channel_configs_inner(
node_config: NodeConfigCached,
) -> Result<(), Error> {
let node_config = &node_config;
let dbc = create_connection(&node_config.node_config.cluster.database).await?;
let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let dbc = Arc::new(dbc);
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
let rows = dbc
@@ -437,7 +435,7 @@ pub async fn update_db_with_all_channel_configs(
}
pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<bool, Error> {
let dbc = create_connection(&node_config.node_config.cluster.database).await?;
let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
dbc.query("select update_cache()", &[])
.await
.err_conv()
@@ -554,7 +552,8 @@ pub async fn update_db_with_all_channel_datafiles(
node_disk_ident: &NodeDiskIdent,
ks_prefix: &str,
) -> Result<(), Error> {
let dbc = Arc::new(create_connection(&node_config.node_config.cluster.database).await?);
let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let dbc = Arc::new(dbc);
let rows = dbc
.query(
"select rowid, facility, name from channels where facility = $1 order by facility, name",

View File

@@ -1,171 +0,0 @@
use super::get_node_disk_ident;
use super::update_db_with_channel_name_list;
use super::FindChannelNamesFromConfigReadDir;
use super::NodeDiskIdent;
use super::UpdatedDbWithChannelNames;
use crate::create_connection;
use crate::pg::Client as PgClient;
use err::Error;
use futures_util::Future;
use futures_util::Stream;
use netpod::NodeConfigCached;
use pin_project::pin_project;
use std::os::unix::prelude::OsStringExt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[pin_project]
struct UpdatedDbWithChannelNamesStream {
errored: bool,
data_complete: bool,
#[allow(dead_code)]
node_config: Pin<Box<NodeConfigCached>>,
// TODO can we pass a Pin to the async fn instead of creating static ref?
node_config_ref: &'static NodeConfigCached,
#[pin]
client_fut: Option<Pin<Box<dyn Future<Output = Result<PgClient, Error>> + Send>>>,
#[pin]
client: Option<PgClient>,
client_ref: Option<&'static PgClient>,
#[pin]
ident_fut: Option<Pin<Box<dyn Future<Output = Result<NodeDiskIdent, Error>> + Send>>>,
ident: Option<NodeDiskIdent>,
#[pin]
find: Option<FindChannelNamesFromConfigReadDir>,
#[pin]
update_batch: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
channel_inp_done: bool,
clist: Vec<String>,
}
impl UpdatedDbWithChannelNamesStream {
#[allow(unused)]
fn new(node_config: NodeConfigCached) -> Result<Self, Error> {
let node_config = Box::pin(node_config.clone());
let node_config_ref = unsafe { &*(&node_config as &NodeConfigCached as *const _) };
let mut ret = Self {
errored: false,
data_complete: false,
node_config,
node_config_ref,
client_fut: None,
client: None,
client_ref: None,
ident_fut: None,
ident: None,
find: None,
update_batch: None,
channel_inp_done: false,
clist: Vec::new(),
};
ret.client_fut = Some(Box::pin(create_connection(
&ret.node_config_ref.node_config.cluster.database,
)));
Ok(ret)
}
}
impl Stream for UpdatedDbWithChannelNamesStream {
type Item = Result<UpdatedDbWithChannelNames, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut pself = self.project();
loop {
break if *pself.errored {
Ready(None)
} else if *pself.data_complete {
Ready(None)
} else if let Some(fut) = pself.find.as_mut().as_pin_mut() {
match fut.poll_next(cx) {
Ready(Some(Ok(item))) => {
pself
.clist
.push(String::from_utf8(item.file_name().into_vec()).unwrap());
continue;
}
Ready(Some(Err(e))) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
*pself.channel_inp_done = true;
// Work through the collected items
let l = std::mem::replace(pself.clist, Vec::new());
let fut = update_db_with_channel_name_list(
l,
pself.ident.as_ref().unwrap().facility,
pself.client.as_ref().get_ref().as_ref().unwrap(),
);
// TODO
//pself.update_batch.replace(Box::pin(fut));
let _ = fut;
continue;
}
Pending => Pending,
}
} else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(item)) => {
*pself.ident_fut = None;
*pself.ident = Some(item);
let ret = UpdatedDbWithChannelNames {
msg: format!("Got ident {:?}", pself.ident),
count: 43,
};
let base_path = &pself
.node_config
.node
.sf_databuffer
.as_ref()
.ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
.data_base_path;
let s = FindChannelNamesFromConfigReadDir::new(base_path);
*pself.find = Some(s);
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(item)) => {
*pself.client_fut = None;
//*pself.client = Some(Box::pin(item));
//*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) });
*pself.client = Some(item);
let c2: &PgClient = pself.client.as_ref().get_ref().as_ref().unwrap();
*pself.client_ref = Some(unsafe { &*(c2 as *const _) });
//() == pself.node_config.as_ref();
//() == pself.client.as_ref().as_pin_ref().unwrap();
/* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2(
pself.node_config.as_ref(),
pself.client.as_ref().as_pin_ref().unwrap(),
)));*/
*pself.ident_fut = Some(Box::pin(get_node_disk_ident(
pself.node_config_ref,
pself.client_ref.as_ref().unwrap(),
)));
let ret = UpdatedDbWithChannelNames {
msg: format!("Client opened connection"),
count: 42,
};
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else {
Ready(None)
};
}
}
}

View File

@@ -34,8 +34,8 @@ pub async fn search_channel_databuffer(
" dtype, shape, unit, description, channel_backend",
" from searchext($1, $2, $3, $4)",
);
let cl = create_connection(&node_config.node_config.cluster.database).await?;
let rows = cl
let (pg, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let rows = pg
.query(
sql,
&[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"],
@@ -115,7 +115,7 @@ pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database)
),
regop
);
let pgclient = crate::create_connection(pgconf).await?;
let (pgclient, _pgjh) = crate::create_connection(pgconf).await?;
let rows = pgclient
.query(sql, &[&ch_kind, &query.name_regex, &cb1, &cb2])
.await
@@ -182,7 +182,7 @@ async fn search_channel_archeng(
" order by c.name",
" limit 100"
));
let cl = create_connection(database).await?;
let (cl, _pgjh) = create_connection(database).await?;
let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;
let mut res = Vec::new();
for row in rows {
@@ -271,7 +271,7 @@ pub async fn search_channel(
node_config: &NodeConfigCached,
) -> Result<ChannelSearchResult, Error> {
let pgconf = &node_config.node_config.cluster.database;
if let Some(_scyconf) = node_config.node_config.cluster.scylla.as_ref() {
if let Some(_scyconf) = node_config.node_config.cluster.scylla_st() {
search_channel_scylla(query, pgconf).await
} else if let Some(conf) = node_config.node.channel_archiver.as_ref() {
search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, pgconf).await

crates/dbconn/src/worker.rs (new file, 125 lines)
View File

@@ -0,0 +1,125 @@
use crate::create_connection;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ChConf;
use netpod::Database;
use taskrun::tokio;
use tokio::task::JoinHandle;
use tokio_postgres::Client;
#[derive(Debug, ThisError)]
pub enum Error {
Error(#[from] err::Error),
ChannelSend,
ChannelRecv,
Join,
}
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
err::Error::from_string(self)
}
}
#[derive(Debug)]
enum Job {
ChConfBestMatchingNameRange(String, String, NanoRange, Sender<Result<ChConf, Error>>),
ChConfForSeries(String, u64, Sender<Result<ChConf, Error>>),
}
#[derive(Debug, Clone)]
pub struct PgQueue {
tx: Sender<Job>,
}
impl PgQueue {
pub async fn chconf_for_series(
&self,
backend: &str,
series: u64,
) -> Result<Receiver<Result<ChConf, Error>>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ChConfForSeries(backend.into(), series, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
pub async fn chconf_best_matching_name_range_job(
&self,
backend: &str,
name: &str,
range: NanoRange,
) -> Result<Receiver<Result<ChConf, Error>>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ChConfBestMatchingNameRange(backend.into(), name.into(), range, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
}
#[derive(Debug)]
pub struct PgWorker {
rx: Receiver<Job>,
pg: Client,
pgjh: Option<JoinHandle<Result<(), err::Error>>>,
}
impl PgWorker {
pub async fn new(pgconf: &Database) -> Result<(PgQueue, Self), Error> {
let (tx, rx) = async_channel::bounded(64);
let (pg, pgjh) = create_connection(pgconf).await?;
let queue = PgQueue { tx };
let worker = Self {
rx,
pg,
pgjh: Some(pgjh),
};
Ok((queue, worker))
}
pub async fn work(self) -> Result<(), Error> {
loop {
let x = self.rx.recv().await;
let job = match x {
Ok(x) => x,
Err(_) => {
error!("PgWorker can not receive from channel");
return Err(Error::ChannelRecv);
}
};
match job {
Job::ChConfBestMatchingNameRange(backend, name, range, tx) => {
let res =
crate::channelconfig::chconf_best_matching_for_name_and_range(&backend, &name, range, &self.pg)
.await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::ChConfForSeries(backend, series, tx) => {
let res = crate::channelconfig::chconf_for_series(&backend, series, &self.pg).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
}
}
}
pub async fn join(&mut self) -> Result<(), Error> {
if let Some(jh) = self.pgjh.take() {
jh.await.map_err(|_| Error::Join)?.map_err(Error::from)?;
Ok(())
} else {
Ok(())
}
}
pub fn close(&self) {
self.rx.close();
}
}
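
A minimal usage sketch for the new worker (the wiring mirrors what http/src/lib.rs does later in this commit; the backend string and series id are placeholders):

    let (pgqueue, pgworker) = PgWorker::new(&pgconf).await?;
    let _worker_jh = taskrun::spawn(pgworker.work());
    // each job hands back a bounded(1) receiver carrying its result
    let rx = pgqueue.chconf_for_series("sf-databuffer", 42).await?;
    let ch_conf = rx.recv().await.map_err(|_| Error::ChannelRecv)??;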

View File

@@ -30,9 +30,9 @@ impl From<ConfigParseError> for ConfigError {
pub async fn config_entry_best_match(
range: &NanoRange,
channel: SfDbChannel,
node_config: &NodeConfigCached,
ncc: &NodeConfigCached,
) -> Result<Option<ConfigEntry>, ConfigError> {
let channel_config = match read_local_config(channel.clone(), node_config.clone()).await {
let channel_config = match read_local_config(channel.clone(), ncc.clone()).await {
Ok(x) => x,
Err(e) => match e {
ConfigParseError::FileNotFound => return Ok(None),
@@ -59,9 +59,9 @@ pub async fn channel_configs(
pub async fn channel_config_best_match(
range: NanoRange,
channel: SfDbChannel,
node_config: &NodeConfigCached,
ncc: &NodeConfigCached,
) -> Result<Option<SfDbChConf>, ConfigError> {
let best = config_entry_best_match(&range, channel.clone(), node_config).await?;
let best = config_entry_best_match(&range, channel.clone(), ncc).await?;
match best {
None => Ok(None),
Some(entry) => {

View File

@@ -255,7 +255,7 @@ async fn open_files_inner(
return Ok(());
}
for &tb in &timebins {
let ts_bin = TsNano(tb * fetch_info.bs().ns());
let ts_bin = TsNano::from_ns(tb * fetch_info.bs().ns());
if ts_bin.ns() >= range.end {
continue;
}
@@ -350,7 +350,7 @@ async fn open_expanded_files_inner(
}
let mut p1 = None;
for (i1, tb) in timebins.iter().enumerate().rev() {
let ts_bin = TsNano(tb * fetch_info.bs().ns());
let ts_bin = TsNano::from_ns(tb * fetch_info.bs().ns());
if ts_bin.ns() <= range.beg {
p1 = Some(i1);
break;

View File

@@ -385,7 +385,8 @@ impl FileContentStream2 {
fn make_reading(&mut self) {
let mut buf = Box::new(BytesMut::with_capacity(self.disk_io_tune.read_buffer_len));
let bufref = unsafe { &mut *((&mut buf as &mut BytesMut) as *mut BytesMut) };
// let bufref = unsafe { &mut *((&mut buf as &mut BytesMut) as *mut BytesMut) };
let bufref: &mut BytesMut = err::todoval();
let fileref = unsafe { &mut *((&mut self.file) as *mut Pin<Box<File>>) };
let fut = AsyncReadExt::read_buf(fileref, bufref).map_err(|e| e.into());
self.fcs = FCS2::Reading((buf, Box::pin(fut)));

View File

@@ -164,7 +164,7 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: &
.await
.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
let mut evix = 0;
let mut ts = TsNano(0);
let mut ts = TsNano::from_ns(0);
let mut pulse = 0;
while ts.ns() < DAY * 3 {
let res = gen_timebin(
@@ -352,7 +352,7 @@ async fn gen_timebin(
let mut evix = evix;
let mut ts = ts;
let mut pulse = pulse;
let tsmax = TsNano((tb + 1) * config.time_bin_size.ns());
let tsmax = TsNano::from_ns((tb + 1) * config.time_bin_size.ns());
while ts.ns() < tsmax.ns() {
match gen_var {
// TODO
@@ -377,7 +377,7 @@ async fn gen_timebin(
}
}
evix += 1;
ts.0 += ts_spacing;
ts = ts.add_ns(ts_spacing);
pulse += 1;
}
let ret = GenTimebinRes { evix, ts, pulse };
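
The mechanical change in this file: TsNano's inner field is no longer touched directly, so tuple construction and .0 arithmetic give way to the accessor API. A before/after sketch using only names that appear in this commit:

    let ts = TsNano::from_ns(SEC * 2); // was: TsNano(SEC * 2)
    let ts = ts.add_ns(ts_spacing);    // was: ts.0 += ts_spacing
    let ns = ts.ns();                  // was: ts.0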

View File

@@ -197,7 +197,7 @@ pub fn parse_event(buf: &[u8]) -> Result<(u32, TsNano), Error> {
return Err(Error::with_msg(format!("len mismatch len1: {} len2: {}", len1, len2)));
}
let ts = u64::from_be_bytes(*array_ref![buf, 12, 8]);
Ok((len1 as u32, TsNano(ts)))
Ok((len1 as u32, TsNano::from_ns(ts)))
}
pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, TsNano), Error> {

View File

@@ -98,12 +98,12 @@ pub fn make_event_blobs_stream(
event_chunker_conf: EventChunkerConf,
disk_io_tune: DiskIoTune,
reqctx: ReqCtxArc,
node_config: &NodeConfigCached,
ncc: &NodeConfigCached,
) -> Result<EventChunkerMultifile, Error> {
debug!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}");
// TODO should not need this for correctness.
// Should limit based on return size and latency.
let out_max_len = if node_config.node_config.cluster.is_central_storage {
let out_max_len = if ncc.node_config.cluster.is_central_storage {
128
} else {
128
@@ -111,8 +111,8 @@ pub fn make_event_blobs_stream(
let event_blobs = EventChunkerMultifile::new(
range,
fetch_info.clone(),
node_config.node.clone(),
node_config.ix,
ncc.node.clone(),
ncc.ix,
disk_io_tune,
event_chunker_conf,
expand,
@@ -126,7 +126,7 @@ pub fn make_event_blobs_pipe_real(
subq: &EventsSubQuery,
fetch_info: &SfChFetchInfo,
reqctx: ReqCtxArc,
node_config: &NodeConfigCached,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
let expand = subq.transform().need_one_before_range();
let range = subq.range();
@@ -138,7 +138,7 @@ pub fn make_event_blobs_pipe_real(
event_chunker_conf,
subq.disk_io_tune(),
reqctx,
node_config,
ncc,
)?;
let pipe = Box::pin(event_blobs) as _;
Ok(pipe)
@@ -191,12 +191,12 @@ pub fn make_event_blobs_pipe(
subq: &EventsSubQuery,
fetch_info: &SfChFetchInfo,
reqctx: ReqCtxArc,
node_config: &NodeConfigCached,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
debug!("make_event_blobs_pipe {subq:?}");
if subq.backend() == TEST_BACKEND {
make_event_blobs_pipe_test(subq, node_config)
make_event_blobs_pipe_test(subq, ncc)
} else {
make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config)
make_event_blobs_pipe_real(subq, fetch_info, reqctx, ncc)
}
}

View File

@@ -6,6 +6,7 @@ use crate::gather::SubRes;
use crate::response;
use crate::ReqCtx;
use crate::Requ;
use crate::ServiceSharedResources;
use bytes::BufMut;
use bytes::BytesMut;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
@@ -896,6 +897,7 @@ impl Api1EventsBinaryHandler {
&self,
req: Requ,
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() != Method::POST {
@@ -942,6 +944,7 @@ impl Api1EventsBinaryHandler {
span.clone(),
reqidspan.clone(),
ctx,
shared_res,
node_config,
)
.instrument(span)
@@ -958,6 +961,7 @@ impl Api1EventsBinaryHandler {
span: tracing::Span,
reqidspan: tracing::Span,
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let self_name = any::type_name::<Self>();
@@ -983,9 +987,14 @@ impl Api1EventsBinaryHandler {
for ch in qu.channels() {
debug!("try to find config quorum for {ch:?}");
let ch = SfDbChannel::from_name(backend, ch.name());
let ch_conf =
nodenet::configquorum::find_config_basics_quorum(ch.clone(), range.clone().into(), ctx, ncc)
.await?;
let ch_conf = nodenet::configquorum::find_config_basics_quorum(
ch.clone(),
range.clone().into(),
ctx,
&shared_res.pgqueue,
ncc,
)
.await?;
match ch_conf {
Some(x) => {
debug!("found quorum {ch:?} {x:?}");

View File

@@ -72,8 +72,7 @@ impl AccountingIngestedBytes {
let scyco = ncc
.node_config
.cluster
.scylla
.as_ref()
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let mut stream = scyllaconn::accounting::totals::AccountingStreamScylla::new(q.range().try_into()?, scy);
@@ -136,16 +135,16 @@ impl AccountingToplistCounts {
_ctx: &ReqCtx,
ncc: &NodeConfigCached,
) -> Result<Toplist, Error> {
// TODO assumes that accounting data is in the LT keyspace
let scyco = ncc
.node_config
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
.scylla_lt()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no lt scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let pgconf = &ncc.node_config.cluster.database;
let pg = dbconn::create_connection(&pgconf).await?;
let mut top1 = scyllaconn::accounting::toplist::read_ts(qu.ts().0, scy).await?;
let (pg, pgjh) = dbconn::create_connection(&pgconf).await?;
let mut top1 = scyllaconn::accounting::toplist::read_ts(qu.ts().ns(), scy).await?;
top1.sort_by_bytes();
let mut ret = Toplist { toplist: Vec::new() };
let series_ids: Vec<_> = top1.usage().iter().take(qu.limit() as _).map(|x| x.0).collect();

View File

@@ -5,6 +5,8 @@ use crate::channelconfig::ch_conf_from_binned;
use crate::err::Error;
use crate::requests::accepts_json_or_all;
use crate::requests::accepts_octets;
use crate::ServiceSharedResources;
use dbconn::worker::PgQueue;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
@@ -23,7 +25,13 @@ use query::api4::binned::BinnedQuery;
use tracing::Instrument;
use url::Url;
async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
async fn binned_json(
url: Url,
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("{:?}", req);
let reqid = crate::status_board()
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
@@ -35,7 +43,7 @@ async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached)
e.add_public_msg(msg)
})?;
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&query, ctx, ncc)
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
let span1 = span!(
@@ -58,7 +66,7 @@ async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached)
Ok(ret)
}
async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
if req
.uri()
@@ -68,7 +76,7 @@ async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Resu
Err(Error::with_msg_no_trace("hidden message").add_public_msg("PublicMessage"))?;
}
if accepts_json_or_all(&req.headers()) {
Ok(binned_json(url, req, ctx, node_config).await?)
Ok(binned_json(url, req, ctx, pgqueue, ncc).await?)
} else if accepts_octets(&req.headers()) {
Ok(response_err_msg(
StatusCode::NOT_ACCEPTABLE,
@@ -98,12 +106,13 @@ impl BinnedHandler {
&self,
req: Requ,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
match binned(req, ctx, node_config).await {
match binned(req, ctx, &shared_res.pgqueue, ncc).await {
Ok(ret) => Ok(ret),
Err(e) => {
warn!("BinnedHandler handle sees: {e}");

View File

@@ -1,6 +1,7 @@
use crate::bodystream::response_err_msg;
use crate::response;
use crate::ReqCtx;
use crate::ServiceSharedResources;
use err::thiserror;
use err::PublicError;
use err::ThisError;
@@ -15,6 +16,7 @@ use httpclient::StreamResponse;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::ServiceVersion;
use std::sync::Arc;
#[derive(Debug, ThisError)]
pub enum EventDataError {
@@ -50,14 +52,14 @@ impl EventDataHandler {
req: Requ,
_ctx: &ReqCtx,
ncc: &NodeConfigCached,
_service_version: &ServiceVersion,
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, EventDataError> {
if req.method() != Method::POST {
Ok(response(StatusCode::NOT_ACCEPTABLE)
.body(body_empty())
.map_err(|_| EventDataError::InternalError)?)
} else {
match Self::handle_req(req, ncc).await {
match Self::handle_req(req, ncc, shared_res).await {
Ok(ret) => Ok(ret),
Err(e) => {
error!("{e}");
@@ -69,7 +71,11 @@ impl EventDataHandler {
}
}
async fn handle_req(req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, EventDataError> {
async fn handle_req(
req: Requ,
ncc: &NodeConfigCached,
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, EventDataError> {
let (_head, body) = req.into_parts();
let body = read_body_bytes(body)
.await

View File

@@ -5,9 +5,11 @@ use crate::requests::accepts_cbor_framed;
use crate::requests::accepts_json_framed;
use crate::requests::accepts_json_or_all;
use crate::response;
use crate::ServiceSharedResources;
use crate::ToPublicResponse;
use bytes::Bytes;
use bytes::BytesMut;
use dbconn::worker::PgQueue;
use futures_util::future;
use futures_util::stream;
use futures_util::Stream;
@@ -44,12 +46,13 @@ impl EventsHandler {
&self,
req: Requ,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
match plain_events(req, ctx, node_config).await {
match plain_events(req, ctx, &shared_res.pgqueue, ncc).await {
Ok(ret) => Ok(ret),
Err(e) => {
error!("EventsHandler sees: {e}");
@@ -59,14 +62,19 @@ impl EventsHandler {
}
}
async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
async fn plain_events(
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
if accepts_cbor_framed(req.headers()) {
Ok(plain_events_cbor_framed(url, req, ctx, node_config).await?)
Ok(plain_events_cbor_framed(url, req, ctx, pgqueue, ncc).await?)
} else if accepts_json_framed(req.headers()) {
Ok(plain_events_json_framed(url, req, ctx, node_config).await?)
Ok(plain_events_json_framed(url, req, ctx, pgqueue, ncc).await?)
} else if accepts_json_or_all(req.headers()) {
Ok(plain_events_json(url, req, ctx, node_config).await?)
Ok(plain_events_json(url, req, ctx, pgqueue, ncc).await?)
} else {
let ret = response_err_msg(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept {:?}", req))?;
Ok(ret)
@@ -77,10 +85,11 @@ async fn plain_events_cbor_framed(
url: Url,
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
let ch_conf = chconf_from_events_quorum(&evq, ctx, ncc)
let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
info!("plain_events_cbor_framed chconf_from_events_quorum: {ch_conf:?} {req:?}");
@@ -115,10 +124,11 @@ async fn plain_events_json_framed(
url: Url,
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
let ch_conf = chconf_from_events_quorum(&evq, ctx, ncc)
let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
info!("plain_events_json_framed chconf_from_events_quorum: {ch_conf:?} {req:?}");
@@ -133,7 +143,8 @@ async fn plain_events_json(
url: Url,
req: Requ,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let self_name = "plain_events_json";
info!("{self_name} req: {:?}", req);
@@ -141,17 +152,17 @@ async fn plain_events_json(
let query = PlainEventsQuery::from_url(&url)?;
info!("{self_name} query {query:?}");
// TODO handle None case better and return 404
let ch_conf = chconf_from_events_quorum(&query, ctx, node_config)
let ch_conf = chconf_from_events_quorum(&query, ctx, pgqueue, ncc)
.await
.map_err(Error::from)?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
info!("{self_name} chconf_from_events_quorum: {ch_conf:?}");
let open_bytes = OpenBoxedBytesViaHttp::new(node_config.node_config.cluster.clone());
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let item = streams::plaineventsjson::plain_events_json(
&query,
ch_conf,
ctx,
&node_config.node_config.cluster,
&ncc.node_config.cluster,
Box::pin(open_bytes),
)
.await;

View File

@@ -1,6 +1,7 @@
use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use crate::ServiceSharedResources;
use futures_util::StreamExt;
use http::Method;
use http::StatusCode;
@@ -37,7 +38,8 @@ impl ConnectionStatusEvents {
&self,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
@@ -48,7 +50,7 @@ impl ConnectionStatusEvents {
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
let url = req_uri_to_url(req.uri())?;
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
match self.fetch_data(&q, shared_res, ncc).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
@@ -70,17 +72,18 @@ impl ConnectionStatusEvents {
async fn fetch_data(
&self,
q: &ChannelStateEventsQuery,
node_config: &NodeConfigCached,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<Vec<ConnStatusEvent>, Error> {
let scyco = node_config
let scyco = ncc
.node_config
.cluster
.scylla
.as_ref()
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let _scy = scyllaconn::conn::create_scy_session(scyco).await?;
let _chconf =
nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?;
nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), &shared_res.pgqueue, ncc)
.await?;
let _do_one_before_range = true;
let ret = Vec::new();
if true {
@@ -111,7 +114,8 @@ impl ChannelStatusEventsHandler {
&self,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
@@ -122,7 +126,7 @@ impl ChannelStatusEventsHandler {
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
let url = req_uri_to_url(req.uri())?;
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
match self.fetch_data(&q, shared_res, ncc).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
@@ -144,20 +148,25 @@ impl ChannelStatusEventsHandler {
async fn fetch_data(
&self,
q: &ChannelStateEventsQuery,
node_config: &NodeConfigCached,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<ChannelStatusEvents, Error> {
let scyco = node_config
let scyco = ncc
.node_config
.cluster
.scylla
.as_ref()
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let do_one_before_range = true;
if false {
let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel config not found"))?;
let chconf = nodenet::channelconfig::channel_config(
q.range().clone(),
q.channel().clone(),
&shared_res.pgqueue,
ncc,
)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel config not found"))?;
use netpod::ChannelTypeConfigGen;
match chconf {
ChannelTypeConfigGen::Scylla(_x) => todo!(),

View File

@@ -2,6 +2,7 @@ use crate::err::Error;
use crate::response;
use crate::ToPublicResponse;
use dbconn::create_connection;
use dbconn::worker::PgQueue;
use futures_util::StreamExt;
use http::Method;
use http::StatusCode;
@@ -38,27 +39,30 @@ use url::Url;
pub async fn chconf_from_events_quorum(
q: &PlainEventsQuery,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<Option<ChannelTypeConfigGen>, Error> {
let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, ncc).await?;
let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, pgqueue, ncc).await?;
Ok(ret)
}
pub async fn chconf_from_prebinned(
q: &PreBinnedQuery,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<Option<ChannelTypeConfigGen>, Error> {
let ret = find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ctx, ncc).await?;
let ret = find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ctx, pgqueue, ncc).await?;
Ok(ret)
}
pub async fn ch_conf_from_binned(
q: &BinnedQuery,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<Option<ChannelTypeConfigGen>, Error> {
let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, ncc).await?;
let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, pgqueue, ncc).await?;
Ok(ret)
}
@@ -73,7 +77,12 @@ impl ChannelConfigHandler {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
pub async fn handle(
&self,
req: Requ,
pgqueue: &PgQueue,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -81,7 +90,7 @@ impl ChannelConfigHandler {
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
match self.channel_config(req, &node_config).await {
match self.channel_config(req, pgqueue, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {
warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
@@ -96,10 +105,16 @@ impl ChannelConfigHandler {
}
}
async fn channel_config(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
async fn channel_config(
&self,
req: Requ,
pgqueue: &PgQueue,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url)?;
let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?;
let conf =
nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), pgqueue, node_config).await?;
match conf {
Some(conf) => {
let res: ChannelConfigResponse = conf.into();
@@ -180,6 +195,7 @@ impl ChannelConfigQuorumHandler {
&self,
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
@@ -189,7 +205,7 @@ impl ChannelConfigQuorumHandler {
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
match self.channel_config_quorum(req, ctx, &node_config).await {
match self.channel_config_quorum(req, ctx, pgqueue, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {
warn!("from channel_config_quorum: {e}");
@@ -208,13 +224,15 @@ impl ChannelConfigQuorumHandler {
&self,
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
info!("channel_config_quorum");
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url)?;
info!("channel_config_quorum for q {q:?}");
let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, ncc).await?;
let ch_confs =
nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, pgqueue, ncc).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(ToJsonBody::from(&ch_confs).into_body())?;
@@ -386,8 +404,7 @@ impl ScyllaChannelsActive {
let scyco = node_config
.node_config
.cluster
.scylla
.as_ref()
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
// Database stores tsedge/ts_msp in units of (10 sec), and we additionally map to the grid.
@@ -494,7 +511,7 @@ impl IocForChannel {
node_config: &NodeConfigCached,
) -> Result<Option<IocForChannelRes>, Error> {
let dbconf = &node_config.node_config.cluster.database;
let pg_client = create_connection(dbconf).await?;
let (pg_client, pgjh) = create_connection(dbconf).await?;
let rows = pg_client
.query(
"select addr from ioc_by_channel where facility = $1 and channel = $2",
@@ -583,8 +600,7 @@ impl ScyllaSeriesTsMsp {
let scyco = node_config
.node_config
.cluster
.scylla
.as_ref()
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let mut ts_msps = Vec::new();
@@ -626,7 +642,7 @@ impl AmbigiousChannelNames {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -634,7 +650,7 @@ impl AmbigiousChannelNames {
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
match self.process(node_config).await {
match self.process(ncc).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
@@ -650,9 +666,9 @@ impl AmbigiousChannelNames {
}
}
async fn process(&self, node_config: &NodeConfigCached) -> Result<AmbigiousChannelNamesResponse, Error> {
let dbconf = &node_config.node_config.cluster.database;
let pg_client = create_connection(dbconf).await?;
async fn process(&self, ncc: &NodeConfigCached) -> Result<AmbigiousChannelNamesResponse, Error> {
let dbconf = &ncc.node_config.cluster.database;
let (pg_client, pgjh) = create_connection(dbconf).await?;
let rows = pg_client
.query(
"select t2.series, t2.channel, t2.scalar_type, t2.shape_dims, t2.agg_kind from series_by_channel t1, series_by_channel t2 where t2.channel = t1.channel and t2.series != t1.series",
@@ -747,9 +763,7 @@ impl GenerateScyllaTestData {
}
async fn process(&self, node_config: &NodeConfigCached) -> Result<(), Error> {
let dbconf = &node_config.node_config.cluster.database;
let _pg_client = create_connection(dbconf).await?;
let scyconf = node_config.node_config.cluster.scylla.as_ref().unwrap();
let scyconf = node_config.node_config.cluster.scylla_st().unwrap();
let scy = scyllaconn::conn::create_scy_session(scyconf).await?;
let series: u64 = 42001;
// TODO query `ts_msp` for all MSP values und use that to delete from event table first.

View File

@@ -19,6 +19,8 @@ use crate::bodystream::response;
use crate::err::Error;
use ::err::thiserror;
use ::err::ThisError;
use dbconn::worker::PgQueue;
use dbconn::worker::PgWorker;
use futures_util::Future;
use futures_util::FutureExt;
use http::Method;
@@ -37,6 +39,7 @@ use netpod::query::prebinned::PreBinnedQuery;
use netpod::req_uri_to_url;
use netpod::status_board;
use netpod::status_board_init;
use netpod::Database;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::ServiceVersion;
@@ -49,6 +52,7 @@ use serde::Serialize;
use std::net;
use std::panic;
use std::pin;
use std::sync::Arc;
use std::task;
use task::Context;
use task::Poll;
@@ -79,6 +83,7 @@ impl IntoBoxedError for tokio::task::JoinError {}
impl IntoBoxedError for api4::databuffer_tools::FindActiveError {}
impl IntoBoxedError for std::string::FromUtf8Error {}
impl IntoBoxedError for std::io::Error {}
impl IntoBoxedError for dbconn::worker::Error {}
impl<E> From<E> for RetrievalError
where
@@ -95,16 +100,29 @@ impl ::err::ToErr for RetrievalError {
}
}
pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> {
pub struct ServiceSharedResources {
pgqueue: PgQueue,
}
impl ServiceSharedResources {
pub fn new(pgqueue: PgQueue) -> Self {
Self { pgqueue }
}
}
pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> {
status_board_init();
#[cfg(DISABLED)]
if let Some(bind) = node_config.node.prometheus_api_bind {
if let Some(bind) = ncc.node.prometheus_api_bind {
tokio::spawn(prometheus::host(bind));
}
// let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone()));
let (pgqueue, pgworker) = PgWorker::new(&ncc.node_config.cluster.database).await?;
let pgworker_jh = taskrun::spawn(pgworker.work());
let shared_res = ServiceSharedResources::new(pgqueue);
let shared_res = Arc::new(shared_res);
use std::str::FromStr;
let bind_addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?;
let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?;
// tokio::net::TcpSocket::new_v4()?.listen(200)?
let listener = TcpListener::bind(bind_addr).await?;
loop {
@@ -114,14 +132,24 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion
break;
};
debug!("new connection from {addr}");
let node_config = node_config.clone();
let node_config = ncc.clone();
let service_version = service_version.clone();
let io = TokioIo::new(stream);
let shared_res = shared_res.clone();
// let shared_res = &shared_res;
tokio::task::spawn(async move {
let res = hyper::server::conn::http1::Builder::new()
.serve_connection(
io,
service_fn(move |req| the_service_fn(req, addr, node_config.clone(), service_version.clone())),
service_fn(move |req| {
the_service_fn(
req,
addr,
node_config.clone(),
service_version.clone(),
shared_res.clone(),
)
}),
)
.await;
match res {
@@ -132,7 +160,7 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion
}
});
}
info!("http host done");
// rawjh.await??;
Ok(())
}
@@ -142,10 +170,11 @@ async fn the_service_fn(
addr: SocketAddr,
node_config: NodeConfigCached,
service_version: ServiceVersion,
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, Error> {
let ctx = ReqCtx::new_with_node(&req, &node_config);
let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid());
let f = http_service(req, addr, ctx, node_config, service_version);
let f = http_service(req, addr, ctx, node_config, service_version, shared_res);
let f = Cont { f: Box::pin(f) };
f.instrument(reqid_span).await
}
@@ -156,6 +185,7 @@ async fn http_service(
ctx: ReqCtx,
node_config: NodeConfigCached,
service_version: ServiceVersion,
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, Error> {
info!(
"http-request {:?} - {:?} - {:?} - {:?}",
@@ -164,7 +194,7 @@ async fn http_service(
req.uri(),
req.headers()
);
match http_service_try(req, ctx, &node_config, &service_version).await {
match http_service_try(req, ctx, &node_config, &service_version, shared_res).await {
Ok(k) => Ok(k),
Err(e) => {
error!("daqbuffer node http_service sees error from http_service_try: {}", e);
@@ -209,6 +239,7 @@ async fn http_service_try(
ctx: ReqCtx,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, Error> {
use http::HeaderValue;
let mut urlmarks = Vec::new();
@@ -221,7 +252,7 @@ async fn http_service_try(
}
}
}
let mut res = http_service_inner(req, &ctx, node_config, service_version).await?;
let mut res = http_service_inner(req, &ctx, node_config, service_version, shared_res).await?;
let hm = res.headers_mut();
hm.append("Access-Control-Allow-Origin", "*".parse().unwrap());
hm.append("Access-Control-Allow-Headers", "*".parse().unwrap());
@@ -243,6 +274,7 @@ async fn http_service_inner(
ctx: &ReqCtx,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, RetrievalError> {
let uri = req.uri().clone();
let path = uri.path();
@@ -291,7 +323,7 @@ async fn http_service_inner(
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config, service_version)
Ok(h.handle(req, ctx, &node_config, shared_res)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?)
} else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) {
@@ -303,19 +335,19 @@ async fn http_service_inner(
} else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = api4::events::EventsHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
Ok(h.handle(req, ctx, &shared_res.pgqueue, &node_config).await?)
} else if let Some(h) = channelconfig::ChannelConfigsHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
Ok(h.handle(req, &shared_res.pgqueue, &node_config).await?)
} else if let Some(h) = channelconfig::IocForChannel::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) {
@@ -357,7 +389,7 @@ async fn http_service_inner(
} else if let Some(h) = settings::SettingsThreadsMaxHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = pulsemap::IndexChannelHttpFunction::handler(&req) {

View File

@@ -425,7 +425,7 @@ impl IndexChannelHttpFunction {
async fn index(req: Requ, do_print: bool, node_config: &NodeConfigCached) -> Result<String, Error> {
// TODO avoid double-insert on central storage.
let pgc = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let (pgc, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
let insert_01 = pgc.prepare(sql).await?;
@@ -936,7 +936,7 @@ impl MapPulseScyllaHandler {
let url = req_uri_to_url(req.uri())?;
let query = MapPulseQuery::from_url(&url)?;
let pulse = query.pulse;
let scyconf = if let Some(x) = node_config.node_config.cluster.scylla.as_ref() {
let scyconf = if let Some(x) = node_config.node_config.cluster.scylla_st() {
x
} else {
return Err(Error::with_public_msg_no_trace("no scylla configured"));
@@ -1017,7 +1017,7 @@ impl MapPulseLocalHttpFunction {
})
.unwrap_or_else(|| String::from("missing x-req-from"));
let ts1 = Instant::now();
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)";
let rows = conn.query(sql, &[&node_config.node.host, &(pulse as i64)]).await?;
let cands: Vec<_> = rows
@@ -1516,7 +1516,7 @@ impl MarkClosedHttpFunction {
}
pub async fn mark_closed(node_config: &NodeConfigCached) -> Result<(), Error> {
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let sql = "select distinct channel from map_pulse_files order by channel";
let rows = conn.query(sql, &[]).await?;
let chns: Vec<_> = rows.iter().map(|r| r.get::<_, String>(0)).collect();

View File

@@ -1011,7 +1011,7 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
fn bins_timebin_fill_empty_00() {
let mut bins = BinsDim0::<u32>::empty();
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 5,
});
@@ -1033,7 +1033,7 @@ fn bins_timebin_fill_empty_00() {
fn bins_timebin_fill_empty_01() {
let mut bins = BinsDim0::<u32>::empty();
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 5,
});
@@ -1056,7 +1056,7 @@ fn bins_timebin_fill_empty_01() {
fn bins_timebin_push_empty_00() {
let mut bins = BinsDim0::<u32>::empty();
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 5,
});
@@ -1078,7 +1078,7 @@ fn bins_timebin_push_empty_00() {
fn bins_timebin_push_empty_01() {
let mut bins = BinsDim0::<u32>::empty();
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 5,
});
@@ -1104,7 +1104,7 @@ fn bins_timebin_ingest_only_before() {
bins.push(SEC * 2, SEC * 4, 3, 7, 9, 8.1);
bins.push(SEC * 4, SEC * 6, 3, 6, 9, 8.2);
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 5,
});
@@ -1127,7 +1127,7 @@ fn bins_timebin_ingest_00() {
bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.);
bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.);
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 5,
});
@@ -1148,7 +1148,7 @@ fn bins_timebin_ingest_00() {
#[test]
fn bins_timebin_ingest_continuous_00() {
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 20,
});

View File

@@ -1292,7 +1292,7 @@ fn binner_00() {
let mut ev1 = EventsDim0::empty();
ev1.push(MS * 1200, 3, 1.2f32);
ev1.push(MS * 3200, 3, 3.2f32);
let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10);
let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10);
let mut binner = ev1.time_binner_new(binrange, true);
binner.ingest(ev1.as_time_binnable_mut());
eprintln!("{:?}", binner);
@@ -1306,7 +1306,7 @@ fn binner_01() {
ev1.push(MS * 1300, 3, 1.3);
ev1.push(MS * 2100, 3, 2.1);
ev1.push(MS * 2300, 3, 2.3);
let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10);
let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10);
let mut binner = ev1.time_binner_new(binrange, true);
binner.ingest(ev1.as_time_binnable_mut());
eprintln!("{:?}", binner);
@@ -1386,7 +1386,7 @@ fn bin_binned_02() {
#[test]
fn events_timebin_ingest_continuous_00() {
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_len: TsNano::from_ns(SEC * 2),
bin_off: 9,
bin_cnt: 20,
});

View File

@@ -16,6 +16,7 @@ use items_0::streamitem::ERROR_FRAME_TYPE_ID;
use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME;
use items_0::streamitem::SITEMTY_NONSPEC_FRAME_TYPE_ID;
use items_0::Events;
use netpod::log::*;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
@@ -82,7 +83,10 @@ where
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(),
Ok(StreamItem::Log(item)) => make_log_frame(item),
Ok(StreamItem::Stats(item)) => make_stats_frame(item),
Err(e) => make_error_frame(e),
Err(e) => {
info!("calling make_error_frame for [[{e}]]");
make_error_frame(e)
}
}
}
}
@@ -181,7 +185,7 @@ fn test_frame_log() {
#[test]
fn test_frame_error() {
use crate::channelevents::ChannelEvents;
use crate::frame::decode_from_slice;
use crate::frame::json_from_slice;
let item: Sitemty<ChannelEvents> = Err(Error::with_msg_no_trace("dummy-error-message"));
let buf = Framable::make_frame(&item).unwrap();
let len = u32::from_le_bytes(buf[12..16].try_into().unwrap());
@@ -190,5 +194,5 @@ fn test_frame_error() {
panic!("bad tyid");
}
eprintln!("buf len {} len {}", buf.len(), len);
let item2: Error = decode_from_slice(&buf[20..20 + len as usize]).unwrap();
let item2: Error = json_from_slice(&buf[20..20 + len as usize]).unwrap();
}

View File

@@ -146,6 +146,20 @@ where
postcard::from_bytes(buf).map_err(|e| format!("{e}").into())
}
fn json_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: Serialize,
{
serde_json::to_vec(&item).map_err(Error::from_string)
}
pub fn json_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
serde_json::from_slice(buf).map_err(Error::from_string)
}
pub fn encode_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: Serialize,
@@ -213,7 +227,8 @@ where
// TODO remove duplication for these similar `make_*_frame` functions:
pub fn make_error_frame(error: &err::Error) -> Result<BytesMut, Error> {
match encode_to_vec(error) {
// error frames are always encoded as json
match json_to_vec(error) {
Ok(enc) => {
let mut h = crc32fast::Hasher::new();
h.update(&enc);
@@ -335,7 +350,8 @@ where
)));
}
if frame.tyid() == ERROR_FRAME_TYPE_ID {
let k: err::Error = match decode_from_slice(frame.buf()) {
// error frames are always encoded as json
let k: err::Error = match json_from_slice(frame.buf()) {
Ok(item) => item,
Err(e) => {
error!("deserialize len {} ERROR_FRAME_TYPE_ID {}", frame.buf().len(), e);

View File

@@ -642,8 +642,14 @@ pub struct Cluster {
pub is_central_storage: bool,
#[serde(rename = "fileIoBufferSize", default)]
pub file_io_buffer_size: FileIoBufferSize,
pub scylla: Option<ScyllaConfig>,
pub cache_scylla: Option<ScyllaConfig>,
scylla: Option<ScyllaConfig>,
#[serde(rename = "scylla_st")]
scylla_st: Option<ScyllaConfig>,
#[serde(rename = "scylla_mt")]
scylla_mt: Option<ScyllaConfig>,
#[serde(rename = "scylla_lt")]
scylla_lt: Option<ScyllaConfig>,
cache_scylla: Option<ScyllaConfig>,
}
impl Cluster {
@@ -654,6 +660,40 @@ impl Cluster {
true
}
}
pub fn scylla_st(&self) -> Option<&ScyllaConfig> {
self.scylla_st.as_ref().map_or_else(|| self.scylla.as_ref(), Some)
}
pub fn scylla_mt(&self) -> Option<&ScyllaConfig> {
self.scylla_mt.as_ref()
}
pub fn scylla_lt(&self) -> Option<&ScyllaConfig> {
self.scylla_lt.as_ref()
}
pub fn test_00() -> Self {
Self {
backend: "testbackend-00".into(),
nodes: Vec::new(),
database: Database {
name: "".into(),
host: "".into(),
port: 5432,
user: "".into(),
pass: "".into(),
},
run_map_pulse_task: false,
is_central_storage: false,
file_io_buffer_size: FileIoBufferSize(1024 * 8),
scylla: None,
scylla_st: None,
scylla_mt: None,
scylla_lt: None,
cache_scylla: None,
}
}
}
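The tier accessors make the fallback explicit: only the short-term lookup falls back to the legacy `scylla` key. A sketch of that behavior, usable inside netpod where the private fields are visible (`scy_cfg` stands for a hypothetical `ScyllaConfig` value):

let mut cluster = Cluster::test_00();
cluster.scylla = Some(scy_cfg); // legacy key only
assert!(cluster.scylla_st().is_some()); // st falls back to `scylla`
assert!(cluster.scylla_mt().is_none()); // mt and lt do not
assert!(cluster.scylla_lt().is_none());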
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -1368,20 +1408,20 @@ where
pub struct DtNano(u64);
impl DtNano {
pub fn from_ns(ns: u64) -> Self {
pub const fn from_ns(ns: u64) -> Self {
Self(ns)
}
pub fn from_ms(ns: u64) -> Self {
Self(MS * ns)
pub const fn from_ms(ns: u64) -> Self {
Self(1000000 * ns)
}
pub fn ns(&self) -> u64 {
pub const fn ns(&self) -> u64 {
self.0
}
pub fn ms(&self) -> u64 {
self.0 / MS
pub const fn ms(&self) -> u64 {
self.0 / 1000000
}
pub fn to_i64(&self) -> i64 {
@@ -1389,13 +1429,13 @@ impl DtNano {
}
}
#[cfg(DISABLED)]
impl fmt::Debug for DtNano {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
impl fmt::Display for DtNano {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let sec = self.0 / SEC;
let ms = (self.0 - SEC * sec) / MS;
let ns = self.0 - SEC * sec - MS * ms;
f.debug_tuple("DtNano").field(&sec).field(&ms).field(&ns).finish()
// fmt.debug_tuple("DtNano").field(&sec).field(&ms).field(&ns).finish()
write!(fmt, "DtNano {{ sec {} ms {} ns {} }}", sec, ms, ns)
}
}
@@ -1452,7 +1492,11 @@ impl DtMs {
}
pub const fn ms(&self) -> u64 {
self.0 / MS
self.0
}
pub const fn ns(&self) -> u64 {
1000000 * self.0
}
pub const fn to_i64(&self) -> i64 {
@@ -1461,7 +1505,7 @@ impl DtMs {
}
#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct TsNano(pub u64);
pub struct TsNano(u64);
mod ts_nano_ser {
use super::TsNano;
@@ -1522,7 +1566,7 @@ impl TsNano {
}
pub const fn from_ms(ns: u64) -> Self {
Self(MS * ns)
Self(1000000 * ns)
}
pub const fn ns(&self) -> u64 {
@@ -1530,7 +1574,7 @@ impl TsNano {
}
pub const fn ms(&self) -> u64 {
self.0 / MS
self.0 / 1000000
}
pub const fn sub(self, v: DtNano) -> Self {
@@ -1560,19 +1604,26 @@ impl TsNano {
impl fmt::Debug for TsNano {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32);
let ts = Utc
.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32)
.earliest()
.unwrap_or(Default::default());
f.debug_struct("TsNano")
.field(
"ts",
&ts.earliest()
.unwrap_or(Default::default())
.format(DATETIME_FMT_3MS)
.to_string(),
)
.field("ts", &ts.format(DATETIME_FMT_3MS).to_string())
.finish()
}
}
impl fmt::Display for TsNano {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let ts = Utc
.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32)
.earliest()
.unwrap_or(Default::default());
ts.format(DATETIME_FMT_3MS).fmt(fmt)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
pub struct PulseId(u64);
@@ -2372,6 +2423,18 @@ impl TsMs {
self.0
}
pub const fn ns(self) -> TsNano {
TsNano::from_ms(self.0)
}
pub const fn ns_u64(self) -> u64 {
1000000 * self.0
}
pub const fn sec(self) -> u64 {
self.0 / 1000
}
pub const fn to_u64(self) -> u64 {
self.0
}
@@ -2387,6 +2450,12 @@ impl TsMs {
}
}
impl fmt::Display for TsMs {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "TsMs {{ {} }}", self.0)
}
}
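The unit helpers are constant-factor conversions; their invariants in one place (constructor and accessors as used elsewhere in this commit):

let t = TsMs::from_ms_u64(1_500);
assert_eq!(t.ns_u64(), 1_500_000_000); // ms -> ns multiplies by 1_000_000
assert_eq!(t.sec(), 1); // ms -> s truncates
assert_eq!(t.ns().ms(), 1_500); // TsMs -> TsNano -> ms round trips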
impl std::ops::Sub for TsMs {
type Output = TsMs;
@@ -3215,6 +3284,9 @@ pub fn test_cluster() -> Cluster {
pass: "testingdaq".into(),
},
scylla: None,
scylla_st: None,
scylla_mt: None,
scylla_lt: None,
cache_scylla: None,
run_map_pulse_task: false,
is_central_storage: false,
@@ -3249,6 +3321,9 @@ pub fn sls_test_cluster() -> Cluster {
pass: "testingdaq".into(),
},
scylla: None,
scylla_st: None,
scylla_mt: None,
scylla_lt: None,
cache_scylla: None,
run_map_pulse_task: false,
is_central_storage: false,
@@ -3283,6 +3358,9 @@ pub fn archapp_test_cluster() -> Cluster {
pass: "testingdaq".into(),
},
scylla: None,
scylla_st: None,
scylla_mt: None,
scylla_lt: None,
cache_scylla: None,
run_map_pulse_task: false,
is_central_storage: false,

View File

@@ -8,41 +8,54 @@ pub enum RetentionTime {
}
impl RetentionTime {
pub fn debug_tag(&self) -> &'static str {
use RetentionTime::*;
match self {
Short => "ST",
Medium => "MT",
Long => "LT",
}
}
pub fn table_prefix(&self) -> &'static str {
use RetentionTime::*;
match self {
Short => "",
Short => "st_",
Medium => "mt_",
Long => "lt_",
}
}
pub fn ttl_events_d0(&self) -> Duration {
match self {
RetentionTime::Short => Duration::from_secs(60 * 60 * 12),
RetentionTime::Medium => Duration::from_secs(60 * 60 * 24 * 100),
RetentionTime::Long => Duration::from_secs(60 * 60 * 24 * 31 * 12 * 11),
}
let day = 60 * 60 * 24;
let margin_max = Duration::from_secs(day * 2);
let ttl = self.ttl_ts_msp();
let margin = ttl / 10;
let margin = if margin >= margin_max { margin_max } else { margin };
ttl + margin
}
pub fn ttl_events_d1(&self) -> Duration {
match self {
RetentionTime::Short => Duration::from_secs(60 * 60 * 12),
RetentionTime::Medium => Duration::from_secs(60 * 60 * 24 * 100),
RetentionTime::Long => Duration::from_secs(60 * 60 * 24 * 31 * 12 * 11),
}
// TTL now depends only on RetentionTime, not on data type or shape.
self.ttl_events_d0()
}
pub fn ttl_ts_msp(&self) -> Duration {
let dt = self.ttl_events_d0();
dt + dt / 30
let day = 60 * 60 * 24;
match self {
RetentionTime::Short => Duration::from_secs(day * 40),
RetentionTime::Medium => Duration::from_secs(day * 31 * 13),
RetentionTime::Long => Duration::from_secs(day * 31 * 12 * 17),
}
}
pub fn ttl_binned(&self) -> Duration {
self.ttl_events_d0() * 2
// Current choice is to keep the TTL the same as for events
self.ttl_events_d0()
}
pub fn ttl_channel_status(&self) -> Duration {
self.ttl_binned()
// Current choice is to keep the TTL the same as for events
self.ttl_events_d0()
}
}
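Worked numbers for the new scheme: `ttl_ts_msp` is now the base, and the event TTL adds a ten-percent margin capped at two days. For the short tier that gives 40 d + min(4 d, 2 d) = 42 d:

let rt = RetentionTime::Short;
assert_eq!(rt.ttl_ts_msp(), Duration::from_secs(60 * 60 * 24 * 40));
// margin = 40 d / 10 = 4 d, capped at the 2 d maximum, so events live 42 days
assert_eq!(rt.ttl_events_d0(), Duration::from_secs(60 * 60 * 24 * 42));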

View File

@@ -1,3 +1,4 @@
use dbconn::worker::PgQueue;
use err::Error;
use httpclient::url::Url;
use netpod::log::*;
@@ -100,13 +101,14 @@ fn channel_config_test_backend(channel: SfDbChannel) -> Result<ChannelTypeConfig
pub async fn channel_config(
range: NanoRange,
channel: SfDbChannel,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<Option<ChannelTypeConfigGen>, Error> {
if channel.backend() == TEST_BACKEND {
Ok(Some(channel_config_test_backend(channel)?))
} else if ncc.node_config.cluster.scylla.is_some() {
} else if ncc.node_config.cluster.scylla_st().is_some() {
debug!("try to get ChConf for scylla type backend");
let ret = scylla_chconf_from_sf_db_channel(range, &channel, ncc)
let ret = scylla_chconf_from_sf_db_channel(range, &channel, pgqueue)
.await
.map_err(Error::from)?;
Ok(Some(ChannelTypeConfigGen::Scylla(ret)))
@@ -158,7 +160,7 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re
}
};
Ok(ret)
} else if ncc.node_config.cluster.scylla.is_some() {
} else if ncc.node_config.cluster.scylla_st().is_some() {
debug!("try to get ChConf for scylla type backend");
let ret = scylla_all_chconf_from_sf_db_channel(&channel, ncc)
.await
@@ -206,20 +208,22 @@ pub async fn http_get_channel_config(
async fn scylla_chconf_from_sf_db_channel(
range: NanoRange,
channel: &SfDbChannel,
ncc: &NodeConfigCached,
pgqueue: &PgQueue,
) -> Result<ChConf, Error> {
if let Some(series) = channel.series() {
dbconn::channelconfig::chconf_for_series(channel.backend(), series, ncc).await
let ret = pgqueue
.chconf_for_series(channel.backend(), series)
.await?
.recv()
.await??;
Ok(ret)
} else {
// TODO allow the called function to return None instead of error-not-found
let ret = dbconn::channelconfig::chconf_best_matching_for_name_and_range(
channel.backend(),
channel.name(),
range,
ncc,
)
.await
.map_err(Error::from)?;
let ret = pgqueue
.chconf_best_matching_name_range_job(channel.backend(), channel.name(), range)
.await?
.recv()
.await??;
Ok(ret)
}
}
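The `await?.recv().await??` chains above reflect a request/response queue: enqueueing can fail, receiving can fail if the worker goes away, and the job itself returns a `Result`. A minimal self-contained sketch of that pattern with tokio channels (names here are illustrative, not the real `dbconn::worker` API):

use tokio::sync::{mpsc, oneshot};

struct Job {
    backend: String,
    series: u64,
    resp: oneshot::Sender<Result<ChConf, Error>>,
}

async fn ask_worker(tx: &mpsc::Sender<Job>, backend: &str, series: u64) -> Result<ChConf, Error> {
    let (resp, rx) = oneshot::channel();
    tx.send(Job { backend: backend.into(), series, resp })
        .await
        .map_err(|_| Error::with_msg_no_trace("queue closed"))?;
    // The first ? observes a dropped worker, the second the job's own outcome.
    let ret = rx.await.map_err(|_| Error::with_msg_no_trace("worker dropped"))??;
    Ok(ret)
}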

View File

@@ -1,4 +1,5 @@
use crate::channelconfig::http_get_channel_config;
use dbconn::worker::PgQueue;
use err::Error;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
@@ -90,13 +91,14 @@ pub async fn find_config_basics_quorum(
channel: SfDbChannel,
range: SeriesRange,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<Option<ChannelTypeConfigGen>, Error> {
trace!("find_config_basics_quorum");
if let Some(_cfg) = &ncc.node.sf_databuffer {
let channel = if channel.name().is_empty() {
if let Some(_) = channel.series() {
let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?;
let (pgclient, _pgjh) = dbconn::create_connection(&ncc.node_config.cluster.database).await?;
let pgclient = std::sync::Arc::new(pgclient);
dbconn::find_sf_channel_by_series(channel, pgclient)
.await
@@ -111,9 +113,9 @@ pub async fn find_config_basics_quorum(
Some(x) => Ok(Some(ChannelTypeConfigGen::SfDatabuffer(x))),
None => Ok(None),
}
} else if let Some(_) = &ncc.node_config.cluster.scylla {
} else if let Some(_) = &ncc.node_config.cluster.scylla_st() {
let range = netpod::range::evrange::NanoRange::try_from(&range)?;
let ret = crate::channelconfig::channel_config(range, channel, ncc).await?;
let ret = crate::channelconfig::channel_config(range, channel, pgqueue, ncc).await?;
Ok(ret)
} else {
Err(Error::with_msg_no_trace(

View File

@@ -17,6 +17,7 @@ use items_2::empty::empty_events_dyn_ev;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::decode_frame;
use items_2::frame::make_error_frame;
use items_2::frame::make_term_frame;
use items_2::inmem::InMemoryFrame;
use netpod::histo::HistoLog2;
@@ -81,7 +82,7 @@ async fn make_channel_events_stream_data(
let node_count = ncc.node_config.cluster.nodes.len() as u64;
let node_ix = ncc.ix as u64;
streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix)
} else if let Some(scyconf) = &ncc.node_config.cluster.scylla {
} else if let Some(scyconf) = &ncc.node_config.cluster.scylla_st() {
let cfg = subq.ch_conf().to_scylla()?;
scylla_channel_event_stream(subq, cfg, scyconf, ncc).await
} else if let Some(_) = &ncc.node.channel_archiver {
@@ -125,6 +126,7 @@ pub async fn create_response_bytes_stream(
return Err(e);
}
if evq.is_event_blobs() {
// This is only relevant for "api-1" queries in sf-databuffer/imagebuffer-based backends.
// TODO support event blobs as transform
let fetch_info = evq.ch_conf().to_sf_databuffer()?;
let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc)?;
@@ -151,7 +153,7 @@ pub async fn create_response_bytes_stream(
})
});
// let stream = stream.map(move |x| Box::new(x) as Box<dyn Framable + Send>);
let stream = stream.map(|x| x.make_frame().map(|x| x.freeze()));
let stream = stream.map(|x| x.make_frame().map(bytes::BytesMut::freeze));
let ret = Box::pin(stream);
Ok(ret)
}

View File

@@ -39,6 +39,7 @@ use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
// TODO unify with Cluster::test_00()
const TEST_BACKEND: &str = "testbackend-00";
#[test]
@@ -50,22 +51,7 @@ fn raw_data_00() {
let cfg = NodeConfigCached {
node_config: NodeConfig {
name: "node_name_dummy".into(),
cluster: Cluster {
backend: TEST_BACKEND.into(),
nodes: Vec::new(),
database: Database {
name: "".into(),
host: "".into(),
port: 5432,
user: "".into(),
pass: "".into(),
},
run_map_pulse_task: false,
is_central_storage: false,
file_io_buffer_size: FileIoBufferSize(1024 * 8),
scylla: None,
cache_scylla: None,
},
cluster: Cluster::test_00(),
},
node: Node {
host: "empty".into(),

View File

@@ -29,7 +29,7 @@ pub async fn scylla_channel_event_stream(
let shape = chconf.shape();
let do_test_stream_error = false;
let with_values = evq.need_value_data();
debug!("Make EventsStreamScylla for {series:?} {scalar_type:?} {shape:?}");
debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n");
let stream = scyllaconn::events::EventsStreamScylla::new(
series,
evq.range().into(),

View File

@@ -59,7 +59,7 @@ impl FromUrl for AccountingIngestedBytesQuery {
let ret = Self {
backend: pairs
.get("backend")
.ok_or_else(|| Error::with_msg_no_trace("missing backend"))?
.ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))?
.to_string(),
range: SeriesRange::from_pairs(pairs)?,
};
@@ -121,14 +121,16 @@ impl FromUrl for AccountingToplistQuery {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
let fn1 = |pairs: &BTreeMap<String, String>| {
let v = pairs.get("tsDate").ok_or(Error::with_public_msg("missing tsDate"))?;
let v = pairs
.get("tsDate")
.ok_or(Error::with_public_msg_no_trace("missing tsDate"))?;
let w = v.parse::<DateTime<Utc>>()?;
Ok::<_, Error>(TsNano(w.to_nanos()))
Ok::<_, Error>(TsNano::from_ns(w.to_nanos()))
};
let ret = Self {
backend: pairs
.get("backend")
.ok_or_else(|| Error::with_msg_no_trace("missing backend"))?
.ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))?
.to_string(),
ts: fn1(pairs)?,
limit: pairs.get("limit").map_or(None, |x| x.parse().ok()).unwrap_or(20),

View File

@@ -170,11 +170,6 @@ impl PlainEventsQuery {
self.do_test_stream_error = k;
}
pub fn for_event_blobs(mut self) -> Self {
self.transform = TransformQuery::for_event_blobs();
self
}
pub fn for_time_weighted_scalar(mut self) -> Self {
self.transform = TransformQuery::for_time_weighted_scalar();
self
@@ -196,6 +191,14 @@ impl PlainEventsQuery {
pub fn create_errors_contains(&self, x: &str) -> bool {
self.create_errors.contains(&String::from(x))
}
pub fn summary_short(&self) -> String {
format!(
"PlainEventsQuery {{ chn: {}, range: {:?} }}",
self.channel().name(),
self.range()
)
}
}
impl HasBackend for PlainEventsQuery {

View File

@@ -41,8 +41,9 @@ pub async fn read_ts(ts: u64, scy: Arc<ScySession>) -> Result<UsageData, Error>
// TODO toplist::read_ts refactor
info!("TODO toplist::read_ts refactor");
let snap = EMIT_ACCOUNTING_SNAP.ms() / 1000;
info!("ts {ts} snap {snap:?}");
let ts = ts / timeunits::SEC / snap * snap;
let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?");
let cql = concat!("select series, count, bytes from lt_account_00 where part = ? and ts = ?");
let qu = prep(cql, scy.clone()).await?;
let ret = read_ts_inner(ts, qu, scy).await?;
Ok(ret)
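The snap arithmetic truncates the nanosecond input to a whole multiple of the emission interval, in seconds. For illustration, assuming `EMIT_ACCOUNTING_SNAP` works out to one hour:

let snap = 3_600_u64; // hypothetical value of EMIT_ACCOUNTING_SNAP.ms() / 1000
let ts_ns = 7_262_000_000_000_u64; // 7262 s expressed in ns
let snapped = ts_ns / timeunits::SEC / snap * snap;
assert_eq!(snapped, 7_200); // a row key in whole seconds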

View File

@@ -4,6 +4,7 @@ use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::scalar_ops::ScalarOps;
use items_0::Appendable;
use items_0::Empty;
@@ -13,8 +14,12 @@ use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scylla::frame::response::result::Row;
use scylla::Session as ScySession;
use std::collections::VecDeque;
use std::mem;
@@ -27,33 +32,42 @@ async fn find_ts_msp(
series: u64,
range: ScyllaSeriesRange,
scy: Arc<ScySession>,
) -> Result<(VecDeque<u64>, VecDeque<u64>), Error> {
trace!("find_ts_msp series {} {:?}", series, range);
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> {
trace!("find_ts_msp series {:?} {:?}", series, range);
let mut ret1 = VecDeque::new();
let mut ret2 = VecDeque::new();
// TODO use prepared statements
let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2";
let res = scy.query(cql, (series as i64, range.beg() as i64)).await.err_conv()?;
let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2";
let params = (series as i64, range.beg().ms() as i64);
trace!("find_ts_msp query 1 params {:?}", params);
let res = scy.query(cql, params).await.err_conv()?;
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
ret1.push_front(row.0 as u64);
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 1 ts_msp {}", ts);
ret1.push_front(ts);
}
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let res = scy
.query(cql, (series as i64, range.beg() as i64, range.end() as i64))
.await
.err_conv()?;
let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
trace!("find_ts_msp query 2 params {:?}", params);
let res = scy.query(cql, params).await.err_conv()?;
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
ret2.push_back(row.0 as u64);
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 2 ts_msp {}", ts);
ret2.push_back(ts);
}
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? limit 1";
let res = scy.query(cql, (series as i64, range.end() as i64)).await.err_conv()?;
let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1";
let params = (series as i64, range.end().ms() as i64);
trace!("find_ts_msp query 3 params {:?}", params);
let res = scy.query(cql, params).await.err_conv()?;
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
ret2.push_back(row.0 as u64);
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 3 ts_msp {}", ts);
ret2.push_back(ts);
}
trace!("find_ts_msp n1 {} n2 {}", ret1.len(), ret2.len());
trace!("find_ts_msp n1 {:?} n2 {:?}", ret1.len(), ret2.len());
Ok((ret1, ret2))
}
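The three queries split the MSP lookup into: up to two partitions strictly before the range (for one-before-range reads), all partitions inside it, and a single partition at or after the end (to detect range completeness). A standalone simulation with hypothetical MSP values, all in ms:

use std::collections::VecDeque;

let msps = [10_000_u64, 20_000, 30_000, 40_000];
let (beg, end) = (25_000_u64, 35_000_u64);
// query 1: newest two MSPs strictly before the range, kept in ascending order
let mut ret1 = VecDeque::new();
for &m in msps.iter().rev().filter(|&&m| m < beg).take(2) {
    ret1.push_front(m);
}
// query 2: MSPs inside the window (note the end + 1 from the code above)
let mut ret2: VecDeque<u64> = msps.iter().copied().filter(|&m| m >= beg && m < end + 1).collect();
// query 3: one MSP at or after the range end
if let Some(&m) = msps.iter().find(|&&m| m >= end) {
    ret2.push_back(m);
}
assert_eq!(Vec::from(ret1), vec![10_000, 20_000]);
assert_eq!(Vec::from(ret2), vec![30_000, 40_000]);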
@@ -64,6 +78,7 @@ trait ValTy: Sized {
fn from_scyty(inp: Self::ScyTy) -> Self;
fn table_name() -> &'static str;
fn default() -> Self;
fn is_valueblob() -> bool;
}
macro_rules! impl_scaty_scalar {
@@ -81,6 +96,9 @@ macro_rules! impl_scaty_scalar {
fn default() -> Self {
<Self as std::default::Default>::default()
}
fn is_valueblob() -> bool {
false
}
}
};
}
@@ -100,39 +118,42 @@ macro_rules! impl_scaty_array {
fn default() -> Self {
Vec::new()
}
fn is_valueblob() -> bool {
true
}
}
};
}
impl_scaty_scalar!(u8, i8, "events_scalar_u8");
impl_scaty_scalar!(u16, i16, "events_scalar_u16");
impl_scaty_scalar!(u32, i32, "events_scalar_u32");
impl_scaty_scalar!(u64, i64, "events_scalar_u64");
impl_scaty_scalar!(i8, i8, "events_scalar_i8");
impl_scaty_scalar!(i16, i16, "events_scalar_i16");
impl_scaty_scalar!(i32, i32, "events_scalar_i32");
impl_scaty_scalar!(i64, i64, "events_scalar_i64");
impl_scaty_scalar!(f32, f32, "events_scalar_f32");
impl_scaty_scalar!(f64, f64, "events_scalar_f64");
impl_scaty_scalar!(bool, bool, "events_scalar_bool");
impl_scaty_scalar!(String, String, "events_scalar_string");
impl_scaty_scalar!(u8, i8, "st_events_scalar_u8");
impl_scaty_scalar!(u16, i16, "st_events_scalar_u16");
impl_scaty_scalar!(u32, i32, "st_events_scalar_u32");
impl_scaty_scalar!(u64, i64, "st_events_scalar_u64");
impl_scaty_scalar!(i8, i8, "st_events_scalar_i8");
impl_scaty_scalar!(i16, i16, "st_events_scalar_i16");
impl_scaty_scalar!(i32, i32, "st_events_scalar_i32");
impl_scaty_scalar!(i64, i64, "st_events_scalar_i64");
impl_scaty_scalar!(f32, f32, "st_events_scalar_f32");
impl_scaty_scalar!(f64, f64, "st_events_scalar_f64");
impl_scaty_scalar!(bool, bool, "st_events_scalar_bool");
impl_scaty_scalar!(String, String, "st_events_scalar_string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "events_array_u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "events_array_u16");
impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "events_array_u32");
impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "events_array_u64");
impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "events_array_i8");
impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "events_array_i16");
impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "events_array_i32");
impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "events_array_i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "events_array_f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "events_array_f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "events_array_bool");
impl_scaty_array!(Vec<String>, String, Vec<String>, "events_array_string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "st_events_array_u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "st_events_array_u16");
impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "st_events_array_u32");
impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "st_events_array_u64");
impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "st_events_array_i8");
impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "st_events_array_i16");
impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "st_events_array_i32");
impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "st_events_array_i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "st_events_array_f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "st_events_array_f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "st_events_array_bool");
impl_scaty_array!(Vec<String>, String, Vec<String>, "st_events_array_string");
struct ReadNextValuesOpts {
series: u64,
ts_msp: u64,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
with_values: bool,
@@ -143,30 +164,41 @@ async fn read_next_values<ST>(opts: ReadNextValuesOpts) -> Result<Box<dyn Events
where
ST: ValTy,
{
debug!("read_next_values {} {}", opts.series, opts.ts_msp);
let series = opts.series;
let ts_msp = opts.ts_msp;
let range = opts.range;
let fwd = opts.fwd;
let scy = opts.scy;
let table_name = ST::table_name();
if range.end() > i64::MAX as u64 {
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::with_msg_no_trace("range.end overflows i64"));
}
let cql_fields = if opts.with_values {
"ts_lsp, pulse, value"
if ST::is_valueblob() {
"ts_lsp, pulse, valueblob"
} else {
"ts_lsp, pulse, value"
}
} else {
"ts_lsp, pulse"
};
let ret = if fwd {
let ts_lsp_min = if ts_msp < range.beg() { range.beg() - ts_msp } else { 0 };
let ts_lsp_max = if ts_msp < range.end() { range.end() - ts_msp } else { 0 };
let ts_lsp_min = if range.beg() > ts_msp.ns() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
let ts_lsp_max = if range.end() > ts_msp.ns() {
range.end().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace!(
"FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}",
"FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}",
ts_msp,
ts_lsp_min,
ts_lsp_max,
range.beg(),
range.end(),
table_name,
);
// TODO use prepared!
@@ -177,54 +209,28 @@ where
),
cql_fields, table_name,
);
let res = scy
.query(
cql,
(series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64),
)
.await
.err_conv()?;
let mut last_before = None;
let mut ret = ST::Container::empty();
for row in res.rows().err_conv()? {
let (ts, pulse, value) = if opts.with_values {
let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = ValTy::from_scyty(row.2);
(ts, pulse, value)
} else {
let row: (i64, i64) = row.into_typed().err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = ValTy::default();
(ts, pulse, value)
};
if ts >= range.end() {
// TODO count as logic error
error!("ts >= range.end");
} else if ts >= range.beg() {
if pulse % 27 != 3618 {
ret.push(ts, pulse, value);
}
} else {
if last_before.is_none() {
warn!("encounter event before range in forward read {ts}");
}
last_before = Some((ts, pulse, value));
}
let params = (
series as i64,
ts_msp.ms() as i64,
ts_lsp_min.ns() as i64,
ts_lsp_max.ns() as i64,
);
trace!("FWD event search params {:?}", params);
let mut res = scy.query_iter(cql, params).await.err_conv()?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x.err_conv()?);
}
let mut last_before = None;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !fwd, &mut last_before)?;
ret
} else {
let ts_lsp_max = if ts_msp < range.beg() { range.beg() - ts_msp } else { 0 };
trace!(
"BCK ts_msp {} ts_lsp_max {} beg {} end {} {}",
ts_msp,
ts_lsp_max,
range.beg(),
range.end(),
table_name,
);
let ts_lsp_max = if ts_msp.ns() < range.beg() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,);
// TODO use prepared!
let cql = format!(
concat!(
@@ -233,54 +239,89 @@ where
),
cql_fields, table_name,
);
let res = scy
.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?;
let mut seen_before = false;
let mut ret = ST::Container::empty();
for row in res.rows().err_conv()? {
let (ts, pulse, value) = if opts.with_values {
let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = ValTy::from_scyty(row.2);
(ts, pulse, value)
} else {
let row: (i64, i64) = row.into_typed().err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = ValTy::default();
(ts, pulse, value)
};
if ts >= range.beg() {
// TODO count as logic error
error!("ts >= range.beg");
} else if ts < range.beg() {
if pulse % 27 != 3618 {
ret.push(ts, pulse, value);
}
} else {
seen_before = true;
}
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace!("BCK event search params {:?}", params);
let mut res = scy.query_iter(cql, params).await.err_conv()?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x.err_conv()?);
}
let _ = seen_before;
let mut _last_before = None;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !fwd, &mut _last_before)?;
if ret.len() > 1 {
error!("multiple events in backwards search {}", ret.len());
}
ret
};
trace!("read ts_msp {} len {}", ts_msp, ret.len());
trace!("read ts_msp {:?} len {}", ts_msp, ret.len());
let ret = Box::new(ret);
Ok(ret)
}
fn convert_rows<ST: ValTy>(
rows: Vec<Row>,
range: ScyllaSeriesRange,
ts_msp: TsMs,
with_values: bool,
bck: bool,
last_before: &mut Option<(TsNano, u64, ST)>,
) -> Result<<ST as ValTy>::Container, Error> {
let mut ret = <ST as ValTy>::Container::empty();
for row in rows {
let (ts, pulse, value) = if with_values {
if ST::is_valueblob() {
let row: (i64, i64, Vec<u8>) = row.into_typed().err_conv()?;
trace!("read a value blob len {}", row.2.len());
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let pulse = row.1 as u64;
let value = ValTy::default();
(ts, pulse, value)
} else {
let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let pulse = row.1 as u64;
let value = ValTy::from_scyty(row.2);
(ts, pulse, value)
}
} else {
let row: (i64, i64) = row.into_typed().err_conv()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let pulse = row.1 as u64;
let value = ValTy::default();
(ts, pulse, value)
};
if bck {
if ts >= range.beg() {
// TODO count as logic error
error!("ts >= range.beg");
} else if ts < range.beg() {
ret.push(ts.ns(), pulse, value);
} else {
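// note: TsNano is totally ordered, so the two guards above are exhaustive
// and this branch is unreachable as written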
*last_before = Some((ts, pulse, value));
}
} else {
if ts >= range.end() {
// TODO count as logic error
error!("ts >= range.end");
} else if ts >= range.beg() {
ret.push(ts.ns(), pulse, value);
} else {
if last_before.is_none() {
warn!("encounter event before range in forward read {ts}");
}
*last_before = Some((ts, pulse, value));
}
}
}
Ok(ret)
}
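The forward classification above, reduced to its rules (illustrative only):

// Forward reads keep [beg, end) and remember the newest pre-range event;
// anything at or past `end` should not come back from the query at all.
fn classify_fwd(ts: u64, beg: u64, end: u64) -> &'static str {
    if ts >= end {
        "logic-error"
    } else if ts >= beg {
        "in-range"
    } else {
        "last-before"
    }
}
// classify_fwd(8, 10, 20) == "last-before"
// classify_fwd(12, 10, 20) == "in-range"
// classify_fwd(21, 10, 20) == "logic-error"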
struct ReadValues {
series: u64,
scalar_type: ScalarType,
shape: Shape,
range: ScyllaSeriesRange,
ts_msps: VecDeque<u64>,
ts_msps: VecDeque<TsMs>,
fwd: bool,
with_values: bool,
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
@@ -294,7 +335,7 @@ impl ReadValues {
scalar_type: ScalarType,
shape: Shape,
range: ScyllaSeriesRange,
ts_msps: VecDeque<u64>,
ts_msps: VecDeque<TsMs>,
fwd: bool,
with_values: bool,
scy: Arc<ScySession>,
@@ -327,7 +368,7 @@ impl ReadValues {
}
}
fn make_fut(&mut self, ts_msp: u64) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
fn make_fut(&mut self, ts_msp: TsMs) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
let opts = ReadNextValuesOpts {
series: self.series.clone(),
ts_msp,
@@ -387,7 +428,7 @@ impl ReadValues {
enum FrState {
New,
FindMsp(Pin<Box<dyn Future<Output = Result<(VecDeque<u64>, VecDeque<u64>), Error>> + Send>>),
FindMsp(Pin<Box<dyn Future<Output = Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error>> + Send>>),
ReadBack1(ReadValues),
ReadBack2(ReadValues),
ReadValues(ReadValues),
@@ -402,8 +443,8 @@ pub struct EventsStreamScylla {
shape: Shape,
range: ScyllaSeriesRange,
do_one_before_range: bool,
ts_msp_bck: VecDeque<u64>,
ts_msp_fwd: VecDeque<u64>,
ts_msp_bck: VecDeque<TsMs>,
ts_msp_fwd: VecDeque<TsMs>,
scy: Arc<ScySession>,
do_test_stream_error: bool,
found_one_after: bool,
@@ -422,6 +463,7 @@ impl EventsStreamScylla {
scy: Arc<ScySession>,
do_test_stream_error: bool,
) -> Self {
debug!("EventsStreamScylla::new");
Self {
state: FrState::New,
series,
@@ -439,13 +481,13 @@ impl EventsStreamScylla {
}
}
fn ts_msps_found(&mut self, msps1: VecDeque<u64>, msps2: VecDeque<u64>) {
fn ts_msps_found(&mut self, msps1: VecDeque<TsMs>, msps2: VecDeque<TsMs>) {
trace!("ts_msps_found msps1 {msps1:?} msps2 {msps2:?}");
self.ts_msp_bck = msps1;
self.ts_msp_fwd = msps2;
for x in self.ts_msp_bck.iter().rev() {
let x = x.clone();
if x >= self.range.end() {
if x.ns() >= self.range.end() {
info!("FOUND one-after because of MSP");
self.found_one_after = true;
}
@@ -589,6 +631,7 @@ impl Stream for EventsStreamScylla {
continue;
}
Ready(Err(e)) => {
error!("EventsStreamScylla FindMsp {e}");
self.state = FrState::DataDone;
Ready(Some(Err(e)))
}
@@ -601,6 +644,7 @@ impl Stream for EventsStreamScylla {
continue;
}
Ready(Err(e)) => {
error!("EventsStreamScylla ReadBack1 {e}");
st.fut_done = true;
self.state = FrState::DataDone;
Ready(Some(Err(e)))
@@ -614,6 +658,7 @@ impl Stream for EventsStreamScylla {
continue;
}
Ready(Err(e)) => {
error!("EventsStreamScylla ReadBack2 {e}");
st.fut_done = true;
self.state = FrState::DataDone;
Ready(Some(Err(e)))
@@ -633,6 +678,7 @@ impl Stream for EventsStreamScylla {
continue;
}
Ready(Err(e)) => {
error!("EventsStreamScylla ReadValues {e}");
st.fut_done = true;
Ready(Some(Err(e)))
}

View File

@@ -1,4 +1,5 @@
use netpod::range::evrange::SeriesRange;
use netpod::TsNano;
#[derive(Debug, Clone)]
pub struct ScyllaSeriesRange {
@@ -7,12 +8,12 @@ pub struct ScyllaSeriesRange {
}
impl ScyllaSeriesRange {
pub fn beg(&self) -> u64 {
self.beg
pub fn beg(&self) -> TsNano {
TsNano::from_ns(self.beg)
}
pub fn end(&self) -> u64 {
self.end
pub fn end(&self) -> TsNano {
TsNano::from_ns(self.end)
}
}
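With the typed accessors, unit conversions at call sites become explicit, e.g. the millisecond window that find_ts_msp binds into its queries (a hypothetical helper, not crate code):

fn msp_window(range: &ScyllaSeriesRange) -> (i64, i64) {
    // TsNano::ms() divides by 1_000_000, matching the ms-keyed st_ts_msp table
    (range.beg().ms() as i64, range.end().ms() as i64)
}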

View File

@@ -26,7 +26,7 @@ pub async fn dyn_events_stream(
ctx: &ReqCtx,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<DynEventsStream, Error> {
trace!("dyn_events_stream begin");
trace!("dyn_events_stream {}", evq.summary_short());
let subq = make_sub_query(
ch_conf,
evq.range().clone(),

View File

@@ -8,15 +8,14 @@ edition = "2021"
path = "src/taskrun.rs"
[dependencies]
tokio = { version = "1.32.0", features = ["full", "tracing", "time"] }
tokio = { version = "1.37.0", features = ["full", "tracing", "time"] }
futures-util = "0.3.28"
tracing = "0.1.40"
tracing-log = "0.2.0"
tracing-subscriber = { version = "0.3.17", features = ["fmt", "time"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "time"] }
#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] }
console-subscriber = { version = "0.2.0" }
time = { version = "0.3", features = ["formatting"] }
backtrace = "0.3.56"
lazy_static = "1.4.0"
chrono = "0.4"
backtrace = "0.3.71"
chrono = "0.4.38"
err = { path = "../err" }