diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index dbedcea..68e59ed 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -520,7 +520,7 @@ dependencies = [ [[package]] name = "daqingest" -version = "0.1.3" +version = "0.1.4" dependencies = [ "async-channel", "bytes", @@ -1641,6 +1641,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rand_pcg" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59cad018caf63deb318e5a4586d99a24424a364f40f1e5778c29aca23f4fc73e" +dependencies = [ + "rand_core", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1750,9 +1759,9 @@ checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1" [[package]] name = "scylla" -version = "0.7.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b9d4ef7fb24d95d30c4a8da782bb2afead3a8b66f32c805bb82a03722d7be33" +checksum = "530af73ae66ea56dcc2dd6a8900b442c31da0f068923ea41e0b0f3d0703b067c" dependencies = [ "arc-swap", "async-trait", @@ -1768,6 +1777,7 @@ dependencies = [ "num-bigint", "num_enum", "rand", + "rand_pcg", "scylla-cql", "scylla-macros", "smallvec", @@ -1782,9 +1792,9 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.0.3" +version = "0.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6972061cbcc83754b4243d007ae51c1a1345a950b368cbdaad0186eac8799203" +checksum = "0fc8568c89b1d70547881610ad2423157267d57552edb4861e6bd4394e53f26c" dependencies = [ "async-trait", "bigdecimal", @@ -1803,10 +1813,11 @@ dependencies = [ [[package]] name = "scylla-macros" -version = "0.1.2" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e03b3a19daa79085439113c746d2946e5e6effd2d9039bf092bb08df915487b2" +checksum = "32d777dadbf7163d1524ea4f5a095146298d263a686febb96d022cf46d06df32" dependencies = [ + "proc-macro2", "quote", "syn 1.0.109", ] @@ -2512,6 +2523,9 @@ name = "uuid" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2" +dependencies = [ + "getrandom", +] [[package]] name = "valuable" diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 405f881..11d9b2e 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.1.3" +version = "0.1.4" authors = ["Dominik Werder "] edition = "2021" @@ -18,8 +18,8 @@ tracing = "0.1.37" futures-util = "0.3" async-channel = "1.6" chrono = "0.4" -bytes = "1.1" -scylla = "0.7" +bytes = "1.4.0" +scylla = "0.8.1" tokio-postgres = "0.7.7" serde = { version = "1.0", features = ["derive"] } libc = "0.2" diff --git a/daqingest/src/query.rs b/daqingest/src/query.rs index ca2dd4d..511b93b 100644 --- a/daqingest/src/query.rs +++ b/daqingest/src/query.rs @@ -1,8 +1,10 @@ use crate::opts::FetchEvents; use log::*; use scylla::batch::Consistency; +use scylla::execution_profile::ExecutionProfileBuilder; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; +use scylla::Session; use scylla::SessionBuilder; pub struct Error(err::Error); @@ -25,13 +27,23 @@ impl From for Error { } } -pub async fn list_pkey() -> Result<(), Error> { +async fn make_scy_session() -> Result { let scy = SessionBuilder::new() .known_node("127.0.0.1:19042") - .default_consistency(Consistency::LocalOne) .use_keyspace("ks1", true) + .default_execution_profile_handle( + ExecutionProfileBuilder::default() + .consistency(Consistency::LocalOne) + .build() + .into_handle(), + ) .build() .await?; + Ok(scy) +} + +pub async fn list_pkey() -> Result<(), Error> { + let scy = make_scy_session().await?; let query = scy .prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?") .await?; @@ -67,12 +79,7 @@ pub async fn list_pkey() -> Result<(), Error> { } pub async fn list_pulses() -> Result<(), Error> { - let scy = SessionBuilder::new() - .known_node("127.0.0.1:19042") - .default_consistency(Consistency::LocalOne) - .use_keyspace("ks1", true) - .build() - .await?; + let scy = make_scy_session().await?; let query = scy .prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?") .await?; @@ -109,12 +116,7 @@ pub async fn list_pulses() -> Result<(), Error> { pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> { // TODO use the keyspace from commandline. err::todo(); - let scy = SessionBuilder::new() - .known_nodes(&opts.scylla) - .default_consistency(Consistency::LocalOne) - .use_keyspace("ks1", true) - .build() - .await?; + let scy = make_scy_session().await?; let qu_series = scy .prepare( "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?", diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 8d8fb11..c02b576 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -21,7 +21,7 @@ arrayref = "0.3" byteorder = "1.4" futures-util = "0.3" #pin-project-lite = "0.2" -scylla = "0.7" +scylla = "0.8.1" tokio-postgres = "0.7.7" md-5 = "0.10" hex = "0.4" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 8058a66..83cc1f6 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,4 +1,6 @@ use super::proto; +use super::proto::CaDataValue; +use super::proto::CaEventValue; use super::proto::CaItem; use super::proto::CaMsg; use super::proto::CaMsgTy; @@ -30,9 +32,11 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::scalar_ops::ScalarOps; use items_0::Appendable; use items_0::Empty; use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim1::EventsDim1; use log::*; use netpod::timeunits::*; use netpod::ScalarType; @@ -440,6 +444,7 @@ pub struct CaConn { Pin), Error>> + Send>>, >, events_acc: Box, + events_acc_func: Box Result<(), Error> + Send>, } impl CaConn { @@ -493,6 +498,7 @@ impl CaConn { series_lookup_schedule: BTreeMap::new(), series_lookup_futs: FuturesUnordered::new(), events_acc: Box::new(()), + events_acc_func: Box::new(Self::event_acc_push::), } } @@ -1029,29 +1035,16 @@ impl CaConn { } } - // TODO write generic. - // TODO store a pointer to the type-resolved function for faster usage. - fn event_acc_push(&mut self, ts: u64, ev: proto::EventAddRes) -> Result<(), Error> { - match &ev.value.data { - proto::CaDataValue::Scalar(k) => { - use proto::CaDataScalarValue::*; - match k { - F64(v) => { - if let Some(c) = self.events_acc.downcast_mut::>() { - c.push(ts, 0, v.clone()); - // TODO check for a max length. - // Also check at every check-tick. - } else { - } - } - _ => { - // TODO - } - } - } - proto::CaDataValue::Array(_) => { - // TODO - } + fn event_acc_push(this: &mut CaConn, ts: u64, ev: CaEventValue) -> Result<(), Error> + where + STY: ScalarOps, + CaDataValue: proto::GetValHelp, + { + let v = proto::GetValHelp::::get(&ev.data)?; + if let Some(c) = this.events_acc.downcast_mut::>() { + c.push(ts, 0, v.clone()); + // TODO check for a max length. + // Also check at every check-tick. } Ok(()) } @@ -1059,16 +1052,63 @@ impl CaConn { fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> { use ScalarType::*; match shape { - Shape::Scalar => match scalar_type { - F64 => { - self.events_acc = Box::new(EventsDim0::::empty()); + Shape::Scalar => { + type Cont = EventsDim0; + match scalar_type { + I8 => { + self.events_acc = Box::new(Cont::::empty()); + } + I16 => { + self.events_acc = Box::new(Cont::::empty()); + } + I32 => { + type ST = i32; + self.events_acc = Box::new(Cont::::empty()); + let f: Box Result<(), Error>> = + Box::new(Self::event_acc_push::); + } + F32 => { + type ST = f32; + self.events_acc = Box::new(Cont::::empty()); + let f: Box Result<(), Error>> = + Box::new(Self::event_acc_push::); + } + F64 => { + type ST = f64; + self.events_acc = Box::new(Cont::::empty()); + let f: Box Result<(), Error>> = + Box::new(Self::event_acc_push::); + } + _ => { + warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + } } - _ => { - // TODO + } + Shape::Wave(..) => { + type Cont = EventsDim1; + match scalar_type { + I8 => { + self.events_acc = Box::new(Cont::::empty()); + } + I16 => { + self.events_acc = Box::new(Cont::::empty()); + } + I32 => { + self.events_acc = Box::new(Cont::::empty()); + } + F32 => { + self.events_acc = Box::new(Cont::::empty()); + } + F64 => { + self.events_acc = Box::new(Cont::::empty()); + } + _ => { + warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + } } - }, + } _ => { - // TODO + warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); } } Ok(()) diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 7fc856a..0c75d13 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -166,6 +166,50 @@ pub enum CaDataScalarValue { Bool(bool), } +pub trait GetValHelp { + type ScalTy: Clone; + fn get(&self) -> Result<&Self::ScalTy, Error>; +} + +impl GetValHelp for CaDataValue { + type ScalTy = i32; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + CaDataValue::Scalar(v) => match v { + CaDataScalarValue::I32(v) => Ok(v), + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + }, + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + } + } +} + +impl GetValHelp for CaDataValue { + type ScalTy = f32; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + CaDataValue::Scalar(v) => match v { + CaDataScalarValue::F32(v) => Ok(v), + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + }, + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + } + } +} + +impl GetValHelp for CaDataValue { + type ScalTy = f64; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + CaDataValue::Scalar(v) => match v { + CaDataScalarValue::F64(v) => Ok(v), + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + }, + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + } + } +} + #[derive(Clone, Debug)] pub enum CaDataArrayValue { I8(Vec), diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 688be22..da2739e 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -1,6 +1,7 @@ use err::Error; use futures_util::StreamExt; use netpod::ScyllaConfig; +use scylla::execution_profile::ExecutionProfileBuilder; use scylla::prepared_statement::PreparedStatement; use scylla::statement::Consistency; use scylla::Session as ScySession; @@ -59,8 +60,13 @@ impl DataStore { pub async fn new(scyconf: &ScyllaConfig) -> Result { let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) - .default_consistency(Consistency::LocalOne) .use_keyspace(&scyconf.keyspace, true) + .default_execution_profile_handle( + ExecutionProfileBuilder::default() + .consistency(Consistency::LocalOne) + .build() + .into_handle(), + ) .build() .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index 86d1707..a51c304 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -62,6 +62,7 @@ impl IntoSimplerError for QueryError { QueryError::TooManyOrphanedStreamIds(e) => Error::DbError(e.to_string()), QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()), QueryError::RequestTimeout(e) => Error::DbError(e.to_string()), + QueryError::TranslationError(e) => Error::DbError(e.to_string()), } } }