Support multiple sources

This commit is contained in:
Dominik Werder
2022-04-08 18:00:22 +02:00
parent 0d73fe972a
commit a14c05e925
2 changed files with 97 additions and 50 deletions

View File

@@ -25,7 +25,7 @@ pub struct Bsread {
#[clap(long)]
pub scylla: Vec<String>,
#[clap(long)]
pub source: String,
pub source: Vec<String>,
#[clap(long)]
pub rcvbuf: Option<usize>,
#[clap(long)]
@@ -38,7 +38,7 @@ impl From<Bsread> for ZmtpClientOpts {
fn from(k: Bsread) -> Self {
Self {
scylla: k.scylla,
addr: k.source,
sources: k.source,
rcvbuf: k.rcvbuf,
array_truncate: k.array_truncate,
do_pulse_id: k.do_pulse_id,

View File

@@ -6,8 +6,8 @@ use async_channel::{Receiver, Sender};
#[allow(unused)]
use bytes::BufMut;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use futures_core::{Future, Stream};
use futures_util::{pin_mut, FutureExt, StreamExt};
use log::*;
use netpod::timeunits::*;
use scylla::batch::{Batch, BatchType, Consistency};
@@ -86,17 +86,42 @@ pub struct CommonQueries {
pub qu_insert_array_f64: PreparedStatement,
}
#[derive(Clone)]
pub struct ZmtpClientOpts {
pub scylla: Vec<String>,
pub addr: String,
pub sources: Vec<String>,
pub do_pulse_id: bool,
pub rcvbuf: Option<usize>,
pub array_truncate: Option<usize>,
}
struct ClientRun {
#[allow(unused)]
client: Pin<Box<BsreadClient>>,
fut: Pin<Box<dyn Future<Output = Result<(), Error>>>>,
}
impl ClientRun {
fn new(client: BsreadClient) -> Self {
let mut client = Box::pin(client);
let client2 = unsafe { &mut *(&mut client as &mut _ as *mut _) } as &mut BsreadClient;
let fut = client2.run();
let fut = Box::pin(fut) as _;
Self { client, fut }
}
}
impl Future for ClientRun {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.fut.poll_unpin(cx)
}
}
struct BsreadClient {
opts: ZmtpClientOpts,
addr: String,
source_addr: String,
do_pulse_id: bool,
rcvbuf: Option<usize>,
tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>,
@@ -106,59 +131,27 @@ struct BsreadClient {
}
impl BsreadClient {
pub async fn new(opts: ZmtpClientOpts) -> Result<Self, Error> {
let scy = SessionBuilder::new().default_consistency(Consistency::Quorum);
let mut scy = scy;
for a in &opts.scylla {
scy = scy.known_node(a);
}
let scy = scy
.use_keyspace("ks1", false)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu1 = scy
.prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu2 = scy
.prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_ts_msp = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f64 = scy
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f64 = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let common_queries = CommonQueries {
qu1,
qu2,
qu_insert_ts_msp,
qu_insert_scalar_f64,
qu_insert_array_f64,
};
pub async fn new(
opts: ZmtpClientOpts,
source_addr: String,
scy: Arc<ScySession>,
common_queries: Arc<CommonQueries>,
) -> Result<Self, Error> {
let ret = Self {
addr: opts.addr.clone(),
source_addr,
do_pulse_id: opts.do_pulse_id,
rcvbuf: opts.rcvbuf,
opts,
tmp_vals_pulse_map: vec![],
scy: Arc::new(scy),
scy,
channel_writers: Default::default(),
common_queries: Arc::new(common_queries),
common_queries,
};
Ok(ret)
}
pub async fn run(&mut self) -> Result<(), Error> {
let mut conn = tokio::net::TcpStream::connect(&self.addr).await?;
let mut conn = tokio::net::TcpStream::connect(&self.source_addr).await?;
if let Some(v) = self.rcvbuf {
set_rcv_sock_opts(&mut conn, v as u32)?;
}
@@ -293,6 +286,14 @@ impl BsreadClient {
fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> {
let series = get_series_id(chn);
let has_comp = match &chn.compression {
Some(s) => s != "none",
None => false,
};
if has_comp {
warn!("Compression not yet supported [{}]", chn.name);
return Ok(());
}
match chn.ty.as_str() {
"float64" => match &chn.shape {
JsVal::Array(a) => {
@@ -383,8 +384,54 @@ impl BsreadClient {
}
pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
let mut client = BsreadClient::new(opts).await?;
client.run().await
let scy = SessionBuilder::new().default_consistency(Consistency::Quorum);
let mut scy = scy;
for a in &opts.scylla {
scy = scy.known_node(a);
}
let scy = scy
.use_keyspace("ks1", false)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
let qu1 = scy
.prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu2 = scy
.prepare("insert into pulse (pulse_a, pulse_b, ts_a, ts_b) values (?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_ts_msp = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f64 = scy
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f64 = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let common_queries = CommonQueries {
qu1,
qu2,
qu_insert_ts_msp,
qu_insert_scalar_f64,
qu_insert_array_f64,
};
let common_queries = Arc::new(common_queries);
let mut clients = vec![];
for source_addr in &opts.sources {
let client = BsreadClient::new(opts.clone(), source_addr.into(), scy.clone(), common_queries.clone()).await?;
let fut = ClientRun::new(client);
//let client = Box::pin(client) as Pin<Box<dyn Future<Output = Result<(), Error>>>>;
clients.push(fut);
}
futures_util::future::join_all(clients).await;
Ok(())
}
fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {