WIP series lookup test

This commit is contained in:
Dominik Werder
2024-02-02 19:43:41 +01:00
parent 1456bd8484
commit c6f18b2986
3 changed files with 70 additions and 26 deletions

View File

@@ -90,32 +90,6 @@ impl Daemon {
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
{
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: "amd32test".into(),
channel: "dummy-0000".into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![500],
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await?;
let res = rx.recv().await?;
debug!("received A: {res:?}");
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: "amd32test".into(),
channel: "dummy-0000".into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![500],
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await?;
let res = rx.recv().await?;
debug!("received B: {res:?}");
}
let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let query_item_tx_weak = query_item_tx.downgrade();

View File

@@ -4,6 +4,9 @@ version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
doctest = false
[dependencies]
log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }

View File

@@ -775,3 +775,70 @@ async fn psql_play(db: &Database) -> Result<(), Error> {
}
Ok(())
}
#[test]
fn test_series_by_channel_01() {
let fut = async {
use crate as dbpg;
let backend = "bck-test-00";
let channel = "chn-test-00";
let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
let pgconf = test_db_conf();
{
let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?;
crate::schema::schema_check(&pg).await.unwrap();
pg.execute("delete from series_by_channel where facility = $1", &[&backend])
.await?;
}
// TODO keep join handles and await later
let (channel_info_query_tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers(1, &pgconf, series_by_channel_stats.clone()).await?;
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel.into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
tokio::time::sleep(Duration::from_millis(2000)).await;
let res = rx.recv().await.unwrap();
debug!("received A: {res:?}");
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel.into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
let res = rx.recv().await.unwrap();
debug!("received B: {res:?}");
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}
#[cfg(test)]
fn test_db_conf() -> Database {
Database {
host: "127.0.0.1".into(),
port: 5432,
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
name: "daqbuffer".into(),
}
}
#[cfg(test)]
async fn test_db_conn() -> Result<PgClient, Error> {
let db = test_db_conf();
let (pg, pg_client_jh) = crate::conn::make_pg_client(&db).await?;
Ok(pg)
}