diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index a492898..4692251 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -85,10 +85,11 @@ impl Daemon { let insert_worker_stats = Arc::new(InsertWorkerStats::new()); // TODO keep join handles and await later - let (channel_info_query_tx, jhs, jh) = - dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf, series_by_channel_stats.clone()) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers::< + dbpg::seriesbychannel::SalterRandom, + >(4, &opts.pgconf, series_by_channel_stats.clone()) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let query_item_tx_weak = query_item_tx.downgrade(); diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index 0794102..66d2db4 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -75,13 +75,14 @@ create table if not exists series_by_channel ( scalar_type int not null, shape_dims int[] not null, agg_kind int not null, - tscreate timestamptz not null default now() + tscs timestamptz[] storage plain default array[now()] )"; let _ = pgc.execute(sql, &[]).await; - let sql = "alter table series_by_channel add tscreate timestamptz not null default now()"; - let _ = pgc.execute(sql, &[]).await; - + { + let sql = "alter table series_by_channel drop tscreate"; + let _ = pgc.execute(sql, &[]).await; + } if !has_table("ioc_by_channel_log", pgc).await? { let _ = pgc .execute( @@ -138,11 +139,15 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> { .await?; } { - match pgc.execute("alter table series_by_channel add constraint series_by_channel_nondup unique (facility, channel, scalar_type, shape_dims, agg_kind)", &[]).await { + let sql = concat!( + "alter table series_by_channel add constraint series_by_channel_nondup", + " unique (facility, channel, scalar_type, shape_dims, agg_kind)" + ); + match pgc.execute(sql, &[]).await { Ok(_) => { info!("constraint added"); } - Err(_)=>{} + Err(_) => {} } } Ok(()) @@ -150,8 +155,8 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> { async fn migrate_02(pgc: &PgClient) -> Result<(), Error> { // TODO after all migrations, should check that the schema is as expected. - let sql = "alter table series_by_channel add tscs timestamptz[] default array[now()]"; - let _ = pgc.execute(sql, &[]).await; + let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain default array[now()]"; + let _ = pgc.execute(sql, &[]).await?; Ok(()) } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 57e48c4..d19e527 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -11,6 +11,8 @@ use futures_util::TryFutureExt; use log::*; use md5::Digest; use netpod::Database; +use netpod::ScalarType; +use netpod::Shape; use series::series::Existence; use series::SeriesId; use stats::SeriesByChannelStats; @@ -49,6 +51,8 @@ pub enum Error { ChannelError, #[error("DbConsistencySeries({0})")] DbConsistencySeries(String), + ScalarType, + Shape, } impl From for Error { @@ -104,7 +108,7 @@ struct ChannelInfoResult2 { pub struct ChannelInfoResult { pub backend: String, pub channel: String, - pub series: Existence, + pub series: SeriesId, } struct Worker { @@ -342,7 +346,7 @@ impl Worker { let item = ChannelInfoResult { backend: r.backend, channel: r.channel, - series: r.series, + series: r.series.into_inner(), }; // trace3!("try to send result for {:?}", item); let fut = r.tx.make_send(Ok(item)); @@ -370,11 +374,16 @@ struct Worker2 { pg_client_jh: JoinHandle>, } +struct FoundResult { + job: ChannelInfoQuery, + status: MatchingSeries, +} + #[derive(Debug)] enum MatchingSeries { DoesntExist, - UsedBefore, - Latest, + UsedBefore(SeriesId), + Latest(SeriesId), } impl Worker2 { @@ -399,13 +408,28 @@ impl Worker2 { .prepare_typed(sql, &[Type::INT4_ARRAY, Type::TEXT_ARRAY, Type::TEXT_ARRAY]) .await?; let sql = concat!( - "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])", - " as inp (backend, channel, scalar_type, shape_dims, series))", + "with q1 as (", + " select * from unnest($1, $2, $3, $4, $5)", + " as inp (backend, channel, scalar_type, shape_dims, series)", + ")", " insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1", + " select series, backend, channel, scalar_type,", + " array(select e::int from jsonb_array_elements(shape_dims) as e) as shape_dims,", + " 0 from q1", " on conflict do nothing" ); - let qu_insert = pg.prepare(sql).await?; + let qu_insert = pg + .prepare_typed( + sql, + &[ + Type::TEXT_ARRAY, + Type::TEXT_ARRAY, + Type::INT4_ARRAY, + Type::JSONB_ARRAY, + Type::INT8_ARRAY, + ], + ) + .await?; let ret = Self { pg, qu_select, @@ -417,10 +441,7 @@ impl Worker2 { Ok(ret) } - async fn select( - &self, - batch: Vec, - ) -> Result<(Vec, Vec), Error> { + async fn select(&self, batch: Vec) -> Result, Error> { let (rids, backends, channels, jobs) = batch .into_iter() .enumerate() @@ -443,25 +464,20 @@ impl Worker2 { let mut row_it = rows.into_iter(); let mut row_opt = row_it.next(); let mut acc = Vec::new(); + let mut result = Vec::new(); for (&rid, job) in rids.iter().zip(jobs.into_iter()) { loop { break if let Some(row) = &row_opt { if row.get::<_, i32>(0) == rid { - let series: i64 = row.get(1); - let scalar_type: i32 = row.get(2); - let shape_dims: Vec = row.get(3); + let series = SeriesId::new(row.get::<_, i64>(1) as _); + let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?; + let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec>(3).as_slice()) + .map_err(|_| Error::Shape)?; let tscs: Vec> = row.get(4); - let series = SeriesId::new(series as _); debug!( "select worker found in database {:?} {:?} {:?} {:?} {:?}", rid, series, scalar_type, shape_dims, tscs ); - // let res = ChannelInfoResult2 { - // backend: job.backend, - // channel: job.channel, - // series: Existence::Existing(series), - // tx: job.tx, - // }; acc.push((rid, series, scalar_type, shape_dims, tscs)); row_opt = row_it.next(); continue; @@ -471,46 +487,49 @@ impl Worker2 { debug!("check for {job:?}"); // TODO call decide with empty accumulator: will result in DoesntExist. let v = std::mem::replace(&mut acc, Vec::new()); - let x = Self::decide_matching_via_db(v); - let dec = match x { - Ok(x) => x, - Err(e) => { - error!("{e}"); - return Err(e); - } - }; - // TODO after decision, do something with it. + let scalar_type_job = ScalarType::from_scylla_i32(job.scalar_type).map_err(|_| Error::ScalarType)?; + let shape_job = Shape::from_scylla_shape_dims(job.shape_dims.as_slice()).map_err(|_| Error::Shape)?; + let dec = Self::decide_matching_via_db(scalar_type_job, shape_job, v)?; debug!("decision {dec:?}"); + result.push(FoundResult { job, status: dec }); } - let result = Vec::new(); - let missing = Vec::new(); - Ok((result, missing)) + Ok(result) } fn decide_matching_via_db( - acc: Vec<(i32, SeriesId, i32, Vec, Vec>)>, + scalar_type: ScalarType, + shape: Shape, + acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec>)>, ) -> Result { - let a2 = acc.iter().map(|x| x.4.clone()).collect(); + let a2 = acc.iter().map(|x| &x.4).collect(); Self::assert_order(a2)?; let unfolded = Self::unfold_series_rows(acc)?; Self::assert_varying_types(&unfolded)?; - Ok(MatchingSeries::Latest) + if let Some(last) = unfolded.last() { + if last.1 == scalar_type && last.2 == shape { + Ok(MatchingSeries::Latest(last.0.clone())) + } else { + Ok(MatchingSeries::UsedBefore(last.0.clone())) + } + } else { + Ok(MatchingSeries::DoesntExist) + } } fn unfold_series_rows( - acc: Vec<(i32, SeriesId, i32, Vec, Vec>)>, - ) -> Result, DateTime)>, Error> { + acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec>)>, + ) -> Result)>, Error> { let mut ret = Vec::new(); for g in acc.iter() { for h in g.4.iter() { - ret.push((g.1.clone(), g.2, g.3.clone(), *h)); + ret.push((g.1.clone(), g.2.clone(), g.3.clone(), *h)); } } - ret.sort_by_key(|x| x.3); + ret.sort_by(|a, b| a.cmp(b)); Ok(ret) } - fn assert_order(v: Vec>>) -> Result<(), Error> { + fn assert_order(v: Vec<&Vec>>) -> Result<(), Error> { for x in &v { if x.len() > 1 { let mut z = x[0]; @@ -527,7 +546,7 @@ impl Worker2 { Ok(()) } - fn assert_varying_types(v: &Vec<(SeriesId, i32, Vec, DateTime)>) -> Result<(), Error> { + fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime)>) -> Result<(), Error> { if v.len() > 1 { let mut z_1 = &v[0].1; let mut z_2 = &v[0].2; @@ -546,83 +565,98 @@ impl Worker2 { } } - async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { - error!("TODO insert_missing"); - if true { - return Ok(()); - } + async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { + debug!("insert_missing len {}", batch.len()); let tsbeg = Instant::now(); - let mut backends = Vec::new(); - let mut channels = Vec::new(); - let mut scalar_types = Vec::new(); - let mut shape_dimss = Vec::new(); - let mut shape_dims_strs = Vec::new(); - let mut hashers = Vec::new(); - for e in batch.into_iter() { - { - let mut h = md5::Md5::new(); - h.update(e.backend.as_bytes()); - h.update(e.channel.as_bytes()); - h.update(format!("{:?}", e.scalar_type).as_bytes()); - h.update(format!("{:?}", e.shape_dims).as_bytes()); - hashers.push(h); - } - backends.push(&e.backend); - channels.push(&e.channel); - scalar_types.push(e.scalar_type); - let mut dims = String::with_capacity(32); - dims.push('{'); - for (i, &v) in e.shape_dims.iter().enumerate() { - if i > 0 { - dims.push(','); - } - use std::fmt::Write; - write!(dims, "{}", v).unwrap(); - } - dims.push('}'); - shape_dims_strs.push(dims); - shape_dimss.push(&e.shape_dims); - } - let mut i1 = 0; + let (backends, channels, scalar_types, shape_dimss, mut hashers) = batch + .iter() + .map(|job| { + let backend = &job.backend; + let channel = &job.channel; + let scalar_type = &job.scalar_type; + let shape = &job.shape_dims; + let hasher = { + let mut h = md5::Md5::new(); + h.update(job.backend.as_bytes()); + h.update(job.channel.as_bytes()); + h.update(format!("{:?}", job.scalar_type).as_bytes()); + h.update(format!("{:?}", job.shape_dims).as_bytes()); + h + }; + (backend, channel, scalar_type, shape, hasher) + }) + .fold( + (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()), + |mut a, x| { + a.0.push(x.0); + a.1.push(x.1); + a.2.push(x.2); + a.3.push(x.3); + a.4.push(x.4); + a + }, + ); + let mut total_insert_count = 0; + let mut i1: u16 = 0; loop { i1 += 1; if i1 >= 200 { + debug!("i1 abort"); return Err(Error::CreateSeriesFail); } - let mut seriess = Vec::with_capacity(hashers.len()); - let mut all_good = true; + debug!("loop i1 {i1}"); + let mut series_ids = Vec::with_capacity(hashers.len()); for h in &mut hashers { - let mut good = false; - for _ in 0..800 { - h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); - let f = h.clone().finalize(); - let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series >= 1000000000000000000 && series <= i64::MAX as u64 { - seriess.push(series as i64); - good = true; - break; - } - } - if !good { - all_good = false; - break; + let mut i2: u16 = 0; + loop { + break { + i2 += 1; + if i2 >= 800 { + debug!("i2 abort"); + return Err(Error::CreateSeriesFail); + } + FR::hupd(&mut |buf| h.update(buf), i1, i2); + // h.update(i1.to_le_bytes()); + // h.update(i2.to_le_bytes()); + // h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); + let f = h.clone().finalize(); + let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series >= 1000000000000000000 && series <= i64::MAX as u64 { + debug!("push {series}"); + series_ids.push(series as i64); + } else { + continue; + } + }; } } - if !all_good { - continue; - } - self.pg + debug!("execute..."); + use serde_json::Value as JsVal; + let shape_dims_jss: Vec<_> = shape_dimss + .iter() + .map(|x| { + let v = x.iter().map(|x| JsVal::Number(serde_json::Number::from(*x))).collect(); + JsVal::Array(v) + }) + .collect(); + let cc = self + .pg .execute( &self.qu_insert, - &[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess], + &[&backends, &channels, &scalar_types, &shape_dims_jss, &series_ids], ) - .await?; - break; + .await + .unwrap(); + debug!("cc {} channels {}", cc, channels.len()); + total_insert_count += cc; + if total_insert_count == channels.len() as u64 { + break; + } } Ok(()) } - async fn work(&mut self) -> Result<(), Error> { + async fn work(&mut self) -> Result<(), Error> { let batch_rx = &self.batch_rx; while let Ok(batch) = batch_rx.recv().await { self.stats.recv_batch().inc(); @@ -636,27 +670,58 @@ impl Worker2 { x.shape_dims ); } - let (res1, missing) = self.select(batch).await?; - let res3 = if missing.len() > 0 { - trace2!("missing {}", missing.len()); - for x in &missing { - trace2!("insert missing {x:?}"); + let batch_len = batch.len(); + let res1 = self.select(batch).await?; + let mut latest = Vec::new(); + let mut used_before = Vec::new(); + let mut doesnt_exist = Vec::new(); + for x in res1 { + match x.status { + MatchingSeries::DoesntExist => doesnt_exist.push(x), + MatchingSeries::UsedBefore(..) => used_before.push(x), + MatchingSeries::Latest(sid) => latest.push((x.job, sid)), } - let missing_count = missing.len(); - self.insert_missing(&missing).await?; - let (res2, missing2) = self.select(missing).await?; - if missing2.len() > 0 { - for x in &missing2 { - warn!("series ids still missing after insert {}", x.channel); + } + if doesnt_exist.len() != 0 { + let batch2 = doesnt_exist.into_iter().map(|x| x.job).collect(); + self.insert_missing::(&batch2).await?; + let res2 = self.select(batch2).await?; + for x in res2 { + match x.status { + MatchingSeries::DoesntExist => { + error!("series ids still missing after insert {:?}", x.job); + return Err(Error::SeriesMissing); + } + MatchingSeries::UsedBefore(..) => { + error!("series ids still missing after insert {:?}", x.job); + return Err(Error::SeriesMissing); + } + MatchingSeries::Latest(sid) => latest.push((x.job, sid)), } - Err(Error::SeriesMissing) - } else { - trace2!("select missing after insert {} of {}", missing_count, res2.len()); - Ok((res1, res2)) } - } else { - Ok((res1, Vec::new())) - }?; + }; + if used_before.len() != 0 { + // TODO update the used before, then select again + }; + if latest.len() != batch_len { + error!("can not register all series"); + return Err(Error::SeriesMissing); + } + for e in latest { + let item = ChannelInfoResult { + backend: e.0.backend, + channel: e.0.channel, + series: e.1.clone(), + }; + let item = Ok(item); + match e.0.tx.make_send(item).await { + Ok(()) => {} + Err(_) => { + self.stats.res_tx_fail.inc(); + } + }; + } + #[cfg(DISABLED)] for r in res3.0.into_iter().chain(res3.1.into_iter()) { let item = ChannelInfoResult { backend: r.backend, @@ -680,7 +745,11 @@ impl Worker2 { } } -pub async fn start_lookup_workers( +pub trait HashSalter { + fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16); +} + +pub async fn start_lookup_workers( worker_count: usize, db: &Database, stats: Arc, @@ -700,12 +769,31 @@ pub async fn start_lookup_workers( let mut jhs = Vec::new(); for _ in 0..worker_count { let mut worker = Worker2::new(db, batch_rx.clone(), stats.clone()).await?; - let jh = tokio::task::spawn(async move { worker.work().await }); + let jh = tokio::task::spawn(async move { worker.work::().await }); jhs.push(jh); } Ok((query_tx, jhs, bjh)) } +struct SalterTest; + +impl HashSalter for SalterTest { + fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16) { + hupd(&i1.to_le_bytes()); + hupd(&i2.to_le_bytes()); + } +} + +pub struct SalterRandom; + +impl HashSalter for SalterRandom { + fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16) { + let tsnow = Instant::now(); + let b = unsafe { &*(&tsnow as *const Instant as *const [u8; core::mem::size_of::()]) }; + hupd(b) + } +} + async fn psql_play(db: &Database) -> Result<(), Error> { use tokio_postgres::types::ToSql; use tokio_postgres::types::Type; @@ -750,7 +838,7 @@ async fn psql_play(db: &Database) -> Result<(), Error> { debug!("{:?} {:?}", ca, cb); } } - if false { + if true { let sql = concat!( "with q1 as (", "select col1 from unnest($1) as inp (col1)", @@ -782,31 +870,61 @@ fn test_series_by_channel_01() { use crate as dbpg; let backend = "bck-test-00"; let channel = "chn-test-00"; + let channel_01 = "chn-test-01"; let series_by_channel_stats = Arc::new(SeriesByChannelStats::new()); let pgconf = test_db_conf(); + if false { + psql_play(&pgconf).await?; + return Ok(()); + } { let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?; crate::schema::schema_check(&pg).await.unwrap(); pg.execute("delete from series_by_channel where facility = $1", &[&backend]) .await?; + + // Block the id + let id1: i64 = 5609172854884670524; + let sql = concat!( + "insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", + " values ($1, $2, $3, $4, array[]::int4[], 0)" + ); + pg.execute(sql, &[&id1, &backend, &"test-block-00", &1i32]).await?; } // TODO keep join handles and await later let (channel_info_query_tx, jhs, jh) = - dbpg::seriesbychannel::start_lookup_workers(1, &pgconf, series_by_channel_stats.clone()).await?; + dbpg::seriesbychannel::start_lookup_workers::(1, &pgconf, series_by_channel_stats.clone()) + .await?; - let (tx, rx) = async_channel::bounded(1); - let item = dbpg::seriesbychannel::ChannelInfoQuery { - backend: backend.into(), - channel: channel.into(), - scalar_type: netpod::ScalarType::U16.to_scylla_i32(), - shape_dims: vec![64], - tx: Box::pin(tx), + let rx0 = { + let (tx, rx) = async_channel::bounded(1); + let item = dbpg::seriesbychannel::ChannelInfoQuery { + backend: backend.into(), + channel: channel.into(), + scalar_type: netpod::ScalarType::U16.to_scylla_i32(), + shape_dims: vec![64], + tx: Box::pin(tx), + }; + channel_info_query_tx.send(item).await.unwrap(); + rx }; - channel_info_query_tx.send(item).await.unwrap(); + let rx1 = { + let (tx, rx) = async_channel::bounded(1); + let item = dbpg::seriesbychannel::ChannelInfoQuery { + backend: backend.into(), + channel: channel_01.into(), + scalar_type: netpod::ScalarType::U16.to_scylla_i32(), + shape_dims: vec![64], + tx: Box::pin(tx), + }; + channel_info_query_tx.send(item).await.unwrap(); + rx + }; + // tokio::time::sleep(Duration::from_millis(2000)).await; - tokio::time::sleep(Duration::from_millis(2000)).await; - - let res = rx.recv().await.unwrap(); + let res = rx0.recv().await.unwrap(); + debug!("received A: {res:?}"); + let res = rx1.recv().await.unwrap(); debug!("received A: {res:?}"); let (tx, rx) = async_channel::bounded(1); @@ -827,12 +945,22 @@ fn test_series_by_channel_01() { #[cfg(test)] fn test_db_conf() -> Database { - Database { - host: "127.0.0.1".into(), - port: 5432, - user: "daqbuffer".into(), - pass: "daqbuffer".into(), - name: "daqbuffer".into(), + if false { + Database { + host: "127.0.0.1".into(), + port: 5432, + user: "daqbuffer".into(), + pass: "daqbuffer".into(), + name: "daqbuffer".into(), + } + } else { + Database { + host: "sf-scylladb-amd32-01.psi.ch".into(), + port: 5173, + user: "test_daqbuffer".into(), + pass: "test_daqbuffer".into(), + name: "test_daqbuffer".into(), + } } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index b8744e0..ec81ba7 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -566,7 +566,7 @@ impl CaConnSet { } match res { Ok(res) => { - let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id()); + let cssid = ChannelStatusSeriesId::new(res.series.id()); self.channel_by_cssid .insert(cssid.clone(), Channel::new(res.channel.clone())); let add = ChannelAddWithStatusId { diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 5e8e81a..2552cec 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -87,7 +87,7 @@ impl SeriesWriter { }; worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; - let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id()); + let cssid = ChannelStatusSeriesId::new(res.series.id()); Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await } @@ -110,7 +110,7 @@ impl SeriesWriter { }; worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; - let sid = res.series.into_inner(); + let sid = res.series; let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10)); binner.setup_for(&scalar_type, &shape, tsnow)?; let res = Self { @@ -342,7 +342,9 @@ fn write_00() { let scy = scywr::session::create_session(scyconf).await?; let stats = SeriesByChannelStats::new(); let stats = Arc::new(stats); - let (tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(1, dbconf, stats).await?; + let (tx, jhs, jh) = + dbpg::seriesbychannel::start_lookup_workers::(1, dbconf, stats) + .await?; let backend = "bck-test-00"; let channel = "chn-test-00"; let scalar_type = ScalarType::I16;