From e1599ab0b7c9abfd141b7cc890e9221e81ca9771 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 14 Feb 2024 16:50:40 +0100 Subject: [PATCH] Fix channel series lookup --- dbpg/src/schema.rs | 9 ++++- dbpg/src/seriesbychannel.rs | 75 +++++++++++++++++++------------------ netfetch/src/ca/connset.rs | 18 ++++++--- series/src/series.rs | 2 - serieswriter/src/writer.rs | 10 ++--- 5 files changed, 63 insertions(+), 51 deletions(-) diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index 1a40e4f..8bb9894 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -76,7 +76,7 @@ create table if not exists series_by_channel ( scalar_type int not null, shape_dims int[] storage plain not null, agg_kind int not null, - tscs timestamptz[] storage plain default array[now()] + tscs timestamptz[] storage plain not null default array[now()] )"; let _ = pgc.execute(sql, &[]).await; @@ -134,10 +134,15 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> { async fn migrate_02(pgc: &PgClient) -> Result<(), Error> { // TODO after all migrations, should check that the schema is as expected. - let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain default array[now()]"; + let sql = "alter table series_by_channel alter shape_dims set storage plain"; + let _ = pgc.execute(sql, &[]).await?; + + let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain not null default array[now()]"; let _ = pgc.execute(sql, &[]).await?; let sql = "alter table series_by_channel alter tscs set storage plain"; let _ = pgc.execute(sql, &[]).await?; + let sql = "alter table series_by_channel alter tscs set not null"; + let _ = pgc.execute(sql, &[]).await?; let sql = concat!("alter table series_by_channel drop constraint if exists series_by_channel_nondup"); let _ = pgc.execute(sql, &[]).await?; diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index b70feab..e3d45d3 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -6,7 +6,6 @@ use core::fmt; use err::thiserror; use err::ThisError; use futures_util::Future; -use futures_util::StreamExt; use futures_util::TryFutureExt; use log::*; use md5::Digest; @@ -14,7 +13,6 @@ use netpod::Database; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; -use series::series::Existence; use series::SeriesId; use stats::SeriesByChannelStats; use std::pin::Pin; @@ -83,8 +81,8 @@ pub struct ChannelInfoQuery { pub backend: String, pub channel: String, pub kind: SeriesKind, - pub scalar_type: i32, - pub shape_dims: Vec, + pub scalar_type: ScalarType, + pub shape: Shape, pub tx: Pin>, } @@ -95,7 +93,7 @@ impl fmt::Debug for ChannelInfoQuery { .field("channel", &self.channel) .field("kind", &self.kind) .field("scalar_type", &self.scalar_type) - .field("shape_dims", &self.shape_dims) + .field("shape", &self.shape) .finish() } } @@ -160,7 +158,7 @@ impl Worker { ")", " select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1", " join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel", - " and t.agg_kind = 0", + " and t.kind = q1.kind and t.agg_kind = 0", " order by q1.rid", ); let qu_select = pg @@ -216,15 +214,19 @@ impl Worker { x.backend, x.channel, x.scalar_type, - x.shape_dims + x.shape ); } self.pg.execute("begin", &[]).await?; match self.handle_batch::(batch).await { Ok(()) => { + let ts1 = Instant::now(); match self.pg.execute("commit", &[]).await { Ok(n) => { - debug!("commit {n}"); + let dt = ts1.elapsed(); + if dt > Duration::from_millis(40) { + debug!("commit {} {:.0} ms", n, dt.as_secs_f32()); + } } Err(e) => { warn!("commit error {e}"); @@ -351,7 +353,9 @@ impl Worker { let tscs: Vec> = row.get(4); let kind: i16 = row.get(5); let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?; - if false { + if job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000" + || series == SeriesId::new(1605348259462543621) + { debug!( "select worker found in database {:?} {:?} {:?} {:?} {:?} {:?}", rid, series, scalar_type, shape_dims, tscs, kind @@ -366,10 +370,8 @@ impl Worker { // debug!("check for {job:?}"); // TODO call decide with empty accumulator: will result in DoesntExist. let v = std::mem::replace(&mut acc, Vec::new()); - let scalar_type_job = ScalarType::from_scylla_i32(job.scalar_type).map_err(|_| Error::ScalarType)?; - let shape_job = Shape::from_scylla_shape_dims(job.shape_dims.as_slice()).map_err(|_| Error::Shape)?; - let dec = Self::decide_matching_via_db(scalar_type_job, shape_job, v)?; - debug!("decision {dec:?}"); + let dec = Self::decide_matching_via_db(job.scalar_type.clone(), job.shape.clone(), v)?; + // debug!("decision {dec:?}"); result.push(FoundResult { job, status: dec }); } Ok(result) @@ -432,14 +434,15 @@ impl Worker { fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime)>) -> Result<(), Error> { if v.len() > 1 { + let mut z_0 = &v[0].0; let mut z_1 = &v[0].1; let mut z_2 = &v[0].2; for r in v[1..].iter() { if r.1 == *z_1 && r.2 == *z_2 { - debug!("{v:?}"); - let e = Error::DbConsistencySeries(format!("no change between entries")); + let e = Error::DbConsistencySeries(format!("no change between entries {:?} {:?}", r.0, z_0)); return Err(e); } + z_0 = &r.0; z_1 = &r.1; z_2 = &r.2; } @@ -451,23 +454,27 @@ impl Worker { async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { // debug!("insert_missing len {}", batch.len()); - let (backends, channels, kinds, scalar_types, shape_dimss, mut hashers) = batch + let (backends, channels, kinds, scalar_types, shapes, mut hashers) = batch .iter() .map(|job| { let backend = &job.backend; let channel = &job.channel; let kind = job.kind.to_db_i16(); let scalar_type = &job.scalar_type; - let shape = &job.shape_dims; + let shape = &job.shape; let hasher = { let mut h = md5::Md5::new(); h.update(job.backend.as_bytes()); h.update(job.channel.as_bytes()); - h.update(format!("{:?}", job.scalar_type).as_bytes()); - h.update(format!("{:?}", job.shape_dims).as_bytes()); + h.update(format!("{:?}", scalar_type).as_bytes()); + h.update(format!("{:?}", shape).as_bytes()); h }; - (backend, channel, kind, scalar_type, shape, hasher) + let x = (backend, channel, kind, scalar_type.to_scylla_i32(), shape, hasher); + if channel == "TEST:MEDIUM:WAVE-01024:F32:000000" { + debug!("INSERT {x:?}"); + } + x }) .fold( (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()), @@ -510,14 +517,7 @@ impl Worker { }; } } - use serde_json::Value as JsVal; - let shape_dims_jss: Vec<_> = shape_dimss - .iter() - .map(|x| { - let v = x.iter().map(|x| JsVal::Number(serde_json::Number::from(*x))).collect(); - JsVal::Array(v) - }) - .collect(); + let shape_dims_jss: Vec<_> = shapes.iter().map(|x| x.to_json_value()).collect(); let cc = self .pg .execute( @@ -544,6 +544,9 @@ impl Worker { async fn update_used_before(&self, sid: Vec) -> Result<(), Error> { debug!("update_used_before {sid:?}"); + if sid.contains(&1605348259462543621) { + debug!("UPDATE TSC FOR 1605348259462543621"); + } let sql = concat!( "update series_by_channel set", " tscs = tscs || now()::timestamptz", @@ -736,8 +739,8 @@ fn test_series_by_channel_01() { backend: backend.into(), channel: channel.into(), kind: SeriesKind::ChannelData, - scalar_type: netpod::ScalarType::U16.to_scylla_i32(), - shape_dims: vec![64], + scalar_type: netpod::ScalarType::U16, + shape: Shape::Wave(64), tx: Box::pin(tx), }; channel_info_query_tx.send(item).await.unwrap(); @@ -751,8 +754,8 @@ fn test_series_by_channel_01() { backend: backend.into(), channel: channel_01.into(), kind: SeriesKind::ChannelData, - scalar_type: netpod::ScalarType::U16.to_scylla_i32(), - shape_dims: vec![64], + scalar_type: netpod::ScalarType::U16, + shape: Shape::Wave(64), tx: Box::pin(tx), }; channel_info_query_tx.send(item).await.unwrap(); @@ -766,8 +769,8 @@ fn test_series_by_channel_01() { backend: backend.into(), channel: channel_02.into(), kind: SeriesKind::ChannelData, - scalar_type: netpod::ScalarType::U16.to_scylla_i32(), - shape_dims: vec![64], + scalar_type: netpod::ScalarType::U16, + shape: Shape::Wave(64), tx: Box::pin(tx), }; channel_info_query_tx.send(item).await.unwrap(); @@ -795,8 +798,8 @@ fn test_series_by_channel_01() { backend: backend.into(), channel: channel.into(), kind: SeriesKind::ChannelData, - scalar_type: netpod::ScalarType::U16.to_scylla_i32(), - shape_dims: vec![64], + scalar_type: netpod::ScalarType::U16, + shape: Shape::Wave(64), tx: Box::pin(tx), }; channel_info_query_tx.send(item).await.unwrap(); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 8c7aab6..0541efd 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -32,12 +32,13 @@ use futures_util::Stream; use futures_util::StreamExt; use hashbrown::HashMap; use log::*; +use netpod::ScalarType; use netpod::SeriesKind; +use netpod::Shape; use scywr::iteminsertqueue::ChannelInfoItem; use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; -use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use series::ChannelStatusSeriesId; use serieswriter::writer::EstablishWorkerJob; use statemap::ActiveChannelState; @@ -379,6 +380,7 @@ pub struct CaConnSet { rogue_channel_count: u64, connect_fail_count: usize, establish_worker_tx: async_channel::Sender, + cssid_latency_max: Duration, } impl CaConnSet { @@ -447,6 +449,7 @@ impl CaConnSet { rogue_channel_count: 0, connect_fail_count: 0, establish_worker_tx, + cssid_latency_max: Duration::from_millis(2000), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -539,8 +542,8 @@ impl CaConnSet { backend: cmd.backend, channel: cmd.name, kind: SeriesKind::ChannelStatus, - scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, - shape_dims: Vec::new(), + scalar_type: ScalarType::ChannelStatus, + shape: Shape::Scalar, tx: Box::pin(SeriesLookupSender { tx }), }; self.channel_info_query_queue.push_back(item); @@ -598,7 +601,12 @@ impl CaConnSet { let ch = Channel::new(cmd.name.clone()); if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { - if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 { + if let ActiveChannelState::WaitForStatusSeriesId { since } = chst2 { + let dt = since.elapsed().unwrap(); + if dt > self.cssid_latency_max { + self.cssid_latency_max = dt + Duration::from_millis(2000); + debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd); + } *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState { cssid: cmd.cssid, addr_find_backoff: 0, @@ -1257,7 +1265,7 @@ impl CaConnSet { } ActiveChannelState::WaitForStatusSeriesId { since } => { let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > Duration::from_millis(5000) { + if dt > Duration::from_millis(20000) { warn!("timeout can not get status series id for {ch:?}"); *st2 = ActiveChannelState::Init { since: stnow }; } else { diff --git a/series/src/series.rs b/series/src/series.rs index 944ab6a..1bb7e6d 100644 --- a/series/src/series.rs +++ b/series/src/series.rs @@ -1,8 +1,6 @@ use serde::Deserialize; use serde::Serialize; -pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14; - #[derive(Clone, Debug)] pub enum Existence { Created(T), diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 7ce5431..c2e7c61 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -4,7 +4,6 @@ use async_channel::Sender; use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; -use future::ready; use futures_util::future; use futures_util::StreamExt; use log::*; @@ -19,7 +18,6 @@ use netpod::TS_MSP_GRID_UNIT; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::InsertItem; use scywr::iteminsertqueue::QueryItem; -use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use series::ChannelStatusSeriesId; use series::SeriesId; use stats::SeriesWriterEstablishStats; @@ -83,8 +81,8 @@ impl SeriesWriter { backend: backend.clone(), channel: channel.clone(), kind: SeriesKind::ChannelStatus, - scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, - shape_dims: shape.to_scylla_vec(), + scalar_type: ScalarType::ChannelStatus, + shape: Shape::Scalar, tx: Box::pin(tx), }; worker_tx.send(item).await?; @@ -107,8 +105,8 @@ impl SeriesWriter { backend, channel, kind: SeriesKind::ChannelData, - scalar_type: scalar_type.to_scylla_i32(), - shape_dims: shape.to_scylla_vec(), + scalar_type: scalar_type.clone(), + shape: shape.clone(), tx: Box::pin(tx), }; worker_tx.send(item).await?;