WIP refactor

This commit is contained in:
Dominik Werder
2023-09-06 18:19:01 +02:00
parent ba9bb7e26c
commit ce4a1291aa
3 changed files with 60 additions and 16 deletions

View File

@@ -56,7 +56,7 @@ impl From<crate::err::Error> for Error {
pub type BoxedSend = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
pub trait CanSendChannelInfoResult: Sync {
fn make_send(&self, item: Result<Existence<SeriesId>, Error>) -> BoxedSend;
fn make_send(&self, item: Result<ChannelInfoResult, Error>) -> BoxedSend;
}
pub struct ChannelInfoQuery {
@@ -67,11 +67,18 @@ pub struct ChannelInfoQuery {
pub tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
}
struct ChannelInfoResult {
struct ChannelInfoResult2 {
backend: String,
channel: String,
series: Existence<SeriesId>,
tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
// only for trace:
channel: String,
}
#[derive(Debug)]
pub struct ChannelInfoResult {
pub backend: String,
pub channel: String,
pub series: Existence<SeriesId>,
}
struct Worker {
@@ -114,7 +121,7 @@ impl Worker {
async fn select(
&self,
batch: Vec<ChannelInfoQuery>,
) -> Result<(Vec<ChannelInfoResult>, Vec<ChannelInfoQuery>), Error> {
) -> Result<(Vec<ChannelInfoResult2>, Vec<ChannelInfoQuery>), Error> {
let mut backend = Vec::new();
let mut channel = Vec::new();
let mut scalar_type = Vec::new();
@@ -159,10 +166,12 @@ impl Worker {
if rid as u32 == qrid {
let series: i64 = row.get(0);
let series = SeriesId::new(series as _);
let res = ChannelInfoResult {
let res = ChannelInfoResult2 {
// TODO take from database query. Needs test.
backend: backend[0].clone(),
channel,
series: Existence::Existing(series),
tx,
channel,
};
result.push(res);
}
@@ -281,8 +290,13 @@ impl Worker {
};
let res4 = res3?;
for r in res4 {
trace3!("try to send result for {} {:?}", r.channel, r.series);
let fut = r.tx.make_send(Ok(r.series));
let item = ChannelInfoResult {
backend: r.backend,
channel: r.channel,
series: r.series,
};
trace3!("try to send result for {:?}", item);
let fut = r.tx.make_send(Ok(item));
match fut.await {
Ok(()) => {}
Err(_e) => {

View File

@@ -11,6 +11,7 @@ use crate::timebin::ConnTimeBin;
use async_channel::Sender;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::Error;
use futures_util::stream::FuturesUnordered;
use futures_util::Future;
@@ -307,7 +308,7 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 {
#[derive(Debug)]
pub enum ConnCommandKind {
SeriesLookupResult(Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>),
SeriesLookupResult(Result<ChannelInfoResult, dbpg::seriesbychannel::Error>),
ChannelAdd(String, ChannelStatusSeriesId),
ChannelRemove(String),
CheckHealth,
@@ -321,7 +322,7 @@ pub struct ConnCommand {
}
impl ConnCommand {
pub fn series_lookup(qu: Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>) -> Self {
pub fn series_lookup(qu: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::SeriesLookupResult(qu),
@@ -405,7 +406,7 @@ struct SendSeriesLookup {
impl CanSendChannelInfoResult for SendSeriesLookup {
fn make_send(
&self,
item: Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>,
item: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>,
) -> dbpg::seriesbychannel::BoxedSend {
let tx = self.tx.clone();
let fut = async move { tx.send(ConnCommand::series_lookup(item)).await.map_err(|_| ()) };

View File

@@ -20,6 +20,7 @@ use atomic::AtomicUsize;
use dbpg::seriesbychannel::BoxedSend;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
@@ -110,7 +111,7 @@ pub struct ChannelAdd {
#[derive(Debug)]
pub enum ConnSetCmd {
SeriesLookupResult(Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>),
SeriesLookupResult(Result<ChannelInfoResult, dbpg::seriesbychannel::Error>),
ChannelAdd(ChannelAdd),
ChannelAddWithStatusId(ChannelAddWithStatusId),
ChannelAddWithAddr(ChannelAddWithAddr),
@@ -165,7 +166,7 @@ struct SeriesLookupSender {
}
impl CanSendChannelInfoResult for SeriesLookupSender {
fn make_send(&self, item: Result<Existence<SeriesId>, dbpg::seriesbychannel::Error>) -> BoxedSend {
fn make_send(&self, item: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>) -> BoxedSend {
let tx = self.tx.clone();
let fut = async move {
tx.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::SeriesLookupResult(item)))
@@ -242,9 +243,11 @@ impl CaConnSet {
CaConnSetEvent::ConnSetCmd(cmd) => match cmd {
ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await,
ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await,
ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
ConnSetCmd::IocAddrQueryResult(res) => {
for e in res {
if let Some(addr) = e.addr {
debug!("ioc found {e:?}");
let ch = Channel::new(e.channel.clone());
if let Some(chst) = self.channel_states.inner().get(&ch) {
if let ChannelStateValue::Active(ast) = &chst.value {
@@ -273,7 +276,6 @@ impl CaConnSet {
}
Ok(())
}
ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
ConnSetCmd::CheckHealth => {
error!("TODO implement check health");
Ok(())
@@ -283,7 +285,7 @@ impl CaConnSet {
self.shutdown = true;
Ok(())
}
ConnSetCmd::SeriesLookupResult(_) => todo!(),
ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
},
CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value {
CaConnEventValue::None => Ok(()),
@@ -298,6 +300,30 @@ impl CaConnSet {
}
}
async fn handle_series_lookup_result(
&mut self,
res: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>,
) -> Result<(), Error> {
debug!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
let add = ChannelAddWithStatusId {
backend: res.backend,
name: res.channel,
local_epics_hostname: self.local_epics_hostname.clone(),
cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()),
};
self.connset_tx
.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithStatusId(add)))
.await?;
}
Err(e) => {
warn!("TODO handle error {e}");
}
}
Ok(())
}
async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> {
// TODO should I add the transition through ActiveChannelState::Init as well?
let ch = Channel::new(add.name.clone());
@@ -320,6 +346,7 @@ impl CaConnSet {
}
async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> {
debug!("handle_add_channel_with_status_id {add:?}");
let ch = Channel::new(add.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
@@ -332,6 +359,8 @@ impl CaConnSet {
},
},
};
let qu = IocAddrQuery { name: add.name };
self.search_tx.send(qu).await?;
} else {
warn!("TODO have a status series id but no more channel");
}