From a967fbd08ac429b06d3cb9e06ed47b28bf2a6e58 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 12 Feb 2025 09:30:40 +0100 Subject: [PATCH] Extend binned line to requested end --- Cargo.toml | 4 +- crates/commonio/Cargo.toml | 4 +- crates/daqbuf-redis/Cargo.toml | 2 +- crates/daqbuffer/Cargo.toml | 8 +- crates/daqbufp2/Cargo.toml | 16 +- crates/dbconn/Cargo.toml | 14 +- crates/dbconn/src/search.rs | 1 + crates/disk/Cargo.toml | 30 +-- crates/dq/Cargo.toml | 10 +- crates/httpclient/Cargo.toml | 24 +- crates/httpret/Cargo.toml | 33 ++- crates/httpret/src/api1.rs | 2 + crates/httpret/src/api4/binned.rs | 245 +++++++++++--------- crates/httpret/src/api4/databuffer_tools.rs | 6 +- crates/httpret/src/api4/eventdata.rs | 6 +- crates/httpret/src/api4/events.rs | 154 ++++++------ crates/httpret/src/api4/search.rs | 15 +- crates/httpret/src/channel_status.rs | 5 +- crates/httpret/src/err.rs | 1 + crates/httpret/src/http3.rs | 138 +++++++++++ crates/httpret/src/http3_dummy.rs | 34 +++ crates/httpret/src/httpret.rs | 11 + crates/httpret/src/proxy.rs | 10 +- crates/nodenet/Cargo.toml | 17 +- crates/scyllaconn/Cargo.toml | 4 +- crates/streamio/Cargo.toml | 26 +-- crates/taskrun/Cargo.toml | 12 +- crates/taskrun/src/taskrun.rs | 8 +- 28 files changed, 548 insertions(+), 292 deletions(-) create mode 100644 crates/httpret/src/http3.rs create mode 100644 crates/httpret/src/http3_dummy.rs diff --git a/Cargo.toml b/Cargo.toml index 3f6d2e7..56a35b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,8 @@ members = ["crates/*"] resolver = "2" [profile.release] -opt-level = 2 -debug = 1 +opt-level = 1 +debug = 0 overflow-checks = false debug-assertions = false lto = "thin" diff --git a/crates/commonio/Cargo.toml b/crates/commonio/Cargo.toml index ebed590..974ed23 100644 --- a/crates/commonio/Cargo.toml +++ b/crates/commonio/Cargo.toml @@ -9,14 +9,14 @@ path = "src/commonio.rs" [dependencies] tracing = "0.1" -futures-util = "0.3.15" +futures-util = "0.3.31" bytes = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" chrono = "0.4" async-channel = "1.9" parking_lot = "0.12" -crc32fast = "1.2" +crc32fast = "1.4" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../taskrun" } diff --git a/crates/daqbuf-redis/Cargo.toml b/crates/daqbuf-redis/Cargo.toml index ed7ca41..203962f 100644 --- a/crates/daqbuf-redis/Cargo.toml +++ b/crates/daqbuf-redis/Cargo.toml @@ -9,4 +9,4 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" daqbuf-err = { path = "../../../daqbuf-err" } taskrun = { path = "../taskrun" } -redis = { version = "0.27.6", features = [] } +redis = { version = "0.28.2", features = [] } diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 1f96b93..dc943d8 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -5,14 +5,14 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -futures-util = "0.3.29" +futures-util = "0.3.31" bytes = "1.10.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -serde_yaml = "0.9.27" -chrono = "0.4.31" -url = "2.5.0" +serde_yaml = "0.9.34" +chrono = "0.4.39" +url = "2.5.4" clap = { version = "4.5.28", features = ["derive", "cargo"] } daqbuf-err = { path = "../../../daqbuf-err" } taskrun = { path = "../taskrun" } diff --git a/crates/daqbufp2/Cargo.toml b/crates/daqbufp2/Cargo.toml index b2f4dc1..bcaa700 100644 --- a/crates/daqbufp2/Cargo.toml +++ b/crates/daqbufp2/Cargo.toml @@ -8,19 +8,19 @@ 
edition = "2021" path = "src/daqbufp2.rs" [dependencies] -tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -hyper = { version = "1.0.1", features = ["client", "http1", "http2"] } +tokio = { version = "1.43.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +hyper = { version = "1.6.0", features = ["client", "http1", "http2"] } http = "1" -tracing = "0.1.25" -tracing-subscriber = "0.3.16" -futures-util = "0.3.25" -bytes = "1.0.1" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +futures-util = "0.3.31" +bytes = "1.10.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" chrono = "0.4" -url = "2.2.2" -lazy_static = "1.4.0" +url = "2.5.4" +lazy_static = "1.5.0" daqbuf-err = { path = "../../../daqbuf-err" } taskrun = { path = "../taskrun" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } diff --git a/crates/dbconn/Cargo.toml b/crates/dbconn/Cargo.toml index 7d668f8..08b8e2f 100644 --- a/crates/dbconn/Cargo.toml +++ b/crates/dbconn/Cargo.toml @@ -10,16 +10,16 @@ path = "src/dbconn.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4", "with-serde_json-1"] } -crc32fast = "1.3.2" -byteorder = "1.4" -futures-util = "0.3.30" -bytes = "1.6.0" +tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } +crc32fast = "1.4.2" +byteorder = "1.5" +futures-util = "0.3.31" +bytes = "1.10.0" pin-project = "1" #dashmap = "3" async-channel = "1.9.0" -chrono = "0.4.38" -regex = "1.10.4" +chrono = "0.4.39" +regex = "1.11.1" autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index 5fd3523..d61b1f7 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -284,6 +284,7 @@ pub async fn search_channel( pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { + debug!("search_channel {:?}", query); let backend = &ncc.node_config.cluster.backend; let pgconf = &ncc.node_config.cluster.database; let mut query = query; diff --git a/crates/disk/Cargo.toml b/crates/disk/Cargo.toml index 9b0613f..9d02ad1 100644 --- a/crates/disk/Cargo.toml +++ b/crates/disk/Cargo.toml @@ -8,27 +8,27 @@ edition = "2021" path = "src/disk.rs" [dependencies] -serde = { version = "1.0.193", features = ["derive"] } -serde_json = "1.0.108" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.138" serde_cbor = "0.11.2" -chrono = { version = "0.4.31", features = ["serde"] } -tokio-stream = {version = "0.1.14", features = ["fs"]} +chrono = { version = "0.4.39", features = ["serde"] } +tokio-stream = {version = "0.1.17", features = ["fs"]} async-channel = "1.9.0" -crossbeam = "0.8.2" -bytes = "1.5.0" -crc32fast = "1.3.2" -arrayref = "0.3.6" +crossbeam = "0.8.4" +bytes = "1.10.0" +crc32fast = "1.4.2" +arrayref = "0.3.9" byteorder = "1.5.0" -futures-util = "0.3.14" -async-stream = "0.3.0" -tracing = "0.1.40" +futures-util = "0.3.31" +async-stream = "0.3.6" +tracing = "0.1.41" tracing-futures = { version = "0.2.5", features = ["futures-03", "std-future"] } fs2 = "0.4.3" -libc = "0.2.93" +libc = "0.2.169" hex = "0.4.3" -num-traits = "0.2.14" -num-derive = "0.4.0" -url = "2.5.0" +num-traits = "0.2.19" +num-derive = "0.4.2" +url = "2.5.4" tiny-keccak = { version = "2.0", features = ["sha3"] } daqbuf-err = { path = 
"../../../daqbuf-err" } taskrun = { path = "../taskrun" } diff --git a/crates/dq/Cargo.toml b/crates/dq/Cargo.toml index a9e5dce..80bec4d 100644 --- a/crates/dq/Cargo.toml +++ b/crates/dq/Cargo.toml @@ -8,11 +8,11 @@ edition = "2021" path = "src/dq.rs" [dependencies] -tokio = { version = "1.41.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -futures-util = "0.3.14" -clap = { version = "4.0", features = ["derive", "cargo"] } -chrono = "0.4.19" -bytes = "1.7" +tokio = { version = "1.43.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +futures-util = "0.3.31" +clap = { version = "4.5", features = ["derive", "cargo"] } +chrono = "0.4.39" +bytes = "1.10" daqbuf-err = { path = "../../../daqbuf-err" } taskrun = { path = "../taskrun" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml index 0323cfa..6fb744a 100644 --- a/crates/httpclient/Cargo.toml +++ b/crates/httpclient/Cargo.toml @@ -5,19 +5,19 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -futures-util = "0.3.25" -serde = { version = "1.0.147", features = ["derive"] } -serde_json = "1.0.89" -url = "2.5.0" -tokio = { version = "1.34.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -tracing = "0.1.40" -http = "1.0.0" -http-body = "1.0.0" -http-body-util = "0.1.0" -hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] } +futures-util = "0.3.31" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.138" +url = "2.5.4" +tokio = { version = "1.43.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tracing = "0.1.41" +http = "1.2.0" +http-body = "1.0.1" +http-body-util = "0.1.2" +hyper = { version = "1.6.0", features = ["http1", "http2", "client", "server"] } #hyper-tls = { version = "0.6.0" } -hyper-util = { version = "0.1.1", features = ["full"] } -bytes = "1.5.0" +hyper-util = { version = "0.1.10", features = ["full"] } +bytes = "1.10.0" async-channel = "1.9.0" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index ef1af7c..e7d43ad 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -10,22 +10,28 @@ path = "src/httpret.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -url = "2.5.0" -http = "1.0.0" -http-body-util = { version = "0.1.0" } -hyper = { version = "1.1.0", features = ["http1", "http2", "client", "server"] } -hyper-util = { version = "0.1.1", features = ["http1", "http2", "client", "server"] } -bytes = "1.5.0" -futures-util = "0.3.14" +url = "2.5.4" +http = "1.2.0" +http-body-util = { version = "0.1.2" } +hyper = { version = "1.6.0", features = ["http1", "http2", "client", "server"] } +hyper-util = { version = "0.1.10", features = ["http1", "http2", "client", "server"] } +h3-quinn = { version = "0.0.7", optional = true } +quinn = { version = "0.11.6", optional = true, default-features = false, features = ["log", "platform-verifier", "runtime-tokio", "rustls-ring"] } +rustls = { version = "0.23.22", optional = true, default-features = false, features = ["logging", "std", "tls12", "ring"] } +rustls-pki-types = { version = "1.11.0", optional = true } +pin-project = "1.1.9" +bytes = "1.10.0" +futures-util = "0.3.31" tracing = "0.1" tracing-futures = "0.2" -async-channel = "1.9.0" 
+async-channel = "2.3.1" itertools = "0.13.0" -chrono = "0.4.23" +time = "0.3.37" +chrono = "0.4.39" md-5 = "0.10.6" -regex = "1.10.2" -rand = "0.8.5" -ciborium = "0.2.1" +regex = "1.11.1" +rand = "0.9.0" +ciborium = "0.2.2" flate2 = "1" brotli = "7.0.0" autoerr = "0.0.3" @@ -47,4 +53,7 @@ daqbuf-redis = { path = "../daqbuf-redis" } httpclient = { path = "../httpclient" } [features] +default = [] +#default = ["http3"] prometheus_endpoint = [] +http3 = ["h3-quinn", "quinn", "rustls", "rustls-pki-types"] diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 0efd9e0..5821b04 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -168,6 +168,7 @@ pub async fn channel_search_list_v1( description_regex: query.description_regex.map_or(String::new(), |k| k), icase: false, kind: SeriesKind::default(), + log_level: String::new(), }; let urls = proxy_config .backends @@ -279,6 +280,7 @@ pub async fn channel_search_configs_v1( description_regex: query.description_regex.map_or(String::new(), |k| k), icase: false, kind: SeriesKind::default(), + log_level: String::new(), }; let urls = proxy_config .backends diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index b6c7611..e2d5b3b 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -7,9 +7,8 @@ use crate::requests::accepts_octets; use crate::ServiceSharedResources; use daqbuf_err as err; use dbconn::worker::PgQueue; -use err::thiserror; -use err::ThisError; use http::header::CONTENT_TYPE; +use http::request::Parts; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -24,6 +23,7 @@ use httpclient::ToJsonBody; use netpod::log::*; use netpod::req_uri_to_url; use netpod::timeunits::SEC; +use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; @@ -40,25 +40,27 @@ use std::sync::Arc; use streams::collect::CollectResult; use streams::eventsplainreader::DummyCacheReadProvider; use streams::eventsplainreader::SfDatabufferEventReadProvider; +use streams::streamtimeout::StreamTimeout2; use streams::timebin::cached::reader::EventsReadProvider; use streams::timebin::CacheReadProvider; use tracing::Instrument; -use url::Url; +use tracing::Span; -#[derive(Debug, ThisError)] -#[cstm(name = "Api4Binned")] -pub enum Error { - ChannelNotFound, - BadQuery(String), - HttpLib(#[from] http::Error), - ChannelConfig(crate::channelconfig::Error), - Retrieval(#[from] crate::RetrievalError), - EventsCbor(#[from] streams::plaineventscbor::Error), - EventsJson(#[from] streams::plaineventsjson::Error), - ServerError, - BinnedStream(err::Error), - TimebinnedJson(#[from] streams::timebinnedjson::Error), -} +autoerr::create_error_v1!( + name(Error, "Api4Binned"), + enum variants { + ChannelNotFound, + BadQuery(String), + HttpLib(#[from] http::Error), + ChannelConfig(crate::channelconfig::Error), + Retrieval(#[from] crate::RetrievalError), + EventsCbor(#[from] streams::plaineventscbor::Error), + EventsJson(#[from] streams::plaineventsjson::Error), + ServerError, + BinnedStream(err::Error), + TimebinnedJson(#[from] streams::timebinnedjson::Error), + }, +); impl From for Error { fn from(value: crate::channelconfig::Error) -> Self { @@ -104,7 +106,7 @@ impl BinnedHandler { } _ => { error!("EventsHandler sees: {e}"); - Ok(error_response(e.public_message(), ctx.reqid())) + Ok(error_response(e.to_string(), ctx.reqid())) } }, } @@ -126,19 +128,61 @@ async fn binned( { Err(Error::ServerError)?; } - if 
-        Ok(binned_cbor_framed(url, req, ctx, pgqueue, scyqueue, ncc).await?)
-    } else if accepts_json_framed(req.headers()) {
-        Ok(binned_json_framed(url, req, ctx, pgqueue, scyqueue, ncc).await?)
-    } else if accepts_json_or_all(req.headers()) {
-        Ok(binned_json_single(url, req, ctx, pgqueue, scyqueue, ncc).await?)
-    } else if accepts_octets(req.headers()) {
+    let reqid = ctx.reqid();
+    let (head, _body) = req.into_parts();
+    let query = BinnedQuery::from_url(&url).map_err(|e| {
+        error!("binned: {e:?}");
+        Error::BadQuery(e.to_string())
+    })?;
+    let logspan = if query.log_level() == "trace" {
+        trace!("enable trace for handler");
+        tracing::span!(tracing::Level::INFO, "log_span_trace")
+    } else if query.log_level() == "debug" {
+        debug!("enable debug for handler");
+        tracing::span!(tracing::Level::INFO, "log_span_debug")
+    } else {
+        tracing::Span::none()
+    };
+    let span1 = span!(
+        Level::INFO,
+        "httpret::binned",
+        reqid,
+        beg = query.range().beg_u64() / SEC,
+        end = query.range().end_u64() / SEC,
+        ch = query.channel().name(),
+    );
+    span1.in_scope(|| {
+        debug!("binned begin {:?}", query);
+    });
+    binned_instrumented(head, ctx, query, pgqueue, scyqueue, ncc, logspan.clone())
+        .instrument(logspan)
+        .instrument(span1)
+        .await
+}
+
+async fn binned_instrumented(
+    head: Parts,
+    ctx: &ReqCtx,
+    query: BinnedQuery,
+    pgqueue: &PgQueue,
+    scyqueue: Option,
+    ncc: &NodeConfigCached,
+    logspan: Span,
+) -> Result {
+    let res2 = HandleRes2::new(ctx, logspan, query.clone(), pgqueue, scyqueue, ncc).await?;
+    if accepts_cbor_framed(&head.headers) {
+        Ok(binned_cbor_framed(res2, ctx, ncc).await?)
+    } else if accepts_json_framed(&head.headers) {
+        Ok(binned_json_framed(res2, ctx, ncc).await?)
+    } else if accepts_json_or_all(&head.headers) {
+        Ok(binned_json_single(res2, ctx, ncc).await?)
+    } else if accepts_octets(&head.headers) {
         Ok(error_response(
             format!("binary binned data not yet available"),
             ctx.reqid(),
         ))
     } else {
-        let ret = error_response(format!("Unsupported Accept: {:?}", req.headers()), ctx.reqid());
+        let ret = error_response(format!("Unsupported Accept: {:?}", &head.headers), ctx.reqid());
         Ok(ret)
     }
 }
@@ -182,48 +226,20 @@ fn make_read_provider(
 }
 
 async fn binned_json_single(
-    url: Url,
-    req: Requ,
+    res2: HandleRes2<'_>,
     ctx: &ReqCtx,
-    pgqueue: &PgQueue,
-    scyqueue: Option,
-    ncc: &NodeConfigCached,
+    _ncc: &NodeConfigCached,
 ) -> Result {
     // TODO unify with binned_json_framed
-    debug!("binned_json_single {:?}", req);
-    let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id();
-    let (_head, _body) = req.into_parts();
-    let query = BinnedQuery::from_url(&url).map_err(|e| {
-        error!("binned_json: {e:?}");
-        Error::BadQuery(e.to_string())
-    })?;
-    let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
-        .await?
-        .ok_or_else(|| Error::ChannelNotFound)?;
-    let span1 = span!(
-        Level::INFO,
-        "httpret::binned",
-        reqid,
-        beg = query.range().beg_u64() / SEC,
-        end = query.range().end_u64() / SEC,
-        ch = query.channel().name(),
-    );
-    span1.in_scope(|| {
-        debug!("begin");
-    });
-    let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
-    let (events_read_provider, cache_read_provider) =
-        make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
-    let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
+    debug!("binned_json_single");
     let item = streams::timebinnedjson::timebinned_json(
-        query,
-        ch_conf,
+        res2.query,
+        res2.ch_conf,
         ctx,
-        cache_read_provider,
-        events_read_provider,
-        timeout_provider,
+        res2.cache_read_provider,
+        res2.events_read_provider,
+        res2.timeout_provider,
     )
-    .instrument(span1)
     .await?;
     match item {
         CollectResult::Some(item) => {
@@ -245,50 +261,30 @@
 }
 
 async fn binned_json_framed(
-    url: Url,
-    req: Requ,
+    res2: HandleRes2<'_>,
     ctx: &ReqCtx,
-    pgqueue: &PgQueue,
-    scyqueue: Option,
     ncc: &NodeConfigCached,
 ) -> Result {
-    debug!("binned_json_framed {:?}", req);
-    let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id();
-    let (_head, _body) = req.into_parts();
-    let query = BinnedQuery::from_url(&url).map_err(|e| {
-        error!("binned_json_framed: {e:?}");
-        Error::BadQuery(e.to_string())
-    })?;
+    debug!("binned_json_framed");
     // TODO handle None case better and return 404
-    let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
+    let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc)
         .await?
         .ok_or_else(|| Error::ChannelNotFound)?;
-    let span1 = span!(
-        Level::INFO,
-        "httpret::binned",
-        reqid,
-        beg = query.range().beg_u64() / SEC,
-        end = query.range().end_u64() / SEC,
-        ch = query.channel().name(),
-    );
-    span1.in_scope(|| {
-        debug!("begin");
-    });
     let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
     let (events_read_provider, cache_read_provider) =
-        make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
+        make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc);
     let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
     let stream = streams::timebinnedjson::timebinned_json_framed(
-        query,
+        res2.query,
         ch_conf,
         ctx,
         cache_read_provider,
         events_read_provider,
         timeout_provider,
     )
-    .instrument(span1)
     .await?;
     let stream = streams::lenframe::bytes_chunks_to_len_framed_str(stream);
+    let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan);
     let ret = response(StatusCode::OK)
         .header(CONTENT_TYPE, APP_JSON_FRAMED)
         .header(HEADER_NAME_REQUEST_ID, ctx.reqid())
@@ -297,53 +293,74 @@
 }
 
 async fn binned_cbor_framed(
-    url: Url,
-    req: Requ,
+    res2: HandleRes2<'_>,
     ctx: &ReqCtx,
-    pgqueue: &PgQueue,
-    scyqueue: Option,
     ncc: &NodeConfigCached,
 ) -> Result {
-    debug!("binned_cbor_framed {:?}", req);
-    let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id();
-    let (_head, _body) = req.into_parts();
-    let query = BinnedQuery::from_url(&url).map_err(|e| {
-        error!("binned_cbor_framed: {e:?}");
-        Error::BadQuery(e.to_string())
-    })?;
+    debug!("binned_cbor_framed");
     // TODO handle None case better and return 404
-    let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
+    let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc)
         .await?
         .ok_or_else(|| Error::ChannelNotFound)?;
-    let span1 = span!(
-        Level::INFO,
-        "httpret::binned_cbor_framed",
-        reqid,
-        beg = query.range().beg_u64() / SEC,
-        end = query.range().end_u64() / SEC,
-        ch = query.channel().name(),
-    );
-    span1.in_scope(|| {
-        debug!("begin");
-    });
     let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
     let (events_read_provider, cache_read_provider) =
-        make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
+        make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc);
     let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
     let stream = streams::timebinnedjson::timebinned_cbor_framed(
-        query,
+        res2.query,
         ch_conf,
         ctx,
         cache_read_provider,
         events_read_provider,
         timeout_provider,
     )
-    .instrument(span1)
     .await?;
     let stream = streams::lenframe::bytes_chunks_to_framed(stream);
+    let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan);
     let ret = response(StatusCode::OK)
         .header(CONTENT_TYPE, APP_CBOR_FRAMED)
         .header(HEADER_NAME_REQUEST_ID, ctx.reqid())
         .body(body_stream(stream))?;
     Ok(ret)
 }
+
+struct HandleRes2<'a> {
+    logspan: Span,
+    query: BinnedQuery,
+    ch_conf: ChannelTypeConfigGen,
+    events_read_provider: Arc<dyn EventsReadProvider>,
+    cache_read_provider: Arc<dyn CacheReadProvider>,
+    timeout_provider: Box<dyn StreamTimeout2>,
+    pgqueue: &'a PgQueue,
+    scyqueue: Option,
+}
+
+impl<'a> HandleRes2<'a> {
+    async fn new(
+        ctx: &ReqCtx,
+        logspan: Span,
+        query: BinnedQuery,
+        pgqueue: &'a PgQueue,
+        scyqueue: Option,
+        ncc: &NodeConfigCached,
+    ) -> Result<Self, Error> {
+        let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
+            .await?
+            .ok_or_else(|| Error::ChannelNotFound)?;
+        let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
+        let (events_read_provider, cache_read_provider) =
+            make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc);
+        let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
+        let ret = Self {
+            logspan,
+            query,
+            ch_conf,
+            events_read_provider,
+            cache_read_provider,
+            timeout_provider,
+            pgqueue,
+            scyqueue,
+        };
+        Ok(ret)
+    }
+}
diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs
index eae20f4..075f1bb 100644
--- a/crates/httpret/src/api4/databuffer_tools.rs
+++ b/crates/httpret/src/api4/databuffer_tools.rs
@@ -285,7 +285,9 @@ async fn find_active(
     }
 }
 
+#[pin_project::pin_project]
 struct FindActiveStream {
+    #[pin]
     rx: Receiver>,
 }
 
@@ -308,9 +310,9 @@ impl FindActiveStream {
 impl Stream for FindActiveStream {
     type Item = Result;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
-        match self.rx.poll_next_unpin(cx) {
+        match self.project().rx.poll_next_unpin(cx) {
             Ready(Some(item)) => Ready(Some(item)),
             Ready(None) => Ready(None),
             Pending => Pending,
diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs
index 795af03..7a4d177 100644
--- a/crates/httpret/src/api4/eventdata.rs
+++ b/crates/httpret/src/api4/eventdata.rs
@@ -83,12 +83,12 @@ impl EventDataHandler {
             .await
             .map_err(|_| EventDataError::InternalError)?;
         let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?;
-        info!("{:?}", evsubq);
+        debug!("{:?}", evsubq);
         let logspan = if evsubq.log_level() == "trace" {
-            trace!("enable trace for handler");
+            trace!("emit trace span");
            tracing::span!(tracing::Level::INFO, "log_span_trace")
         } else if evsubq.log_level() == "debug" {
-            debug!("enable debug for handler");
+            debug!("emit debug span");
             tracing::span!(tracing::Level::INFO, "log_span_debug")
         } else {
             tracing::Span::none()
diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs
index d12912b..8d87214 100644
--- a/crates/httpret/src/api4/events.rs
+++ b/crates/httpret/src/api4/events.rs
@@ -32,6 +32,7 @@ use netpod::APP_JSON_FRAMED;
 use netpod::HEADER_NAME_REQUEST_ID;
 use nodenet::client::OpenBoxedBytesViaHttp;
 use query::api4::events::PlainEventsQuery;
+use std::pin::Pin;
 use std::sync::Arc;
 use streams::collect::CollectResult;
 use streams::instrument::InstrumentStream;
@@ -39,7 +40,9 @@ use streams::lenframe::bytes_chunks_to_framed;
 use streams::lenframe::bytes_chunks_to_len_framed_str;
 use streams::plaineventscbor::plain_events_cbor_stream;
 use streams::plaineventsjson::plain_events_json_stream;
+use streams::streamtimeout::StreamTimeout2;
 use tracing::Instrument;
+use tracing::Span;
 #[derive(Debug, ThisError)]
 #[cstm(name = "Api4Events")]
 pub enum Error {
@@ -134,7 +137,7 @@ impl EventsHandler {
         } else {
             tracing::Span::none()
         };
-        match plain_events(req, evq, ctx, &shared_res.pgqueue, ncc)
+        match plain_events_prep(req, evq, ctx, &shared_res.pgqueue, ncc, logspan.clone())
             .instrument(logspan)
             .await
         {
@@ -144,100 +147,80 @@
     }
 }
 
-async fn plain_events(
+async fn plain_events_prep(
     req: Requ,
     evq: PlainEventsQuery,
     ctx: &ReqCtx,
     pgqueue: &PgQueue,
     ncc: &NodeConfigCached,
+    logspan: Span,
 ) -> Result {
-    let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc)
-        .await?
-        .ok_or_else(|| Error::ChannelNotFound)?;
+    let res2 = HandleRes2::new(ctx, logspan, evq.clone(), pgqueue, ncc).await?;
+    plain_events(res2, req).await
+}
+
+async fn plain_events(res2: HandleRes2<'_>, req: Requ) -> Result {
     if accepts_cbor_framed(req.headers()) {
-        Ok(plain_events_cbor_framed(req, evq, ch_conf, ctx, ncc).await?)
+        Ok(plain_events_cbor_framed(req, res2).await?)
     } else if accepts_json_framed(req.headers()) {
-        Ok(plain_events_json_framed(req, evq, ch_conf, ctx, ncc).await?)
+        Ok(plain_events_json_framed(req, res2).await?)
     } else if accepts_json_or_all(req.headers()) {
-        Ok(plain_events_json(req, evq, ch_conf, ctx, ncc).await?)
+        Ok(plain_events_json(req, res2).await?)
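+        // accepts_json_or_all presumably also matches a wildcard Accept header, making plain JSON the default representation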
     } else {
-        let ret = error_response(format!("unsupported accept"), ctx.reqid());
+        let ret = error_response(format!("unsupported accept"), res2.ctx.reqid());
         Ok(ret)
     }
 }
 
-async fn plain_events_cbor_framed(
-    req: Requ,
-    evq: PlainEventsQuery,
-    ch_conf: ChannelTypeConfigGen,
-    ctx: &ReqCtx,
-    ncc: &NodeConfigCached,
-) -> Result {
-    debug!("plain_events_cbor_framed {ch_conf:?} {req:?}");
-    let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
-    let open_bytes = Arc::pin(open_bytes);
-    let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
-    let stream = plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?;
+async fn plain_events_cbor_framed(req: Requ, res2: HandleRes2<'_>) -> Result {
+    debug!("plain_events_cbor_framed {:?} {:?}", res2.ch_conf, req);
+    let stream = plain_events_cbor_stream(
+        &res2.evq,
+        res2.ch_conf,
+        res2.ctx,
+        res2.open_bytes,
+        res2.timeout_provider,
+    )
+    .await?;
     let stream = bytes_chunks_to_framed(stream);
-    let logspan = if evq.log_level() == "trace" {
-        trace!("enable trace for handler");
-        tracing::span!(tracing::Level::INFO, "log_span_trace")
-    } else if evq.log_level() == "debug" {
-        debug!("enable debug for handler");
-        tracing::span!(tracing::Level::INFO, "log_span_debug")
-    } else {
-        tracing::Span::none()
-    };
-    let stream = InstrumentStream::new(stream, logspan);
+    let stream = InstrumentStream::new(stream, res2.logspan);
     let ret = response(StatusCode::OK)
         .header(CONTENT_TYPE, APP_CBOR_FRAMED)
-        .header(HEADER_NAME_REQUEST_ID, ctx.reqid())
+        .header(HEADER_NAME_REQUEST_ID, res2.ctx.reqid())
         .body(body_stream(stream))?;
     Ok(ret)
 }
 
-async fn plain_events_json_framed(
-    req: Requ,
-    evq: PlainEventsQuery,
-    ch_conf: ChannelTypeConfigGen,
-    ctx: &ReqCtx,
-    ncc: &NodeConfigCached,
-) -> Result {
-    debug!("plain_events_json_framed {ch_conf:?} {req:?}");
-    let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
-    let open_bytes = Arc::pin(open_bytes);
-    let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
-    let stream = plain_events_json_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?;
+async fn plain_events_json_framed(req: Requ, res2: HandleRes2<'_>) -> Result {
+    debug!("plain_events_json_framed {:?} {:?}", res2.ch_conf, req);
+    let stream = plain_events_json_stream(
+        &res2.evq,
+        res2.ch_conf,
+        res2.ctx,
+        res2.open_bytes,
+        res2.timeout_provider,
+    )
+    .await?;
     let stream = bytes_chunks_to_len_framed_str(stream);
+    let stream = InstrumentStream::new(stream, res2.logspan);
     let ret = response(StatusCode::OK)
         .header(CONTENT_TYPE, APP_JSON_FRAMED)
-        .header(HEADER_NAME_REQUEST_ID, ctx.reqid())
+        .header(HEADER_NAME_REQUEST_ID, res2.ctx.reqid())
         .body(body_stream(stream))?;
     Ok(ret)
 }
 
-async fn plain_events_json(
-    req: Requ,
-    evq: PlainEventsQuery,
-    ch_conf: ChannelTypeConfigGen,
-    ctx: &ReqCtx,
-    ncc: &NodeConfigCached,
-) -> Result {
+async fn plain_events_json(req: Requ, res2: HandleRes2<'_>) -> Result {
     let self_name = "plain_events_json";
-    debug!("{self_name} {ch_conf:?} {req:?}");
+    debug!("{self_name} {:?} {:?}", res2.ch_conf, req);
     let (_head, _body) = req.into_parts();
-    // TODO handle None case better and return 404
-    debug!("{self_name} chconf_from_events_quorum: {ch_conf:?}");
-    let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
-    let open_bytes = Arc::pin(open_bytes);
-    let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
     let item = streams::plaineventsjson::plain_events_json(
-        &evq,
-        ch_conf,
-        ctx,
-        &ncc.node_config.cluster,
-        open_bytes,
-        timeout_provider,
+        &res2.evq,
+        res2.ch_conf,
+        res2.ctx,
+        &res2.ncc.node_config.cluster,
+        res2.open_bytes,
+        res2.timeout_provider,
     )
     .await;
     debug!("{self_name} returned {}", item.is_ok());
@@ -252,7 +235,7 @@ CollectResult::Some(item) => {
             let ret = response(StatusCode::OK)
                 .header(CONTENT_TYPE, APP_JSON)
-                .header(HEADER_NAME_REQUEST_ID, ctx.reqid())
+                .header(HEADER_NAME_REQUEST_ID, res2.ctx.reqid())
                 .body(ToJsonBody::from(item.into_bytes()).into_body())?;
             debug!("{self_name} response created");
             Ok(ret)
@@ -261,9 +244,48 @@ let ret = error_status_response(
                 StatusCode::GATEWAY_TIMEOUT,
                 format!("no data within timeout"),
-                ctx.reqid(),
+                res2.ctx.reqid(),
             );
             Ok(ret)
         }
     }
 }
+
+struct HandleRes2<'a> {
+    logspan: Span,
+    evq: PlainEventsQuery,
+    ch_conf: ChannelTypeConfigGen,
+    open_bytes: Pin<Arc<OpenBoxedBytesViaHttp>>,
+    timeout_provider: Box<dyn StreamTimeout2>,
+    #[allow(unused)]
+    pgqueue: &'a PgQueue,
+    ctx: &'a ReqCtx,
+    ncc: &'a NodeConfigCached,
+}
+
+impl<'a> HandleRes2<'a> {
+    async fn new(
+        ctx: &'a ReqCtx,
+        logspan: Span,
+        evq: PlainEventsQuery,
+        pgqueue: &'a PgQueue,
+        ncc: &'a NodeConfigCached,
+    ) -> Result<Self, Error> {
+        let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc)
+            .await?
+            .ok_or_else(|| Error::ChannelNotFound)?;
+        let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
+        let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
+        let ret = Self {
+            logspan,
+            evq,
+            ch_conf,
+            open_bytes,
+            timeout_provider,
+            pgqueue,
+            ctx,
+            ncc,
+        };
+        Ok(ret)
+    }
+}
diff --git a/crates/httpret/src/api4/search.rs b/crates/httpret/src/api4/search.rs
index 0ec5404..1a9a66e 100644
--- a/crates/httpret/src/api4/search.rs
+++ b/crates/httpret/src/api4/search.rs
@@ -16,6 +16,7 @@ use netpod::ChannelSearchResult;
 use netpod::NodeConfigCached;
 use netpod::ACCEPT_ALL;
 use netpod::APP_JSON;
+use tracing::Instrument;
 
 pub struct ChannelSearchHandler {}
 
@@ -55,7 +56,17 @@ impl ChannelSearchHandler {
 async fn channel_search(req: Requ, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
     let url = req_uri_to_url(req.uri())?;
     let query = ChannelSearchQuery::from_url(&url)?;
-    info!("search query: {:?}", query);
-    let res = dbconn::search::search_channel(query, pgqueue, ncc).await?;
+    let logspan = if query.log_level() == "trace" {
+        trace!("enable trace for handler");
+        tracing::span!(tracing::Level::INFO, "log_span_trace")
+    } else if query.log_level() == "debug" {
+        debug!("enable debug for handler");
+        tracing::span!(tracing::Level::INFO, "log_span_debug")
+    } else {
+        tracing::Span::none()
+    };
+    let res = dbconn::search::search_channel(query, pgqueue, ncc)
+        .instrument(logspan)
+        .await?;
     Ok(res)
 }
diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs
index 009900c..a61d06f 100644
--- a/crates/httpret/src/channel_status.rs
+++ b/crates/httpret/src/channel_status.rs
@@ -8,7 +8,6 @@ use http::Method;
 use http::StatusCode;
 use httpclient::body_empty;
 use httpclient::body_string;
-use httpclient::error_response;
 use httpclient::error_status_response;
 use httpclient::IntoBody;
 use httpclient::Requ;
@@ -30,7 +29,7 @@ pub struct ConnectionStatusEvents {}
 
 impl ConnectionStatusEvents {
     pub fn handler(req: &Requ) -> Option<Self> {
-        if req.uri().path() == "/api/4/status/connection/events" {
+        if req.uri().path() == "/api/4/private/status/connection/events" {
             Some(Self {})
         } else {
             None
@@ -90,7 +89,7 @@ impl ConnectionStatusEvents {
         let _do_one_before_range = true;
         let ret = Vec::new();
         if true {
-            return Err(Error::with_msg_no_trace("TODO channel_status fetch_data"));
+            return Err(Error::with_msg_no_trace("TODO dedicated connection status?"));
         }
         // let mut stream =
         //     scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy);
diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs
index 58eb12c..8970893 100644
--- a/crates/httpret/src/err.rs
+++ b/crates/httpret/src/err.rs
@@ -114,3 +114,4 @@ impl Convable for nodenet::channelconfig::Error {}
 impl Convable for query::api4::Error {}
 impl Convable for query::api4::events::Error {}
 impl Convable for netpod::Error {}
+impl Convable for crate::http3::Error {}
diff --git a/crates/httpret/src/http3.rs b/crates/httpret/src/http3.rs
new file mode 100644
index 0000000..ab07252
--- /dev/null
+++ b/crates/httpret/src/http3.rs
@@ -0,0 +1,140 @@
+use quinn;
+use quinn::crypto::rustls::QuicServerConfig;
+use quinn::Endpoint;
+use quinn::EndpointConfig;
+use quinn::Incoming;
+use rustls::pki_types::pem::PemObject;
+use std::future::Future;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use taskrun::tokio;
+
+macro_rules! info { ($($arg:expr),*) => ( if true { netpod::log::info!($($arg),*); } ); }
+
+autoerr::create_error_v1!(
+    name(Error, "Http3Support"),
+    enum variants {
+        NoRuntime,
+        IO(#[from] std::io::Error),
+    },
+);
+
+pub struct Http3Support {
+    ep: Option<Endpoint>,
+}
+
+impl Http3Support {
+    pub async fn new(bind_addr: SocketAddr) -> Result<Self, Error> {
+        let key = match PemObject::from_pem_file("key.pem") {
+            Ok(x) => x,
+            Err(e) => {
+                info!("key error {}", e);
+                return Ok(Self::dummy());
+            }
+        };
+        let cert = match PemObject::from_pem_file("cert.pem") {
+            Ok(x) => x,
+            Err(e) => {
+                info!("cert error {}", e);
+                return Ok(Self::dummy());
+            }
+        };
+        let conf = EndpointConfig::default();
+        let mut tls_conf = match rustls::ServerConfig::builder()
+            .with_no_client_auth()
+            .with_single_cert(vec![cert], key)
+        {
+            Ok(x) => x,
+            Err(e) => {
+                info!("tls config error {}", e);
+                return Ok(Self::dummy());
+            }
+        };
+        tls_conf.alpn_protocols = vec![b"h3".to_vec()];
+        tls_conf.max_early_data_size = u32::MAX;
+        let tls_conf = tls_conf;
+        let v = match QuicServerConfig::try_from(tls_conf) {
+            Ok(x) => x,
+            Err(e) => {
+                info!("config error {}", e);
+                return Ok(Self::dummy());
+            }
+        };
+        let quic_conf = Arc::new(v);
+        let conf_srv = quinn::ServerConfig::with_crypto(quic_conf);
+        let sock = std::net::UdpSocket::bind(bind_addr)?;
+        info!("h3 sock {:?}", sock);
+        let rt = quinn::default_runtime().ok_or_else(|| Error::NoRuntime)?;
+        let ep = Endpoint::new(conf, Some(conf_srv), sock, rt)?;
+        {
+            let ep = ep.clone();
+            tokio::task::spawn(Self::accept(ep));
+        }
+        let ret = Self { ep: Some(ep) };
+        Ok(ret)
+    }
+
+    fn dummy() -> Self {
+        Self { ep: None }
+    }
+
+    pub async fn wait_idle(self) -> () {
+        if let Some(ep) = self.ep.as_ref() {
+            ep.close(quinn::VarInt::from_u32(1), b"shutdown");
+            ep.wait_idle().await
+        }
+    }
+
+    async fn accept(ep: Endpoint) {
+        info!("accepting h3");
+        while let Some(inc) = ep.accept().await {
+            tokio::spawn(Self::handle_incoming(inc));
+        }
+    }
+
+    async fn handle_incoming(inc: Incoming) {
+        info!("new incoming {:?}", inc.remote_address());
+        let conn = match inc.await {
+            Ok(x) => x,
+            Err(e) => {
+                info!("connection error {}", e);
+                return;
+            }
+        };
+        let fut1 = {
+            let conn = conn.clone();
+            async move {
+                let bi = conn.accept_bi().await;
+                info!("got bi {:?}", bi);
+                match bi {
+                    Ok(mut v) => {
+                        if let Err(e) = v.0.write(b"some-data").await {
+                            info!("write error {}", e);
+                        }
+                    }
+                    Err(_e) => {}
+                }
+            }
+        };
+        let fut2 = {
+            let conn = conn.clone();
+            async move {
+                let uni = conn.accept_uni().await;
+                info!("got uni {:?}", uni);
+            }
+        };
+        tokio::spawn(fut1);
+        tokio::spawn(fut2);
+    }
+}
+
+// impl Future for Http3Support {
+//     type Output = Result<(), Error>;
+
+//     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+//         todo!()
+//     }
+// }
diff --git a/crates/httpret/src/http3_dummy.rs b/crates/httpret/src/http3_dummy.rs
new file mode 100644
index 0000000..31e24fb
--- /dev/null
+++ b/crates/httpret/src/http3_dummy.rs
@@ -0,0 +1,34 @@
+use std::future::Future;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+autoerr::create_error_v1!(
+    name(Error, "Http3Support"),
+    enum variants {
+        NoRuntime,
+        IO(#[from] std::io::Error),
+    },
+);
+
+pub struct Http3Support {}
+
+impl Http3Support {
+    pub async fn new(_bind_addr: SocketAddr) -> Result<Self, Error> {
+        let ret = Self {};
+        Ok(ret)
+    }
+
+    pub async fn wait_idle(&self) -> () {
+        ()
+    }
+}
+
+impl Future for Http3Support {
+    type Output = Result<(), Error>;
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
+        todo!()
+    }
+}
diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs
index 221a92a..652d946 100644
--- a/crates/httpret/src/httpret.rs
+++ b/crates/httpret/src/httpret.rs
@@ -14,6 +14,13 @@ pub mod pulsemap;
 pub mod requests;
 pub mod settings;
+#[cfg(feature = "http3")]
+pub mod http3;
+#[cfg(not(feature = "http3"))]
+pub mod http3_dummy;
+#[cfg(not(feature = "http3"))]
+use http3_dummy as http3;
+
 
 use crate::bodystream::response;
 use crate::err::Error;
 use daqbuf_err;
@@ -68,6 +75,7 @@ autoerr::create_error_v1!(
         Fmt(#[from] std::fmt::Error),
         Url(#[from] url::ParseError),
         Netpod(#[from] netpod::Error),
+        Http3Support(#[from] crate::http3::Error),
     },
 );
 
@@ -152,6 +160,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
     let shared_res = Arc::new(shared_res);
     use std::str::FromStr;
     let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?;
+    let http3 = http3::Http3Support::new(bind_addr.clone()).await?;
     // tokio::net::TcpSocket::new_v4()?.listen(200)?
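+    // NOTE: Http3Support::new is expected to fall back to an inert endpoint when key.pem / cert.pem are absent (see http3.rs), so the TCP serving below continues unaffected.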
let listener = TcpListener::bind(bind_addr).await?; loop { @@ -188,6 +197,8 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res } }); } + info!("http service done"); + let _x: () = http3.wait_idle().await; info!("http host done"); // rawjh.await??; Ok(()) diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index d5aab3d..bcf4faa 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -69,12 +69,18 @@ use tokio::net::TcpListener; use tracing::Instrument; use url::Url; +#[cfg(feature = "http3")] +use crate::http3; +#[cfg(not(feature = "http3"))] +use crate::http3_dummy as http3; + const DISTRI_PRE: &str = "/distri/"; pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -> Result<(), Error> { status_board_init(); use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; + let http3 = http3::Http3Support::new(bind_addr.clone()).await?; let listener = TcpListener::bind(bind_addr).await?; loop { let (stream, addr) = match listener.accept().await { @@ -103,7 +109,9 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) - } }); } - info!("proxy done"); + info!("http service done"); + let _x: () = http3.wait_idle().await; + info!("http host done"); Ok(()) } diff --git a/crates/nodenet/Cargo.toml b/crates/nodenet/Cargo.toml index a4d6e0b..9a000d6 100644 --- a/crates/nodenet/Cargo.toml +++ b/crates/nodenet/Cargo.toml @@ -7,16 +7,17 @@ edition = "2021" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_cbor = "0.11.1" -chrono = { version = "0.4.19", features = ["serde"] } +serde_cbor = "0.11.2" +chrono = { version = "0.4.39", features = ["serde"] } async-channel = "1.9.0" -bytes = "1.4.0" -crc32fast = "1.2.1" -arrayref = "0.3.6" -byteorder = "1.4.3" -futures-util = "0.3.14" -tracing = "0.1.25" +bytes = "1.10.0" +crc32fast = "1.4.2" +arrayref = "0.3.9" +byteorder = "1.5.0" +futures-util = "0.3.31" +tracing = "0.1.41" hex = "0.4.3" +autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../../../daqbuf-query", package = "daqbuf-query" } diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 0a1953e..fe15a76 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -8,10 +8,10 @@ edition = "2021" path = "src/scyllaconn.rs" [dependencies] -futures-util = "0.3.24" +futures-util = "0.3.31" pin-project = "1" async-channel = "2.3.1" -scylla = "0.15.0" +scylla = "0.15.1" autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } diff --git a/crates/streamio/Cargo.toml b/crates/streamio/Cargo.toml index 19c7f17..d6e01c4 100644 --- a/crates/streamio/Cargo.toml +++ b/crates/streamio/Cargo.toml @@ -5,24 +5,24 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] } -tokio-stream = "0.1.16" -futures-util = "0.3.15" -pin-project = "1.0.12" +tokio = { version = "1.43", features = ["io-util", "net", "time", "sync", "fs"] } +tokio-stream = "0.1.17" +futures-util = "0.3.31" +pin-project = "1.1.9" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_cbor = "0.11.1" -typetag = "0.2.14" -ciborium = "0.2.1" -bytes = "1.8" -arrayref = "0.3.6" -crc32fast = "1.3.2" 
-byteorder = "1.4.3" +serde_cbor = "0.11.2" +typetag = "0.2.19" +ciborium = "0.2.2" +bytes = "1.10" +arrayref = "0.3.9" +crc32fast = "1.4.2" +byteorder = "1.5.0" async-channel = "1.9.0" rand_xoshiro = "0.6.0" thiserror = "=0.0.1" autoerr = "0.0.3" -chrono = { version = "0.4.19", features = ["serde"] } +chrono = { version = "0.4.39", features = ["serde"] } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../../../daqbuf-query", package = "daqbuf-query" } items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } @@ -31,7 +31,7 @@ parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" } streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" } http = "1" http-body = "1" -http-body-util = "0.1.0" +http-body-util = "0.1.2" [dev-dependencies] taskrun = { path = "../taskrun" } diff --git a/crates/taskrun/Cargo.toml b/crates/taskrun/Cargo.toml index d6c94b6..ac1742d 100644 --- a/crates/taskrun/Cargo.toml +++ b/crates/taskrun/Cargo.toml @@ -8,14 +8,14 @@ edition = "2021" path = "src/taskrun.rs" [dependencies] -tokio = { version = "1.38.0", features = ["full", "tracing", "time"] } -futures-util = "0.3.28" -tracing = "0.1.40" +tokio = { version = "1.43.0", features = ["full", "tracing", "time"] } +futures-util = "0.3.31" +tracing = "0.1.41" tracing-log = "0.2.0" -tracing-subscriber = { version = "0.3.18", features = ["fmt", "time"] } +tracing-subscriber = { version = "0.3.19", features = ["fmt", "time"] } time = { version = "0.3", features = ["formatting"] } -backtrace = "0.3.71" -chrono = "0.4.38" +backtrace = "0.3.74" +chrono = "0.4.39" daqbuf-err = { path = "../../../daqbuf-err" } diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 316537e..993f838 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -215,8 +215,8 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { sr = g.parent(); } } - // allow - true + allow + // true } else { false } @@ -251,8 +251,8 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { sr = g.parent(); } } - // allow - true + allow + // true } else { false }