From ce4a1291aaab596bed0d9ff83c9fa1223d3b580a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 6 Sep 2023 18:19:01 +0200 Subject: [PATCH] WIP refactor --- dbpg/src/seriesbychannel.rs | 32 +++++++++++++++++++++++--------- netfetch/src/ca/conn.rs | 7 ++++--- netfetch/src/ca/connset.rs | 37 +++++++++++++++++++++++++++++++++---- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 1ee5a78..70bd128 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -56,7 +56,7 @@ impl From for Error { pub type BoxedSend = Pin> + Send>>; pub trait CanSendChannelInfoResult: Sync { - fn make_send(&self, item: Result, Error>) -> BoxedSend; + fn make_send(&self, item: Result) -> BoxedSend; } pub struct ChannelInfoQuery { @@ -67,11 +67,18 @@ pub struct ChannelInfoQuery { pub tx: Pin>, } -struct ChannelInfoResult { +struct ChannelInfoResult2 { + backend: String, + channel: String, series: Existence, tx: Pin>, - // only for trace: - channel: String, +} + +#[derive(Debug)] +pub struct ChannelInfoResult { + pub backend: String, + pub channel: String, + pub series: Existence, } struct Worker { @@ -114,7 +121,7 @@ impl Worker { async fn select( &self, batch: Vec, - ) -> Result<(Vec, Vec), Error> { + ) -> Result<(Vec, Vec), Error> { let mut backend = Vec::new(); let mut channel = Vec::new(); let mut scalar_type = Vec::new(); @@ -159,10 +166,12 @@ impl Worker { if rid as u32 == qrid { let series: i64 = row.get(0); let series = SeriesId::new(series as _); - let res = ChannelInfoResult { + let res = ChannelInfoResult2 { + // TODO take from database query. Needs test. + backend: backend[0].clone(), + channel, series: Existence::Existing(series), tx, - channel, }; result.push(res); } @@ -281,8 +290,13 @@ impl Worker { }; let res4 = res3?; for r in res4 { - trace3!("try to send result for {} {:?}", r.channel, r.series); - let fut = r.tx.make_send(Ok(r.series)); + let item = ChannelInfoResult { + backend: r.backend, + channel: r.channel, + series: r.series, + }; + trace3!("try to send result for {:?}", item); + let fut = r.tx.make_send(Ok(item)); match fut.await { Ok(()) => {} Err(_e) => { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 5cee7e2..cdff375 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -11,6 +11,7 @@ use crate::timebin::ConnTimeBin; use async_channel::Sender; use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; +use dbpg::seriesbychannel::ChannelInfoResult; use err::Error; use futures_util::stream::FuturesUnordered; use futures_util::Future; @@ -307,7 +308,7 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 { #[derive(Debug)] pub enum ConnCommandKind { - SeriesLookupResult(Result, dbpg::seriesbychannel::Error>), + SeriesLookupResult(Result), ChannelAdd(String, ChannelStatusSeriesId), ChannelRemove(String), CheckHealth, @@ -321,7 +322,7 @@ pub struct ConnCommand { } impl ConnCommand { - pub fn series_lookup(qu: Result, dbpg::seriesbychannel::Error>) -> Self { + pub fn series_lookup(qu: Result) -> Self { Self { id: Self::make_id(), kind: ConnCommandKind::SeriesLookupResult(qu), @@ -405,7 +406,7 @@ struct SendSeriesLookup { impl CanSendChannelInfoResult for SendSeriesLookup { fn make_send( &self, - item: Result, dbpg::seriesbychannel::Error>, + item: Result, ) -> dbpg::seriesbychannel::BoxedSend { let tx = self.tx.clone(); let fut = async move { tx.send(ConnCommand::series_lookup(item)).await.map_err(|_| ()) }; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index f8e0ee8..4be71b9 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -20,6 +20,7 @@ use atomic::AtomicUsize; use dbpg::seriesbychannel::BoxedSend; use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; +use dbpg::seriesbychannel::ChannelInfoResult; use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; @@ -110,7 +111,7 @@ pub struct ChannelAdd { #[derive(Debug)] pub enum ConnSetCmd { - SeriesLookupResult(Result, dbpg::seriesbychannel::Error>), + SeriesLookupResult(Result), ChannelAdd(ChannelAdd), ChannelAddWithStatusId(ChannelAddWithStatusId), ChannelAddWithAddr(ChannelAddWithAddr), @@ -165,7 +166,7 @@ struct SeriesLookupSender { } impl CanSendChannelInfoResult for SeriesLookupSender { - fn make_send(&self, item: Result, dbpg::seriesbychannel::Error>) -> BoxedSend { + fn make_send(&self, item: Result) -> BoxedSend { let tx = self.tx.clone(); let fut = async move { tx.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::SeriesLookupResult(item))) @@ -242,9 +243,11 @@ impl CaConnSet { CaConnSetEvent::ConnSetCmd(cmd) => match cmd { ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await, ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await, + ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await, ConnSetCmd::IocAddrQueryResult(res) => { for e in res { if let Some(addr) = e.addr { + debug!("ioc found {e:?}"); let ch = Channel::new(e.channel.clone()); if let Some(chst) = self.channel_states.inner().get(&ch) { if let ChannelStateValue::Active(ast) = &chst.value { @@ -273,7 +276,6 @@ impl CaConnSet { } Ok(()) } - ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await, ConnSetCmd::CheckHealth => { error!("TODO implement check health"); Ok(()) @@ -283,7 +285,7 @@ impl CaConnSet { self.shutdown = true; Ok(()) } - ConnSetCmd::SeriesLookupResult(_) => todo!(), + ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, }, CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value { CaConnEventValue::None => Ok(()), @@ -298,6 +300,30 @@ impl CaConnSet { } } + async fn handle_series_lookup_result( + &mut self, + res: Result, + ) -> Result<(), Error> { + debug!("handle_series_lookup_result {res:?}"); + match res { + Ok(res) => { + let add = ChannelAddWithStatusId { + backend: res.backend, + name: res.channel, + local_epics_hostname: self.local_epics_hostname.clone(), + cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()), + }; + self.connset_tx + .send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithStatusId(add))) + .await?; + } + Err(e) => { + warn!("TODO handle error {e}"); + } + } + Ok(()) + } + async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> { // TODO should I add the transition through ActiveChannelState::Init as well? let ch = Channel::new(add.name.clone()); @@ -320,6 +346,7 @@ impl CaConnSet { } async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> { + debug!("handle_add_channel_with_status_id {add:?}"); let ch = Channel::new(add.name.clone()); if let Some(chst) = self.channel_states.inner().get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { @@ -332,6 +359,8 @@ impl CaConnSet { }, }, }; + let qu = IocAddrQuery { name: add.name }; + self.search_tx.send(qu).await?; } else { warn!("TODO have a status series id but no more channel"); }