From 962dfe570ee5e6e982267b42aa385e229c5b2739 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 27 Jan 2023 15:43:09 +0100 Subject: [PATCH] Insert bool set --- .cargo/cargo-lock | 24 +- daqingest/src/bin/daqingest.rs | 6 + daqingest/src/daemon.rs | 69 ++- daqingest/src/opts.rs | 27 +- netfetch/src/batchquery/series_by_channel.rs | 128 ++++- netfetch/src/ca/conn.rs | 6 +- netfetch/src/ca/proto.rs | 4 + netfetch/src/ca/search.rs | 5 +- netfetch/src/ca/store.rs | 36 ++ netfetch/src/channelwriter.rs | 65 +-- netfetch/src/conf.rs | 1 + netfetch/src/dbpg.rs | 74 ++- netfetch/src/insertworker.rs | 2 +- netfetch/src/store.rs | 28 +- netfetch/src/zmtp.rs | 528 +++++++++---------- 15 files changed, 646 insertions(+), 357 deletions(-) diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index e5c8663..74acaf8 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -400,9 +400,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b61a7545f753a88bcbe0a70de1fcc0221e10bfc752f576754fa91e663db1622e" +checksum = "322296e2f2e5af4270b54df9e85a02ff037e271af20ba3e7fe1575515dc840b8" dependencies = [ "cc", "cxxbridge-flags", @@ -412,9 +412,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f464457d494b5ed6905c63b0c4704842aba319084a0a3561cdc1359536b53200" +checksum = "017a1385b05d631e7875b1f151c9f012d37b53491e2a87f65bff5c262b2111d8" dependencies = [ "cc", "codespan-reporting", @@ -427,15 +427,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c7119ce3a3701ed81aca8410b9acf6fc399d2629d057b87e2efa4e63a3aaea" +checksum = "c26bbb078acf09bc1ecda02d4223f03bdd28bd4874edcb0379138efc499ce971" [[package]] name = "cxxbridge-macro" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65e07508b90551e610910fa648a1878991d367064997a596135b86df30daf07e" +checksum = "357f40d1f06a24b60ae1fe122542c1fb05d28d32acb2aed064e84bc2ad1e252e" dependencies = [ "proc-macro2", "quote", @@ -491,9 +491,9 @@ dependencies = [ [[package]] name = "either" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" [[package]] name = "erased-serde" @@ -2087,9 +2087,9 @@ checksum = "4553f467ac8e3d374bc9a177a26801e5d0f9b211aa1673fb137a403afd1c9cf5" [[package]] name = "toml_edit" -version = "0.18.0" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "729bfd096e40da9c001f778f5cdecbd2957929a24e10e5883d9392220a751581" +checksum = "56c59d8dd7d0dcbc6428bf7aa2f0e823e26e43b3c9aca15bbc9475d23e5fa12b" dependencies = [ "indexmap", "nom8", diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 341d4de..27e6164 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -32,6 +32,12 @@ pub fn main() -> Result<(), Error> { daqingest::daemon::run(conf, channels).await? } }, + SubCmd::Logappend(k) => { + let jh = tokio::task::spawn_blocking(move || { + taskrun::append::append(&k.dir, k.total_size_max_bytes(), std::io::stdin()).unwrap(); + }); + jh.await.map_err(Error::from_string)?; + } } Ok(()) }); diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index d40167a..991b229 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -1,5 +1,6 @@ use async_channel::Receiver; use async_channel::Sender; +use async_channel::WeakReceiver; use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; @@ -20,6 +21,9 @@ use netfetch::insertworker::Ttls; use netfetch::metrics::ExtraInsertsConf; use netfetch::metrics::StatsSet; use netfetch::store::CommonInsertItemQueue; +use netfetch::store::ConnectionStatus; +use netfetch::store::ConnectionStatusItem; +use netfetch::store::QueryItem; use netpod::Database; use netpod::ScyllaConfig; use serde::Serialize; @@ -177,6 +181,7 @@ pub struct DaemonOpts { pgconf: Database, scyconf: ScyllaConfig, ttls: Ttls, + test_bsread_addr: Option, } impl DaemonOpts { @@ -249,12 +254,11 @@ pub struct Daemon { count_assigned: usize, last_status_print: SystemTime, insert_workers_jh: Vec>, - #[allow(unused)] - pg_client: Arc, ingest_commons: Arc, caconn_last_channel_check: Instant, stats: Arc, shutting_down: bool, + insert_rx_weak: WeakReceiver, } impl Daemon { @@ -360,7 +364,7 @@ impl Daemon { extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()), store_workers_rate: AtomicU64::new(20000), insert_frac: AtomicU64::new(1000), - ca_conn_set: CaConnSet::new(channel_info_query_tx), + ca_conn_set: CaConnSet::new(channel_info_query_tx.clone()), insert_workers_running: atomic::AtomicUsize::new(0), }; let ingest_commons = Arc::new(ingest_commons); @@ -385,6 +389,8 @@ impl Daemon { let insert_worker_count = 1000; let use_rate_limit_queue = false; + let insert_rx_weak = common_insert_item_queue_2.receiver().unwrap().downgrade(); + // TODO use a new stats type: let store_stats = Arc::new(stats::CaConnStats::new()); let ttls = opts.ttls.clone(); @@ -401,6 +407,34 @@ impl Daemon { ) .await?; + if let Some(bsaddr) = &opts.test_bsread_addr { + //netfetch::zmtp::Zmtp; + let zmtpopts = netfetch::zmtp::ZmtpClientOpts { + backend: opts.backend().into(), + addr: bsaddr.parse().unwrap(), + do_pulse_id: false, + rcvbuf: None, + array_truncate: Some(1024), + process_channel_count_limit: Some(32), + }; + let client = + netfetch::zmtp::BsreadClient::new(zmtpopts, ingest_commons.clone(), channel_info_query_tx.clone()) + .await?; + let fut = { + async move { + let mut client = client; + client.run().await?; + Ok::<_, Error>(()) + } + }; + // TODO await on shutdown + let jh = tokio::spawn(fut); + //let mut jhs = Vec::new(); + //jhs.push(jh); + //futures_util::future::join_all(jhs).await; + //jh.await.map_err(|e| e.to_string()).map_err(Error::from)??; + } + let ret = Self { opts, connection_states: BTreeMap::new(), @@ -421,11 +455,11 @@ impl Daemon { count_assigned: 0, last_status_print: SystemTime::now(), insert_workers_jh: jh_insert_workers, - pg_client, ingest_commons, caconn_last_channel_check: Instant::now(), stats: Arc::new(DaemonStats::new()), shutting_down: false, + insert_rx_weak, }; Ok(ret) } @@ -937,7 +971,11 @@ impl Daemon { .ingest_commons .insert_workers_running .load(atomic::Ordering::Acquire); - info!("qu senders A {:?} {:?} nworkers {}", sa1, sa2, nworkers); + let nitems = self.insert_rx_weak.upgrade().map(|x| x.len()); + info!( + "qu senders A {:?} {:?} nworkers {} nitems {:?}", + sa1, sa2, nworkers, nitems + ); if nworkers == 0 { info!("goodbye"); std::process::exit(0); @@ -1132,6 +1170,19 @@ impl Daemon { ChannelStateValue::ToRemove { addr: _ } => {} } } + let item = QueryItem::ConnectionStatus(ConnectionStatusItem { + ts: SystemTime::now(), + addr: conn_addr, + status: ConnectionStatus::ConnectionHandlerDone, + }); + if let Some(tx) = self.ingest_commons.insert_item_queue.sender() { + if let Err(e) = tokio::time::timeout(Duration::from_millis(1000), tx.send(item)).await { + error!("timeout on insert queue send"); + } else { + } + } else { + error!("can not emit CaConn done event"); + } Ok(()) } @@ -1295,11 +1346,18 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?; netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?; + netfetch::dbpg::schema_check(opts.postgresql()).await?; + // TODO use a new stats type: //let store_stats = Arc::new(CaConnStats::new()); //let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); //let metrics_agg_jh = tokio::spawn(metrics_agg_fut); + let mut channels = channels; + if opts.test_bsread_addr.is_some() { + channels.clear(); + } + let opts2 = DaemonOpts { backend: opts.backend().into(), local_epics_hostname: opts.local_epics_hostname().into(), @@ -1312,6 +1370,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> d0: opts.ttl_d0(), d1: opts.ttl_d1(), }, + test_bsread_addr: opts.test_bsread_addr.clone(), }; let mut daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 94f6e98..f8dcaa5 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use clap::ArgAction::Count; use clap::Parser; use netfetch::zmtp::ZmtpClientOpts; @@ -24,6 +26,7 @@ pub enum SubCmd { BsreadDump(BsreadDump), #[command(subcommand)] ChannelAccess(ChannelAccess), + Logappend(Logappend), } #[derive(Debug, Parser)] @@ -31,9 +34,7 @@ pub struct Bsread { #[arg(long)] pub backend: String, #[arg(long)] - pub scylla: Vec, - #[arg(long)] - pub source: Vec, + pub addr: SocketAddr, #[arg(long)] pub rcvbuf: Option, #[arg(long)] @@ -41,8 +42,6 @@ pub struct Bsread { #[arg(long)] pub do_pulse_id: bool, #[arg(long)] - pub skip_insert: bool, - #[arg(long)] pub process_channel_count_limit: Option, } @@ -50,13 +49,11 @@ impl From for ZmtpClientOpts { fn from(k: Bsread) -> Self { Self { backend: k.backend, - scylla: k.scylla, - sources: k.source, + addr: k.addr, rcvbuf: k.rcvbuf, array_truncate: k.array_truncate, do_pulse_id: k.do_pulse_id, process_channel_count_limit: k.process_channel_count_limit, - skip_insert: k.skip_insert, } } } @@ -91,3 +88,17 @@ pub struct CaSearch { pub struct CaConfig { pub config: String, } + +#[derive(Debug, Parser)] +pub struct Logappend { + #[arg(long)] + pub dir: String, + #[arg(long)] + pub total_mb: Option, +} + +impl Logappend { + pub fn total_size_max_bytes(&self) -> u64 { + 1024 * 1024 * self.total_mb.unwrap_or(20) + } +} diff --git a/netfetch/src/batchquery/series_by_channel.rs b/netfetch/src/batchquery/series_by_channel.rs index f548c41..d184208 100644 --- a/netfetch/src/batchquery/series_by_channel.rs +++ b/netfetch/src/batchquery/series_by_channel.rs @@ -1,14 +1,17 @@ use crate::batcher; use crate::dbpg::make_pg_client; +use crate::errconv::ErrConv; use crate::series::Existence; use crate::series::SeriesId; use async_channel::Receiver; use async_channel::Sender; use err::Error; use futures_util::StreamExt; +use md5::Digest; use netpod::log::*; use netpod::Database; use std::time::Duration; +use std::time::Instant; use tokio::task::JoinHandle; use tokio_postgres::Client as PgClient; use tokio_postgres::Statement as PgStatement; @@ -36,6 +39,7 @@ impl ChannelInfoQuery { struct ChannelInfoResult { series: Vec>, tx: Vec, Error>>>, + missing: Vec, } struct PgRes { @@ -59,20 +63,21 @@ async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender

, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> { +async fn select(batch: Vec, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> { let mut backend = Vec::new(); let mut channel = Vec::new(); let mut scalar_type = Vec::new(); - let mut shape_dims: Vec = Vec::new(); + let mut shape_dims = Vec::new(); + let mut shape_dims_str: Vec = Vec::new(); let mut rid = Vec::new(); let mut tx = Vec::new(); for (i, e) in batch.into_iter().enumerate() { backend.push(e.backend); channel.push(e.channel); scalar_type.push(e.scalar_type); - let mut dims = String::with_capacity(16); + let mut dims = String::with_capacity(32); dims.push('{'); - for (i, v) in e.shape_dims.into_iter().enumerate() { + for (i, &v) in e.shape_dims.iter().enumerate() { if i > 0 { dims.push(','); } @@ -80,13 +85,14 @@ async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(Chann write!(dims, "{}", v).unwrap(); } dims.push('}'); - shape_dims.push(dims); + shape_dims_str.push(dims); + shape_dims.push(e.shape_dims); rid.push(i as i32); tx.push((i as u32, e.tx)); } match pgres .pgc - .query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims, &rid]) + .query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims_str, &rid]) .await .map_err(|e| { error!("{e}"); @@ -95,6 +101,7 @@ async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(Chann Ok(rows) => { let mut series_ids = Vec::new(); let mut txs = Vec::new(); + let mut missing = Vec::new(); let mut it1 = rows.into_iter(); let mut e1 = it1.next(); for (qrid, tx) in tx { @@ -107,11 +114,22 @@ async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(Chann txs.push(tx); } e1 = it1.next(); + } else { + let i = qrid as usize; + let k = ChannelInfoQuery { + backend: backend[i].clone(), + channel: channel[i].clone(), + scalar_type: scalar_type[i].clone(), + shape_dims: shape_dims[i].clone(), + tx, + }; + missing.push(k); } } let result = ChannelInfoResult { series: series_ids, tx: txs, + missing, }; Ok((result, pgres)) } @@ -123,6 +141,99 @@ async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(Chann } } +async fn insert_missing(batch: &Vec, pgres: PgRes) -> Result<((), PgRes), Error> { + let tsbeg = Instant::now(); + let mut backends = Vec::new(); + let mut channels = Vec::new(); + let mut scalar_types = Vec::new(); + let mut shape_dimss = Vec::new(); + let mut shape_dims_strs: Vec = Vec::new(); + let mut hashers = Vec::new(); + for e in batch.into_iter() { + { + let mut h = md5::Md5::new(); + h.update(e.backend.as_bytes()); + h.update(e.channel.as_bytes()); + h.update(format!("{:?}", e.scalar_type).as_bytes()); + h.update(format!("{:?}", e.shape_dims).as_bytes()); + hashers.push(h); + } + backends.push(&e.backend); + channels.push(&e.channel); + scalar_types.push(e.scalar_type); + let mut dims = String::with_capacity(32); + dims.push('{'); + for (i, &v) in e.shape_dims.iter().enumerate() { + if i > 0 { + dims.push(','); + } + use std::fmt::Write; + write!(dims, "{}", v).unwrap(); + } + dims.push('}'); + shape_dims_strs.push(dims); + shape_dimss.push(&e.shape_dims); + } + let mut i1 = 0; + loop { + i1 += 1; + if i1 >= 200 { + return Err(Error::with_msg_no_trace("not able to generate series information")); + } + let mut seriess = Vec::with_capacity(hashers.len()); + let mut all_good = true; + for h in &mut hashers { + let mut good = false; + for _ in 0..50 { + h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); + let f = h.clone().finalize(); + let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series >= 100000000000000000 && series <= i64::MAX as u64 { + seriess.push(series as i64); + good = true; + break; + } + } + if !good { + all_good = false; + break; + } + } + if !all_good { + continue; + } + let sql = concat!( + "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])", + " as inp (backend, channel, scalar_type, shape_dims, series))", + " insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", + " select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1", + " on conflict do nothing" + ); + pgres + .pgc + .execute(sql, &[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess]) + .await + .err_conv()?; + break; + } + Ok(((), pgres)) +} + +async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> { + let (res1, pgres) = select(batch, pgres).await?; + if res1.missing.len() > 0 { + let ((), pgres) = insert_missing(&res1.missing, pgres).await?; + let (res2, pgres) = select(res1.missing, pgres).await?; + if res2.missing.len() > 0 { + Err(Error::with_msg_no_trace("some series not found even after write")) + } else { + Ok((res2, pgres)) + } + } else { + Ok((res1, pgres)) + } +} + async fn run_queries( npg: usize, batch_rx: Receiver>, @@ -131,6 +242,7 @@ async fn run_queries( ) -> Result<(), Error> { let mut stream = batch_rx .map(|batch| { + debug!("see batch of {}", batch.len()); let pgc_rx = pgc_rx.clone(); let pgc_tx = pgc_tx.clone(); async move { @@ -155,8 +267,9 @@ async fn run_queries( for (sid, tx) in res.series.into_iter().zip(res.tx) { match tx.send(Ok(sid)).await { Ok(_) => {} - Err(_) => { + Err(e) => { // TODO count cases, but no log. Client may no longer be interested in this result. + error!("{e}"); } } } @@ -167,6 +280,7 @@ async fn run_queries( } } } + info!("run_queries done"); Ok(()) } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 5bec4c8..4deb913 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1024,13 +1024,12 @@ impl CaConn { ev: proto::EventAddRes, item_queue: &mut VecDeque, ts_msp_last: u64, - inserted_in_ts_msp: u64, ts_msp_grid: Option, stats: Arc, ) -> Result<(), Error> { // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. - let (ts_msp, ts_msp_changed) = if inserted_in_ts_msp >= 64000 || st.ts_msp_last + HOUR <= ts { + let (ts_msp, ts_msp_changed) = if st.inserted_in_ts_msp >= 64000 || st.ts_msp_last + HOUR <= ts { let div = SEC * 10; let ts_msp = ts / div * div; if ts_msp == st.ts_msp_last { @@ -1085,7 +1084,6 @@ impl CaConn { .checked_add(Duration::from_micros((dt * 1e6) as u64)) .ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?; let ts_msp_last = st.ts_msp_last; - let inserted_in_ts_msp = st.inserted_in_ts_msp; // TODO get event timestamp from channel access field let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid { @@ -1105,7 +1103,6 @@ impl CaConn { ev.clone(), item_queue, ts_msp_last, - inserted_in_ts_msp, ts_msp_grid, stats.clone(), )?; @@ -1120,7 +1117,6 @@ impl CaConn { ev, item_queue, ts_msp_last, - inserted_in_ts_msp, ts_msp_grid, stats, )?; diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 0ed27f7..7fc856a 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -162,6 +162,8 @@ pub enum CaDataScalarValue { F64(f64), Enum(i16), String(String), + // TODO remove, CA has no bool, make new enum for other use cases. + Bool(bool), } #[derive(Clone, Debug)] @@ -171,6 +173,8 @@ pub enum CaDataArrayValue { I32(Vec), F32(Vec), F64(Vec), + // TODO remove, CA has no bool, make new enum for other use cases. + Bool(Vec), } #[derive(Clone, Debug)] diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 2203d6a..f201a92 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -137,6 +137,7 @@ impl DbUpdateWorker { pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), Error> { info!("ca_search begin"); + crate::dbpg::schema_check(opts.postgresql()).await?; let mut addrs = Vec::new(); for s in opts.search() { match resolve_address(s).await { @@ -191,7 +192,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), let dbtx: Sender<_> = dbtx; let mut ts_last = Instant::now(); - loop { + 'outer: loop { let ts_now = Instant::now(); if ts_now.duration_since(ts_last) >= Duration::from_millis(2000) { ts_last = ts_now; @@ -240,7 +241,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), Ok(_) => {} Err(_) => { error!("dbtx broken"); - break; + break 'outer; } } } diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 2d7973f..688be22 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -1,4 +1,5 @@ use err::Error; +use futures_util::StreamExt; use netpod::ScyllaConfig; use scylla::prepared_statement::PreparedStatement; use scylla::statement::Consistency; @@ -20,6 +21,7 @@ pub struct DataStore { pub qu_insert_array_i32: Arc, pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, + pub qu_insert_array_bool: Arc, pub qu_insert_muted: Arc, pub qu_insert_item_recv_ivl: Arc, pub qu_insert_connection_status: Arc, @@ -29,6 +31,31 @@ pub struct DataStore { } impl DataStore { + async fn has_table(name: &str, scy: &ScySession, scyconf: &ScyllaConfig) -> Result { + let mut res = scy + .query_iter( + "select table_name from system_schema.tables where keyspace_name = ?", + (&scyconf.keyspace,), + ) + .await + .map_err(|e| e.to_string()) + .map_err(Error::from)?; + while let Some(k) = res.next().await { + let row = k.map_err(|e| e.to_string()).map_err(Error::from)?; + if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() { + if table_name == name { + return Ok(true); + } + } + } + Ok(false) + } + + async fn migrate_00(scy: &ScySession, scyconf: &ScyllaConfig) -> Result<(), Error> { + if !Self::has_table("somename", scy, scyconf).await? {} + Ok(()) + } + pub async fn new(scyconf: &ScyllaConfig) -> Result { let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) @@ -38,6 +65,9 @@ impl DataStore { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = Arc::new(scy); + + Self::migrate_00(&scy, scyconf).await?; + let q = scy .prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?") .await @@ -111,6 +141,11 @@ impl DataStore { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_array_f64 = Arc::new(q); + let q = scy + .prepare("insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_bool = Arc::new(q); // Others: let q = scy .prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?") @@ -160,6 +195,7 @@ impl DataStore { qu_insert_array_i32, qu_insert_array_f32, qu_insert_array_f64, + qu_insert_array_bool, qu_insert_muted, qu_insert_item_recv_ivl, qu_insert_connection_status, diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 897a460..97bc449 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -1,5 +1,5 @@ use crate::errconv::ErrConv; -use crate::zmtp::{CommonQueries, ZmtpFrame}; +use crate::zmtp::ZmtpFrame; use err::Error; use futures_util::{Future, FutureExt}; use log::*; @@ -21,11 +21,12 @@ pub struct ScyQueryFut<'a> { } impl<'a> ScyQueryFut<'a> { - pub fn new(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self + pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: V) -> Self where V: ValueList + Send + 'static, { - let fut = scy.execute(query, values); + //let fut = scy.execute(query, values); + let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); Self { fut: Box::pin(fut) } } } @@ -164,7 +165,7 @@ pub struct InsertLoopFut<'a> { } impl<'a> InsertLoopFut<'a> { - pub fn new(scy: &'a ScySession, query: &'a PreparedStatement, values: Vec, skip_insert: bool) -> Self + pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: Vec, skip_insert: bool) -> Self where V: ValueList + Send + Sync + 'static, { @@ -178,7 +179,8 @@ impl<'a> InsertLoopFut<'a> { let futs: Vec<_> = values .into_iter() .map(|vs| { - let fut = scy.execute(query, vs); + //let fut = scy.execute(query, vs); + let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); Box::pin(fut) as _ }) .collect(); @@ -325,7 +327,6 @@ pub trait ChannelWriter { } struct MsgAcceptorOptions { - cq: Arc, skip_insert: bool, array_truncate: usize, } @@ -341,7 +342,7 @@ trait MsgAcceptor { macro_rules! impl_msg_acceptor_scalar { ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => { struct $sname { - query: PreparedStatement, + //query: PreparedStatement, values: Vec<(i64, i64, i64, i64, $st)>, series: i64, opts: MsgAcceptorOptions, @@ -351,8 +352,8 @@ macro_rules! impl_msg_acceptor_scalar { impl $sname { pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self { Self { - query: opts.cq.$qu_id.clone(), - values: vec![], + //query: opts.cq.$qu_id.clone(), + values: Vec::new(), series, opts, batch: Batch::new((BatchType::Unlogged)), @@ -387,20 +388,20 @@ macro_rules! impl_msg_acceptor_scalar { } fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { - let vt = mem::replace(&mut self.values, vec![]); + let vt = mem::replace(&mut self.values, Vec::new()); let nn = vt.len(); self.batch = Batch::new(BatchType::Unlogged); let batch = &mut self.batch; for _ in 0..nn { - batch.append_statement(self.query.clone()); + //batch.append_statement(self.query.clone()); } let ret = ScyBatchFutGen::new(&scy, batch, vt); Ok(ret) } fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { - let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert); + let vt = mem::replace(&mut self.values, Vec::new()); + let ret = InsertLoopFut::new(scy, None, vt, self.opts.skip_insert); Ok(ret) } } @@ -410,7 +411,7 @@ macro_rules! impl_msg_acceptor_scalar { macro_rules! impl_msg_acceptor_array { ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => { struct $sname { - query: PreparedStatement, + //query: PreparedStatement, values: Vec<(i64, i64, i64, i64, Vec<$st>)>, series: i64, array_truncate: usize, @@ -422,8 +423,8 @@ macro_rules! impl_msg_acceptor_array { impl $sname { pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self { Self { - query: opts.cq.$qu_id.clone(), - values: vec![], + //query: opts.cq.$qu_id.clone(), + values: Vec::new(), series, array_truncate: opts.array_truncate, truncated: 0, @@ -461,20 +462,20 @@ macro_rules! impl_msg_acceptor_array { } fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { - let vt = mem::replace(&mut self.values, vec![]); + let vt = mem::replace(&mut self.values, Vec::new()); let nn = vt.len(); self.batch = Batch::new(BatchType::Unlogged); let batch = &mut self.batch; for _ in 0..nn { - batch.append_statement(self.query.clone()); + //batch.append_statement(self.query.clone()); } let ret = ScyBatchFutGen::new(&scy, batch, vt); Ok(ret) } fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { - let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert); + let vt = mem::replace(&mut self.values, Vec::new()); + let ret = InsertLoopFut::new(scy, None, vt, self.opts.skip_insert); Ok(ret) } } @@ -506,7 +507,7 @@ impl_msg_acceptor_array!(MsgAcceptorArrayF64LE, f64, qu_insert_array_f64, from_l impl_msg_acceptor_array!(MsgAcceptorArrayF64BE, f64, qu_insert_array_f64, from_be_bytes); struct MsgAcceptorArrayBool { - query: PreparedStatement, + //query: PreparedStatement, values: Vec<(i64, i64, i64, i64, Vec)>, series: i64, array_truncate: usize, @@ -518,8 +519,8 @@ struct MsgAcceptorArrayBool { impl MsgAcceptorArrayBool { pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self { Self { - query: opts.cq.qu_insert_array_bool.clone(), - values: vec![], + //query: opts.cq.qu_insert_array_bool.clone(), + values: Vec::new(), series, array_truncate: opts.array_truncate, truncated: 0, @@ -566,20 +567,20 @@ impl MsgAcceptor for MsgAcceptorArrayBool { } fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result { - let vt = mem::replace(&mut self.values, vec![]); + let vt = mem::replace(&mut self.values, Vec::new()); let nn = vt.len(); self.batch = Batch::new(BatchType::Unlogged); let batch = &mut self.batch; for _ in 0..nn { - batch.append_statement(self.query.clone()); + //batch.append_statement(self.query.clone()); } let ret = ScyBatchFutGen::new(&scy, batch, vt); Ok(ret) } fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result, Error> { - let vt = mem::replace(&mut self.values, vec![]); - let ret = InsertLoopFut::new(scy, &self.query, vt, self.opts.skip_insert); + let vt = mem::replace(&mut self.values, Vec::new()); + let ret = InsertLoopFut::new(scy, None, vt, self.opts.skip_insert); Ok(ret) } } @@ -587,7 +588,6 @@ impl MsgAcceptor for MsgAcceptorArrayBool { pub struct ChannelWriterAll { series: u64, scy: Arc, - common_queries: Arc, ts_msp_lsp: fn(u64, u64) -> (u64, u64), ts_msp_last: u64, acceptor: Box, @@ -603,7 +603,6 @@ pub struct ChannelWriterAll { impl ChannelWriterAll { pub fn new( series: u64, - common_queries: Arc, scy: Arc, scalar_type: ScalarType, shape: Shape, @@ -612,7 +611,6 @@ impl ChannelWriterAll { skip_insert: bool, ) -> Result { let opts = MsgAcceptorOptions { - cq: common_queries.clone(), skip_insert, array_truncate, }; @@ -762,7 +760,6 @@ impl ChannelWriterAll { let ret = Self { series, scy, - common_queries, ts_msp_lsp, ts_msp_last: 0, acceptor: acc, @@ -787,11 +784,7 @@ impl ChannelWriterAll { debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}"); self.ts_msp_last = ts_msp; if !self.skip_insert { - let fut = ScyQueryFut::new( - &self.scy, - &self.common_queries.qu_insert_ts_msp, - (self.series as i64, ts_msp as i64), - ); + let fut = ScyQueryFut::new(&self.scy, None, (self.series as i64, ts_msp as i64)); Some(Box::pin(fut) as _) } else { None diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index e49b42c..8d2c71d 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -40,6 +40,7 @@ pub struct CaIngestOpts { ttl_d0: Option, #[serde(with = "humantime_serde")] ttl_d1: Option, + pub test_bsread_addr: Option, } impl CaIngestOpts { diff --git a/netfetch/src/dbpg.rs b/netfetch/src/dbpg.rs index c666d64..22ea5e6 100644 --- a/netfetch/src/dbpg.rs +++ b/netfetch/src/dbpg.rs @@ -1,16 +1,78 @@ use crate::errconv::ErrConv; use err::Error; +use netpod::log::*; use netpod::Database; use tokio_postgres::Client as PgClient; pub async fn make_pg_client(d: &Database) -> Result { - let (client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), - tokio_postgres::tls::NoTls, - ) - .await - .err_conv()?; + let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); + info!("connect to {url}"); + let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls) + .await + .err_conv()?; // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: tokio::spawn(pg_conn); Ok(client) } + +async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result { + let rows = pgc + .query( + "select count(*) as c from information_schema.columns where table_name = $1 and column_name = $2 limit 10", + &[&table, &column], + ) + .await + .err_conv()?; + if rows.len() == 1 { + let c: i64 = rows[0].get(0); + if c == 0 { + Ok(false) + } else if c == 1 { + Ok(true) + } else { + Err(Error::with_msg_no_trace(format!("has_columns bad count {}", c))) + } + } else if rows.len() == 0 { + Ok(false) + } else { + Err(Error::with_msg_no_trace(format!( + "has_columns bad row count {}", + rows.len() + ))) + } +} + +async fn migrate_00(pgc: &PgClient) -> Result<(), Error> { + if !has_column("ioc_by_channel_log", "tscreate", pgc).await? { + pgc.execute( + "alter table ioc_by_channel_log add tscreate timestamptz not null default now()", + &[], + ) + .await + .err_conv()?; + } + if !has_column("ioc_by_channel_log", "archived", pgc).await? { + pgc.execute( + "alter table ioc_by_channel_log add archived int not null default 0", + &[], + ) + .await + .err_conv()?; + } + { + match pgc.execute("alter table series_by_channel add constraint series_by_channel_nondup unique (facility, channel, scalar_type, shape_dims, agg_kind)", &[]).await { + Ok(_) => { + info!("constraint added"); + } + Err(_)=>{} + } + } + Ok(()) +} + +pub async fn schema_check(db: &Database) -> Result<(), Error> { + let pgc = make_pg_client(db).await?; + migrate_00(&pgc).await?; + info!("schema_check done"); + Ok(()) +} diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 4e79fb2..92519b3 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -273,7 +273,7 @@ pub async fn spawn_scylla_insert_workers( ingest_commons .insert_workers_running .fetch_sub(1, atomic::Ordering::AcqRel); - info!("insert worker {worker_ix} has no more messages"); + trace!("insert worker {worker_ix} done"); }; let jh = tokio::spawn(fut); jhs.push(jh); diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index 80bd062..fb6c328 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -139,10 +139,11 @@ pub enum ConnectionStatus { Established, Closing, ClosedUnexpected, + ConnectionHandlerDone, } impl ConnectionStatus { - pub fn kind(&self) -> u32 { + pub fn to_kind(&self) -> u32 { use ConnectionStatus::*; match self { ConnectError => 1, @@ -150,8 +151,27 @@ impl ConnectionStatus { Established => 3, Closing => 4, ClosedUnexpected => 5, + ConnectionHandlerDone => 6, } } + + pub fn from_kind(kind: u32) -> Result { + use ConnectionStatus::*; + let ret = match kind { + 1 => ConnectError, + 2 => ConnectTimeout, + 3 => Established, + 4 => Closing, + 5 => ClosedUnexpected, + 6 => ConnectionHandlerDone, + _ => { + return Err(err::Error::with_msg_no_trace(format!( + "unknown ConnectionStatus kind {kind}" + ))); + } + }; + Ok(ret) + } } #[derive(Debug)] @@ -451,7 +471,8 @@ pub async fn insert_item( I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?, - String(_) => (), + String(_) => warn!("TODO string insert"), + Bool(_v) => warn!("TODO bool insert"), } } Array(val) => { @@ -469,6 +490,7 @@ pub async fn insert_item( I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?, F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?, F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?, + Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?, } } } @@ -488,7 +510,7 @@ pub async fn insert_connection_status( let ts = secs + nanos; let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; let ts_lsp = ts - ts_msp; - let kind = item.status.kind(); + let kind = item.status.to_kind(); let addr = format!("{}", item.addr); let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr, ttl.as_secs() as i32); data_store diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index fcb075c..2efcc2f 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,16 +1,20 @@ +use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::bsread::{BsreadMessage, ChannelDescDecoded, Parser}; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; +use crate::ca::proto::{CaDataArrayValue, CaDataValue}; +use crate::ca::IngestCommons; use crate::channelwriter::{ChannelWriter, ChannelWriterAll}; use crate::errconv::ErrConv; use crate::netbuf::NetBuf; -use crate::store::CommonInsertItemQueueSender; +use crate::series::SeriesId; +use crate::store::{CommonInsertItemQueueSender, InsertItem, QueryItem}; use async_channel::{Receiver, Sender}; #[allow(unused)] use bytes::BufMut; use err::Error; use futures_util::{pin_mut, Future, FutureExt, Stream, StreamExt}; use log::*; -use netpod::timeunits::*; +use netpod::{timeunits::*, ScalarType, Shape, TS_MSP_GRID_SPACING, TS_MSP_GRID_UNIT}; use scylla::batch::{Batch, BatchType, Consistency}; use scylla::prepared_statement::PreparedStatement; use scylla::{Session as ScySession, SessionBuilder}; @@ -19,6 +23,7 @@ use stats::CheckEvery; use std::collections::BTreeMap; use std::fmt; use std::mem; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -57,51 +62,19 @@ fn test_service() -> Result<(), Error> { taskrun::run(fut) } -pub fn __get_series_id(chn: &ChannelDesc) -> u64 { - // TODO use a more stable format (with ScalarType, Shape) as hash input. - // TODO do not depend at all on the mapping, instead look it up on demand and cache. - use md5::Digest; - let mut h = md5::Md5::new(); - h.update(chn.name.as_bytes()); - h.update(chn.ty.as_bytes()); - h.update(format!("{:?}", chn.shape).as_bytes()); - let f = h.finalize(); - u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()) -} - pub async fn get_series_id(_scy: &ScySession, _chn: &ChannelDescDecoded) -> Result { error!("TODO get_series_id"); err::todoval() } -pub struct CommonQueries { - pub qu1: PreparedStatement, - pub qu2: PreparedStatement, - pub qu_insert_ts_msp: PreparedStatement, - pub qu_insert_scalar_u16: PreparedStatement, - pub qu_insert_scalar_u32: PreparedStatement, - pub qu_insert_scalar_i16: PreparedStatement, - pub qu_insert_scalar_i32: PreparedStatement, - pub qu_insert_scalar_f32: PreparedStatement, - pub qu_insert_scalar_f64: PreparedStatement, - pub qu_insert_array_u16: PreparedStatement, - pub qu_insert_array_i16: PreparedStatement, - pub qu_insert_array_i32: PreparedStatement, - pub qu_insert_array_f32: PreparedStatement, - pub qu_insert_array_f64: PreparedStatement, - pub qu_insert_array_bool: PreparedStatement, -} - #[derive(Clone)] pub struct ZmtpClientOpts { pub backend: String, - pub scylla: Vec, - pub sources: Vec, + pub addr: SocketAddr, pub do_pulse_id: bool, pub rcvbuf: Option, pub array_truncate: Option, pub process_channel_count_limit: Option, - pub skip_insert: bool, } struct ClientRun { @@ -128,44 +101,159 @@ impl Future for ClientRun { } } -struct BsreadClient { +#[derive(Debug)] +pub enum ZmtpEvent { + ZmtpCommand(ZmtpFrame), + ZmtpMessage(ZmtpMessage), +} + +pub struct BsreadClient { opts: ZmtpClientOpts, - source_addr: String, + source_addr: SocketAddr, do_pulse_id: bool, rcvbuf: Option, - tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>, - insert_item_sender: CommonInsertItemQueueSender, - scy: Arc, - channel_writers: BTreeMap>, - common_queries: Arc, print_stats: CheckEvery, parser: Parser, + ingest_commons: Arc, + insqtx: CommonInsertItemQueueSender, + tmp_evtset_series: Option, + channel_info_query_tx: Sender, + inserted_in_ts_msp_count: u32, + ts_msp_last: u64, + ts_msp_grid_last: u32, } impl BsreadClient { pub async fn new( opts: ZmtpClientOpts, - source_addr: String, - insert_item_sender: CommonInsertItemQueueSender, - scy: Arc, - common_queries: Arc, + ingest_commons: Arc, + channel_info_query_tx: Sender, ) -> Result { + let insqtx = ingest_commons + .insert_item_queue + .sender() + .ok_or_else(|| Error::with_msg_no_trace("can not get insqtx"))?; let ret = Self { - source_addr, + source_addr: opts.addr, do_pulse_id: opts.do_pulse_id, rcvbuf: opts.rcvbuf, opts, - tmp_vals_pulse_map: Vec::new(), - insert_item_sender, - scy, - channel_writers: Default::default(), - common_queries, print_stats: CheckEvery::new(Duration::from_millis(2000)), parser: Parser::new(), + ingest_commons, + insqtx, + tmp_evtset_series: None, + channel_info_query_tx, + inserted_in_ts_msp_count: 0, + ts_msp_last: 0, + ts_msp_grid_last: 0, }; Ok(ret) } + async fn test_evtset_extract( + &mut self, + msg: &ZmtpMessage, + bm: &BsreadMessage, + ts: u64, + pulse: u64, + ) -> Result<(), Error> { + let chname = "SAR-CVME-TIFALL4:EvtSet"; + // Test the bool set write + let mut i3 = usize::MAX; + for (i, ch) in bm.head_b.channels.iter().enumerate() { + if ch.name == chname { + i3 = i; + break; + } + } + if i3 != usize::MAX { + if let Some(fr) = msg.frames.get(2 + 2 * i3) { + debug!("try to extract bools {} {}", fr.msglen, fr.data.len()); + let setlen = fr.data.len(); + debug!("flags {:?}", &fr.data[..setlen.min(16)]); + let evtset: Vec<_> = fr.data.iter().map(|&x| x != 0).collect(); + let scalar_type = ScalarType::BOOL; + let shape = Shape::Wave(256); + if self.tmp_evtset_series.is_none() { + debug!("try to fetch series id"); + let (tx, rx) = async_channel::bounded(8); + let item = ChannelInfoQuery { + backend: self.opts.backend.clone(), + channel: chname.into(), + scalar_type: ScalarType::BOOL.to_scylla_i32(), + shape_dims: Shape::Wave(setlen as _).to_scylla_vec(), + tx, + }; + self.channel_info_query_tx.send(item).await?; + match rx.recv().await { + Ok(res) => match res { + Ok(res) => { + debug!("got series id: {res:?}"); + self.tmp_evtset_series = Some(res.into_inner()); + } + Err(e) => { + error!("{e}"); + } + }, + Err(e) => { + error!("{e}"); + } + } + } + if let Some(series) = self.tmp_evtset_series.clone() { + let (ts_msp, ts_msp_changed) = + if self.inserted_in_ts_msp_count >= 6400 || self.ts_msp_last + HOUR <= ts { + let div = SEC * 10; + let ts_msp = ts / div * div; + if ts_msp == self.ts_msp_last { + (ts_msp, false) + } else { + self.ts_msp_last = ts_msp; + self.inserted_in_ts_msp_count = 1; + (ts_msp, true) + } + } else { + self.inserted_in_ts_msp_count += 1; + (self.ts_msp_last, false) + }; + let ts_lsp = ts - ts_msp; + let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; + let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid { + self.ts_msp_grid_last = ts_msp_grid; + Some(ts_msp_grid) + } else { + None + }; + let item = InsertItem { + series, + ts_msp, + ts_lsp, + msp_bump: ts_msp_changed, + ts_msp_grid, + pulse, + scalar_type, + shape, + val: CaDataValue::Array(CaDataArrayValue::Bool(evtset)), + }; + let item = QueryItem::Insert(item); + match self.insqtx.send(item).await { + Ok(_) => { + debug!("item send ok pulse {}", pulse); + } + Err(e) => { + error!("can not send item {:?}", e.0); + } + } + } else { + error!("still no series id"); + tokio::time::sleep(Duration::from_millis(1000)).await; + } + } + } + Ok(()) + } + pub async fn run(&mut self) -> Result<(), Error> { let mut conn = tokio::net::TcpStream::connect(&self.source_addr).await?; if let Some(v) = self.rcvbuf { @@ -181,7 +269,6 @@ impl BsreadClient { let mut bytes_payload = 0u64; let mut rows_inserted = 0u32; let mut time_spent_inserting = Duration::from_millis(0); - let mut series_ids = Vec::new(); let mut msg_dt_ema = stats::EMA::with_k(0.01); let mut msg_ts_last = Instant::now(); while let Some(item) = zmtp.next().await { @@ -220,12 +307,14 @@ impl BsreadClient { } { if bm.head_b_md5 != dh_md5_last { - series_ids.clear(); + // TODO header changed, don't support this at the moment. head_b = bm.head_b.clone(); if dh_md5_last.is_empty() { - info!("data header hash {}", bm.head_b_md5); + debug!("data header hash {}", bm.head_b_md5); dh_md5_last = bm.head_b_md5.clone(); - let scy = self.scy.clone(); + // TODO must fetch series ids on-demand. + // For the time being, assume that channel list never changes, but WARN! + /*let scy = self.scy.clone(); for chn in &head_b.channels { info!("Setup writer for {}", chn.name); let cd: ChannelDescDecoded = chn.try_into()?; @@ -235,7 +324,7 @@ impl BsreadClient { warn!("can not set up writer for {} {e:?}", chn.name); } } - } + }*/ } else { error!("TODO changed data header hash {}", bm.head_b_md5); dh_md5_last = bm.head_b_md5.clone(); @@ -246,18 +335,20 @@ impl BsreadClient { } } if self.do_pulse_id { + let nframes = msg.frames().len(); + debug!("nframes {nframes}"); let mut i3 = u32::MAX; for (i, ch) in head_b.channels.iter().enumerate() { - if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID" { + if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID" + || ch.name == "SAR-CVME-TIFALL4:EvtSet" + { i3 = i as u32; } } // TODO need to know the facility! if i3 < u32::MAX { let i4 = 2 * i3 + 2; - if i4 >= msg.frames.len() as u32 { - } else { - let fr = &msg.frames[i4 as usize]; + if let Some(fr) = msg.frames.get(i4 as usize) { self.insert_pulse_map(fr, &msg, &bm).await?; } } @@ -266,46 +357,36 @@ impl BsreadClient { // TODO count always, throttle log. error!("not enough frames for data header"); } - let gts = bm.head_a.global_timestamp; + let gts = &bm.head_a.global_timestamp; let ts = (gts.sec as u64) * SEC + gts.ns as u64; let pulse = bm.head_a.pulse_id.as_u64().unwrap_or(0); + debug!("ts {ts:20} pulse{pulse:20}"); // TODO limit warn rate - if pulse != 0 && (pulse < 14781000000 || pulse > 16000000000) { + if pulse != 0 && (pulse < 14781000000 || pulse > 49000000000) { // TODO limit log rate - warn!("Bad pulse {} for {}", pulse, self.source_addr); + warn!("pulse out of range {} addr {}", pulse, self.source_addr); } - for i1 in 0..head_b - .channels - .len() - .min(self.opts.process_channel_count_limit.unwrap_or(4000)) - { + if pulse % 1000000 != ts % 1000000 { + warn!( + "pulse-ts mismatch ts {} pulse {} addr {}", + ts, pulse, self.source_addr + ); + } + self.test_evtset_extract(&msg, &bm, ts, pulse).await?; + let nch = head_b.channels.len(); + let nmax = self.opts.process_channel_count_limit.unwrap_or(4000); + let nlim = if nch > nmax { + // TODO count this event + 4000 + } else { + nch + }; + for i1 in 0..nlim { // TODO skip decoding if header unchanged. let chn = &head_b.channels[i1]; let chd: ChannelDescDecoded = chn.try_into()?; let fr = &msg.frames[2 + 2 * i1]; - // TODO refactor to make correctness evident. - if i1 >= series_ids.len() { - series_ids.resize(head_b.channels.len(), (0u8, 0u64)); - } - if series_ids[i1].0 == 0 { - let series = get_series_id(&self.scy, &chd).await?; - series_ids[i1].0 = 1; - series_ids[i1].1 = series; - } - let series = series_ids[i1].1; - if let Some(_cw) = self.channel_writers.get_mut(&series) { - let _ = ts; - let _ = fr; - // TODO hand off item to a writer item queue. - err::todo(); - /*let res = cw.write_msg(ts, pulse, fr)?.await?; - rows_inserted += res.nrows; - time_spent_inserting = time_spent_inserting + res.dt; - bytes_payload += fr.data().len() as u64;*/ - } else { - // TODO check for missing writers. - warn!("no writer for {}", chn.name); - } + // TODO store the channel information together with series in struct. } } Err(e) => { @@ -353,195 +434,53 @@ impl BsreadClient { } async fn setup_channel_writers(&mut self, scy: &ScySession, cd: &ChannelDescDecoded) -> Result<(), Error> { - let series = get_series_id(scy, cd).await?; let has_comp = cd.compression.is_some(); if has_comp { warn!("Compression not yet supported [{}]", cd.name); return Ok(()); } - let trunc = self.opts.array_truncate.unwrap_or(64); - let cw = ChannelWriterAll::new( - series, - self.common_queries.clone(), - self.scy.clone(), - cd.scalar_type.clone(), - cd.shape.clone(), - cd.byte_order.clone(), - trunc, - self.opts.skip_insert, - )?; let shape_dims = cd.shape.to_scylla_vec(); - self.channel_writers.insert(series, Box::new(cw)); - if !self.opts.skip_insert { - error!("TODO use PGSQL and existing function instead."); - err::todo(); - // TODO insert correct facility name - self.scy - .query( - "insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?) if not exists", - (&self.opts.backend, &cd.name, series as i64, cd.scalar_type.to_scylla_i32(), &shape_dims), - ) - .await - .err_conv()?; - } Ok(()) } async fn insert_pulse_map(&mut self, fr: &ZmtpFrame, msg: &ZmtpMessage, bm: &BsreadMessage) -> Result<(), Error> { - trace!("data len {}", fr.data.len()); + debug!("data len {}", fr.data.len()); // TODO take pulse-id also from main header and compare. - let pulse_f64 = f64::from_be_bytes(fr.data[..].try_into().unwrap()); - trace!("pulse_f64 {pulse_f64}"); + let pulse_f64 = f64::from_be_bytes(fr.data[..8].try_into()?); + debug!("pulse_f64 {pulse_f64}"); let pulse = pulse_f64 as u64; if false { let i4 = 3; // TODO this next frame should be described somehow in the json header or? - info!("next val len {}", msg.frames[i4 as usize + 1].data.len()); - let ts_a = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[0..8].try_into().unwrap()); - let ts_b = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[8..16].try_into().unwrap()); - info!("ts_a {ts_a} ts_b {ts_b}"); - } - let _ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns; - if true { - let pulse_a = (pulse >> 14) as i64; - let pulse_b = (pulse & 0x3fff) as i32; - let ts_a = bm.head_a.global_timestamp.sec as i64; - let ts_b = bm.head_a.global_timestamp.ns as i32; - self.tmp_vals_pulse_map.push((pulse_a, pulse_b, ts_a, ts_b)); - } - if self.tmp_vals_pulse_map.len() >= 200 { - let ts1 = Instant::now(); - // TODO use facility, channel_name, ... as partition key. - self.scy - .execute(&self.common_queries.qu1, (1i32, self.tmp_vals_pulse_map[0].0)) - .await - .err_conv()?; - let mut batch = Batch::new(BatchType::Unlogged); - for _ in 0..self.tmp_vals_pulse_map.len() { - batch.append_statement(self.common_queries.qu2.clone()); - } - let _ = self.scy.batch(&batch, &self.tmp_vals_pulse_map).await.err_conv()?; - let nn = self.tmp_vals_pulse_map.len(); - self.tmp_vals_pulse_map.clear(); - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; - info!("insert {} items in {:6.2} ms", nn, dt); + debug!("next val len {}", msg.frames[i4 as usize + 1].data.len()); + let ts_a = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[0..8].try_into()?); + let ts_b = u64::from_be_bytes(msg.frames[i4 as usize + 1].data[8..16].try_into()?); + debug!("ts_a {ts_a} ts_b {ts_b}"); } + let ts = bm.head_a.global_timestamp.sec * SEC + bm.head_a.global_timestamp.ns; + /*let pulse_a = (pulse >> 14) as i64; + let pulse_b = (pulse & 0x3fff) as i32; + let ts_a = bm.head_a.global_timestamp.sec as i64; + let ts_b = bm.head_a.global_timestamp.ns as i32;*/ + debug!("ts {ts:20} pulse {pulse:20}"); Ok(()) } } pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { - let scy = SessionBuilder::new().default_consistency(Consistency::Quorum); - let mut scy = scy; - for a in &opts.scylla { - scy = scy.known_node(a); - } - // TODO use keyspace from configuration. - err::todo(); - let scy = scy - .use_keyspace("ks1", false) - .build() - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let scy = Arc::new(scy); - - error!("TODO redo the pulse mapping"); - err::todo(); - let qu1 = scy - .prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu2 = scy - .prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_ts_msp = scy - .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_scalar_u16 = scy - .prepare("insert into events_scalar_u16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_scalar_u32 = scy - .prepare("insert into events_scalar_u32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_scalar_i16 = scy - .prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_scalar_i32 = scy - .prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_scalar_f32 = scy - .prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_scalar_f64 = scy - .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_array_u16 = scy - .prepare("insert into events_array_u16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_array_i16 = scy - .prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_array_i32 = scy - .prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_array_f32 = scy - .prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_array_f64 = scy - .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu_insert_array_bool = scy - .prepare("insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let common_queries = CommonQueries { - qu1, - qu2, - qu_insert_ts_msp, - qu_insert_scalar_u16, - qu_insert_scalar_u32, - qu_insert_scalar_i16, - qu_insert_scalar_i32, - qu_insert_scalar_f32, - qu_insert_scalar_f64, - qu_insert_array_u16, - qu_insert_array_i16, - qu_insert_array_i32, - qu_insert_array_f32, - qu_insert_array_f64, - qu_insert_array_bool, + let client = BsreadClient::new(opts.clone(), todo!(), todo!()).await?; + let fut = { + async move { + let mut client = client; + client.run().await?; + Ok::<_, Error>(()) + } }; - let common_queries = Arc::new(common_queries); - let mut jhs = vec![]; - for source_addr in &opts.sources { - let client = BsreadClient::new( - opts.clone(), - source_addr.into(), - todo!(), - scy.clone(), - common_queries.clone(), - ) - .await?; - let fut = ClientRun::new(client); - //clients.push(fut); - let jh = tokio::spawn(fut); - jhs.push(jh); - } - futures_util::future::join_all(jhs).await; + let jh = tokio::spawn(fut); + //let mut jhs = Vec::new(); + //jhs.push(jh); + //futures_util::future::join_all(jhs).await; + jh.await.map_err(|e| e.to_string()).map_err(Error::from)??; Ok(()) } @@ -666,6 +605,7 @@ enum ConnState { ReadFrameShort, ReadFrameLong, ReadFrameBody(usize), + LockScan(usize), } impl ConnState { @@ -682,6 +622,7 @@ impl ConnState { ReadFrameShort => 1, ReadFrameLong => 8, ReadFrameBody(msglen) => *msglen, + LockScan(n) => *n, } } } @@ -734,7 +675,7 @@ impl Zmtp { complete: false, socket_type, conn, - conn_state: ConnState::InitSend, + conn_state: ConnState::LockScan(1), buf: NetBuf::new(1024 * 128), outbuf: NetBuf::new(1024 * 128), out_enable: false, @@ -742,7 +683,7 @@ impl Zmtp { has_more: false, is_command: false, peer_ver: (0, 0), - frames: vec![], + frames: Vec::new(), inp_eof: false, data_tx: tx, data_rx: rx, @@ -1106,7 +1047,7 @@ impl Zmtp { } ConnState::ReadFrameLong => { self.msglen = self.buf.read_u64()? as usize; - trace!("parse_item ReadFrameShort msglen {}", self.msglen); + trace!("parse_item ReadFrameLong msglen {}", self.msglen); self.conn_state = ConnState::ReadFrameBody(self.msglen); if self.msglen > self.buf.cap() / 2 { error!("msglen {} too large for this client", self.msglen); @@ -1160,7 +1101,7 @@ impl Zmtp { } } let g = ZmtpFrame { - msglen: self.msglen, + msglen: msglen, has_more: self.has_more, is_command: self.is_command, data, @@ -1168,7 +1109,7 @@ impl Zmtp { Ok(Some(ZmtpEvent::ZmtpCommand(g))) } else { let g = ZmtpFrame { - msglen: self.msglen, + msglen: msglen, has_more: self.has_more, is_command: self.is_command, data, @@ -1178,7 +1119,7 @@ impl Zmtp { Ok(None) } else { let g = ZmtpMessage { - frames: mem::replace(&mut self.frames, vec![]), + frames: mem::replace(&mut self.frames, Vec::new()), }; if false && g.frames.len() != 118 { info!("EMIT {} frames", g.frames.len()); @@ -1199,6 +1140,55 @@ impl Zmtp { } } } + ConnState::LockScan(n) => { + if n > 1024 * 20 { + warn!("could not lock within {n} bytes"); + } + const NBACK: usize = 2; + let data = self.buf.data(); + let mut found_at = None; + debug!("{}", String::from_utf8_lossy(data)); + debug!("try to lock within {} bytes", data.len()); + let needle = br##"{"dh_compression":"##; + for (i1, b) in data.iter().enumerate() { + if i1 >= NBACK && *b == needle[0] { + let dd = &data[i1..]; + { + let nn = dd.len().min(32); + debug!("pre {}", String::from_utf8_lossy(&dd[..nn])); + } + if dd.len() >= needle.len() { + if &dd[..needle.len()] == needle { + debug!("found at {i1}"); + found_at = Some(i1); + break; + } + } + } + } + let mut locked = false; + if let Some(nf) = found_at { + if nf >= NBACK { + if false { + let s1 = data[nf - NBACK..].iter().take(32).fold(String::new(), |mut a, x| { + use std::fmt::Write; + let _ = write!(a, "{:02x} ", *x); + a + }); + debug!("BUF {s1}"); + } + if data[nf - 2] == 0x01 && data[nf - 1] > 0x70 && data[nf - 1] < 0xd0 { + locked = true; + } + } + } + if locked { + self.conn_state = ConnState::ReadFrameFlags; + } else { + self.conn_state = ConnState::LockScan(data.len() + 1); + } + Ok(None) + } } } } @@ -1294,12 +1284,6 @@ impl fmt::Debug for Int { } } -#[derive(Debug)] -pub enum ZmtpEvent { - ZmtpCommand(ZmtpFrame), - ZmtpMessage(ZmtpMessage), -} - impl Stream for Zmtp { type Item = Result; @@ -1365,7 +1349,7 @@ impl DummyData { let ha = serde_json::to_vec(&head_a).unwrap(); let hf = self.value.to_le_bytes().to_vec(); let hp = [(self.ts / SEC).to_be_bytes(), (self.ts % SEC).to_be_bytes()].concat(); - let mut msg = ZmtpMessage { frames: vec![] }; + let mut msg = ZmtpMessage { frames: Vec::new() }; let fr = ZmtpFrame { msglen: 0, has_more: false,