diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index 66d2db4..754773b 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -73,7 +73,7 @@ create table if not exists series_by_channel ( facility text not null, channel text not null, scalar_type int not null, - shape_dims int[] not null, + shape_dims int[] storage plain not null, agg_kind int not null, tscs timestamptz[] storage plain default array[now()] )"; @@ -157,6 +157,8 @@ async fn migrate_02(pgc: &PgClient) -> Result<(), Error> { // TODO after all migrations, should check that the schema is as expected. let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain default array[now()]"; let _ = pgc.execute(sql, &[]).await?; + let sql = "alter table series_by_channel alter tscs set storage plain"; + let _ = pgc.execute(sql, &[]).await?; Ok(()) } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index d19e527..67fd2b2 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -97,281 +97,11 @@ impl fmt::Debug for ChannelInfoQuery { } } -struct ChannelInfoResult2 { - backend: String, - channel: String, - series: Existence, - tx: Pin>, -} - #[derive(Debug)] pub struct ChannelInfoResult { pub backend: String, pub channel: String, - pub series: SeriesId, -} - -struct Worker { - pg: PgClient, - qu_select: PgStatement, - qu_insert: PgStatement, - batch_rx: Receiver>, - stats: Arc, - pg_client_jh: JoinHandle>, -} - -impl Worker { - async fn new( - db: &Database, - batch_rx: Receiver>, - stats: Arc, - ) -> Result { - let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; - let sql = concat!( - "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])", - " as inp (backend, channel, scalar_type, shape_dims, rid))", - " select t.series, q1.rid, t.channel from series_by_channel t", - " join q1 on t.facility = q1.backend and t.channel = q1.channel", - " and t.scalar_type = q1.scalar_type and t.shape_dims = q1.shape_dims::int[]", - " and t.agg_kind = 0", - " order by q1.rid", - ); - let qu_select = pg.prepare(sql).await?; - let sql = concat!( - "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])", - " as inp (backend, channel, scalar_type, shape_dims, series))", - " insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1", - " on conflict do nothing" - ); - let qu_insert = pg.prepare(sql).await?; - let ret = Self { - pg, - qu_select, - qu_insert, - batch_rx, - stats, - pg_client_jh, - }; - Ok(ret) - } - - async fn select( - &self, - batch: Vec, - ) -> Result<(Vec, Vec), Error> { - let mut backend = Vec::new(); - let mut channel = Vec::new(); - let mut scalar_type = Vec::new(); - let mut shape_dims = Vec::new(); - let mut shape_dims_str = Vec::new(); - let mut rid = Vec::new(); - let mut tx = Vec::new(); - for (i, e) in batch.into_iter().enumerate() { - backend.push(e.backend); - channel.push(e.channel); - scalar_type.push(e.scalar_type); - let mut dims = String::with_capacity(32); - dims.push('{'); - for (i, &v) in e.shape_dims.iter().enumerate() { - if i > 0 { - dims.push(','); - } - use std::fmt::Write; - write!(dims, "{}", v).unwrap(); - } - dims.push('}'); - shape_dims_str.push(dims); - shape_dims.push(e.shape_dims); - rid.push(i as i32); - tx.push((i as u32, e.tx)); - } - let rows = self - .pg - .query( - &self.qu_select, - &[&backend, &channel, &scalar_type, &shape_dims_str, &rid], - ) - .await?; - let mut result = Vec::new(); - let mut missing = Vec::new(); - let mut it1 = rows.into_iter(); - let mut e1 = it1.next(); - for (qrid, tx) in tx { - let i = qrid as usize; - let found = if let Some(row) = &e1 { - let rid: i32 = row.get(1); - if rid as u32 == qrid { - let series: i64 = row.get(0); - let ch2: String = row.get(2); - let series = SeriesId::new(series as _); - let res = ChannelInfoResult2 { - // TODO take from database query. Needs test. - backend: backend[0].clone(), - channel: ch2, - series: Existence::Existing(series), - tx, - }; - result.push(res); - e1 = it1.next(); - None - } else { - Some(tx) - } - } else { - Some(tx) - }; - if let Some(tx) = found { - let k = ChannelInfoQuery { - backend: backend[i].clone(), - channel: channel[i].clone(), - scalar_type: scalar_type[i].clone(), - shape_dims: shape_dims[i].clone(), - tx, - }; - missing.push(k); - } - } - Ok((result, missing)) - } - - async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { - let tsbeg = Instant::now(); - let mut backends = Vec::new(); - let mut channels = Vec::new(); - let mut scalar_types = Vec::new(); - let mut shape_dimss = Vec::new(); - let mut shape_dims_strs = Vec::new(); - let mut hashers = Vec::new(); - for e in batch.into_iter() { - { - let mut h = md5::Md5::new(); - h.update(e.backend.as_bytes()); - h.update(e.channel.as_bytes()); - h.update(format!("{:?}", e.scalar_type).as_bytes()); - h.update(format!("{:?}", e.shape_dims).as_bytes()); - hashers.push(h); - } - backends.push(&e.backend); - channels.push(&e.channel); - scalar_types.push(e.scalar_type); - let mut dims = String::with_capacity(32); - dims.push('{'); - for (i, &v) in e.shape_dims.iter().enumerate() { - if i > 0 { - dims.push(','); - } - use std::fmt::Write; - write!(dims, "{}", v).unwrap(); - } - dims.push('}'); - shape_dims_strs.push(dims); - shape_dimss.push(&e.shape_dims); - } - let mut i1 = 0; - loop { - i1 += 1; - if i1 >= 200 { - return Err(Error::CreateSeriesFail); - } - let mut seriess = Vec::with_capacity(hashers.len()); - let mut all_good = true; - for h in &mut hashers { - let mut good = false; - for _ in 0..800 { - h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); - let f = h.clone().finalize(); - let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series >= 1000000000000000000 && series <= i64::MAX as u64 { - seriess.push(series as i64); - good = true; - break; - } - } - if !good { - all_good = false; - break; - } - } - if !all_good { - continue; - } - self.pg - .execute( - &self.qu_insert, - &[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess], - ) - .await?; - break; - } - Ok(()) - } - - async fn work(&mut self) -> Result<(), Error> { - let batch_rx = &self.batch_rx; - while let Ok(batch) = batch_rx.recv().await { - self.stats.recv_batch().inc(); - self.stats.recv_items().add(batch.len() as _); - for x in &batch { - trace3!( - "search for {} {} {:?} {:?}", - x.backend, - x.channel, - x.scalar_type, - x.shape_dims - ); - } - let (res1, missing) = self.select(batch).await?; - let res3 = if missing.len() > 0 { - trace2!("missing {}", missing.len()); - for x in &missing { - trace2!("insert missing {x:?}"); - } - let missing_count = missing.len(); - self.insert_missing(&missing).await?; - let (res2, missing2) = self.select(missing).await?; - if missing2.len() > 0 { - for x in &missing2 { - warn!("series ids still missing after insert {}", x.channel); - } - Err(Error::SeriesMissing) - } else { - trace2!("select missing after insert {} of {}", missing_count, res2.len()); - Ok((res1, res2)) - } - } else { - Ok((res1, Vec::new())) - }?; - for r in res3.0.into_iter().chain(res3.1.into_iter()) { - let item = ChannelInfoResult { - backend: r.backend, - channel: r.channel, - series: r.series.into_inner(), - }; - // trace3!("try to send result for {:?}", item); - let fut = r.tx.make_send(Ok(item)); - match fut.await { - Ok(()) => {} - Err(_e) => { - //warn!("can not deliver result"); - // return Err(Error::ChannelError); - self.stats.res_tx_fail.inc(); - } - } - } - } - debug!("Worker done"); - Ok(()) - } -} - -struct Worker2 { - pg: PgClient, - qu_select: PgStatement, - qu_insert: PgStatement, - batch_rx: Receiver>, - stats: Arc, - pg_client_jh: JoinHandle>, + pub series: RegisteredSeries, } struct FoundResult { @@ -386,7 +116,33 @@ enum MatchingSeries { Latest(SeriesId), } -impl Worker2 { +#[derive(Debug, Clone)] +pub enum RegisteredSeries { + Created(SeriesId), + Updated(SeriesId), + Current(SeriesId), +} + +impl RegisteredSeries { + pub fn to_series(&self) -> SeriesId { + match self { + RegisteredSeries::Created(sid) => sid.clone(), + RegisteredSeries::Updated(sid) => sid.clone(), + RegisteredSeries::Current(sid) => sid.clone(), + } + } +} + +struct Worker { + pg: PgClient, + qu_select: PgStatement, + qu_insert: PgStatement, + batch_rx: Receiver>, + stats: Arc, + pg_client_jh: JoinHandle>, +} + +impl Worker { async fn new( db: &Database, batch_rx: Receiver>, @@ -441,6 +197,109 @@ impl Worker2 { Ok(ret) } + async fn work(&mut self) -> Result<(), Error> { + let batch_rx = self.batch_rx.clone(); + while let Ok(batch) = batch_rx.recv().await { + self.stats.recv_batch().inc(); + self.stats.recv_items().add(batch.len() as _); + for x in &batch { + trace3!( + "search for {} {} {:?} {:?}", + x.backend, + x.channel, + x.scalar_type, + x.shape_dims + ); + } + self.pg.execute("begin", &[]).await?; + match self.handle_batch::(batch).await { + Ok(()) => { + match self.pg.execute("commit", &[]).await { + Ok(n) => { + debug!("commit {n}"); + } + Err(e) => { + warn!("commit error {e}"); + self.pg.execute("rollback", &[]).await?; + tokio::time::sleep(Duration::from_millis(1000)).await; + } + }; + } + Err(e) => { + error!("transaction error {e}"); + self.pg.execute("rollback", &[]).await?; + } + }; + } + debug!("Worker2 done"); + Ok(()) + } + + async fn handle_batch(&mut self, batch: Vec) -> Result<(), Error> { + let batch_len = batch.len(); + let res1 = self.select(batch).await?; + let mut latest = Vec::new(); + let mut used_before = Vec::new(); + let mut doesnt_exist = Vec::new(); + for x in res1 { + match x.status { + MatchingSeries::DoesntExist => doesnt_exist.push(x), + MatchingSeries::UsedBefore(sid) => used_before.push((x.job, RegisteredSeries::Updated(sid))), + MatchingSeries::Latest(sid) => latest.push((x.job, RegisteredSeries::Current(sid))), + } + } + if doesnt_exist.len() != 0 { + let batch2 = doesnt_exist.into_iter().map(|x| x.job).collect(); + self.insert_missing::(&batch2).await?; + let res2 = self.select(batch2).await?; + for x in res2 { + match x.status { + MatchingSeries::DoesntExist => { + error!("series ids still missing after insert {:?}", x.job); + return Err(Error::SeriesMissing); + } + MatchingSeries::UsedBefore(sid) => { + // Could theoretically happen when we ask in the same batch + // for the same channel name but of different type/shape. + warn!("series id used before after insert {:?}", x.job); + // return Err(Error::SeriesMissing); + latest.push((x.job, RegisteredSeries::Created(sid))); + } + MatchingSeries::Latest(sid) => { + latest.push((x.job, RegisteredSeries::Created(sid))); + } + } + } + }; + if used_before.len() != 0 { + // TODO update the used before, then select again + let sid: Vec<_> = used_before.iter().map(|x| x.1.to_series().id() as i64).collect(); + self.update_used_before::(sid).await?; + for e in used_before { + latest.push(e); + } + }; + if latest.len() != batch_len { + error!("can not register all series"); + return Err(Error::SeriesMissing); + } + for e in latest { + let item = ChannelInfoResult { + backend: e.0.backend, + channel: e.0.channel, + series: e.1.clone(), + }; + let item = Ok(item); + match e.0.tx.make_send(item).await { + Ok(()) => {} + Err(_) => { + self.stats.res_tx_fail.inc(); + } + }; + } + Ok(()) + } + async fn select(&self, batch: Vec) -> Result, Error> { let (rids, backends, channels, jobs) = batch .into_iter() @@ -459,7 +318,7 @@ impl Worker2 { a.3.push(v.3); a }); - debug!("select worker start batch of {} {:?}", channels.len(), channels); + // debug!("select worker start batch of {} {:?}", channels.len(), channels); let rows = self.pg.query(&self.qu_select, &[&rids, &backends, &channels]).await?; let mut row_it = rows.into_iter(); let mut row_opt = row_it.next(); @@ -474,17 +333,19 @@ impl Worker2 { let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec>(3).as_slice()) .map_err(|_| Error::Shape)?; let tscs: Vec> = row.get(4); - debug!( - "select worker found in database {:?} {:?} {:?} {:?} {:?}", - rid, series, scalar_type, shape_dims, tscs - ); + if false { + debug!( + "select worker found in database {:?} {:?} {:?} {:?} {:?}", + rid, series, scalar_type, shape_dims, tscs + ); + } acc.push((rid, series, scalar_type, shape_dims, tscs)); row_opt = row_it.next(); continue; } }; } - debug!("check for {job:?}"); + // debug!("check for {job:?}"); // TODO call decide with empty accumulator: will result in DoesntExist. let v = std::mem::replace(&mut acc, Vec::new()); let scalar_type_job = ScalarType::from_scylla_i32(job.scalar_type).map_err(|_| Error::ScalarType)?; @@ -509,7 +370,12 @@ impl Worker2 { if last.1 == scalar_type && last.2 == shape { Ok(MatchingSeries::Latest(last.0.clone())) } else { - Ok(MatchingSeries::UsedBefore(last.0.clone())) + for e in unfolded.into_iter().rev() { + if e.1 == scalar_type && e.2 == shape { + return Ok(MatchingSeries::UsedBefore(e.0.clone())); + } + } + Ok(MatchingSeries::DoesntExist) } } else { Ok(MatchingSeries::DoesntExist) @@ -566,8 +432,7 @@ impl Worker2 { } async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { - debug!("insert_missing len {}", batch.len()); - let tsbeg = Instant::now(); + // debug!("insert_missing len {}", batch.len()); let (backends, channels, scalar_types, shape_dimss, mut hashers) = batch .iter() .map(|job| { @@ -601,10 +466,9 @@ impl Worker2 { loop { i1 += 1; if i1 >= 200 { - debug!("i1 abort"); + warn!("create series id outer abort"); return Err(Error::CreateSeriesFail); } - debug!("loop i1 {i1}"); let mut series_ids = Vec::with_capacity(hashers.len()); for h in &mut hashers { let mut i2: u16 = 0; @@ -612,17 +476,13 @@ impl Worker2 { break { i2 += 1; if i2 >= 800 { - debug!("i2 abort"); + warn!("create series id inner abort"); return Err(Error::CreateSeriesFail); } FR::hupd(&mut |buf| h.update(buf), i1, i2); - // h.update(i1.to_le_bytes()); - // h.update(i2.to_le_bytes()); - // h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); let f = h.clone().finalize(); let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); if series >= 1000000000000000000 && series <= i64::MAX as u64 { - debug!("push {series}"); series_ids.push(series as i64); } else { continue; @@ -630,7 +490,6 @@ impl Worker2 { }; } } - debug!("execute..."); use serde_json::Value as JsVal; let shape_dims_jss: Vec<_> = shape_dimss .iter() @@ -647,7 +506,7 @@ impl Worker2 { ) .await .unwrap(); - debug!("cc {} channels {}", cc, channels.len()); + // debug!("cc {} channels {}", cc, channels.len()); total_insert_count += cc; if total_insert_count == channels.len() as u64 { break; @@ -656,91 +515,19 @@ impl Worker2 { Ok(()) } - async fn work(&mut self) -> Result<(), Error> { - let batch_rx = &self.batch_rx; - while let Ok(batch) = batch_rx.recv().await { - self.stats.recv_batch().inc(); - self.stats.recv_items().add(batch.len() as _); - for x in &batch { - trace3!( - "search for {} {} {:?} {:?}", - x.backend, - x.channel, - x.scalar_type, - x.shape_dims - ); - } - let batch_len = batch.len(); - let res1 = self.select(batch).await?; - let mut latest = Vec::new(); - let mut used_before = Vec::new(); - let mut doesnt_exist = Vec::new(); - for x in res1 { - match x.status { - MatchingSeries::DoesntExist => doesnt_exist.push(x), - MatchingSeries::UsedBefore(..) => used_before.push(x), - MatchingSeries::Latest(sid) => latest.push((x.job, sid)), - } - } - if doesnt_exist.len() != 0 { - let batch2 = doesnt_exist.into_iter().map(|x| x.job).collect(); - self.insert_missing::(&batch2).await?; - let res2 = self.select(batch2).await?; - for x in res2 { - match x.status { - MatchingSeries::DoesntExist => { - error!("series ids still missing after insert {:?}", x.job); - return Err(Error::SeriesMissing); - } - MatchingSeries::UsedBefore(..) => { - error!("series ids still missing after insert {:?}", x.job); - return Err(Error::SeriesMissing); - } - MatchingSeries::Latest(sid) => latest.push((x.job, sid)), - } - } - }; - if used_before.len() != 0 { - // TODO update the used before, then select again - }; - if latest.len() != batch_len { - error!("can not register all series"); - return Err(Error::SeriesMissing); - } - for e in latest { - let item = ChannelInfoResult { - backend: e.0.backend, - channel: e.0.channel, - series: e.1.clone(), - }; - let item = Ok(item); - match e.0.tx.make_send(item).await { - Ok(()) => {} - Err(_) => { - self.stats.res_tx_fail.inc(); - } - }; - } - #[cfg(DISABLED)] - for r in res3.0.into_iter().chain(res3.1.into_iter()) { - let item = ChannelInfoResult { - backend: r.backend, - channel: r.channel, - series: r.series, - }; - // trace3!("try to send result for {:?}", item); - let fut = r.tx.make_send(Ok(item)); - match fut.await { - Ok(()) => {} - Err(_e) => { - //warn!("can not deliver result"); - // return Err(Error::ChannelError); - self.stats.res_tx_fail.inc(); - } - } - } - } - debug!("Worker2 done"); + async fn update_used_before(&self, sid: Vec) -> Result<(), Error> { + debug!("update_used_before {sid:?}"); + let sql = concat!( + "update series_by_channel set", + " tscs = tscs || now()::timestamptz", + " where series = any ($1)", + ); + let qu = self + .pg + .prepare_typed(sql, &[tokio_postgres::types::Type::INT8_ARRAY]) + .await?; + let n = self.pg.execute(&qu, &[&sid]).await?; + trace!("update_used_before n {n}"); Ok(()) } } @@ -768,7 +555,7 @@ pub async fn start_lookup_workers( let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); let mut jhs = Vec::new(); for _ in 0..worker_count { - let mut worker = Worker2::new(db, batch_rx.clone(), stats.clone()).await?; + let mut worker = Worker::new(db, batch_rx.clone(), stats.clone()).await?; let jh = tokio::task::spawn(async move { worker.work::().await }); jhs.push(jh); } @@ -871,15 +658,16 @@ fn test_series_by_channel_01() { let backend = "bck-test-00"; let channel = "chn-test-00"; let channel_01 = "chn-test-01"; + let channel_02 = "chn-test-02"; let series_by_channel_stats = Arc::new(SeriesByChannelStats::new()); let pgconf = test_db_conf(); if false { psql_play(&pgconf).await?; return Ok(()); } + let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?; + crate::schema::schema_check(&pg).await.unwrap(); { - let (pg, _pg_client_jh) = crate::conn::make_pg_client(&pgconf).await?; - crate::schema::schema_check(&pg).await.unwrap(); pg.execute("delete from series_by_channel where facility = $1", &[&backend]) .await?; @@ -890,13 +678,32 @@ fn test_series_by_channel_01() { " values ($1, $2, $3, $4, array[]::int4[], 0)" ); pg.execute(sql, &[&id1, &backend, &"test-block-00", &1i32]).await?; + + if true { + // Create a used-before case + let id: i64 = 4802468414253815536; + let sql = concat!( + "insert into series_by_channel", + " (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)", + " values ($1, $2, $3, 5, array[64]::int4[], 0, array['2000-10-10T08:00:00Z'::timestamptz])" + ); + pg.execute(sql, &[&id, &backend, &channel_02]).await?; + let id: i64 = 6409375609862757444; + let sql = concat!( + "insert into series_by_channel", + " (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)", + " values ($1, $2, $3, 8, array[64]::int4[], 0, array['2001-10-10T08:00:00Z'::timestamptz])" + ); + pg.execute(sql, &[&id, &backend, &channel_02]).await?; + } } // TODO keep join handles and await later let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers::(1, &pgconf, series_by_channel_stats.clone()) .await?; - let rx0 = { + let mut rxs = Vec::new(); + let rx = { let (tx, rx) = async_channel::bounded(1); let item = dbpg::seriesbychannel::ChannelInfoQuery { backend: backend.into(), @@ -908,7 +715,9 @@ fn test_series_by_channel_01() { channel_info_query_tx.send(item).await.unwrap(); rx }; - let rx1 = { + rxs.push(rx); + + let rx = { let (tx, rx) = async_channel::bounded(1); let item = dbpg::seriesbychannel::ChannelInfoQuery { backend: backend.into(), @@ -920,12 +729,26 @@ fn test_series_by_channel_01() { channel_info_query_tx.send(item).await.unwrap(); rx }; - // tokio::time::sleep(Duration::from_millis(2000)).await; + rxs.push(rx); - let res = rx0.recv().await.unwrap(); - debug!("received A: {res:?}"); - let res = rx1.recv().await.unwrap(); - debug!("received A: {res:?}"); + let rx = { + let (tx, rx) = async_channel::bounded(1); + let item = dbpg::seriesbychannel::ChannelInfoQuery { + backend: backend.into(), + channel: channel_02.into(), + scalar_type: netpod::ScalarType::U16.to_scylla_i32(), + shape_dims: vec![64], + tx: Box::pin(tx), + }; + channel_info_query_tx.send(item).await.unwrap(); + rx + }; + rxs.push(rx); + + for rx in rxs { + let res = rx.recv().await.unwrap(); + debug!("received A: {res:?}"); + } let (tx, rx) = async_channel::bounded(1); let item = dbpg::seriesbychannel::ChannelInfoQuery { @@ -937,7 +760,37 @@ fn test_series_by_channel_01() { }; channel_info_query_tx.send(item).await.unwrap(); let res = rx.recv().await.unwrap(); - debug!("received B: {res:?}"); + debug!("received C: {res:?}"); + + { + let rows = pg + .query( + concat!( + "select series, channel, scalar_type, shape_dims", + " from series_by_channel where facility = $1", + " and channel not like 'test-block%'", + " order by channel, scalar_type, shape_dims", + ), + &[&backend], + ) + .await?; + let mut all = Vec::new(); + for row in rows { + let series: i64 = row.get(0); + let channel: String = row.get(1); + let scalar_type: i32 = row.get(2); + let shape_dims: Vec = row.get(3); + all.push((series, channel, scalar_type, shape_dims)); + } + let exp: Vec<(i64, String, i32, Vec)> = vec![ + (5191776216635917420, "chn-test-00".into(), 5, vec![64]), + (5025677968516078602, "chn-test-01".into(), 5, vec![64]), + (4802468414253815536, "chn-test-02".into(), 5, vec![64]), + (6409375609862757444, "chn-test-02".into(), 8, vec![64]), + ]; + assert_eq!(all, exp); + // TODO assert that the correct chn-test-02 got updated + } Ok::<_, Error>(()) }; taskrun::run(fut).unwrap(); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index ec81ba7..dbed8c9 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -566,7 +566,7 @@ impl CaConnSet { } match res { Ok(res) => { - let cssid = ChannelStatusSeriesId::new(res.series.id()); + let cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); self.channel_by_cssid .insert(cssid.clone(), Channel::new(res.channel.clone())); let add = ChannelAddWithStatusId { diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 2552cec..3800765 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -87,7 +87,7 @@ impl SeriesWriter { }; worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; - let cssid = ChannelStatusSeriesId::new(res.series.id()); + let cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await } @@ -110,7 +110,7 @@ impl SeriesWriter { }; worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; - let sid = res.series; + let sid = res.series.to_series(); let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10)); binner.setup_for(&scalar_type, &shape, tsnow)?; let res = Self {