Move search into sub mod
This commit is contained in:
@@ -8,10 +8,6 @@ pub fn main() -> Result<(), Error> {
|
||||
log::info!("daqingest version {}", clap::crate_version!());
|
||||
let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32);
|
||||
let res = runtime.block_on(async move {
|
||||
if false {
|
||||
return Err(Error::with_msg_no_trace(format!("unknown command")));
|
||||
} else {
|
||||
}
|
||||
match opts.subcmd {
|
||||
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?,
|
||||
SubCmd::ListPkey => daqingest::query::list_pkey().await?,
|
||||
@@ -22,8 +18,11 @@ pub fn main() -> Result<(), Error> {
|
||||
f.run().await?
|
||||
}
|
||||
SubCmd::ChannelAccess(k) => match k {
|
||||
ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?,
|
||||
ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?,
|
||||
ChannelAccess::CaSearch(k) => {
|
||||
let opts = daqingest::CaConfig { config: k.config }.into();
|
||||
netfetch::ca::search::ca_search(opts).await?
|
||||
}
|
||||
ChannelAccess::CaIngest(k) => netfetch::ca::ca_connect(k.into()).await?,
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -76,8 +76,13 @@ pub struct BsreadDump {
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub enum ChannelAccess {
|
||||
CaConfig(CaConfig),
|
||||
CaSearch(CaConfig),
|
||||
CaIngest(CaConfig),
|
||||
CaSearch(CaSearch),
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct CaSearch {
|
||||
pub config: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod conn;
|
||||
pub mod proto;
|
||||
pub mod search;
|
||||
pub mod store;
|
||||
|
||||
use self::conn::FindIocStream;
|
||||
@@ -9,7 +10,7 @@ use conn::CaConn;
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use log::*;
|
||||
use netpod::Database;
|
||||
use netpod::{Database, ScyllaConfig};
|
||||
use scylla::batch::Consistency;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
|
||||
@@ -49,7 +50,8 @@ struct ChannelConfig {
|
||||
timeout: Option<u64>,
|
||||
#[serde(default)]
|
||||
abort_after_search: u32,
|
||||
pg_pass: String,
|
||||
pgconf: Database,
|
||||
scyconf: ScyllaConfig,
|
||||
array_truncate: Option<usize>,
|
||||
insert_worker_count: Option<usize>,
|
||||
insert_scylla_sessions: Option<usize>,
|
||||
@@ -91,7 +93,8 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
|
||||
addr_conn: conf.addr_conn,
|
||||
timeout: conf.timeout.unwrap_or(2000),
|
||||
abort_after_search: conf.abort_after_search,
|
||||
pg_pass: conf.pg_pass,
|
||||
pgconf: conf.pgconf,
|
||||
scyconf: conf.scyconf,
|
||||
array_truncate: conf.array_truncate.unwrap_or(512),
|
||||
insert_worker_count: conf.insert_worker_count.unwrap_or(8),
|
||||
insert_scylla_sessions: conf.insert_scylla_sessions.unwrap_or(1),
|
||||
@@ -108,7 +111,8 @@ pub struct CaConnectOpts {
|
||||
pub addr_conn: Ipv4Addr,
|
||||
pub timeout: u64,
|
||||
pub abort_after_search: u32,
|
||||
pub pg_pass: String,
|
||||
pub pgconf: Database,
|
||||
pub scyconf: ScyllaConfig,
|
||||
pub array_truncate: usize,
|
||||
pub insert_worker_count: usize,
|
||||
pub insert_scylla_sessions: usize,
|
||||
@@ -117,136 +121,8 @@ pub struct CaConnectOpts {
|
||||
pub api_bind: String,
|
||||
}
|
||||
|
||||
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
|
||||
const PORT_DEFAULT: u16 = 5064;
|
||||
let ac = match addr_str.parse::<SocketAddrV4>() {
|
||||
Ok(k) => k,
|
||||
Err(_) => match addr_str.parse::<Ipv4Addr>() {
|
||||
Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT),
|
||||
Err(e) => match tokio::net::lookup_host(&addr_str).await {
|
||||
Ok(k) => {
|
||||
let vs: Vec<_> = k
|
||||
.filter_map(|x| match x {
|
||||
SocketAddr::V4(k) => Some(k),
|
||||
SocketAddr::V6(_) => None,
|
||||
})
|
||||
.collect();
|
||||
if let Some(k) = vs.first() {
|
||||
*k
|
||||
} else {
|
||||
error!("Can not understand name for {:?} {:?}", addr_str, vs);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
return Err(e.into());
|
||||
}
|
||||
},
|
||||
},
|
||||
};
|
||||
Ok(ac)
|
||||
}
|
||||
|
||||
pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let facility = "scylla";
|
||||
let opts = parse_config(opts.config).await?;
|
||||
let d = Database {
|
||||
name: "daqbuffer".into(),
|
||||
host: "sf-nube-11".into(),
|
||||
port: 5432,
|
||||
user: "daqbuffer".into(),
|
||||
pass: opts.pg_pass.clone(),
|
||||
};
|
||||
let (pg_client, pg_conn) = tokio_postgres::connect(
|
||||
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name),
|
||||
tokio_postgres::tls::NoTls,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// TODO join pg_conn in the end:
|
||||
tokio::spawn(pg_conn);
|
||||
let pg_client = Arc::new(pg_client);
|
||||
let qu_insert = {
|
||||
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
|
||||
pg_client
|
||||
.prepare_typed(
|
||||
"insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4)",
|
||||
&[TEXT, TEXT, TEXT, TEXT],
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
};
|
||||
let qu_select = pg_client
|
||||
.prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3")
|
||||
.await
|
||||
.unwrap();
|
||||
let qu_update = pg_client
|
||||
.prepare("update ioc_by_channel set addr = $4 where facility = $1 and channel = $2 and searchaddr = $3")
|
||||
.await
|
||||
.unwrap();
|
||||
let mut addrs = vec![];
|
||||
for s in &opts.search {
|
||||
let x = resolve_address(s).await?;
|
||||
addrs.push(x);
|
||||
}
|
||||
let mut finder = FindIocStream::new(addrs);
|
||||
for ch in &opts.channels {
|
||||
finder.push(ch.into());
|
||||
}
|
||||
let mut ts_last = Instant::now();
|
||||
loop {
|
||||
let ts_now = Instant::now();
|
||||
if ts_now.duration_since(ts_last) >= Duration::from_millis(1000) {
|
||||
ts_last = ts_now;
|
||||
info!("{}", finder.quick_state());
|
||||
}
|
||||
let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await;
|
||||
let item = match k {
|
||||
Ok(Some(k)) => k,
|
||||
Ok(None) => {
|
||||
info!("Search stream exhausted");
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let item = match item {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
error!("ca_search {e:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
for item in item {
|
||||
let searchaddr = item.src.to_string();
|
||||
let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new());
|
||||
let rows = pg_client
|
||||
.query(&qu_select, &[&facility, &item.channel, &searchaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
if rows.is_empty() {
|
||||
pg_client
|
||||
.execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
let addr2: &str = rows[0].get(0);
|
||||
if addr2 != addr {
|
||||
pg_client
|
||||
.execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn spawn_scylla_insert_workers(
|
||||
scyconf: ScyllaConfig,
|
||||
insert_scylla_sessions: usize,
|
||||
insert_worker_count: usize,
|
||||
insert_item_queue: &CommonInsertItemQueue,
|
||||
@@ -257,9 +133,9 @@ async fn spawn_scylla_insert_workers(
|
||||
let mut data_stores = vec![];
|
||||
for _ in 0..insert_scylla_sessions {
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_node("sf-nube-14:19042")
|
||||
.known_nodes(&scyconf.hosts)
|
||||
.default_consistency(Consistency::One)
|
||||
.use_keyspace("ks1", true)
|
||||
.use_keyspace(&scyconf.keyspace, true)
|
||||
.build()
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
@@ -358,20 +234,21 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let insert_frac = Arc::new(AtomicU64::new(1000));
|
||||
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
|
||||
let opts = parse_config(opts.config).await?;
|
||||
let scyconf = opts.scyconf.clone();
|
||||
tokio::spawn(start_metrics_service(
|
||||
opts.api_bind.clone(),
|
||||
insert_frac.clone(),
|
||||
insert_ivl_min.clone(),
|
||||
));
|
||||
let d = Database {
|
||||
name: "daqbuffer".into(),
|
||||
host: "sf-nube-11".into(),
|
||||
port: 5432,
|
||||
user: "daqbuffer".into(),
|
||||
pass: opts.pg_pass.clone(),
|
||||
name: opts.pgconf.name.clone(),
|
||||
host: opts.pgconf.host.clone(),
|
||||
port: opts.pgconf.port.clone(),
|
||||
user: opts.pgconf.user.clone(),
|
||||
pass: opts.pgconf.pass.clone(),
|
||||
};
|
||||
let (pg_client, pg_conn) = tokio_postgres::connect(
|
||||
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name),
|
||||
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
|
||||
tokio_postgres::tls::NoTls,
|
||||
)
|
||||
.await
|
||||
@@ -381,21 +258,18 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let pg_client = Arc::new(pg_client);
|
||||
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_node("sf-nube-14:19042")
|
||||
.known_nodes(&scyconf.hosts)
|
||||
.default_consistency(Consistency::One)
|
||||
.use_keyspace("ks1", true)
|
||||
.use_keyspace(scyconf.keyspace, true)
|
||||
.build()
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let scy = Arc::new(scy);
|
||||
|
||||
// TODO use new struct:
|
||||
let local_stats = Arc::new(CaConnStats::new());
|
||||
|
||||
// TODO factor the find loop into a separate Stream.
|
||||
info!("FIND IOCS");
|
||||
let qu_find_addr = pg_client
|
||||
.prepare("select t2.channel, t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)")
|
||||
.prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let mut channels_by_host = BTreeMap::new();
|
||||
@@ -421,8 +295,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
error!("can not find any addresses of channels {:?}", chstmp);
|
||||
} else {
|
||||
for row in rows {
|
||||
let ch: &str = row.get(0);
|
||||
let addr: &str = row.get(1);
|
||||
let ch: &str = row.get(1);
|
||||
let addr: &str = row.get(2);
|
||||
if addr == "" {
|
||||
// TODO the address was searched before but could not be found.
|
||||
} else {
|
||||
@@ -459,6 +333,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let store_stats = Arc::new(CaConnStats::new());
|
||||
|
||||
spawn_scylla_insert_workers(
|
||||
opts.scyconf.clone(),
|
||||
opts.insert_scylla_sessions,
|
||||
opts.insert_worker_count,
|
||||
&insert_item_queue,
|
||||
|
||||
138
netfetch/src/ca/search.rs
Normal file
138
netfetch/src/ca/search.rs
Normal file
@@ -0,0 +1,138 @@
|
||||
use crate::ca::{parse_config, ListenFromFileOpts};
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use log::*;
|
||||
use netpod::Database;
|
||||
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::conn::FindIocStream;
|
||||
|
||||
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
|
||||
const PORT_DEFAULT: u16 = 5064;
|
||||
let ac = match addr_str.parse::<SocketAddrV4>() {
|
||||
Ok(k) => k,
|
||||
Err(_) => match addr_str.parse::<Ipv4Addr>() {
|
||||
Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT),
|
||||
Err(e) => match tokio::net::lookup_host(&addr_str).await {
|
||||
Ok(k) => {
|
||||
let vs: Vec<_> = k
|
||||
.filter_map(|x| match x {
|
||||
SocketAddr::V4(k) => Some(k),
|
||||
SocketAddr::V6(_) => None,
|
||||
})
|
||||
.collect();
|
||||
if let Some(k) = vs.first() {
|
||||
*k
|
||||
} else {
|
||||
error!("Can not understand name for {:?} {:?}", addr_str, vs);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
return Err(e.into());
|
||||
}
|
||||
},
|
||||
},
|
||||
};
|
||||
Ok(ac)
|
||||
}
|
||||
|
||||
pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let facility = "scylla";
|
||||
let opts = parse_config(opts.config).await?;
|
||||
let d = Database {
|
||||
name: opts.pgconf.name.clone(),
|
||||
host: opts.pgconf.host.clone(),
|
||||
port: opts.pgconf.port.clone(),
|
||||
user: opts.pgconf.user.clone(),
|
||||
pass: opts.pgconf.pass.clone(),
|
||||
};
|
||||
let (pg_client, pg_conn) = tokio_postgres::connect(
|
||||
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
|
||||
tokio_postgres::tls::NoTls,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// TODO join pg_conn in the end:
|
||||
tokio::spawn(pg_conn);
|
||||
let pg_client = Arc::new(pg_client);
|
||||
let qu_select = pg_client
|
||||
.prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3")
|
||||
.await
|
||||
.unwrap();
|
||||
let qu_insert = {
|
||||
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
|
||||
pg_client
|
||||
.prepare_typed(
|
||||
"insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4) on conflict do nothing",
|
||||
&[TEXT, TEXT, TEXT, TEXT],
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
};
|
||||
let qu_update = pg_client
|
||||
.prepare("update ioc_by_channel set addr = $4, tsmod = now(), modcount = modcount + 1 where facility = $1 and channel = $2 and searchaddr = $3")
|
||||
.await
|
||||
.unwrap();
|
||||
let mut addrs = vec![];
|
||||
for s in &opts.search {
|
||||
let x = resolve_address(s).await?;
|
||||
addrs.push(x);
|
||||
}
|
||||
let mut finder = FindIocStream::new(addrs);
|
||||
for ch in &opts.channels {
|
||||
finder.push(ch.into());
|
||||
}
|
||||
let mut ts_last = Instant::now();
|
||||
loop {
|
||||
let ts_now = Instant::now();
|
||||
if ts_now.duration_since(ts_last) >= Duration::from_millis(1000) {
|
||||
ts_last = ts_now;
|
||||
info!("{}", finder.quick_state());
|
||||
}
|
||||
let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await;
|
||||
let item = match k {
|
||||
Ok(Some(k)) => k,
|
||||
Ok(None) => {
|
||||
info!("Search stream exhausted");
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let item = match item {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
error!("ca_search {e:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
for item in item {
|
||||
let searchaddr = item.src.to_string();
|
||||
let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new());
|
||||
let rows = pg_client
|
||||
.query(&qu_select, &[&facility, &item.channel, &searchaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
if rows.is_empty() {
|
||||
pg_client
|
||||
.execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
let addr2: &str = rows[0].get(0);
|
||||
if addr2 != addr {
|
||||
pg_client
|
||||
.execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user