From 27e93f253902e06461df60708c1955283ac890af Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 20 Jul 2022 06:51:54 +0200 Subject: [PATCH] Move search into sub mod --- daqingest/src/bin/daqingest.rs | 11 +-- daqingest/src/daqingest.rs | 9 +- netfetch/src/ca.rs | 173 +++++---------------------------- netfetch/src/ca/search.rs | 138 ++++++++++++++++++++++++++ 4 files changed, 174 insertions(+), 157 deletions(-) create mode 100644 netfetch/src/ca/search.rs diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index f58cdb4..52e0588 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -8,10 +8,6 @@ pub fn main() -> Result<(), Error> { log::info!("daqingest version {}", clap::crate_version!()); let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32); let res = runtime.block_on(async move { - if false { - return Err(Error::with_msg_no_trace(format!("unknown command"))); - } else { - } match opts.subcmd { SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?, SubCmd::ListPkey => daqingest::query::list_pkey().await?, @@ -22,8 +18,11 @@ pub fn main() -> Result<(), Error> { f.run().await? } SubCmd::ChannelAccess(k) => match k { - ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?, - ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?, + ChannelAccess::CaSearch(k) => { + let opts = daqingest::CaConfig { config: k.config }.into(); + netfetch::ca::search::ca_search(opts).await? + } + ChannelAccess::CaIngest(k) => netfetch::ca::ca_connect(k.into()).await?, }, } Ok(()) diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 6cc58f4..6646fef 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -76,8 +76,13 @@ pub struct BsreadDump { #[derive(Debug, Parser)] pub enum ChannelAccess { - CaConfig(CaConfig), - CaSearch(CaConfig), + CaIngest(CaConfig), + CaSearch(CaSearch), +} + +#[derive(Debug, Parser)] +pub struct CaSearch { + pub config: String, } #[derive(Debug, Parser)] diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 202ff31..b033d9e 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,5 +1,6 @@ pub mod conn; pub mod proto; +pub mod search; pub mod store; use self::conn::FindIocStream; @@ -9,7 +10,7 @@ use conn::CaConn; use err::Error; use futures_util::StreamExt; use log::*; -use netpod::Database; +use netpod::{Database, ScyllaConfig}; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; @@ -49,7 +50,8 @@ struct ChannelConfig { timeout: Option, #[serde(default)] abort_after_search: u32, - pg_pass: String, + pgconf: Database, + scyconf: ScyllaConfig, array_truncate: Option, insert_worker_count: Option, insert_scylla_sessions: Option, @@ -91,7 +93,8 @@ pub async fn parse_config(config: PathBuf) -> Result { addr_conn: conf.addr_conn, timeout: conf.timeout.unwrap_or(2000), abort_after_search: conf.abort_after_search, - pg_pass: conf.pg_pass, + pgconf: conf.pgconf, + scyconf: conf.scyconf, array_truncate: conf.array_truncate.unwrap_or(512), insert_worker_count: conf.insert_worker_count.unwrap_or(8), insert_scylla_sessions: conf.insert_scylla_sessions.unwrap_or(1), @@ -108,7 +111,8 @@ pub struct CaConnectOpts { pub addr_conn: Ipv4Addr, pub timeout: u64, pub abort_after_search: u32, - pub pg_pass: String, + pub pgconf: Database, + pub scyconf: ScyllaConfig, pub array_truncate: usize, pub insert_worker_count: usize, pub insert_scylla_sessions: usize, @@ -117,136 +121,8 @@ pub struct CaConnectOpts { pub api_bind: String, } -async fn resolve_address(addr_str: &str) -> Result { - const PORT_DEFAULT: u16 = 5064; - let ac = match addr_str.parse::() { - Ok(k) => k, - Err(_) => match addr_str.parse::() { - Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), - Err(e) => match tokio::net::lookup_host(&addr_str).await { - Ok(k) => { - let vs: Vec<_> = k - .filter_map(|x| match x { - SocketAddr::V4(k) => Some(k), - SocketAddr::V6(_) => None, - }) - .collect(); - if let Some(k) = vs.first() { - *k - } else { - error!("Can not understand name for {:?} {:?}", addr_str, vs); - return Err(e.into()); - } - } - Err(e) => { - error!("{e:?}"); - return Err(e.into()); - } - }, - }, - }; - Ok(ac) -} - -pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { - let facility = "scylla"; - let opts = parse_config(opts.config).await?; - let d = Database { - name: "daqbuffer".into(), - host: "sf-nube-11".into(), - port: 5432, - user: "daqbuffer".into(), - pass: opts.pg_pass.clone(), - }; - let (pg_client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name), - tokio_postgres::tls::NoTls, - ) - .await - .unwrap(); - // TODO join pg_conn in the end: - tokio::spawn(pg_conn); - let pg_client = Arc::new(pg_client); - let qu_insert = { - const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; - pg_client - .prepare_typed( - "insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4)", - &[TEXT, TEXT, TEXT, TEXT], - ) - .await - .unwrap() - }; - let qu_select = pg_client - .prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3") - .await - .unwrap(); - let qu_update = pg_client - .prepare("update ioc_by_channel set addr = $4 where facility = $1 and channel = $2 and searchaddr = $3") - .await - .unwrap(); - let mut addrs = vec![]; - for s in &opts.search { - let x = resolve_address(s).await?; - addrs.push(x); - } - let mut finder = FindIocStream::new(addrs); - for ch in &opts.channels { - finder.push(ch.into()); - } - let mut ts_last = Instant::now(); - loop { - let ts_now = Instant::now(); - if ts_now.duration_since(ts_last) >= Duration::from_millis(1000) { - ts_last = ts_now; - info!("{}", finder.quick_state()); - } - let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await; - let item = match k { - Ok(Some(k)) => k, - Ok(None) => { - info!("Search stream exhausted"); - break; - } - Err(_) => { - continue; - } - }; - let item = match item { - Ok(k) => k, - Err(e) => { - error!("ca_search {e:?}"); - continue; - } - }; - for item in item { - let searchaddr = item.src.to_string(); - let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new()); - let rows = pg_client - .query(&qu_select, &[&facility, &item.channel, &searchaddr]) - .await - .unwrap(); - if rows.is_empty() { - pg_client - .execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr]) - .await - .unwrap(); - } else { - let addr2: &str = rows[0].get(0); - if addr2 != addr { - pg_client - .execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr]) - .await - .unwrap(); - } - } - } - tokio::time::sleep(Duration::from_millis(1)).await; - } - Ok(()) -} - async fn spawn_scylla_insert_workers( + scyconf: ScyllaConfig, insert_scylla_sessions: usize, insert_worker_count: usize, insert_item_queue: &CommonInsertItemQueue, @@ -257,9 +133,9 @@ async fn spawn_scylla_insert_workers( let mut data_stores = vec![]; for _ in 0..insert_scylla_sessions { let scy = scylla::SessionBuilder::new() - .known_node("sf-nube-14:19042") + .known_nodes(&scyconf.hosts) .default_consistency(Consistency::One) - .use_keyspace("ks1", true) + .use_keyspace(&scyconf.keyspace, true) .build() .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; @@ -358,20 +234,21 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let insert_frac = Arc::new(AtomicU64::new(1000)); let insert_ivl_min = Arc::new(AtomicU64::new(8800)); let opts = parse_config(opts.config).await?; + let scyconf = opts.scyconf.clone(); tokio::spawn(start_metrics_service( opts.api_bind.clone(), insert_frac.clone(), insert_ivl_min.clone(), )); let d = Database { - name: "daqbuffer".into(), - host: "sf-nube-11".into(), - port: 5432, - user: "daqbuffer".into(), - pass: opts.pg_pass.clone(), + name: opts.pgconf.name.clone(), + host: opts.pgconf.host.clone(), + port: opts.pgconf.port.clone(), + user: opts.pgconf.user.clone(), + pass: opts.pgconf.pass.clone(), }; let (pg_client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name), + &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), tokio_postgres::tls::NoTls, ) .await @@ -381,21 +258,18 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let pg_client = Arc::new(pg_client); let scy = scylla::SessionBuilder::new() - .known_node("sf-nube-14:19042") + .known_nodes(&scyconf.hosts) .default_consistency(Consistency::One) - .use_keyspace("ks1", true) + .use_keyspace(scyconf.keyspace, true) .build() .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = Arc::new(scy); - // TODO use new struct: let local_stats = Arc::new(CaConnStats::new()); - // TODO factor the find loop into a separate Stream. - info!("FIND IOCS"); let qu_find_addr = pg_client - .prepare("select t2.channel, t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)") + .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let mut channels_by_host = BTreeMap::new(); @@ -421,8 +295,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { error!("can not find any addresses of channels {:?}", chstmp); } else { for row in rows { - let ch: &str = row.get(0); - let addr: &str = row.get(1); + let ch: &str = row.get(1); + let addr: &str = row.get(2); if addr == "" { // TODO the address was searched before but could not be found. } else { @@ -459,6 +333,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let store_stats = Arc::new(CaConnStats::new()); spawn_scylla_insert_workers( + opts.scyconf.clone(), opts.insert_scylla_sessions, opts.insert_worker_count, &insert_item_queue, diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs new file mode 100644 index 0000000..3e4aa95 --- /dev/null +++ b/netfetch/src/ca/search.rs @@ -0,0 +1,138 @@ +use crate::ca::{parse_config, ListenFromFileOpts}; +use err::Error; +use futures_util::StreamExt; +use log::*; +use netpod::Database; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use super::conn::FindIocStream; + +async fn resolve_address(addr_str: &str) -> Result { + const PORT_DEFAULT: u16 = 5064; + let ac = match addr_str.parse::() { + Ok(k) => k, + Err(_) => match addr_str.parse::() { + Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), + Err(e) => match tokio::net::lookup_host(&addr_str).await { + Ok(k) => { + let vs: Vec<_> = k + .filter_map(|x| match x { + SocketAddr::V4(k) => Some(k), + SocketAddr::V6(_) => None, + }) + .collect(); + if let Some(k) = vs.first() { + *k + } else { + error!("Can not understand name for {:?} {:?}", addr_str, vs); + return Err(e.into()); + } + } + Err(e) => { + error!("{e:?}"); + return Err(e.into()); + } + }, + }, + }; + Ok(ac) +} + +pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { + let facility = "scylla"; + let opts = parse_config(opts.config).await?; + let d = Database { + name: opts.pgconf.name.clone(), + host: opts.pgconf.host.clone(), + port: opts.pgconf.port.clone(), + user: opts.pgconf.user.clone(), + pass: opts.pgconf.pass.clone(), + }; + let (pg_client, pg_conn) = tokio_postgres::connect( + &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), + tokio_postgres::tls::NoTls, + ) + .await + .unwrap(); + // TODO join pg_conn in the end: + tokio::spawn(pg_conn); + let pg_client = Arc::new(pg_client); + let qu_select = pg_client + .prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3") + .await + .unwrap(); + let qu_insert = { + const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; + pg_client + .prepare_typed( + "insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4) on conflict do nothing", + &[TEXT, TEXT, TEXT, TEXT], + ) + .await + .unwrap() + }; + let qu_update = pg_client + .prepare("update ioc_by_channel set addr = $4, tsmod = now(), modcount = modcount + 1 where facility = $1 and channel = $2 and searchaddr = $3") + .await + .unwrap(); + let mut addrs = vec![]; + for s in &opts.search { + let x = resolve_address(s).await?; + addrs.push(x); + } + let mut finder = FindIocStream::new(addrs); + for ch in &opts.channels { + finder.push(ch.into()); + } + let mut ts_last = Instant::now(); + loop { + let ts_now = Instant::now(); + if ts_now.duration_since(ts_last) >= Duration::from_millis(1000) { + ts_last = ts_now; + info!("{}", finder.quick_state()); + } + let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await; + let item = match k { + Ok(Some(k)) => k, + Ok(None) => { + info!("Search stream exhausted"); + break; + } + Err(_) => { + continue; + } + }; + let item = match item { + Ok(k) => k, + Err(e) => { + error!("ca_search {e:?}"); + continue; + } + }; + for item in item { + let searchaddr = item.src.to_string(); + let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new()); + let rows = pg_client + .query(&qu_select, &[&facility, &item.channel, &searchaddr]) + .await + .unwrap(); + if rows.is_empty() { + pg_client + .execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr]) + .await + .unwrap(); + } else { + let addr2: &str = rows[0].get(0); + if addr2 != addr { + pg_client + .execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr]) + .await + .unwrap(); + } + } + } + } + Ok(()) +}