WIP before removal of channelwriter

This commit is contained in:
Dominik Werder
2023-08-24 20:42:50 +02:00
parent 26ee621a84
commit 9b03cfe666
20 changed files with 385 additions and 300 deletions

View File

@@ -2,7 +2,7 @@
rustflags = [
#"-C", "target-cpu=native",
"-C", "target-cpu=sandybridge",
#"-C", "force-frame-pointers=yes",
"-C", "force-frame-pointers=yes",
#"-C", "force-unwind-tables=yes",
#"-C", "relocation-model=static",
#"-C", "embed-bitcode=no",

View File

@@ -2,8 +2,8 @@
members = ["log", "netfetch", "daqingest"]
[profile.release]
opt-level = 2
debug = 1
opt-level = 3
debug = 0
overflow-checks = false
debug-assertions = false
lto = "thin"

View File

@@ -1,16 +1,9 @@
[package]
name = "daqingest"
version = "0.1.5"
version = "0.2.0-alpha.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/daqingest.rs"
[[bin]]
name = "daqingest"
path = "src/bin/daqingest.rs"
[dependencies]
clap = { version = "4.3.24", features = ["derive", "cargo"] }
tracing = "0.1.37"
@@ -19,7 +12,7 @@ async-channel = "1.9.0"
chrono = "0.4"
bytes = "1.4.0"
scylla = "0.9.0"
tokio-postgres = "0.7.7"
tokio-postgres = "0.7.9"
serde = { version = "1.0", features = ["derive"] }
libc = "0.2"
err = { path = "../../daqbuffer/crates/err" }

View File

@@ -5,11 +5,13 @@ use log::*;
use netfetch::conf::parse_config;
pub fn main() -> Result<(), Error> {
println!("daqingest fn main");
let opts = DaqIngestOpts::parse();
// TODO offer again function to get runtime and configure tracing in one call
let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32);
taskrun::tracing_init().unwrap();
match taskrun::tracing_init() {
Ok(()) => {}
Err(()) => return Err(Error::with_msg_no_trace("tracing init failed")),
}
let res = runtime.block_on(async move {
use daqingest::opts::ChannelAccess;
use daqingest::opts::SubCmd;

View File

@@ -16,7 +16,6 @@ bytes = "1.4"
arrayref = "0.3"
byteorder = "1.4"
futures-util = "0.3"
scylla = "0.9.0"
tokio-postgres = "0.7.8"
md-5 = "0.10"
hex = "0.4"
@@ -33,6 +32,7 @@ pin-project = "1"
lazy_static = "1"
log = { path = "../log" }
stats = { path = "../stats" }
scywr = { path = "../scywr" }
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }
items_0 = { path = "../../daqbuffer/crates/items_0" }

View File

@@ -163,7 +163,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
gw_addrs.push(addr);
}
Err(e) => {
error!("can not resolve {s} {e}");
warn!("can not resolve {s} {e}");
}
}
}

View File

@@ -8,14 +8,6 @@ use netpod::timeunits::SEC;
use netpod::ByteOrder;
use netpod::ScalarType;
use netpod::Shape;
use scylla::batch::Batch;
use scylla::batch::BatchType;
use scylla::frame::value::BatchValues;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use scylla::Session as ScySession;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
@@ -24,236 +16,6 @@ use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
/// Future adapter around a boxed Scylla query future.
///
/// The inner future yields `Result<QueryResult, QueryError>`. Polling this
/// wrapper discards the `QueryResult` on success and converts the error with
/// `err_conv()` on failure.
pub struct ScyQueryFut<'a> {
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
}

impl<'a> ScyQueryFut<'a> {
    /// Creates the wrapper.
    ///
    /// NOTE: the actual `scy.execute(query, values)` call is commented out, so
    /// all three arguments are currently unused and the stored future resolves
    /// immediately with `QueryError::TimeoutError`.
    pub fn new<V>(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: V) -> Self
    where
        V: ValueList + Send + 'static,
    {
        //let fut = scy.execute(query, values);
        let placeholder = futures_util::future::ready(Err(QueryError::TimeoutError));
        Self {
            fut: Box::pin(placeholder),
        }
    }
}

impl<'a> Future for ScyQueryFut<'a> {
    type Output = Result<(), Error>;

    /// Forwards the poll to the inner future and maps its result to `Result<(), Error>`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let outcome = match self.fut.poll_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(x) => x,
        };
        match outcome {
            Ok(_) => Poll::Ready(Ok(())),
            Err(e) => Poll::Ready(Err(e).err_conv()),
        }
    }
}
/// Wraps the future returned by `ScySession::batch` and keeps poll statistics
/// that are logged when the batch fails.
///
/// Unlike `ScyBatchFutGen`, the boxed inner future carries no `Send` bound.
pub struct ScyBatchFut<'a> {
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + 'a>>,
    /// Number of `poll` calls so far.
    polled: usize,
    /// Time of construction.
    ts_create: Instant,
    /// Time of the first `poll` call (starts out equal to `ts_create`).
    ts_poll_start: Instant,
}

impl<'a> ScyBatchFut<'a> {
    /// Creates the batch future for `batch` with the given `values` on session `scy`.
    pub fn new<V>(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self
    where
        V: BatchValues + Send + Sync + 'static,
    {
        let inner = scy.batch(batch, values);
        let created = Instant::now();
        Self {
            fut: Box::pin(inner),
            polled: 0,
            ts_create: created,
            ts_poll_start: created,
        }
    }
}

impl<'a> Future for ScyBatchFut<'a> {
    type Output = Result<(), Error>;

    /// Polls the inner batch future; on error, logs the poll count and the
    /// elapsed milliseconds since creation and since the first poll.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.polled == 0 {
            self.ts_poll_start = Instant::now();
        }
        self.polled += 1;
        let outcome = match self.fut.poll_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(x) => x,
        };
        if let Err(e) = outcome {
            let tsnow = Instant::now();
            let millis_since = |t: Instant| tsnow.duration_since(t).as_secs_f32() * 1e3;
            let dt_created = millis_since(self.ts_create);
            let dt_polled = millis_since(self.ts_poll_start);
            warn!(
                "ScyBatchFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
                self.polled, dt_created, dt_polled
            );
            warn!("ScyBatchFut done Err {e:?}");
            return Poll::Ready(Err(e).err_conv());
        }
        trace!("ScyBatchFut done Ok");
        Poll::Ready(Ok(()))
    }
}
/// `Send`-capable variant of the batch future wrapper: the boxed inner future
/// from `ScySession::batch` is required to be `Send`.
///
/// Poll statistics are kept and logged when the batch fails.
pub struct ScyBatchFutGen<'a> {
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
    /// Number of `poll` calls so far.
    polled: usize,
    /// Time of construction.
    ts_create: Instant,
    /// Time of the first `poll` call (starts out equal to `ts_create`).
    ts_poll_start: Instant,
}

impl<'a> ScyBatchFutGen<'a> {
    /// Creates the batch future for `batch` with the given `values` on session `scy`.
    pub fn new<V>(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self
    where
        V: BatchValues + Send + Sync + 'static,
    {
        let inner = scy.batch(batch, values);
        let created = Instant::now();
        Self {
            fut: Box::pin(inner),
            polled: 0,
            ts_create: created,
            ts_poll_start: created,
        }
    }
}

impl<'a> Future for ScyBatchFutGen<'a> {
    type Output = Result<(), Error>;

    /// Polls the inner batch future; on error, logs the poll count and the
    /// elapsed milliseconds since creation and since the first poll.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.polled == 0 {
            self.ts_poll_start = Instant::now();
        }
        self.polled += 1;
        let outcome = match self.fut.poll_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(x) => x,
        };
        if let Err(e) = outcome {
            let tsnow = Instant::now();
            let millis_since = |t: Instant| tsnow.duration_since(t).as_secs_f32() * 1e3;
            let dt_created = millis_since(self.ts_create);
            let dt_polled = millis_since(self.ts_poll_start);
            warn!(
                "ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
                self.polled, dt_created, dt_polled
            );
            warn!("ScyBatchFutGen done Err {e:?}");
            return Poll::Ready(Err(e).err_conv());
        }
        trace!("ScyBatchFutGen done Ok");
        Poll::Ready(Ok(()))
    }
}
/// A sequence of insert futures that is driven one future at a time.
pub struct InsertLoopFut<'a> {
    /// One boxed future per value set handed to `new`.
    futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>>,
    /// Index into `futs` of the future currently being polled.
    fut_ix: usize,
    /// Number of `poll` calls so far.
    polled: usize,
    /// Time of construction.
    ts_create: Instant,
    /// Time of the first `poll` call (starts out equal to `ts_create`).
    ts_poll_start: Instant,
}

impl<'a> InsertLoopFut<'a> {
    /// Creates one future per entry in `values`; with `skip_insert` set, no
    /// futures are created at all.
    ///
    /// NOTE: the real `scy.execute(query, vs)` call is commented out, so `scy`
    /// and `query` are unused and every created future resolves immediately
    /// with `QueryError::TimeoutError`.
    pub fn new<V>(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: Vec<V>, skip_insert: bool) -> Self
    where
        V: ValueList + Send + Sync + 'static,
    {
        let values = if skip_insert { Vec::new() } else { values };
        // TODO
        // Can I store the values in some better generic form?
        // Or is it acceptable to generate all insert futures right here and poll them later?
        let make_fut = |_vs: V| {
            //let fut = scy.execute(query, vs);
            let placeholder = futures_util::future::ready(Err(QueryError::TimeoutError));
            Box::pin(placeholder) as Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>
        };
        let futs = values.into_iter().map(make_fut).collect();
        let created = Instant::now();
        Self {
            futs,
            fut_ix: 0,
            polled: 0,
            ts_create: created,
            ts_poll_start: created,
        }
    }
}
impl<'a> Future for InsertLoopFut<'a> {
    type Output = Result<(), Error>;

    /// Drives the stored futures strictly in order.
    ///
    /// The next future is only polled after the previous one resolved with
    /// `Ok`. Resolves with `Ok(())` once every future succeeded (or when there
    /// are none), and with the converted error of the first future that fails.
    /// On failure the poll count and elapsed times are logged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if self.polled == 0 {
            self.ts_poll_start = Instant::now();
        }
        self.polled += 1;
        loop {
            let fut_ix = self.fut_ix;
            // All futures done. This also covers the empty list, and keeps the
            // index in bounds: advancing past the last future must finish the
            // loop instead of indexing `futs` again.
            if fut_ix >= self.futs.len() {
                return Ready(Ok(()));
            }
            match self.futs[fut_ix].poll_unpin(cx) {
                Pending => return Pending,
                Ready(Ok(_)) => {
                    // Move on to the next future within the same poll call.
                    self.fut_ix += 1;
                }
                Ready(Err(e)) => {
                    let tsnow = Instant::now();
                    let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
                    let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
                    warn!(
                        "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
                        self.polled, dt_created, dt_polled
                    );
                    warn!("InsertLoopFut done Err {e:?}");
                    return Ready(Err(e).err_conv());
                }
            }
        }
    }
}
pub struct ChannelWriteRes {
pub nrows: u32,
pub dt: Duration,

View File

@@ -3,53 +3,8 @@ use futures_util::StreamExt;
#[allow(unused)]
use netpod::log::*;
use netpod::ScyllaConfig;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::statement::Consistency;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use scylla::Session;
use std::sync::Arc;
pub async fn create_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.use_keyspace(&scyconf.keyspace, true)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
.build()
.into_handle(),
)
.build()
.await
.map_err(|e| Error::from(format!("{e}")))?;
let scy = Arc::new(scy);
Ok(scy)
}
/// Reports whether a table called `name` exists in the keyspace the session
/// is currently using.
///
/// Lists `table_name` from `system_schema.tables` for that keyspace and
/// compares each entry with `name`. Rows whose first column is missing, NULL
/// or not text are skipped instead of causing a panic.
///
/// `scyconf` is not used by the body.
///
/// # Errors
///
/// Fails if the session has no keyspace selected, or if the query or fetching
/// a row fails.
async fn has_table(name: &str, scy: &Session, scyconf: &ScyllaConfig) -> Result<bool, Error> {
    let ks = scy
        .get_keyspace()
        .ok_or_else(|| Error::with_msg_no_trace("session is not using a keyspace yet"))?;
    let mut res = scy
        .query_iter(
            "select table_name from system_schema.tables where keyspace_name = ?",
            (ks.as_ref(),),
        )
        .await
        .map_err(|e| e.to_string())
        .map_err(Error::from)?;
    while let Some(k) = res.next().await {
        let row = k.map_err(|e| e.to_string()).map_err(Error::from)?;
        // Checked access: no indexing or unwrap on driver-provided data.
        let table_name = row
            .columns
            .first()
            .and_then(|col| col.as_ref())
            .and_then(|val| val.as_text());
        if table_name.map_or(false, |t| t == name) {
            return Ok(true);
        }
    }
    Ok(false)
}
async fn check_table_exist(name: &str, scy: &Session) -> Result<bool, Error> {
match scy.query(format!("select * from {} limit 1", name), ()).await {
Ok(_) => Ok(true),

View File

@@ -17,7 +17,7 @@ use futures_util::FutureExt;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use log::*;
use scylla::Session as ScySession;
use scywr::session::ScySession;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;

11
scywr/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "scywr"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
futures-util = "0.3"
scylla = "0.9.0"
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }

11
scywr/src/access.rs Normal file
View File

@@ -0,0 +1,11 @@
use err::thiserror;
use err::ThisError;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
/// Error type shared by the query futures and the schema helpers of this crate.
///
/// `schema::has_table` and the future wrappers convert `QueryError` into this
/// type with `?` / `.into()`; the `#[from]` attributes presumably generate the
/// corresponding `From` impls (the derive comes from the `err` crate — confirm
/// there).
#[derive(Debug, ThisError)]
pub enum Error {
    /// Wraps a scylla `DbError`.
    DbError(#[from] DbError),
    /// Wraps a scylla `QueryError`.
    QueryError(#[from] QueryError),
    /// Returned by `schema::has_table` when the session reports no keyspace.
    NoKeyspaceChosen,
}

1
scywr/src/config.rs Normal file
View File

@@ -0,0 +1 @@

42
scywr/src/fut.rs Normal file
View File

@@ -0,0 +1,42 @@
use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Future adapter around a boxed Scylla query future.
///
/// The inner future yields `Result<QueryResult, QueryError>`. Polling this
/// wrapper discards the `QueryResult` on success and converts the error into
/// the crate's `Error` on failure.
pub struct ScyQueryFut<'a> {
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
}

impl<'a> ScyQueryFut<'a> {
    /// Not implemented yet: calling this always panics via `todo!`.
    ///
    /// The statements after the `todo!` are an unreachable placeholder; the
    /// intended `scy.execute(query, values)` call is still commented out.
    pub fn new<V>(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: V) -> Self
    where
        V: ValueList + Send + 'static,
    {
        todo!("ScyQueryFut");
        //let fut = scy.execute(query, values);
        let placeholder = futures_util::future::ready(Err(QueryError::TimeoutError));
        Self {
            fut: Box::pin(placeholder),
        }
    }
}

impl<'a> Future for ScyQueryFut<'a> {
    type Output = Result<(), Error>;

    /// Forwards the poll to the inner future and maps its result to `Result<(), Error>`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let outcome = match self.fut.poll_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(x) => x,
        };
        match outcome {
            Ok(_) => Poll::Ready(Ok(())),
            Err(e) => Poll::Ready(Err(e.into())),
        }
    }
}

68
scywr/src/futbatch.rs Normal file
View File

@@ -0,0 +1,68 @@
use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::*;
use scylla::batch::Batch;
use scylla::frame::value::BatchValues;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
/// Wraps the future returned by `ScySession::batch` and keeps poll statistics
/// that are logged when the batch fails.
///
/// The boxed inner future carries no `Send` bound.
pub struct ScyBatchFut<'a> {
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + 'a>>,
    /// Number of `poll` calls so far.
    polled: usize,
    /// Time of construction.
    ts_create: Instant,
    /// Time of the first `poll` call (starts out equal to `ts_create`).
    ts_poll_start: Instant,
}

impl<'a> ScyBatchFut<'a> {
    /// Creates the batch future for `batch` with the given `values` on session `scy`.
    pub fn new<V>(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self
    where
        V: BatchValues + Send + Sync + 'static,
    {
        let inner = scy.batch(batch, values);
        let created = Instant::now();
        Self {
            fut: Box::pin(inner),
            polled: 0,
            ts_create: created,
            ts_poll_start: created,
        }
    }
}

impl<'a> Future for ScyBatchFut<'a> {
    type Output = Result<(), Error>;

    /// Polls the inner batch future; on error, logs the poll count and the
    /// elapsed milliseconds since creation and since the first poll.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.polled == 0 {
            self.ts_poll_start = Instant::now();
        }
        self.polled += 1;
        let outcome = match self.fut.poll_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(x) => x,
        };
        if let Err(e) = outcome {
            let tsnow = Instant::now();
            let millis_since = |t: Instant| tsnow.duration_since(t).as_secs_f32() * 1e3;
            let dt_created = millis_since(self.ts_create);
            let dt_polled = millis_since(self.ts_poll_start);
            warn!(
                "ScyBatchFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
                self.polled, dt_created, dt_polled
            );
            warn!("ScyBatchFut done Err {e:?}");
            return Poll::Ready(Err(e.into()));
        }
        trace!("ScyBatchFut done Ok");
        Poll::Ready(Ok(()))
    }
}

68
scywr/src/futbatchgen.rs Normal file
View File

@@ -0,0 +1,68 @@
use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::*;
use scylla::batch::Batch;
use scylla::frame::value::BatchValues;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
/// Batch future wrapper whose boxed inner future (from `ScySession::batch`)
/// is required to be `Send`.
///
/// Poll statistics are kept and logged when the batch fails.
pub struct ScyBatchFutGen<'a> {
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
    /// Number of `poll` calls so far.
    polled: usize,
    /// Time of construction.
    ts_create: Instant,
    /// Time of the first `poll` call (starts out equal to `ts_create`).
    ts_poll_start: Instant,
}

impl<'a> ScyBatchFutGen<'a> {
    /// Creates the batch future for `batch` with the given `values` on session `scy`.
    pub fn new<V>(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self
    where
        V: BatchValues + Send + Sync + 'static,
    {
        let inner = scy.batch(batch, values);
        let created = Instant::now();
        Self {
            fut: Box::pin(inner),
            polled: 0,
            ts_create: created,
            ts_poll_start: created,
        }
    }
}

impl<'a> Future for ScyBatchFutGen<'a> {
    type Output = Result<(), Error>;

    /// Polls the inner batch future; on error, logs the poll count and the
    /// elapsed milliseconds since creation and since the first poll.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.polled == 0 {
            self.ts_poll_start = Instant::now();
        }
        self.polled += 1;
        let outcome = match self.fut.poll_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(x) => x,
        };
        if let Err(e) = outcome {
            let tsnow = Instant::now();
            let millis_since = |t: Instant| tsnow.duration_since(t).as_secs_f32() * 1e3;
            let dt_created = millis_since(self.ts_create);
            let dt_polled = millis_since(self.ts_poll_start);
            warn!(
                "ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
                self.polled, dt_created, dt_polled
            );
            warn!("ScyBatchFutGen done Err {e:?}");
            return Poll::Ready(Err(e.into()));
        }
        trace!("ScyBatchFutGen done Ok");
        Poll::Ready(Ok(()))
    }
}

104
scywr/src/futinsertloop.rs Normal file
View File

@@ -0,0 +1,104 @@
use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::*;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
/// A sequence of insert futures that is driven one future at a time.
pub struct InsertLoopFut<'a> {
    /// One boxed future per value set handed to `new`.
    futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>>,
    /// Index into `futs` of the future currently being polled.
    fut_ix: usize,
    /// Number of `poll` calls so far.
    polled: usize,
    /// Time of construction.
    ts_create: Instant,
    /// Time of the first `poll` call (starts out equal to `ts_create`).
    ts_poll_start: Instant,
}

impl<'a> InsertLoopFut<'a> {
    /// Creates one future per entry in `values`; with `skip_insert` set, no
    /// futures are created at all.
    ///
    /// NOTE: creating a future is not implemented yet. As soon as there is at
    /// least one value to insert, this panics via `todo!`; the code after the
    /// `todo!` is an unreachable placeholder and the intended
    /// `scy.execute(query, vs)` call is still commented out.
    pub fn new<V>(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: Vec<V>, skip_insert: bool) -> Self
    where
        V: ValueList + Send + Sync + 'static,
    {
        let values = if skip_insert { Vec::new() } else { values };
        // TODO
        // Can I store the values in some better generic form?
        // Or is it acceptable to generate all insert futures right here and poll them later?
        let make_fut = |_vs: V| {
            todo!("InsertLoopFut");
            //let fut = scy.execute(query, vs);
            let placeholder = futures_util::future::ready(Err(QueryError::TimeoutError));
            Box::pin(placeholder) as Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>
        };
        let futs = values.into_iter().map(make_fut).collect();
        let created = Instant::now();
        Self {
            futs,
            fut_ix: 0,
            polled: 0,
            ts_create: created,
            ts_poll_start: created,
        }
    }
}
impl<'a> Future for InsertLoopFut<'a> {
    type Output = Result<(), Error>;

    /// Drives the stored futures strictly in order.
    ///
    /// The next future is only polled after the previous one resolved with
    /// `Ok`. Resolves with `Ok(())` once every future succeeded (or when there
    /// are none), and with the converted error of the first future that fails.
    /// On failure the poll count and elapsed times are logged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if self.polled == 0 {
            self.ts_poll_start = Instant::now();
        }
        self.polled += 1;
        loop {
            let fut_ix = self.fut_ix;
            // All futures done. This also covers the empty list, and keeps the
            // index in bounds: advancing past the last future must finish the
            // loop instead of indexing `futs` again.
            if fut_ix >= self.futs.len() {
                return Ready(Ok(()));
            }
            match self.futs[fut_ix].poll_unpin(cx) {
                Pending => return Pending,
                Ready(Ok(_)) => {
                    // Move on to the next future within the same poll call.
                    self.fut_ix += 1;
                }
                Ready(Err(e)) => {
                    let tsnow = Instant::now();
                    let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
                    let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
                    warn!(
                        "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
                        self.polled, dt_created, dt_polled
                    );
                    warn!("InsertLoopFut done Err {e:?}");
                    return Ready(Err(e.into()));
                }
            }
        }
    }
}

8
scywr/src/lib.rs Normal file
View File

@@ -0,0 +1,8 @@
pub mod access;
pub mod config;
pub mod fut;
pub mod futbatch;
pub mod futbatchgen;
pub mod futinsertloop;
pub mod schema;
pub mod session;

23
scywr/src/schema.rs Normal file
View File

@@ -0,0 +1,23 @@
use crate::access::Error;
use crate::session::ScySession;
use futures_util::StreamExt;
// use netpod::ScyllaConfig;
pub async fn has_table(name: &str, scy: &ScySession) -> Result<bool, Error> {
let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?;
let mut res = scy
.query_iter(
"select table_name from system_schema.tables where keyspace_name = ?",
(ks.as_ref(),),
)
.await?;
while let Some(k) = res.next().await {
let row = k?;
if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
if table_name == name {
return Ok(true);
}
}
}
Ok(false)
}

37
scywr/src/session.rs Normal file
View File

@@ -0,0 +1,37 @@
pub use netpod::ScyllaConfig;
pub use scylla::Session;
pub use Session as ScySession;
use err::thiserror;
use err::ThisError;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::statement::Consistency;
use scylla::transport::errors::NewSessionError;
use std::sync::Arc;
/// Errors raised while setting up a Scylla session.
#[derive(Debug, ThisError)]
pub enum Error {
    /// Building the session failed; carries the text of the `NewSessionError`.
    NewSession(String),
}

impl From<NewSessionError> for Error {
    /// Keeps only the string form (`to_string`) of the driver error.
    fn from(value: NewSessionError) -> Self {
        let msg = value.to_string();
        Error::NewSession(msg)
    }
}
/// Opens a Scylla session for the hosts and keyspace given in `scyconf` and
/// returns it wrapped in an `Arc`.
///
/// The default execution profile uses `Consistency::LocalOne`. A
/// `NewSessionError` from the builder is converted into `Error::NewSession`.
pub async fn create_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Error> {
    let profile = ExecutionProfileBuilder::default()
        .consistency(Consistency::LocalOne)
        .build();
    let session = scylla::SessionBuilder::new()
        .known_nodes(&scyconf.hosts)
        .use_keyspace(&scyconf.keyspace, true)
        .default_execution_profile_handle(profile.into_handle())
        .build()
        .await?;
    Ok(Arc::new(session))
}