From 1b3267c1a1652e6380dba4af69ef804992d75a4f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 19 Jun 2023 15:18:30 +0200 Subject: [PATCH] Refactor date serde --- daqbufp2/src/test/api1/data_api_python.rs | 5 +- httpret/src/channel_status.rs | 1 - httpret/src/httpret.rs | 5 +- httpret/src/pulsemap.rs | 4 +- items_0/src/isodate.rs | 3 +- items_2/src/framable.rs | 8 +- items_2/src/items_2.rs | 3 +- netpod/src/netpod.rs | 124 ++++++++++++++++++++-- netpod/src/query.rs | 12 ++- netpod/src/query/datetime.rs | 56 +++++++++- nodenet/src/conn.rs | 52 +++++---- nodenet/src/conn/test.rs | 7 +- streams/src/tcprawclient.rs | 35 +++--- 13 files changed, 236 insertions(+), 79 deletions(-) diff --git a/daqbufp2/src/test/api1/data_api_python.rs b/daqbufp2/src/test/api1/data_api_python.rs index 4275e96..fc3e6f3 100644 --- a/daqbufp2/src/test/api1/data_api_python.rs +++ b/daqbufp2/src/test/api1/data_api_python.rs @@ -10,6 +10,7 @@ use netpod::Cluster; use netpod::HostPort; use netpod::SfDbChannel; use netpod::APP_JSON; +use netpod::DATETIME_FMT_3MS; use url::Url; const TEST_BACKEND: &str = "testbackend-00"; @@ -26,8 +27,8 @@ async fn fetch_data_api_python_blob( let beg_date = beg_date.parse()?; let end_date = end_date.parse()?; let _range = NanoRange::from_date_time(beg_date, end_date); - let start_date = beg_date.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); - let end_date = end_date.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + let start_date = beg_date.format(DATETIME_FMT_3MS).to_string(); + let end_date = end_date.format(DATETIME_FMT_3MS).to_string(); let query = serde_json::json!({ "range": { "type": "date", diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index 2116fb5..0b6cbbc 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -1,7 +1,6 @@ use crate::bodystream::response; use crate::err::Error; use crate::ReqCtx; -use err::anyhow::Context; use futures_util::StreamExt; use http::Method; use http::Request; diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 30dc035..9de47d3 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -642,6 +642,7 @@ pub struct StatusBoardEntry { mod instant_serde { use super::*; + use netpod::DATETIME_FMT_3MS; use serde::Serializer; pub fn ser(x: &SystemTime, ser: S) -> Result { use chrono::LocalResult; @@ -650,11 +651,11 @@ mod instant_serde { match res { LocalResult::None => Err(serde::ser::Error::custom(format!("Bad local instant conversion"))), LocalResult::Single(dt) => { - let s = dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + let s = dt.format(DATETIME_FMT_3MS).to_string(); ser.serialize_str(&s) } LocalResult::Ambiguous(dt, _dt2) => { - let s = dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + let s = dt.format(DATETIME_FMT_3MS).to_string(); ser.serialize_str(&s) } } diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 2b9df76..aa39bcf 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -23,6 +23,7 @@ use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; use netpod::NodeConfigCached; +use netpod::DATETIME_FMT_9MS; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -1429,12 +1430,11 @@ impl Api4MapPulse2HttpFunction { Ok(Some(val)) => { let sec = val / SEC; let ns = val % SEC; - let date_fmt = "%Y-%m-%dT%H:%M:%S.%9fZ"; let datetime = Utc .timestamp_opt(sec as i64, ns as u32) .earliest() .ok_or_else(|| Error::with_msg_no_trace("DateTime earliest fail"))? - .format(date_fmt) + .format(DATETIME_FMT_9MS) .to_string(); let res = Api4MapPulse2Response { sec, ns, datetime }; Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?) diff --git a/items_0/src/isodate.rs b/items_0/src/isodate.rs index daeffd7..6f98c5c 100644 --- a/items_0/src/isodate.rs +++ b/items_0/src/isodate.rs @@ -1,6 +1,7 @@ use chrono::DateTime; use chrono::TimeZone; use chrono::Utc; +use netpod::DATETIME_FMT_3MS; use serde::Deserialize; use serde::Serialize; use serde::Serializer; @@ -13,7 +14,7 @@ impl Serialize for IsoDateTime { where S: Serializer, { - serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string()) + serializer.serialize_str(&self.0.format(DATETIME_FMT_3MS).to_string()) } } diff --git a/items_2/src/framable.rs b/items_2/src/framable.rs index 1973056..d27fe82 100644 --- a/items_2/src/framable.rs +++ b/items_2/src/framable.rs @@ -124,9 +124,15 @@ where } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct EventQueryJsonStringFrame(pub String); +impl EventQueryJsonStringFrame { + pub fn str(&self) -> &str { + &self.0 + } +} + impl FrameTypeInnerStatic for EventQueryJsonStringFrame { const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME; } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index eb9ef45..2065dac 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -32,6 +32,7 @@ use items_0::MergeError; use merger::Mergeable; use netpod::range::evrange::SeriesRange; use netpod::timeunits::*; +use netpod::DATETIME_FMT_3MS; use serde::Deserialize; use serde::Serialize; use serde::Serializer; @@ -145,7 +146,7 @@ impl Serialize for IsoDateTime { where S: Serializer, { - serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string()) + serializer.serialize_str(&self.0.format(DATETIME_FMT_3MS).to_string()) } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index dc484c1..7363a2f 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -47,6 +47,10 @@ pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY; pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10; pub const TS_MSP_GRID_SPACING: u64 = 6 * 2; +pub const DATETIME_FMT_3MS: &str = "%Y-%m-%dT%H:%M:%S.%3fZ"; +pub const DATETIME_FMT_6MS: &str = "%Y-%m-%dT%H:%M:%S.%6fZ"; +pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ"; + const TEST_BACKEND: &str = "testbackend-00"; pub fn is_false(x: T) -> bool @@ -439,6 +443,7 @@ mod serde_port { where D: serde::Deserializer<'de>, { + // We expect to use json or yaml only. de.deserialize_any(Vis) } @@ -1157,7 +1162,64 @@ where pub ix: [T; 2], } -#[derive(Clone, Deserialize, PartialEq, PartialOrd)] +#[derive(Clone, PartialEq, PartialOrd)] +pub struct DtNano(u64); + +impl DtNano { + pub fn from_ns(ns: u64) -> Self { + Self(ns) + } + + pub fn ns(&self) -> u64 { + self.0 + } +} + +mod dt_nano_serde { + use super::DtNano; + use de::Visitor; + use serde::de; + use serde::Deserialize; + use serde::Serialize; + use std::fmt; + + impl Serialize for DtNano { + fn serialize(&self, ser: S) -> Result + where + S: serde::Serializer, + { + ser.serialize_u64(self.ns()) + } + } + + struct Vis1; + + impl<'de> Visitor<'de> for Vis1 { + type Value = DtNano; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "an integer of nanoseconds") + } + + fn visit_u64(self, v: u64) -> Result + where + E: de::Error, + { + Ok(DtNano::from_ns(v)) + } + } + + impl<'de> Deserialize<'de> for DtNano { + fn deserialize(de: D) -> Result + where + D: serde::Deserializer<'de>, + { + de.deserialize_u64(Vis1) + } + } +} + +#[derive(Clone, PartialEq, PartialOrd)] pub struct TsNano(pub u64); mod ts_nano_ser { @@ -1165,23 +1227,54 @@ mod ts_nano_ser { use crate::timeunits::SEC; use chrono::TimeZone; use chrono::Utc; + use de::Visitor; + use serde::de; + use serde::Deserialize; use serde::Serialize; + use std::fmt; impl Serialize for TsNano { fn serialize(&self, ser: S) -> Result where S: serde::Serializer, { - let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32); - let value = format!("{}", ts.earliest().unwrap()); - ser.serialize_newtype_struct("TsNano", &value) + if false { + let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32); + let value = format!("{}", ts.earliest().unwrap()); + ser.serialize_newtype_struct("TsNano", &value) + } else { + ser.serialize_u64(self.ns()) + } + } + } + + struct Vis1; + + impl<'de> Visitor<'de> for Vis1 { + type Value = TsNano; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "integer nanoseconds since unix epoch") + } + + fn visit_u64(self, v: u64) -> Result + where + E: de::Error, + { + Ok(TsNano::from_ns(v)) + } + } + + impl<'de> Deserialize<'de> for TsNano { + fn deserialize(de: D) -> Result + where + D: serde::Deserializer<'de>, + { + de.deserialize_u64(Vis1) } } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] -pub struct PulseId(u64); - impl TsNano { pub fn from_ns(ns: u64) -> Self { Self(ns) @@ -1204,11 +1297,17 @@ impl fmt::Debug for TsNano { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32); f.debug_struct("TsNano") - .field("ts", &ts.earliest().unwrap_or(Default::default())) + .field( + "ts", + &ts.earliest().unwrap_or(Default::default()).format(DATETIME_FMT_3MS), + ) .finish() } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] +pub struct PulseId(u64); + impl PulseId { pub fn from_id(id: u64) -> Self { Self(id) @@ -2389,16 +2488,19 @@ impl FromUrl for ChannelConfigQuery { impl AppendToUrl for ChannelConfigQuery { fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); g.append_pair( "begDate", - &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), + &Utc.timestamp_nanos(self.range.beg as i64) + .format(DATETIME_FMT_3MS) + .to_string(), ); g.append_pair( "endDate", - &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), + &Utc.timestamp_nanos(self.range.end as i64) + .format(DATETIME_FMT_3MS) + .to_string(), ); if self.expand { g.append_pair("expand", "true"); diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 3a6b125..7dbeae7 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -16,6 +16,7 @@ use crate::PulseRange; use crate::SeriesRange; use crate::SfDbChannel; use crate::ToNanos; +use crate::DATETIME_FMT_6MS; use chrono::DateTime; use chrono::TimeZone; use chrono::Utc; @@ -118,7 +119,7 @@ impl FromUrl for TimeRangeQuery { impl AppendToUrl for TimeRangeQuery { fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ"; + let date_fmt = DATETIME_FMT_6MS; let mut g = url.query_pairs_mut(); g.append_pair( "begDate", @@ -314,16 +315,19 @@ impl FromUrl for ChannelStateEventsQuery { impl AppendToUrl for ChannelStateEventsQuery { fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ"; self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); g.append_pair( "begDate", - &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), + &Utc.timestamp_nanos(self.range.beg as i64) + .format(DATETIME_FMT_6MS) + .to_string(), ); g.append_pair( "endDate", - &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), + &Utc.timestamp_nanos(self.range.end as i64) + .format(DATETIME_FMT_6MS) + .to_string(), ); } } diff --git a/netpod/src/query/datetime.rs b/netpod/src/query/datetime.rs index 446ce50..c0c0578 100644 --- a/netpod/src/query/datetime.rs +++ b/netpod/src/query/datetime.rs @@ -1,14 +1,16 @@ -use chrono::{DateTime, FixedOffset}; +use chrono::DateTime; +use chrono::FixedOffset; use err::Error; -use serde::{de::Visitor, Deserialize, Serialize}; +use serde::de::Visitor; +use serde::Deserialize; +use serde::Serialize; use std::fmt; use std::ops; +use std::time::Duration; #[derive(Clone, Debug, PartialEq)] pub struct Datetime(DateTime); -impl Datetime {} - impl From> for Datetime { fn from(x: DateTime) -> Self { Datetime(x) @@ -84,3 +86,49 @@ impl<'de> Deserialize<'de> for Datetime { deserializer.deserialize_str(Vis1) } } + +#[test] +fn ser_00() { + use chrono::TimeZone; + let x = FixedOffset::east_opt(0) + .unwrap() + .with_ymd_and_hms(2023, 2, 3, 15, 12, 40) + .earliest() + .unwrap(); + let x = Datetime(x); + let s = serde_json::to_string(&x).unwrap(); + + assert_eq!(s, r#""2023-02-03T15:12:40Z""#); +} + +#[test] +fn ser_01() { + use chrono::TimeZone; + let x = FixedOffset::east_opt(0) + .unwrap() + .with_ymd_and_hms(2023, 2, 3, 15, 12, 40) + .earliest() + .unwrap() + .checked_add_signed(chrono::Duration::milliseconds(876)) + .unwrap(); + let x = Datetime(x); + let s = serde_json::to_string(&x).unwrap(); + + assert_eq!(s, r#""2023-02-03T15:12:40.876Z""#); +} + +#[test] +fn ser_02() { + use chrono::TimeZone; + let x = FixedOffset::east_opt(0) + .unwrap() + .with_ymd_and_hms(2023, 2, 3, 15, 12, 40) + .earliest() + .unwrap() + .checked_add_signed(chrono::Duration::nanoseconds(543432120)) + .unwrap(); + let x = Datetime(x); + let s = serde_json::to_string(&x).unwrap(); + + assert_eq!(s, r#""2023-02-03T15:12:40.543432Z""#); +} diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index f211ea9..f4e60ad 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -19,12 +19,12 @@ use items_2::frame::make_term_frame; use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; use netpod::log::*; -use netpod::ChConf; use netpod::ChannelTypeConfigGen; use netpod::NodeConfigCached; use netpod::PerfOpts; use query::api4::events::PlainEventsQuery; -use serde_json::Value as JsValue; +use serde::Deserialize; +use serde::Serialize; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; @@ -162,6 +162,18 @@ async fn make_channel_events_stream( Ok(ret) } +#[derive(Debug, Serialize, Deserialize)] +pub struct Frame1Parts { + query: PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, +} + +impl Frame1Parts { + pub fn new(query: PlainEventsQuery, ch_conf: ChannelTypeConfigGen) -> Self { + Self { query, ch_conf } + } +} + async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, Error> { let perf_opts = PerfOpts::default(); let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); @@ -188,9 +200,8 @@ async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, - ncc: &NodeConfigCached, ) -> Result<(PlainEventsQuery, ChannelTypeConfigGen), Error> { - if frames.len() != 2 { + if frames.len() != 1 { error!("{:?}", frames); error!("missing command frame len {}", frames.len()); let e = Error::with_msg("missing command frame"); @@ -214,37 +225,22 @@ async fn events_parse_input_query( }, Err(e) => return Err(e), }; - let evq: PlainEventsQuery = serde_json::from_str(&qitem.0).map_err(|e| { - let e = Error::with_msg_no_trace(format!("json parse error: {e}")); + let cmd: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| { + let e = Error::with_msg_no_trace(format!("json parse error: {} inp {:?}", e, qitem.str())); error!("{e}"); e })?; - debug!("events_parse_input_query {:?}", evq); + debug!("events_parse_input_query {:?}", cmd); if query_frame.tyid() != EVENT_QUERY_JSON_STRING_FRAME { return Err(Error::with_msg("query frame wrong type")); } - let qitem = match decode_frame::>(&frames[1]) { - Ok(k) => match k { - Ok(k) => match k { - StreamItem::DataItem(k) => match k { - RangeCompletableItem::Data(k) => k, - RangeCompletableItem::RangeComplete => return Err(Error::with_msg("bad query item")), - }, - _ => return Err(Error::with_msg("bad query item")), - }, - Err(e) => return Err(e), - }, - Err(e) => return Err(e), - }; - let ch_conf: ChannelTypeConfigGen = serde_json::from_str(&qitem.0)?; - info!("\n\nparsed second frame:\n{ch_conf:?}"); - Ok((evq, ch_conf)) + Ok((cmd.query, cmd.ch_conf)) } async fn events_conn_handler_inner_try( stream: TcpStream, addr: SocketAddr, - node_config: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result<(), ConnErr> { let _ = addr; let (netin, mut netout) = stream.into_split(); @@ -252,7 +248,7 @@ async fn events_conn_handler_inner_try( Ok(x) => x, Err(e) => return Err((e, netout).into()), }; - let (evq, ch_conf) = match events_parse_input_query(frames, node_config).await { + let (evq, ch_conf) = match events_parse_input_query(frames).await { Ok(x) => x, Err(e) => return Err((e, netout).into()), }; @@ -262,7 +258,7 @@ async fn events_conn_handler_inner_try( Ok(x) => x, Err(e) => return Err((e, netout).into()), }; - match disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, node_config).await { + match disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, ncc).await { Ok(stream) => { let stream = stream.map(|x| Box::new(x) as _); Box::pin(stream) @@ -270,7 +266,7 @@ async fn events_conn_handler_inner_try( Err(e) => return Err((e, netout).into()), } } else { - match make_channel_events_stream(evq.clone(), ch_conf, node_config).await { + match make_channel_events_stream(evq.clone(), ch_conf, ncc).await { Ok(stream) => { if false { // TODO wasm example @@ -325,7 +321,7 @@ async fn events_conn_handler_inner_try( } { let item = LogItem { - node_ix: node_config.ix as _, + node_ix: ncc.ix as _, level: Level::INFO, msg: format!("buf_len_histo: {:?}", buf_len_histo), }; diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index d3330f7..e7cfb9b 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -1,4 +1,5 @@ use crate::conn::events_conn_handler; +use crate::conn::Frame1Parts; use err::Error; use futures_util::StreamExt; use items_0::streamitem::sitem_data; @@ -79,11 +80,9 @@ fn raw_data_00() { beg: SEC, end: SEC * 10, }; - if true { - todo!("must add 2nd frame with channel type info"); - } let qu = PlainEventsQuery::new(channel, range); - let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap()); + let frame1 = Frame1Parts::new(qu, err::todoval()); + let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap()); let frame = sitem_data(query).make_frame()?; let jh = taskrun::spawn(events_conn_handler(client, addr, cfg)); con.write_all(&frame).await.unwrap(); diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 44ab81c..8899793 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -24,11 +24,24 @@ use netpod::PerfOpts; use query::api4::events::PlainEventsQuery; use serde::de::DeserializeOwned; use serde::Serialize; +use serde_json::json; use std::fmt; use std::pin::Pin; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; +pub fn make_node_command_frame(query: Q, ch_conf: &ChannelTypeConfigGen) -> Result +where + Q: Serialize, +{ + let obj = json!({ + "query": query, + "ch_conf":ch_conf, + }); + let ret = serde_json::to_string(&obj)?; + Ok(EventQueryJsonStringFrame(ret)) +} + pub async fn x_processed_event_blobs_stream_from_node( query: PlainEventsQuery, ch_conf: ChannelTypeConfigGen, @@ -37,19 +50,12 @@ pub async fn x_processed_event_blobs_stream_from_node( ) -> Result> + Send>>, Error> { let addr = format!("{}:{}", node.host, node.port_raw); debug!("x_processed_event_blobs_stream_from_node to: {addr}",); + let frame1 = make_node_command_frame(&query, &ch_conf)?; let net = TcpStream::connect(addr.clone()).await?; - let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); - - let item = sitem_data(EventQueryJsonStringFrame(qjs)); + let item = sitem_data(frame1); let buf = item.make_frame()?; netout.write_all(&buf).await?; - - let s = serde_json::to_string(&ch_conf)?; - let item = sitem_data(EventQueryJsonStringFrame(s)); - let buf = item.make_frame()?; - netout.write_all(&buf).await?; - let buf = make_term_frame()?; netout.write_all(&buf).await?; netout.flush().await?; @@ -73,23 +79,16 @@ where T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, { // TODO when unit tests established, change to async connect: + let frame1 = make_node_command_frame(&query, &ch_conf)?; let mut streams = Vec::new(); for node in &cluster.nodes { let addr = format!("{}:{}", node.host, node.port_raw); debug!("open_tcp_streams to: {addr}"); let net = TcpStream::connect(addr.clone()).await?; - let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); - - let item = sitem_data(EventQueryJsonStringFrame(qjs)); + let item = sitem_data(frame1.clone()); let buf = item.make_frame()?; netout.write_all(&buf).await?; - - let s = serde_json::to_string(ch_conf)?; - let item = sitem_data(EventQueryJsonStringFrame(s)); - let buf = item.make_frame()?; - netout.write_all(&buf).await?; - let buf = make_term_frame()?; netout.write_all(&buf).await?; netout.flush().await?;