diff --git a/.cargo/config.toml b/.cargo/config.toml index ec464e1..c633aa7 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -2,7 +2,7 @@ rustflags = [ #"-C", "target-cpu=native", "-C", "target-cpu=sandybridge", - #"-C", "force-frame-pointers=yes", + "-C", "force-frame-pointers=yes", #"-C", "force-unwind-tables=yes", #"-C", "relocation-model=static", #"-C", "embed-bitcode=no", diff --git a/Cargo.toml b/Cargo.toml index e821c7a..7d0ecba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,8 +2,8 @@ members = ["log", "netfetch", "daqingest"] [profile.release] -opt-level = 2 -debug = 1 +opt-level = 3 +debug = 0 overflow-checks = false debug-assertions = false lto = "thin" diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index e54ca81..9da8de9 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,16 +1,9 @@ [package] name = "daqingest" -version = "0.1.5" +version = "0.2.0-alpha.0" authors = ["Dominik Werder "] edition = "2021" -[lib] -path = "src/daqingest.rs" - -[[bin]] -name = "daqingest" -path = "src/bin/daqingest.rs" - [dependencies] clap = { version = "4.3.24", features = ["derive", "cargo"] } tracing = "0.1.37" @@ -19,7 +12,7 @@ async-channel = "1.9.0" chrono = "0.4" bytes = "1.4.0" scylla = "0.9.0" -tokio-postgres = "0.7.7" +tokio-postgres = "0.7.9" serde = { version = "1.0", features = ["derive"] } libc = "0.2" err = { path = "../../daqbuffer/crates/err" } diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index e7b5bd8..d3e4d06 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -5,11 +5,13 @@ use log::*; use netfetch::conf::parse_config; pub fn main() -> Result<(), Error> { - println!("daqingest fn main"); let opts = DaqIngestOpts::parse(); // TODO offer again function to get runtime and configure tracing in one call let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32); - taskrun::tracing_init().unwrap(); + match taskrun::tracing_init() { + Ok(()) => {} + Err(()) => return Err(Error::with_msg_no_trace("tracing init failed")), + } let res = runtime.block_on(async move { use daqingest::opts::ChannelAccess; use daqingest::opts::SubCmd; diff --git a/daqingest/src/daqingest.rs b/daqingest/src/lib.rs similarity index 100% rename from daqingest/src/daqingest.rs rename to daqingest/src/lib.rs diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index e29dddd..8e66fe3 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -16,7 +16,6 @@ bytes = "1.4" arrayref = "0.3" byteorder = "1.4" futures-util = "0.3" -scylla = "0.9.0" tokio-postgres = "0.7.8" md-5 = "0.10" hex = "0.4" @@ -33,6 +32,7 @@ pin-project = "1" lazy_static = "1" log = { path = "../log" } stats = { path = "../stats" } +scywr = { path = "../scywr" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } items_0 = { path = "../../daqbuffer/crates/items_0" } diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index f1b272e..e3a93ae 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -163,7 +163,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), gw_addrs.push(addr); } Err(e) => { - error!("can not resolve {s} {e}"); + warn!("can not resolve {s} {e}"); } } } diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 294e0a0..6f16294 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -8,14 +8,6 @@ use netpod::timeunits::SEC; use netpod::ByteOrder; use netpod::ScalarType; use netpod::Shape; -use scylla::batch::Batch; -use scylla::batch::BatchType; -use scylla::frame::value::BatchValues; -use scylla::frame::value::ValueList; -use scylla::prepared_statement::PreparedStatement; -use scylla::transport::errors::QueryError; -use scylla::QueryResult; -use scylla::Session as ScySession; use std::mem; use std::pin::Pin; use std::sync::Arc; @@ -24,236 +16,6 @@ use std::task::Poll; use std::time::Duration; use std::time::Instant; -pub struct ScyQueryFut<'a> { - fut: Pin> + Send + 'a>>, -} - -impl<'a> ScyQueryFut<'a> { - pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: V) -> Self - where - V: ValueList + Send + 'static, - { - //let fut = scy.execute(query, values); - let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); - Self { fut: Box::pin(fut) } - } -} - -impl<'a> Future for ScyQueryFut<'a> { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - match self.fut.poll_unpin(cx) { - Ready(k) => match k { - Ok(_) => Ready(Ok(())), - Err(e) => Ready(Err(e).err_conv()), - }, - Pending => Pending, - } - } -} - -pub struct ScyBatchFut<'a> { - fut: Pin> + 'a>>, - polled: usize, - ts_create: Instant, - ts_poll_start: Instant, -} - -impl<'a> ScyBatchFut<'a> { - pub fn new(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self - where - V: BatchValues + Send + Sync + 'static, - { - let fut = scy.batch(batch, values); - let tsnow = Instant::now(); - Self { - fut: Box::pin(fut), - polled: 0, - ts_create: tsnow, - ts_poll_start: tsnow, - } - } -} - -impl<'a> Future for ScyBatchFut<'a> { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - if self.polled == 0 { - self.ts_poll_start = Instant::now(); - } - self.polled += 1; - match self.fut.poll_unpin(cx) { - Ready(k) => match k { - Ok(_) => { - trace!("ScyBatchFut done Ok"); - Ready(Ok(())) - } - Err(e) => { - let tsnow = Instant::now(); - let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; - let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; - warn!( - "ScyBatchFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", - self.polled, dt_created, dt_polled - ); - warn!("ScyBatchFut done Err {e:?}"); - Ready(Err(e).err_conv()) - } - }, - Pending => Pending, - } - } -} - -pub struct ScyBatchFutGen<'a> { - fut: Pin> + Send + 'a>>, - polled: usize, - ts_create: Instant, - ts_poll_start: Instant, -} - -impl<'a> ScyBatchFutGen<'a> { - pub fn new(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self - where - V: BatchValues + Send + Sync + 'static, - { - let fut = scy.batch(batch, values); - let tsnow = Instant::now(); - Self { - fut: Box::pin(fut), - polled: 0, - ts_create: tsnow, - ts_poll_start: tsnow, - } - } -} - -impl<'a> Future for ScyBatchFutGen<'a> { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - if self.polled == 0 { - self.ts_poll_start = Instant::now(); - } - self.polled += 1; - match self.fut.poll_unpin(cx) { - Ready(k) => match k { - Ok(_) => { - trace!("ScyBatchFutGen done Ok"); - Ready(Ok(())) - } - Err(e) => { - let tsnow = Instant::now(); - let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; - let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; - warn!( - "ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", - self.polled, dt_created, dt_polled - ); - warn!("ScyBatchFutGen done Err {e:?}"); - Ready(Err(e).err_conv()) - } - }, - Pending => Pending, - } - } -} - -pub struct InsertLoopFut<'a> { - futs: Vec> + Send + 'a>>>, - fut_ix: usize, - polled: usize, - ts_create: Instant, - ts_poll_start: Instant, -} - -impl<'a> InsertLoopFut<'a> { - pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: Vec, skip_insert: bool) -> Self - where - V: ValueList + Send + Sync + 'static, - { - let mut values = values; - if skip_insert { - values.clear(); - } - // TODO - // Can I store the values in some better generic form? - // Or is it acceptable to generate all insert futures right here and poll them later? - let futs: Vec<_> = values - .into_iter() - .map(|vs| { - //let fut = scy.execute(query, vs); - let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); - Box::pin(fut) as _ - }) - .collect(); - let tsnow = Instant::now(); - Self { - futs, - fut_ix: 0, - polled: 0, - ts_create: tsnow, - ts_poll_start: tsnow, - } - } -} - -impl<'a> Future for InsertLoopFut<'a> { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - if self.polled == 0 { - self.ts_poll_start = Instant::now(); - } - self.polled += 1; - if self.futs.is_empty() { - return Ready(Ok(())); - } - loop { - let fut_ix = self.fut_ix; - break match self.futs[fut_ix].poll_unpin(cx) { - Ready(k) => match k { - Ok(_) => { - self.fut_ix += 1; - if self.fut_ix >= self.futs.len() { - if false { - let tsnow = Instant::now(); - let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; - let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; - info!( - "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", - self.polled, dt_created, dt_polled - ); - } - continue; - } else { - Ready(Ok(())) - } - } - Err(e) => { - let tsnow = Instant::now(); - let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; - let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; - warn!( - "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", - self.polled, dt_created, dt_polled - ); - warn!("InsertLoopFut done Err {e:?}"); - Ready(Err(e).err_conv()) - } - }, - Pending => Pending, - }; - } - } -} - pub struct ChannelWriteRes { pub nrows: u32, pub dt: Duration, diff --git a/netfetch/src/scylla.rs b/netfetch/src/scylla.rs index fc1aeff..2b9f204 100644 --- a/netfetch/src/scylla.rs +++ b/netfetch/src/scylla.rs @@ -3,53 +3,8 @@ use futures_util::StreamExt; #[allow(unused)] use netpod::log::*; use netpod::ScyllaConfig; -use scylla::execution_profile::ExecutionProfileBuilder; -use scylla::statement::Consistency; -use scylla::transport::errors::DbError; -use scylla::transport::errors::QueryError; -use scylla::Session; use std::sync::Arc; -pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Error> { - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .use_keyspace(&scyconf.keyspace, true) - .default_execution_profile_handle( - ExecutionProfileBuilder::default() - .consistency(Consistency::LocalOne) - .build() - .into_handle(), - ) - .build() - .await - .map_err(|e| Error::from(format!("{e}")))?; - let scy = Arc::new(scy); - Ok(scy) -} - -async fn has_table(name: &str, scy: &Session, scyconf: &ScyllaConfig) -> Result { - let ks = scy - .get_keyspace() - .ok_or_else(|| Error::with_msg_no_trace("session is not using a keyspace yet"))?; - let mut res = scy - .query_iter( - "select table_name from system_schema.tables where keyspace_name = ?", - (ks.as_ref(),), - ) - .await - .map_err(|e| e.to_string()) - .map_err(Error::from)?; - while let Some(k) = res.next().await { - let row = k.map_err(|e| e.to_string()).map_err(Error::from)?; - if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() { - if table_name == name { - return Ok(true); - } - } - } - Ok(false) -} - async fn check_table_exist(name: &str, scy: &Session) -> Result { match scy.query(format!("select * from {} limit 1", name), ()).await { Ok(_) => Ok(true), diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index ee87e08..89ffd94 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -17,7 +17,7 @@ use futures_util::FutureExt; use futures_util::StreamExt; use futures_util::TryFutureExt; use log::*; -use scylla::Session as ScySession; +use scywr::session::ScySession; use std::io; use std::net::SocketAddr; use std::pin::Pin; diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml new file mode 100644 index 0000000..fd18105 --- /dev/null +++ b/scywr/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "scywr" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +futures-util = "0.3" +scylla = "0.9.0" +err = { path = "../../daqbuffer/crates/err" } +netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/scywr/src/access.rs b/scywr/src/access.rs new file mode 100644 index 0000000..4eda2cc --- /dev/null +++ b/scywr/src/access.rs @@ -0,0 +1,11 @@ +use err::thiserror; +use err::ThisError; +use scylla::transport::errors::DbError; +use scylla::transport::errors::QueryError; + +#[derive(Debug, ThisError)] +pub enum Error { + DbError(#[from] DbError), + QueryError(#[from] QueryError), + NoKeyspaceChosen, +} diff --git a/scywr/src/config.rs b/scywr/src/config.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/scywr/src/config.rs @@ -0,0 +1 @@ + diff --git a/scywr/src/fut.rs b/scywr/src/fut.rs new file mode 100644 index 0000000..963f5bd --- /dev/null +++ b/scywr/src/fut.rs @@ -0,0 +1,42 @@ +use crate::access::Error; +use crate::session::ScySession; +use futures_util::Future; +use futures_util::FutureExt; +use scylla::frame::value::ValueList; +use scylla::prepared_statement::PreparedStatement; +use scylla::transport::errors::QueryError; +use scylla::QueryResult; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct ScyQueryFut<'a> { + fut: Pin> + Send + 'a>>, +} + +impl<'a> ScyQueryFut<'a> { + pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: V) -> Self + where + V: ValueList + Send + 'static, + { + todo!("ScyQueryFut"); + //let fut = scy.execute(query, values); + let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); + Self { fut: Box::pin(fut) } + } +} + +impl<'a> Future for ScyQueryFut<'a> { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + match self.fut.poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => Ready(Ok(())), + Err(e) => Ready(Err(e.into())), + }, + Pending => Pending, + } + } +} diff --git a/scywr/src/futbatch.rs b/scywr/src/futbatch.rs new file mode 100644 index 0000000..e8a8d53 --- /dev/null +++ b/scywr/src/futbatch.rs @@ -0,0 +1,68 @@ +use crate::access::Error; +use crate::session::ScySession; +use futures_util::Future; +use futures_util::FutureExt; +use netpod::log::*; +use scylla::batch::Batch; +use scylla::frame::value::BatchValues; +use scylla::transport::errors::QueryError; +use scylla::QueryResult; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Instant; + +pub struct ScyBatchFut<'a> { + fut: Pin> + 'a>>, + polled: usize, + ts_create: Instant, + ts_poll_start: Instant, +} + +impl<'a> ScyBatchFut<'a> { + pub fn new(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self + where + V: BatchValues + Send + Sync + 'static, + { + let fut = scy.batch(batch, values); + let tsnow = Instant::now(); + Self { + fut: Box::pin(fut), + polled: 0, + ts_create: tsnow, + ts_poll_start: tsnow, + } + } +} + +impl<'a> Future for ScyBatchFut<'a> { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + if self.polled == 0 { + self.ts_poll_start = Instant::now(); + } + self.polled += 1; + match self.fut.poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => { + trace!("ScyBatchFut done Ok"); + Ready(Ok(())) + } + Err(e) => { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + warn!( + "ScyBatchFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + warn!("ScyBatchFut done Err {e:?}"); + Ready(Err(e.into())) + } + }, + Pending => Pending, + } + } +} diff --git a/scywr/src/futbatchgen.rs b/scywr/src/futbatchgen.rs new file mode 100644 index 0000000..fd276d3 --- /dev/null +++ b/scywr/src/futbatchgen.rs @@ -0,0 +1,68 @@ +use crate::access::Error; +use crate::session::ScySession; +use futures_util::Future; +use futures_util::FutureExt; +use netpod::log::*; +use scylla::batch::Batch; +use scylla::frame::value::BatchValues; +use scylla::transport::errors::QueryError; +use scylla::QueryResult; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Instant; + +pub struct ScyBatchFutGen<'a> { + fut: Pin> + Send + 'a>>, + polled: usize, + ts_create: Instant, + ts_poll_start: Instant, +} + +impl<'a> ScyBatchFutGen<'a> { + pub fn new(scy: &'a ScySession, batch: &'a Batch, values: V) -> Self + where + V: BatchValues + Send + Sync + 'static, + { + let fut = scy.batch(batch, values); + let tsnow = Instant::now(); + Self { + fut: Box::pin(fut), + polled: 0, + ts_create: tsnow, + ts_poll_start: tsnow, + } + } +} + +impl<'a> Future for ScyBatchFutGen<'a> { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + if self.polled == 0 { + self.ts_poll_start = Instant::now(); + } + self.polled += 1; + match self.fut.poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => { + trace!("ScyBatchFutGen done Ok"); + Ready(Ok(())) + } + Err(e) => { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + warn!( + "ScyBatchFutGen polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + warn!("ScyBatchFutGen done Err {e:?}"); + Ready(Err(e.into())) + } + }, + Pending => Pending, + } + } +} diff --git a/scywr/src/futinsertloop.rs b/scywr/src/futinsertloop.rs new file mode 100644 index 0000000..108a513 --- /dev/null +++ b/scywr/src/futinsertloop.rs @@ -0,0 +1,104 @@ +use crate::access::Error; +use crate::session::ScySession; +use futures_util::Future; +use futures_util::FutureExt; +use netpod::log::*; +use scylla::frame::value::ValueList; +use scylla::prepared_statement::PreparedStatement; +use scylla::transport::errors::QueryError; +use scylla::QueryResult; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Instant; + +pub struct InsertLoopFut<'a> { + futs: Vec> + Send + 'a>>>, + fut_ix: usize, + polled: usize, + ts_create: Instant, + ts_poll_start: Instant, +} + +impl<'a> InsertLoopFut<'a> { + pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: Vec, skip_insert: bool) -> Self + where + V: ValueList + Send + Sync + 'static, + { + let mut values = values; + if skip_insert { + values.clear(); + } + // TODO + // Can I store the values in some better generic form? + // Or is it acceptable to generate all insert futures right here and poll them later? + let futs: Vec<_> = values + .into_iter() + .map(|vs| { + todo!("InsertLoopFut"); + //let fut = scy.execute(query, vs); + let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); + Box::pin(fut) as _ + }) + .collect(); + let tsnow = Instant::now(); + Self { + futs, + fut_ix: 0, + polled: 0, + ts_create: tsnow, + ts_poll_start: tsnow, + } + } +} + +impl<'a> Future for InsertLoopFut<'a> { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + if self.polled == 0 { + self.ts_poll_start = Instant::now(); + } + self.polled += 1; + if self.futs.is_empty() { + return Ready(Ok(())); + } + loop { + let fut_ix = self.fut_ix; + break match self.futs[fut_ix].poll_unpin(cx) { + Ready(k) => match k { + Ok(_) => { + self.fut_ix += 1; + if self.fut_ix >= self.futs.len() { + if false { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + info!( + "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + } + continue; + } else { + Ready(Ok(())) + } + } + Err(e) => { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; + warn!( + "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", + self.polled, dt_created, dt_polled + ); + warn!("InsertLoopFut done Err {e:?}"); + Ready(Err(e.into())) + } + }, + Pending => Pending, + }; + } + } +} diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs new file mode 100644 index 0000000..e6c7ce9 --- /dev/null +++ b/scywr/src/lib.rs @@ -0,0 +1,8 @@ +pub mod access; +pub mod config; +pub mod fut; +pub mod futbatch; +pub mod futbatchgen; +pub mod futinsertloop; +pub mod schema; +pub mod session; diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs new file mode 100644 index 0000000..65c4eb8 --- /dev/null +++ b/scywr/src/schema.rs @@ -0,0 +1,23 @@ +use crate::access::Error; +use crate::session::ScySession; +use futures_util::StreamExt; +// use netpod::ScyllaConfig; + +pub async fn has_table(name: &str, scy: &ScySession) -> Result { + let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?; + let mut res = scy + .query_iter( + "select table_name from system_schema.tables where keyspace_name = ?", + (ks.as_ref(),), + ) + .await?; + while let Some(k) = res.next().await { + let row = k?; + if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() { + if table_name == name { + return Ok(true); + } + } + } + Ok(false) +} diff --git a/scywr/src/session.rs b/scywr/src/session.rs new file mode 100644 index 0000000..d669ea7 --- /dev/null +++ b/scywr/src/session.rs @@ -0,0 +1,37 @@ +pub use netpod::ScyllaConfig; +pub use scylla::Session; +pub use Session as ScySession; + +use err::thiserror; +use err::ThisError; +use scylla::execution_profile::ExecutionProfileBuilder; +use scylla::statement::Consistency; +use scylla::transport::errors::NewSessionError; +use std::sync::Arc; + +#[derive(Debug, ThisError)] +pub enum Error { + NewSession(String), +} + +impl From for Error { + fn from(value: NewSessionError) -> Self { + Self::NewSession(value.to_string()) + } +} + +pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Error> { + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyconf.hosts) + .use_keyspace(&scyconf.keyspace, true) + .default_execution_profile_handle( + ExecutionProfileBuilder::default() + .consistency(Consistency::LocalOne) + .build() + .into_handle(), + ) + .build() + .await?; + let scy = Arc::new(scy); + Ok(scy) +}