diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index 7911b6b..f1901ca 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -6,7 +6,6 @@ use err::thiserror; use err::PublicError; use err::ThisError; use err::ToPublicError; -use futures_util::Stream; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -100,7 +99,7 @@ impl EventDataHandler { let stream = nodenet::conn::create_response_bytes_stream(evsubq, shared_res.scyqueue.as_ref(), ncc) .instrument(logspan.clone()) .await - .map_err(|e| EventDataError::Error(Box::new(e)))?; + .map_err(|e| EventDataError::Error(Box::new(err::Error::from_string(e))))?; let stream = InstrumentStream::new(stream, logspan); let ret = response(StatusCode::OK) .body(body_stream(stream)) diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 7756b33..65ec83a 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -112,27 +112,7 @@ async fn plain_events_cbor_framed( debug!("plain_events_cbor_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; - use future::ready; - let stream = stream - .flat_map(|x| match x { - Ok(y) => { - use bytes::BufMut; - let buf = y.into_inner(); - let adv = (buf.len() + 7) / 8 * 8; - let pad = adv - buf.len(); - let mut b2 = BytesMut::with_capacity(16); - b2.put_u32_le(buf.len() as u32); - b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); - let mut b3 = BytesMut::with_capacity(16); - b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); - stream::iter([Ok::<_, Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())]) - } - Err(e) => { - let e = Error::with_msg_no_trace(e.to_string()); - stream::iter([Err(e), Ok(Bytes::new()), Ok(Bytes::new())]) - } - }) - .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }); + let stream = bytes_chunks_to_framed(stream); let logspan = if evq.log_level() == "trace" { trace!("enable trace for handler"); tracing::span!(tracing::Level::INFO, "log_span_trace") @@ -160,7 +140,7 @@ async fn plain_events_json_framed( debug!("plain_events_json_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; - let stream = bytes_chunks_to_framed(stream); + let stream = bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK).body(body_stream(stream))?; Ok(ret) } @@ -226,3 +206,26 @@ where }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) } + +fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> +where + S: Stream>, + T: Into, +{ + use future::ready; + stream + .flat_map(|x| match x { + Ok(y) => { + use std::fmt::Write; + let s = y.into(); + let mut b2 = String::with_capacity(16); + write!(b2, "\n{}\n", s.len()).unwrap(); + stream::iter([Ok::<_, Error>(b2), Ok(s)]) + } + Err(e) => { + let e = Error::with_msg_no_trace(e.to_string()); + stream::iter([Err(e), Ok(String::new())]) + } + }) + .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) +} diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index f870694..dcb1949 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -122,7 +122,15 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res } // let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone())); let (pgqueue, pgworker) = PgWorker::new(&ncc.node_config.cluster.database).await?; - let pgworker_jh = taskrun::spawn(pgworker.work()); + let pgworker_jh = taskrun::spawn(async move { + let x = pgworker.work().await; + match x { + Ok(()) => {} + Err(e) => { + error!("received error from PgWorker: {e}"); + } + } + }); let scyqueue = if let (Some(st), Some(mt), Some(lt)) = ( ncc.node_config.cluster.scylla_st(), ncc.node_config.cluster.scylla_mt(), @@ -134,7 +142,15 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res error!("{e}"); RetrievalError::TextError(e.to_string()) })?; - let scylla_worker_jh = taskrun::spawn(scylla_worker.work()); + let scylla_worker_jh = taskrun::spawn(async move { + let x = scylla_worker.work().await; + match x { + Ok(()) => {} + Err(e) => { + error!("received error from ScyllaWorker: {e}"); + } + } + }); Some(scyqueue) } else { None diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 297b700..cb64af0 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -79,7 +79,7 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) return Err(Error::with_msg_no_trace(msg)); } }; - info!("from {} len {}", tag, res.channels.len()); + info!("from {} len {} {:?}", tag, res.channels.len(), res.channels); let ret = SubRes { tag, status: StatusCode::OK, diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index c495a1d..20aff13 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -151,6 +151,7 @@ pub trait Events: fn pulses(&self) -> &VecDeque; fn frame_type_id(&self) -> u32; fn to_min_max_avg(&mut self) -> Box; + fn to_json_string(&self) -> String; fn to_json_vec_u8(&self) -> Vec; fn to_cbor_vec_u8(&self) -> Vec; fn clear(&mut self); @@ -273,6 +274,10 @@ impl Events for Box { Events::to_min_max_avg(self.as_mut()) } + fn to_json_string(&self) -> String { + Events::to_json_string(self.as_ref()) + } + fn to_json_vec_u8(&self) -> Vec { Events::to_json_vec_u8(self.as_ref()) } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index df8e74d..8ac5a03 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -967,6 +967,16 @@ impl Events for ChannelEvents { } } + fn to_json_string(&self) -> String { + match self { + ChannelEvents::Events(item) => item.to_json_string(), + ChannelEvents::Status(item) => { + error!("TODO convert status to json"); + String::new() + } + } + } + fn to_json_vec_u8(&self) -> Vec { match self { ChannelEvents::Events(item) => item.to_json_vec_u8(), diff --git a/crates/items_2/src/empty.rs b/crates/items_2/src/empty.rs index 26c6edf..a1dd2a9 100644 --- a/crates/items_2/src/empty.rs +++ b/crates/items_2/src/empty.rs @@ -28,6 +28,7 @@ pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result Box::new(K::::empty()), Enum => Box::new(K::::empty()), ChannelStatus => Box::new(K::::empty()), + CaStatus => Box::new(K::::empty()), } } Shape::Wave(..) => { @@ -48,6 +49,7 @@ pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result Box::new(K::::empty()), Enum => Box::new(K::::empty()), ChannelStatus => Box::new(K::::empty()), + CaStatus => Box::new(K::::empty()), } } Shape::Image(..) => { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 0bc7b5a..e9a1b6f 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -979,7 +979,7 @@ impl Events for EventsDim0 { Box::new(dst) } - fn to_json_vec_u8(&self) -> Vec { + fn to_json_string(&self) -> String { // TODO redesign with mut access, rename to `into_` and take the values out. let mut tss = self.tss.clone(); let mut pulses = self.pulses.clone(); @@ -1000,7 +1000,11 @@ impl Events for EventsDim0 { timed_out: false, continue_at: None, }; - serde_json::to_vec(&ret).unwrap() + serde_json::to_string(&ret).unwrap() + } + + fn to_json_vec_u8(&self) -> Vec { + self.to_json_string().into_bytes() } fn to_cbor_vec_u8(&self) -> Vec { diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index ea7b1e4..cfabe8b 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -949,7 +949,7 @@ impl Events for EventsDim1 { Box::new(item) } - fn to_json_vec_u8(&self) -> Vec { + fn to_json_string(&self) -> String { let ret = EventsDim1ChunkOutput { // TODO use &mut to swap the content tss: self.tss.clone(), @@ -957,7 +957,11 @@ impl Events for EventsDim1 { values: self.values.clone(), scalar_type: STY::scalar_type_name().into(), }; - serde_json::to_vec(&ret).unwrap() + serde_json::to_string(&ret).unwrap() + } + + fn to_json_vec_u8(&self) -> Vec { + self.to_json_string().into_bytes() } fn to_cbor_vec_u8(&self) -> Vec { diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index 12f4aae..3746ade 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -358,6 +358,10 @@ impl Events for EventsXbinDim0 { Box::new(dst) } + fn to_json_string(&self) -> String { + todo!() + } + fn to_json_vec_u8(&self) -> Vec { todo!() } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 4c05a8a..be5dd8c 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -173,6 +173,7 @@ pub struct BodyStream { pub enum SeriesKind { ChannelStatus, ChannelData, + CaStatus, } impl SeriesKind { @@ -181,6 +182,7 @@ impl SeriesKind { match self { ChannelStatus => 1, ChannelData => 2, + CaStatus => 3, } } @@ -188,6 +190,7 @@ impl SeriesKind { let ret = match x { 1 => Self::ChannelData, 2 => Self::ChannelStatus, + 3 => Self::CaStatus, _ => return Err(Error::with_msg_no_trace("bad SeriesKind value")), }; Ok(ret) @@ -259,22 +262,23 @@ impl<'de> serde::de::Visitor<'de> for ScalarTypeVis { } fn visit_str(self, value: &str) -> Result { + use ScalarType::*; let s = value.to_lowercase(); let ret = match s.as_str() { - "u8" => ScalarType::U8, - "u16" => ScalarType::U16, - "u32" => ScalarType::U32, - "u64" => ScalarType::U64, - "i8" => ScalarType::I8, - "i16" => ScalarType::I16, - "i32" => ScalarType::I32, - "i64" => ScalarType::I64, - "f32" => ScalarType::F32, - "f64" => ScalarType::F64, - "bool" => ScalarType::BOOL, - "string" => ScalarType::STRING, - "enum" => ScalarType::Enum, - "channelstatus" => ScalarType::ChannelStatus, + "u8" => U8, + "u16" => U16, + "u32" => U32, + "u64" => U64, + "i8" => I8, + "i16" => I16, + "i32" => I32, + "i64" => I64, + "f32" => F32, + "f64" => F64, + "bool" => BOOL, + "string" => STRING, + "enum" => Enum, + "channelstatus" => ChannelStatus, k => return Err(E::custom(format!("can not understand variant {k:?}"))), }; Ok(ret) @@ -534,16 +538,40 @@ impl ScalarType { #[derive(Debug, Clone, PartialOrd, PartialEq)] pub struct StringFix { data: [char; N], + len: u8, } impl StringFix { pub fn new() -> Self { Self { data: [char::REPLACEMENT_CHARACTER; N], + len: 0, } } } +impl From for StringFix +where + T: AsRef, +{ + fn from(x: T) -> Self { + let sl = x.as_ref(); + let sl = &sl[0..sl.len().min(N)]; + let mut ret = Self::new(); + for (i, ch) in sl.chars().enumerate() { + ret.data[i] = ch; + } + ret.len = sl.len() as u8; + ret + } +} + +impl From> for String { + fn from(x: StringFix) -> Self { + x.data[0..x.len as _].iter().collect() + } +} + mod string_fix_impl_serde { use crate::StringFix; use serde::de::Visitor; @@ -593,6 +621,12 @@ pub struct EnumVariant { name: StringFix<26>, } +impl EnumVariant { + pub fn new(ix: u16, name: StringFix<26>) -> Self { + Self { ix, name } + } +} + impl Default for EnumVariant { fn default() -> Self { Self { @@ -1556,7 +1590,7 @@ impl DtNano { self.0 } - pub const fn ms(&self) -> u64 { + pub const fn ms_u64(&self) -> u64 { self.0 / 1000000 } @@ -1713,6 +1747,10 @@ impl TsNano { self.0 / 1000000 } + pub const fn add_dt_nano(self, v: DtNano) -> Self { + Self(self.0 + v.0) + } + pub const fn sub(self, v: DtNano) -> Self { Self(self.0 - v.0) } @@ -1736,6 +1774,12 @@ impl TsNano { pub const fn to_ts_ms(self) -> TsMs { TsMs::from_ms_u64(self.ms()) } + + pub fn from_system_time(st: SystemTime) -> Self { + let tsunix = st.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); + let x = tsunix.as_secs() * 1000000000 + tsunix.subsec_nanos() as u64; + Self::from_ns(x) + } } impl fmt::Debug for TsNano { @@ -3189,7 +3233,7 @@ impl From for ChannelConfigResponse { backend: value.backend().into(), name: value.name().into(), keyspace: value.ks(), - timebinsize: value.bs().ms(), + timebinsize: value.bs().ms_u64(), scalar_type: value.scalar_type().clone(), shape: value.shape().clone(), byte_order: value.byte_order().clone(), diff --git a/crates/netpod/src/ttl.rs b/crates/netpod/src/ttl.rs index 1e961b1..552cdee 100644 --- a/crates/netpod/src/ttl.rs +++ b/crates/netpod/src/ttl.rs @@ -49,7 +49,7 @@ impl RetentionTime { pub fn ttl_ts_msp(&self) -> Duration { let day = 60 * 60 * 24; match self { - RetentionTime::Short => Duration::from_secs(day * 40), + RetentionTime::Short => Duration::from_secs(day * 7), RetentionTime::Medium => Duration::from_secs(day * 31 * 13), RetentionTime::Long => Duration::from_secs(day * 31 * 12 * 17), } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index dda1cc3..b43bdf0 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -1,7 +1,6 @@ use crate::scylla::scylla_channel_event_stream; use bytes::Bytes; use err::thiserror; -use err::Error; use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; @@ -43,7 +42,16 @@ use tracing::Instrument; mod test; #[derive(Debug, ThisError)] -pub enum NodeNetError {} +#[cstm(name = "NodenetConn")] +pub enum Error { + BadQuery, + Scylla(#[from] crate::scylla::Error), + Error(#[from] err::Error), + Io(#[from] std::io::Error), + Items(#[from] items_2::Error), + NotAvailable, + DebugTest, +} pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> { let scyqueue = err::todoval(); @@ -83,15 +91,17 @@ async fn make_channel_events_stream_data( if subq.backend() == TEST_BACKEND { let node_count = ncc.node_config.cluster.nodes.len() as u64; let node_ix = ncc.ix as u64; - streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix) + let ret = streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix)?; + Ok(ret) } else if let Some(scyqueue) = scyqueue { let cfg = subq.ch_conf().to_scylla()?; - scylla_channel_event_stream(subq, cfg, scyqueue).await + let ret = scylla_channel_event_stream(subq, cfg, scyqueue).await?; + Ok(ret) } else if let Some(_) = &ncc.node.channel_archiver { - let e = Error::with_msg_no_trace("archapp not built"); + let e = Error::NotAvailable; Err(e) } else if let Some(_) = &ncc.node.archiver_appliance { - let e = Error::with_msg_no_trace("archapp not built"); + let e = Error::NotAvailable; Err(e) } else { let cfg = subq.ch_conf().to_sf_databuffer()?; @@ -126,7 +136,7 @@ pub async fn create_response_bytes_stream( debug!("wasm1 {:?}", evq.wasm1()); let reqctx = netpod::ReqCtx::new_from_single_reqid(evq.reqid().into()).into(); if evq.create_errors_contains("nodenet_parse_query") { - let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); + let e = Error::DebugTest; return Err(e); } if evq.is_event_blobs() { @@ -226,7 +236,7 @@ async fn events_conn_handler_with_reqid( pub async fn events_get_input_frames(netin: INP) -> Result, Error> where - INP: Stream> + Unpin, + INP: Stream> + Unpin, { let mut h = InMemoryFrameStream::new(netin, netpod::ByteSize::from_kb(8)); let mut frames = Vec::new(); @@ -243,7 +253,7 @@ where debug!("ignored incoming frame {:?}", item); } Err(e) => { - return Err(e); + return Err(e.into()); } } } @@ -254,12 +264,12 @@ pub fn events_parse_input_query(frames: Vec) -> Result<(EventsSub if frames.len() != 1 { error!("{:?}", frames); error!("missing command frame len {}", frames.len()); - let e = Error::with_msg("missing command frame"); + let e = Error::BadQuery; return Err(e); } let query_frame = &frames[0]; if query_frame.tyid() != EVENT_QUERY_JSON_STRING_FRAME { - return Err(Error::with_msg("query frame wrong type")); + return Err(Error::BadQuery); } // TODO this does not need all variants of Sitemty. let qitem = match decode_frame::>(query_frame) { @@ -267,17 +277,17 @@ pub fn events_parse_input_query(frames: Vec) -> Result<(EventsSub Ok(k) => match k { StreamItem::DataItem(k) => match k { RangeCompletableItem::Data(k) => k, - RangeCompletableItem::RangeComplete => return Err(Error::with_msg("bad query item")), + RangeCompletableItem::RangeComplete => return Err(Error::BadQuery), }, - _ => return Err(Error::with_msg("bad query item")), + _ => return Err(Error::BadQuery), }, - Err(e) => return Err(e), + Err(e) => return Err(e.into()), }, - Err(e) => return Err(e), + Err(e) => return Err(e.into()), }; info!("parsing json {:?}", qitem.str()); - let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| { - let e = Error::with_msg_no_trace(format!("json parse error: {} inp {}", e, qitem.str())); + let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|_e| { + let e = Error::BadQuery; error!("{e}"); error!("input was {}", qitem.str()); e @@ -293,7 +303,7 @@ async fn events_conn_handler_inner_try( ncc: &NodeConfigCached, ) -> Result<(), ConnErr> where - INP: Stream> + Unpin, + INP: Stream> + Unpin, { let _ = addr; let frames = match events_get_input_frames(netin).await { @@ -320,13 +330,13 @@ async fn events_conn_handler_inner( ncc: &NodeConfigCached, ) -> Result<(), Error> where - INP: Stream> + Unpin, + INP: Stream> + Unpin, { match events_conn_handler_inner_try(netin, netout, addr, scyqueue, ncc).await { Ok(_) => (), Err(ce) => { let mut out = ce.netout; - let item: Sitemty = Err(ce.err); + let item: Sitemty = Err(err::Error::from_string(ce.err)); let buf = Framable::make_frame(&item)?; out.write_all(&buf).await?; } @@ -350,7 +360,7 @@ async fn events_conn_handler( Ok(k) => Ok(k), Err(e) => { error!("events_conn_handler sees error: {:?}", e); - Err(e) + Err(e.into()) } } } diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 5e0c14a..cafd95a 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -1,4 +1,5 @@ -use err::Error; +use err::thiserror; +use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use futures_util::TryStreamExt; @@ -9,16 +10,25 @@ use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ChConf; use query::api4::events::EventsSubQuery; +use scyllaconn::events2::events::EventReadOpts; +use scyllaconn::events2::mergert; use scyllaconn::worker::ScyllaQueue; use scyllaconn::SeriesId; use std::pin::Pin; use taskrun::tokio; +#[derive(Debug, ThisError)] +#[cstm(name = "ScyllaChannelEventStream")] +pub enum Error { + MergeRt(#[from] mergert::Error), +} + pub async fn scylla_channel_event_stream( evq: EventsSubQuery, chconf: ChConf, scyqueue: &ScyllaQueue, ) -> Result> + Send>>, Error> { + debug!("scylla_channel_event_stream {evq:?}"); // TODO depends in general on the query // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. // let do_one_before_range = evq.need_one_before_range(); @@ -27,7 +37,7 @@ pub async fn scylla_channel_event_stream( let scalar_type = chconf.scalar_type(); let shape = chconf.shape(); let do_test_stream_error = false; - let with_values = evq.need_value_data(); + let readopts = EventReadOpts::new(evq.need_value_data(), evq.transform().enum_as_string().unwrap_or(false)); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { let x = scyllaconn::events2::events::EventsStreamRt::new( rt, @@ -35,7 +45,7 @@ pub async fn scylla_channel_event_stream( scalar_type.clone(), shape.clone(), evq.range().into(), - with_values, + readopts, scyqueue.clone(), ) .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); @@ -46,7 +56,7 @@ pub async fn scylla_channel_event_stream( scalar_type.clone(), shape.clone(), evq.range().into(), - with_values, + readopts, scyqueue.clone(), ); Box::pin(x) @@ -84,7 +94,10 @@ pub async fn scylla_channel_event_stream( item } }, - Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn events error {e}"))), + Err(e) => Err(err::Error::with_msg_no_trace(format!( + "{}::scylla_channel_event_stream {e}", + module_path!() + ))), }; item }); diff --git a/crates/query/src/transform.rs b/crates/query/src/transform.rs index c76a4fa..2b71c6e 100644 --- a/crates/query/src/transform.rs +++ b/crates/query/src/transform.rs @@ -53,6 +53,8 @@ impl TimeBinningTransformQuery { pub struct TransformQuery { event: EventTransformQuery, time_binning: TimeBinningTransformQuery, + // #[serde(default, skip_serializing_if = "Option::is_none")] + enum_as_string: Option, } impl TransformQuery { @@ -64,6 +66,7 @@ impl TransformQuery { Self { event: EventTransformQuery::ValueFull, time_binning: TimeBinningTransformQuery::None, + enum_as_string: None, } } @@ -71,6 +74,7 @@ impl TransformQuery { Self { event: EventTransformQuery::MinMaxAvgDev, time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string: None, } } @@ -86,6 +90,7 @@ impl TransformQuery { Self { event: EventTransformQuery::EventBlobsVerbatim, time_binning: TimeBinningTransformQuery::None, + enum_as_string: None, } } @@ -93,6 +98,7 @@ impl TransformQuery { Self { event: EventTransformQuery::MinMaxAvgDev, time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string: None, } } @@ -101,6 +107,7 @@ impl TransformQuery { event: EventTransformQuery::PulseIdDiff, // TODO probably we want unweighted binning here. time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string: None, } } @@ -137,6 +144,10 @@ impl TransformQuery { pub fn get_tr_time_binning(&self) -> &TimeBinningTransformQuery { &self.time_binning } + + pub fn enum_as_string(&self) -> Option { + self.enum_as_string.clone() + } } impl FromUrl for TransformQuery { @@ -146,6 +157,20 @@ impl FromUrl for TransformQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { + let enum_as_string = if let Some(k) = pairs.get("enumAsString") { + Some( + k.parse() + .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse enumAsString: {}", k)))?, + ) + } else { + None + }; + // enum_string: Ok(pairs.get("enumString")).and_then(|x| { + // x.map_or(Ok(None), |k| { + // k.parse() + // .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse enumString: {}", k)))? + // }) + // })?, let upre = Self::url_prefix(); let key = "binningScheme"; if let Some(s) = pairs.get(key) { @@ -153,21 +178,25 @@ impl FromUrl for TransformQuery { TransformQuery { event: EventTransformQuery::EventBlobsVerbatim, time_binning: TimeBinningTransformQuery::None, + enum_as_string, } } else if s == "fullValue" { TransformQuery { event: EventTransformQuery::ValueFull, time_binning: TimeBinningTransformQuery::None, + enum_as_string, } } else if s == "timeWeightedScalar" { TransformQuery { event: EventTransformQuery::MinMaxAvgDev, time_binning: TimeBinningTransformQuery::TimeWeighted, + enum_as_string, } } else if s == "unweightedScalar" { TransformQuery { event: EventTransformQuery::ValueFull, time_binning: TimeBinningTransformQuery::None, + enum_as_string, } } else if s == "binnedX" { let _u: usize = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; @@ -175,11 +204,13 @@ impl FromUrl for TransformQuery { TransformQuery { event: EventTransformQuery::MinMaxAvgDev, time_binning: TimeBinningTransformQuery::None, + enum_as_string, } } else if s == "pulseIdDiff" { TransformQuery { event: EventTransformQuery::PulseIdDiff, time_binning: TimeBinningTransformQuery::None, + enum_as_string, } } else { return Err(Error::with_msg("can not extract binningScheme")); @@ -197,6 +228,7 @@ impl FromUrl for TransformQuery { let ret = TransformQuery { event: EventTransformQuery::ValueFull, time_binning: TimeBinningTransformQuery::None, + enum_as_string, }; Ok(ret) } @@ -235,5 +267,10 @@ impl AppendToUrl for TransformQuery { g.append_pair(key, &format!("{}", "pulseIdDiff")); } } + if let Some(x) = self.enum_as_string { + if x { + g.append_pair("enumAsString", "true"); + } + } } } diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 265ee55..41b6194 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -18,3 +18,4 @@ query = { path = "../query" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } series = { path = "../../../daqingest/series" } +taskrun = { path = "../taskrun" } diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 06e41ae..23fe67c 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -1,3 +1,5 @@ +#![allow(unused)] + use crate::errconv::ErrConv; use err::Error; use futures_util::Future; diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index d7e7968..f2e42f3 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,3 +1,4 @@ +use crate::events2::events::EventReadOpts; use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; @@ -32,6 +33,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use tracing::Instrument; #[derive(Debug, ThisError)] #[cstm(name = "ScyllaReadEvents")] @@ -45,6 +47,7 @@ pub enum Error { RangeEndOverflow, InvalidFuture, TestError(String), + Logic, } impl From for Error { @@ -63,6 +66,19 @@ pub(super) trait ValTy: Sized + 'static { fn default() -> Self; fn is_valueblob() -> bool; fn st_name() -> &'static str; + fn read_next_values( + opts: ReadNextValuesOpts, + scy: Arc, + stmts: Arc, + ) -> Pin, Error>> + Send>>; + fn convert_rows( + rows: Vec, + range: ScyllaSeriesRange, + ts_msp: TsMs, + with_values: bool, + bck: bool, + last_before: &mut Option<(TsNano, Self)>, + ) -> Result; } macro_rules! impl_scaty_scalar { @@ -71,24 +87,49 @@ macro_rules! impl_scaty_scalar { type ScaTy = $st; type ScyTy = $st_scy; type Container = EventsDim0; + fn from_scyty(inp: Self::ScyTy) -> Self { inp as Self } + fn from_valueblob(_inp: Vec) -> Self { ::default() } + fn table_name() -> &'static str { concat!("scalar_", $table_name) } + fn default() -> Self { ::default() } + fn is_valueblob() -> bool { false } + fn st_name() -> &'static str { $st_name } + + fn read_next_values( + opts: ReadNextValuesOpts, + scy: Arc, + stmts: Arc, + ) -> Pin, Error>> + Send>> { + Box::pin(read_next_values_2::(opts, scy, stmts)) + } + + fn convert_rows( + rows: Vec, + range: ScyllaSeriesRange, + ts_msp: TsMs, + with_values: bool, + bck: bool, + last_before: &mut Option<(TsNano, Self)>, + ) -> Result { + convert_rows_0::(rows, range, ts_msp, with_values, bck, last_before) + } } }; } @@ -99,9 +140,11 @@ macro_rules! impl_scaty_array { type ScaTy = $st; type ScyTy = $st_scy; type Container = EventsDim1; + fn from_scyty(inp: Self::ScyTy) -> Self { inp.into_iter().map(|x| x as Self::ScaTy).collect() } + fn from_valueblob(inp: Vec) -> Self { if inp.len() < 32 { ::default() @@ -118,18 +161,41 @@ macro_rules! impl_scaty_array { c } } + fn table_name() -> &'static str { concat!("array_", $table_name) } + fn default() -> Self { Vec::new() } + fn is_valueblob() -> bool { true } + fn st_name() -> &'static str { $st_name } + + fn read_next_values( + opts: ReadNextValuesOpts, + scy: Arc, + stmts: Arc, + ) -> Pin, Error>> + Send>> { + Box::pin(read_next_values_2::(opts, scy, stmts)) + } + + fn convert_rows( + rows: Vec, + range: ScyllaSeriesRange, + ts_msp: TsMs, + with_values: bool, + bck: bool, + last_before: &mut Option<(TsNano, Self)>, + ) -> Result { + convert_rows_0::(rows, range, ts_msp, with_values, bck, last_before) + } } }; } @@ -164,6 +230,26 @@ impl ValTy for EnumVariant { fn st_name() -> &'static str { "enum" } + + fn read_next_values( + opts: ReadNextValuesOpts, + scy: Arc, + stmts: Arc, + ) -> Pin, Error>> + Send>> { + let fut = read_next_values_2::(opts, scy, stmts); + Box::pin(fut) + } + + fn convert_rows( + rows: Vec, + range: ScyllaSeriesRange, + ts_msp: TsMs, + with_values: bool, + bck: bool, + last_before: &mut Option<(TsNano, Self)>, + ) -> Result { + convert_rows_enum(rows, range, ts_msp, with_values, bck, last_before) + } } impl ValTy for Vec { @@ -196,6 +282,26 @@ impl ValTy for Vec { fn st_name() -> &'static str { "string" } + + fn read_next_values( + opts: ReadNextValuesOpts, + scy: Arc, + stmts: Arc, + ) -> Pin, Error>> + Send>> { + let fut = read_next_values_2::(opts, scy, stmts); + Box::pin(fut) + } + + fn convert_rows( + rows: Vec, + range: ScyllaSeriesRange, + ts_msp: TsMs, + with_values: bool, + bck: bool, + last_before: &mut Option<(TsNano, Self)>, + ) -> Result { + convert_rows_0::(rows, range, ts_msp, with_values, bck, last_before) + } } impl_scaty_scalar!(u8, i8, "u8", "u8"); @@ -230,7 +336,7 @@ pub(super) struct ReadNextValuesOpts { ts_msp: TsMs, range: ScyllaSeriesRange, fwd: bool, - with_values: bool, + readopts: EventReadOpts, scyqueue: ScyllaQueue, } @@ -241,7 +347,7 @@ impl ReadNextValuesOpts { ts_msp: TsMs, range: ScyllaSeriesRange, fwd: bool, - with_values: bool, + readopts: EventReadOpts, scyqueue: ScyllaQueue, ) -> Self { Self { @@ -250,7 +356,7 @@ impl ReadNextValuesOpts { ts_msp, range, fwd, - with_values, + readopts, scyqueue, } } @@ -262,10 +368,18 @@ where { // TODO could take scyqeue out of opts struct. let scyqueue = opts.scyqueue.clone(); - debug!("bbbbbbbbbbbbbbbbbbbbbbbbbbbb"); - let futgen = Box::new(|scy: Arc, stmts: Arc| { - let fut = async { - read_next_values_2::(opts, scy, stmts) + let level = taskrun::query_log_level(); + let futgen = Box::new(move |scy: Arc, stmts: Arc| { + let fut = async move { + let logspan = if level == Level::DEBUG { + tracing::span!(Level::INFO, "log_span_debug") + } else if level == Level::TRACE { + tracing::span!(Level::INFO, "log_span_trace") + } else { + tracing::Span::none() + }; + ST::read_next_values(opts, scy, stmts) + .instrument(logspan) .await .map_err(crate::worker::Error::from) }; @@ -283,17 +397,12 @@ async fn read_next_values_2( where ST: ValTy, { - debug!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa {opts:?}"); - trace!( - "read_next_values_2 {} {} st_name {}", - opts.series, - opts.ts_msp, - ST::st_name() - ); + trace!("read_next_values_2 {:?} st_name {}", opts, ST::st_name()); let series = opts.series; let ts_msp = opts.ts_msp; let range = opts.range; let table_name = ST::table_name(); + let with_values = opts.readopts.with_values; if range.end() > TsNano::from_ns(i64::MAX as u64) { return Err(Error::RangeEndOverflow); } @@ -317,7 +426,7 @@ where ); let qu = stmts .rt(&opts.rt) - .lsp(!opts.fwd, opts.with_values) + .lsp(!opts.fwd, with_values) .shape(ST::is_valueblob()) .st(ST::st_name())?; let params = ( @@ -333,7 +442,7 @@ where rows.push(x?); } let mut last_before = None; - let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut last_before)?; + let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?; ret } else { let ts_lsp_max = if ts_msp.ns() < range.beg() { @@ -344,7 +453,7 @@ where trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,); let qu = stmts .rt(&opts.rt) - .lsp(!opts.fwd, opts.with_values) + .lsp(!opts.fwd, with_values) .shape(ST::is_valueblob()) .st(ST::st_name())?; let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); @@ -355,7 +464,7 @@ where rows.push(x?); } let mut _last_before = None; - let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut _last_before)?; + let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut _last_before)?; if ret.len() > 1 { error!("multiple events in backwards search {}", ret.len()); } @@ -366,478 +475,116 @@ where Ok(ret) } -fn convert_rows( +fn convert_rows_0( rows: Vec, range: ScyllaSeriesRange, ts_msp: TsMs, with_values: bool, bck: bool, - last_before: &mut Option<(TsNano, u64, ST)>, + last_before: &mut Option<(TsNano, ST)>, ) -> Result<::Container, Error> { let mut ret = ::Container::empty(); for row in rows { - let (ts, pulse, value) = if with_values { + let (ts, value) = if with_values { if ST::is_valueblob() { - let row: (i64, i64, Vec) = row.into_typed()?; - trace!("read a value blob len {}", row.2.len()); + let row: (i64, Vec) = row.into_typed()?; + // trace!("read a value blob len {}", row.1.len()); let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let pulse = row.1 as u64; - let value = ValTy::from_valueblob(row.2); - (ts, pulse, value) + let value = ValTy::from_valueblob(row.1); + (ts, value) } else { - let row: (i64, i64, ST::ScyTy) = row.into_typed()?; + let row: (i64, ST::ScyTy) = row.into_typed()?; let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let pulse = row.1 as u64; - let value = ValTy::from_scyty(row.2); - (ts, pulse, value) + let value = ValTy::from_scyty(row.1); + (ts, value) } } else { - let row: (i64, i64) = row.into_typed()?; + let row: (i64,) = row.into_typed()?; let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let pulse = row.1 as u64; let value = ValTy::default(); - (ts, pulse, value) + (ts, value) }; if bck { if ts >= range.beg() { // TODO count as logic error error!("ts >= range.beg"); } else if ts < range.beg() { - ret.push(ts.ns(), pulse, value); + ret.push(ts.ns(), 0, value); } else { - *last_before = Some((ts, pulse, value)); + *last_before = Some((ts, value)); } } else { if ts >= range.end() { // TODO count as logic error error!("ts >= range.end"); } else if ts >= range.beg() { - ret.push(ts.ns(), pulse, value); + ret.push(ts.ns(), 0, value); } else { if last_before.is_none() { warn!("encounter event before range in forward read {ts}"); } - *last_before = Some((ts, pulse, value)); + *last_before = Some((ts, value)); } } } Ok(ret) } -pub(super) struct ReadValues { - rt: RetentionTime, - series: u64, - scalar_type: ScalarType, - shape: Shape, +fn convert_rows_enum( + rows: Vec, range: ScyllaSeriesRange, - ts_msps: VecDeque, - fwd: bool, + ts_msp: TsMs, with_values: bool, - fut: Pin, Error>> + Send>>, - fut_done: bool, - scyqueue: ScyllaQueue, -} - -impl ReadValues { - pub(super) fn new( - rt: RetentionTime, - series: u64, - scalar_type: ScalarType, - shape: Shape, - range: ScyllaSeriesRange, - ts_msps: VecDeque, - fwd: bool, - with_values: bool, - scyqueue: ScyllaQueue, - ) -> Self { - let mut ret = Self { - rt, - series, - scalar_type, - shape, - range, - ts_msps, - fwd, - with_values, - fut: Box::pin(futures_util::future::ready(Err(Error::InvalidFuture))), - fut_done: false, - scyqueue, - }; - ret.next(); - ret - } - - fn next(&mut self) -> bool { - if let Some(ts_msp) = self.ts_msps.pop_front() { - self.fut = self.make_fut(ts_msp); - self.fut_done = false; - true - } else { - false - } - } - - fn make_fut(&mut self, ts_msp: TsMs) -> Pin, Error>> + Send>> { - let opts = ReadNextValuesOpts { - rt: self.rt.clone(), - series: self.series.clone(), - ts_msp, - range: self.range.clone(), - fwd: self.fwd, - with_values: self.with_values, - scyqueue: self.scyqueue.clone(), - }; - let scalar_type = self.scalar_type.clone(); - let shape = self.shape.clone(); - let fut = async move { - match &shape { - Shape::Scalar => match &scalar_type { - ScalarType::U8 => read_next_values::(opts).await, - ScalarType::U16 => read_next_values::(opts).await, - ScalarType::U32 => read_next_values::(opts).await, - ScalarType::U64 => read_next_values::(opts).await, - ScalarType::I8 => read_next_values::(opts).await, - ScalarType::I16 => read_next_values::(opts).await, - ScalarType::I32 => read_next_values::(opts).await, - ScalarType::I64 => read_next_values::(opts).await, - ScalarType::F32 => read_next_values::(opts).await, - ScalarType::F64 => read_next_values::(opts).await, - ScalarType::BOOL => read_next_values::(opts).await, - ScalarType::STRING => read_next_values::(opts).await, - ScalarType::Enum => read_next_values::(opts).await, - ScalarType::ChannelStatus => { - warn!("read scalar channel status not yet supported"); - err::todoval() - } - }, - Shape::Wave(_) => match &scalar_type { - ScalarType::U8 => read_next_values::>(opts).await, - ScalarType::U16 => read_next_values::>(opts).await, - ScalarType::U32 => read_next_values::>(opts).await, - ScalarType::U64 => read_next_values::>(opts).await, - ScalarType::I8 => read_next_values::>(opts).await, - ScalarType::I16 => read_next_values::>(opts).await, - ScalarType::I32 => read_next_values::>(opts).await, - ScalarType::I64 => read_next_values::>(opts).await, - ScalarType::F32 => read_next_values::>(opts).await, - ScalarType::F64 => read_next_values::>(opts).await, - ScalarType::BOOL => read_next_values::>(opts).await, - ScalarType::STRING => { - warn!("read array string not yet supported"); - err::todoval() - } - ScalarType::Enum => read_next_values::>(opts).await, - ScalarType::ChannelStatus => { - warn!("read array channel status not yet supported"); - err::todoval() - } - }, - _ => { - error!("TODO ReadValues add more types"); - err::todoval() + bck: bool, + last_before: &mut Option<(TsNano, EnumVariant)>, +) -> Result<::Container, Error> { + let mut ret = ::Container::empty(); + for row in rows { + let (ts, value) = if with_values { + if EnumVariant::is_valueblob() { + if true { + return Err(Error::Logic); } - } - }; - Box::pin(fut) - } -} - -enum FrState { - New, - FindMsp(Pin, crate::worker::Error>> + Send>>), - ReadBack1(ReadValues), - ReadBack2(ReadValues), - ReadValues(ReadValues), - DataDone, - Done, -} - -pub struct EventsStreamScylla { - rt: RetentionTime, - state: FrState, - series: u64, - scalar_type: ScalarType, - shape: Shape, - range: ScyllaSeriesRange, - do_one_before_range: bool, - ts_msp_bck: VecDeque, - ts_msp_fwd: VecDeque, - scyqueue: ScyllaQueue, - do_test_stream_error: bool, - found_one_after: bool, - with_values: bool, - outqueue: VecDeque>, - ts_seen_max: u64, -} - -impl EventsStreamScylla { - pub fn _new( - rt: RetentionTime, - series: u64, - range: ScyllaSeriesRange, - do_one_before_range: bool, - scalar_type: ScalarType, - shape: Shape, - with_values: bool, - scyqueue: ScyllaQueue, - do_test_stream_error: bool, - ) -> Self { - debug!("EventsStreamScylla::new"); - Self { - rt, - state: FrState::New, - series, - scalar_type, - shape, - range, - do_one_before_range, - ts_msp_bck: VecDeque::new(), - ts_msp_fwd: VecDeque::new(), - scyqueue, - do_test_stream_error, - found_one_after: false, - with_values, - outqueue: VecDeque::new(), - ts_seen_max: 0, - } - } - - fn ts_msps_found(&mut self, msps1: VecDeque, msps2: VecDeque) { - trace!("ts_msps_found msps1 {msps1:?} msps2 {msps2:?}"); - self.ts_msp_bck = msps1; - self.ts_msp_fwd = msps2; - for x in self.ts_msp_bck.iter().rev() { - let x = x.clone(); - if x.ns() >= self.range.end() { - info!("FOUND one-after because of MSP"); - self.found_one_after = true; - } - self.ts_msp_fwd.push_front(x); - } - trace!("ts_msp_bck {:?}", self.ts_msp_bck); - trace!("ts_msp_fwd {:?}", self.ts_msp_fwd); - if let Some(msp) = self.ts_msp_bck.pop_back() { - trace!("start ReadBack1 msp {}", msp); - let st = ReadValues::new( - self.rt.clone(), - self.series, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - [msp].into(), - false, - self.with_values, - self.scyqueue.clone(), - ); - self.state = FrState::ReadBack1(st); - } else if self.ts_msp_fwd.len() > 0 { - trace!("begin immediately with forward read"); - let st = ReadValues::new( - self.rt.clone(), - self.series, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), - true, - self.with_values, - self.scyqueue.clone(), - ); - self.state = FrState::ReadValues(st); - } else { - self.state = FrState::DataDone; - } - } - - fn back_1_done(&mut self, item: Box) { - trace!("back_1_done item len {}", item.len()); - if item.len() > 0 { - self.outqueue.push_back(item); - if self.ts_msp_fwd.len() > 0 { - trace!("start forward read after back1"); - let st = ReadValues::new( - self.rt.clone(), - self.series, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), - true, - self.with_values, - self.scyqueue.clone(), - ); - self.state = FrState::ReadValues(st); + let row: (i64, Vec) = row.into_typed()?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ValTy::from_valueblob(row.1); + (ts, value) } else { - self.state = FrState::DataDone; + let row: (i64, i16, String) = row.into_typed()?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let val = row.1 as u16; + let valstr = row.2; + let value = EnumVariant::new(val, valstr.into()); + (ts, value) } } else { - if let Some(msp) = self.ts_msp_bck.pop_back() { - trace!("start ReadBack2 msp {}", msp); - let st = ReadValues::new( - self.rt.clone(), - self.series, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - [msp].into(), - false, - self.with_values, - self.scyqueue.clone(), - ); - self.state = FrState::ReadBack2(st); - } else if self.ts_msp_fwd.len() > 0 { - trace!("no 2nd back MSP, go for forward read"); - let st = ReadValues::new( - self.rt.clone(), - self.series, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), - true, - self.with_values, - self.scyqueue.clone(), - ); - self.state = FrState::ReadValues(st); + let row: (i64,) = row.into_typed()?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ValTy::default(); + (ts, value) + }; + if bck { + if ts >= range.beg() { + // TODO count as logic error + error!("ts >= range.beg"); + } else if ts < range.beg() { + ret.push(ts.ns(), 0, value); } else { - trace!("no 2nd back msp, but also nothing to go forward"); - self.state = FrState::DataDone; + *last_before = Some((ts, value)); } - } - } - - fn back_2_done(&mut self, item: Box) { - trace!("back_2_done item len {}", item.len()); - if item.len() > 0 { - self.outqueue.push_back(item); - } - if self.ts_msp_fwd.len() > 0 { - trace!("start forward read after back2"); - let st = ReadValues::new( - self.rt.clone(), - self.series, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), - true, - self.with_values, - self.scyqueue.clone(), - ); - self.state = FrState::ReadValues(st); } else { - trace!("nothing to forward read after back 2"); - self.state = FrState::DataDone; - } - } -} - -impl Stream for EventsStreamScylla { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if self.do_test_stream_error { - let e = Error::TestError("test-message".into()); - return Ready(Some(Err(e))); - } - loop { - if let Some(item) = self.outqueue.pop_front() { - item.verify(); - if let Some(item_min) = item.ts_min() { - if item_min < self.ts_seen_max { - debug!("ordering error A {} {}", item_min, self.ts_seen_max); - } + if ts >= range.end() { + // TODO count as logic error + error!("ts >= range.end"); + } else if ts >= range.beg() { + ret.push(ts.ns(), 0, value); + } else { + if last_before.is_none() { + warn!("encounter event before range in forward read {ts}"); } - if let Some(item_max) = item.ts_max() { - if item_max < self.ts_seen_max { - debug!("ordering error B {} {}", item_max, self.ts_seen_max); - } else { - self.ts_seen_max = item_max; - } - } - debug!("deliver item {}", item.output_info()); - break Ready(Some(Ok(ChannelEvents::Events(item)))); + *last_before = Some((ts, value)); } - break match self.state { - FrState::New => { - let series = self.series.clone(); - let range = self.range.clone(); - // TODO this no longer works, we miss the backwards part here - // let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone()); - // let fut = Box::pin(fut); - let fut = todo!(); - self.state = FrState::FindMsp(fut); - continue; - } - FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok(msps)) => { - self.ts_msps_found(VecDeque::new(), msps); - continue; - } - Ready(Err(e)) => { - error!("EventsStreamScylla FindMsp {e}"); - self.state = FrState::DataDone; - Ready(Some(Err(e.into()))) - } - Pending => Pending, - }, - FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(Ok(item)) => { - st.fut_done = true; - self.back_1_done(item); - continue; - } - Ready(Err(e)) => { - error!("EventsStreamScylla ReadBack1 {e}"); - st.fut_done = true; - self.state = FrState::DataDone; - Ready(Some(Err(e))) - } - Pending => Pending, - }, - FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(Ok(item)) => { - st.fut_done = true; - self.back_2_done(item); - continue; - } - Ready(Err(e)) => { - error!("EventsStreamScylla ReadBack2 {e}"); - st.fut_done = true; - self.state = FrState::DataDone; - Ready(Some(Err(e))) - } - Pending => Pending, - }, - FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(Ok(item)) => { - st.fut_done = true; - if !st.next() { - trace!("ReadValues exhausted"); - self.state = FrState::DataDone; - } - if item.len() > 0 { - self.outqueue.push_back(item); - } - continue; - } - Ready(Err(e)) => { - error!("EventsStreamScylla ReadValues {e}"); - st.fut_done = true; - Ready(Some(Err(e))) - } - Pending => Pending, - }, - FrState::DataDone => { - if self.found_one_after { - // TODO emit RangeComplete - } - self.state = FrState::Done; - continue; - } - FrState::Done => Ready(None), - }; } } + Ok(ret) } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index d976077..da347a6 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -41,6 +41,21 @@ macro_rules! warn_item { }; } +#[derive(Debug, Clone)] +pub struct EventReadOpts { + pub with_values: bool, + pub enum_as_strings: bool, +} + +impl EventReadOpts { + pub fn new(with_values: bool, enum_as_strings: bool) -> Self { + Self { + with_values, + enum_as_strings, + } + } +} + #[derive(Debug, ThisError)] #[cstm(name = "ScyllaEvents")] pub enum Error { @@ -86,7 +101,7 @@ pub struct EventsStreamRt { scalar_type: ScalarType, shape: Shape, range: ScyllaSeriesRange, - with_values: bool, + readopts: EventReadOpts, state: State, scyqueue: ScyllaQueue, msp_inp: MspStreamRt, @@ -101,10 +116,10 @@ impl EventsStreamRt { scalar_type: ScalarType, shape: Shape, range: ScyllaSeriesRange, - with_values: bool, + readopts: EventReadOpts, scyqueue: ScyllaQueue, ) -> Self { - debug!("EventsStreamRt::new {series:?} {range:?} {rt:?}"); + debug!("EventsStreamRt::new {series:?} {range:?} {rt:?} {readopts:?}"); let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone()); Self { @@ -113,7 +128,7 @@ impl EventsStreamRt { scalar_type, shape, range, - with_values, + readopts, state: State::Begin, scyqueue, msp_inp, @@ -140,7 +155,7 @@ impl EventsStreamRt { ts_msp, self.range.clone(), fwd, - self.with_values, + self.readopts.clone(), scyqueue, ); let scalar_type = self.scalar_type.clone(); diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs index a54465c..6eff0d2 100644 --- a/crates/scyllaconn/src/events2/mergert.rs +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -1,3 +1,4 @@ +use super::events::EventReadOpts; use super::events::EventsStreamRt; use super::firstbefore::FirstBeforeAndInside; use crate::events2::firstbefore; @@ -30,6 +31,8 @@ pub enum Error { Input(#[from] crate::events2::firstbefore::Error), Events(#[from] crate::events2::events::Error), Logic, + OrderMin, + OrderMax, } enum Resolvable @@ -103,7 +106,7 @@ pub struct MergeRts { range: ScyllaSeriesRange, range_mt: ScyllaSeriesRange, range_lt: ScyllaSeriesRange, - with_values: bool, + readopts: EventReadOpts, scyqueue: ScyllaQueue, inp_st: Option>, inp_mt: Option>, @@ -123,7 +126,7 @@ impl MergeRts { scalar_type: ScalarType, shape: Shape, range: ScyllaSeriesRange, - with_values: bool, + readopts: EventReadOpts, scyqueue: ScyllaQueue, ) -> Self { Self { @@ -133,7 +136,7 @@ impl MergeRts { range_mt: range.clone(), range_lt: range.clone(), range, - with_values, + readopts, scyqueue, inp_st: None, inp_mt: None, @@ -161,7 +164,7 @@ impl MergeRts { self.scalar_type.clone(), self.shape.clone(), range, - self.with_values, + self.readopts.clone(), self.scyqueue.clone(), ); let inp = TI::new(inp, tsbeg); @@ -182,7 +185,7 @@ impl MergeRts { self.scalar_type.clone(), self.shape.clone(), range, - self.with_values, + self.readopts.clone(), self.scyqueue.clone(), ); let inp = TI::new(inp, tsbeg); @@ -202,7 +205,7 @@ impl MergeRts { self.scalar_type.clone(), self.shape.clone(), range, - self.with_values, + self.readopts.clone(), self.scyqueue.clone(), ); let inp = TI::new(inp, tsbeg); @@ -309,7 +312,7 @@ impl Stream for MergeRts { "\n\n--------------------------\n", item_min, self.ts_seen_max ); self.state = State::Done; - break Ready(Some(Err(Error::Logic))); + break Ready(Some(Err(Error::OrderMin))); } } if let Some(item_max) = item.ts_max() { @@ -319,7 +322,7 @@ impl Stream for MergeRts { "\n\n--------------------------\n", item_max, self.ts_seen_max ); self.state = State::Done; - break Ready(Some(Err(Error::Logic))); + break Ready(Some(Err(Error::OrderMax))); } else { self.ts_seen_max = item_max; } diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 4319a14..661ec6d 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -31,6 +31,7 @@ pub struct StmtsLspShape { f64: PreparedStatement, bool: PreparedStatement, string: PreparedStatement, + enumvals: PreparedStatement, } impl StmtsLspShape { @@ -48,6 +49,7 @@ impl StmtsLspShape { "f64" => &self.f64, "bool" => &self.bool, "string" => &self.string, + "enum" => &self.enumvals, _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), }; Ok(ret) @@ -182,6 +184,15 @@ async fn make_lsp_shape( f64: maker("f64").await?, bool: maker("bool").await?, string: maker("string").await?, + enumvals: if shapepre == "scalar" { + make_lsp(ks, rt, shapepre, "enum", "ts_lsp, value, valuestr", bck, scy).await? + } else { + // exists only for scalar, therefore produce some dummy here + let table_name = "ts_msp"; + let cql = format!("select ts_msp from {}.{}{} limit 1", ks, rt.table_prefix(), table_name); + let qu = scy.prepare(cql).await?; + qu + }, }; Ok(ret) } @@ -204,10 +215,10 @@ async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result x, @@ -166,12 +170,14 @@ impl ScyllaWorker { }; match job { Job::FindTsMsp(rt, series, range, bck, tx) => { + info!("scylla worker Job::FindTsMsp"); let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } } Job::ReadNextValues(job) => { + info!("scylla worker Job::ReadNextValues"); let fut = (job.futgen)(scy.clone(), stmts.clone()); let res = fut.await; if job.tx.send(res.map_err(Into::into)).await.is_err() { @@ -179,6 +185,7 @@ impl ScyllaWorker { } } Job::AccountingReadTs(rt, ts, tx) => { + info!("scylla worker Job::AccountingReadTs"); let ks = match &rt { RetentionTime::Short => &self.scyconf_st.keyspace, RetentionTime::Medium => &self.scyconf_mt.keyspace, @@ -191,5 +198,6 @@ impl ScyllaWorker { } } } + info!("scylla worker ended"); } } diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs index 26698e4..b8fe29e 100644 --- a/crates/streams/src/frames/inmem.rs +++ b/crates/streams/src/frames/inmem.rs @@ -44,7 +44,7 @@ impl Stream for TcpReadAsBytes where INP: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/crates/streams/src/json_stream.rs b/crates/streams/src/json_stream.rs index e1b7989..72bcff3 100644 --- a/crates/streams/src/json_stream.rs +++ b/crates/streams/src/json_stream.rs @@ -1,5 +1,4 @@ use crate::cbor_stream::SitemtyDynEventsStream; -use bytes::Bytes; use err::Error; use futures_util::Stream; use futures_util::StreamExt; @@ -11,10 +10,14 @@ use netpod::log::*; use std::pin::Pin; use std::time::Duration; -pub struct JsonBytes(Bytes); +pub struct JsonBytes(String); impl JsonBytes { - pub fn into_inner(self) -> Bytes { + pub fn new>(s: S) -> Self { + Self(s.into()) + } + + pub fn into_inner(self) -> String { self.0 } @@ -29,7 +32,7 @@ impl WithLen for JsonBytes { } } -impl From for Bytes { +impl From for String { fn from(value: JsonBytes) -> Self { value.0 } @@ -55,29 +58,27 @@ fn map_events(x: Result>>, Error Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { - let buf = evs.to_json_vec_u8(); - let bytes = Bytes::from(buf); - let item = JsonBytes(bytes); + let s = evs.to_json_string(); + let item = JsonBytes::new(s); Ok(item) } RangeCompletableItem::RangeComplete => { let item = serde_json::json!({ "rangeFinal": true, }); - let buf = serde_json::to_vec(&item)?; - let bytes = Bytes::from(buf); - let item = JsonBytes(bytes); + let s = serde_json::to_string(&item)?; + let item = JsonBytes::new(s); Ok(item) } }, StreamItem::Log(item) => { info!("{item:?}"); - let item = JsonBytes(Bytes::new()); + let item = JsonBytes::new(String::new()); Ok(item) } StreamItem::Stats(item) => { info!("{item:?}"); - let item = JsonBytes(Bytes::new()); + let item = JsonBytes::new(String::new()); Ok(item) } }, @@ -85,9 +86,8 @@ fn map_events(x: Result>>, Error let item = serde_json::json!({ "error": e.to_string(), }); - let buf = serde_json::to_vec(&item)?; - let bytes = Bytes::from(buf); - let item = JsonBytes(bytes); + let s = serde_json::to_string(&item)?; + let item = JsonBytes::new(s); Ok(item) } } @@ -97,8 +97,7 @@ fn make_keepalive() -> Result { let item = serde_json::json!({ "type": "keepalive", }); - let buf = serde_json::to_vec(&item).unwrap(); - let bytes = Bytes::from(buf); - let item = Ok(JsonBytes(bytes)); + let s = serde_json::to_string(&item).unwrap(); + let item = Ok(JsonBytes::new(s)); item } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index caf7677..8afa46b 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -24,7 +24,6 @@ use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRangeEnum; use netpod::ChannelTypeConfigGen; -use netpod::Cluster; use netpod::ReqCtx; use query::api4::binned::BinnedQuery; use serde_json::Value as JsonValue; @@ -54,7 +53,7 @@ async fn timebinnable_stream( ctx, ); let inmem_bufcap = subq.inmem_bufcap(); - let wasm1 = subq.wasm1().map(ToString::to_string); + let _wasm1 = subq.wasm1().map(ToString::to_string); let mut tr = build_merged_event_transform(subq.transform())?; let bytes_streams = open_bytes.open(subq, ctx.clone()).await?; let mut inps = Vec::new(); diff --git a/crates/taskrun/src/formatter.rs b/crates/taskrun/src/formatter.rs index ae69c29..2a5079d 100644 --- a/crates/taskrun/src/formatter.rs +++ b/crates/taskrun/src/formatter.rs @@ -56,31 +56,35 @@ where write!(writer, " {:>5} ", meta.level().as_str())?; - writer.write_str("[THR ")?; - let current_thread = std::thread::current(); - match current_thread.name() { - Some(name) => { - let n = name.len(); - let max = 32; - if n > max { - let pre = 3; - writer.write_str(&name[0..3])?; - writer.write_char('.')?; - writer.write_str(&name[name.len() + 1 + pre - max..])?; - } else { - writer.write_str(name)?; + if false { + writer.write_str("[THR ")?; + let current_thread = std::thread::current(); + match current_thread.name() { + Some(name) => { + let n = name.len(); + let max = 32; + if n > max { + let pre = 3; + writer.write_str(&name[0..3])?; + writer.write_char('.')?; + writer.write_str(&name[name.len() + 1 + pre - max..])?; + } else { + writer.write_str(name)?; + } + } + None => { + // write!(writer, "{:0>2?} ", current_thread.id())?; + write!(writer, "{:?} ", current_thread.id())?; } } - None => { - // write!(writer, "{:0>2?} ", current_thread.id())?; - write!(writer, "{:?} ", current_thread.id())?; - } + writer.write_char(' ')?; } - writer.write_char(' ')?; - writer.write_str("[TGT ")?; - writer.write_str(meta.target())?; - writer.write_char(' ')?; + { + writer.write_str("[TGT ")?; + writer.write_str(meta.target())?; + writer.write_char(' ')?; + } writer.write_str("[SCP ")?; if let Some(sc) = ctx.event_scope() { diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 41c267f..4863bcd 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -1,6 +1,8 @@ pub mod formatter; pub use tokio; +pub use tracing; +pub use tracing_subscriber; use crate::log::*; // use console_subscriber::ConsoleLayer; @@ -144,6 +146,7 @@ fn collect_env_list(env: &str) -> Vec { .unwrap_or(String::new()) .split(",") .map(str::trim) + .filter(|x| !x.is_empty()) .map(ToString::to_string) .collect() } @@ -186,6 +189,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; let tracing_debug = collect_env_list("TRACING_DEBUG"); let tracing_trace = collect_env_list("TRACING_TRACE"); + // let tracing_trace_always = collect_env_list("TRACING_TRACE_ALWAYS"); let filter_3 = tracing_subscriber::filter::DynFilterFn::new(move |meta, ctx| { if *meta.level() >= tracing::Level::TRACE { let mut target_match = false; @@ -206,7 +210,8 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { sr = g.parent(); } } - allow + // allow + true } else { false } @@ -229,7 +234,8 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { sr = g.parent(); } } - allow + // allow + true } else { false } @@ -374,3 +380,40 @@ where { tokio::spawn(task) } + +pub fn query_log_level() -> tracing::Level { + use tracing::Level; + let mut level = Level::INFO; + if false { + let mut _reg = tracing_subscriber::registry(); + } + tracing::Span::current().id().map(|id| { + tracing::dispatcher::get_default(|disp| { + disp.downcast_ref::().map(|reg| { + use tracing_subscriber::registry::LookupSpan; + if let Some(mut sp) = reg.span(&id) { + loop { + if sp.name() == "log_span_debug" { + if level < Level::DEBUG { + level = Level::DEBUG; + } + } + if sp.name() == "log_span_trace" { + if level < Level::TRACE { + level = Level::TRACE; + } + } + if let Some(x) = sp.parent() { + sp = x; + } else { + break; + } + } + } else { + info!("reg span not available"); + } + }); + }) + }); + level +}