Update dependencies, refactor error type

This commit is contained in:
Dominik Werder
2023-08-24 10:23:39 +02:00
parent 6c401fc9f3
commit 5110337fad
12 changed files with 1432 additions and 372 deletions
+3 -1
View File
@@ -18,6 +18,8 @@ use crate::zmtp::zmtpproto::ZmtpMessage;
use crate::zmtp::ZmtpClientOpts;
use crate::zmtp::ZmtpEvent;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::HOUR;
@@ -34,7 +36,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
#[derive(Debug, thiserror::Error)]
#[derive(Debug, ThisError)]
pub enum Error {
#[error("InsertQueueSenderMissing")]
InsertQueueSenderMissing,
+3 -1
View File
@@ -1,5 +1,7 @@
use crate::netbuf;
use crate::netbuf::NetBuf;
use err::thiserror;
use err::ThisError;
use futures_util::pin_mut;
use futures_util::Stream;
use log::*;
@@ -18,7 +20,7 @@ use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
use tokio::net::TcpStream;
#[derive(Debug, thiserror::Error)]
#[derive(Debug, ThisError)]
pub enum Error {
#[error("{0}")]
NetBuf(#[from] netbuf::Error),
+32 -46
View File
@@ -1,12 +1,20 @@
use err::Error;
use futures_util::StreamExt;
use err::thiserror;
use err::ThisError;
use netpod::ScyllaConfig;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::prepared_statement::PreparedStatement;
use scylla::statement::Consistency;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
use scylla::Session as ScySession;
use std::sync::Arc;
#[derive(Debug, ThisError)]
pub enum Error {
NewSessionError(#[from] NewSessionError),
QueryError(#[from] QueryError),
}
pub struct DataStore {
pub scy: Arc<ScySession>,
pub qu_insert_ts_msp: Arc<PreparedStatement>,
@@ -44,53 +52,44 @@ impl DataStore {
.into_handle(),
)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let scy = Arc::new(scy);
let q = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_ts_msp = Arc::new(q);
let q = scy
.prepare(
"insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?) using ttl ?",
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await ?;
let qu_insert_series_by_ts_msp = Arc::new(q);
// scalar:
let q = scy
.prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_scalar_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_scalar_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_scalar_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_scalar_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_scalar_f64 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_scalar_string = Arc::new(q);
// array
@@ -98,73 +97,60 @@ impl DataStore {
.prepare(
"insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?",
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_array_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_array_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_array_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_array_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_array_f64 = Arc::new(q);
let q = scy
.prepare("insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_array_bool = Arc::new(q);
// Others:
let q = scy
.prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_muted = Arc::new(q);
let q = scy
.prepare("insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_item_recv_ivl = Arc::new(q);
// Connection status:
let q = scy
.prepare("insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_connection_status = Arc::new(q);
let q = scy
.prepare("insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_channel_status = Arc::new(q);
let q = scy
.prepare(
"insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?) using ttl ?",
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_channel_status_by_ts_msp = Arc::new(q);
let q = scy
.prepare("insert into channel_ping (part, ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_channel_ping = Arc::new(q);
let q = scy
.prepare("insert into binned_scalar_f32_v01 (series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs) values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.await?;
let qu_insert_binned_scalar_f32_v01 = Arc::new(q);
let ret = Self {
scy,
-1
View File
@@ -43,7 +43,6 @@ impl DaemonEvent {
match &b.value {
None => format!("CaConnEvent/None"),
EchoTimeout => format!("CaConnEvent/EchoTimeout"),
HealthCheckDone => format!("CaConnEvent/HealthCheckDone"),
ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"),
EndOfStream => format!("CaConnEvent/EndOfStream"),
}
+1 -1
View File
@@ -126,7 +126,7 @@ pub async fn spawn_scylla_insert_workers(
let mut jhs = Vec::new();
let mut data_stores = Vec::new();
for _ in 0..insert_scylla_sessions {
let data_store = Arc::new(DataStore::new(&scyconf).await?);
let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?);
data_stores.push(data_store);
}
for worker_ix in 0..insert_worker_count {
+3 -1
View File
@@ -10,6 +10,8 @@ use crate::zmtp::zmtpproto::SocketType;
use crate::zmtp::zmtpproto::Zmtp;
#[allow(unused)]
use bytes::BufMut;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::StreamExt;
@@ -22,7 +24,7 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[derive(Debug, thiserror::Error)]
#[derive(Debug, ThisError)]
pub enum Error {
#[error("Msg({0})")]
Msg(String),
+3 -1
View File
@@ -7,6 +7,7 @@ use crate::zmtp::ZmtpEvent;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use futures_util::pin_mut;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -25,7 +26,7 @@ use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
use tokio::net::TcpStream;
#[derive(Debug, thiserror::Error)]
#[derive(Debug, ThisError)]
pub enum Error {
#[error("bad")]
Bad,
@@ -59,6 +60,7 @@ enum ConnState {
ReadFrameShort,
ReadFrameLong,
ReadFrameBody(usize),
#[allow(unused)]
LockScan(usize),
}