This commit is contained in:
Dominik Werder
2024-02-07 15:19:37 +01:00
parent 3c4970aa2c
commit 37612404f1
5 changed files with 98 additions and 146 deletions
+28 -31
View File
@@ -72,6 +72,7 @@ create table if not exists series_by_channel (
series bigint not null primary key,
facility text not null,
channel text not null,
kind int2 not null,
scalar_type int not null,
shape_dims int[] storage plain not null,
agg_kind int not null,
@@ -79,14 +80,11 @@ create table if not exists series_by_channel (
)";
let _ = pgc.execute(sql, &[]).await;
{
let sql = "alter table series_by_channel drop tscreate";
let _ = pgc.execute(sql, &[]).await;
}
let sql = "alter table series_by_channel drop tscreate";
let _ = pgc.execute(sql, &[]).await;
if !has_table("ioc_by_channel_log", pgc).await? {
let _ = pgc
.execute(
"
let sql = "
create table if not exists ioc_by_channel_log (
facility text not null,
channel text not null,
@@ -96,22 +94,15 @@ create table if not exists ioc_by_channel_log (
queryaddr text,
responseaddr text,
addr text
)
",
&[],
)
.await;
let _ = pgc
.execute(
"
)";
let _ = pgc.execute(sql, &[]).await;
let sql = "
create index if not exists ioc_by_channel_log_channel on ioc_by_channel_log (
facility,
channel
)
",
&[],
)
.await;
";
let _ = pgc.execute(sql, &[]).await;
}
Ok(())
}
@@ -138,18 +129,6 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> {
)
.await?;
}
{
let sql = concat!(
"alter table series_by_channel add constraint series_by_channel_nondup",
" unique (facility, channel, scalar_type, shape_dims, agg_kind)"
);
match pgc.execute(sql, &[]).await {
Ok(_) => {
info!("constraint added");
}
Err(_) => {}
}
}
Ok(())
}
@@ -159,6 +138,24 @@ async fn migrate_02(pgc: &PgClient) -> Result<(), Error> {
let _ = pgc.execute(sql, &[]).await?;
let sql = "alter table series_by_channel alter tscs set storage plain";
let _ = pgc.execute(sql, &[]).await?;
let sql = concat!("alter table series_by_channel drop constraint if exists series_by_channel_nondup");
let _ = pgc.execute(sql, &[]).await?;
let sql = "alter table series_by_channel add if not exists kind int2 not null default 0";
let _ = pgc.execute(sql, &[]).await?;
let sql = "alter table series_by_channel alter kind drop default";
let _ = pgc.execute(sql, &[]).await?;
let sql = "update series_by_channel set kind = 1 where kind = 0 and scalar_type = 14";
let _ = pgc.execute(sql, &[]).await?;
let sql = "update series_by_channel set kind = 2 where kind = 0 and scalar_type >= 0 and scalar_type <= 13";
let _ = pgc.execute(sql, &[]).await?;
// TODO this can fail if exists, but must verify that it exists in proper form
let sql = concat!(
"alter table series_by_channel add constraint series_by_channel_nondup_02",
" unique (facility, channel, kind, scalar_type, shape_dims, agg_kind)"
);
let _ = pgc.execute(sql, &[]).await;
Ok(())
}
+64 -33
View File
@@ -12,6 +12,7 @@ use log::*;
use md5::Digest;
use netpod::Database;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use series::series::Existence;
use series::SeriesId;
@@ -81,6 +82,7 @@ impl CanSendChannelInfoResult for async_channel::Sender<Result<ChannelInfoResult
pub struct ChannelInfoQuery {
pub backend: String,
pub channel: String,
pub kind: SeriesKind,
pub scalar_type: i32,
pub shape_dims: Vec<i32>,
pub tx: Pin<Box<dyn CanSendChannelInfoResult + Send>>,
@@ -91,6 +93,7 @@ impl fmt::Debug for ChannelInfoQuery {
fmt.debug_struct("ChannelInfoQuery")
.field("backend", &self.backend)
.field("channel", &self.channel)
.field("kind", &self.kind)
.field("scalar_type", &self.scalar_type)
.field("shape_dims", &self.shape_dims)
.finish()
@@ -152,24 +155,28 @@ impl Worker {
let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?;
let sql = concat!(
"with q1 as (",
" select * from unnest($1, $2, $3)",
" as inp (rid, backend, channel)",
" select * from unnest($1, $2, $3, $4)",
" as inp (rid, backend, channel, kind)",
")",
" select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs from q1",
" select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1",
" join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel",
" and t.agg_kind = 0",
" order by q1.rid",
);
let qu_select = pg
.prepare_typed(sql, &[Type::INT4_ARRAY, Type::TEXT_ARRAY, Type::TEXT_ARRAY])
.prepare_typed(
sql,
&[Type::INT4_ARRAY, Type::TEXT_ARRAY, Type::TEXT_ARRAY, Type::INT2_ARRAY],
)
.await?;
let sql = concat!(
"with q1 as (",
" select * from unnest($1, $2, $3, $4, $5)",
" as inp (backend, channel, scalar_type, shape_dims, series)",
" select * from unnest($1, $2, $3, $4, $5, $6)",
" as inp (backend, channel, scalar_type, shape_dims, series, kind)",
")",
" insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" select series, backend, channel, scalar_type,",
" insert into series_by_channel (series, facility, channel, kind, scalar_type, shape_dims, agg_kind)",
" select series, backend, channel, kind, scalar_type,",
" array(select e::int from jsonb_array_elements(shape_dims) as e) as shape_dims,",
" 0 from q1",
" on conflict do nothing"
@@ -183,6 +190,7 @@ impl Worker {
Type::INT4_ARRAY,
Type::JSONB_ARRAY,
Type::INT8_ARRAY,
Type::INT2_ARRAY,
],
)
.await?;
@@ -301,25 +309,33 @@ impl Worker {
}
async fn select(&self, batch: Vec<ChannelInfoQuery>) -> Result<Vec<FoundResult>, Error> {
let (rids, backends, channels, jobs) = batch
let (rids, backends, channels, kinds, jobs) = batch
.into_iter()
.enumerate()
.map(|(i, e)| {
let rid = i as i32;
let backend = e.backend.clone();
let channel = e.channel.clone();
let kind = e.kind.to_db_i16();
let job = e;
(rid, backend, channel, job)
(rid, backend, channel, kind, job)
})
.fold((Vec::new(), Vec::new(), Vec::new(), Vec::new()), |mut a, v| {
a.0.push(v.0);
a.1.push(v.1);
a.2.push(v.2);
a.3.push(v.3);
a
});
.fold(
(Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
|mut a, v| {
a.0.push(v.0);
a.1.push(v.1);
a.2.push(v.2);
a.3.push(v.3);
a.4.push(v.4);
a
},
);
// debug!("select worker start batch of {} {:?}", channels.len(), channels);
let rows = self.pg.query(&self.qu_select, &[&rids, &backends, &channels]).await?;
let rows = self
.pg
.query(&self.qu_select, &[&rids, &backends, &channels, &kinds])
.await?;
let mut row_it = rows.into_iter();
let mut row_opt = row_it.next();
let mut acc = Vec::new();
@@ -333,10 +349,12 @@ impl Worker {
let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec<i32>>(3).as_slice())
.map_err(|_| Error::Shape)?;
let tscs: Vec<DateTime<Utc>> = row.get(4);
let kind: i16 = row.get(5);
let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?;
if false {
debug!(
"select worker found in database {:?} {:?} {:?} {:?} {:?}",
rid, series, scalar_type, shape_dims, tscs
"select worker found in database {:?} {:?} {:?} {:?} {:?} {:?}",
rid, series, scalar_type, shape_dims, tscs, kind
);
}
acc.push((rid, series, scalar_type, shape_dims, tscs));
@@ -433,11 +451,12 @@ impl Worker {
async fn insert_missing<FR: HashSalter>(&self, batch: &Vec<ChannelInfoQuery>) -> Result<(), Error> {
// debug!("insert_missing len {}", batch.len());
let (backends, channels, scalar_types, shape_dimss, mut hashers) = batch
let (backends, channels, kinds, scalar_types, shape_dimss, mut hashers) = batch
.iter()
.map(|job| {
let backend = &job.backend;
let channel = &job.channel;
let kind = job.kind.to_db_i16();
let scalar_type = &job.scalar_type;
let shape = &job.shape_dims;
let hasher = {
@@ -448,16 +467,17 @@ impl Worker {
h.update(format!("{:?}", job.shape_dims).as_bytes());
h
};
(backend, channel, scalar_type, shape, hasher)
(backend, channel, kind, scalar_type, shape, hasher)
})
.fold(
(Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
(Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()),
|mut a, x| {
a.0.push(x.0);
a.1.push(x.1);
a.2.push(x.2);
a.3.push(x.3);
a.4.push(x.4);
a.5.push(x.5);
a
},
);
@@ -502,7 +522,14 @@ impl Worker {
.pg
.execute(
&self.qu_insert,
&[&backends, &channels, &scalar_types, &shape_dims_jss, &series_ids],
&[
&backends,
&channels,
&scalar_types,
&shape_dims_jss,
&series_ids,
&kinds,
],
)
.await
.unwrap();
@@ -674,8 +701,8 @@ fn test_series_by_channel_01() {
// Block the id
let id1: i64 = 5609172854884670524;
let sql = concat!(
"insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" values ($1, $2, $3, $4, array[]::int4[], 0)"
"insert into series_by_channel (series, facility, channel, kind, scalar_type, shape_dims, agg_kind)",
" values ($1, $2, $3, 2, $4, array[]::int4[], 0)"
);
pg.execute(sql, &[&id1, &backend, &"test-block-00", &1i32]).await?;
@@ -684,15 +711,15 @@ fn test_series_by_channel_01() {
let id: i64 = 4802468414253815536;
let sql = concat!(
"insert into series_by_channel",
" (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)",
" values ($1, $2, $3, 5, array[64]::int4[], 0, array['2000-10-10T08:00:00Z'::timestamptz])"
" (series, facility, channel, kind, scalar_type, shape_dims, agg_kind, tscs)",
" values ($1, $2, $3, 2, 5, array[64]::int4[], 0, array['2000-10-10T08:00:00Z'::timestamptz])"
);
pg.execute(sql, &[&id, &backend, &channel_02]).await?;
let id: i64 = 6409375609862757444;
let sql = concat!(
"insert into series_by_channel",
" (series, facility, channel, scalar_type, shape_dims, agg_kind, tscs)",
" values ($1, $2, $3, 8, array[64]::int4[], 0, array['2001-10-10T08:00:00Z'::timestamptz])"
" (series, facility, channel, kind, scalar_type, shape_dims, agg_kind, tscs)",
" values ($1, $2, $3, 2, 8, array[64]::int4[], 0, array['2001-10-10T08:00:00Z'::timestamptz])"
);
pg.execute(sql, &[&id, &backend, &channel_02]).await?;
}
@@ -708,6 +735,7 @@ fn test_series_by_channel_01() {
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
@@ -722,6 +750,7 @@ fn test_series_by_channel_01() {
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel_01.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
@@ -736,6 +765,7 @@ fn test_series_by_channel_01() {
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel_02.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
@@ -764,6 +794,7 @@ fn test_series_by_channel_01() {
let item = dbpg::seriesbychannel::ChannelInfoQuery {
backend: backend.into(),
channel: channel.into(),
kind: SeriesKind::ChannelData,
scalar_type: netpod::ScalarType::U16.to_scylla_i32(),
shape_dims: vec![64],
tx: Box::pin(tx),
@@ -776,7 +807,7 @@ fn test_series_by_channel_01() {
let rows = pg
.query(
concat!(
"select series, channel, scalar_type, shape_dims",
"select series, channel, kind, scalar_type, shape_dims",
" from series_by_channel where facility = $1",
" and channel not like 'test-block%'",
" order by channel, scalar_type, shape_dims",
@@ -788,8 +819,8 @@ fn test_series_by_channel_01() {
for row in rows {
let series: i64 = row.get(0);
let channel: String = row.get(1);
let scalar_type: i32 = row.get(2);
let shape_dims: Vec<i32> = row.get(3);
let scalar_type: i32 = row.get(3);
let shape_dims: Vec<i32> = row.get(4);
all.push((series, channel, scalar_type, shape_dims));
}
let exp: Vec<(i64, String, i32, Vec<i32>)> = vec![
+1 -82
View File
@@ -1,15 +1,7 @@
use crate::conn::PgClient;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::ScalarType;
use netpod::Shape;
use series::series::Existence;
use series::SeriesId;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
// TODO still needed?
#[derive(Debug, ThisError)]
pub enum Error {
Postgres(#[from] tokio_postgres::Error),
@@ -17,76 +9,3 @@ pub enum Error {
BadIdGenerated,
CanNotInsertSeriesId,
}
// Look up the series id for (backend/facility, channel, scalar_type, shape, agg_kind = 0),
// inserting a freshly generated id when no row exists yet.
// Returns Existence::Existing on a hit, Existence::Created after a successful insert.
// NOTE(review): underscore-prefixed and apparently unused — kept only for reference
// (see the "TODO still needed?" above the Error enum).
async fn _get_series_id(
backend: &str,
name: &str,
scalar_type: &ScalarType,
shape: &Shape,
pg_client: &PgClient,
) -> Result<Existence<SeriesId>, Error> {
let channel_name = name;
// Encode into the integer / int-array representation stored in the database.
let scalar_type = scalar_type.to_scylla_i32();
let shape = shape.to_scylla_vec();
let res = pg_client
.query(
"select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0",
&[&backend, &channel_name, &scalar_type, &shape],
)
.await?;
let mut all = Vec::new();
for row in res {
let series: i64 = row.get(0);
// Stored as signed bigint in Postgres; treated as unsigned on the Rust side.
let series = series as u64;
all.push(series);
}
let rn = all.len();
let tsbeg = Instant::now();
if rn == 0 {
// No existing row: derive a candidate id from a hash of the identity
// fields, then mix elapsed nanoseconds into the hasher on each retry so
// every attempt yields a different candidate.
use md5::Digest;
let mut h = md5::Md5::new();
h.update(backend.as_bytes());
h.update(channel_name.as_bytes());
h.update(format!("{:?}", scalar_type).as_bytes());
h.update(format!("{:?}", shape).as_bytes());
for _ in 0..200 {
h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
// First 8 digest bytes, little-endian, become the candidate id.
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
// Candidate must fit into a signed bigint column.
if series > i64::MAX as u64 {
continue;
}
// Zero is rejected as an id.
if series == 0 {
continue;
}
// NOTE(review): unreachable — both conditions are already excluded by the
// two `continue`s above (series is unsigned, so `<= 0` can only mean `== 0`).
if series <= 0 || series > i64::MAX as u64 {
return Err(Error::BadIdGenerated);
}
let sql = concat!(
"insert into series_by_channel",
" (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" values ($1, $2, $3, $4, $5, 0) on conflict do nothing"
);
let res = pg_client
.execute(sql, &[&(series as i64), &backend, &channel_name, &scalar_type, &shape])
.await
.unwrap();
// execute() returns the affected row count: 1 means our insert won the
// race; 0 means `on conflict do nothing` skipped it (id already taken),
// so back off briefly and try the next candidate.
if res == 1 {
let series = Existence::Created(SeriesId::new(series));
return Ok(series);
} else {
warn!(
"tried to insert {series:?} for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} trying again..."
);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
// All 200 candidates collided (or kept failing) — give up.
error!("tried to insert new series id for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} but failed");
Err(Error::CanNotInsertSeriesId)
} else {
// At least one row matched; take the first (the table's uniqueness
// constraint should make duplicates impossible).
let series = all[0] as u64;
let series = Existence::Existing(SeriesId::new(series));
debug!("get_series_id {backend:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}");
Ok(series)
}
}
+2
View File
@@ -32,6 +32,7 @@ use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use netpod::SeriesKind;
use scywr::iteminsertqueue::ChannelInfoItem;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
@@ -537,6 +538,7 @@ impl CaConnSet {
let item = ChannelInfoQuery {
backend: cmd.backend,
channel: cmd.name,
kind: SeriesKind::ChannelStatus,
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: Vec::new(),
tx: Box::pin(SeriesLookupSender { tx }),
+3
View File
@@ -11,6 +11,7 @@ use log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use netpod::TS_MSP_GRID_SPACING;
@@ -81,6 +82,7 @@ impl SeriesWriter {
let item = ChannelInfoQuery {
backend: backend.clone(),
channel: channel.clone(),
kind: SeriesKind::ChannelStatus,
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
@@ -104,6 +106,7 @@ impl SeriesWriter {
let item = ChannelInfoQuery {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),