This commit is contained in:
Dominik Werder
2023-01-20 06:24:49 +01:00
parent 54992684f5
commit 3a23fa4275
7 changed files with 136 additions and 124 deletions

View File

@@ -28,10 +28,6 @@ pub fn main() -> Result<(), Error> {
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
let (conf, channels) = parse_config(k.config.into()).await?;
netfetch::ca::ca_connect(conf, &channels).await?
}
ChannelAccess::CaIngestNew(k) => {
let (conf, channels) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels).await?
}

View File

@@ -4,7 +4,6 @@ use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
use netfetch::ca::conn::CaConnEvent;
use netfetch::ca::conn::ConnCommand;
use netfetch::ca::connset::CaConnSet;
use netfetch::ca::findioc::FindIocRes;
@@ -13,6 +12,8 @@ use netfetch::ca::store::DataStore;
use netfetch::ca::IngestCommons;
use netfetch::ca::SlowWarnable;
use netfetch::conf::CaIngestOpts;
use netfetch::daemon_common::Channel;
use netfetch::daemon_common::DaemonEvent;
use netfetch::errconv::ErrConv;
use netfetch::insertworker::Ttls;
use netfetch::metrics::ExtraInsertsConf;
@@ -82,21 +83,6 @@ macro_rules! trace_batch {
});
}
/// Identifier of a channel: a newtype wrapper around the channel's name.
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
pub struct Channel {
    // The channel name string.
    id: String,
}

impl Channel {
    /// Wrap the given channel name.
    pub fn new(id: String) -> Self {
        Channel { id }
    }

    /// Borrow the channel name.
    pub fn id(&self) -> &str {
        self.id.as_str()
    }
}
#[allow(non_snake_case)]
mod serde_Instant {
use serde::Serializer;
@@ -177,37 +163,6 @@ pub struct CaConnState {
value: CaConnStateValue,
}
/// Events consumed by the daemon's main loop.
#[derive(Debug)]
pub enum DaemonEvent {
    TimerTick,
    ChannelAdd(Channel),
    ChannelRemove(Channel),
    SearchDone(Result<VecDeque<FindIocRes>, Error>),
    CaConnEvent(SocketAddrV4, CaConnEvent),
}

impl DaemonEvent {
    /// Short human-readable tag for the event, suitable for logging.
    /// Payloads other than the channel are intentionally not expanded.
    pub fn summary(&self) -> String {
        use DaemonEvent::*;
        match self {
            // Plain string literals: `.to_string()` instead of a no-argument
            // `format!` (clippy::useless_format).
            TimerTick => "TimerTick".to_string(),
            ChannelAdd(x) => format!("ChannelAdd {x:?}"),
            ChannelRemove(x) => format!("ChannelRemove {x:?}"),
            SearchDone(_x) => "SearchDone".to_string(),
            CaConnEvent(_a, b) => {
                use netfetch::ca::conn::CaConnEventValue::*;
                match &b.value {
                    None => "CaConnEvent/None".to_string(),
                    EchoTimeout => "CaConnEvent/EchoTimeout".to_string(),
                    HealthCheckDone => "CaConnEvent/HealthCheckDone".to_string(),
                    ConnCommandResult(_) => "CaConnEvent/ConnCommandResult".to_string(),
                    EndOfStream => "CaConnEvent/EndOfStream".to_string(),
                }
            }
        }
    }
}
#[derive(Debug, Clone)]
pub struct DaemonOpts {
backend: String,
@@ -1186,13 +1141,10 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?;
netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?;
let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy());
netfetch::metrics::start_metrics_service(opts.api_bind(), dcom);
// TODO use a new stats type:
let store_stats = Arc::new(CaConnStats::new());
let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
//let store_stats = Arc::new(CaConnStats::new());
//let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
//let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
let opts2 = DaemonOpts {
backend: opts.backend().into(),
@@ -1209,6 +1161,10 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
};
let mut daemon = Daemon::new(opts2).await?;
let tx = daemon.tx.clone();
let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy());
netfetch::metrics::start_metrics_service(opts.api_bind(), dcom);
let daemon_jh = taskrun::spawn(async move {
// TODO handle Err
daemon.daemon().await.unwrap();

View File

@@ -79,7 +79,6 @@ pub struct BsreadDump {
/// Channel-access related subcommands of the CLI.
#[derive(Debug, Parser)]
pub enum ChannelAccess {
    // Ingest using the existing connect code path.
    CaIngest(CaConfig),
    // Ingest using the daemon-based code path — presumably the newer
    // implementation; TODO confirm against the dispatch in main.
    CaIngestNew(CaConfig),
    // Search for the hosts serving the given channels.
    CaSearch(CaSearch),
}

View File

@@ -59,8 +59,68 @@ async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender<P
Ok((pgc_tx, pgc_rx))
}
async fn fetch_data(pgres: PgRes) -> Result<ChannelInfoResult, Error> {
err::todoval()
/// Resolve series information for a whole batch of channel-info queries with a
/// single execution of the prepared statement `pgres.st`.
///
/// The batch is unzipped into one array-valued query parameter per field, and
/// the result rows are matched back to the per-entry reply senders via a
/// batch-local request id (`rid`). Returns the matched results together with
/// `pgres` so the caller can hand the connection back. On query error the
/// function sleeps 2 s (crude backoff) and returns the error; in that case the
/// batch's reply senders are dropped.
async fn fetch_data(batch: Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> {
    // Column-wise decomposition of the batch: postgres receives one array per column.
    let mut backend = Vec::new();
    let mut channel = Vec::new();
    let mut scalar_type = Vec::new();
    let mut shape_dims: Vec<String> = Vec::new();
    let mut rid = Vec::new();
    let mut tx = Vec::new();
    for (i, e) in batch.into_iter().enumerate() {
        backend.push(e.backend);
        channel.push(e.channel);
        scalar_type.push(e.scalar_type);
        // Render the shape dims as a postgres array literal, e.g. "{2,3}".
        let mut dims = String::with_capacity(16);
        dims.push('{');
        for (i, v) in e.shape_dims.into_iter().enumerate() {
            if i > 0 {
                dims.push(',');
            }
            use std::fmt::Write;
            write!(dims, "{}", v).unwrap();
        }
        dims.push('}');
        shape_dims.push(dims);
        // Request id = position within this batch; used below to correlate
        // result rows with their reply senders.
        rid.push(i as i32);
        tx.push((i as u32, e.tx));
    }
    match pgres
        .pgc
        .query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims, &rid])
        .await
        .map_err(|e| {
            error!("{e}");
            Error::from(e.to_string())
        }) {
        Ok(rows) => {
            // Merge rows (col 0 = series id, col 1 = rid) against the batch
            // order. Assumes rows come back ordered by rid with at most one
            // row per rid — TODO confirm the statement guarantees this.
            let mut series_ids = Vec::new();
            let mut txs = Vec::new();
            let mut it1 = rows.into_iter();
            let mut e1 = it1.next();
            for (qrid, tx) in tx {
                if let Some(row) = &e1 {
                    let rid: i32 = row.get(1);
                    if rid as u32 == qrid {
                        let series: i64 = row.get(0);
                        let series = SeriesId::new(series as _);
                        series_ids.push(Existence::Existing(series));
                        txs.push(tx);
                    }
                    // NOTE(review): the row is consumed even when rid != qrid;
                    // if a row's rid can be ahead of the current qrid this
                    // would skip a later match — verify against the query's
                    // ordering before relying on it.
                    e1 = it1.next();
                }
            }
            let result = ChannelInfoResult {
                series: series_ids,
                tx: txs,
            };
            Ok((result, pgres))
        }
        Err(e) => {
            error!("error in pg query {e}");
            // Crude backoff so a persistently failing database does not make
            // the caller spin at full speed.
            tokio::time::sleep(Duration::from_millis(2000)).await;
            Err(e)
        }
    }
}
async fn run_queries(
@@ -75,70 +135,12 @@ async fn run_queries(
let pgc_tx = pgc_tx.clone();
async move {
if let Ok(pgres) = pgc_rx.recv().await {
let mut backend = Vec::new();
let mut channel = Vec::new();
let mut scalar_type = Vec::new();
let mut shape_dims: Vec<String> = Vec::new();
let mut rid = Vec::new();
let mut tx = Vec::new();
for (i, e) in batch.into_iter().enumerate() {
backend.push(e.backend);
channel.push(e.channel);
scalar_type.push(e.scalar_type);
let mut dims = String::with_capacity(16);
dims.push('{');
for (i, v) in e.shape_dims.into_iter().enumerate() {
if i > 0 {
dims.push(',');
}
use std::fmt::Write;
write!(dims, "{}", v).unwrap();
}
dims.push('}');
shape_dims.push(dims);
rid.push(i as i32);
tx.push((i as u32, e.tx));
}
match pgres
.pgc
.query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims, &rid])
.await
.map_err(|e| {
error!("{e}");
Error::from(e.to_string())
}) {
Ok(rows) => {
if pgc_tx.send(pgres).await.is_err() {
Err(Error::with_msg_no_trace("can not hand pgres back"))
} else {
let mut series_ids = Vec::new();
let mut txs = Vec::new();
let mut it1 = rows.into_iter();
let mut e1 = it1.next();
for (qrid, tx) in tx {
if let Some(row) = &e1 {
let rid: i32 = row.get(1);
if rid as u32 == qrid {
let series: i64 = row.get(0);
let series = SeriesId::new(series as _);
series_ids.push(Existence::Existing(series));
txs.push(tx);
}
e1 = it1.next();
}
}
let result = ChannelInfoResult {
series: series_ids,
tx: txs,
};
Ok(result)
}
}
Err(e) => {
error!("error in pg query {e}");
tokio::time::sleep(Duration::from_millis(2000)).await;
Err(e)
}
let (res, pgres) = fetch_data(batch, pgres).await?;
if let Err(_) = pgc_tx.send(pgres).await {
error!("can not hand back pgres");
Err(Error::with_msg_no_trace("can not hand back pgres"))
} else {
Ok(res)
}
} else {
error!("can not get pgc");

View File

@@ -0,0 +1,52 @@
use crate::ca::conn::CaConnEvent;
use crate::ca::findioc::FindIocRes;
use err::Error;
use serde::Serialize;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
/// Identifier of a channel: a newtype wrapper around the channel's name.
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
pub struct Channel {
    // The channel name string.
    id: String,
}

impl Channel {
    /// Wrap the given channel name.
    pub fn new(id: String) -> Self {
        Channel { id }
    }

    /// Borrow the channel name.
    pub fn id(&self) -> &str {
        self.id.as_str()
    }
}
/// Events consumed by the daemon's main loop.
#[derive(Debug)]
pub enum DaemonEvent {
    TimerTick,
    ChannelAdd(Channel),
    ChannelRemove(Channel),
    SearchDone(Result<VecDeque<FindIocRes>, Error>),
    CaConnEvent(SocketAddrV4, CaConnEvent),
}

impl DaemonEvent {
    /// Short human-readable tag for the event, suitable for logging.
    /// Payloads other than the channel are intentionally not expanded.
    pub fn summary(&self) -> String {
        use DaemonEvent::*;
        match self {
            // Plain string literals: `.to_string()` instead of a no-argument
            // `format!` (clippy::useless_format).
            TimerTick => "TimerTick".to_string(),
            ChannelAdd(x) => format!("ChannelAdd {x:?}"),
            ChannelRemove(x) => format!("ChannelRemove {x:?}"),
            SearchDone(_x) => "SearchDone".to_string(),
            CaConnEvent(_a, b) => {
                use crate::ca::conn::CaConnEventValue::*;
                match &b.value {
                    None => "CaConnEvent/None".to_string(),
                    EchoTimeout => "CaConnEvent/EchoTimeout".to_string(),
                    HealthCheckDone => "CaConnEvent/HealthCheckDone".to_string(),
                    ConnCommandResult(_) => "CaConnEvent/ConnCommandResult".to_string(),
                    EndOfStream => "CaConnEvent/EndOfStream".to_string(),
                }
            }
        }
    }
}

View File

@@ -1,5 +1,7 @@
use crate::ca::IngestCommons;
use crate::ca::METRICS;
use crate::daemon_common::DaemonEvent;
use async_channel::Sender;
use axum::extract::Query;
use err::Error;
use http::Request;
@@ -113,6 +115,10 @@ struct DummyQuery {
/// Handle passed to the metrics service for talking to the daemon.
/// NOTE(review): currently stateless — the event sender given to `new` is not
/// stored, so no commands can actually reach the daemon yet; presumably a stub.
pub struct DaemonComm {}
impl DaemonComm {
/// Create a communication handle for the daemon.
/// NOTE(review): the sender is currently discarded, so this behaves like
/// `dummy()` — presumably a stub to be wired up later. The parameter is
/// renamed `_tx` to silence the unused-variable warning without changing
/// the call interface.
pub fn new(_tx: Sender<DaemonEvent>) -> Self {
    Self {}
}
pub fn dummy() -> Self {
Self {}
}

View File

@@ -4,6 +4,7 @@ pub mod bsread;
pub mod ca;
pub mod channelwriter;
pub mod conf;
pub mod daemon_common;
pub mod dbpg;
pub mod errconv;
pub mod insertworker;