diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 3c51772..1f9fd8e 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.1-aa.0" +version = "0.5.1-aa.1" authors = ["Dominik Werder "] edition = "2021" @@ -13,7 +13,7 @@ serde_json = "1.0" serde_yaml = "0.9.27" chrono = "0.4.31" url = "2.5.0" -clap = { version = "4.4.11", features = ["derive", "cargo"] } +clap = { version = "4.5.7", features = ["derive", "cargo"] } err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 1d3be9d..5e0a973 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -1,5 +1,6 @@ use crate::err::Error; use crate::response; +use crate::ServiceSharedResources; use crate::ToPublicResponse; use dbconn::create_connection; use dbconn::worker::PgQueue; @@ -15,8 +16,12 @@ use httpclient::ToJsonBody; use netpod::get_url_query_pairs; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; +use netpod::query::PulseRangeQuery; +use netpod::query::TimeRangeQuery; +use netpod::range::evrange::SeriesRange; use netpod::req_uri_to_url; use netpod::timeunits::*; +use netpod::ttl::RetentionTime; use netpod::ChannelConfigQuery; use netpod::ChannelConfigResponse; use netpod::ChannelTypeConfigGen; @@ -31,6 +36,7 @@ use nodenet::configquorum::find_config_basics_quorum; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; use scyllaconn::errconv::ErrConv; +use scyllaconn::range::ScyllaSeriesRange; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -368,7 +374,7 @@ pub struct ScyllaChannelsActive {} impl ScyllaChannelsActive { pub fn handler(req: &Requ) -> Option { - if req.uri().path() == "/api/4/channels/active" { + if req.uri().path() == "/api/4/private/channels/active" { Some(Self {}) } else { None @@ -470,7 +476,7 @@ pub struct IocForChannel {} impl IocForChannel { pub fn handler(req: &Requ) -> Option { - if req.uri().path() == "/api/4/channel/ioc" { + if req.uri().path() == "/api/4/private/channel/ioc" { Some(Self {}) } else { None @@ -530,8 +536,8 @@ impl IocForChannel { #[derive(Clone, Debug, Deserialize)] pub struct ScyllaSeriesTsMspQuery { - #[serde(rename = "seriesId")] - series: u64, + name: String, + range: SeriesRange, } impl FromUrl for ScyllaSeriesTsMspQuery { @@ -541,32 +547,45 @@ impl FromUrl for ScyllaSeriesTsMspQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { - let s = pairs - .get("seriesId") - .ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?; - let series: u64 = s.parse()?; - Ok(Self { series }) + let name = pairs + .get("channelName") + .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? + .into(); + let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { + SeriesRange::TimeRange(x.into()) + } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { + SeriesRange::PulseRange(x.into()) + } else { + return Err(err::Error::with_public_msg_no_trace("no time range in url")); + }; + Ok(Self { name, range }) } } #[derive(Clone, Debug, Serialize)] pub struct ScyllaSeriesTsMspResponse { - #[serde(rename = "tsMsps")] - ts_msps: Vec, + st_ts_msp_ms: Vec, + mt_ts_msp_ms: Vec, + lt_ts_msp_ms: Vec, } pub struct ScyllaSeriesTsMsp {} impl ScyllaSeriesTsMsp { pub fn handler(req: &Requ) -> Option { - if req.uri().path() == "/api/4/scylla/series/tsMsps" { + if req.uri().path() == "/api/4/private/scylla/series/tsMsp" { Some(Self {}) } else { None } } - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + pub async fn handle( + &self, + req: Requ, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, + ) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; let accept = req @@ -576,7 +595,7 @@ impl ScyllaSeriesTsMsp { if accept == APP_JSON || accept == ACCEPT_ALL { let url = req_uri_to_url(req.uri())?; let q = ScyllaSeriesTsMspQuery::from_url(&url)?; - match self.get_ts_msps(&q, node_config).await { + match self.get_ts_msps(&q, shared_res, ncc).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) @@ -595,25 +614,65 @@ impl ScyllaSeriesTsMsp { async fn get_ts_msps( &self, q: &ScyllaSeriesTsMspQuery, - node_config: &NodeConfigCached, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, ) -> Result { - let scyco = node_config - .node_config - .cluster - .scylla_st() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::conn::create_scy_session(scyco).await?; - let mut ts_msps = Vec::new(); - let mut res = scy - .query_iter("select ts_msp from ts_msp where series = ?", (q.series as i64,)) + let backend = &ncc.node_config.cluster.backend; + let name = &q.name; + let nano_range = if let SeriesRange::TimeRange(x) = q.range.clone() { + x + } else { + todo!() + }; + let chconf = shared_res + .pgqueue + .chconf_best_matching_name_range_job(backend, name, nano_range) .await - .err_conv()?; - while let Some(row) = res.next().await { - let row = row.err_conv()?; - let (ts_msp,): (i64,) = row.into_typed().err_conv()?; - ts_msps.push(ts_msp as u64); + .map_err(|e| Error::with_msg_no_trace(format!("error from pg worker: {e}")))? + .recv() + .await + .unwrap() + .unwrap(); + use scyllaconn::SeriesId; + let sid = SeriesId::new(chconf.series()); + let scyqueue = shared_res.scyqueue.clone().unwrap(); + + let mut st_ts_msp_ms = Vec::new(); + let mut msp_stream = + scyllaconn::events2::msp::MspStream::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone()); + use chrono::TimeZone; + while let Some(x) = msp_stream.next().await { + let v = x.unwrap().ms(); + let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); + let s = st.format(netpod::DATETIME_FMT_0MS).to_string(); + st_ts_msp_ms.push(s); } - let ret = ScyllaSeriesTsMspResponse { ts_msps }; + + let mut mt_ts_msp_ms = Vec::new(); + let mut msp_stream = + scyllaconn::events2::msp::MspStream::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone()); + while let Some(x) = msp_stream.next().await { + let v = x.unwrap().ms(); + let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); + let s = st.format(netpod::DATETIME_FMT_0MS).to_string(); + mt_ts_msp_ms.push(s); + } + + let mut lt_ts_msp_ms = Vec::new(); + let mut msp_stream = + scyllaconn::events2::msp::MspStream::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone()); + while let Some(x) = msp_stream.next().await { + let v = x.unwrap().ms(); + let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); + let s = st.format(netpod::DATETIME_FMT_0MS).to_string(); + lt_ts_msp_ms.push(s); + } + + let ret = ScyllaSeriesTsMspResponse { + st_ts_msp_ms, + mt_ts_msp_ms, + lt_ts_msp_ms, + }; Ok(ret) } } diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 7d7104c..78bcb4a 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -373,7 +373,7 @@ async fn http_service_inner( } else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ScyllaSeriesTsMsp::handler(&req) { - Ok(h.handle(req, &node_config).await?) + Ok(h.handle(req, &shared_res, &node_config).await?) } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::accounting::AccountingToplistCounts::handler(&req) { diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index a380991..1044c86 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -72,7 +72,7 @@ pub async fn scylla_channel_event_stream( item } }, - Err(e) => Err(e), + Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn eevents error {e}"))), }; item }); diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 94a09bf..265ee55 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -9,6 +9,7 @@ path = "src/scyllaconn.rs" [dependencies] futures-util = "0.3.24" +pin-project = "1" async-channel = "2.3.1" scylla = "0.13.0" err = { path = "../err" } @@ -16,3 +17,4 @@ netpod = { path = "../netpod" } query = { path = "../query" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } +series = { path = "../../../daqingest/series" } diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index f5dbf55..89dc3fb 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,7 +1,8 @@ use crate::errconv::ErrConv; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; -use err::Error; +use err::thiserror; +use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -32,6 +33,24 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; +#[derive(Debug, ThisError)] +pub enum Error { + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), + ScyllaWorker(Box), + MissingQuery(String), + RangeEndOverflow, + InvalidFuture, + TestError(String), +} + +impl From for Error { + fn from(value: crate::worker::Error) -> Self { + Self::ScyllaWorker(Box::new(value)) + } +} + #[derive(Debug)] pub struct StmtsLspShape { u8: PreparedStatement, @@ -52,7 +71,7 @@ impl StmtsLspShape { fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { let ret = match stname { "u8" => &self.u8, - _ => return Err(Error::with_msg_no_trace(format!("no query for stname {stname}"))), + _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), }; Ok(ret) } @@ -123,7 +142,7 @@ async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> table_name, select_cond ); - let qu = scy.prepare(cql).await.err_conv()?; + let qu = scy.prepare(cql).await?; Ok(qu) } @@ -153,7 +172,7 @@ async fn make_lsp( stname, select_cond ); - let qu = scy.prepare(cql).await.err_conv()?; + let qu = scy.prepare(cql).await?; Ok(qu) } @@ -234,54 +253,63 @@ impl StmtsEvents { } } -pub(super) async fn find_ts_msp_worker( +pub(super) async fn find_ts_msp( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + bck: bool, + stmts: &StmtsEvents, + scy: &ScySession, +) -> Result, Error> { + trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck); + if bck { + find_ts_msp_bck(rt, series, range, stmts, scy).await + } else { + find_ts_msp_fwd(rt, series, range, stmts, scy).await + } +} + +async fn find_ts_msp_fwd( rt: &RetentionTime, series: u64, range: ScyllaSeriesRange, stmts: &StmtsEvents, scy: &ScySession, -) -> Result<(VecDeque, VecDeque), Error> { - trace!("find_ts_msp series {:?} {:?}", series, range); - let mut ret1 = VecDeque::new(); - let mut ret2 = VecDeque::new(); - let params = (series as i64, range.beg().ms() as i64); - trace!("find_ts_msp query 1 params {:?}", params); - let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params) - .await - .err_conv()? - .into_typed::<(i64,)>(); - while let Some(x) = res.next().await { - let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let ts = TsMs::from_ms_u64(row.0 as u64); - trace!("query 1 ts_msp {}", ts); - ret1.push_front(ts); - } +) -> Result, Error> { + let mut ret = VecDeque::new(); + // TODO time range truncation can be handled better let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); - trace!("find_ts_msp query 2 params {:?}", params); let mut res = scy .execute_iter(stmts.rt(rt).ts_msp_fwd.clone(), params) - .await - .err_conv()? + .await? .into_typed::<(i64,)>(); while let Some(x) = res.next().await { - let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let row = x?; let ts = TsMs::from_ms_u64(row.0 as u64); - trace!("query 2 ts_msp {}", ts); - ret2.push_front(ts); + ret.push_back(ts); } - // let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1"; - // let params = (series as i64, range.end().ms() as i64); - // trace!("find_ts_msp query 3 params {:?}", params); - // let res = scy.query(cql, params).await.err_conv()?; - // for row in res.rows_typed_or_empty::<(i64,)>() { - // let row = row.err_conv()?; - // let ts = TsMs::from_ms_u64(row.0 as u64); - // trace!("query 3 ts_msp {}", ts); - // ret2.push_back(ts); - // } - trace!("find_ts_msp n1 {:?} n2 {:?}", ret1.len(), ret2.len()); - Ok((ret1, ret2)) + Ok(ret) +} + +async fn find_ts_msp_bck( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + stmts: &StmtsEvents, + scy: &ScySession, +) -> Result, Error> { + let mut ret = VecDeque::new(); + let params = (series as i64, range.beg().ms() as i64); + let mut res = scy + .execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params) + .await? + .into_typed::<(i64,)>(); + while let Some(x) = res.next().await { + let row = x?; + let ts = TsMs::from_ms_u64(row.0 as u64); + ret.push_front(ts); + } + Ok(ret) } trait ValTy: Sized + 'static { @@ -438,8 +466,12 @@ where // TODO could take scyqeue out of opts struct. let scyqueue = opts.scyqueue.clone(); let futgen = Box::new(|scy: Arc, stmts: Arc| { - let fut = read_next_values_2::(opts, scy, stmts); - Box::pin(fut) as Pin, err::Error>> + Send>> + let fut = async { + read_next_values_2::(opts, scy, stmts) + .await + .map_err(crate::worker::Error::from) + }; + Box::pin(fut) as Pin, crate::worker::Error>> + Send>> }); let res = scyqueue.read_next_values(futgen).await?; Ok(res) @@ -459,7 +491,7 @@ where let range = opts.range; let table_name = ST::table_name(); if range.end() > TsNano::from_ns(i64::MAX as u64) { - return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + return Err(Error::RangeEndOverflow); } let ret = if opts.fwd { let ts_lsp_min = if range.beg() > ts_msp.ns() { @@ -505,10 +537,10 @@ where ts_lsp_max.ns() as i64, ); trace!("FWD event search params {:?}", params); - let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?; + let mut res = scy.execute_iter(qu.clone(), params).await?; let mut rows = Vec::new(); while let Some(x) = res.next().await { - rows.push(x.err_conv()?); + rows.push(x?); } let mut last_before = None; let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut last_before)?; @@ -541,10 +573,10 @@ where .st(ST::st_name())?; let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); trace!("BCK event search params {:?}", params); - let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?; + let mut res = scy.execute_iter(qu.clone(), params).await?; let mut rows = Vec::new(); while let Some(x) = res.next().await { - rows.push(x.err_conv()?); + rows.push(x?); } let mut _last_before = None; let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut _last_before)?; @@ -570,21 +602,21 @@ fn convert_rows( for row in rows { let (ts, pulse, value) = if with_values { if ST::is_valueblob() { - let row: (i64, i64, Vec) = row.into_typed().err_conv()?; + let row: (i64, i64, Vec) = row.into_typed()?; trace!("read a value blob len {}", row.2.len()); let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let pulse = row.1 as u64; let value = ValTy::from_valueblob(row.2); (ts, pulse, value) } else { - let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?; + let row: (i64, i64, ST::ScyTy) = row.into_typed()?; let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let pulse = row.1 as u64; let value = ValTy::from_scyty(row.2); (ts, pulse, value) } } else { - let row: (i64, i64) = row.into_typed().err_conv()?; + let row: (i64, i64) = row.into_typed()?; let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let pulse = row.1 as u64; let value = ValTy::default(); @@ -651,9 +683,7 @@ impl ReadValues { ts_msps, fwd, with_values, - fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( - "future not initialized", - )))), + fut: Box::pin(futures_util::future::ready(Err(Error::InvalidFuture))), fut_done: false, scyqueue, }; @@ -738,7 +768,7 @@ impl ReadValues { enum FrState { New, - FindMsp(Pin, VecDeque), crate::worker::Error>> + Send>>), + FindMsp(Pin, crate::worker::Error>> + Send>>), ReadBack1(ReadValues), ReadBack2(ReadValues), ReadValues(ReadValues), @@ -930,9 +960,10 @@ async fn find_ts_msp_via_queue( rt: RetentionTime, series: u64, range: ScyllaSeriesRange, + bck: bool, scyqueue: ScyllaQueue, -) -> Result<(VecDeque, VecDeque), crate::worker::Error> { - scyqueue.find_ts_msp(rt, series, range).await +) -> Result, crate::worker::Error> { + scyqueue.find_ts_msp(rt, series, range, bck).await } impl Stream for EventsStreamScylla { @@ -941,8 +972,7 @@ impl Stream for EventsStreamScylla { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; if self.do_test_stream_error { - let e = Error::with_msg(format!("Test PRIVATE STREAM error.")) - .add_public_msg(format!("Test PUBLIC STREAM error.")); + let e = Error::TestError("test-message".into()); return Ready(Some(Err(e))); } loop { @@ -967,14 +997,15 @@ impl Stream for EventsStreamScylla { FrState::New => { let series = self.series.clone(); let range = self.range.clone(); - let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, self.scyqueue.clone()); + // TODO this no longer works, we miss the backwards part here + let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone()); let fut = Box::pin(fut); self.state = FrState::FindMsp(fut); continue; } FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok((msps1, msps2))) => { - self.ts_msps_found(msps1, msps2); + Ready(Ok(msps)) => { + self.ts_msps_found(VecDeque::new(), msps); continue; } Ready(Err(e)) => { diff --git a/crates/scyllaconn/src/events2.rs b/crates/scyllaconn/src/events2.rs index e69de29..a0554af 100644 --- a/crates/scyllaconn/src/events2.rs +++ b/crates/scyllaconn/src/events2.rs @@ -0,0 +1 @@ +pub mod msp; diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs new file mode 100644 index 0000000..a9213b3 --- /dev/null +++ b/crates/scyllaconn/src/events2/msp.rs @@ -0,0 +1,192 @@ +use crate::range::ScyllaSeriesRange; +use crate::worker::ScyllaQueue; +use err::thiserror; +use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use netpod::ttl::RetentionTime; +use netpod::TsMs; +use series::SeriesId; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +pub enum Error { + Worker(#[from] crate::worker::Error), + Logic, +} + +enum Resolvable +where + F: Future, +{ + Future(F), + Output(::Output), + Taken, +} + +impl Resolvable +where + F: Future, +{ + fn take(&mut self) -> Option<::Output> { + let x = std::mem::replace(self, Resolvable::Taken); + match x { + Resolvable::Future(_) => None, + Resolvable::Output(x) => Some(x), + Resolvable::Taken => None, + } + } +} + +struct BckAndFirstFwd { + scyqueue: ScyllaQueue, + fut_bck: Resolvable, crate::worker::Error>> + Send>>>, + fut_fwd: Resolvable, crate::worker::Error>> + Send>>>, +} + +enum State { + BckAndFirstFwd(BckAndFirstFwd), + InputDone, +} + +#[pin_project::pin_project] +pub struct MspStream { + rt: RetentionTime, + series: SeriesId, + range: ScyllaSeriesRange, + #[pin] + state: State, + out: VecDeque, +} + +impl MspStream { + pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self { + let fut_bck = { + let scyqueue = scyqueue.clone(); + let rt = rt.clone(); + let series = series.clone(); + let range = range.clone(); + async move { scyqueue.find_ts_msp(rt, series.id(), range, true).await } + }; + let fut_fwd = { + let scyqueue = scyqueue.clone(); + let rt = rt.clone(); + let series = series.clone(); + let range = range.clone(); + async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await } + }; + Self { + rt, + series, + range, + state: State::BckAndFirstFwd(BckAndFirstFwd { + scyqueue, + fut_bck: Resolvable::Future(Box::pin(fut_bck)), + fut_fwd: Resolvable::Future(Box::pin(fut_fwd)), + }), + out: VecDeque::new(), + } + } +} + +impl Stream for MspStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match &mut self.state { + State::BckAndFirstFwd(st) => { + let mut have_pending = false; + let rsv = &mut st.fut_bck; + match rsv { + Resolvable::Future(fut) => match fut.poll_unpin(cx) { + Ready(x) => { + *rsv = Resolvable::Output(x); + } + Pending => { + have_pending = true; + } + }, + Resolvable::Output(_) => {} + Resolvable::Taken => {} + } + let rsv = &mut st.fut_fwd; + match rsv { + Resolvable::Future(fut) => match fut.poll_unpin(cx) { + Ready(x) => { + *rsv = Resolvable::Output(x); + } + Pending => { + have_pending = true; + } + }, + Resolvable::Output(_) => {} + Resolvable::Taken => {} + } + if have_pending { + Pending + } else { + let taken_bck = st.fut_bck.take(); + let taken_fwd = st.fut_fwd.take(); + if let Some(x) = taken_bck { + match x { + Ok(v) => { + for e in v { + self.out.push_back(e) + } + + if let Some(x) = taken_fwd { + match x { + Ok(v) => { + for e in v { + self.out.push_back(e) + } + + self.state = State::InputDone; + continue; + } + Err(e) => Ready(Some(Err(e.into()))), + } + } else { + Ready(Some(Err(Error::Logic))) + } + } + Err(e) => Ready(Some(Err(e.into()))), + } + } else { + Ready(Some(Err(Error::Logic))) + } + } + } + State::InputDone => { + if let Some(x) = self.out.pop_front() { + Ready(Some(Ok(x))) + } else { + Ready(None) + } + } + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: MspStream = todoval(); + trait_assert(x); +} + +fn todoval() -> T { + todo!() +} diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index 50de65f..b50e9ec 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -9,3 +9,4 @@ pub mod status; pub mod worker; pub use scylla; +pub use series::SeriesId; diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 2047fb3..8c1f1bb 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -19,18 +19,13 @@ use std::sync::Arc; #[derive(Debug, ThisError)] pub enum Error { - Error(#[from] err::Error), + ScyllaConnection(err::Error), + EventsQuery(#[from] crate::events::Error), ChannelSend, ChannelRecv, Join, } -impl err::ToErr for Error { - fn to_err(self) -> err::Error { - err::Error::from_string(self) - } -} - #[derive(Debug)] enum Job { FindTsMsp( @@ -38,7 +33,8 @@ enum Job { // series-id u64, ScyllaSeriesRange, - Sender, VecDeque), Error>>, + bool, + Sender, Error>>, ), ReadNextValues(ReadNextValues), } @@ -48,7 +44,7 @@ struct ReadNextValues { dyn FnOnce( Arc, Arc, - ) -> Pin, err::Error>> + Send>> + ) -> Pin, Error>> + Send>> + Send, >, // fut: Pin, Error>> + Send>>, @@ -72,9 +68,10 @@ impl ScyllaQueue { rt: RetentionTime, series: u64, range: ScyllaSeriesRange, - ) -> Result<(VecDeque, VecDeque), Error> { + bck: bool, + ) -> Result, Error> { let (tx, rx) = async_channel::bounded(1); - let job = Job::FindTsMsp(rt, series, range, tx); + let job = Job::FindTsMsp(rt, series, range, bck, tx); self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; Ok(res) @@ -85,7 +82,7 @@ impl ScyllaQueue { F: FnOnce( Arc, Arc, - ) -> Pin, err::Error>> + Send>> + ) -> Pin, Error>> + Send>> + Send + 'static, { @@ -126,7 +123,9 @@ impl ScyllaWorker { } pub async fn work(self) -> Result<(), Error> { - let scy = create_scy_session_no_ks(&self.scyconf_st).await?; + let scy = create_scy_session_no_ks(&self.scyconf_st) + .await + .map_err(Error::ScyllaConnection)?; let scy = Arc::new(scy); let kss = [ self.scyconf_st.keyspace.as_str(), @@ -145,8 +144,8 @@ impl ScyllaWorker { } }; match job { - Job::FindTsMsp(rt, series, range, tx) => { - let res = crate::events::find_ts_msp_worker(&rt, series, range, &stmts, &scy).await; + Job::FindTsMsp(rt, series, range, bck, tx) => { + let res = crate::events::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 846b9f7..4a20afc 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -169,6 +169,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { + if true { + return true; + } if *meta.level() <= tracing::Level::TRACE { if ["httpret", "scyllaconn"].contains(&meta.target()) { let mut sr = ctx.lookup_current();