diff --git a/batchtools/Cargo.toml b/batchtools/Cargo.toml new file mode 100644 index 0000000..3df622e --- /dev/null +++ b/batchtools/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "batchtools" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +log = { path = "../log" } +err = { path = "../../daqbuffer/crates/err" } +taskrun = { path = "../../daqbuffer/crates/taskrun" } +async-channel = "1.9.0" diff --git a/netfetch/src/batcher.rs b/batchtools/src/batcher.rs similarity index 98% rename from netfetch/src/batcher.rs rename to batchtools/src/batcher.rs index 9488c49..b4e35c7 100644 --- a/netfetch/src/batcher.rs +++ b/batchtools/src/batcher.rs @@ -1,5 +1,5 @@ use async_channel::Receiver; -use netpod::log::*; +use log::*; use std::time::Duration; use taskrun::tokio; diff --git a/batchtools/src/lib.rs b/batchtools/src/lib.rs new file mode 100644 index 0000000..de0a2da --- /dev/null +++ b/batchtools/src/lib.rs @@ -0,0 +1 @@ +pub mod batcher; diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 1c53f77..e08808c 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -20,4 +20,7 @@ taskrun = { path = "../../daqbuffer/crates/taskrun" } log = { path = "../log" } stats = { path = "../stats" } scywr = { path = "../scywr" } +dbpg = { path = "../dbpg" } +series = { path = "../series" } netfetch = { path = "../netfetch" } +batchtools = { path = "../batchtools" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 6ef834e..78602fd 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -1,11 +1,12 @@ use async_channel::Receiver; use async_channel::Sender; use async_channel::WeakReceiver; +use dbpg::conn::make_pg_client; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; -use netfetch::batchquery::series_by_channel::ChannelInfoQuery; use netfetch::ca::conn::CaConnEvent; use netfetch::ca::conn::ConnCommand; use netfetch::ca::connset::CaConnSet; @@ -16,12 +17,8 @@ use netfetch::ca::SlowWarnable; use netfetch::conf::CaIngestOpts; use netfetch::daemon_common::Channel; use netfetch::daemon_common::DaemonEvent; -use netfetch::errconv::ErrConv; use netfetch::metrics::ExtraInsertsConf; use netfetch::metrics::StatsSet; -use netfetch::series::ChannelStatusSeriesId; -use netfetch::series::Existence; -use netfetch::series::SeriesId; use netpod::Database; use netpod::ScyllaConfig; use scywr::insertworker::Ttls; @@ -34,6 +31,9 @@ use scywriiq::ConnectionStatus; use scywriiq::ConnectionStatusItem; use scywriiq::QueryItem; use serde::Serialize; +use series::series::Existence; +use series::ChannelStatusSeriesId; +use series::SeriesId; use stats::DaemonStats; use std::collections::BTreeMap; use std::collections::HashMap; @@ -171,7 +171,7 @@ pub enum ActiveChannelState { }, WaitForStatusSeriesId { since: SystemTime, - rx: Receiver, Error>>, + rx: Receiver, dbpg::seriesbychannel::Error>>, }, WithStatusSeriesId { status_series_id: ChannelStatusSeriesId, @@ -252,18 +252,6 @@ where } } -pub async fn make_pg_client(d: &Database) -> Result { - let (client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), - tokio_postgres::tls::NoTls, - ) - .await - .err_conv()?; - // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: - tokio::spawn(pg_conn); - Ok(client) -} - pub struct Daemon { opts: DaemonOpts, connection_states: BTreeMap, @@ -303,14 +291,21 @@ impl Daemon { let pgcs = { let mut a = Vec::new(); for _ in 0..SEARCH_DB_PIPELINE_LEN { - let pgc = Arc::new(make_pg_client(&opts.pgconf).await?); + let pgc = Arc::new( + make_pg_client(&opts.pgconf) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?, + ); a.push(pgc); } a }; let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), opts.backend().into(), pgcs); - let channel_info_query_tx = netfetch::batchquery::series_by_channel::start_task(&opts.pgconf).await?; + // TODO keep join handles and await later + let (channel_info_query_tx, ..) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); @@ -539,7 +534,7 @@ impl Daemon { } let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); let fut = async move { - let (batch_rx, _jh) = netfetch::batcher::batch( + let (batch_rx, _jh) = batchtools::batcher::batch( SEARCH_BATCH_MAX, Duration::from_millis(200), SEARCH_DB_PIPELINE_LEN, @@ -979,7 +974,7 @@ impl Daemon { if let Some(tx) = self.ingest_commons.insert_item_queue.sender() { let item = QueryItem::ChannelStatus(ChannelStatusItem { ts: tsnow, - series: scywr::iteminsertqueue::SeriesId::new(status_series_id.id()), + series: SeriesId::new(status_series_id.id()), status: ChannelStatus::AssignedToAddress, }); match tx.send(item).await { @@ -1551,7 +1546,13 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?; netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?; - netfetch::dbpg::schema_check(opts.postgresql_config()).await?; + let pg = dbpg::conn::make_pg_client(opts.postgresql_config()) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + + dbpg::schema::schema_check(&pg) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; scywr::schema::migrate_scylla_data_schema(opts.scylla_config()) .await diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml index e15a610..5e4fee2 100644 --- a/dbpg/Cargo.toml +++ b/dbpg/Cargo.toml @@ -9,5 +9,9 @@ log = { path = "../log" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } +batchtools = { path = "../batchtools" } +series = { path = "../series" } tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } +futures-util = "0.3" async-channel = "1.9.0" +md-5 = "0.10" diff --git a/dbpg/src/err.rs b/dbpg/src/err.rs index f27d63e..23f8f83 100644 --- a/dbpg/src/err.rs +++ b/dbpg/src/err.rs @@ -1,20 +1,7 @@ use err::thiserror; use err::ThisError; -#[derive(Debug)] -pub struct Msg(pub String); - #[derive(Debug, ThisError)] pub enum Error { Postgres(#[from] tokio_postgres::Error), - Msg(Msg), -} - -impl Error { - pub fn from_msg(msg: T) -> Self - where - T: Into, - { - Self::Msg(Msg(msg.into())) - } } diff --git a/dbpg/src/findaddr.rs b/dbpg/src/findaddr.rs new file mode 100644 index 0000000..eb7b189 --- /dev/null +++ b/dbpg/src/findaddr.rs @@ -0,0 +1,85 @@ +use crate::conn::PgClient; +use err::thiserror; +use err::ThisError; +use log::*; +use std::net::SocketAddrV4; + +#[derive(Debug, ThisError)] +pub enum Error { + Postgres(#[from] tokio_postgres::Error), + IocAddrNotFound, +} + +pub async fn find_channel_addr(backend: &str, name: String, pg: &PgClient) -> Result, Error> { + let qu_find_addr = pg + .prepare( + "select t1.facility, t1.channel, t1.addr from ioc_by_channel_log t1 where t1.facility = $1 and t1.channel = $2 and addr is not null order by tsmod desc limit 1", + ) + .await?; + let rows = pg.query(&qu_find_addr, &[&backend, &name]).await?; + if rows.is_empty() { + error!("can not find any addresses of channels {:?}", name); + Err(Error::IocAddrNotFound) + } else { + for row in rows { + match row.try_get::<_, &str>(2) { + Ok(addr) => match addr.parse::() { + Ok(addr) => return Ok(Some(addr)), + Err(e) => { + error!("can not parse {e:?}"); + return Err(Error::IocAddrNotFound); + } + }, + Err(e) => { + error!("can not find addr for {name} {e:?}"); + } + } + } + Ok(None) + } +} + +#[allow(unused)] +async fn query_addr_multiple(backend: &str, pg_client: &PgClient) -> Result<(), Error> { + // TODO factor the find loop into a separate Stream. + let sql = concat!( + "with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel_log t1", + " where t1.facility = $1", + " and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)", + " and t1.addr is not null order by t1.tsmod desc)", + " select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1" + ); + let qu_find_addr = pg_client.prepare(sql).await?; + let mut chns_todo: &[String] = err::todoval(); + let mut chstmp = ["__NONE__"; 8]; + for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { + *s2 = s1; + } + chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..]; + let rows = pg_client + .query( + &qu_find_addr, + &[ + &backend, &chstmp[0], &chstmp[1], &chstmp[2], &chstmp[3], &chstmp[4], &chstmp[5], &chstmp[6], + &chstmp[7], + ], + ) + .await?; + for row in rows { + let ch: &str = row.get(1); + let addr: &str = row.get(2); + if addr == "" { + // TODO the address was searched before but could not be found. + } else { + let addr: SocketAddrV4 = match addr.parse() { + Ok(k) => k, + Err(e) => { + error!("can not parse {addr:?} for channel {ch:?} {e:?}"); + continue; + } + }; + let _ = addr; + } + } + Ok(()) +} diff --git a/dbpg/src/lib.rs b/dbpg/src/lib.rs index e21871b..22dd88b 100644 --- a/dbpg/src/lib.rs +++ b/dbpg/src/lib.rs @@ -1,5 +1,8 @@ pub mod conn; pub mod err; +pub mod findaddr; pub mod iocindex; pub mod pool; pub mod schema; +pub mod seriesbychannel; +pub mod seriesid; diff --git a/dbpg/src/pool.rs b/dbpg/src/pool.rs index 605e5f2..b167c89 100644 --- a/dbpg/src/pool.rs +++ b/dbpg/src/pool.rs @@ -14,7 +14,6 @@ pub enum Error { EndOfPool, ChannelRecv(#[from] RecvError), ChannelSend, - Msg(String), } impl From for Error { @@ -22,7 +21,6 @@ impl From for Error { type G = crate::err::Error; match value { G::Postgres(e) => Error::Postgres(e), - G::Msg(e) => Error::Msg(e.0), } } } diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index a805acc..478f9c9 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -1,7 +1,23 @@ use crate::conn::PgClient; -use crate::err::Error; +use err::thiserror; +use err::ThisError; use log::*; +#[derive(Debug, ThisError)] +pub enum Error { + Postgres(#[from] tokio_postgres::Error), + LogicError(String), +} + +impl Error { + pub fn from_logic_msg(msg: T) -> Self + where + T: Into, + { + Self::LogicError(msg.into()) + } +} + async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result { let rows = pgc .query( @@ -16,12 +32,15 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result for Error { + fn from(value: crate::err::Error) -> Self { + use crate::err::Error as A; + match value { + A::Postgres(x) => Self::Postgres(x), + } + } +} + +pub struct ChannelInfoQuery { + pub backend: String, + pub channel: String, + pub scalar_type: i32, + pub shape_dims: Vec, + pub tx: Sender, Error>>, +} + +impl ChannelInfoQuery { + pub fn dummy(&self) -> Self { + Self { + backend: String::new(), + channel: String::new(), + scalar_type: -1, + shape_dims: Vec::new(), + tx: self.tx.clone(), + } + } +} + +struct ChannelInfoResult { + series: Existence, + tx: Sender, Error>>, +} + +struct Worker { + pg: PgClient, + qu_select: PgStatement, + qu_insert: PgStatement, + batch_rx: Receiver>, +} + +impl Worker { + async fn new(db: &Database, batch_rx: Receiver>) -> Result { + let pg = crate::conn::make_pg_client(db).await?; + let sql = concat!( + "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])", + " as inp (backend, channel, scalar_type, shape_dims, rid))", + " select t.series, q1.rid from series_by_channel t", + " join q1 on t.facility = q1.backend and t.channel = q1.channel", + " and t.scalar_type = q1.scalar_type and t.shape_dims = q1.shape_dims::int[]", + " and t.agg_kind = 0", + " order by q1.rid", + ); + let qu_select = pg.prepare(sql).await?; + let sql = concat!( + "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])", + " as inp (backend, channel, scalar_type, shape_dims, series))", + " insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", + " select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1", + " on conflict do nothing" + ); + let qu_insert = pg.prepare(sql).await?; + let ret = Self { + pg, + qu_select, + qu_insert, + batch_rx, + }; + Ok(ret) + } + + async fn select( + &self, + batch: Vec, + ) -> Result<(Vec, Vec), Error> { + let mut backend = Vec::new(); + let mut channel = Vec::new(); + let mut scalar_type = Vec::new(); + let mut shape_dims = Vec::new(); + let mut shape_dims_str = Vec::new(); + let mut rid = Vec::new(); + let mut tx = Vec::new(); + for (i, e) in batch.into_iter().enumerate() { + backend.push(e.backend); + channel.push(e.channel); + scalar_type.push(e.scalar_type); + let mut dims = String::with_capacity(32); + dims.push('{'); + for (i, &v) in e.shape_dims.iter().enumerate() { + if i > 0 { + dims.push(','); + } + use std::fmt::Write; + write!(dims, "{}", v).unwrap(); + } + dims.push('}'); + shape_dims_str.push(dims); + shape_dims.push(e.shape_dims); + rid.push(i as i32); + tx.push((i as u32, e.tx)); + } + let rows = self + .pg + .query( + &self.qu_select, + &[&backend, &channel, &scalar_type, &shape_dims_str, &rid], + ) + .await?; + let mut result = Vec::new(); + let mut missing = Vec::new(); + let mut it1 = rows.into_iter(); + let mut e1 = it1.next(); + for (qrid, tx) in tx { + if let Some(row) = &e1 { + let rid: i32 = row.get(1); + if rid as u32 == qrid { + let series: i64 = row.get(0); + let series = SeriesId::new(series as _); + let res = ChannelInfoResult { + series: Existence::Existing(series), + tx, + }; + result.push(res); + } + e1 = it1.next(); + } else { + let i = qrid as usize; + let k = ChannelInfoQuery { + backend: backend[i].clone(), + channel: channel[i].clone(), + scalar_type: scalar_type[i].clone(), + shape_dims: shape_dims[i].clone(), + tx, + }; + missing.push(k); + } + } + Ok((result, missing)) + } + + async fn insert_missing(&self, batch: &Vec) -> Result<(), Error> { + let tsbeg = Instant::now(); + let mut backends = Vec::new(); + let mut channels = Vec::new(); + let mut scalar_types = Vec::new(); + let mut shape_dimss = Vec::new(); + let mut shape_dims_strs = Vec::new(); + let mut hashers = Vec::new(); + for e in batch.into_iter() { + { + let mut h = md5::Md5::new(); + h.update(e.backend.as_bytes()); + h.update(e.channel.as_bytes()); + h.update(format!("{:?}", e.scalar_type).as_bytes()); + h.update(format!("{:?}", e.shape_dims).as_bytes()); + hashers.push(h); + } + backends.push(&e.backend); + channels.push(&e.channel); + scalar_types.push(e.scalar_type); + let mut dims = String::with_capacity(32); + dims.push('{'); + for (i, &v) in e.shape_dims.iter().enumerate() { + if i > 0 { + dims.push(','); + } + use std::fmt::Write; + write!(dims, "{}", v).unwrap(); + } + dims.push('}'); + shape_dims_strs.push(dims); + shape_dimss.push(&e.shape_dims); + } + let mut i1 = 0; + loop { + i1 += 1; + if i1 >= 200 { + return Err(Error::CreateSeriesFail); + } + let mut seriess = Vec::with_capacity(hashers.len()); + let mut all_good = true; + for h in &mut hashers { + let mut good = false; + for _ in 0..50 { + h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); + let f = h.clone().finalize(); + let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series >= 100000000000000000 && series <= i64::MAX as u64 { + seriess.push(series as i64); + good = true; + break; + } + } + if !good { + all_good = false; + break; + } + } + if !all_good { + continue; + } + self.pg + .execute( + &self.qu_insert, + &[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess], + ) + .await?; + break; + } + Ok(()) + } + + async fn work(&mut self) -> Result<(), Error> { + 'outer: while let Some(batch) = self.batch_rx.next().await { + let (res1, missing) = self.select(batch).await?; + let res3 = if missing.len() > 0 { + self.insert_missing(&missing).await?; + let (res2, missing2) = self.select(missing).await?; + if missing2.len() > 0 { + Err(Error::SeriesMissing) + } else { + Ok(res2) + } + } else { + Ok(res1) + }; + let res4 = res3?; + for r in res4 { + match r.tx.send(Ok(r.series)).await { + Ok(()) => {} + Err(_e) => { + warn!("can not deliver result"); + break 'outer; + } + } + } + } + info!("Worker done"); + Ok(()) + } +} + +pub async fn start_lookup_workers( + worker_count: usize, + db: &Database, +) -> Result< + ( + Sender, + Vec>>, + JoinHandle<()>, + ), + Error, +> { + let inp_cap = 128; + let batch_out_cap = 4; + let timeout = Duration::from_millis(400); + let (query_tx, query_rx) = async_channel::bounded(inp_cap); + let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); + let mut jhs = Vec::new(); + for _ in 0..worker_count { + let mut worker = Worker::new(db, batch_rx.clone()).await?; + let jh = tokio::task::spawn(async move { worker.work().await }); + jhs.push(jh); + } + Ok((query_tx, jhs, bjh)) +} diff --git a/dbpg/src/seriesid.rs b/dbpg/src/seriesid.rs new file mode 100644 index 0000000..1151220 --- /dev/null +++ b/dbpg/src/seriesid.rs @@ -0,0 +1,93 @@ +use crate::conn::PgClient; +use err::thiserror; +use err::ThisError; +use log::*; +use netpod::ScalarType; +use netpod::Shape; +use series::series::Existence; +use series::SeriesId; +use std::time::Duration; +use std::time::Instant; +use taskrun::tokio; + +#[derive(Debug, ThisError)] +pub enum Error { + Postgres(#[from] tokio_postgres::Error), + IocAddrNotFound, + BadIdGenerated, + CanNotInsertSeriesId, +} + +// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration. +pub async fn get_series_id( + name: &str, + scalar_type: &ScalarType, + shape: &Shape, + pg_client: &PgClient, + backend: String, +) -> Result, Error> { + let channel_name = name; + let scalar_type = scalar_type.to_scylla_i32(); + let shape = shape.to_scylla_vec(); + let res = pg_client + .query( + "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0", + &[&backend, &channel_name, &scalar_type, &shape], + ) + .await?; + let mut all = Vec::new(); + for row in res { + let series: i64 = row.get(0); + let series = series as u64; + all.push(series); + } + let rn = all.len(); + let tsbeg = Instant::now(); + if rn == 0 { + use md5::Digest; + let mut h = md5::Md5::new(); + h.update(backend.as_bytes()); + h.update(channel_name.as_bytes()); + h.update(format!("{:?}", scalar_type).as_bytes()); + h.update(format!("{:?}", shape).as_bytes()); + for _ in 0..200 { + h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); + let f = h.clone().finalize(); + let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series > i64::MAX as u64 { + continue; + } + if series == 0 { + continue; + } + if series <= 0 || series > i64::MAX as u64 { + return Err(Error::BadIdGenerated); + } + let sql = concat!( + "insert into series_by_channel", + " (series, facility, channel, scalar_type, shape_dims, agg_kind)", + " values ($1, $2, $3, $4, $5, 0) on conflict do nothing" + ); + let res = pg_client + .execute(sql, &[&(series as i64), &backend, &channel_name, &scalar_type, &shape]) + .await + .unwrap(); + if res == 1 { + let series = Existence::Created(SeriesId::new(series)); + return Ok(series); + } else { + warn!( + "tried to insert {series:?} for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} trying again..." + ); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + error!("tried to insert new series id for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} but failed"); + Err(Error::CanNotInsertSeriesId) + } else { + let series = all[0] as u64; + let series = Existence::Existing(SeriesId::new(series)); + debug!("get_series_id {backend:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}"); + Ok(series) + } +} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 0d4ed3f..6ef03e2 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -30,6 +30,7 @@ humantime-serde = "1.1" pin-project = "1" lazy_static = "1" log = { path = "../log" } +series = { path = "../series" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } diff --git a/netfetch/src/batchquery.rs b/netfetch/src/batchquery.rs deleted file mode 100644 index b1929bb..0000000 --- a/netfetch/src/batchquery.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod series_by_channel; diff --git a/netfetch/src/batchquery/series_by_channel.rs b/netfetch/src/batchquery/series_by_channel.rs deleted file mode 100644 index 7fd1b8c..0000000 --- a/netfetch/src/batchquery/series_by_channel.rs +++ /dev/null @@ -1,306 +0,0 @@ -use crate::batcher; -use crate::dbpg::make_pg_client; -use crate::errconv::ErrConv; -use crate::series::Existence; -use crate::series::SeriesId; -use async_channel::Receiver; -use async_channel::Sender; -use err::Error; -use futures_util::StreamExt; -use md5::Digest; -use netpod::log::*; -use netpod::Database; -use std::time::Duration; -use std::time::Instant; -use taskrun::tokio; -use tokio::task::JoinHandle; -use tokio_postgres::Client as PgClient; -use tokio_postgres::Statement as PgStatement; - -pub struct ChannelInfoQuery { - pub backend: String, - pub channel: String, - pub scalar_type: i32, - pub shape_dims: Vec, - pub tx: Sender, Error>>, -} - -impl ChannelInfoQuery { - pub fn dummy(&self) -> Self { - Self { - backend: String::new(), - channel: String::new(), - scalar_type: -1, - shape_dims: Vec::new(), - tx: self.tx.clone(), - } - } -} - -struct ChannelInfoResult { - series: Existence, - tx: Sender, Error>>, -} - -struct PgRes { - pgc: PgClient, - st: PgStatement, -} - -async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender, Receiver), Error> { - let (pgc_tx, pgc_rx) = async_channel::bounded(pgcn); - for _ in 0..pgcn { - let pgc = make_pg_client(&db).await?; - let st = pgc.prepare(sql).await.map_err(|e| Error::from(e.to_string()))?; - let k = PgRes { pgc, st }; - match pgc_tx.send(k).await { - Ok(_) => {} - Err(e) => { - error!("can not enqueue pgc {e}"); - } - } - } - Ok((pgc_tx, pgc_rx)) -} - -async fn select( - batch: Vec, - pgres: PgRes, -) -> Result<(Vec, Vec, PgRes), Error> { - let mut backend = Vec::new(); - let mut channel = Vec::new(); - let mut scalar_type = Vec::new(); - let mut shape_dims = Vec::new(); - let mut shape_dims_str: Vec = Vec::new(); - let mut rid = Vec::new(); - let mut tx = Vec::new(); - for (i, e) in batch.into_iter().enumerate() { - backend.push(e.backend); - channel.push(e.channel); - scalar_type.push(e.scalar_type); - let mut dims = String::with_capacity(32); - dims.push('{'); - for (i, &v) in e.shape_dims.iter().enumerate() { - if i > 0 { - dims.push(','); - } - use std::fmt::Write; - write!(dims, "{}", v).unwrap(); - } - dims.push('}'); - shape_dims_str.push(dims); - shape_dims.push(e.shape_dims); - rid.push(i as i32); - tx.push((i as u32, e.tx)); - } - match pgres - .pgc - .query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims_str, &rid]) - .await - .map_err(|e| { - error!("{e}"); - Error::from(e.to_string()) - }) { - Ok(rows) => { - let mut result = Vec::new(); - let mut missing = Vec::new(); - let mut it1 = rows.into_iter(); - let mut e1 = it1.next(); - for (qrid, tx) in tx { - if let Some(row) = &e1 { - let rid: i32 = row.get(1); - if rid as u32 == qrid { - let series: i64 = row.get(0); - let series = SeriesId::new(series as _); - let res = ChannelInfoResult { - series: Existence::Existing(series), - tx, - }; - result.push(res); - } - e1 = it1.next(); - } else { - let i = qrid as usize; - let k = ChannelInfoQuery { - backend: backend[i].clone(), - channel: channel[i].clone(), - scalar_type: scalar_type[i].clone(), - shape_dims: shape_dims[i].clone(), - tx, - }; - missing.push(k); - } - } - Ok((result, missing, pgres)) - } - Err(e) => { - error!("error in pg query {e}"); - tokio::time::sleep(Duration::from_millis(2000)).await; - Err(e) - } - } -} - -async fn insert_missing(batch: &Vec, pgres: PgRes) -> Result<((), PgRes), Error> { - let tsbeg = Instant::now(); - let mut backends = Vec::new(); - let mut channels = Vec::new(); - let mut scalar_types = Vec::new(); - let mut shape_dimss = Vec::new(); - let mut shape_dims_strs: Vec = Vec::new(); - let mut hashers = Vec::new(); - for e in batch.into_iter() { - { - let mut h = md5::Md5::new(); - h.update(e.backend.as_bytes()); - h.update(e.channel.as_bytes()); - h.update(format!("{:?}", e.scalar_type).as_bytes()); - h.update(format!("{:?}", e.shape_dims).as_bytes()); - hashers.push(h); - } - backends.push(&e.backend); - channels.push(&e.channel); - scalar_types.push(e.scalar_type); - let mut dims = String::with_capacity(32); - dims.push('{'); - for (i, &v) in e.shape_dims.iter().enumerate() { - if i > 0 { - dims.push(','); - } - use std::fmt::Write; - write!(dims, "{}", v).unwrap(); - } - dims.push('}'); - shape_dims_strs.push(dims); - shape_dimss.push(&e.shape_dims); - } - let mut i1 = 0; - loop { - i1 += 1; - if i1 >= 200 { - return Err(Error::with_msg_no_trace("not able to generate series information")); - } - let mut seriess = Vec::with_capacity(hashers.len()); - let mut all_good = true; - for h in &mut hashers { - let mut good = false; - for _ in 0..50 { - h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); - let f = h.clone().finalize(); - let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series >= 100000000000000000 && series <= i64::MAX as u64 { - seriess.push(series as i64); - good = true; - break; - } - } - if !good { - all_good = false; - break; - } - } - if !all_good { - continue; - } - let sql = concat!( - "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::bigint[])", - " as inp (backend, channel, scalar_type, shape_dims, series))", - " insert into series_by_channel (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " select series, backend, channel, scalar_type, shape_dims::int[], 0 from q1", - " on conflict do nothing" - ); - pgres - .pgc - .execute(sql, &[&backends, &channels, &scalar_types, &shape_dims_strs, &seriess]) - .await - .err_conv()?; - break; - } - Ok(((), pgres)) -} - -async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(Vec, PgRes), Error> { - let (res1, missing, pgres) = select(batch, pgres).await?; - if missing.len() > 0 { - let ((), pgres) = insert_missing(&missing, pgres).await?; - let (res2, missing2, pgres) = select(missing, pgres).await?; - if missing2.len() > 0 { - Err(Error::with_msg_no_trace("some series not found even after write")) - } else { - Ok((res2, pgres)) - } - } else { - Ok((res1, pgres)) - } -} - -async fn run_queries( - npg: usize, - batch_rx: Receiver>, - pgc_rx: Receiver, - pgc_tx: Sender, -) -> Result<(), Error> { - let mut stream = batch_rx - .map(|batch| { - debug!("see batch of {}", batch.len()); - let pgc_rx = pgc_rx.clone(); - let pgc_tx = pgc_tx.clone(); - async move { - if let Ok(pgres) = pgc_rx.recv().await { - let (res, pgres) = fetch_data(batch, pgres).await?; - if let Err(_) = pgc_tx.send(pgres).await { - error!("can not hand back pgres"); - Err(Error::with_msg_no_trace("can not hand back pgres")) - } else { - Ok(res) - } - } else { - error!("can not get pgc"); - Err(Error::with_msg_no_trace("no more pgres")) - } - } - }) - .buffer_unordered(npg); - while let Some(item) = stream.next().await { - match item { - Ok(res) => { - for r in res { - match r.tx.send(Ok(r.series)).await { - Ok(_) => {} - Err(e) => { - // TODO count cases, but no log. Client may no longer be interested in this result. - error!("{e}"); - } - } - } - } - Err(e) => { - error!("{e}"); - tokio::time::sleep(Duration::from_millis(1000)).await; - } - } - } - info!("run_queries done"); - Ok(()) -} - -pub async fn start_task(db: &Database) -> Result, Error> { - let sql = concat!( - "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])", - " as inp (backend, channel, scalar_type, shape_dims, rid))", - " select t.series, q1.rid from series_by_channel t", - " join q1 on t.facility = q1.backend and t.channel = q1.channel", - " and t.scalar_type = q1.scalar_type and t.shape_dims = q1.shape_dims::int[]", - " and t.agg_kind = 0", - " order by q1.rid", - ); - let inp_cap = 128; - let batch_out_cap = 4; - let pgcn = 4; - let timeout = Duration::from_millis(200); - let (pgc_tx, pgc_rx) = prepare_pgcs(sql, pgcn, db).await?; - let (query_tx, query_rx) = async_channel::bounded(inp_cap); - let (batch_rx, _batch_jh) = batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); - let _queries_jh: JoinHandle<_> = tokio::task::spawn(run_queries(pgcn, batch_rx, pgc_rx, pgc_tx)); - Ok(query_tx) -} diff --git a/netfetch/src/bsreadclient.rs b/netfetch/src/bsreadclient.rs index 88b806c..b285364 100644 --- a/netfetch/src/bsreadclient.rs +++ b/netfetch/src/bsreadclient.rs @@ -1,12 +1,8 @@ -use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::bsread::BsreadMessage; use crate::bsread::ChannelDescDecoded; use crate::bsread::HeadB; use crate::bsread::Parser; -use crate::ca::proto::CaDataArrayValue; -use crate::ca::proto::CaDataValue; use crate::ca::IngestCommons; -use crate::series::SeriesId; use crate::zmtp::zmtpproto; use crate::zmtp::zmtpproto::SocketType; use crate::zmtp::zmtpproto::Zmtp; @@ -15,6 +11,7 @@ use crate::zmtp::zmtpproto::ZmtpMessage; use crate::zmtp::ZmtpClientOpts; use crate::zmtp::ZmtpEvent; use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use futures_util::StreamExt; @@ -31,6 +28,7 @@ use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::InsertItem; use scywr::iteminsertqueue::QueryItem; use scywr::session::ScySession; +use series::SeriesId; use stats::CheckEvery; use std::io; use std::net::SocketAddr; diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 16cca12..9dde75b 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -5,7 +5,6 @@ pub mod proto; pub mod search; use crate::ca::connset::CaConnSet; -use crate::errconv::ErrConv; use crate::metrics::ExtraInsertsConf; use crate::rt::TokMx; use err::Error; @@ -19,15 +18,15 @@ use scywr::store::DataStore; use stats::CaConnStatsAgg; use std::net::SocketAddrV4; use std::pin::Pin; -use std::sync::atomic; -use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::task::Poll; use std::time::Duration; use std::time::Instant; use taskrun::tokio; -use tokio_postgres::Client as PgClient; pub static SIGINT: AtomicU32 = AtomicU32::new(0); @@ -145,97 +144,6 @@ where } } -pub async fn find_channel_addr( - backend: String, - name: String, - pgconf: &Database, -) -> Result, Error> { - // TODO also here, provide a db pool. - let d = pgconf; - let (pg_client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), - tokio_postgres::tls::NoTls, - ) - .await - .unwrap(); - // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: - tokio::spawn(async { - pg_conn.await.unwrap(); - info!("drop pg conn after find_channel_addr"); - }); - let pg_client = Arc::new(pg_client); - let qu_find_addr = pg_client - .prepare( - "select t1.facility, t1.channel, t1.addr from ioc_by_channel_log t1 where t1.facility = $1 and t1.channel = $2 and addr is not null order by tsmod desc limit 1", - ) - .await - .err_conv()?; - let rows = pg_client.query(&qu_find_addr, &[&backend, &name]).await.err_conv()?; - if rows.is_empty() { - error!("can not find any addresses of channels {:?}", name); - Err(Error::with_msg_no_trace(format!("no address for channel {}", name))) - } else { - for row in rows { - match row.try_get::<_, &str>(2) { - Ok(addr) => match addr.parse::() { - Ok(addr) => return Ok(Some(addr)), - Err(e) => { - error!("can not parse {e:?}"); - return Err(Error::with_msg_no_trace(format!("no address for channel {}", name))); - } - }, - Err(e) => { - error!("can not find addr for {name} {e:?}"); - } - } - } - Ok(None) - } -} - -#[allow(unused)] -async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> { - let backend: &String = err::todoval(); - // TODO factor the find loop into a separate Stream. - let qu_find_addr = pg_client - .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel_log t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr is not null order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let mut chns_todo: &[String] = err::todoval(); - let mut chstmp = ["__NONE__"; 8]; - for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { - *s2 = s1; - } - chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..]; - let rows = pg_client - .query( - &qu_find_addr, - &[ - &backend, &chstmp[0], &chstmp[1], &chstmp[2], &chstmp[3], &chstmp[4], &chstmp[5], &chstmp[6], - &chstmp[7], - ], - ) - .await - .map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?; - for row in rows { - let ch: &str = row.get(1); - let addr: &str = row.get(2); - if addr == "" { - // TODO the address was searched before but could not be found. - } else { - let addr: SocketAddrV4 = match addr.parse() { - Ok(k) => k, - Err(e) => { - error!("can not parse {addr:?} for channel {ch:?} {e:?}"); - continue; - } - }; - let _ = addr; - } - } - Ok(()) -} - fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { crate::ca::SIGINT.store(1, Ordering::Release); let _ = crate::linuxhelper::unset_signal_handler(libc::SIGINT); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index abf9c0f..eb4db27 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -4,15 +4,12 @@ use super::proto::CaMsg; use super::proto::CaMsgTy; use super::proto::CaProto; use super::ExtraInsertsConf; -use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::CreateChan; use crate::ca::proto::EventAdd; -use crate::series::ChannelStatusSeriesId; -use crate::series::Existence; -use crate::series::SeriesId; use crate::timebin::ConnTimeBin; use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use futures_util::stream::FuturesUnordered; use futures_util::Future; @@ -39,6 +36,9 @@ use scywriiq::IvlItem; use scywriiq::MuteItem; use scywriiq::QueryItem; use serde::Serialize; +use series::series::Existence; +use series::ChannelStatusSeriesId; +use series::SeriesId; use stats::CaConnStats; use stats::IntervalEma; use std::collections::BTreeMap; @@ -764,7 +764,7 @@ impl CaConn { ChannelState::Created(series, ..) => { let item = QueryItem::ChannelStatus(ChannelStatusItem { ts: SystemTime::now(), - series: series.into(), + series: series.clone(), status: ChannelStatus::Closed(channel_reason.clone()), }); self.insert_item_queue.push_back(item); @@ -897,7 +897,7 @@ impl CaConn { st.info_store_msp_last = msp; let item = QueryItem::ChannelInfo(ChannelInfoItem { ts_msp: msp, - series: series.into(), + series: series.clone(), ivl: st.item_recv_ivl_ema.ema().ema(), interest: 0., evsize: 0, @@ -1277,7 +1277,7 @@ impl CaConn { st.insert_recv_ivl_last = tsnow; let ema = st.insert_item_ivl_ema.ema(); let item = IvlItem { - series: (&series).into(), + series: series.clone(), ts, ema: ema.ema(), emd: ema.emv().sqrt(), @@ -1287,7 +1287,7 @@ impl CaConn { if false && st.muted_before == 0 { let ema = st.insert_item_ivl_ema.ema(); let item = MuteItem { - series: series.into(), + series: series.clone(), ts, ema: ema.ema(), emd: ema.emv().sqrt(), @@ -1496,7 +1496,7 @@ impl CaConn { match rx.recv().await { Ok(item) => match item { Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)), - Err(e) => Err(e), + Err(e) => Err(Error::with_msg_no_trace(e.to_string())), }, Err(e) => { // TODO count only diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index a5c619c..b8e5965 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -2,15 +2,14 @@ use super::conn::CaConnEvent; use super::conn::ChannelSetOps; use super::conn::ConnCommand; use super::SlowWarnable; -use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::ca::conn::CaConn; use crate::ca::conn::CaConnEventValue; use crate::errconv::ErrConv; use crate::rt::JoinHandle; use crate::rt::TokMx; -use crate::series::ChannelStatusSeriesId; use async_channel::Receiver; use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; @@ -18,6 +17,7 @@ use netpod::log::*; use scywr::iteminsertqueue::CommonInsertItemQueue; use scywr::iteminsertqueue::CommonInsertItemQueueSender; use scywr::store::DataStore; +use series::ChannelStatusSeriesId; use stats::CaConnStats; use std::collections::BTreeMap; use std::collections::VecDeque; diff --git a/netfetch/src/errconv.rs b/netfetch/src/errconv.rs index cf83876..32eaa74 100644 --- a/netfetch/src/errconv.rs +++ b/netfetch/src/errconv.rs @@ -23,12 +23,3 @@ impl ErrConv for Result { } } } - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 1264cfe..0e6a424 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -1,5 +1,3 @@ -pub mod batcher; -pub mod batchquery; pub mod bsread; pub mod bsreadclient; pub mod ca; @@ -12,7 +10,6 @@ pub mod metrics; pub mod netbuf; pub mod patchcollect; pub mod rt; -pub mod series; #[cfg(test)] pub mod test; pub mod timebin; diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs deleted file mode 100644 index b42bdf8..0000000 --- a/netfetch/src/series.rs +++ /dev/null @@ -1,145 +0,0 @@ -use crate::bsread::ChannelDescDecoded; -use crate::errconv::ErrConv; -use err::Error; -use log::*; -use serde::Serialize; -use std::time::{Duration, Instant}; -use taskrun::tokio; -use tokio_postgres::Client as PgClient; - -#[derive(Clone, Debug)] -pub enum Existence { - Created(T), - Existing(T), -} - -impl Existence { - pub fn into_inner(self) -> T { - use Existence::*; - match self { - Created(x) => x, - Existing(x) => x, - } - } -} - -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)] -pub struct SeriesId(u64); - -impl SeriesId { - pub fn new(id: u64) -> Self { - Self(id) - } - - pub fn id(&self) -> u64 { - self.0 - } -} - -impl From<&SeriesId> for scywr::iteminsertqueue::SeriesId { - fn from(value: &SeriesId) -> Self { - Self::new(value.id()) - } -} - -impl From<&mut SeriesId> for scywr::iteminsertqueue::SeriesId { - fn from(value: &mut SeriesId) -> Self { - Self::new(value.id()) - } -} - -impl From for scywr::iteminsertqueue::SeriesId { - fn from(value: SeriesId) -> Self { - Self::new(value.id()) - } -} - -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)] -pub struct ChannelStatusSeriesId(u64); - -impl ChannelStatusSeriesId { - pub fn new(id: u64) -> Self { - Self(id) - } - - pub fn id(&self) -> u64 { - self.0 - } -} - -// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration. -pub async fn get_series_id( - pg_client: &PgClient, - cd: &ChannelDescDecoded, - backend: String, -) -> Result, Error> { - let channel_name = &cd.name; - let scalar_type = cd.scalar_type.to_scylla_i32(); - let shape = cd.shape.to_scylla_vec(); - let res = pg_client - .query( - "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0", - &[&backend, channel_name, &scalar_type, &shape], - ) - .await - .err_conv()?; - let mut all = vec![]; - for row in res { - let series: i64 = row.get(0); - let series = series as u64; - all.push(series); - } - let rn = all.len(); - let tsbeg = Instant::now(); - if rn == 0 { - use md5::Digest; - let mut h = md5::Md5::new(); - h.update(backend.as_bytes()); - h.update(channel_name.as_bytes()); - h.update(format!("{:?}", scalar_type).as_bytes()); - h.update(format!("{:?}", shape).as_bytes()); - for _ in 0..200 { - h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); - let f = h.clone().finalize(); - let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series > i64::MAX as u64 { - continue; - } - if series == 0 { - continue; - } - if series <= 0 || series > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!( - "attempt to insert bad series id {series}" - ))); - } - let res = pg_client - .execute( - concat!( - "insert into series_by_channel", - " (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " values ($1, $2, $3, $4, $5, 0) on conflict do nothing" - ), - &[&(series as i64), &backend, channel_name, &scalar_type, &shape], - ) - .await - .unwrap(); - if res == 1 { - let series = Existence::Created(SeriesId(series)); - return Ok(series); - } else { - warn!( - "tried to insert {series:?} for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} trying again..." - ); - } - tokio::time::sleep(Duration::from_millis(20)).await; - } - error!("tried to insert new series id for {backend:?} {channel_name:?} {scalar_type:?} {shape:?} but failed"); - Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {backend:?} {channel_name:?} {scalar_type:?} {shape:?}"))) - } else { - let series = all[0] as u64; - let series = Existence::Existing(SeriesId(series)); - debug!("get_series_id {backend:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}"); - Ok(series) - } -} diff --git a/netfetch/src/timebin.rs b/netfetch/src/timebin.rs index d0c9d34..ae3580f 100644 --- a/netfetch/src/timebin.rs +++ b/netfetch/src/timebin.rs @@ -2,7 +2,6 @@ use crate::ca::proto; use crate::ca::proto::CaDataValue; use crate::ca::proto::CaEventValue; use crate::patchcollect::PatchCollect; -use crate::series::SeriesId; use err::Error; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinner; @@ -21,6 +20,7 @@ use netpod::Shape; use netpod::TsNano; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinPatchSimpleF32; +use series::SeriesId; use std::any; use std::any::Any; use std::collections::VecDeque; @@ -187,8 +187,7 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque Self { - Self(id) - } - - pub fn id(&self) -> u64 { - self.0 - } -} - #[derive(Clone, Debug)] pub enum ScalarValue { I8(i8), diff --git a/series/Cargo.toml b/series/Cargo.toml new file mode 100644 index 0000000..55cbdb2 --- /dev/null +++ b/series/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "series" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +log = { path = "../log" } +serde = { version = "1.0", features = ["derive"] } diff --git a/series/src/lib.rs b/series/src/lib.rs new file mode 100644 index 0000000..d82fc15 --- /dev/null +++ b/series/src/lib.rs @@ -0,0 +1,4 @@ +pub mod series; + +pub use series::ChannelStatusSeriesId; +pub use series::SeriesId; diff --git a/series/src/series.rs b/series/src/series.rs new file mode 100644 index 0000000..eae62bb --- /dev/null +++ b/series/src/series.rs @@ -0,0 +1,44 @@ +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Debug)] +pub enum Existence { + Created(T), + Existing(T), +} + +impl Existence { + pub fn into_inner(self) -> T { + use Existence::*; + match self { + Created(x) => x, + Existing(x) => x, + } + } +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +pub struct SeriesId(u64); + +impl SeriesId { + pub fn new(id: u64) -> Self { + Self(id) + } + + pub fn id(&self) -> u64 { + self.0 + } +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)] +pub struct ChannelStatusSeriesId(u64); + +impl ChannelStatusSeriesId { + pub fn new(id: u64) -> Self { + Self(id) + } + + pub fn id(&self) -> u64 { + self.0 + } +}