Update series if used before

Dominik Werder
2024-02-06 16:31:20 +01:00
parent 9aaa3b0fd6
commit f5ca38caaf
4 changed files with 245 additions and 390 deletions

View File

@@ -73,7 +73,7 @@ create table if not exists series_by_channel (
facility text not null,
channel text not null,
scalar_type int not null,
shape_dims int[] not null,
shape_dims int[] storage plain not null,
agg_kind int not null,
tscs timestamptz[] storage plain default array[now()]
)";
@@ -157,6 +157,8 @@ async fn migrate_02(pgc: &PgClient) -> Result<(), Error> {
// TODO after all migrations, should check that the schema is as expected.
let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain default array[now()]";
let _ = pgc.execute(sql, &[]).await?;
let sql = "alter table series_by_channel alter tscs set storage plain";
let _ = pgc.execute(sql, &[]).await?;
Ok(())
}
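// A minimal sketch (not part of this diff, assumes `pgc: &PgClient` as in the
// migrations above): verify the storage settings against the catalog, where
// attstorage 'p' means plain and 'x' means extended (TOASTable).
async fn check_column_storage(pgc: &PgClient) -> Result<(), Error> {
    let sql = concat!(
        "select attname, attstorage::text from pg_attribute",
        " where attrelid = 'series_by_channel'::regclass",
        " and attname in ('shape_dims', 'tscs')",
    );
    for row in pgc.query(sql, &[]).await? {
        let name: String = row.get(0);
        let storage: String = row.get(1);
        debug!("column {name} storage {storage}");
    }
    Ok(())
}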

View File

@@ -97,281 +97,11 @@ impl fmt::Debug for ChannelInfoQuery {
}
}
struct ChannelInfoResult2 {
backend: String,
channel: String,
series: Existence<SeriesId>,
tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
}
#[derive(Debug)]
pub struct ChannelInfoResult {
pub backend: String,
pub channel: String,
pub series: SeriesId,
}
struct Worker {
pg: PgClient,
qu_select: PgStatement,
qu_insert: PgStatement,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
pg_client_jh: JoinHandle<Result<(), crate::err::Error>>,
}
impl Worker {
async fn new(
db: &Database,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
) -> Result<Self, Error> {
let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?;
let sql = concat!(
"with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])",
" as inp (backend, channel, scalar_type, shape_dims, rid))",
" select t.series, q1.rid, t.channel from series_by_channel t",
" join q1 on t.facility = q1.backend and t.channel = q1.channel",
" and t.scalar_type = q1.scalar_type and t.shape_dims = q1.shape_dims::int[]",
" and t.agg_kind = 0",
" order by q1.rid",
);
let qu_select = pg.prepare(sql).await?;
let sql = concat!(
"with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])",
" as inp (backend, channel, scalar_type, shape_dims, series))",
" insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1",
" on conflict do nothing"
);
let qu_insert = pg.prepare(sql).await?;
let ret = Self {
pg,
qu_select,
qu_insert,
batch_rx,
stats,
pg_client_jh,
};
Ok(ret)
}
async fn select(
&self,
batch: Vec<ChannelInfoQuery>,
) -> Result<(Vec<ChannelInfoResult2>, Vec<ChannelInfoQuery>), Error> {
let mut backend = Vec::new();
let mut channel = Vec::new();
let mut scalar_type = Vec::new();
let mut shape_dims = Vec::new();
let mut shape_dims_str = Vec::new();
let mut rid = Vec::new();
let mut tx = Vec::new();
for (i, e) in batch.into_iter().enumerate() {
backend.push(e.backend);
channel.push(e.channel);
scalar_type.push(e.scalar_type);
let mut dims = String::with_capacity(32);
dims.push('{');
for (i, &v) in e.shape_dims.iter().enumerate() {
if i > 0 {
dims.push(',');
}
use std::fmt::Write;
write!(dims, "{}", v).unwrap();
}
dims.push('}');
shape_dims_str.push(dims);
shape_dims.push(e.shape_dims);
rid.push(i as i32);
tx.push((i as u32, e.tx));
}
let rows = self
.pg
.query(
&self.qu_select,
&[&backend, &channel, &scalar_type, &shape_dims_str, &rid],
)
.await?;
let mut result = Vec::new();
let mut missing = Vec::new();
let mut it1 = rows.into_iter();
let mut e1 = it1.next();
for (qrid, tx) in tx {
let i = qrid as usize;
let found = if let Some(row) = &e1 {
let rid: i32 = row.get(1);
if rid as u32 == qrid {
let series: i64 = row.get(0);
let ch2: String = row.get(2);
let series = SeriesId::new(series as _);
let res = ChannelInfoResult2 {
// TODO take from database query. Needs test.
backend: backend[0].clone(),
channel: ch2,
series: Existence::Existing(series),
tx,
};
result.push(res);
e1 = it1.next();
None
} else {
Some(tx)
}
} else {
Some(tx)
};
if let Some(tx) = found {
let k = ChannelInfoQuery {
backend: backend[i].clone(),
channel: channel[i].clone(),
scalar_type: scalar_type[i].clone(),
shape_dims: shape_dims[i].clone(),
tx,
};
missing.push(k);
}
}
Ok((result, missing))
}
async fn insert_missing(&self, batch: &Vec<ChannelInfoQuery>) -> Result<(), Error> {
let tsbeg = Instant::now();
let mut backends = Vec::new();
let mut channels = Vec::new();
let mut scalar_types = Vec::new();
let mut shape_dimss = Vec::new();
let mut shape_dims_strs = Vec::new();
let mut hashers = Vec::new();
for e in batch.into_iter() {
{
let mut h = md5::Md5::new();
h.update(e.backend.as_bytes());
h.update(e.channel.as_bytes());
h.update(format!("{:?}", e.scalar_type).as_bytes());
h.update(format!("{:?}", e.shape_dims).as_bytes());
hashers.push(h);
}
backends.push(&e.backend);
channels.push(&e.channel);
scalar_types.push(e.scalar_type);
let mut dims = String::with_capacity(32);
dims.push('{');
for (i, &v) in e.shape_dims.iter().enumerate() {
if i > 0 {
dims.push(',');
}
use std::fmt::Write;
write!(dims, "{}", v).unwrap();
}
dims.push('}');
shape_dims_strs.push(dims);
shape_dimss.push(&e.shape_dims);
}
let mut i1 = 0;
loop {
i1 += 1;
if i1 >= 200 {
return Err(Error::CreateSeriesFail);
}
let mut seriess = Vec::with_capacity(hashers.len());
let mut all_good = true;
for h in &mut hashers {
let mut good = false;
for _ in 0..800 {
h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
if series >= 1000000000000000000 && series <= i64::MAX as u64 {
seriess.push(series as i64);
good = true;
break;
}
}
if !good {
all_good = false;
break;
}
}
if !all_good {
continue;
}
self.pg
.execute(
&self.qu_insert,
&[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess],
)
.await?;
break;
}
Ok(())
}
async fn work(&mut self) -> Result<(), Error> {
let batch_rx = &self.batch_rx;
while let Ok(batch) = batch_rx.recv().await {
self.stats.recv_batch().inc();
self.stats.recv_items().add(batch.len() as _);
for x in &batch {
trace3!(
"search for {} {} {:?} {:?}",
x.backend,
x.channel,
x.scalar_type,
x.shape_dims
);
}
let (res1, missing) = self.select(batch).await?;
let res3 = if missing.len() > 0 {
trace2!("missing {}", missing.len());
for x in &missing {
trace2!("insert missing {x:?}");
}
let missing_count = missing.len();
self.insert_missing(&missing).await?;
let (res2, missing2) = self.select(missing).await?;
if missing2.len() > 0 {
for x in &missing2 {
warn!("series ids still missing after insert {}", x.channel);
}
Err(Error::SeriesMissing)
} else {
trace2!("select missing after insert {} of {}", missing_count, res2.len());
Ok((res1, res2))
}
} else {
Ok((res1, Vec::new()))
}?;
for r in res3.0.into_iter().chain(res3.1.into_iter()) {
let item = ChannelInfoResult {
backend: r.backend,
channel: r.channel,
series: r.series.into_inner(),
};
// trace3!("try to send result for {:?}", item);
let fut = r.tx.make_send(Ok(item));
match fut.await {
Ok(()) => {}
Err(_e) => {
//warn!("can not deliver result");
// return Err(Error::ChannelError);
self.stats.res_tx_fail.inc();
}
}
}
}
debug!("Worker done");
Ok(())
}
}
struct Worker2 {
pg: PgClient,
qu_select: PgStatement,
qu_insert: PgStatement,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
pg_client_jh: JoinHandle<Result<(), crate::err::Error>>,
pub series: RegisteredSeries,
}
struct FoundResult {
@@ -386,7 +116,33 @@ enum MatchingSeries {
Latest(SeriesId),
}
impl Worker2 {
#[derive(Debug, Clone)]
pub enum RegisteredSeries {
Created(SeriesId),
Updated(SeriesId),
Current(SeriesId),
}
impl RegisteredSeries {
pub fn to_series(&self) -> SeriesId {
match self {
RegisteredSeries::Created(sid) => sid.clone(),
RegisteredSeries::Updated(sid) => sid.clone(),
RegisteredSeries::Current(sid) => sid.clone(),
}
}
}
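// A minimal usage sketch (hypothetical, not part of this diff): consumers can
// collapse the variant via to_series(), as CaConnSet and SeriesWriter do in
// the files below, or branch on how the series id was obtained.
fn describe(series: &RegisteredSeries) -> &'static str {
    match series {
        RegisteredSeries::Created(_) => "freshly inserted",
        RegisteredSeries::Updated(_) => "used before, tscs timestamp appended",
        RegisteredSeries::Current(_) => "already the latest for this channel",
    }
}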
struct Worker {
pg: PgClient,
qu_select: PgStatement,
qu_insert: PgStatement,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
pg_client_jh: JoinHandle<Result<(), crate::err::Error>>,
}
impl Worker {
async fn new(
db: &Database,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
@@ -441,6 +197,109 @@ impl Worker2 {
Ok(ret)
}
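// Editorial note (assumption, not in the original source): work() below wraps
// each batch in an explicit begin/commit so that the select, the insert of
// missing series, and the used-before update commit atomically; on a failed
// commit the batch rolls back and the worker keeps running after a short pause.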
async fn work<FR: HashSalter>(&mut self) -> Result<(), Error> {
let batch_rx = self.batch_rx.clone();
while let Ok(batch) = batch_rx.recv().await {
self.stats.recv_batch().inc();
self.stats.recv_items().add(batch.len() as _);
for x in &batch {
trace3!(
"search for {} {} {:?} {:?}",
x.backend,
x.channel,
x.scalar_type,
x.shape_dims
);
}
self.pg.execute("begin", &[]).await?;
match self.handle_batch::<FR>(batch).await {
Ok(()) => {
match self.pg.execute("commit", &[]).await {
Ok(n) => {
debug!("commit {n}");
}
Err(e) => {
warn!("commit error {e}");
self.pg.execute("rollback", &[]).await?;
tokio::time::sleep(Duration::from_millis(1000)).await;
}
};
}
Err(e) => {
error!("transaction error {e}");
self.pg.execute("rollback", &[]).await?;
}
};
}
debug!("Worker2 done");
Ok(())
}
async fn handle_batch<FR: HashSalter>(&mut self, batch: Vec<ChannelInfoQuery>) -> Result<(), Error> {
let batch_len = batch.len();
let res1 = self.select(batch).await?;
let mut latest = Vec::new();
let mut used_before = Vec::new();
let mut doesnt_exist = Vec::new();
for x in res1 {
match x.status {
MatchingSeries::DoesntExist => doesnt_exist.push(x),
MatchingSeries::UsedBefore(sid) => used_before.push((x.job, RegisteredSeries::Updated(sid))),
MatchingSeries::Latest(sid) => latest.push((x.job, RegisteredSeries::Current(sid))),
}
}
if doesnt_exist.len() != 0 {
let batch2 = doesnt_exist.into_iter().map(|x| x.job).collect();
self.insert_missing::<FR>(&batch2).await?;
let res2 = self.select(batch2).await?;
for x in res2 {
match x.status {
MatchingSeries::DoesntExist => {
error!("series ids still missing after insert {:?}", x.job);
return Err(Error::SeriesMissing);
}
MatchingSeries::UsedBefore(sid) => {
// Could theoretically happen when we ask in the same batch
// for the same channel name but with a different type/shape.
warn!("series id used before after insert {:?}", x.job);
// return Err(Error::SeriesMissing);
latest.push((x.job, RegisteredSeries::Created(sid)));
}
MatchingSeries::Latest(sid) => {
latest.push((x.job, RegisteredSeries::Created(sid)));
}
}
}
};
if used_before.len() != 0 {
// TODO update the used-before entries, then select again
let sid: Vec<_> = used_before.iter().map(|x| x.1.to_series().id() as i64).collect();
self.update_used_before::<FR>(sid).await?;
for e in used_before {
latest.push(e);
}
};
if latest.len() != batch_len {
error!("can not register all series");
return Err(Error::SeriesMissing);
}
for e in latest {
let item = ChannelInfoResult {
backend: e.0.backend,
channel: e.0.channel,
series: e.1.clone(),
};
let item = Ok(item);
match e.0.tx.make_send(item).await {
Ok(()) => {}
Err(_) => {
self.stats.res_tx_fail.inc();
}
};
}
Ok(())
}
async fn select(&self, batch: Vec<ChannelInfoQuery>) -> Result<Vec<FoundResult>, Error> {
let (rids, backends, channels, jobs) = batch
.into_iter()
@@ -459,7 +318,7 @@ impl Worker2 {
a.3.push(v.3);
a
});
debug!("select worker start batch of {} {:?}", channels.len(), channels);
// debug!("select worker start batch of {} {:?}", channels.len(), channels);
let rows = self.pg.query(&self.qu_select, &[&rids, &backends, &channels]).await?;
let mut row_it = rows.into_iter();
let mut row_opt = row_it.next();
@@ -474,17 +333,19 @@ impl Worker2 {
let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec<i32>>(3).as_slice())
.map_err(|_| Error::Shape)?;
let tscs: Vec<DateTime<Utc>> = row.get(4);
debug!(
"select worker found in database {:?} {:?} {:?} {:?} {:?}",
rid, series, scalar_type, shape_dims, tscs
);
if false {
debug!(
"select worker found in database {:?} {:?} {:?} {:?} {:?}",
rid, series, scalar_type, shape_dims, tscs
);
}
acc.push((rid, series, scalar_type, shape_dims, tscs));
row_opt = row_it.next();
continue;
}
};
}
debug!("check for {job:?}");
// debug!("check for {job:?}");
// TODO call decide with empty accumulator: will result in DoesntExist.
let v = std::mem::replace(&mut acc, Vec::new());
let scalar_type_job = ScalarType::from_scylla_i32(job.scalar_type).map_err(|_| Error::ScalarType)?;
@@ -509,7 +370,12 @@ impl Worker2 {
if last.1 == scalar_type && last.2 == shape {
Ok(MatchingSeries::Latest(last.0.clone()))
} else {
Ok(MatchingSeries::UsedBefore(last.0.clone()))
for e in unfolded.into_iter().rev() {
if e.1 == scalar_type && e.2 == shape {
return Ok(MatchingSeries::UsedBefore(e.0.clone()));
}
}
Ok(MatchingSeries::DoesntExist)
}
} else {
Ok(MatchingSeries::DoesntExist)
@@ -566,8 +432,7 @@ impl Worker2 {
}
async fn insert_missing<FR: HashSalter>(&self, batch: &Vec<ChannelInfoQuery>) -> Result<(), Error> {
debug!("insert_missing len {}", batch.len());
let tsbeg = Instant::now();
// debug!("insert_missing len {}", batch.len());
let (backends, channels, scalar_types, shape_dimss, mut hashers) = batch
.iter()
.map(|job| {
@@ -601,10 +466,9 @@ impl Worker2 {
loop {
i1 += 1;
if i1 >= 200 {
debug!("i1 abort");
warn!("create series id outer abort");
return Err(Error::CreateSeriesFail);
}
debug!("loop i1 {i1}");
let mut series_ids = Vec::with_capacity(hashers.len());
for h in &mut hashers {
let mut i2: u16 = 0;
@@ -612,17 +476,13 @@ impl Worker2 {
break {
i2 += 1;
if i2 >= 800 {
debug!("i2 abort");
warn!("create series id inner abort");
return Err(Error::CreateSeriesFail);
}
FR::hupd(&mut |buf| h.update(buf), i1, i2);
// h.update(i1.to_le_bytes());
// h.update(i2.to_le_bytes());
// h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
if series >= 1000000000000000000 && series <= i64::MAX as u64 {
debug!("push {series}");
series_ids.push(series as i64);
} else {
continue;
@@ -630,7 +490,6 @@ impl Worker2 {
};
}
}
debug!("execute...");
use serde_json::Value as JsVal;
let shape_dims_jss: Vec<_> = shape_dimss
.iter()
@@ -647,7 +506,7 @@ impl Worker2 {
)
.await
.unwrap();
debug!("cc {} channels {}", cc, channels.len());
// debug!("cc {} channels {}", cc, channels.len());
total_insert_count += cc;
if total_insert_count == channels.len() as u64 {
break;
@@ -656,91 +515,19 @@ impl Worker2 {
Ok(())
}
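// A minimal sketch (not part of this diff) of the id derivation used above:
// take the first 8 little-endian bytes of the md5 digest and accept only
// values in [1e18, i64::MAX], i.e. positive i64 ids with exactly 19 decimal
// digits, retrying with a fresh salt otherwise.
fn series_from_digest(digest: &[u8; 16]) -> Option<i64> {
    let series = u64::from_le_bytes(digest[0..8].try_into().unwrap());
    if series >= 1_000_000_000_000_000_000 && series <= i64::MAX as u64 {
        Some(series as i64)
    } else {
        None
    }
}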
async fn work<FR: HashSalter>(&mut self) -> Result<(), Error> {
let batch_rx = &self.batch_rx;
while let Ok(batch) = batch_rx.recv().await {
self.stats.recv_batch().inc();
self.stats.recv_items().add(batch.len() as _);
for x in &batch {
trace3!(
"search for {} {} {:?} {:?}",
x.backend,
x.channel,
x.scalar_type,
x.shape_dims
);
}
let batch_len = batch.len();
let res1 = self.select(batch).await?;
let mut latest = Vec::new();
let mut used_before = Vec::new();
let mut doesnt_exist = Vec::new();
for x in res1 {
match x.status {
MatchingSeries::DoesntExist => doesnt_exist.push(x),
MatchingSeries::UsedBefore(..) => used_before.push(x),
MatchingSeries::Latest(sid) => latest.push((x.job, sid)),
}
}
if doesnt_exist.len() != 0 {
let batch2 = doesnt_exist.into_iter().map(|x| x.job).collect();
self.insert_missing::<FR>(&batch2).await?;
let res2 = self.select(batch2).await?;
for x in res2 {
match x.status {
MatchingSeries::DoesntExist => {
error!("series ids still missing after insert {:?}", x.job);
return Err(Error::SeriesMissing);
}
MatchingSeries::UsedBefore(..) => {
error!("series ids still missing after insert {:?}", x.job);
return Err(Error::SeriesMissing);
}
MatchingSeries::Latest(sid) => latest.push((x.job, sid)),
}
}
};
if used_before.len() != 0 {
// TODO update the used before, then select again
};
if latest.len() != batch_len {
error!("can not register all series");
return Err(Error::SeriesMissing);
}
for e in latest {
let item = ChannelInfoResult {
backend: e.0.backend,
channel: e.0.channel,
series: e.1.clone(),
};
let item = Ok(item);
match e.0.tx.make_send(item).await {
Ok(()) => {}
Err(_) => {
self.stats.res_tx_fail.inc();
}
};
}
#[cfg(DISABLED)]
for r in res3.0.into_iter().chain(res3.1.into_iter()) {
let item = ChannelInfoResult {
backend: r.backend,
channel: r.channel,
series: r.series,
};
// trace3!("try to send result for {:?}", item);
let fut = r.tx.make_send(Ok(item));
match fut.await {
Ok(()) => {}
Err(_e) => {
//warn!("can not deliver result");
// return Err(Error::ChannelError);
self.stats.res_tx_fail.inc();
}
}
}
}
debug!("Worker2 done");
async fn update_used_before<FR: HashSalter>(&self, sid: Vec<i64>) -> Result<(), Error> {
debug!("update_used_before {sid:?}");
let sql = concat!(
"update series_by_channel set",
" tscs = tscs || now()::timestamptz",
" where series = any ($1)",
);
let qu = self
.pg
.prepare_typed(sql, &[tokio_postgres::types::Type::INT8_ARRAY])
.await?;
let n = self.pg.execute(&qu, &[&sid]).await?;
trace!("update_used_before n {n}");
Ok(())
}
}
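// A minimal sketch (not part of this diff, assumes a connected `pg: &PgClient`)
// of the array-append semantics update_used_before relies on: standard
// Postgres `||` concatenation grows tscs by one timestamp per re-registration.
async fn demo_tscs_append(pg: &PgClient) -> Result<(), Error> {
    let row = pg
        .query_one(
            "select array['2000-10-10T08:00:00Z'::timestamptz] || now()::timestamptz",
            &[],
        )
        .await?;
    let tscs: Vec<DateTime<Utc>> = row.get(0);
    assert_eq!(tscs.len(), 2);
    Ok(())
}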
@@ -768,7 +555,7 @@ pub async fn start_lookup_workers<FR: HashSalter>(
let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx);
let mut jhs = Vec::new();
for _ in 0..worker_count {
let mut worker = Worker2::new(db, batch_rx.clone(), stats.clone()).await?;
let mut worker = Worker::new(db, batch_rx.clone(), stats.clone()).await?;
let jh = tokio::task::spawn(async move { worker.work::<FR>().await });
jhs.push(jh);
}
@@ -871,15 +658,16 @@ fn test_series_by_channel_01() {
let backend = "bck-test-00";
let channel = "chn-test-00";
let channel_01 = "chn-test-01";
let channel_02 = "chn-test-02";
let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
let pgconf = test_db_conf();
if false {
psql_play(&pgconf).await?;
return Ok(());
}
let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?;
crate::schema::schema_check(&pg).await.unwrap();
{
let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?;
crate::schema::schema_check(&pg).await.unwrap();
pg.execute("delete from series_by_channel where facility = $1", &[&backend])
.await?;
@@ -890,13 +678,32 @@ fn test_series_by_channel_01() {
" values ($1, $2, $3, $4, array[]::int4[], 0)"
);
pg.execute(sql, &[&id1, &backend, &"test-block-00", &1i32]).await?;
if true {
// Create a used-before case
let id: i64 = 4802468414253815536;
let sql = concat!(
"insert into series_by_channel",
" (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)",
" values ($1, $2, $3, 5, array[64]::int4[], 0, array['2000-10-10T08:00:00Z'::timestamptz])"
);
pg.execute(sql, &[&id, &backend, &channel_02]).await?;
let id: i64 = 6409375609862757444;
let sql = concat!(
"insert into series_by_channel",
" (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)",
" values ($1, $2, $3, 8, array[64]::int4[], 0, array['2001-10-10T08:00:00Z'::timestamptz])"
);
pg.execute(sql, &[&id, &backend, &channel_02]).await?;
}
}
// TODO keep join handles and await later
let (channel_info_query_tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers::<SalterTest>(1, &pgconf, series_by_channel_stats.clone())
.await?;
let rx0 = {
let mut rxs = Vec::new();
let rx = {
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
@@ -908,7 +715,9 @@ fn test_series_by_channel_01() {
channel_info_query_tx.send(item).await.unwrap();
rx
};
let rx1 = {
rxs.push(rx);
let rx = {
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
@@ -920,12 +729,26 @@ fn test_series_by_channel_01() {
channel_info_query_tx.send(item).await.unwrap();
rx
};
// tokio::time::sleep(Duration::from_millis(2000)).await;
rxs.push(rx);
let res = rx0.recv().await.unwrap();
debug!("received A: {res:?}");
let res = rx1.recv().await.unwrap();
debug!("received A: {res:?}");
let rx = {
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel_02.into(),
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
rx
};
rxs.push(rx);
for rx in rxs {
let res = rx.recv().await.unwrap();
debug!("received A: {res:?}");
}
let (tx, rx) = async_channel::bounded(1);
let item = dbpg::seriesbychannel::ChannelInfoQuery {
@@ -937,7 +760,37 @@ fn test_series_by_channel_01() {
};
channel_info_query_tx.send(item).await.unwrap();
let res = rx.recv().await.unwrap();
debug!("received B: {res:?}");
debug!("received C: {res:?}");
{
let rows = pg
.query(
concat!(
"select series, channel, scalar_type, shape_dims",
" from series_by_channel where facility = $1",
" and channel not like 'test-block%'",
" order by channel, scalar_type, shape_dims",
),
&[&backend],
)
.await?;
let mut all = Vec::new();
for row in rows {
let series: i64 = row.get(0);
let channel: String = row.get(1);
let scalar_type: i32 = row.get(2);
let shape_dims: Vec<i32> = row.get(3);
all.push((series, channel, scalar_type, shape_dims));
}
let exp: Vec<(i64, String, i32, Vec<i32>)> = vec![
(5191776216635917420, "chn-test-00".into(), 5, vec![64]),
(5025677968516078602, "chn-test-01".into(), 5, vec![64]),
(4802468414253815536, "chn-test-02".into(), 5, vec![64]),
(6409375609862757444, "chn-test-02".into(), 8, vec![64]),
];
assert_eq!(all, exp);
// TODO assert that the correct chn-test-02 got updated
}
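// A sketch for the TODO above (hypothetical, not part of this diff): the
// U16/[64] query should have appended now() to the scalar_type-5 row of
// chn-test-02, growing its tscs from the single fixture timestamp to two.
{
    let row = pg
        .query_one(
            "select array_length(tscs, 1) from series_by_channel where series = $1",
            &[&4802468414253815536i64],
        )
        .await?;
    let n: Option<i32> = row.get(0);
    assert_eq!(n, Some(2));
}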
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();

View File

@@ -566,7 +566,7 @@ impl CaConnSet {
}
match res {
Ok(res) => {
let cssid = ChannelStatusSeriesId::new(res.series.id());
let cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
self.channel_by_cssid
.insert(cssid.clone(), Channel::new(res.channel.clone()));
let add = ChannelAddWithStatusId {

View File

@@ -87,7 +87,7 @@ impl SeriesWriter {
};
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let cssid = ChannelStatusSeriesId::new(res.series.id());
let cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await
}
@@ -110,7 +110,7 @@ impl SeriesWriter {
};
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series;
let sid = res.series.to_series();
let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10));
binner.setup_for(&scalar_type, &shape, tsnow)?;
let res = Self {