Fix channel series lookup

This commit is contained in:
Dominik Werder
2024-02-14 16:50:40 +01:00
parent 37612404f1
commit e1599ab0b7
5 changed files with 63 additions and 51 deletions

View File

@@ -76,7 +76,7 @@ create table if not exists series_by_channel (
scalar_type int not null,
shape_dims int[] storage plain not null,
agg_kind int not null,
tscs timestamptz[] storage plain default array[now()]
tscs timestamptz[] storage plain not null default array[now()]
)";
let _ = pgc.execute(sql, &[]).await;
@@ -134,10 +134,15 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> {
async fn migrate_02(pgc: &PgClient) -> Result<(), Error> {
// TODO after all migrations, should check that the schema is as expected.
let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain default array[now()]";
let sql = "alter table series_by_channel alter shape_dims set storage plain";
let _ = pgc.execute(sql, &[]).await?;
let sql = "alter table series_by_channel add if not exists tscs timestamptz[] storage plain not null default array[now()]";
let _ = pgc.execute(sql, &[]).await?;
let sql = "alter table series_by_channel alter tscs set storage plain";
let _ = pgc.execute(sql, &[]).await?;
let sql = "alter table series_by_channel alter tscs set not null";
let _ = pgc.execute(sql, &[]).await?;
let sql = concat!("alter table series_by_channel drop constraint if exists series_by_channel_nondup");
let _ = pgc.execute(sql, &[]).await?;

View File

@@ -6,7 +6,6 @@ use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use log::*;
use md5::Digest;
@@ -14,7 +13,6 @@ use netpod::Database;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use series::series::Existence;
use series::SeriesId;
use stats::SeriesByChannelStats;
use std::pin::Pin;
@@ -83,8 +81,8 @@ pub struct ChannelInfoQuery {
pub backend: String,
pub channel: String,
pub kind: SeriesKind,
pub scalar_type: i32,
pub shape_dims: Vec<i32>,
pub scalar_type: ScalarType,
pub shape: Shape,
pub tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
}
@@ -95,7 +93,7 @@ impl fmt::Debug for ChannelInfoQuery {
.field("channel", &self.channel)
.field("kind", &self.kind)
.field("scalar_type", &self.scalar_type)
.field("shape_dims", &self.shape_dims)
.field("shape", &self.shape)
.finish()
}
}
@@ -160,7 +158,7 @@ impl Worker {
")",
" select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1",
" join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel",
" and t.agg_kind = 0",
" and t.kind = q1.kind and t.agg_kind = 0",
" order by q1.rid",
);
let qu_select = pg
@@ -216,15 +214,19 @@ impl Worker {
x.backend,
x.channel,
x.scalar_type,
x.shape_dims
x.shape
);
}
self.pg.execute("begin", &[]).await?;
match self.handle_batch::<FR>(batch).await {
Ok(()) => {
let ts1 = Instant::now();
match self.pg.execute("commit", &[]).await {
Ok(n) => {
debug!("commit {n}");
let dt = ts1.elapsed();
if dt > Duration::from_millis(40) {
debug!("commit {} {:.0} ms", n, dt.as_secs_f32());
}
}
Err(e) => {
warn!("commit error {e}");
@@ -351,7 +353,9 @@ impl Worker {
let tscs: Vec<DateTime<Utc>> = row.get(4);
let kind: i16 = row.get(5);
let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?;
if false {
if job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
|| series == SeriesId::new(1605348259462543621)
{
debug!(
"select worker found in database {:?} {:?} {:?} {:?} {:?} {:?}",
rid, series, scalar_type, shape_dims, tscs, kind
@@ -366,10 +370,8 @@ impl Worker {
// debug!("check for {job:?}");
// TODO call decide with empty accumulator: will result in DoesntExist.
let v = std::mem::replace(&mut acc, Vec::new());
let scalar_type_job = ScalarType::from_scylla_i32(job.scalar_type).map_err(|_| Error::ScalarType)?;
let shape_job = Shape::from_scylla_shape_dims(job.shape_dims.as_slice()).map_err(|_| Error::Shape)?;
let dec = Self::decide_matching_via_db(scalar_type_job, shape_job, v)?;
debug!("decision {dec:?}");
let dec = Self::decide_matching_via_db(job.scalar_type.clone(), job.shape.clone(), v)?;
// debug!("decision {dec:?}");
result.push(FoundResult { job, status: dec });
}
Ok(result)
@@ -432,14 +434,15 @@ impl Worker {
fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
if v.len() > 1 {
let mut z_0 = &v[0].0;
let mut z_1 = &v[0].1;
let mut z_2 = &v[0].2;
for r in v[1..].iter() {
if r.1 == *z_1 && r.2 == *z_2 {
debug!("{v:?}");
let e = Error::DbConsistencySeries(format!("no change between entries"));
let e = Error::DbConsistencySeries(format!("no change between entries {:?} {:?}", r.0, z_0));
return Err(e);
}
z_0 = &r.0;
z_1 = &r.1;
z_2 = &r.2;
}
@@ -451,23 +454,27 @@ impl Worker {
async fn insert_missing<FR: HashSalter>(&self, batch: &Vec<ChannelInfoQuery>) -> Result<(), Error> {
// debug!("insert_missing len {}", batch.len());
let (backends, channels, kinds, scalar_types, shape_dimss, mut hashers) = batch
let (backends, channels, kinds, scalar_types, shapes, mut hashers) = batch
.iter()
.map(|job| {
let backend = &job.backend;
let channel = &job.channel;
let kind = job.kind.to_db_i16();
let scalar_type = &job.scalar_type;
let shape = &job.shape_dims;
let shape = &job.shape;
let hasher = {
let mut h = md5::Md5::new();
h.update(job.backend.as_bytes());
h.update(job.channel.as_bytes());
h.update(format!("{:?}", job.scalar_type).as_bytes());
h.update(format!("{:?}", job.shape_dims).as_bytes());
h.update(format!("{:?}", scalar_type).as_bytes());
h.update(format!("{:?}", shape).as_bytes());
h
};
(backend, channel, kind, scalar_type, shape, hasher)
let x = (backend, channel, kind, scalar_type.to_scylla_i32(), shape, hasher);
if channel == "TEST:MEDIUM:WAVE-01024:F32:000000" {
debug!("INSERT {x:?}");
}
x
})
.fold(
(Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
@@ -510,14 +517,7 @@ impl Worker {
};
}
}
use serde_json::Value as JsVal;
let shape_dims_jss: Vec<_> = shape_dimss
.iter()
.map(|x| {
let v = x.iter().map(|x| JsVal::Number(serde_json::Number::from(*x))).collect();
JsVal::Array(v)
})
.collect();
let shape_dims_jss: Vec<_> = shapes.iter().map(|x| x.to_json_value()).collect();
let cc = self
.pg
.execute(
@@ -544,6 +544,9 @@ impl Worker {
async fn update_used_before<FR: HashSalter>(&self, sid: Vec<i64>) -> Result<(), Error> {
debug!("update_used_before {sid:?}");
if sid.contains(&1605348259462543621) {
debug!("UPDATE TSC FOR 1605348259462543621");
}
let sql = concat!(
"update series_by_channel set",
" tscs = tscs || now()::timestamptz",
@@ -736,8 +739,8 @@ fn test_series_by_channel_01() {
backend: backend.into(),
channel: channel.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
scalar_type: netpod::ScalarType::U16,
shape: Shape::Wave(64),
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
@@ -751,8 +754,8 @@ fn test_series_by_channel_01() {
backend: backend.into(),
channel: channel_01.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
scalar_type: netpod::ScalarType::U16,
shape: Shape::Wave(64),
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
@@ -766,8 +769,8 @@ fn test_series_by_channel_01() {
backend: backend.into(),
channel: channel_02.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
scalar_type: netpod::ScalarType::U16,
shape: Shape::Wave(64),
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();
@@ -795,8 +798,8 @@ fn test_series_by_channel_01() {
backend: backend.into(),
channel: channel.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
scalar_type: netpod::ScalarType::U16,
shape: Shape::Wave(64),
tx: Box::pin(tx),
};
channel_info_query_tx.send(item).await.unwrap();

View File

@@ -32,12 +32,13 @@ use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use scywr::iteminsertqueue::ChannelInfoItem;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use series::ChannelStatusSeriesId;
use serieswriter::writer::EstablishWorkerJob;
use statemap::ActiveChannelState;
@@ -379,6 +380,7 @@ pub struct CaConnSet {
rogue_channel_count: u64,
connect_fail_count: usize,
establish_worker_tx: async_channel::Sender<EstablishWorkerJob>,
cssid_latency_max: Duration,
}
impl CaConnSet {
@@ -447,6 +449,7 @@ impl CaConnSet {
rogue_channel_count: 0,
connect_fail_count: 0,
establish_worker_tx,
cssid_latency_max: Duration::from_millis(2000),
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -539,8 +542,8 @@ impl CaConnSet {
backend: cmd.backend,
channel: cmd.name,
kind: SeriesKind::ChannelStatus,
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: Vec::new(),
scalar_type: ScalarType::ChannelStatus,
shape: Shape::Scalar,
tx: Box::pin(SeriesLookupSender { tx }),
};
self.channel_info_query_queue.push_back(item);
@@ -598,7 +601,12 @@ impl CaConnSet {
let ch = Channel::new(cmd.name.clone());
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 {
if let ActiveChannelState::WaitForStatusSeriesId { since } = chst2 {
let dt = since.elapsed().unwrap();
if dt > self.cssid_latency_max {
self.cssid_latency_max = dt + Duration::from_millis(2000);
debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd);
}
*chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState {
cssid: cmd.cssid,
addr_find_backoff: 0,
@@ -1257,7 +1265,7 @@ impl CaConnSet {
}
ActiveChannelState::WaitForStatusSeriesId { since } => {
let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > Duration::from_millis(5000) {
if dt > Duration::from_millis(20000) {
warn!("timeout can not get status series id for {ch:?}");
*st2 = ActiveChannelState::Init { since: stnow };
} else {

View File

@@ -1,8 +1,6 @@
use serde::Deserialize;
use serde::Serialize;
pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14;
#[derive(Clone, Debug)]
pub enum Existence<T> {
Created(T),

View File

@@ -4,7 +4,6 @@ use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use future::ready;
use futures_util::future;
use futures_util::StreamExt;
use log::*;
@@ -19,7 +18,6 @@ use netpod::TS_MSP_GRID_UNIT;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::InsertItem;
use scywr::iteminsertqueue::QueryItem;
use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::SeriesWriterEstablishStats;
@@ -83,8 +81,8 @@ impl SeriesWriter {
backend: backend.clone(),
channel: channel.clone(),
kind: SeriesKind::ChannelStatus,
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: shape.to_scylla_vec(),
scalar_type: ScalarType::ChannelStatus,
shape: Shape::Scalar,
tx: Box::pin(tx),
};
worker_tx.send(item).await?;
@@ -107,8 +105,8 @@ impl SeriesWriter {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
worker_tx.send(item).await?;