This commit is contained in:
Dominik Werder
2023-11-01 09:57:39 +01:00
parent f7db475b30
commit 35f0bcb41f
10 changed files with 91 additions and 529 deletions

View File

@@ -47,13 +47,16 @@ impl Stream for InputMerge {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let selfp = self.project();
let ret = {
if let Some(inp) = selfp.inp3.as_pin_mut() {
let mut selfp = self.as_mut().project();
if let Some(inp) = selfp.inp3.as_mut().as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(None) => {
// self.inp3 = None;
unsafe {
// TODO what guarantees that I can drop the content here like this?
self.as_mut().get_unchecked_mut().inp3 = None;
}
None
}
Pending => None,
@@ -65,11 +68,15 @@ impl Stream for InputMerge {
let ret = if let Some(x) = ret {
Some(x)
} else {
if let Some(inp) = selfp.inp2.as_pin_mut() {
let mut selfp = self.as_mut().project();
if let Some(inp) = selfp.inp2.as_mut().as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(None) => {
// self.inp2 = None;
unsafe {
// TODO what guarantees that I can drop the content here like this?
self.as_mut().get_unchecked_mut().inp2 = None;
}
None
}
Pending => None,
@@ -81,11 +88,15 @@ impl Stream for InputMerge {
if let Some(x) = ret {
Ready(Some(x))
} else {
if let Some(inp) = selfp.inp1.as_pin_mut() {
let mut selfp = self.as_mut().project();
if let Some(inp) = selfp.inp1.as_mut().as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Ready(Some(x)),
Ready(None) => {
// self.inp1 = None;
unsafe {
// TODO what guarantees that I can drop the content here like this?
self.as_mut().get_unchecked_mut().inp1 = None;
}
Ready(None)
}
Pending => Pending,

View File

@@ -217,7 +217,7 @@ async fn finder_worker_single(
items.extend(to_add.into_iter());
let items = items;
for e in &items {
if crate::ca::connset::trigger.contains(&e.channel.as_str()) {
if true || crate::ca::connset::trigger.contains(&e.channel.as_str()) {
debug!("found in database: {e:?}");
}
}
@@ -262,6 +262,7 @@ async fn finder_network_if_not_found(
let mut res = VecDeque::new();
let mut net = VecDeque::new();
for e in item {
trace!("finder_network_if_not_found sees {e:?}");
if e.addr.is_none() {
net.push_back(e.channel);
} else {

View File

@@ -402,13 +402,14 @@ impl CaMsgTy {
fn payload_len(&self) -> usize {
use CaMsgTy::*;
trace!("payload_len for {self:?}");
match self {
Version => 0,
VersionRes(_) => 0,
Error(x) => (16 + x.msg.len() + 1 + 7) / 8 * 8,
ClientName => 0x10,
ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8,
HostName(_) => 0x18,
HostName(x) => (x.len() + 1 + 7) / 8 * 8,
Search(x) => (x.channel.len() + 1 + 7) / 8 * 8,
SearchRes(_) => 8,
CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8,

View File

@@ -22,7 +22,7 @@ pub struct CaIngestOpts {
whitelist: Option<String>,
blacklist: Option<String>,
max_simul: Option<usize>,
#[serde(with = "humantime_serde")]
#[serde(default, with = "humantime_serde")]
timeout: Option<Duration>,
postgresql: Database,
scylla: ScyllaConfig,
@@ -35,13 +35,13 @@ pub struct CaIngestOpts {
store_workers_rate: Option<u64>,
insert_frac: Option<u64>,
use_rate_limit_queue: Option<bool>,
#[serde(with = "humantime_serde")]
#[serde(default, with = "humantime_serde")]
ttl_index: Option<Duration>,
#[serde(with = "humantime_serde")]
#[serde(default, with = "humantime_serde")]
ttl_d0: Option<Duration>,
#[serde(with = "humantime_serde")]
#[serde(default, with = "humantime_serde")]
ttl_d1: Option<Duration>,
#[serde(with = "humantime_serde")]
#[serde(default, with = "humantime_serde")]
ttl_binned: Option<Duration>,
pub test_bsread_addr: Option<String>,
}
@@ -138,8 +138,7 @@ impl CaIngestOpts {
fn parse_config_minimal() {
let conf = r###"
backend: scylla
ttl_d1: 10m 3s
ttl_binned: 70d
ttl_d1: 10m 3s 45ms
api_bind: "0.0.0.0:3011"
channels: /some/path/file.txt
search:
@@ -158,14 +157,13 @@ scylla:
keyspace: ks1
"###;
let res: Result<CaIngestOpts, _> = serde_yaml::from_slice(conf.as_bytes());
assert_eq!(res.is_ok(), true);
let conf = res.unwrap();
assert_eq!(conf.channels, PathBuf::from("/some/path/file.txt"));
assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string()));
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
assert_eq!(conf.ttl_binned, Some(Duration::from_secs(60 * 60 * 70)));
assert_eq!(conf.ttl_binned, None);
}
#[test]