Update deps

This commit is contained in:
Dominik Werder
2023-05-10 09:42:10 +02:00
parent c43a558c0e
commit 8818fb865f
8 changed files with 163 additions and 56 deletions

View File

@@ -520,7 +520,7 @@ dependencies = [
[[package]]
name = "daqingest"
version = "0.1.3"
version = "0.1.4"
dependencies = [
"async-channel",
"bytes",
@@ -1641,6 +1641,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rand_pcg"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59cad018caf63deb318e5a4586d99a24424a364f40f1e5778c29aca23f4fc73e"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@@ -1750,9 +1759,9 @@ checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1"
[[package]]
name = "scylla"
version = "0.7.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b9d4ef7fb24d95d30c4a8da782bb2afead3a8b66f32c805bb82a03722d7be33"
checksum = "530af73ae66ea56dcc2dd6a8900b442c31da0f068923ea41e0b0f3d0703b067c"
dependencies = [
"arc-swap",
"async-trait",
@@ -1768,6 +1777,7 @@ dependencies = [
"num-bigint",
"num_enum",
"rand",
"rand_pcg",
"scylla-cql",
"scylla-macros",
"smallvec",
@@ -1782,9 +1792,9 @@ dependencies = [
[[package]]
name = "scylla-cql"
version = "0.0.3"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6972061cbcc83754b4243d007ae51c1a1345a950b368cbdaad0186eac8799203"
checksum = "0fc8568c89b1d70547881610ad2423157267d57552edb4861e6bd4394e53f26c"
dependencies = [
"async-trait",
"bigdecimal",
@@ -1803,10 +1813,11 @@ dependencies = [
[[package]]
name = "scylla-macros"
version = "0.1.2"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e03b3a19daa79085439113c746d2946e5e6effd2d9039bf092bb08df915487b2"
checksum = "32d777dadbf7163d1524ea4f5a095146298d263a686febb96d022cf46d06df32"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
@@ -2512,6 +2523,9 @@ name = "uuid"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2"
dependencies = [
"getrandom",
]
[[package]]
name = "valuable"

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.1.3"
version = "0.1.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -18,8 +18,8 @@ tracing = "0.1.37"
futures-util = "0.3"
async-channel = "1.6"
chrono = "0.4"
bytes = "1.1"
scylla = "0.7"
bytes = "1.4.0"
scylla = "0.8.1"
tokio-postgres = "0.7.7"
serde = { version = "1.0", features = ["derive"] }
libc = "0.2"

View File

@@ -1,8 +1,10 @@
use crate::opts::FetchEvents;
use log::*;
use scylla::batch::Consistency;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
use scylla::Session;
use scylla::SessionBuilder;
pub struct Error(err::Error);
@@ -25,13 +27,23 @@ impl From<QueryError> for Error {
}
}
pub async fn list_pkey() -> Result<(), Error> {
async fn make_scy_session() -> Result<Session, Error> {
let scy = SessionBuilder::new()
.known_node("127.0.0.1:19042")
.default_consistency(Consistency::LocalOne)
.use_keyspace("ks1", true)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
.build()
.into_handle(),
)
.build()
.await?;
Ok(scy)
}
pub async fn list_pkey() -> Result<(), Error> {
let scy = make_scy_session().await?;
let query = scy
.prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?")
.await?;
@@ -67,12 +79,7 @@ pub async fn list_pkey() -> Result<(), Error> {
}
pub async fn list_pulses() -> Result<(), Error> {
let scy = SessionBuilder::new()
.known_node("127.0.0.1:19042")
.default_consistency(Consistency::LocalOne)
.use_keyspace("ks1", true)
.build()
.await?;
let scy = make_scy_session().await?;
let query = scy
.prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?")
.await?;
@@ -109,12 +116,7 @@ pub async fn list_pulses() -> Result<(), Error> {
pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> {
// TODO use the keyspace from commandline.
err::todo();
let scy = SessionBuilder::new()
.known_nodes(&opts.scylla)
.default_consistency(Consistency::LocalOne)
.use_keyspace("ks1", true)
.build()
.await?;
let scy = make_scy_session().await?;
let qu_series = scy
.prepare(
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",

View File

@@ -21,7 +21,7 @@ arrayref = "0.3"
byteorder = "1.4"
futures-util = "0.3"
#pin-project-lite = "0.2"
scylla = "0.7"
scylla = "0.8.1"
tokio-postgres = "0.7.7"
md-5 = "0.10"
hex = "0.4"

View File

@@ -1,4 +1,6 @@
use super::proto;
use super::proto::CaDataValue;
use super::proto::CaEventValue;
use super::proto::CaItem;
use super::proto::CaMsg;
use super::proto::CaMsgTy;
@@ -30,9 +32,11 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::scalar_ops::ScalarOps;
use items_0::Appendable;
use items_0::Empty;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use log::*;
use netpod::timeunits::*;
use netpod::ScalarType;
@@ -440,6 +444,7 @@ pub struct CaConn {
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
events_acc: Box<dyn Any + Send>,
events_acc_func: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error> + Send>,
}
impl CaConn {
@@ -493,6 +498,7 @@ impl CaConn {
series_lookup_schedule: BTreeMap::new(),
series_lookup_futs: FuturesUnordered::new(),
events_acc: Box::new(()),
events_acc_func: Box::new(Self::event_acc_push::<i32>),
}
}
@@ -1029,29 +1035,16 @@ impl CaConn {
}
}
// TODO write generic.
// TODO store a pointer to the type-resolved function for faster usage.
fn event_acc_push(&mut self, ts: u64, ev: proto::EventAddRes) -> Result<(), Error> {
match &ev.value.data {
proto::CaDataValue::Scalar(k) => {
use proto::CaDataScalarValue::*;
match k {
F64(v) => {
if let Some(c) = self.events_acc.downcast_mut::<EventsDim0<f64>>() {
c.push(ts, 0, v.clone());
// TODO check for a max length.
// Also check at every check-tick.
} else {
}
}
_ => {
// TODO
}
}
}
proto::CaDataValue::Array(_) => {
// TODO
}
fn event_acc_push<STY>(this: &mut CaConn, ts: u64, ev: CaEventValue) -> Result<(), Error>
where
STY: ScalarOps,
CaDataValue: proto::GetValHelp<STY, ScalTy = STY>,
{
let v = proto::GetValHelp::<STY>::get(&ev.data)?;
if let Some(c) = this.events_acc.downcast_mut::<EventsDim0<STY>>() {
c.push(ts, 0, v.clone());
// TODO check for a max length.
// Also check at every check-tick.
}
Ok(())
}
@@ -1059,16 +1052,63 @@ impl CaConn {
fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
use ScalarType::*;
match shape {
Shape::Scalar => match scalar_type {
F64 => {
self.events_acc = Box::new(EventsDim0::<f64>::empty());
Shape::Scalar => {
type Cont<T> = EventsDim0<T>;
match scalar_type {
I8 => {
self.events_acc = Box::new(Cont::<i8>::empty());
}
I16 => {
self.events_acc = Box::new(Cont::<i16>::empty());
}
I32 => {
type ST = i32;
self.events_acc = Box::new(Cont::<ST>::empty());
let f: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error>> =
Box::new(Self::event_acc_push::<ST>);
}
F32 => {
type ST = f32;
self.events_acc = Box::new(Cont::<ST>::empty());
let f: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error>> =
Box::new(Self::event_acc_push::<ST>);
}
F64 => {
type ST = f64;
self.events_acc = Box::new(Cont::<ST>::empty());
let f: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error>> =
Box::new(Self::event_acc_push::<ST>);
}
_ => {
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
}
_ => {
// TODO
}
Shape::Wave(..) => {
type Cont<T> = EventsDim1<T>;
match scalar_type {
I8 => {
self.events_acc = Box::new(Cont::<i8>::empty());
}
I16 => {
self.events_acc = Box::new(Cont::<i16>::empty());
}
I32 => {
self.events_acc = Box::new(Cont::<i32>::empty());
}
F32 => {
self.events_acc = Box::new(Cont::<f32>::empty());
}
F64 => {
self.events_acc = Box::new(Cont::<f64>::empty());
}
_ => {
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
}
},
}
_ => {
// TODO
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
}
Ok(())

View File

@@ -166,6 +166,50 @@ pub enum CaDataScalarValue {
Bool(bool),
}
pub trait GetValHelp<T> {
type ScalTy: Clone;
fn get(&self) -> Result<&Self::ScalTy, Error>;
}
impl GetValHelp<i32> for CaDataValue {
type ScalTy = i32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::I32(v) => Ok(v),
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
},
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
}
}
}
impl GetValHelp<f32> for CaDataValue {
type ScalTy = f32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::F32(v) => Ok(v),
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
},
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
}
}
}
impl GetValHelp<f64> for CaDataValue {
type ScalTy = f64;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::F64(v) => Ok(v),
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
},
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
}
}
}
#[derive(Clone, Debug)]
pub enum CaDataArrayValue {
I8(Vec<i8>),

View File

@@ -1,6 +1,7 @@
use err::Error;
use futures_util::StreamExt;
use netpod::ScyllaConfig;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::prepared_statement::PreparedStatement;
use scylla::statement::Consistency;
use scylla::Session as ScySession;
@@ -59,8 +60,13 @@ impl DataStore {
pub async fn new(scyconf: &ScyllaConfig) -> Result<Self, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.default_consistency(Consistency::LocalOne)
.use_keyspace(&scyconf.keyspace, true)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
.build()
.into_handle(),
)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;

View File

@@ -62,6 +62,7 @@ impl IntoSimplerError for QueryError {
QueryError::TooManyOrphanedStreamIds(e) => Error::DbError(e.to_string()),
QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()),
QueryError::RequestTimeout(e) => Error::DbError(e.to_string()),
QueryError::TranslationError(e) => Error::DbError(e.to_string()),
}
}
}