From 1456bd84848eac3aad7ea6db9f967b152ba88dd9 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 2 Feb 2024 16:27:44 +0100 Subject: [PATCH] WIP --- daqingest/src/daemon.rs | 26 +++ daqingest/src/opts.rs | 2 +- dbpg/Cargo.toml | 5 +- dbpg/src/schema.rs | 8 + dbpg/src/seriesbychannel.rs | 395 +++++++++++++++++++++++++++++++++++- netfetch/src/ca/connset.rs | 2 - netfetch/src/ca/statemap.rs | 2 - 7 files changed, 433 insertions(+), 7 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index a492898..4012772 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -90,6 +90,32 @@ impl Daemon { .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + { + let (tx, rx) = async_channel::bounded(1); + let item = dbpg::seriesbychannel::ChannelInfoQuery { + backend: "amd32test".into(), + channel: "dummy-0000".into(), + scalar_type: netpod::ScalarType::U16.to_scylla_i32(), + shape_dims: vec![500], + tx: Box::pin(tx), + }; + channel_info_query_tx.send(item).await?; + let res = rx.recv().await?; + debug!("received A: {res:?}"); + + let (tx, rx) = async_channel::bounded(1); + let item = dbpg::seriesbychannel::ChannelInfoQuery { + backend: "amd32test".into(), + channel: "dummy-0000".into(), + scalar_type: netpod::ScalarType::U16.to_scylla_i32(), + shape_dims: vec![500], + tx: Box::pin(tx), + }; + channel_info_query_tx.send(item).await?; + let res = rx.recv().await?; + debug!("received B: {res:?}"); + } + let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let query_item_tx_weak = query_item_tx.downgrade(); diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 11ef057..8cb3ee9 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -8,7 +8,7 @@ pub struct DaqIngestOpts { #[arg(long, action(clap::ArgAction::Count))] pub verbose: u8, #[clap(long)] - pub tag: Option, + pub label: Option, #[command(subcommand)] pub subcmd: SubCmd, #[arg(long)] diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml index 9ab8358..8d82015 100644 --- a/dbpg/Cargo.toml +++ b/dbpg/Cargo.toml @@ -12,7 +12,10 @@ taskrun = { path = "../../daqbuffer/crates/taskrun" } batchtools = { path = "../batchtools" } stats = { path = "../stats" } series = { path = "../series" } -tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } +chrono = "0.4.33" +tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-serde_json-1"] } futures-util = "0.3.29" async-channel = "2.1.1" md-5 = "0.10.6" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1" diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index 16a3da2..0794102 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -148,10 +148,18 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> { Ok(()) } +async fn migrate_02(pgc: &PgClient) -> Result<(), Error> { + // TODO after all migrations, should check that the schema is as expected. + let sql = "alter table series_by_channel add tscs timestamptz[] default array[now()]"; + let _ = pgc.execute(sql, &[]).await; + Ok(()) +} + pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> { pgc.execute("set client_min_messages = 'warning'", &[]).await?; migrate_00(&pgc).await?; migrate_01(&pgc).await?; + migrate_02(&pgc).await?; pgc.execute("reset client_min_messages", &[]).await?; info!("schema_check done"); Ok(()) diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 55cb685..3edab59 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -1,5 +1,7 @@ use async_channel::Receiver; use async_channel::Sender; +use chrono::DateTime; +use chrono::Utc; use core::fmt; use err::thiserror; use err::ThisError; @@ -45,6 +47,8 @@ pub enum Error { CreateSeriesFail, SeriesMissing, ChannelError, + #[error("DbConsistencySeries({0})")] + DbConsistencySeries(String), } impl From for Error { @@ -357,6 +361,325 @@ impl Worker { } } +struct Worker2 { + pg: PgClient, + qu_select: PgStatement, + qu_insert: PgStatement, + batch_rx: Receiver>, + stats: Arc, + pg_client_jh: JoinHandle>, +} + +#[derive(Debug)] +enum MatchingSeries { + DoesntExist, + UsedBefore, + Latest, +} + +impl Worker2 { + async fn new( + db: &Database, + batch_rx: Receiver>, + stats: Arc, + ) -> Result { + use tokio_postgres::types::Type; + let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; + let sql = concat!( + "with q1 as (", + " select * from unnest($1, $2, $3)", + " as inp (rid, backend, channel)", + ")", + " select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs from q1", + " join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel", + " and t.agg_kind = 0", + " order by q1.rid", + ); + let qu_select = pg + .prepare_typed(sql, &[Type::INT4_ARRAY, Type::TEXT_ARRAY, Type::TEXT_ARRAY]) + .await?; + let sql = concat!( + "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])", + " as inp (backend, channel, scalar_type, shape_dims, series))", + " insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", + " select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1", + " on conflict do nothing" + ); + let qu_insert = pg.prepare(sql).await?; + let ret = Self { + pg, + qu_select, + qu_insert, + batch_rx, + stats, + pg_client_jh, + }; + Ok(ret) + } + + async fn select( + &self, + batch: Vec, + ) -> Result<(Vec, Vec), Error> { + let (rids, backends, channels, jobs) = batch + .into_iter() + .enumerate() + .map(|(i, e)| { + let rid = i as i32; + let backend = e.backend.clone(); + let channel = e.channel.clone(); + let job = e; + (rid, backend, channel, job) + }) + .fold((Vec::new(), Vec::new(), Vec::new(), Vec::new()), |mut a, v| { + a.0.push(v.0); + a.1.push(v.1); + a.2.push(v.2); + a.3.push(v.3); + a + }); + debug!("select worker start batch of {} {:?}", channels.len(), channels); + let rows = self.pg.query(&self.qu_select, &[&rids, &backends, &channels]).await?; + let mut row_it = rows.into_iter(); + let mut row_opt = row_it.next(); + let mut acc = Vec::new(); + for (&rid, job) in rids.iter().zip(jobs.into_iter()) { + loop { + break if let Some(row) = &row_opt { + if row.get::<_, i32>(0) == rid { + let series: i64 = row.get(1); + let scalar_type: i32 = row.get(2); + let shape_dims: Vec = row.get(3); + let tscs: Vec> = row.get(4); + let series = SeriesId::new(series as _); + debug!( + "select worker found in database {:?} {:?} {:?} {:?} {:?}", + rid, series, scalar_type, shape_dims, tscs + ); + // let res = ChannelInfoResult2 { + // backend: job.backend, + // channel: job.channel, + // series: Existence::Existing(series), + // tx: job.tx, + // }; + acc.push((rid, series, scalar_type, shape_dims, tscs)); + row_opt = row_it.next(); + continue; + } + }; + } + debug!("check for {job:?}"); + // TODO call decide with empty accumulator: will result in DoesntExist. + let v = std::mem::replace(&mut acc, Vec::new()); + let x = Self::decide_matching_via_db(v); + let dec = match x { + Ok(x) => x, + Err(e) => { + error!("{e}"); + return Err(e); + } + }; + // TODO after decision, do something with it. + debug!("decision {dec:?}"); + } + let result = Vec::new(); + let missing = Vec::new(); + Ok((result, missing)) + } + + fn decide_matching_via_db( + acc: Vec<(i32, SeriesId, i32, Vec, Vec>)>, + ) -> Result { + let a2 = acc.iter().map(|x| x.4.clone()).collect(); + Self::assert_order(a2)?; + let unfolded = Self::unfold_series_rows(acc)?; + Self::assert_varying_types(&unfolded)?; + Ok(MatchingSeries::Latest) + } + + fn unfold_series_rows( + acc: Vec<(i32, SeriesId, i32, Vec, Vec>)>, + ) -> Result, DateTime)>, Error> { + let mut ret = Vec::new(); + for g in acc.iter() { + for h in g.4.iter() { + ret.push((g.1.clone(), g.2, g.3.clone(), *h)); + } + } + ret.sort_by_key(|x| x.3); + Ok(ret) + } + + fn assert_order(v: Vec>>) -> Result<(), Error> { + for x in &v { + if x.len() > 1 { + let mut z = x[0]; + for &y in x[1..].iter() { + if y <= z { + let e = Error::DbConsistencySeries(format!("tscs not in order")); + return Err(e); + } else { + z = y; + } + } + } + } + Ok(()) + } + + fn assert_varying_types(v: &Vec<(SeriesId, i32, Vec, DateTime)>) -> Result<(), Error> { + if v.len() > 1 { + let mut z_1 = &v[0].1; + let mut z_2 = &v[0].2; + for r in v[1..].iter() { + if r.1 == *z_1 && r.2 == *z_2 { + debug!("{v:?}"); + let e = Error::DbConsistencySeries(format!("no change between entries")); + return Err(e); + } + z_1 = &r.1; + z_2 = &r.2; + } + Ok(()) + } else { + Ok(()) + } + } + + async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { + error!("TODO insert_missing"); + if true { + return Ok(()); + } + let tsbeg = Instant::now(); + let mut backends = Vec::new(); + let mut channels = Vec::new(); + let mut scalar_types = Vec::new(); + let mut shape_dimss = Vec::new(); + let mut shape_dims_strs = Vec::new(); + let mut hashers = Vec::new(); + for e in batch.into_iter() { + { + let mut h = md5::Md5::new(); + h.update(e.backend.as_bytes()); + h.update(e.channel.as_bytes()); + h.update(format!("{:?}", e.scalar_type).as_bytes()); + h.update(format!("{:?}", e.shape_dims).as_bytes()); + hashers.push(h); + } + backends.push(&e.backend); + channels.push(&e.channel); + scalar_types.push(e.scalar_type); + let mut dims = String::with_capacity(32); + dims.push('{'); + for (i, &v) in e.shape_dims.iter().enumerate() { + if i > 0 { + dims.push(','); + } + use std::fmt::Write; + write!(dims, "{}", v).unwrap(); + } + dims.push('}'); + shape_dims_strs.push(dims); + shape_dimss.push(&e.shape_dims); + } + let mut i1 = 0; + loop { + i1 += 1; + if i1 >= 200 { + return Err(Error::CreateSeriesFail); + } + let mut seriess = Vec::with_capacity(hashers.len()); + let mut all_good = true; + for h in &mut hashers { + let mut good = false; + for _ in 0..800 { + h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); + let f = h.clone().finalize(); + let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series >= 1000000000000000000 && series <= i64::MAX as u64 { + seriess.push(series as i64); + good = true; + break; + } + } + if !good { + all_good = false; + break; + } + } + if !all_good { + continue; + } + self.pg + .execute( + &self.qu_insert, + &[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess], + ) + .await?; + break; + } + Ok(()) + } + + async fn work(&mut self) -> Result<(), Error> { + let batch_rx = &self.batch_rx; + while let Ok(batch) = batch_rx.recv().await { + self.stats.recv_batch().inc(); + self.stats.recv_items().add(batch.len() as _); + for x in &batch { + trace3!( + "search for {} {} {:?} {:?}", + x.backend, + x.channel, + x.scalar_type, + x.shape_dims + ); + } + let (res1, missing) = self.select(batch).await?; + let res3 = if missing.len() > 0 { + trace2!("missing {}", missing.len()); + for x in &missing { + trace2!("insert missing {x:?}"); + } + let missing_count = missing.len(); + self.insert_missing(&missing).await?; + let (res2, missing2) = self.select(missing).await?; + if missing2.len() > 0 { + for x in &missing2 { + warn!("series ids still missing after insert {}", x.channel); + } + Err(Error::SeriesMissing) + } else { + trace2!("select missing after insert {} of {}", missing_count, res2.len()); + Ok((res1, res2)) + } + } else { + Ok((res1, Vec::new())) + }?; + for r in res3.0.into_iter().chain(res3.1.into_iter()) { + let item = ChannelInfoResult { + backend: r.backend, + channel: r.channel, + series: r.series, + }; + // trace3!("try to send result for {:?}", item); + let fut = r.tx.make_send(Ok(item)); + match fut.await { + Ok(()) => {} + Err(_e) => { + //warn!("can not deliver result"); + // return Err(Error::ChannelError); + self.stats.res_tx_fail.inc(); + } + } + } + } + debug!("Worker2 done"); + Ok(()) + } +} + pub async fn start_lookup_workers( worker_count: usize, db: &Database, @@ -376,9 +699,79 @@ pub async fn start_lookup_workers( let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); let mut jhs = Vec::new(); for _ in 0..worker_count { - let mut worker = Worker::new(db, batch_rx.clone(), stats.clone()).await?; + let mut worker = Worker2::new(db, batch_rx.clone(), stats.clone()).await?; let jh = tokio::task::spawn(async move { worker.work().await }); jhs.push(jh); } Ok((query_tx, jhs, bjh)) } + +async fn psql_play(db: &Database) -> Result<(), Error> { + use tokio_postgres::types::ToSql; + use tokio_postgres::types::Type; + let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; + if false { + let sql = concat!("select pg_typeof($1)"); + let qu = pg.prepare_typed(sql, &[Type::INT4_ARRAY]).await?; + // let qu = pg.prepare(sql).await?; + let p1 = 4f64; + let rows = pg.query(&qu, &[&p1]).await; + debug!("{rows:?}"); + let p1 = &[12i32, 13, 14][..]; + let rows = pg.query(&qu, &[&p1]).await; + debug!("{rows:?}"); + let p1 = vec![12i32, 13, 14]; + let rows = pg.query(&qu, &[&p1]).await; + debug!("{rows:?}"); + let p1 = vec![12i64, 13, 14]; + let rows = pg.query(&qu, &[&p1]).await; + debug!("{rows:?}"); + let p1 = vec![String::from("a"), String::from("b")]; + let rows = pg.query(&qu, &[&p1]).await; + debug!("{rows:?}"); + let p1 = vec![vec![4i64, 8], vec![10, 12]]; + let rows = pg.query(&qu, &[&p1]).await; + debug!("{rows:?}"); + } + if false { + let sql = concat!( + "with q1 as (", + "select * from unnest($1) as inp (col1)", + ")", + " select * from q1", + ); + let qu = pg.prepare_typed(sql, &[Type::INT4_ARRAY]).await?; + let p1 = vec![1_i32, 2, 3, 4]; + let rows = pg.query(&qu, &[&p1]).await?; + for row in rows { + debug!("{:?} {:?}", row, row.columns()); + let ca: i32 = row.get(0); + let cb: Vec = row.get(0); + debug!("{:?} {:?}", ca, cb); + } + } + if false { + let sql = concat!( + "with q1 as (", + "select col1 from unnest($1) as inp (col1)", + ")", + " select col1::int4[] from q1", + ); + let qu = pg.prepare_typed(sql, &[Type::JSONB_ARRAY]).await?; + // JsVal + // let p1 = vec![JsVal::Array(vec![3_i32, 4, 5, 6])]; + let p1 = vec![ + serde_json::to_value(&vec![1_i32, 2, 3, 4]).unwrap(), + serde_json::to_value(&vec![5_i32, 6, 7, 8]).unwrap(), + ]; + // let p1 = serde_json::to_value(vec![vec![1_i32, 2, 3, 4], vec![5_i32, 6, 7, 8]]).unwrap(); + let rows = pg.query(&qu, &[&p1]).await?; + for row in rows { + debug!("{:?} {:?}", row, row.columns()); + let ca = row.try_get::<_, i32>(0); + let cb = row.try_get::<_, Vec>(0); + debug!("{:?} {:?}", ca, cb); + } + } + Ok(()) +} diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index e2ed9e1..b8744e0 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -529,8 +529,6 @@ impl CaConnSet { value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { since: SystemTime::now(), }), - running_cmd_id: None, - health_timeout_count: 0, }; self.channel_states.insert(ch.clone(), item); self.channel_states.get_mut(&ch).unwrap() diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 4b8c5a2..4332302 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -133,8 +133,6 @@ pub enum ChannelStateValue { #[derive(Debug, Clone, Serialize)] pub struct ChannelState { pub value: ChannelStateValue, - pub running_cmd_id: Option, - pub health_timeout_count: usize, } #[derive(Debug, Clone, Serialize)]