Generate series id part 1

This commit is contained in:
Dominik Werder
2024-02-06 11:39:35 +01:00
parent c6f18b2986
commit 9aaa3b0fd6
5 changed files with 295 additions and 159 deletions

View File

@@ -85,10 +85,11 @@ impl Daemon {
let insert_worker_stats = Arc::new(InsertWorkerStats::new());
// TODO keep join handles and await later
let (channel_info_query_tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf, series_by_channel_stats.clone())
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers::<
dbpg::seriesbychannel::SalterRandom,
>(4, &opts.pgconf, series_by_channel_stats.clone())
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
let query_item_tx_weak = query_item_tx.downgrade();

View File

@@ -75,13 +75,14 @@ create table if not exists series_by_channel (
scalar_type int not null,
shape_dims int[] not null,
agg_kind int not null,
tscreate timestamptz not null default now()
tscs timestamptz[] storage plain default array[now()]
)";
let _ = pgc.execute(sql, &[]).await;
let sql = "alter table series_by_channel add tscreate timestamptz not null default now()";
let _ = pgc.execute(sql, &[]).await;
{
let sql = "alter table series_by_channel drop tscreate";
let _ = pgc.execute(sql, &[]).await;
}
if !has_table("ioc_by_channel_log", pgc).await? {
let _ = pgc
.execute(
@@ -138,11 +139,15 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> {
.await?;
}
{
match pgc.execute("alter table series_by_channel add constraint series_by_channel_nondup unique (facility, channel, scalar_type, shape_dims, agg_kind)", &[]).await {
let sql = concat!(
"alter table series_by_channel add constraint series_by_channel_nondup",
" unique (facility, channel, scalar_type, shape_dims, agg_kind)"
);
match pgc.execute(sql, &[]).await {
Ok(_) => {
info!("constraint added");
}
Err(_)=>{}
Err(_) => {}
}
}
Ok(())
@@ -150,8 +155,8 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> {
async fn migrate_02(pgc: &PgClient) -> Result<(), Error> {
// TODO after all migrations, should check that the schema is as expected.
let sql = "alter table series_by_channel add tscs timestamptz[] default array[now()]";
let _ = pgc.execute(sql, &[]).await;
let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain default array[now()]";
let _ = pgc.execute(sql, &[]).await?;
Ok(())
}

View File

@@ -11,6 +11,8 @@ use futures_util::TryFutureExt;
use log::*;
use md5::Digest;
use netpod::Database;
use netpod::ScalarType;
use netpod::Shape;
use series::series::Existence;
use series::SeriesId;
use stats::SeriesByChannelStats;
@@ -49,6 +51,8 @@ pub enum Error {
ChannelError,
#[error("DbConsistencySeries({0})")]
DbConsistencySeries(String),
ScalarType,
Shape,
}
impl From<crate::err::Error> for Error {
@@ -104,7 +108,7 @@ struct ChannelInfoResult2 {
pub struct ChannelInfoResult {
pub backend: String,
pub channel: String,
pub series: Existence<SeriesId>,
pub series: SeriesId,
}
struct Worker {
@@ -342,7 +346,7 @@ impl Worker {
let item = ChannelInfoResult {
backend: r.backend,
channel: r.channel,
series: r.series,
series: r.series.into_inner(),
};
// trace3!("try to send result for {:?}", item);
let fut = r.tx.make_send(Ok(item));
@@ -370,11 +374,16 @@ struct Worker2 {
pg_client_jh: JoinHandle<Result<(), crate::err::Error>>,
}
struct FoundResult {
job: ChannelInfoQuery,
status: MatchingSeries,
}
#[derive(Debug)]
enum MatchingSeries {
DoesntExist,
UsedBefore,
Latest,
UsedBefore(SeriesId),
Latest(SeriesId),
}
impl Worker2 {
@@ -399,13 +408,28 @@ impl Worker2 {
.prepare_typed(sql, &[Type::INT4_ARRAY, Type::TEXT_ARRAY, Type::TEXT_ARRAY])
.await?;
let sql = concat!(
"with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])",
" as inp (backend, channel, scalar_type, shape_dims, series))",
"with q1 as (",
" select * from unnest($1, $2, $3, $4, $5)",
" as inp (backend, channel, scalar_type, shape_dims, series)",
")",
" insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1",
" select series, backend, channel, scalar_type,",
" array(select e::int from jsonb_array_elements(shape_dims) as e) as shape_dims,",
" 0 from q1",
" on conflict do nothing"
);
let qu_insert = pg.prepare(sql).await?;
let qu_insert = pg
.prepare_typed(
sql,
&[
Type::TEXT_ARRAY,
Type::TEXT_ARRAY,
Type::INT4_ARRAY,
Type::JSONB_ARRAY,
Type::INT8_ARRAY,
],
)
.await?;
let ret = Self {
pg,
qu_select,
@@ -417,10 +441,7 @@ impl Worker2 {
Ok(ret)
}
async fn select(
&self,
batch: Vec<ChannelInfoQuery>,
) -> Result<(Vec<ChannelInfoResult2>, Vec<ChannelInfoQuery>), Error> {
async fn select(&self, batch: Vec<ChannelInfoQuery>) -> Result<Vec<FoundResult>, Error> {
let (rids, backends, channels, jobs) = batch
.into_iter()
.enumerate()
@@ -443,25 +464,20 @@ impl Worker2 {
let mut row_it = rows.into_iter();
let mut row_opt = row_it.next();
let mut acc = Vec::new();
let mut result = Vec::new();
for (&rid, job) in rids.iter().zip(jobs.into_iter()) {
loop {
break if let Some(row) = &row_opt {
if row.get::<_, i32>(0) == rid {
let series: i64 = row.get(1);
let scalar_type: i32 = row.get(2);
let shape_dims: Vec<i32> = row.get(3);
let series = SeriesId::new(row.get::<_, i64>(1) as _);
let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?;
let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec<i32>>(3).as_slice())
.map_err(|_| Error::Shape)?;
let tscs: Vec<DateTime<Utc>> = row.get(4);
let series = SeriesId::new(series as _);
debug!(
"select worker found in database {:?} {:?} {:?} {:?} {:?}",
rid, series, scalar_type, shape_dims, tscs
);
// let res = ChannelInfoResult2 {
// backend: job.backend,
// channel: job.channel,
// series: Existence::Existing(series),
// tx: job.tx,
// };
acc.push((rid, series, scalar_type, shape_dims, tscs));
row_opt = row_it.next();
continue;
@@ -471,46 +487,49 @@ impl Worker2 {
debug!("check for {job:?}");
// TODO call decide with empty accumulator: will result in DoesntExist.
let v = std::mem::replace(&mut acc, Vec::new());
let x = Self::decide_matching_via_db(v);
let dec = match x {
Ok(x) => x,
Err(e) => {
error!("{e}");
return Err(e);
}
};
// TODO after decision, do something with it.
let scalar_type_job = ScalarType::from_scylla_i32(job.scalar_type).map_err(|_| Error::ScalarType)?;
let shape_job = Shape::from_scylla_shape_dims(job.shape_dims.as_slice()).map_err(|_| Error::Shape)?;
let dec = Self::decide_matching_via_db(scalar_type_job, shape_job, v)?;
debug!("decision {dec:?}");
result.push(FoundResult { job, status: dec });
}
let result = Vec::new();
let missing = Vec::new();
Ok((result, missing))
Ok(result)
}
fn decide_matching_via_db(
acc: Vec<(i32, SeriesId, i32, Vec<i32>, Vec<DateTime<Utc>>)>,
scalar_type: ScalarType,
shape: Shape,
acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<MatchingSeries, Error> {
let a2 = acc.iter().map(|x| x.4.clone()).collect();
let a2 = acc.iter().map(|x| &x.4).collect();
Self::assert_order(a2)?;
let unfolded = Self::unfold_series_rows(acc)?;
Self::assert_varying_types(&unfolded)?;
Ok(MatchingSeries::Latest)
if let Some(last) = unfolded.last() {
if last.1 == scalar_type && last.2 == shape {
Ok(MatchingSeries::Latest(last.0.clone()))
} else {
Ok(MatchingSeries::UsedBefore(last.0.clone()))
}
} else {
Ok(MatchingSeries::DoesntExist)
}
}
fn unfold_series_rows(
acc: Vec<(i32, SeriesId, i32, Vec<i32>, Vec<DateTime<Utc>>)>,
) -> Result<Vec<(SeriesId, i32, Vec<i32>, DateTime<Utc>)>, Error> {
acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>, Error> {
let mut ret = Vec::new();
for g in acc.iter() {
for h in g.4.iter() {
ret.push((g.1.clone(), g.2, g.3.clone(), *h));
ret.push((g.1.clone(), g.2.clone(), g.3.clone(), *h));
}
}
ret.sort_by_key(|x| x.3);
ret.sort_by(|a, b| a.cmp(b));
Ok(ret)
}
fn assert_order(v: Vec<Vec<DateTime<Utc>>>) -> Result<(), Error> {
fn assert_order(v: Vec<&Vec<DateTime<Utc>>>) -> Result<(), Error> {
for x in &v {
if x.len() > 1 {
let mut z = x[0];
@@ -527,7 +546,7 @@ impl Worker2 {
Ok(())
}
fn assert_varying_types(v: &Vec<(SeriesId, i32, Vec<i32>, DateTime<Utc>)>) -> Result<(), Error> {
fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
if v.len() > 1 {
let mut z_1 = &v[0].1;
let mut z_2 = &v[0].2;
@@ -546,83 +565,98 @@ impl Worker2 {
}
}
async fn insert_missing(&self, batch: &Vec<ChannelInfoQuery>) -> Result<(), Error> {
error!("TODO insert_missing");
if true {
return Ok(());
}
async fn insert_missing<FR: HashSalter>(&self, batch: &Vec<ChannelInfoQuery>) -> Result<(), Error> {
debug!("insert_missing len {}", batch.len());
let tsbeg = Instant::now();
let mut backends = Vec::new();
let mut channels = Vec::new();
let mut scalar_types = Vec::new();
let mut shape_dimss = Vec::new();
let mut shape_dims_strs = Vec::new();
let mut hashers = Vec::new();
for e in batch.into_iter() {
{
let mut h = md5::Md5::new();
h.update(e.backend.as_bytes());
h.update(e.channel.as_bytes());
h.update(format!("{:?}", e.scalar_type).as_bytes());
h.update(format!("{:?}", e.shape_dims).as_bytes());
hashers.push(h);
}
backends.push(&e.backend);
channels.push(&e.channel);
scalar_types.push(e.scalar_type);
let mut dims = String::with_capacity(32);
dims.push('{');
for (i, &v) in e.shape_dims.iter().enumerate() {
if i > 0 {
dims.push(',');
}
use std::fmt::Write;
write!(dims, "{}", v).unwrap();
}
dims.push('}');
shape_dims_strs.push(dims);
shape_dimss.push(&e.shape_dims);
}
let mut i1 = 0;
let (backends, channels, scalar_types, shape_dimss, mut hashers) = batch
.iter()
.map(|job| {
let backend = &job.backend;
let channel = &job.channel;
let scalar_type = &job.scalar_type;
let shape = &job.shape_dims;
let hasher = {
let mut h = md5::Md5::new();
h.update(job.backend.as_bytes());
h.update(job.channel.as_bytes());
h.update(format!("{:?}", job.scalar_type).as_bytes());
h.update(format!("{:?}", job.shape_dims).as_bytes());
h
};
(backend, channel, scalar_type, shape, hasher)
})
.fold(
(Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
|mut a, x| {
a.0.push(x.0);
a.1.push(x.1);
a.2.push(x.2);
a.3.push(x.3);
a.4.push(x.4);
a
},
);
let mut total_insert_count = 0;
let mut i1: u16 = 0;
loop {
i1 += 1;
if i1 >= 200 {
debug!("i1 abort");
return Err(Error::CreateSeriesFail);
}
let mut seriess = Vec::with_capacity(hashers.len());
let mut all_good = true;
debug!("loop i1 {i1}");
let mut series_ids = Vec::with_capacity(hashers.len());
for h in &mut hashers {
let mut good = false;
for _ in 0..800 {
h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
if series >= 1000000000000000000 && series <= i64::MAX as u64 {
seriess.push(series as i64);
good = true;
break;
}
}
if !good {
all_good = false;
break;
let mut i2: u16 = 0;
loop {
break {
i2 += 1;
if i2 >= 800 {
debug!("i2 abort");
return Err(Error::CreateSeriesFail);
}
FR::hupd(&mut |buf| h.update(buf), i1, i2);
// h.update(i1.to_le_bytes());
// h.update(i2.to_le_bytes());
// h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
if series >= 1000000000000000000 && series <= i64::MAX as u64 {
debug!("push {series}");
series_ids.push(series as i64);
} else {
continue;
}
};
}
}
if !all_good {
continue;
}
self.pg
debug!("execute...");
use serde_json::Value as JsVal;
let shape_dims_jss: Vec<_> = shape_dimss
.iter()
.map(|x| {
let v = x.iter().map(|x| JsVal::Number(serde_json::Number::from(*x))).collect();
JsVal::Array(v)
})
.collect();
let cc = self
.pg
.execute(
&self.qu_insert,
&[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess],
&[&backends, &channels, &scalar_types, &shape_dims_jss, &series_ids],
)
.await?;
break;
.await
.unwrap();
debug!("cc {} channels {}", cc, channels.len());
total_insert_count += cc;
if total_insert_count == channels.len() as u64 {
break;
}
}
Ok(())
}
async fn work(&mut self) -> Result<(), Error> {
async fn work<FR: HashSalter>(&mut self) -> Result<(), Error> {
let batch_rx = &self.batch_rx;
while let Ok(batch) = batch_rx.recv().await {
self.stats.recv_batch().inc();
@@ -636,27 +670,58 @@ impl Worker2 {
x.shape_dims
);
}
let (res1, missing) = self.select(batch).await?;
let res3 = if missing.len() > 0 {
trace2!("missing {}", missing.len());
for x in &missing {
trace2!("insert missing {x:?}");
let batch_len = batch.len();
let res1 = self.select(batch).await?;
let mut latest = Vec::new();
let mut used_before = Vec::new();
let mut doesnt_exist = Vec::new();
for x in res1 {
match x.status {
MatchingSeries::DoesntExist => doesnt_exist.push(x),
MatchingSeries::UsedBefore(..) => used_before.push(x),
MatchingSeries::Latest(sid) => latest.push((x.job, sid)),
}
let missing_count = missing.len();
self.insert_missing(&missing).await?;
let (res2, missing2) = self.select(missing).await?;
if missing2.len() > 0 {
for x in &missing2 {
warn!("series ids still missing after insert {}", x.channel);
}
if doesnt_exist.len() != 0 {
let batch2 = doesnt_exist.into_iter().map(|x| x.job).collect();
self.insert_missing::<FR>(&batch2).await?;
let res2 = self.select(batch2).await?;
for x in res2 {
match x.status {
MatchingSeries::DoesntExist => {
error!("series ids still missing after insert {:?}", x.job);
return Err(Error::SeriesMissing);
}
MatchingSeries::UsedBefore(..) => {
error!("series ids still missing after insert {:?}", x.job);
return Err(Error::SeriesMissing);
}
MatchingSeries::Latest(sid) => latest.push((x.job, sid)),
}
Err(Error::SeriesMissing)
} else {
trace2!("select missing after insert {} of {}", missing_count, res2.len());
Ok((res1, res2))
}
} else {
Ok((res1, Vec::new()))
}?;
};
if used_before.len() != 0 {
// TODO update the used before, then select again
};
if latest.len() != batch_len {
error!("can not register all series");
return Err(Error::SeriesMissing);
}
for e in latest {
let item = ChannelInfoResult {
backend: e.0.backend,
channel: e.0.channel,
series: e.1.clone(),
};
let item = Ok(item);
match e.0.tx.make_send(item).await {
Ok(()) => {}
Err(_) => {
self.stats.res_tx_fail.inc();
}
};
}
#[cfg(DISABLED)]
for r in res3.0.into_iter().chain(res3.1.into_iter()) {
let item = ChannelInfoResult {
backend: r.backend,
@@ -680,7 +745,11 @@ impl Worker2 {
}
}
pub async fn start_lookup_workers(
pub trait HashSalter {
fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16);
}
pub async fn start_lookup_workers<FR: HashSalter>(
worker_count: usize,
db: &Database,
stats: Arc<SeriesByChannelStats>,
@@ -700,12 +769,31 @@ pub async fn start_lookup_workers(
let mut jhs = Vec::new();
for _ in 0..worker_count {
let mut worker = Worker2::new(db, batch_rx.clone(), stats.clone()).await?;
let jh = tokio::task::spawn(async move { worker.work().await });
let jh = tokio::task::spawn(async move { worker.work::<FR>().await });
jhs.push(jh);
}
Ok((query_tx, jhs, bjh))
}
struct SalterTest;
impl HashSalter for SalterTest {
fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16) {
hupd(&i1.to_le_bytes());
hupd(&i2.to_le_bytes());
}
}
pub struct SalterRandom;
impl HashSalter for SalterRandom {
fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16) {
let tsnow = Instant::now();
let b = unsafe { &*(&tsnow as *const Instant as *const [u8; core::mem::size_of::<Instant>()]) };
hupd(b)
}
}
async fn psql_play(db: &Database) -> Result<(), Error> {
use tokio_postgres::types::ToSql;
use tokio_postgres::types::Type;
@@ -750,7 +838,7 @@ async fn psql_play(db: &Database) -> Result<(), Error> {
debug!("{:?} {:?}", ca, cb);
}
}
if false {
if true {
let sql = concat!(
"with q1 as (",
"select col1 from unnest($1) as inp (col1)",
@@ -782,31 +870,61 @@ fn test_series_by_channel_01() {
use crate as dbpg;
let backend = "bck-test-00";
let channel = "chn-test-00";
let channel_01 = "chn-test-01";
let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
let pgconf = test_db_conf();
if false {
psql_play(&pgconf).await?;
return Ok(());
}
{
let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?;
crate::schema::schema_check(&pg).await.unwrap();
pg.execute("delete from series_by_channel where facility = $1", &[&backend])
.await?;
// Block the id
let id1: i64 = 5609172854884670524;
let sql = concat!(
"insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" values ($1, $2, $3, $4, array[]::int4[], 0)"
);
pg.execute(sql, &[&id1, &backend, &"test-block-00", &1i32]).await?;
}
// TODO keep join handles and await later
let (channel_info_query_tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers(1, &pgconf, series_by_channel_stats.clone()).await?;
dbpg::seriesbychannel::start_lookup_workers::<SalterTest>(1, &pgconf, series_by_channel_stats.clone())
.await?;
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel.into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
let rx0 = {
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel.into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
rx
};
channel_info_query_tx.send(item).await.unwrap();
let rx1 = {
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel_01.into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
rx
};
// tokio::time::sleep(Duration::from_millis(2000)).await;
tokio::time::sleep(Duration::from_millis(2000)).await;
let res = rx.recv().await.unwrap();
let res = rx0.recv().await.unwrap();
debug!("received A: {res:?}");
let res = rx1.recv().await.unwrap();
debug!("received A: {res:?}");
let (tx, rx) = async_channel::bounded(1);
@@ -827,12 +945,22 @@ fn test_series_by_channel_01() {
#[cfg(test)]
fn test_db_conf() -> Database {
Database {
host: "127.0.0.1".into(),
port: 5432,
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
name: "daqbuffer".into(),
if false {
Database {
host: "127.0.0.1".into(),
port: 5432,
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
name: "daqbuffer".into(),
}
} else {
Database {
host: "sf-scylladb-amd32-01.psi.ch".into(),
port: 5173,
user: "test_daqbuffer".into(),
pass: "test_daqbuffer".into(),
name: "test_daqbuffer".into(),
}
}
}

View File

@@ -566,7 +566,7 @@ impl CaConnSet {
}
match res {
Ok(res) => {
let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id());
let cssid = ChannelStatusSeriesId::new(res.series.id());
self.channel_by_cssid
.insert(cssid.clone(), Channel::new(res.channel.clone()));
let add = ChannelAddWithStatusId {

View File

@@ -87,7 +87,7 @@ impl SeriesWriter {
};
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id());
let cssid = ChannelStatusSeriesId::new(res.series.id());
Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await
}
@@ -110,7 +110,7 @@ impl SeriesWriter {
};
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.into_inner();
let sid = res.series;
let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10));
binner.setup_for(&scalar_type, &shape, tsnow)?;
let res = Self {
@@ -342,7 +342,9 @@ fn write_00() {
let scy = scywr::session::create_session(scyconf).await?;
let stats = SeriesByChannelStats::new();
let stats = Arc::new(stats);
let (tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(1, dbconf, stats).await?;
let (tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers::<dbpg::seriesbychannel::SalterRandom>(1, dbconf, stats)
.await?;
let backend = "bck-test-00";
let channel = "chn-test-00";
let scalar_type = ScalarType::I16;