diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 88cc02e..0bb5f16 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -660,7 +660,7 @@ dependencies = [ [[package]] name = "daqingest" -version = "0.1.4" +version = "0.1.5" dependencies = [ "async-channel", "bytes", @@ -676,7 +676,6 @@ dependencies = [ "serde", "stats", "taskrun", - "tokio", "tokio-postgres", "tracing", ] @@ -1658,7 +1657,6 @@ dependencies = [ "stats", "streams", "taskrun", - "tokio", "tokio-postgres", "tokio-stream", "tracing", diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index f54004a..e54ca81 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.1.4" +version = "0.1.5" authors = ["Dominik Werder "] edition = "2021" @@ -12,8 +12,7 @@ name = "daqingest" path = "src/bin/daqingest.rs" [dependencies] -clap = { version = "4.0.22", features = ["derive", "cargo"] } -tokio = { version = "1.32.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } +clap = { version = "4.3.24", features = ["derive", "cargo"] } tracing = "0.1.37" futures-util = "0.3" async-channel = "1.9.0" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 8694f5b..e7b5bd8 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -5,6 +5,7 @@ use log::*; use netfetch::conf::parse_config; pub fn main() -> Result<(), Error> { + println!("daqingest fn main"); let opts = DaqIngestOpts::parse(); // TODO offer again function to get runtime and configure tracing in one call let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32); diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 740d802..a308c3f 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -45,6 +45,7 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; +use taskrun::tokio; use tokio_postgres::Client as PgClient; use tokio_postgres::Row as PgRow; use tracing::info_span; @@ -293,7 +294,9 @@ pub struct Daemon { impl Daemon { pub async fn new(opts: DaemonOpts) -> Result { let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?); - let datastore = DataStore::new(&opts.scyconf).await?; + let datastore = DataStore::new(&opts.scyconf) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let datastore = Arc::new(datastore); let (tx, rx) = async_channel::bounded(32); let pgcs = { diff --git a/daqingest/src/query.rs b/daqingest/src/query.rs index 511b93b..5bf061d 100644 --- a/daqingest/src/query.rs +++ b/daqingest/src/query.rs @@ -1,7 +1,7 @@ use crate::opts::FetchEvents; use log::*; -use scylla::batch::Consistency; use scylla::execution_profile::ExecutionProfileBuilder; +use scylla::statement::Consistency; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; use scylla::Session; diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index fe540cc..e29dddd 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -9,7 +9,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11" serde_yaml = "0.9.16" -tokio = { version = "1.32.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } tokio-stream = { version = "0.1", features = ["fs"] } tracing = "0.1.37" async-channel = "1.9.0" diff --git a/netfetch/src/batcher.rs b/netfetch/src/batcher.rs index 21b18c0..9488c49 100644 --- a/netfetch/src/batcher.rs +++ b/netfetch/src/batcher.rs @@ -1,6 +1,7 @@ use async_channel::Receiver; use netpod::log::*; use std::time::Duration; +use taskrun::tokio; pub fn batch( batch_limit: usize, diff --git a/netfetch/src/batchquery/series_by_channel.rs b/netfetch/src/batchquery/series_by_channel.rs index 87cf59e..7fd1b8c 100644 --- a/netfetch/src/batchquery/series_by_channel.rs +++ b/netfetch/src/batchquery/series_by_channel.rs @@ -12,6 +12,7 @@ use netpod::log::*; use netpod::Database; use std::time::Duration; use std::time::Instant; +use taskrun::tokio; use tokio::task::JoinHandle; use tokio_postgres::Client as PgClient; use tokio_postgres::Statement as PgStatement; diff --git a/netfetch/src/bsreadclient.rs b/netfetch/src/bsreadclient.rs index 238d0a7..0edd8cd 100644 --- a/netfetch/src/bsreadclient.rs +++ b/netfetch/src/bsreadclient.rs @@ -35,6 +35,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use taskrun::tokio; #[derive(Debug, ThisError)] pub enum Error { diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 7e7a4ab..1efba37 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -26,6 +26,7 @@ use std::sync::Mutex; use std::task::Poll; use std::time::Duration; use std::time::Instant; +use taskrun::tokio; use tokio_postgres::Client as PgClient; pub static SIGINT: AtomicU32 = AtomicU32::new(0); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 0517781..0926528 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -55,6 +55,7 @@ use std::task::Poll; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; +use taskrun::tokio; use tokio::net::TcpStream; #[derive(Clone, Debug, Serialize)] diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 45f984a..e17f9c8 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -25,6 +25,7 @@ use std::net::SocketAddrV4; use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use taskrun::tokio; use tracing::info_span; use tracing::Instrument; diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 28bffe4..ddfeaa3 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -1,14 +1,24 @@ -use crate::ca::proto::{CaMsg, CaMsgTy, HeadInfo}; +use crate::ca::proto::CaMsg; +use crate::ca::proto::CaMsgTy; +use crate::ca::proto::HeadInfo; use err::Error; -use futures_util::{Future, FutureExt, Stream}; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; use libc::c_int; use log::*; -use std::collections::{BTreeMap, VecDeque}; -use std::net::{Ipv4Addr, SocketAddrV4}; +use std::collections::BTreeMap; +use std::collections::VecDeque; +use std::net::Ipv4Addr; +use std::net::SocketAddrV4; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; +use taskrun::tokio; use tokio::io::unix::AsyncFd; struct SockBox(c_int); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index b8e63ba..efff650 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -15,6 +15,7 @@ use std::num::NonZeroU64; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use taskrun::tokio; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::ReadBuf; diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 817b11e..f1b272e 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -11,6 +11,7 @@ use std::net::IpAddr; use std::net::SocketAddr; use std::time::Duration; use std::time::Instant; +use taskrun::tokio; use tokio::task::JoinHandle; use tokio_postgres::Client as PgClient; diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 444ef2d..5250856 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -59,98 +59,108 @@ impl DataStore { .prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?") .await?; let qu_insert_ts_msp = Arc::new(q); - let q = scy - .prepare( - "insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?) using ttl ?", - ) - .await ?; + + let cql = "insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_series_by_ts_msp = Arc::new(q); // scalar: - let q = scy - .prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + let cql = + "insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_scalar_i8 = Arc::new(q); - let q = scy - .prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_scalar_i16 = Arc::new(q); - let q = scy - .prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_scalar_i32 = Arc::new(q); - let q = scy - .prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_scalar_f32 = Arc::new(q); - let q = scy - .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_scalar_f64 = Arc::new(q); - let q = scy - .prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql="insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_scalar_string = Arc::new(q); // array - let q = scy - .prepare( - "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?", - ) - .await?; + let cql = + "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_array_i8 = Arc::new(q); - let q = scy - .prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_array_i16 = Arc::new(q); - let q = scy - .prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_array_i32 = Arc::new(q); - let q = scy - .prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_array_f32 = Arc::new(q); - let q = scy - .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_array_f64 = Arc::new(q); - let q = scy - .prepare("insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = + "insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_array_bool = Arc::new(q); + // Others: - let q = scy - .prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + let cql = "insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_muted = Arc::new(q); - let q = scy - .prepare("insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = "insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_item_recv_ivl = Arc::new(q); + // Connection status: - let q = scy - .prepare("insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?) using ttl ?") - .await?; + let cql = "insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_connection_status = Arc::new(q); - let q = scy - .prepare("insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?) using ttl ?") - .await?; + + let cql = "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_channel_status = Arc::new(q); - let q = scy - .prepare( - "insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?) using ttl ?", - ) - .await?; + + let cql = "insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?) using ttl ?"; + let q = scy.prepare(cql).await?; let qu_insert_channel_status_by_ts_msp = Arc::new(q); - let q = scy - .prepare("insert into channel_ping (part, ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?, ?) using ttl ?") - .await?; + + let cql = concat!( + "insert into channel_ping (", + "part, ts_msp, series, ivl, interest, evsize", + ") values (?, ?, ?, ?, ?, ?) using ttl ?" + ); + let q = scy.prepare(cql).await?; let qu_insert_channel_ping = Arc::new(q); - let q = scy - .prepare("insert into binned_scalar_f32_v01 (series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs) values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?") - .await?; + let cql = concat!( + "insert into binned_scalar_f32_v01 (", + "series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs)", + " values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?" + ); + let q = scy.prepare(cql).await?; let qu_insert_binned_scalar_f32_v01 = Arc::new(q); let ret = Self { scy, diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index b728a0a..537ba49 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -7,6 +7,7 @@ use serde::Deserialize; use serde::Serialize; use std::path::PathBuf; use std::time::Duration; +use taskrun::tokio; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; diff --git a/netfetch/src/dbpg.rs b/netfetch/src/dbpg.rs index 22ea5e6..c393797 100644 --- a/netfetch/src/dbpg.rs +++ b/netfetch/src/dbpg.rs @@ -2,6 +2,7 @@ use crate::errconv::ErrConv; use err::Error; use netpod::log::*; use netpod::Database; +use taskrun::tokio; use tokio_postgres::Client as PgClient; pub async fn make_pg_client(d: &Database) -> Result { diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index b4df9ca..8ebf1a7 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -1,14 +1,20 @@ use crate::ca::store::DataStore; use crate::ca::IngestCommons; use crate::rt::JoinHandle; -use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem}; +use crate::store::CommonInsertItemQueue; +use crate::store::IntoSimplerError; +use crate::store::QueryItem; use err::Error; use log::*; -use netpod::timeunits::{MS, SEC}; +use netpod::timeunits::MS; +use netpod::timeunits::SEC; use netpod::ScyllaConfig; -use std::sync::atomic::{self, Ordering}; +use std::sync::atomic; +use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; +use std::time::Instant; +use taskrun::tokio; use tokio_postgres::Client as PgClient; fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) { diff --git a/netfetch/src/linuxhelper.rs b/netfetch/src/linuxhelper.rs index a2a5d3b..9a5f4a6 100644 --- a/netfetch/src/linuxhelper.rs +++ b/netfetch/src/linuxhelper.rs @@ -2,6 +2,7 @@ use err::Error; use log::*; use std::ffi::CStr; use std::mem::MaybeUninit; +use taskrun::tokio; use tokio::net::TcpStream; pub fn local_hostname() -> String { diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index c164145..35a1097 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -17,6 +17,7 @@ use std::net::SocketAddrV4; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use taskrun::tokio; pub struct StatsSet { daemon: Arc, diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs index 509b810..0a2e164 100644 --- a/netfetch/src/netbuf.rs +++ b/netfetch/src/netbuf.rs @@ -1,6 +1,7 @@ use err::thiserror; use std::array::TryFromSliceError; use std::mem; +use taskrun::tokio; use tokio::io::ReadBuf; #[derive(Debug, thiserror::Error)] diff --git a/netfetch/src/rt.rs b/netfetch/src/rt.rs index df3ff9e..6fa8a57 100644 --- a/netfetch/src/rt.rs +++ b/netfetch/src/rt.rs @@ -1,3 +1,4 @@ +use taskrun::tokio; pub use tokio::sync::Mutex as TokMx; pub use tokio::task::JoinHandle; pub use tokio::time::sleep; diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 49bc6b7..163a29d 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -4,6 +4,7 @@ use err::Error; use log::*; use serde::Serialize; use std::time::{Duration, Instant}; +use taskrun::tokio; use tokio_postgres::Client as PgClient; #[derive(Clone, Debug)] diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 18c35c9..ee87e08 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -23,6 +23,7 @@ use std::net::SocketAddr; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use taskrun::tokio; #[derive(Debug, ThisError)] pub enum Error { diff --git a/netfetch/src/zmtp/dumper.rs b/netfetch/src/zmtp/dumper.rs index ff1a0bd..50cdf74 100644 --- a/netfetch/src/zmtp/dumper.rs +++ b/netfetch/src/zmtp/dumper.rs @@ -10,6 +10,7 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::timeunits::SEC; use std::io; +use taskrun::tokio; #[derive(Debug, thiserror::Error)] pub enum Error { diff --git a/netfetch/src/zmtp/zmtpproto.rs b/netfetch/src/zmtp/zmtpproto.rs index 3c9e2bd..7fb05a0 100644 --- a/netfetch/src/zmtp/zmtpproto.rs +++ b/netfetch/src/zmtp/zmtpproto.rs @@ -21,6 +21,7 @@ use std::pin::Pin; use std::string::FromUtf8Error; use std::task::Context; use std::task::Poll; +use taskrun::tokio; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::ReadBuf;