diff --git a/daqingest/src/query.rs b/daqingest/src/query.rs index a0a4f3f..156d379 100644 --- a/daqingest/src/query.rs +++ b/daqingest/src/query.rs @@ -27,7 +27,7 @@ impl From for Error { pub async fn list_pkey() -> Result<(), Error> { let scy = SessionBuilder::new() .known_node("127.0.0.1:19042") - .default_consistency(Consistency::One) + .default_consistency(Consistency::LocalOne) .use_keyspace("ks1", true) .build() .await?; @@ -68,7 +68,7 @@ pub async fn list_pkey() -> Result<(), Error> { pub async fn list_pulses() -> Result<(), Error> { let scy = SessionBuilder::new() .known_node("127.0.0.1:19042") - .default_consistency(Consistency::One) + .default_consistency(Consistency::LocalOne) .use_keyspace("ks1", true) .build() .await?; @@ -108,7 +108,7 @@ pub async fn list_pulses() -> Result<(), Error> { pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> { let scy = SessionBuilder::new() .known_nodes(&opts.scylla) - .default_consistency(Consistency::One) + .default_consistency(Consistency::LocalOne) .use_keyspace("ks1", true) .build() .await?; diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index fb74f70..ffc8418 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -68,13 +68,63 @@ struct ChannelConfig { insert_queue_max: Option, insert_item_queue_cap: Option, api_bind: Option, - local_epics_hostname: String, + local_epics_hostname: Option, +} + +#[test] +fn parse_config_minimal() { + let conf = r###" +backend: scylla +api_bind: 0.0.0.0:3011 +channels: + - CHANNEL-1:A + - CHANNEL-1:B + - CHANNEL-2:A +search: + - 172.26.0.255 + - 172.26.2.255 +postgresql: + host: host.example.com + port: 5432 + user: USER + pass: PASS + name: NAME +scylla: + hosts: + - sf-nube-11:19042 + - sf-nube-12:19042 + keyspace: ks1 +"###; + let res: Result = serde_yaml::from_slice(conf.as_bytes()); + assert_eq!(res.is_ok(), true); + let conf = res.unwrap(); + assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string())); + assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); + assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string())); } pub struct ListenFromFileOpts { pub config: PathBuf, } +fn local_hostname() -> String { + let mut buf = vec![0u8; 128]; + let hostname = unsafe { + let ec = libc::gethostname(buf.as_mut_ptr() as _, buf.len() - 2); + if ec != 0 { + panic!(); + } + let hostname = CStr::from_ptr(&buf[0] as *const _ as _); + hostname.to_str().unwrap() + }; + hostname.into() +} + +#[test] +fn test_get_local_hostname() { + assert_ne!(local_hostname().len(), 0); +} + pub async fn parse_config(config: PathBuf) -> Result { let mut file = OpenOptions::new().read(true).open(config).await?; let mut buf = vec![]; @@ -103,7 +153,7 @@ pub async fn parse_config(config: PathBuf) -> Result { search_blacklist: conf.search_blacklist, addr_bind: conf.addr_bind.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))), addr_conn: conf.addr_conn.unwrap_or(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255))), - timeout: conf.timeout.unwrap_or(2000), + timeout: conf.timeout.unwrap_or(1200), pgconf: conf.postgresql, scyconf: conf.scylla, array_truncate: conf.array_truncate.unwrap_or(512), @@ -112,7 +162,7 @@ pub async fn parse_config(config: PathBuf) -> Result { insert_queue_max: conf.insert_queue_max.unwrap_or(64), insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(200000), api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()), - local_epics_hostname: conf.local_epics_hostname, + local_epics_hostname: conf.local_epics_hostname.unwrap_or_else(local_hostname), }) } @@ -724,8 +774,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { futures_util::select! { a = jh => match a { Ok(k) => match k { - Ok(_) => { - } + Ok(_) => {} Err(e) => { error!("{e:?}"); } @@ -772,9 +821,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { loop { futures_util::select!( x = futs.next() => match x { - Some(Ok(_)) => { - info!("waiting for {} inserts", futs.len()); - } + Some(Ok(_)) => {} Some(Err(e)) => { error!("error on shutdown: {e:?}"); } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index db9b411..4c65295 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -754,8 +754,9 @@ impl CaConn { ) -> Result<(), Error> { // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. - let (ts_msp, ts_msp_changed) = if inserted_in_ts_msp >= 20000 { - let ts_msp = ts / (10 * SEC) * (10 * SEC); + let (ts_msp, ts_msp_changed) = if inserted_in_ts_msp >= 64000 || st.ts_msp_last + HOUR <= ts { + let div = SEC * 10; + let ts_msp = ts / div * div; if ts_msp == st.ts_msp_last { (ts_msp, false) } else { diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 6f471e5..5f8730d 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -62,7 +62,7 @@ impl DataStore { pub async fn new(scyconf: &ScyllaConfig, pg_client: Arc) -> Result { let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) - .default_consistency(Consistency::One) + .default_consistency(Consistency::LocalOne) .use_keyspace(&scyconf.keyspace, true) .build() .await