Updated deps

This commit is contained in:
Dominik Werder
2024-05-17 07:54:45 +02:00
parent 6224df534a
commit 2f223f23f4
6 changed files with 856 additions and 632 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -10,7 +10,7 @@ path = "src/dbconn.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio-postgres = { version = "0.7.9", features = ["with-chrono-0_4", "with-serde_json-1"] }
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-serde_json-1"] }
crc32fast = "1.3.2"
byteorder = "1.4"
futures-util = "0.3.25"
@@ -19,7 +19,7 @@ pin-project = "1"
#dashmap = "3"
async-channel = "1.9.0"
chrono = "0.4"
regex = "1.7.0"
regex = "1.10.4"
err = { path = "../err" }
netpod = { path = "../netpod" }
parse = { path = "../parse" }

View File

@@ -9,8 +9,8 @@ path = "src/scyllaconn.rs"
[dependencies]
futures-util = "0.3.24"
async-channel = "1.9.0"
scylla = "0.11.0"
async-channel = "2.3.1"
scylla = "0.13.0"
err = { path = "../err" }
netpod = { path = "../netpod" }
query = { path = "../query" }

View File

@@ -7,9 +7,17 @@ use scylla::Session as ScySession;
use std::sync::Arc;
pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<ScySession>, Error> {
let scy = create_scy_session_no_ks(scyconf).await?;
scy.use_keyspace(&scyconf.keyspace, true)
.await
.map_err(Error::from_string)?;
let ret = Arc::new(scy);
Ok(ret)
}
pub async fn create_scy_session_no_ks(scyconf: &ScyllaConfig) -> Result<ScySession, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.use_keyspace(&scyconf.keyspace, true)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::LocalOne)
@@ -19,6 +27,5 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<ScySession
.build()
.await
.err_conv()?;
let ret = Arc::new(scy);
Ok(ret)
Ok(scy)
}

View File

@@ -5,5 +5,6 @@ pub mod errconv;
pub mod events;
pub mod range;
pub mod status;
pub mod worker;
pub use scylla;

View File

@@ -0,0 +1,84 @@
use crate::conn::create_scy_session_no_ks;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ScyllaConfig;
use scylla::Session;
#[derive(Debug, ThisError)]
pub enum Error {
Error(#[from] err::Error),
ChannelSend,
ChannelRecv,
Join,
}
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
err::Error::from_string(self)
}
}
#[derive(Debug)]
enum Job {
JobA(String, Sender<Result<String, Error>>),
}
#[derive(Debug, Clone)]
pub struct ScyllaQueue {
tx: Sender<Job>,
}
impl ScyllaQueue {
pub async fn job_a(&self, backend: &str) -> Result<Receiver<Result<String, Error>>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::JobA(backend.into(), tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
}
#[derive(Debug)]
pub struct ScyllaWorker {
rx: Receiver<Job>,
scy: Session,
// pgjh: Option<JoinHandle<Result<(), err::Error>>>,
}
impl ScyllaWorker {
pub async fn new(scyconf: &ScyllaConfig) -> Result<(ScyllaQueue, Self), Error> {
let (tx, rx) = async_channel::bounded(64);
let scy = create_scy_session_no_ks(scyconf).await?;
let queue = ScyllaQueue { tx };
let worker = Self {
rx,
scy,
// pgjh: Some(pgjh),
};
Ok((queue, worker))
}
pub async fn work(self) -> Result<(), Error> {
loop {
let x = self.rx.recv().await;
let job = match x {
Ok(x) => x,
Err(_) => {
error!("ScyllaWorker can not receive from channel");
return Err(Error::ChannelRecv);
}
};
match job {
Job::JobA(backend, tx) => {
let res = Ok::<_, Error>(backend);
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
}
}
}
}