diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index 754773b..1a40e4f 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -72,6 +72,7 @@ create table if not exists series_by_channel ( series bigint not null primary key, facility text not null, channel text not null, + kind int2 not null, scalar_type int not null, shape_dims int[] storage plain not null, agg_kind int not null, @@ -79,14 +80,11 @@ create table if not exists series_by_channel ( )"; let _ = pgc.execute(sql, &[]).await; - { - let sql = "alter table series_by_channel drop tscreate"; - let _ = pgc.execute(sql, &[]).await; - } + let sql = "alter table series_by_channel drop tscreate"; + let _ = pgc.execute(sql, &[]).await; + if !has_table("ioc_by_channel_log", pgc).await? { - let _ = pgc - .execute( - " + let sql = " create table if not exists ioc_by_channel_log ( facility text not null, channel text not null, @@ -96,22 +94,15 @@ create table if not exists ioc_by_channel_log ( queryaddr text, responseaddr text, addr text -) -", - &[], - ) - .await; - let _ = pgc - .execute( - " +)"; + let _ = pgc.execute(sql, &[]).await; + let sql = " create index if not exists ioc_by_channel_log_channel on ioc_by_channel_log ( facility, channel ) -", - &[], - ) - .await; +"; + let _ = pgc.execute(sql, &[]).await; } Ok(()) } @@ -138,18 +129,6 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> { ) .await?; } - { - let sql = concat!( - "alter table series_by_channel add constraint series_by_channel_nondup", - " unique (facility, channel, scalar_type, shape_dims, agg_kind)" - ); - match pgc.execute(sql, &[]).await { - Ok(_) => { - info!("constraint added"); - } - Err(_) => {} - } - } Ok(()) } @@ -159,6 +138,24 @@ async fn migrate_02(pgc: &PgClient) -> Result<(), Error> { let _ = pgc.execute(sql, &[]).await?; let sql = "alter table series_by_channel alter tscs set storage plain"; let _ = pgc.execute(sql, &[]).await?; + + let sql = concat!("alter table series_by_channel drop constraint if exists series_by_channel_nondup"); + let _ = pgc.execute(sql, &[]).await?; + let sql = "alter table series_by_channel add if not exists kind int2 not null default 0"; + let _ = pgc.execute(sql, &[]).await?; + let sql = "alter table series_by_channel alter kind drop default"; + let _ = pgc.execute(sql, &[]).await?; + let sql = "update series_by_channel set kind = 1 where kind = 0 and scalar_type = 14"; + let _ = pgc.execute(sql, &[]).await?; + let sql = "update series_by_channel set kind = 2 where kind = 0 and scalar_type >= 0 and scalar_type <= 13"; + let _ = pgc.execute(sql, &[]).await?; + + // TODO this can fail if exists, but must verify that it exists in proper form + let sql = concat!( + "alter table series_by_channel add constraint series_by_channel_nondup_02", + " unique (facility, channel, kind, scalar_type, shape_dims, agg_kind)" + ); + let _ = pgc.execute(sql, &[]).await; Ok(()) } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 22e7de2..b70feab 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -12,6 +12,7 @@ use log::*; use md5::Digest; use netpod::Database; use netpod::ScalarType; +use netpod::SeriesKind; use netpod::Shape; use series::series::Existence; use series::SeriesId; @@ -81,6 +82,7 @@ impl CanSendChannelInfoResult for async_channel::Sender, pub tx: Pin>, @@ -91,6 +93,7 @@ impl fmt::Debug for ChannelInfoQuery { fmt.debug_struct("ChannelInfoQuery") .field("backend", &self.backend) .field("channel", &self.channel) + .field("kind", &self.kind) .field("scalar_type", &self.scalar_type) .field("shape_dims", &self.shape_dims) .finish() @@ -152,24 +155,28 @@ impl Worker { let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; let sql = concat!( "with q1 as (", - " select * from unnest($1, $2, $3)", - " as inp (rid, backend, channel)", + " select * from unnest($1, $2, $3, $4)", + " as inp (rid, backend, channel, kind)", ")", - " select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs from q1", + " select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1", " join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel", " and t.agg_kind = 0", " order by q1.rid", ); let qu_select = pg - .prepare_typed(sql, &[Type::INT4_ARRAY, Type::TEXT_ARRAY, Type::TEXT_ARRAY]) + .prepare_typed( + sql, + &[Type::INT4_ARRAY, Type::TEXT_ARRAY, Type::TEXT_ARRAY, Type::INT2_ARRAY], + ) .await?; + let sql = concat!( "with q1 as (", - " select * from unnest($1, $2, $3, $4, $5)", - " as inp (backend, channel, scalar_type, shape_dims, series)", + " select * from unnest($1, $2, $3, $4, $5, $6)", + " as inp (backend, channel, scalar_type, shape_dims, series, kind)", ")", - " insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " select series, backend, channel, scalar_type,", + " insert into series_by_channel (series, facility, channel, kind, scalar_type, shape_dims, agg_kind)", + " select series, backend, channel, kind, scalar_type,", " array(select e::int from jsonb_array_elements(shape_dims) as e) as shape_dims,", " 0 from q1", " on conflict do nothing" @@ -183,6 +190,7 @@ impl Worker { Type::INT4_ARRAY, Type::JSONB_ARRAY, Type::INT8_ARRAY, + Type::INT2_ARRAY, ], ) .await?; @@ -301,25 +309,33 @@ impl Worker { } async fn select(&self, batch: Vec) -> Result, Error> { - let (rids, backends, channels, jobs) = batch + let (rids, backends, channels, kinds, jobs) = batch .into_iter() .enumerate() .map(|(i, e)| { let rid = i as i32; let backend = e.backend.clone(); let channel = e.channel.clone(); + let kind = e.kind.to_db_i16(); let job = e; - (rid, backend, channel, job) + (rid, backend, channel, kind, job) }) - .fold((Vec::new(), Vec::new(), Vec::new(), Vec::new()), |mut a, v| { - a.0.push(v.0); - a.1.push(v.1); - a.2.push(v.2); - a.3.push(v.3); - a - }); + .fold( + (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()), + |mut a, v| { + a.0.push(v.0); + a.1.push(v.1); + a.2.push(v.2); + a.3.push(v.3); + a.4.push(v.4); + a + }, + ); // debug!("select worker start batch of {} {:?}", channels.len(), channels); - let rows = self.pg.query(&self.qu_select, &[&rids, &backends, &channels]).await?; + let rows = self + .pg + .query(&self.qu_select, &[&rids, &backends, &channels, &kinds]) + .await?; let mut row_it = rows.into_iter(); let mut row_opt = row_it.next(); let mut acc = Vec::new(); @@ -333,10 +349,12 @@ impl Worker { let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec>(3).as_slice()) .map_err(|_| Error::Shape)?; let tscs: Vec> = row.get(4); + let kind: i16 = row.get(5); + let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?; if false { debug!( - "select worker found in database {:?} {:?} {:?} {:?} {:?}", - rid, series, scalar_type, shape_dims, tscs + "select worker found in database {:?} {:?} {:?} {:?} {:?} {:?}", + rid, series, scalar_type, shape_dims, tscs, kind ); } acc.push((rid, series, scalar_type, shape_dims, tscs)); @@ -433,11 +451,12 @@ impl Worker { async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { // debug!("insert_missing len {}", batch.len()); - let (backends, channels, scalar_types, shape_dimss, mut hashers) = batch + let (backends, channels, kinds, scalar_types, shape_dimss, mut hashers) = batch .iter() .map(|job| { let backend = &job.backend; let channel = &job.channel; + let kind = job.kind.to_db_i16(); let scalar_type = &job.scalar_type; let shape = &job.shape_dims; let hasher = { @@ -448,16 +467,17 @@ impl Worker { h.update(format!("{:?}", job.shape_dims).as_bytes()); h }; - (backend, channel, scalar_type, shape, hasher) + (backend, channel, kind, scalar_type, shape, hasher) }) .fold( - (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()), + (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()), |mut a, x| { a.0.push(x.0); a.1.push(x.1); a.2.push(x.2); a.3.push(x.3); a.4.push(x.4); + a.5.push(x.5); a }, ); @@ -502,7 +522,14 @@ impl Worker { .pg .execute( &self.qu_insert, - &[&backends, &channels, &scalar_types, &shape_dims_jss, &series_ids], + &[ + &backends, + &channels, + &scalar_types, + &shape_dims_jss, + &series_ids, + &kinds, + ], ) .await .unwrap(); @@ -674,8 +701,8 @@ fn test_series_by_channel_01() { // Block the id let id1: i64 = 5609172854884670524; let sql = concat!( - "insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " values ($1, $2, $3, $4, array[]::int4[], 0)" + "insert into series_by_channel (series, facility, channel, kind, scalar_type, shape_dims, agg_kind)", + " values ($1, $2, $3, 2, $4, array[]::int4[], 0)" ); pg.execute(sql, &[&id1, &backend, &"test-block-00", &1i32]).await?; @@ -684,15 +711,15 @@ fn test_series_by_channel_01() { let id: i64 = 4802468414253815536; let sql = concat!( "insert into series_by_channel", - " (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)", - " values ($1, $2, $3, 5, array[64]::int4[], 0, array['2000-10-10T08:00:00Z'::timestamptz])" + " (series, facility, channel, kind, scalar_type, shape_dims, agg_kind, tscs)", + " values ($1, $2, $3, 2, 5, array[64]::int4[], 0, array['2000-10-10T08:00:00Z'::timestamptz])" ); pg.execute(sql, &[&id, &backend, &channel_02]).await?; let id: i64 = 6409375609862757444; let sql = concat!( "insert into series_by_channel", - " (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)", - " values ($1, $2, $3, 8, array[64]::int4[], 0, array['2001-10-10T08:00:00Z'::timestamptz])" + " (series, facility, channel, kind, scalar_type, shape_dims, agg_kind, tscs)", + " values ($1, $2, $3, 2, 8, array[64]::int4[], 0, array['2001-10-10T08:00:00Z'::timestamptz])" ); pg.execute(sql, &[&id, &backend, &channel_02]).await?; } @@ -708,6 +735,7 @@ fn test_series_by_channel_01() { let item = dbpg::seriesbychannel::ChannelInfoQuery { backend: backend.into(), channel: channel.into(), + kind: SeriesKind::ChannelData, scalar_type: netpod::ScalarType::U16.to_scylla_i32(), shape_dims: vec![64], tx: Box::pin(tx), @@ -722,6 +750,7 @@ fn test_series_by_channel_01() { let item = dbpg::seriesbychannel::ChannelInfoQuery { backend: backend.into(), channel: channel_01.into(), + kind: SeriesKind::ChannelData, scalar_type: netpod::ScalarType::U16.to_scylla_i32(), shape_dims: vec![64], tx: Box::pin(tx), @@ -736,6 +765,7 @@ fn test_series_by_channel_01() { let item = dbpg::seriesbychannel::ChannelInfoQuery { backend: backend.into(), channel: channel_02.into(), + kind: SeriesKind::ChannelData, scalar_type: netpod::ScalarType::U16.to_scylla_i32(), shape_dims: vec![64], tx: Box::pin(tx), @@ -764,6 +794,7 @@ fn test_series_by_channel_01() { let item = dbpg::seriesbychannel::ChannelInfoQuery { backend: backend.into(), channel: channel.into(), + kind: SeriesKind::ChannelData, scalar_type: netpod::ScalarType::U16.to_scylla_i32(), shape_dims: vec![64], tx: Box::pin(tx), @@ -776,7 +807,7 @@ fn test_series_by_channel_01() { let rows = pg .query( concat!( - "select series, channel, scalar_type, shape_dims", + "select series, channel, kind, scalar_type, shape_dims", " from series_by_channel where facility = $1", " and channel not like 'test-block%'", " order by channel, scalar_type, shape_dims", @@ -788,8 +819,8 @@ fn test_series_by_channel_01() { for row in rows { let series: i64 = row.get(0); let channel: String = row.get(1); - let scalar_type: i32 = row.get(2); - let shape_dims: Vec = row.get(3); + let scalar_type: i32 = row.get(3); + let shape_dims: Vec = row.get(4); all.push((series, channel, scalar_type, shape_dims)); } let exp: Vec<(i64, String, i32, Vec)> = vec![ diff --git a/dbpg/src/seriesid.rs b/dbpg/src/seriesid.rs index 217e567..bbc9f9b 100644 --- a/dbpg/src/seriesid.rs +++ b/dbpg/src/seriesid.rs @@ -1,15 +1,7 @@ -use crate::conn::PgClient; use err::thiserror; use err::ThisError; -use log::*; -use netpod::ScalarType; -use netpod::Shape; -use series::series::Existence; -use series::SeriesId; -use std::time::Duration; -use std::time::Instant; -use taskrun::tokio; +// TODO still needed? #[derive(Debug, ThisError)] pub enum Error { Postgres(#[from] tokio_postgres::Error), @@ -17,76 +9,3 @@ pub enum Error { BadIdGenerated, CanNotInsertSeriesId, } - -async fn _get_series_id( - backend: &str, - name: &str, - scalar_type: &ScalarType, - shape: &Shape, - pg_client: &PgClient, -) -> Result, Error> { - let channel_name = name; - let scalar_type = scalar_type.to_scylla_i32(); - let shape = shape.to_scylla_vec(); - let res = pg_client - .query( - "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0", - &[&backend, &channel_name, &scalar_type, &shape], - ) - .await?; - let mut all = Vec::new(); - for row in res { - let series: i64 = row.get(0); - let series = series as u64; - all.push(series); - } - let rn = all.len(); - let tsbeg = Instant::now(); - if rn == 0 { - use md5::Digest; - let mut h = md5::Md5::new(); - h.update(backend.as_bytes()); - h.update(channel_name.as_bytes()); - h.update(format!("{:?}", scalar_type).as_bytes()); - h.update(format!("{:?}", shape).as_bytes()); - for _ in 0..200 { - h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); - let f = h.clone().finalize(); - let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series > i64::MAX as u64 { - continue; - } - if series == 0 { - continue; - } - if series <= 0 || series > i64::MAX as u64 { - return Err(Error::BadIdGenerated); - } - let sql = concat!( - "insert into series_by_channel", - " (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " values ($1, $2, $3, $4, $5, 0) on conflict do nothing" - ); - let res = pg_client - .execute(sql, &[&(series as i64), &backend, &channel_name, &scalar_type, &shape]) - .await - .unwrap(); - if res == 1 { - let series = Existence::Created(SeriesId::new(series)); - return Ok(series); - } else { - warn!( - "tried to insert {series:?} for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} trying again..." - ); - } - tokio::time::sleep(Duration::from_millis(20)).await; - } - error!("tried to insert new series id for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} but failed"); - Err(Error::CanNotInsertSeriesId) - } else { - let series = all[0] as u64; - let series = Existence::Existing(SeriesId::new(series)); - debug!("get_series_id {backend:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}"); - Ok(series) - } -} diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index e15bce2..8c7aab6 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -32,6 +32,7 @@ use futures_util::Stream; use futures_util::StreamExt; use hashbrown::HashMap; use log::*; +use netpod::SeriesKind; use scywr::iteminsertqueue::ChannelInfoItem; use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; @@ -537,6 +538,7 @@ impl CaConnSet { let item = ChannelInfoQuery { backend: cmd.backend, channel: cmd.name, + kind: SeriesKind::ChannelStatus, scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, shape_dims: Vec::new(), tx: Box::pin(SeriesLookupSender { tx }), diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 3800765..7ce5431 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -11,6 +11,7 @@ use log::*; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; use netpod::ScalarType; +use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; use netpod::TS_MSP_GRID_SPACING; @@ -81,6 +82,7 @@ impl SeriesWriter { let item = ChannelInfoQuery { backend: backend.clone(), channel: channel.clone(), + kind: SeriesKind::ChannelStatus, scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, shape_dims: shape.to_scylla_vec(), tx: Box::pin(tx), @@ -104,6 +106,7 @@ impl SeriesWriter { let item = ChannelInfoQuery { backend, channel, + kind: SeriesKind::ChannelData, scalar_type: scalar_type.to_scylla_i32(), shape_dims: shape.to_scylla_vec(), tx: Box::pin(tx),