Fix warnings

This commit is contained in:
Dominik Werder
2025-03-07 11:46:51 +01:00
parent ab84858e1b
commit f8b3c1533b
39 changed files with 1208 additions and 886 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.5-aa.9"
version = "0.5.5-aa.10"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -32,3 +32,6 @@ items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" }
items_2 = { path = "../../../daqbuf-items-2", package = "daqbuf-items-2" }
streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" }
parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" }
[features]
DISABLED = []

View File

@@ -33,6 +33,7 @@ pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Re
}
// TODO improve by a more information-rich return type.
#[allow(unused)]
pub async fn fetch_binned_json(query: BinnedQuery, cluster: &Cluster) -> Result<JsonValue, Error> {
let ctx = ReqCtx::for_test();
let t1 = Utc::now();

View File

@@ -2,7 +2,6 @@ use crate::nodes::require_test_hosts_running;
use crate::test::api4::common::fetch_events_json;
use chrono::Utc;
use daqbuf_err::Error;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::AppendToUrl;
@@ -36,7 +35,8 @@ fn events_plain_json_00() -> Result<(), Error> {
"1970-01-01T00:20:04.000Z",
"1970-01-01T00:21:10.000Z",
)?;
let jsv = fetch_events_json(query, cluster).await?;
// TODO
let _jsv = fetch_events_json(query, cluster).await?;
// let res: EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv)?;
// Tim-weighted uses one event before requested range:
// assert_eq!(res.len(), 133);
@@ -51,7 +51,7 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = events_plain_json(
let _jsv = events_plain_json(
SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"),
"1970-01-03T23:59:55.000Z",
"1970-01-04T00:00:01.000Z",

View File

@@ -1,7 +1,7 @@
use crate::nodes::require_test_hosts_running;
use crate::test::api4::common::fetch_events_json;
use daqbuf_err::Error;
use items_0::test::f32_iter_cmp_near;
// use items_0::test::f32_iter_cmp_near;
use netpod::range::evrange::NanoRange;
use netpod::SfDbChannel;
use query::api4::events::PlainEventsQuery;
@@ -27,7 +27,7 @@ fn events_plain_json_00() -> Result<(), Error> {
"1970-01-01T00:20:04.000Z",
"1970-01-01T00:21:10.000Z",
)?;
let jsv = fetch_events_json(query, cluster).await?;
let _jsv = fetch_events_json(query, cluster).await?;
// let res: EventsDim0CollectorOutput<i64> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
// assert_eq!(res.ts_anchor_sec(), 1204);

View File

@@ -11,7 +11,7 @@ fn get_events_1() -> Result<(), Error> {
}
// TODO re-use test data in dedicated archapp converter.
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async {
let rh = require_archapp_test_host_running()?;
let cluster = &rh.cluster;

View File

@@ -9,7 +9,7 @@ fn get_sls_archive_1() -> Result<(), Error> {
}
// TODO re-use test data in dedicated convert application.
let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -37,7 +37,7 @@ fn get_sls_archive_3() -> Result<(), Error> {
}
// TODO re-use test data in dedicated convert application.
let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -65,7 +65,7 @@ fn get_sls_archive_wave_2() -> Result<(), Error> {
}
// TODO re-use test data in dedicated convert application.
let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;

View File

@@ -7,7 +7,7 @@ fn get_scalar_2_events() -> Result<(), Error> {
}
// TODO re-use test data in dedicated convert application.
let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -56,7 +56,7 @@ fn get_scalar_2_binned() -> Result<(), Error> {
}
// TODO re-use test data in dedicated convert application.
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -84,7 +84,7 @@ fn get_wave_1_events() -> Result<(), Error> {
}
// TODO re-use test data in dedicated convert application.
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -131,7 +131,7 @@ fn get_wave_1_binned() -> Result<(), Error> {
}
// TODO re-use test data in dedicated convert application.
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;

View File

@@ -95,5 +95,6 @@ async fn get_json_common(
return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
}
let ret = DataResult { avgs };
let _ = &ret.avgs;
Ok(ret)
}

View File

@@ -42,3 +42,6 @@ streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" }
streamio = { path = "../streamio" }
httpclient = { path = "../httpclient" }
bitshuffle = { path = "../../../daqbuf-bitshuffle", package = "daqbuf-bitshuffle" }
[features]
DISABLED = []

View File

@@ -79,7 +79,7 @@ async fn read_local_config_real(
Ok(buf) => parse_config(&buf),
Err(e) => match e.kind() {
ErrorKind::NotFound => Err(ConfigParseError::FileNotFound),
ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied(path.clone())),
ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied(path.to_string_lossy().into())),
e => {
error!("read_local_config_real {e:?}");
Err(ConfigParseError::IO)

View File

@@ -282,7 +282,7 @@ impl Stream for EventChunkerMultifile {
}
// TODO re-enable tests generate data on the fly.
#[cfg(DISABLED)]
#[cfg(feature = "DISABLED")]
#[cfg(test)]
mod test {
use crate::eventchunker::EventChunkerConf;

View File

@@ -2,10 +2,7 @@
name = "dq"
version = "0.1.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/dq.rs"
edition = "2024"
[dependencies]
tokio = { version = "1.43.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }

View File

@@ -1,42 +0,0 @@
use bytes::BufMut;
use std::fmt;
trait WritableValue: fmt::Debug {
fn put_value(&self, buf: &mut Vec<u8>);
}
impl WritableValue for u32 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_u32_le(*self);
}
}
impl WritableValue for i8 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_i8(*self);
}
}
impl WritableValue for i16 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_i16_le(*self);
}
}
impl WritableValue for i32 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_i32_le(*self);
}
}
impl WritableValue for f32 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_f32_le(*self);
}
}
impl WritableValue for f64 {
fn put_value(&self, buf: &mut Vec<u8>) {
buf.put_f64_le(*self);
}
}

1
crates/dq/src/lib.rs Normal file
View File

@@ -0,0 +1 @@

View File

@@ -1,8 +1,8 @@
[package]
name = "httpclient"
version = "0.0.2"
version = "0.0.3"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
edition = "2024"
[dependencies]
futures-util = "0.3.31"
@@ -19,11 +19,8 @@ hyper = { version = "1.6.0", features = ["http1", "http2", "client", "server"] }
hyper-util = { version = "0.1.10", features = ["full"] }
bytes = "1.10.0"
async-channel = "1.9.0"
autoerr = "0.0.3"
daqbuf-err = { path = "../../../daqbuf-err" }
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" }
streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" }
thiserror = "=0.0.1"
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -14,7 +14,6 @@ use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use items_2::accounting::AccountingEvents;
use netpod::log::*;
use netpod::req_uri_to_url;
use netpod::ttl::RetentionTime;
@@ -23,7 +22,6 @@ use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use query::api4::AccountingIngestedBytesQuery;
use query::api4::AccountingToplistQuery;
use scyllaconn::accounting::toplist::UsageData;
use serde::Deserialize;
@@ -57,6 +55,7 @@ impl AccountedIngested {
self.shapes.push(shape);
}
#[allow(unused)]
fn sort_by_counts(&mut self) {
let mut tmp: Vec<_> = self
.counts
@@ -70,6 +69,7 @@ impl AccountedIngested {
self.reorder_by_index_list(&tmp);
}
#[allow(unused)]
fn sort_by_bytes(&mut self) {
let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect();
tmp.sort_unstable();
@@ -85,6 +85,7 @@ impl AccountedIngested {
self.shapes = tmp.iter().map(|&x| self.shapes[x].clone()).collect();
}
#[allow(unused)]
fn truncate(&mut self, len: usize) {
self.names.truncate(len);
self.counts.truncate(len);
@@ -273,12 +274,13 @@ async fn fetch_data(
_ncc: &NodeConfigCached,
) -> Result<Toplist, Error> {
let list_len_max = 10000000;
let _ = list_len_max;
if let Some(scyqu) = &shared_res.scyqueue {
let x = scyqu
.accounting_read_ts(rt, ts)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut ret = resolve_usages(x, &shared_res.pgqueue).await?;
let ret = resolve_usages(x, &shared_res.pgqueue).await?;
// ret.dim0.sort_by_bytes();
// ret.dim1.sort_by_bytes();
// ret.dim0.truncate(list_len_max);

View File

@@ -83,7 +83,7 @@ impl EventDataHandler {
.await
.map_err(|_| EventDataError::InternalError)?;
let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?;
debug!("{:?}", evsubq);
info!("{:?}", evsubq);
let logspan = if evsubq.log_level() == "trace" {
trace!("emit trace span");
tracing::span!(tracing::Level::INFO, "log_span_trace")

View File

@@ -5,6 +5,7 @@ use std::collections::BTreeMap;
use std::sync::Mutex;
use std::time::SystemTime;
#[allow(unused)]
pub struct Dummy(u32);
pub enum CachePortal<V> {

View File

@@ -11,19 +11,13 @@ use quinn::EndpointConfig;
use quinn::Incoming;
use rustls::pki_types::pem::PemObject;
use rustls::server::ProducesTickets;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use taskrun::tokio;
const EARLY_DATA_MAX: u32 = u32::MAX * 0;
macro_rules! info { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
#[derive(Debug)]
@@ -38,11 +32,11 @@ impl ProducesTickets for TicketerCustom {
60 * 60 * 24
}
fn encrypt(&self, plain: &[u8]) -> Option<Vec<u8>> {
fn encrypt(&self, _plain: &[u8]) -> Option<Vec<u8>> {
todo!()
}
fn decrypt(&self, cipher: &[u8]) -> Option<Vec<u8>> {
fn decrypt(&self, _cipher: &[u8]) -> Option<Vec<u8>> {
todo!()
}
}
@@ -111,6 +105,7 @@ impl Http3Support {
Ok(ret)
}
#[allow(unused)]
async fn new_plain_quic(bind_addr: SocketAddr) -> Result<Self, Error> {
let key = PemObject::from_pem_file("key.pem")?;
let cert = PemObject::from_pem_file("cert.pem")?;
@@ -160,6 +155,7 @@ impl Http3Support {
}
}
#[allow(unused)]
async fn handle_incoming_inner_1(inc: Incoming, addr_remote: SocketAddr) -> Result<(), Error> {
debug!("handle_incoming_inner_1 new incoming {:?}", addr_remote);
let conn1 = inc.accept()?.await?;
@@ -184,6 +180,7 @@ impl Http3Support {
Ok(())
}
#[allow(unused)]
async fn handle_incoming_inner_2(inc: Incoming, addr_remote: SocketAddr) -> Result<(), Error> {
let selfname = "handle_incoming_inner_2";
debug!("{} new incoming {:?}", selfname, addr_remote);

View File

@@ -123,7 +123,8 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
}
// let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone()));
let (pgqueue, pgworker) = PgWorker::new(&ncc.node_config.cluster.database).await?;
let pgworker_jh = taskrun::spawn(async move {
// TODO use
let _pgworker_jh = taskrun::spawn(async move {
let x = pgworker.work().await;
match x {
Ok(()) => {}
@@ -143,7 +144,8 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
error!("{e}");
RetrievalError::TextError(e.to_string())
})?;
let scylla_worker_jh = taskrun::spawn(async move {
// TODO use
let _scylla_worker_jh = taskrun::spawn(async move {
let x = scylla_worker.work().await;
match x {
Ok(()) => {}

View File

@@ -9,7 +9,6 @@ use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::ProxyConfig;
use netpod::ReqCtx;
use netpod::ServiceVersion;
use std::collections::BTreeMap;
pub struct BackendListHandler {}

View File

@@ -427,7 +427,8 @@ impl IndexChannelHttpFunction {
async fn index(req: Requ, do_print: bool, node_config: &NodeConfigCached) -> Result<String, Error> {
// TODO avoid double-insert on central storage.
let (pgc, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
// TODO
let (pgc, _pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
let insert_01 = pgc.prepare(sql).await?;
@@ -1012,7 +1013,8 @@ impl MapPulseLocalHttpFunction {
})
.unwrap_or_else(|| String::from("missing x-req-from"));
let ts1 = Instant::now();
let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
// TODO
let (conn, _pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)";
let rows = conn.query(sql, &[&node_config.node.host, &(pulse as i64)]).await?;
let cands: Vec<_> = rows
@@ -1552,6 +1554,8 @@ impl MarkClosedHttpFunction {
pub async fn mark_closed(node_config: &NodeConfigCached) -> Result<(), Error> {
let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
// TODO
let _ = &pgjh;
let sql = "select distinct channel from map_pulse_files order by channel";
let rows = conn.query(sql, &[]).await?;
let chns: Vec<_> = rows.iter().map(|r| r.get::<_, String>(0)).collect();

View File

@@ -10,7 +10,7 @@ use httpclient::http;
use httpclient::hyper::StatusCode;
use httpclient::hyper::Uri;
use items_0::streamitem::sitem_data;
use items_0::streamitem::sitem_err2_from_string;
// use items_0::streamitem::sitem_err2_from_string;
use items_2::framable::Framable;
use netpod::log::*;
use netpod::Cluster;

View File

@@ -20,9 +20,7 @@ use netpod::timeunits::DAY;
use netpod::timeunits::SEC;
use netpod::ByteOrder;
use netpod::Cluster;
use netpod::Database;
use netpod::DtNano;
use netpod::FileIoBufferSize;
use netpod::Node;
use netpod::NodeConfig;
use netpod::NodeConfigCached;
@@ -98,8 +96,8 @@ fn raw_data_00() {
eprintln!("written");
con.shutdown().await.unwrap();
eprintln!("shut down");
let (netin, netout) = con.into_split();
// TODO use?
let (netin, _netout) = con.into_split();
let mut frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), qu.inmem_bufcap());
while let Some(frame) = frames.next().await {
match frame {

View File

@@ -1,5 +1,4 @@
use crate::log::*;
use daqbuf_err as err;
use futures_util::TryStreamExt;
use netpod::ttl::RetentionTime;
use netpod::TsMs;

View File

@@ -2,6 +2,7 @@ use crate::events2::prepare::StmtsCache;
use crate::events2::prepare::StmtsEvents;
use crate::log::*;
use crate::worker::ScyllaQueue;
use daqbuf_series::msp::PrebinnedPartitioning;
use futures_util::TryStreamExt;
use items_0::merge::MergeableTy;
use items_2::binning::container_bins::ContainerBins;
@@ -11,7 +12,6 @@ use netpod::TsNano;
use scylla::Session as ScySession;
use std::ops::Range;
use streams::timebin::cached::reader::BinsReadRes;
use daqbuf_series::msp::PrebinnedPartitioning;
async fn scylla_read_prebinned_f32(
series: u64,
@@ -137,7 +137,7 @@ pub async fn worker_read(
scy: &ScySession,
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
let partt = PrebinnedPartitioning::try_from(bin_len)?;
let div = partt.msp_div();
let div = partt.patch_dt();
let params = (
series as i64,
bin_len.ms() as i32,

View File

@@ -1,575 +0,0 @@
use crate::events2::events::EventReadOpts;
use crate::events2::prepare::StmtsEvents;
use crate::log::*;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use core::fmt;
use daqbuf_series::SeriesId;
use futures_util::Future;
use futures_util::TryStreamExt;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::Appendable;
use items_0::Empty;
use items_0::WithLen;
use items_2::binning::container_events::ContainerEvents;
use netpod::ttl::RetentionTime;
use netpod::DtNano;
use netpod::EnumVariant;
use netpod::TsMs;
use netpod::TsNano;
use scylla::Session;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::Instrument;
macro_rules! trace_fetch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
autoerr::create_error_v1!(
name(Error, "ScyllaReadEvents"),
enum variants {
Prepare(#[from] crate::events2::prepare::Error),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaWorker(Box<crate::worker::Error>),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
MissingQuery(String),
NotTokenAware,
RangeEndOverflow,
InvalidFuture,
TestError(String),
Logic,
TodoUnsupported,
},
);
impl From<crate::worker::Error> for Error {
fn from(value: crate::worker::Error) -> Self {
Self::ScyllaWorker(Box::new(value))
}
}
pub(super) trait ValTy: Sized + 'static {
type ScaTy: ScalarOps + std::default::Default;
type ScyTy: for<'a, 'b> scylla::deserialize::DeserializeValue<'a, 'b>;
type ScyRowTy: for<'a, 'b> scylla::deserialize::DeserializeRow<'a, 'b>;
type Container: BinningggContainerEventsDyn + Empty + Appendable<Self>;
fn from_valueblob(inp: Vec<u8>) -> Self;
fn table_name() -> &'static str;
fn default() -> Self;
fn is_valueblob() -> bool;
fn st_name() -> &'static str;
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>>;
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self);
}
macro_rules! impl_scaty_scalar {
($st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => {
impl ValTy for $st {
type ScaTy = $st;
type ScyTy = $st_scy;
type ScyRowTy = (i64, $st_scy);
type Container = ContainerEvents<Self::ScaTy>;
fn from_valueblob(_inp: Vec<u8>) -> Self {
panic!("unused")
}
fn table_name() -> &'static str {
concat!("scalar_", $table_name)
}
fn default() -> Self {
<Self as std::default::Default>::default()
}
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
$st_name
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1 as Self::ScaTy)
}
}
};
}
macro_rules! impl_scaty_array {
($vt:ty, $st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => {
impl ValTy for $vt {
type ScaTy = $st;
type ScyTy = $st_scy;
type ScyRowTy = (i64, $st_scy);
type Container = ContainerEvents<Vec<Self::ScaTy>>;
fn from_valueblob(inp: Vec<u8>) -> Self {
if inp.len() < 32 {
<Self as ValTy>::default()
} else {
let en = std::mem::size_of::<Self::ScaTy>();
let n = (inp.len().max(32) - 32) / en;
let mut c = Vec::with_capacity(n);
for i in 0..n {
let r1 = &inp[32 + en * (0 + i)..32 + en * (1 + i)];
let p1 = r1 as *const _ as *const $st;
let v1 = unsafe { p1.read_unaligned() };
c.push(v1);
}
c
}
}
fn table_name() -> &'static str {
concat!("array_", $table_name)
}
fn default() -> Self {
Vec::new()
}
fn is_valueblob() -> bool {
true
}
fn st_name() -> &'static str {
$st_name
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1 .into_iter().map(|x| x as _).collect())
}
}
};
}
impl ValTy for EnumVariant {
type ScaTy = EnumVariant;
type ScyTy = i16;
type ScyRowTy = (i64, i16, String);
type Container = ContainerEvents<EnumVariant>;
fn from_valueblob(_inp: Vec<u8>) -> Self {
panic!("unused")
}
fn table_name() -> &'static str {
"array_string"
}
fn default() -> Self {
<Self as Default>::default()
}
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
"enum"
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, EnumVariant::new(inp.1 as u16, inp.2))
}
}
impl ValTy for Vec<String> {
type ScaTy = String;
type ScyTy = Vec<String>;
type ScyRowTy = (i64, Vec<String>);
type Container = ContainerEvents<Vec<String>>;
fn from_valueblob(_inp: Vec<u8>) -> Self {
panic!("unused")
}
fn table_name() -> &'static str {
"array_string"
}
fn default() -> Self {
Vec::new()
}
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
"string"
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
let fut = read_next_values_2::<Self>(opts, jobtrace, scy, stmts);
Box::pin(fut)
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1)
}
}
impl_scaty_scalar!(u8, i8, "u8", "u8");
impl_scaty_scalar!(u16, i16, "u16", "u16");
impl_scaty_scalar!(u32, i32, "u32", "u32");
impl_scaty_scalar!(u64, i64, "u64", "u64");
impl_scaty_scalar!(i8, i8, "i8", "i8");
impl_scaty_scalar!(i16, i16, "i16", "i16");
impl_scaty_scalar!(i32, i32, "i32", "i32");
impl_scaty_scalar!(i64, i64, "i64", "i64");
impl_scaty_scalar!(f32, f32, "f32", "f32");
impl_scaty_scalar!(f64, f64, "f64", "f64");
impl_scaty_scalar!(bool, bool, "bool", "bool");
impl_scaty_scalar!(String, String, "string", "string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "u8", "u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "u16", "u16");
impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "u32", "u32");
impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "u64", "u64");
impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "i8", "i8");
impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "i16", "i16");
impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "i32", "i32");
impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "i64", "i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "bool");
#[derive(Debug)]
pub enum ReadEventKind {
Create,
FutgenCallingReadNextValues,
FutgenFutureCreated,
CallExecuteIter,
ScyllaReadRow(u32),
ScyllaReadRowDone(u32),
ReadNextValuesFutureDone,
EventsStreamRtSees(u32),
}
#[derive(Debug)]
pub struct ReadJobTrace {
jobid: u64,
ts0: Instant,
events: Vec<(Instant, ReadEventKind)>,
}
impl ReadJobTrace {
pub fn new() -> Self {
static JOBID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
Self {
jobid: JOBID.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
ts0: Instant::now(),
events: Vec::with_capacity(128),
}
}
pub fn add_event_now(&mut self, kind: ReadEventKind) {
self.events.push((Instant::now(), kind))
}
}
impl fmt::Display for ReadJobTrace {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "ReadJobTrace jobid {jid}", jid = self.jobid)?;
for (ts, kind) in &self.events {
let dt = 1e3 * ts.saturating_duration_since(self.ts0).as_secs_f32();
write!(fmt, "\njobid {jid:4} {dt:7.2} {kind:?}", jid = self.jobid)?;
}
Ok(())
}
}
#[derive(Debug)]
pub(super) struct ReadNextValuesOpts {
rt: RetentionTime,
series: u64,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
}
impl ReadNextValuesOpts {
pub(super) fn new(
rt: RetentionTime,
series: SeriesId,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
) -> Self {
Self {
rt,
series: series.id(),
ts_msp,
range,
fwd,
readopts,
scyqueue,
}
}
}
pub(super) struct ReadNextValuesParams {
pub opts: ReadNextValuesOpts,
pub jobtrace: ReadJobTrace,
}
pub(super) async fn read_next_values<ST>(
params: ReadNextValuesParams,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
ST: ValTy,
{
let opts = params.opts;
let jobtrace = params.jobtrace;
// TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone();
let level = taskrun::query_log_level();
let futgen = move |scy: Arc<Session>, stmts: Arc<StmtsEvents>, mut jobtrace: ReadJobTrace| {
// TODO avoid this
// opts.jobtrace = jobtrace;
let fut = async move {
// let jobtrace = &mut opts.jobtrace;
let logspan = if level == Level::DEBUG {
tracing::span!(Level::INFO, "log_span_debug")
} else if level == Level::TRACE {
tracing::span!(Level::INFO, "log_span_trace")
} else {
tracing::Span::none()
};
jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues);
let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts).instrument(logspan);
match fut.await.map_err(crate::worker::Error::from) {
Ok((ret, mut jobtrace)) => {
jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone);
Ok((ret, jobtrace))
}
Err(e) => Err(e),
}
};
Box::pin(fut)
as Pin<
Box<
dyn Future<
Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), crate::worker::Error>,
> + Send,
>,
>
};
let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?;
Ok((res, jobtrace))
}
async fn read_next_values_2<ST>(
opts: ReadNextValuesOpts,
mut jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
ST: ValTy,
{
trace_fetch!("read_next_values_2 {:?} st_name {}", opts, ST::st_name());
let series = opts.series;
let ts_msp = opts.ts_msp;
let range = opts.range;
let table_name = ST::table_name();
let with_values = opts.readopts.with_values();
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::RangeEndOverflow);
}
let ret = if opts.fwd {
let ts_lsp_min = if range.beg() > ts_msp.ns() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
let ts_lsp_max = if range.end() > ts_msp.ns() {
range.end().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_min,
ts_lsp_max,
table_name,
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(ST::is_valueblob())
.st(ST::st_name())?;
let qu = {
let mut qu = qu.clone();
if qu.is_token_aware() == false {
return Err(Error::NotTokenAware);
}
qu.set_page_size(10000);
// qu.disable_paging();
qu
};
let params = (
series as i64,
ts_msp.ms() as i64,
ts_lsp_min.ns() as i64,
ts_lsp_max.ns() as i64,
);
trace_fetch!("FWD event search params {:?}", params);
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
let res = scy.execute_iter(qu.clone(), params).await?;
{
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
}
ret
} else {
let mut i = 0;
let mut it = res.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
}
}
} else {
let ts_lsp_max = if ts_msp.ns() < range.beg() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"BCK ts_msp {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_max,
table_name,
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(ST::is_valueblob())
.st(ST::st_name())?;
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace_fetch!("BCK event search params {:?}", params);
let res = scy.execute_iter(qu.clone(), params).await?;
{
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
}
ret
} else {
let mut i = 0;
let mut it = res.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
}
}
};
trace_fetch!("read ts_msp {} len {}", ts_msp.fmt(), ret.len());
let ret = Box::new(ret);
Ok((ret, jobtrace))
}

View File

@@ -1,49 +1,62 @@
use super::msp::MspStreamRt;
use crate::events::read_next_values;
use crate::events::ReadJobTrace;
use crate::events::ReadNextValuesOpts;
use crate::events2::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use daqbuf_err as err;
use daqbuf_series::SeriesId;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::container::ByteEstimate;
use items_0::merge::DrainIntoNewDynResult;
use items_0::merge::MergeableDyn;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::Appendable;
use items_0::Empty;
use items_0::WithLen;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::log;
use netpod::ttl::RetentionTime;
use netpod::ChConf;
use netpod::DtNano;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use netpod::TsNano;
use scylla::Session;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tracing;
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ) }
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ) }
macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_init { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_msp_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_redo_fwd_read { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_every_event { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! warn_item { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) }
#[derive(Debug, Clone)]
pub struct EventReadOpts {
@@ -66,21 +79,34 @@ impl EventReadOpts {
}
}
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaEvents")]
pub enum Error {
Worker(#[from] crate::worker::Error),
Events(#[from] crate::events::Error),
Msp(#[from] crate::events2::msp::Error),
Unordered,
OutOfRange,
BadBatch,
ReadQueueEmptyBck,
ReadQueueEmptyFwd,
Logic,
TruncateLogic,
AlreadyTaken,
DrainFailure,
autoerr::create_error_v1!(
name(Error, "ScyllaEvents"),
enum variants {
Worker(Box<crate::worker::Error>),
Msp(#[from] crate::events2::msp::Error),
Unordered,
OutOfRange,
BadBatch,
ReadQueueEmptyBck,
ReadQueueEmptyFwd,
Logic,
TruncateLogic,
AlreadyTaken,
DrainFailure,
RangeEndOverflow,
NotTokenAware,
Prepare(#[from] crate::events2::prepare::Error),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaWorker(Box<crate::worker::Error>),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
},
);
impl From<crate::worker::Error> for Error {
fn from(e: crate::worker::Error) -> Self {
Error::Worker(Box::new(e))
}
}
struct FetchMsp {
@@ -218,6 +244,89 @@ impl ReadingFwd {
}
}
#[derive(Debug)]
pub enum ReadEventKind {
Create,
FutgenCallingReadNextValues,
FutgenFutureCreated,
CallExecuteIter,
ScyllaReadRow(u32),
ScyllaReadRowDone(u32),
ReadNextValuesFutureDone,
EventsStreamRtSees(u32),
}
#[derive(Debug)]
pub struct ReadJobTrace {
jobid: u64,
ts0: Instant,
events: Vec<(Instant, ReadEventKind)>,
}
impl ReadJobTrace {
pub fn new() -> Self {
static JOBID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
Self {
jobid: JOBID.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
ts0: Instant::now(),
events: Vec::with_capacity(128),
}
}
pub fn add_event_now(&mut self, kind: ReadEventKind) {
self.events.push((Instant::now(), kind))
}
}
impl fmt::Display for ReadJobTrace {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "ReadJobTrace jobid {jid}", jid = self.jobid)?;
for (ts, kind) in &self.events {
let dt = 1e3 * ts.saturating_duration_since(self.ts0).as_secs_f32();
write!(fmt, "\njobid {jid:4} {dt:7.2} {kind:?}", jid = self.jobid)?;
}
Ok(())
}
}
#[derive(Debug)]
pub(super) struct ReadNextValuesOpts {
rt: RetentionTime,
series: u64,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
}
impl ReadNextValuesOpts {
pub(super) fn new(
rt: RetentionTime,
series: SeriesId,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
) -> Self {
Self {
rt,
series: series.id(),
ts_msp,
range,
fwd,
readopts,
scyqueue,
}
}
}
struct ReadNextValuesParams {
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
}
#[derive(Clone)]
struct MakeFutInfo {
scyqueue: ScyllaQueue,
@@ -346,7 +455,7 @@ impl EventsStreamRt {
if false {
taskrun::tokio::time::sleep(Duration::from_millis(10)).await;
}
let params = crate::events::ReadNextValuesParams { opts, jobtrace };
let params = ReadNextValuesParams { opts, jobtrace };
let ret = match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U8 => read_next_values::<u8>(params).await,
@@ -663,8 +772,7 @@ impl Stream for EventsStreamRt {
match st.qu.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
Ok((evs, mut jobtrace)) => {
jobtrace
.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
jobtrace.add_event_now(ReadEventKind::EventsStreamRtSees(evs.len() as u32));
trace_fetch!("ReadingFwd {jobtrace}");
for ts in MergeableDyn::tss_for_testing(evs.as_ref()) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
@@ -741,3 +849,451 @@ fn trait_assert_try() {
fn phantomval<T>() -> T {
panic!()
}
async fn read_next_values_2<ST>(
opts: ReadNextValuesOpts,
mut jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
ST: ValTy,
{
trace_fetch!("read_next_values_2 {:?} st_name {}", opts, ST::st_name());
let series = opts.series;
let ts_msp = opts.ts_msp;
let range = opts.range;
let table_name = ST::table_name();
let with_values = opts.readopts.with_values();
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::RangeEndOverflow);
}
let ret = if opts.fwd {
let ts_lsp_min = if range.beg() > ts_msp.ns() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
let ts_lsp_max = if range.end() > ts_msp.ns() {
range.end().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_min,
ts_lsp_max,
table_name
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(ST::is_valueblob())
.st(ST::st_name())?;
let qu = {
let mut qu = qu.clone();
if qu.is_token_aware() == false {
return Err(Error::NotTokenAware);
}
qu.set_page_size(10000);
// qu.disable_paging();
qu
};
let params = (
series as i64,
ts_msp.ms() as i64,
ts_lsp_min.ns() as i64,
ts_lsp_max.ns() as i64,
);
trace_fetch!("FWD event search params {:?}", params);
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
let res = scy.execute_iter(qu.clone(), params).await?;
{
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
}
ret
} else {
let mut i = 0;
let mut it = res.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
}
}
} else {
let ts_lsp_max = if ts_msp.ns() < range.beg() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"BCK ts_msp {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_max,
table_name
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(ST::is_valueblob())
.st(ST::st_name())?;
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace_fetch!("BCK event search params {:?}", params);
let res = scy.execute_iter(qu.clone(), params).await?;
{
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
}
ret
} else {
let mut i = 0;
let mut it = res.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
}
}
};
let byte_est = ret.byte_estimate();
trace_fetch!(
"read ts_msp {} len {} byte_est {}",
ts_msp.fmt(),
ret.len(),
byte_est
);
let ret = Box::new(ret);
Ok((ret, jobtrace))
}
async fn read_next_values<ST>(
params: ReadNextValuesParams,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
ST: ValTy,
{
let opts = params.opts;
let jobtrace = params.jobtrace;
// TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone();
let level = taskrun::query_log_level();
let futgen = move |scy: Arc<Session>, stmts: Arc<StmtsEvents>, mut jobtrace: ReadJobTrace| {
// TODO avoid this
// opts.jobtrace = jobtrace;
let fut = async move {
// let jobtrace = &mut opts.jobtrace;
let logspan = if level == log::Level::DEBUG {
tracing::span!(log::Level::INFO, "log_span_debug")
} else if level == log::Level::TRACE {
tracing::span!(log::Level::INFO, "log_span_trace")
} else {
tracing::Span::none()
};
jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues);
let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts);
let fut = tracing::Instrument::instrument(fut, logspan);
match fut.await.map_err(crate::worker::Error::from) {
Ok((ret, mut jobtrace)) => {
jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone);
Ok((ret, jobtrace))
}
Err(e) => Err(e),
}
};
Box::pin(fut)
as Pin<
Box<
dyn Future<
Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), crate::worker::Error>,
> + Send,
>,
>
};
let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?;
Ok((res, jobtrace))
}
trait ValTy: Sized + 'static {
type ScaTy: ScalarOps + std::default::Default;
type ScyTy: for<'a, 'b> scylla::deserialize::DeserializeValue<'a, 'b>;
type ScyRowTy: for<'a, 'b> scylla::deserialize::DeserializeRow<'a, 'b>;
type Container: BinningggContainerEventsDyn + Empty + Appendable<Self>;
fn from_valueblob(inp: Vec<u8>) -> Self;
fn table_name() -> &'static str;
fn default() -> Self;
fn is_valueblob() -> bool;
fn st_name() -> &'static str;
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>>;
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self);
}
macro_rules! impl_scaty_scalar {
($st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => {
impl ValTy for $st {
type ScaTy = $st;
type ScyTy = $st_scy;
type ScyRowTy = (i64, $st_scy);
type Container = ContainerEvents<Self::ScaTy>;
fn from_valueblob(_inp: Vec<u8>) -> Self {
panic!("unused")
}
fn table_name() -> &'static str {
concat!("scalar_", $table_name)
}
fn default() -> Self {
<Self as std::default::Default>::default()
}
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
$st_name
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1 as Self::ScaTy)
}
}
};
}
macro_rules! impl_scaty_array {
($vt:ty, $st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => {
impl ValTy for $vt {
type ScaTy = $st;
type ScyTy = $st_scy;
type ScyRowTy = (i64, $st_scy);
type Container = ContainerEvents<Vec<Self::ScaTy>>;
fn from_valueblob(inp: Vec<u8>) -> Self {
if inp.len() < 32 {
<Self as ValTy>::default()
} else {
let en = std::mem::size_of::<Self::ScaTy>();
let n = (inp.len().max(32) - 32) / en;
let mut c = Vec::with_capacity(n);
for i in 0..n {
let r1 = &inp[32 + en * (0 + i)..32 + en * (1 + i)];
let p1 = r1 as *const _ as *const $st;
let v1 = unsafe { p1.read_unaligned() };
c.push(v1);
}
c
}
}
fn table_name() -> &'static str {
concat!("array_", $table_name)
}
fn default() -> Self {
Vec::new()
}
fn is_valueblob() -> bool {
true
}
fn st_name() -> &'static str {
$st_name
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1 .into_iter().map(|x| x as _).collect())
}
}
};
}
impl ValTy for EnumVariant {
type ScaTy = EnumVariant;
type ScyTy = i16;
type ScyRowTy = (i64, i16, String);
type Container = ContainerEvents<EnumVariant>;
fn from_valueblob(_inp: Vec<u8>) -> Self {
panic!("unused")
}
fn table_name() -> &'static str {
"array_string"
}
fn default() -> Self {
<Self as Default>::default()
}
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
"enum"
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, EnumVariant::new(inp.1 as u16, inp.2))
}
}
impl ValTy for Vec<String> {
type ScaTy = String;
type ScyTy = Vec<String>;
type ScyRowTy = (i64, Vec<String>);
type Container = ContainerEvents<Vec<String>>;
fn from_valueblob(_inp: Vec<u8>) -> Self {
panic!("unused")
}
fn table_name() -> &'static str {
"array_string"
}
fn default() -> Self {
Vec::new()
}
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
"string"
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
let fut = read_next_values_2::<Self>(opts, jobtrace, scy, stmts);
Box::pin(fut)
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1)
}
}
impl_scaty_scalar!(u8, i8, "u8", "u8");
impl_scaty_scalar!(u16, i16, "u16", "u16");
impl_scaty_scalar!(u32, i32, "u32", "u32");
impl_scaty_scalar!(u64, i64, "u64", "u64");
impl_scaty_scalar!(i8, i8, "i8", "i8");
impl_scaty_scalar!(i16, i16, "i16", "i16");
impl_scaty_scalar!(i32, i32, "i32", "i32");
impl_scaty_scalar!(i64, i64, "i64", "i64");
impl_scaty_scalar!(f32, f32, "f32", "f32");
impl_scaty_scalar!(f64, f64, "f64", "f64");
impl_scaty_scalar!(bool, bool, "bool", "bool");
impl_scaty_scalar!(String, String, "string", "string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "u8", "u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "u16", "u16");
impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "u32", "u32");
impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "u64", "u64");
impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "i8", "i8");
impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "i16", "i16");
impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "i32", "i32");
impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "i64", "i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "bool");

View File

@@ -1,5 +1,3 @@
use daqbuf_err as err;
use err::thiserror;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::merge::DrainIntoDstResult;
@@ -13,9 +11,9 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_transition { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_transition { ($($arg:expr),*) => ( if true { trace!($($arg),*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if true { trace!($($arg),*); } ) }
macro_rules! tracer_poll_enter {
($self:expr) => {
@@ -33,18 +31,18 @@ macro_rules! tracer_loop_enter {
};
}
#[allow(unused)]
macro_rules! debug_fetch { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
macro_rules! debug_fetch { ($($arg:expr),*) => ( if true { debug!($($arg),*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "EventsOneBeforeAndBulk")]
pub enum Error {
Unordered,
Logic,
Input(Box<dyn std::error::Error + Send>),
LimitPoll,
LimitLoop,
}
autoerr::create_error_v1!(
name(Error, "EventsOneBeforeAndBulk"),
enum variants {
Unordered,
Logic,
Input(Box<dyn std::error::Error + Send>),
LimitPoll,
LimitLoop,
},
);
#[derive(Debug)]
pub enum Output<T> {
@@ -164,7 +162,7 @@ where
}
// Separate events into before and bulk
let ppp = MergeableTy::find_lowest_index_ge(&item, self.ts0);
trace_transition!("partition_point {ppp:?} {n:?}", n = item.len());
trace_transition!("partition_point {:?} {:?}", ppp, item.len());
if let Some(pp) = ppp {
if pp == 0 {
// all entries are bulk
@@ -172,12 +170,22 @@ where
self.state = State::Bulk;
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let emit_len = before.len();
let item = Output::Before(before);
trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item);
trace_emit!(
"State::Begin Before {} emit_len {}",
self.dbgname,
emit_len
);
Ready(Some(Ok(item)))
} else {
let emit_len = item.len();
let item = Output::Bulk(item);
trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item);
trace_emit!(
"State::Begin Bulk {} emit_len {}",
self.dbgname,
emit_len
);
Ready(Some(Ok(item)))
}
} else {
@@ -189,19 +197,21 @@ where
DrainIntoDstResult::Done => {
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let emit_len = before.len();
let item = Output::Before(before);
trace_emit!(
"State::Begin Before {} emit {:?}",
"State::Begin Before {} emit_len {}",
self.dbgname,
item
emit_len
);
Ready(Some(Ok(item)))
} else {
let emit_len = item.len();
let item = Output::Bulk(item);
trace_emit!(
"State::Begin Bulk {} emit {:?}",
"State::Begin Bulk {} emit_len {}",
self.dbgname,
item
emit_len
);
Ready(Some(Ok(item)))
}
@@ -214,19 +224,21 @@ where
self.buf = Some(buf);
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let emit_len = before.len();
let item = Output::Before(before);
trace_emit!(
"State::Begin Before {} emit {:?}",
"State::Begin Before {} emit_len {}",
self.dbgname,
item
emit_len
);
Ready(Some(Ok(item)))
} else {
let emit_len = item.len();
let item = Output::Bulk(item);
trace_emit!(
"State::Begin Bulk {} emit {:?}",
"State::Begin Bulk {} emit_len {}",
self.dbgname,
item
emit_len
);
Ready(Some(Ok(item)))
}
@@ -270,8 +282,9 @@ where
self.state = State::Done;
trace_transition!("transition from Begin to end of stream");
if let Some(before) = self.consume_buf_get_latest() {
let emit_len = before.len();
let item = Output::Before(before);
trace_emit!("State::Begin EOS {} emit {:?}", self.dbgname, item);
trace_emit!("State::Begin EOS {} emit_len {}", self.dbgname, emit_len);
Ready(Some(Ok(item)))
} else {
trace_emit!("State::Begin EOS {} emit None", self.dbgname);
@@ -307,8 +320,9 @@ where
if item.len() == 0 {
self.seen_empty_during_bulk = true;
}
let item_len = item.len();
let item = Output::Bulk(item);
trace_emit!("State::Bulk data {} emit {:?}", self.dbgname, item);
trace_emit!("State::Bulk data {} item_len {}", self.dbgname, item_len);
Ready(Some(Ok(item)))
}
}

View File

@@ -1,22 +1,19 @@
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use netpod::ttl::RetentionTime;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaPrepare")]
pub enum Error {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError),
ScyllaWorker(Box<crate::worker::Error>),
MissingQuery(String),
RangeEndOverflow,
InvalidFuture,
TestError(String),
}
autoerr::create_error_v1!(
name(Error, "ScyllaPrepare"),
enum variants {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaWorker(Box<crate::worker::Error>),
MissingQuery(String),
RangeEndOverflow,
InvalidFuture,
TestError(String),
},
);
#[derive(Debug)]
pub struct StmtsLspShape {

View File

@@ -2,7 +2,6 @@ pub mod accounting;
pub mod bincache;
pub mod conn;
pub mod errconv;
pub mod events;
pub mod events2;
pub mod range;
pub mod schema;

View File

@@ -1,5 +1,5 @@
use crate::conn::create_scy_session_no_ks;
use crate::events::ReadJobTrace;
use crate::events2::events::ReadJobTrace;
use crate::events2::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
use async_channel::Receiver;
@@ -28,7 +28,7 @@ autoerr::create_error_v1!(
enum variants {
ScyllaConnection(err::Error),
Prepare(#[from] crate::events2::prepare::Error),
EventsQuery(#[from] crate::events::Error),
Events(#[from] crate::events2::events::Error),
Msp(#[from] crate::events2::msp::Error),
ChannelSend,
ChannelRecv,
@@ -270,7 +270,9 @@ impl ScyllaWorker {
// TODO count for stats
}
}
Job::WriteCacheF32(_, _, tx) => {
Job::WriteCacheF32(a, b, tx) => {
let _ = a;
let _ = b;
// let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await;
let res = Err(streams::timebin::cached::reader::Error::TodoImpl);
if tx.send(res).await.is_err() {

View File

@@ -20,7 +20,6 @@ crc32fast = "1.4.2"
byteorder = "1.5.0"
async-channel = "1.9.0"
rand_xoshiro = "0.6.0"
thiserror = "=0.0.1"
autoerr = "0.0.3"
chrono = { version = "0.4.39", features = ["serde"] }
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
@@ -35,6 +34,3 @@ http-body-util = "0.1.2"
[dev-dependencies]
taskrun = { path = "../taskrun" }
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -62,6 +62,7 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp(
Ok(Box::pin(items))
}
#[allow(unused)]
async fn open_event_data_streams_tcp<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
// TODO group bounds in new trait

View File

@@ -5,11 +5,12 @@ use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TcpReadAsBytes")]
pub enum Error {
IO(#[from] std::io::Error),
}
autoerr::create_error_v1!(
name(Error, "TcpReadAsBytes"),
enum variants {
IO(#[from] std::io::Error),
},
);
pub struct TcpReadAsBytes<INP> {
inp: INP,

View File

@@ -22,6 +22,8 @@ daqbuf-err = { path = "../../../daqbuf-err" }
[features]
with-console = []
#console-subscriber = { version = "0.3.0" }
DISABLED_LOKI = []
DISABLED_CONSOLE = []
with-loki = []
#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] }

View File

@@ -215,7 +215,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
sr = g.parent();
}
}
allow = true;
// allow = true;
allow
} else {
false
@@ -251,7 +251,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
sr = g.parent();
}
}
allow = true;
// allow = true;
allow
} else {
false
@@ -286,14 +286,14 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
let reg = tracing_subscriber::registry();
#[cfg(DISABLED_CONSOLE)]
#[cfg(feature = "DISABLED_CONSOLE")]
let reg = {
let (console_layer, console_server) = console_subscriber::ConsoleLayer::builder().build();
tokio::spawn(console_server.serve());
reg.with(console_layer)
};
#[cfg(DISABLED_CONSOLE)]
#[cfg(feature = "DISABLED_CONSOLE")]
let reg = {
let pid = std::process::id();
// let cspn = format!("/tmp/daqbuffer.tokio.console.pid.{pid}");
@@ -315,7 +315,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
format!("{e}")
})?;
}
#[cfg(DISABLED_LOKI)]
#[cfg(feature = "DISABLED_LOKI")]
// TODO tracing_loki seems not well composable, try open telemetry instead.
if false {
/*let fmt_layer = tracing_subscriber::fmt::Layer::new()