From c6f18b298637a1b996f48274f360843e63df1bf7 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 2 Feb 2024 19:43:41 +0100
Subject: [PATCH] WIP series lookup test

---
 daqingest/src/daemon.rs     | 26 --------------
 dbpg/Cargo.toml             |  3 ++
 dbpg/src/seriesbychannel.rs | 67 +++++++++++++++++++++++++++++++++++++
 3 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index 4012772..a492898 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -90,32 +90,6 @@ impl Daemon {
             .await
             .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
 
-        {
-            let (tx, rx) = async_channel::bounded(1);
-            let item = dbpg::seriesbychannel::ChannelInfoQuery {
-                backend: "amd32test".into(),
-                channel: "dummy-0000".into(),
-                scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
-                shape_dims: vec![500],
-                tx: Box::pin(tx),
-            };
-            channel_info_query_tx.send(item).await?;
-            let res = rx.recv().await?;
-            debug!("received A: {res:?}");
-
-            let (tx, rx) = async_channel::bounded(1);
-            let item = dbpg::seriesbychannel::ChannelInfoQuery {
-                backend: "amd32test".into(),
-                channel: "dummy-0000".into(),
-                scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
-                shape_dims: vec![500],
-                tx: Box::pin(tx),
-            };
-            channel_info_query_tx.send(item).await?;
-            let res = rx.recv().await?;
-            debug!("received B: {res:?}");
-        }
-
         let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
         let query_item_tx_weak = query_item_tx.downgrade();
 
diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml
index 8d82015..a954ee0 100644
--- a/dbpg/Cargo.toml
+++ b/dbpg/Cargo.toml
@@ -4,6 +4,9 @@ version = "0.0.1"
 authors = ["Dominik Werder"]
 edition = "2021"
 
+[lib]
+doctest = false
+
 [dependencies]
 log = { path = "../log" }
 err = { path = "../../daqbuffer/crates/err" }
diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs
index 3edab59..57e48c4 100644
--- a/dbpg/src/seriesbychannel.rs
+++ b/dbpg/src/seriesbychannel.rs
@@ -775,3 +775,70 @@ async fn psql_play(db: &Database) -> Result<(), Error> {
     }
     Ok(())
 }
+
+#[test]
+fn test_series_by_channel_01() {
+    let fut = async {
+        use crate as dbpg;
+        let backend = "bck-test-00";
+        let channel = "chn-test-00";
+        let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
+        let pgconf = test_db_conf();
+        {
+            let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?;
+            crate::schema::schema_check(&pg).await.unwrap();
+            pg.execute("delete from series_by_channel where facility = $1", &[&backend])
+                .await?;
+        }
+        // TODO keep join handles and await later
+        let (channel_info_query_tx, jhs, jh) =
+            dbpg::seriesbychannel::start_lookup_workers(1, &pgconf, series_by_channel_stats.clone()).await?;
+
+        let (tx, rx) = async_channel::bounded(1);
+        let item = dbpg::seriesbychannel::ChannelInfoQuery {
+            backend: backend.into(),
+            channel: channel.into(),
+            scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
+            shape_dims: vec![64],
+            tx: Box::pin(tx),
+        };
+        channel_info_query_tx.send(item).await.unwrap();
+
+        tokio::time::sleep(Duration::from_millis(2000)).await;
+
+        let res = rx.recv().await.unwrap();
+        debug!("received A: {res:?}");
+
+        let (tx, rx) = async_channel::bounded(1);
+        let item = dbpg::seriesbychannel::ChannelInfoQuery {
+            backend: backend.into(),
+            channel: channel.into(),
+            scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
+            shape_dims: vec![64],
+            tx: Box::pin(tx),
+        };
+        channel_info_query_tx.send(item).await.unwrap();
+        let res = rx.recv().await.unwrap();
debug!("received B: {res:?}"); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[cfg(test)] +fn test_db_conf() -> Database { + Database { + host: "127.0.0.1".into(), + port: 5432, + user: "daqbuffer".into(), + pass: "daqbuffer".into(), + name: "daqbuffer".into(), + } +} + +#[cfg(test)] +async fn test_db_conn() -> Result { + let db = test_db_conf(); + let (pg, pg_client_jh) = crate::conn::make_pg_client(&db).await?; + Ok(pg) +}