diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 036737a..2d5a2ba 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -204,38 +204,3 @@ pub enum FindChannelError { MultipleFound, Database(String), } - -// On sf-databuffer, the channel name identifies the series. But we can also have a series id. -// This function is used if the request provides only the series-id, but no name. -pub async fn find_sf_channel_by_series( - channel: SfDbChannel, - pgclient: Arc, -) -> Result { - debug!("find_sf_channel_by_series {:?}", channel); - let series = channel.series().ok_or_else(|| FindChannelError::BadSeriesId)?; - let sql = "select rowid from facilities where name = $1"; - let rows = pgclient - .query(sql, &[&channel.backend()]) - .await - .map_err(|e| FindChannelError::Database(e.to_string()))?; - let row = rows - .into_iter() - .next() - .ok_or_else(|| FindChannelError::UnknownBackend)?; - let backend_id: i64 = row.get(0); - let sql = "select name from channels where facility = $1 and rowid = $2"; - let rows = pgclient - .query(sql, &[&backend_id, &(series as i64)]) - .await - .map_err(|e| FindChannelError::Database(e.to_string()))?; - if rows.len() > 1 { - return Err(FindChannelError::MultipleFound); - } - if let Some(row) = rows.into_iter().next() { - let name = row.get::<_, String>(0); - let channel = SfDbChannel::from_full(channel.backend(), channel.series(), name); - Ok(channel) - } else { - return Err(FindChannelError::NoFound); - } -} diff --git a/crates/dbconn/src/worker.rs b/crates/dbconn/src/worker.rs index ae4fa8e..de37d51 100644 --- a/crates/dbconn/src/worker.rs +++ b/crates/dbconn/src/worker.rs @@ -10,6 +10,7 @@ use netpod::ChConf; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; use netpod::Database; +use netpod::SfDbChannel; use taskrun::tokio; use tokio::task::JoinHandle; use tokio_postgres::Client; @@ -44,6 +45,10 @@ enum Job { Sender>, crate::channelinfo::Error>>, ), SearchChannel(ChannelSearchQuery, Sender>), + SfChannelBySeries( + netpod::SfDbChannel, + Sender>, + ), } #[derive(Debug, Clone)] @@ -63,7 +68,7 @@ impl PgQueue { Ok(rx) } - pub async fn chconf_best_matching_name_range_job( + pub async fn chconf_best_matching_name_range( &self, backend: &str, name: &str, @@ -95,6 +100,17 @@ impl PgQueue { let ret = rx.recv().await?; Ok(ret) } + + pub async fn find_sf_channel_by_series( + &self, + query: netpod::SfDbChannel, + ) -> Result, Error> { + let (tx, rx) = async_channel::bounded(1); + let job = Job::SfChannelBySeries(query, tx); + self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; + let ret = rx.recv().await?; + Ok(ret) + } } #[derive(Debug)] @@ -154,6 +170,12 @@ impl PgWorker { // TODO count for stats } } + Job::SfChannelBySeries(query, tx) => { + let res = find_sf_channel_by_series(query, &self.pg).await; + if tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + } } } } @@ -171,3 +193,39 @@ impl PgWorker { self.rx.close(); } } + +// On sf-databuffer, the channel name identifies the series. But we can also have a series id. +// This function is used if the request provides only the series-id, but no name. +async fn find_sf_channel_by_series( + channel: SfDbChannel, + pgclient: &Client, +) -> Result { + use crate::FindChannelError; + debug!("find_sf_channel_by_series {:?}", channel); + let series = channel.series().ok_or_else(|| FindChannelError::BadSeriesId)?; + let sql = "select rowid from facilities where name = $1"; + let rows = pgclient + .query(sql, &[&channel.backend()]) + .await + .map_err(|e| FindChannelError::Database(e.to_string()))?; + let row = rows + .into_iter() + .next() + .ok_or_else(|| FindChannelError::UnknownBackend)?; + let backend_id: i64 = row.get(0); + let sql = "select name from channels where facility = $1 and rowid = $2"; + let rows = pgclient + .query(sql, &[&backend_id, &(series as i64)]) + .await + .map_err(|e| FindChannelError::Database(e.to_string()))?; + if rows.len() > 1 { + return Err(FindChannelError::MultipleFound); + } + if let Some(row) = rows.into_iter().next() { + let name = row.get::<_, String>(0); + let channel = SfDbChannel::from_full(channel.backend(), channel.series(), name); + Ok(channel) + } else { + return Err(FindChannelError::NoFound); + } +} diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 65ec83a..edde9dc 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -24,6 +24,7 @@ use httpclient::StreamResponse; use httpclient::ToJsonBody; use netpod::log::*; use netpod::req_uri_to_url; +use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; @@ -87,12 +88,15 @@ async fn plain_events( pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { + let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc) + .await? + .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; if accepts_cbor_framed(req.headers()) { - Ok(plain_events_cbor_framed(req, evq, ctx, pgqueue, ncc).await?) + Ok(plain_events_cbor_framed(req, evq, ch_conf, ctx, ncc).await?) } else if accepts_json_framed(req.headers()) { - Ok(plain_events_json_framed(req, evq, ctx, pgqueue, ncc).await?) + Ok(plain_events_json_framed(req, evq, ch_conf, ctx, ncc).await?) } else if accepts_json_or_all(req.headers()) { - Ok(plain_events_json(req, evq, ctx, pgqueue, ncc).await?) + Ok(plain_events_json(req, evq, ch_conf, ctx, ncc).await?) } else { let ret = response_err_msg(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept {:?}", req))?; Ok(ret) @@ -102,13 +106,10 @@ async fn plain_events( async fn plain_events_cbor_framed( req: Requ, evq: PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { - let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc) - .await? - .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; debug!("plain_events_cbor_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; @@ -130,13 +131,10 @@ async fn plain_events_cbor_framed( async fn plain_events_json_framed( req: Requ, evq: PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { - let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc) - .await? - .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; debug!("plain_events_json_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; @@ -148,18 +146,14 @@ async fn plain_events_json_framed( async fn plain_events_json( req: Requ, evq: PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { let self_name = "plain_events_json"; debug!("{self_name} req: {:?}", req); let (_head, _body) = req.into_parts(); // TODO handle None case better and return 404 - let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc) - .await - .map_err(Error::from)? - .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; debug!("{self_name} chconf_from_events_quorum: {ch_conf:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let item = diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index ee95bef..307e6ed 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -36,7 +36,6 @@ use nodenet::configquorum::find_config_basics_quorum; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; use scyllaconn::errconv::ErrConv; -use scyllaconn::range::ScyllaSeriesRange; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -524,6 +523,8 @@ impl IocForChannel { &[&q.backend, &q.name], ) .await?; + drop(pg_client); + pgjh.await??; if let Some(row) = rows.first() { let ioc_addr = row.get(0); let ret = IocForChannelRes { ioc_addr }; @@ -626,7 +627,7 @@ impl ScyllaSeriesTsMsp { }; let chconf = shared_res .pgqueue - .chconf_best_matching_name_range_job(backend, name, nano_range) + .chconf_best_matching_name_range(backend, name, nano_range) .await .map_err(|e| Error::with_msg_no_trace(format!("error from pg worker: {e}")))? .recv() @@ -734,6 +735,8 @@ impl AmbigiousChannelNames { &[], ) .await?; + drop(pg_client); + pgjh.await??; let mut ret = AmbigiousChannelNamesResponse { ambigious: Vec::new() }; for row in rows { let g = AmbigiousChannel { @@ -748,16 +751,6 @@ impl AmbigiousChannelNames { } } -struct TestData01Iter {} - -impl Iterator for TestData01Iter { - type Item = f64; - - fn next(&mut self) -> Option { - None - } -} - struct Msps(Vec); struct Lsps(Vec); struct Pulses(Vec); diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index 257d035..8a21e00 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -220,7 +220,7 @@ async fn scylla_chconf_from_sf_db_channel( } else { // TODO let called function allow to return None instead of error-not-found let ret = pgqueue - .chconf_best_matching_name_range_job(channel.backend(), channel.name(), range) + .chconf_best_matching_name_range(channel.backend(), channel.name(), range) .await? .recv() .await??; diff --git a/crates/nodenet/src/configquorum.rs b/crates/nodenet/src/configquorum.rs index 1763c5b..64aaf1b 100644 --- a/crates/nodenet/src/configquorum.rs +++ b/crates/nodenet/src/configquorum.rs @@ -98,11 +98,11 @@ pub async fn find_config_basics_quorum( if let Some(_cfg) = &ncc.node.sf_databuffer { let channel = if channel.name().is_empty() { if let Some(_) = channel.series() { - let (pgclient, _pgjh) = dbconn::create_connection(&ncc.node_config.cluster.database).await?; - let pgclient = std::sync::Arc::new(pgclient); - dbconn::find_sf_channel_by_series(channel, pgclient) + pgqueue + .find_sf_channel_by_series(channel) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))? + .map_err(|e| Error::with_msg_no_trace(e.to_string()))? } else { channel } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index b43bdf0..31e7737 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -128,12 +128,12 @@ pub async fn create_response_bytes_stream( scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result { - info!( - "create_response_bytes_stream {:?} {:?}", + debug!( + "create_response_bytes_stream {:?} {:?} wasm1 {:?}", evq.ch_conf().scalar_type(), evq.ch_conf().shape(), + evq.wasm1() ); - debug!("wasm1 {:?}", evq.wasm1()); let reqctx = netpod::ReqCtx::new_from_single_reqid(evq.reqid().into()).into(); if evq.create_errors_contains("nodenet_parse_query") { let e = Error::DebugTest;