Refactor imports

This commit is contained in:
Dominik Werder
2023-08-24 13:13:44 +02:00
parent 5110337fad
commit 26ee621a84
27 changed files with 130 additions and 86 deletions
+1 -3
View File
@@ -660,7 +660,7 @@ dependencies = [
[[package]]
name = "daqingest"
version = "0.1.4"
version = "0.1.5"
dependencies = [
"async-channel",
"bytes",
@@ -676,7 +676,6 @@ dependencies = [
"serde",
"stats",
"taskrun",
"tokio",
"tokio-postgres",
"tracing",
]
@@ -1658,7 +1657,6 @@ dependencies = [
"stats",
"streams",
"taskrun",
"tokio",
"tokio-postgres",
"tokio-stream",
"tracing",
+2 -3
View File
@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.1.4"
version = "0.1.5"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -12,8 +12,7 @@ name = "daqingest"
path = "src/bin/daqingest.rs"
[dependencies]
clap = { version = "4.0.22", features = ["derive", "cargo"] }
tokio = { version = "1.32.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
clap = { version = "4.3.24", features = ["derive", "cargo"] }
tracing = "0.1.37"
futures-util = "0.3"
async-channel = "1.9.0"
+1
View File
@@ -5,6 +5,7 @@ use log::*;
use netfetch::conf::parse_config;
pub fn main() -> Result<(), Error> {
println!("daqingest fn main");
let opts = DaqIngestOpts::parse();
// TODO offer again function to get runtime and configure tracing in one call
let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32);
+4 -1
View File
@@ -45,6 +45,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
use tokio_postgres::Client as PgClient;
use tokio_postgres::Row as PgRow;
use tracing::info_span;
@@ -293,7 +294,9 @@ pub struct Daemon {
impl Daemon {
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?);
let datastore = DataStore::new(&opts.scyconf).await?;
let datastore = DataStore::new(&opts.scyconf)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let datastore = Arc::new(datastore);
let (tx, rx) = async_channel::bounded(32);
let pgcs = {
+1 -1
View File
@@ -1,7 +1,7 @@
use crate::opts::FetchEvents;
use log::*;
use scylla::batch::Consistency;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::statement::Consistency;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
use scylla::Session;
-1
View File
@@ -9,7 +9,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11"
serde_yaml = "0.9.16"
tokio = { version = "1.32.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs"] }
tracing = "0.1.37"
async-channel = "1.9.0"
+1
View File
@@ -1,6 +1,7 @@
use async_channel::Receiver;
use netpod::log::*;
use std::time::Duration;
use taskrun::tokio;
pub fn batch<T>(
batch_limit: usize,
@@ -12,6 +12,7 @@ use netpod::log::*;
use netpod::Database;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
use tokio_postgres::Client as PgClient;
use tokio_postgres::Statement as PgStatement;
+1
View File
@@ -35,6 +35,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
#[derive(Debug, ThisError)]
pub enum Error {
+1
View File
@@ -26,6 +26,7 @@ use std::sync::Mutex;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio_postgres::Client as PgClient;
pub static SIGINT: AtomicU32 = AtomicU32::new(0);
+1
View File
@@ -55,6 +55,7 @@ use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
use tokio::net::TcpStream;
#[derive(Clone, Debug, Serialize)]
+1
View File
@@ -25,6 +25,7 @@ use std::net::SocketAddrV4;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tracing::info_span;
use tracing::Instrument;
+17 -7
View File
@@ -1,14 +1,24 @@
use crate::ca::proto::{CaMsg, CaMsgTy, HeadInfo};
use crate::ca::proto::CaMsg;
use crate::ca::proto::CaMsgTy;
use crate::ca::proto::HeadInfo;
use err::Error;
use futures_util::{Future, FutureExt, Stream};
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use libc::c_int;
use log::*;
use std::collections::{BTreeMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::Ipv4Addr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::io::unix::AsyncFd;
struct SockBox(c_int);
+1
View File
@@ -15,6 +15,7 @@ use std::num::NonZeroU64;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use taskrun::tokio;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
+1
View File
@@ -11,6 +11,7 @@ use std::net::IpAddr;
use std::net::SocketAddr;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
use tokio_postgres::Client as PgClient;
+76 -66
View File
@@ -59,98 +59,108 @@ impl DataStore {
.prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?")
.await?;
let qu_insert_ts_msp = Arc::new(q);
let q = scy
.prepare(
"insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?) using ttl ?",
)
.await ?;
let cql = "insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_series_by_ts_msp = Arc::new(q);
// scalar:
let q = scy
.prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_f64 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql="insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_scalar_string = Arc::new(q);
// array
let q = scy
.prepare(
"insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?",
)
.await?;
let cql =
"insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_array_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_array_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_array_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_array_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_array_f64 = Arc::new(q);
let q = scy
.prepare("insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql =
"insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_array_bool = Arc::new(q);
// Others:
let q = scy
.prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql = "insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_muted = Arc::new(q);
let q = scy
.prepare("insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql = "insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_item_recv_ivl = Arc::new(q);
// Connection status:
let q = scy
.prepare("insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?) using ttl ?")
.await?;
let cql = "insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_connection_status = Arc::new(q);
let q = scy
.prepare("insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?) using ttl ?")
.await?;
let cql = "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_channel_status = Arc::new(q);
let q = scy
.prepare(
"insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?) using ttl ?",
)
.await?;
let cql = "insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?) using ttl ?";
let q = scy.prepare(cql).await?;
let qu_insert_channel_status_by_ts_msp = Arc::new(q);
let q = scy
.prepare("insert into channel_ping (part, ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql = concat!(
"insert into channel_ping (",
"part, ts_msp, series, ivl, interest, evsize",
") values (?, ?, ?, ?, ?, ?) using ttl ?"
);
let q = scy.prepare(cql).await?;
let qu_insert_channel_ping = Arc::new(q);
let q = scy
.prepare("insert into binned_scalar_f32_v01 (series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs) values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?")
.await?;
let cql = concat!(
"insert into binned_scalar_f32_v01 (",
"series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs)",
" values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?"
);
let q = scy.prepare(cql).await?;
let qu_insert_binned_scalar_f32_v01 = Arc::new(q);
let ret = Self {
scy,
+1
View File
@@ -7,6 +7,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::path::PathBuf;
use std::time::Duration;
use taskrun::tokio;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
+1
View File
@@ -2,6 +2,7 @@ use crate::errconv::ErrConv;
use err::Error;
use netpod::log::*;
use netpod::Database;
use taskrun::tokio;
use tokio_postgres::Client as PgClient;
pub async fn make_pg_client(d: &Database) -> Result<PgClient, Error> {
+10 -4
View File
@@ -1,14 +1,20 @@
use crate::ca::store::DataStore;
use crate::ca::IngestCommons;
use crate::rt::JoinHandle;
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
use crate::store::CommonInsertItemQueue;
use crate::store::IntoSimplerError;
use crate::store::QueryItem;
use err::Error;
use log::*;
use netpod::timeunits::{MS, SEC};
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::ScyllaConfig;
use std::sync::atomic::{self, Ordering};
use std::sync::atomic;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio_postgres::Client as PgClient;
fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) {
+1
View File
@@ -2,6 +2,7 @@ use err::Error;
use log::*;
use std::ffi::CStr;
use std::mem::MaybeUninit;
use taskrun::tokio;
use tokio::net::TcpStream;
pub fn local_hostname() -> String {
+1
View File
@@ -17,6 +17,7 @@ use std::net::SocketAddrV4;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use taskrun::tokio;
pub struct StatsSet {
daemon: Arc<DaemonStats>,
+1
View File
@@ -1,6 +1,7 @@
use err::thiserror;
use std::array::TryFromSliceError;
use std::mem;
use taskrun::tokio;
use tokio::io::ReadBuf;
#[derive(Debug, thiserror::Error)]
+1
View File
@@ -1,3 +1,4 @@
use taskrun::tokio;
pub use tokio::sync::Mutex as TokMx;
pub use tokio::task::JoinHandle;
pub use tokio::time::sleep;
+1
View File
@@ -4,6 +4,7 @@ use err::Error;
use log::*;
use serde::Serialize;
use std::time::{Duration, Instant};
use taskrun::tokio;
use tokio_postgres::Client as PgClient;
#[derive(Clone, Debug)]
+1
View File
@@ -23,6 +23,7 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use taskrun::tokio;
#[derive(Debug, ThisError)]
pub enum Error {
+1
View File
@@ -10,6 +10,7 @@ use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::SEC;
use std::io;
use taskrun::tokio;
#[derive(Debug, thiserror::Error)]
pub enum Error {
+1
View File
@@ -21,6 +21,7 @@ use std::pin::Pin;
use std::string::FromUtf8Error;
use std::task::Context;
use std::task::Poll;
use taskrun::tokio;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;