Refactor config from st, mt, lt
This commit is contained in:
@@ -44,7 +44,7 @@ async fn run_batcher<T>(rx: Receiver<T>, batch_tx: Sender<Vec<T>>, batch_limit:
|
||||
do_emit = true;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
Err(_e) => {
|
||||
break;
|
||||
}
|
||||
},
|
||||
|
||||
@@ -19,6 +19,8 @@ use netfetch::throttletrace::ThrottleTrace;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::Database;
|
||||
use scywr::config::ScyllaIngestConfig;
|
||||
use scywr::insertqueues::InsertQueuesRx;
|
||||
use scywr::insertqueues::InsertQueuesTx;
|
||||
use scywr::insertworker::InsertWorkerOpts;
|
||||
use scywr::iteminsertqueue as scywriiq;
|
||||
use scywriiq::QueryItem;
|
||||
@@ -46,7 +48,9 @@ const RUN_WITHOUT_SCYLLA: bool = true;
|
||||
|
||||
pub struct DaemonOpts {
|
||||
pgconf: Database,
|
||||
scyconf: ScyllaIngestConfig,
|
||||
scyconf_st: ScyllaIngestConfig,
|
||||
scyconf_mt: ScyllaIngestConfig,
|
||||
scyconf_lt: ScyllaIngestConfig,
|
||||
#[allow(unused)]
|
||||
test_bsread_addr: Option<String>,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
@@ -97,51 +101,14 @@ impl Daemon {
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
|
||||
let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
|
||||
let query_item_tx_weak = query_item_tx.downgrade();
|
||||
|
||||
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
// Insert queue hook
|
||||
// let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
|
||||
|
||||
let wrest_stats = Arc::new(SeriesWriterEstablishStats::new());
|
||||
let (writer_establis_tx,) =
|
||||
serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone(), wrest_stats.clone())
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
|
||||
let local_epics_hostname = ingest_linux::net::local_hostname();
|
||||
let conn_set_ctrl = CaConnSet::start(
|
||||
ingest_opts.backend().into(),
|
||||
local_epics_hostname,
|
||||
query_item_tx,
|
||||
channel_info_query_tx.clone(),
|
||||
ingest_opts.clone(),
|
||||
writer_establis_tx,
|
||||
);
|
||||
|
||||
// TODO remove
|
||||
tokio::spawn({
|
||||
let rx = conn_set_ctrl.receiver().clone();
|
||||
let tx = daemon_ev_tx.clone();
|
||||
async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(item) => {
|
||||
let item = DaemonEvent::CaConnSetItem(item);
|
||||
if let Err(_) = tx.send(item).await {
|
||||
debug!("CaConnSet to Daemon adapter: tx closed, break");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("CaConnSet to Daemon adapter: rx done, break");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
#[cfg(DISABLED)]
|
||||
let query_item_rx = {
|
||||
@@ -172,33 +139,91 @@ impl Daemon {
|
||||
};
|
||||
let insert_worker_opts = Arc::new(insert_worker_opts);
|
||||
|
||||
debug!("TODO RetentionTime");
|
||||
let (iqtx, iqrx) = {
|
||||
let (st_rf3_tx, st_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
|
||||
let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
|
||||
let (mt_rf3_tx, mt_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
|
||||
let iqtx = InsertQueuesTx {
|
||||
st_rf3_tx,
|
||||
st_rf1_tx,
|
||||
mt_rf3_tx,
|
||||
};
|
||||
let iqrx = InsertQueuesRx {
|
||||
st_rf3_rx,
|
||||
st_rf1_rx,
|
||||
mt_rf3_rx,
|
||||
};
|
||||
(iqtx, iqrx)
|
||||
};
|
||||
|
||||
let rett = RetentionTime::Short;
|
||||
let query_item_tx_weak = iqtx.st_rf3_tx.clone().downgrade();
|
||||
|
||||
#[cfg(DISABLED)]
|
||||
let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers(
|
||||
rett,
|
||||
opts.scyconf.clone(),
|
||||
ingest_opts.insert_scylla_sessions(),
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
query_item_rx,
|
||||
insert_worker_opts,
|
||||
insert_worker_stats.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
)
|
||||
.await?;
|
||||
let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
query_item_rx,
|
||||
insert_worker_opts,
|
||||
insert_worker_stats.clone(),
|
||||
)
|
||||
.await?;
|
||||
let conn_set_ctrl = CaConnSet::start(
|
||||
ingest_opts.backend().into(),
|
||||
local_epics_hostname,
|
||||
iqtx,
|
||||
channel_info_query_tx.clone(),
|
||||
ingest_opts.clone(),
|
||||
writer_establis_tx,
|
||||
);
|
||||
|
||||
// TODO remove
|
||||
tokio::spawn({
|
||||
let rx = conn_set_ctrl.receiver().clone();
|
||||
let tx = daemon_ev_tx.clone();
|
||||
async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(item) => {
|
||||
let item = DaemonEvent::CaConnSetItem(item);
|
||||
if let Err(_) = tx.send(item).await {
|
||||
debug!("CaConnSet to Daemon adapter: tx closed, break");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("CaConnSet to Daemon adapter: rx done, break");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// let query_item_tx_weak = query_item_tx.downgrade();
|
||||
// Insert queue hook
|
||||
// let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
|
||||
|
||||
let mut insert_worker_jhs = Vec::new();
|
||||
|
||||
if RUN_WITHOUT_SCYLLA {
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
iqrx.st_rf3_rx,
|
||||
insert_worker_opts,
|
||||
insert_worker_stats.clone(),
|
||||
)
|
||||
.await?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
} else {
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers(
|
||||
// TODO does the worker actually need RETT? Yes, to use the correct table names.
|
||||
RetentionTime::Short,
|
||||
opts.scyconf_st.clone(),
|
||||
ingest_opts.insert_scylla_sessions(),
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
iqrx.st_rf3_rx.clone(),
|
||||
insert_worker_opts,
|
||||
insert_worker_stats.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
)
|
||||
.await?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
};
|
||||
let stats = Arc::new(DaemonStats::new());
|
||||
stats.insert_worker_spawned().add(insert_workers_jh.len() as _);
|
||||
stats.insert_worker_spawned().add(insert_worker_jhs.len() as _);
|
||||
|
||||
#[cfg(feature = "bsread")]
|
||||
if let Some(bsaddr) = &opts.test_bsread_addr {
|
||||
@@ -248,7 +273,7 @@ impl Daemon {
|
||||
count_unassigned: 0,
|
||||
count_assigned: 0,
|
||||
last_status_print: SystemTime::now(),
|
||||
insert_workers_jh,
|
||||
insert_workers_jh: insert_worker_jhs,
|
||||
stats,
|
||||
insert_worker_stats,
|
||||
series_by_channel_stats,
|
||||
@@ -289,11 +314,14 @@ impl Daemon {
|
||||
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
|
||||
if self.shutting_down {
|
||||
let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire);
|
||||
let nitems = self
|
||||
.query_item_tx_weak
|
||||
.upgrade()
|
||||
.map(|x| (x.sender_count(), x.receiver_count(), x.len()));
|
||||
info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems);
|
||||
#[cfg(DISABLED)]
|
||||
{
|
||||
let nitems = self
|
||||
.query_item_tx_weak
|
||||
.upgrade()
|
||||
.map(|x| (x.sender_count(), x.receiver_count(), x.len()));
|
||||
info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems);
|
||||
}
|
||||
if nworkers == 0 {
|
||||
info!("goodbye");
|
||||
std::process::exit(0);
|
||||
@@ -552,15 +580,15 @@ impl Daemon {
|
||||
}
|
||||
|
||||
pub async fn daemon(mut self) -> Result<(), Error> {
|
||||
{
|
||||
let worker_jh = {
|
||||
let backend = String::new();
|
||||
let (item_tx, item_rx) = async_channel::bounded(256);
|
||||
let (_item_tx, item_rx) = async_channel::bounded(256);
|
||||
let info_worker_tx = self.channel_info_query_tx.clone();
|
||||
let iiq_tx = self.query_item_tx_weak.upgrade().unwrap();
|
||||
let worker_fut =
|
||||
netfetch::metrics::postingest::process_api_query_items(backend, item_rx, info_worker_tx, iiq_tx);
|
||||
let worker_jh = taskrun::spawn(worker_fut);
|
||||
}
|
||||
taskrun::spawn(worker_fut)
|
||||
};
|
||||
Self::spawn_ticker(self.tx.clone(), self.stats.clone());
|
||||
loop {
|
||||
if self.shutting_down {
|
||||
@@ -598,12 +626,15 @@ impl Daemon {
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Wait for metrics handler");
|
||||
info!("wait for metrics handler");
|
||||
self.metrics_shutdown_tx.send(1).await?;
|
||||
if let Some(jh) = self.metrics_jh.take() {
|
||||
jh.await??;
|
||||
}
|
||||
info!("Joined metrics handler");
|
||||
info!("joined metrics handler");
|
||||
info!("\n\n\n-----------------------\n\n\nwait for postingest task");
|
||||
worker_jh.await?.map_err(|e| Error::from_string(e))?;
|
||||
info!("joined postingest task");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -630,25 +661,24 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
|
||||
info!("start up {opts:?}");
|
||||
ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?;
|
||||
ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?;
|
||||
|
||||
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?;
|
||||
drop(pg);
|
||||
jh.await?.map_err(Error::from_string)?;
|
||||
|
||||
if RUN_WITHOUT_SCYLLA {
|
||||
} else {
|
||||
scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short)
|
||||
{
|
||||
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?;
|
||||
jh.await?.map_err(Error::from_string)?;
|
||||
}
|
||||
if RUN_WITHOUT_SCYLLA {
|
||||
} else {
|
||||
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
|
||||
if let Some(scyconf) = opts.scylla_config_lt() {
|
||||
scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
}
|
||||
}
|
||||
info!("database check done");
|
||||
|
||||
@@ -668,7 +698,9 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
|
||||
|
||||
let opts2 = DaemonOpts {
|
||||
pgconf: opts.postgresql_config().clone(),
|
||||
scyconf: opts.scylla_config().clone(),
|
||||
scyconf_st: opts.scylla_config_st().clone(),
|
||||
scyconf_mt: opts.scylla_config_mt().clone(),
|
||||
scyconf_lt: opts.scylla_config_lt().clone(),
|
||||
test_bsread_addr: opts.test_bsread_addr.clone(),
|
||||
insert_frac: insert_frac.clone(),
|
||||
store_workers_rate,
|
||||
|
||||
@@ -1,21 +1,42 @@
|
||||
use async_channel::Sender;
|
||||
use bytes::Buf;
|
||||
use dbpg::seriesbychannel::ChannelInfoQuery;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use log::*;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use scywr::iteminsertqueue::DataValue;
|
||||
use scywr::iteminsertqueue::ScalarValue;
|
||||
use serieswriter::writer::SeriesWriter;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::Cursor;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::time::SystemTime;
|
||||
use taskrun::tokio::net::UdpSocket;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Io(#[from] std::io::Error),
|
||||
SeriesWriter(#[from] serieswriter::writer::Error),
|
||||
}
|
||||
|
||||
pub async fn listen_beacons(mut cancel: taskrun::tokio::sync::mpsc::Receiver<u32>) -> Result<(), Error> {
|
||||
pub async fn listen_beacons(
|
||||
mut cancel: taskrun::tokio::sync::mpsc::Receiver<u32>,
|
||||
worker_tx: Sender<ChannelInfoQuery>,
|
||||
backend: String,
|
||||
) -> Result<(), Error> {
|
||||
let stnow = SystemTime::now();
|
||||
let channel = "epics-ca-beacons".to_string();
|
||||
let scalar_type = ScalarType::U64;
|
||||
let shape = Shape::Scalar;
|
||||
let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?;
|
||||
let sock = UdpSocket::bind("0.0.0.0:5065").await?;
|
||||
sock.set_broadcast(true).unwrap();
|
||||
let mut buf = Vec::new();
|
||||
buf.resize(1024 * 4, 0);
|
||||
let mut item_qu = VecDeque::new();
|
||||
loop {
|
||||
let bb = &mut buf;
|
||||
let (n, remote) = taskrun::tokio::select! {
|
||||
@@ -34,12 +55,23 @@ pub async fn listen_beacons(mut cancel: taskrun::tokio::sync::mpsc::Receiver<u32
|
||||
let ver = cur.get_u16();
|
||||
let port = cur.get_u16();
|
||||
let _seqid = cur.get_u32();
|
||||
let addr = cur.get_u32();
|
||||
let addr = Ipv4Addr::from(addr);
|
||||
let addr_u32 = cur.get_u32();
|
||||
let addr = Ipv4Addr::from(addr_u32);
|
||||
if cmd == 0x0d {
|
||||
debug!("beacon {remote} {ver} {addr} {port}")
|
||||
debug!("beacon {remote} {ver} {addr} {port}");
|
||||
let stnow = SystemTime::now();
|
||||
let x = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap();
|
||||
let ts = TsNano::from_ms(1000 * x.as_secs() + x.subsec_millis() as u64);
|
||||
let ts_local = ts;
|
||||
let blob = addr_u32 as i64;
|
||||
let val = DataValue::Scalar(ScalarValue::I64(blob));
|
||||
writer.write(ts, ts_local, val, &mut item_qu)?;
|
||||
}
|
||||
}
|
||||
if item_qu.len() != 0 {
|
||||
// TODO deliver to insert queue
|
||||
item_qu.clear();
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -96,12 +96,21 @@ macro_rules! trace3 {
|
||||
#[allow(unused)]
|
||||
macro_rules! trace4 {
|
||||
($($arg:tt)*) => {
|
||||
if true {
|
||||
if false {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_flush_queue {
|
||||
($($arg:tt)*) => {
|
||||
if false {
|
||||
trace3!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
NoProtocol,
|
||||
@@ -1932,7 +1941,7 @@ impl CaConn {
|
||||
CaMsgTy::SearchRes(k) => {
|
||||
let a = k.addr.to_be_bytes();
|
||||
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
|
||||
trace!("Search result indicates server address: {addr}");
|
||||
trace!("search result indicates server address: {addr}");
|
||||
// TODO count this unexpected case.
|
||||
}
|
||||
CaMsgTy::CreateChanRes(k) => {
|
||||
@@ -1940,12 +1949,12 @@ impl CaConn {
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
CaMsgTy::EventAddRes(ev) => {
|
||||
trace2!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count);
|
||||
trace4!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count);
|
||||
self.stats.event_add_res_recv.inc();
|
||||
Self::handle_event_add_res(self, ev, tsnow)?
|
||||
}
|
||||
CaMsgTy::EventAddResEmpty(ev) => {
|
||||
trace2!("got EventAddResEmpty {:?}", camsg.ts);
|
||||
trace4!("got EventAddResEmpty {:?}", camsg.ts);
|
||||
Self::handle_event_add_res_empty(self, ev, tsnow)?
|
||||
}
|
||||
CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?,
|
||||
@@ -1957,7 +1966,7 @@ impl CaConn {
|
||||
self.stats.pong_recv_lat().ingest(dt);
|
||||
} else {
|
||||
let addr = &self.remote_addr_dbg;
|
||||
warn!("Received Echo even though we didn't asked for it {addr:?}");
|
||||
warn!("received Echo even though we didn't asked for it {addr:?}");
|
||||
}
|
||||
self.ioc_ping_last = tsnow;
|
||||
self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng);
|
||||
@@ -2144,7 +2153,7 @@ impl CaConn {
|
||||
self.backoff_reset();
|
||||
let proto = CaProto::new(
|
||||
tcp,
|
||||
self.remote_addr_dbg.clone(),
|
||||
self.remote_addr_dbg.to_string(),
|
||||
self.opts.array_truncate,
|
||||
self.ca_proto_stats.clone(),
|
||||
);
|
||||
@@ -2408,7 +2417,7 @@ impl CaConn {
|
||||
{
|
||||
use Poll::*;
|
||||
if qu.len() != 0 {
|
||||
trace3!("attempt_flush_queue id {:7} len {}", id, qu.len());
|
||||
trace_flush_queue!("attempt_flush_queue id {:7} len {}", id, qu.len());
|
||||
}
|
||||
let mut have_progress = false;
|
||||
let mut i = 0;
|
||||
@@ -2431,7 +2440,7 @@ impl CaConn {
|
||||
if sp.is_sending() {
|
||||
match sp.poll_unpin(cx) {
|
||||
Ready(Ok(())) => {
|
||||
trace3!("attempt_flush_queue id {:7} send done", id);
|
||||
trace_flush_queue!("attempt_flush_queue id {:7} send done", id);
|
||||
have_progress = true;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
|
||||
@@ -63,6 +63,7 @@ use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
|
||||
use netpod::OnDrop;
|
||||
use scywr::insertqueues::InsertQueuesTx;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
@@ -367,7 +368,7 @@ pub struct CaConnSet {
|
||||
find_ioc_query_queue: VecDeque<IocAddrQuery>,
|
||||
find_ioc_query_sender: Pin<Box<SenderPolling<IocAddrQuery>>>,
|
||||
find_ioc_res_rx: Pin<Box<Receiver<VecDeque<FindIocRes>>>>,
|
||||
storage_insert_tx: Pin<Box<Sender<VecDeque<QueryItem>>>>,
|
||||
iqtx: Pin<Box<InsertQueuesTx>>,
|
||||
storage_insert_queue: VecDeque<VecDeque<QueryItem>>,
|
||||
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
|
||||
ca_conn_res_tx: Pin<Box<Sender<(SocketAddr, CaConnEvent)>>>,
|
||||
@@ -398,7 +399,7 @@ impl CaConnSet {
|
||||
pub fn start(
|
||||
backend: String,
|
||||
local_epics_hostname: String,
|
||||
storage_insert_tx: Sender<VecDeque<QueryItem>>,
|
||||
iqtx: InsertQueuesTx,
|
||||
channel_info_query_tx: Sender<ChannelInfoQuery>,
|
||||
ingest_opts: CaIngestOpts,
|
||||
establish_worker_tx: async_channel::Sender<EstablishWorkerJob>,
|
||||
@@ -435,9 +436,12 @@ impl CaConnSet {
|
||||
find_ioc_query_queue: VecDeque::new(),
|
||||
find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)),
|
||||
find_ioc_res_rx: Box::pin(find_ioc_res_rx),
|
||||
storage_insert_tx: Box::pin(storage_insert_tx.clone()),
|
||||
iqtx: Box::pin(iqtx.clone()),
|
||||
storage_insert_queue: VecDeque::new(),
|
||||
storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)),
|
||||
|
||||
// TODO simplify for all combinations
|
||||
storage_insert_sender: Box::pin(SenderPolling::new(iqtx.st_rf3_tx.clone())),
|
||||
|
||||
ca_conn_res_tx: Box::pin(ca_conn_res_tx),
|
||||
ca_conn_res_rx: Box::pin(ca_conn_res_rx),
|
||||
shutdown_stopping: false,
|
||||
@@ -480,13 +484,17 @@ impl CaConnSet {
|
||||
async fn run(mut this: CaConnSet) -> Result<(), Error> {
|
||||
trace!("CaConnSet run begin");
|
||||
let (beacons_cancel_guard_tx, rx) = taskrun::tokio::sync::mpsc::channel(12);
|
||||
let beacons_jh = tokio::spawn(async move {
|
||||
if false {
|
||||
crate::ca::beacons::listen_beacons(rx).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
let beacons_jh = {
|
||||
let tx2 = this.channel_info_query_tx.clone().unwrap();
|
||||
let backend = this.backend.clone();
|
||||
tokio::spawn(async move {
|
||||
if false {
|
||||
crate::ca::beacons::listen_beacons(rx, tx2, backend).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
};
|
||||
let _g_beacon = OnDrop::new(move || {});
|
||||
loop {
|
||||
let x = this.next().await;
|
||||
@@ -1039,7 +1047,7 @@ impl CaConnSet {
|
||||
add.backend.clone(),
|
||||
addr_v4,
|
||||
self.local_epics_hostname.clone(),
|
||||
self.storage_insert_tx.as_ref().get_ref().clone(),
|
||||
self.iqtx.st_rf3_tx.clone(),
|
||||
self.channel_info_query_tx
|
||||
.clone()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?,
|
||||
@@ -1050,8 +1058,7 @@ impl CaConnSet {
|
||||
let conn_tx = conn.conn_command_tx();
|
||||
let conn_stats = conn.stats();
|
||||
let tx1 = self.ca_conn_res_tx.as_ref().get_ref().clone();
|
||||
let tx2 = self.storage_insert_tx.as_ref().get_ref().clone();
|
||||
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr, self.stats.clone()));
|
||||
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, addr, self.stats.clone()));
|
||||
let ca_conn_res = CaConnRes {
|
||||
state: CaConnState::new(CaConnStateValue::Fresh),
|
||||
sender: Box::pin(conn_tx.into()),
|
||||
@@ -1065,7 +1072,6 @@ impl CaConnSet {
|
||||
async fn ca_conn_item_merge(
|
||||
conn: CaConn,
|
||||
tx1: Sender<(SocketAddr, CaConnEvent)>,
|
||||
_tx2: Sender<VecDeque<QueryItem>>,
|
||||
addr: SocketAddr,
|
||||
stats: Arc<CaConnSetStats>,
|
||||
) -> Result<(), Error> {
|
||||
@@ -1546,7 +1552,8 @@ impl Stream for CaConnSet {
|
||||
trace4!("CaConnSet poll loop");
|
||||
self.stats.poll_loop_begin().inc();
|
||||
|
||||
self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _);
|
||||
// TODO generalize to all combinations
|
||||
self.stats.storage_insert_tx_len.set(self.iqtx.st_rf3_tx.len() as _);
|
||||
self.stats
|
||||
.storage_insert_queue_len
|
||||
.set(self.storage_insert_queue.len() as _);
|
||||
|
||||
@@ -8,7 +8,6 @@ use slidebuf::SlideBuf;
|
||||
use stats::CaProtoStats;
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
@@ -1045,7 +1044,7 @@ impl CaState {
|
||||
pub struct CaProto {
|
||||
tcp: TcpStream,
|
||||
tcp_eof: bool,
|
||||
remote_addr_dbg: SocketAddrV4,
|
||||
remote_name: String,
|
||||
state: CaState,
|
||||
buf: SlideBuf,
|
||||
outbuf: SlideBuf,
|
||||
@@ -1058,11 +1057,11 @@ pub struct CaProto {
|
||||
}
|
||||
|
||||
impl CaProto {
|
||||
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize, stats: Arc<CaProtoStats>) -> Self {
|
||||
pub fn new(tcp: TcpStream, remote_name: String, array_truncate: usize, stats: Arc<CaProtoStats>) -> Self {
|
||||
Self {
|
||||
tcp,
|
||||
tcp_eof: false,
|
||||
remote_addr_dbg,
|
||||
remote_name,
|
||||
state: CaState::StdHead,
|
||||
buf: SlideBuf::new(PROTO_INPUT_BUF_CAP as usize),
|
||||
outbuf: SlideBuf::new(1024 * 256),
|
||||
@@ -1186,7 +1185,7 @@ impl CaProto {
|
||||
debug!(
|
||||
"peer done {:?} {:?} {:?}",
|
||||
self.tcp.peer_addr(),
|
||||
self.remote_addr_dbg,
|
||||
self.remote_name,
|
||||
self.state
|
||||
);
|
||||
self.tcp_eof = true;
|
||||
|
||||
@@ -27,11 +27,9 @@ pub struct CaIngestOpts {
|
||||
#[serde(default, with = "humantime_serde")]
|
||||
timeout: Option<Duration>,
|
||||
postgresql: Database,
|
||||
scylla: ScyllaIngestConfig,
|
||||
#[serde(default)]
|
||||
scylla_mt: Option<ScyllaIngestConfig>,
|
||||
#[serde(default)]
|
||||
scylla_lt: Option<ScyllaIngestConfig>,
|
||||
scylla_st: ScyllaIngestConfig,
|
||||
scylla_mt: ScyllaIngestConfig,
|
||||
scylla_lt: ScyllaIngestConfig,
|
||||
array_truncate: Option<u64>,
|
||||
insert_worker_count: Option<usize>,
|
||||
insert_worker_concurrency: Option<usize>,
|
||||
@@ -56,16 +54,16 @@ impl CaIngestOpts {
|
||||
&self.postgresql
|
||||
}
|
||||
|
||||
pub fn scylla_config(&self) -> &ScyllaIngestConfig {
|
||||
&self.scylla
|
||||
pub fn scylla_config_st(&self) -> &ScyllaIngestConfig {
|
||||
&self.scylla_st
|
||||
}
|
||||
|
||||
pub fn scylla_config_mt(&self) -> Option<&ScyllaIngestConfig> {
|
||||
self.scylla_mt.as_ref()
|
||||
pub fn scylla_config_mt(&self) -> &ScyllaIngestConfig {
|
||||
&self.scylla_mt
|
||||
}
|
||||
|
||||
pub fn scylla_config_lt(&self) -> Option<&ScyllaIngestConfig> {
|
||||
self.scylla_lt.as_ref()
|
||||
pub fn scylla_config_lt(&self) -> &ScyllaIngestConfig {
|
||||
&self.scylla_lt
|
||||
}
|
||||
|
||||
pub fn search(&self) -> &Vec<String> {
|
||||
@@ -140,7 +138,10 @@ scylla:
|
||||
assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt")));
|
||||
assert_eq!(&conf.api_bind, "0.0.0.0:3011");
|
||||
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
|
||||
assert_eq!(conf.scylla.hosts().get(1), Some(&"sf-nube-12:19042".to_string()));
|
||||
assert_eq!(
|
||||
conf.scylla_config_st().hosts().get(1),
|
||||
Some(&"sf-nube-12:19042".to_string())
|
||||
);
|
||||
assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
|
||||
}
|
||||
|
||||
|
||||
18
scywr/src/insertqueues.rs
Normal file
18
scywr/src/insertqueues.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
use crate::iteminsertqueue::QueryItem;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InsertQueuesTx {
|
||||
pub st_rf3_tx: Sender<VecDeque<QueryItem>>,
|
||||
pub st_rf1_tx: Sender<VecDeque<QueryItem>>,
|
||||
pub mt_rf3_tx: Sender<VecDeque<QueryItem>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InsertQueuesRx {
|
||||
pub st_rf3_rx: Receiver<VecDeque<QueryItem>>,
|
||||
pub st_rf1_rx: Receiver<VecDeque<QueryItem>>,
|
||||
pub mt_rf3_rx: Receiver<VecDeque<QueryItem>>,
|
||||
}
|
||||
@@ -365,13 +365,7 @@ fn inspect_items(item_inp: Receiver<VecDeque<QueryItem>>) -> impl Stream<Item =
|
||||
trace2!("execute ChannelStatus {item:?}");
|
||||
}
|
||||
QueryItem::Insert(item) => {
|
||||
trace3!(
|
||||
"execute Insert {:?} {:?} {:?} {:?}",
|
||||
item.series,
|
||||
item.ts_msp,
|
||||
item.val.shape(),
|
||||
item
|
||||
);
|
||||
trace3!("execute Insert {}", item.string_short());
|
||||
}
|
||||
QueryItem::TimeBinSimpleF32(_) => {
|
||||
trace2!("execute TimeBinSimpleF32");
|
||||
|
||||
@@ -48,6 +48,7 @@ pub enum ScalarValue {
|
||||
I8(i8),
|
||||
I16(i16),
|
||||
I32(i32),
|
||||
I64(i64),
|
||||
F32(f32),
|
||||
F64(f64),
|
||||
Enum(i16),
|
||||
@@ -61,6 +62,7 @@ impl ScalarValue {
|
||||
ScalarValue::I8(_) => 1,
|
||||
ScalarValue::I16(_) => 2,
|
||||
ScalarValue::I32(_) => 4,
|
||||
ScalarValue::I64(_) => 8,
|
||||
ScalarValue::F32(_) => 4,
|
||||
ScalarValue::F64(_) => 8,
|
||||
ScalarValue::Enum(_) => 2,
|
||||
@@ -68,6 +70,20 @@ impl ScalarValue {
|
||||
ScalarValue::Bool(_) => 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn string_short(&self) -> String {
|
||||
match self {
|
||||
ScalarValue::I8(x) => x.to_string(),
|
||||
ScalarValue::I16(x) => x.to_string(),
|
||||
ScalarValue::I32(x) => x.to_string(),
|
||||
ScalarValue::I64(x) => x.to_string(),
|
||||
ScalarValue::F32(x) => x.to_string(),
|
||||
ScalarValue::F64(x) => x.to_string(),
|
||||
ScalarValue::Enum(x) => x.to_string(),
|
||||
ScalarValue::String(x) => x.to_string(),
|
||||
ScalarValue::Bool(x) => x.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -177,6 +193,18 @@ impl ArrayValue {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn string_short(&self) -> String {
|
||||
use ArrayValue::*;
|
||||
match self {
|
||||
I8(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
|
||||
I16(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
|
||||
I32(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
|
||||
F32(x) => format!("{}", x.get(0).map_or(0., |x| *x)),
|
||||
F64(x) => format!("{}", x.get(0).map_or(0., |x| *x)),
|
||||
Bool(x) => format!("{}", x.get(0).map_or(false, |x| *x)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -199,6 +227,7 @@ impl DataValue {
|
||||
ScalarValue::I8(_) => ScalarType::I8,
|
||||
ScalarValue::I16(_) => ScalarType::I16,
|
||||
ScalarValue::I32(_) => ScalarType::I32,
|
||||
ScalarValue::I64(_) => ScalarType::I64,
|
||||
ScalarValue::F32(_) => ScalarType::F32,
|
||||
ScalarValue::F64(_) => ScalarType::F64,
|
||||
ScalarValue::Enum(_) => ScalarType::U16,
|
||||
@@ -222,6 +251,13 @@ impl DataValue {
|
||||
DataValue::Array(a) => Shape::Wave(a.len() as u32),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn string_short(&self) -> String {
|
||||
match self {
|
||||
DataValue::Scalar(x) => x.string_short(),
|
||||
DataValue::Array(x) => x.string_short(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait GetValHelp<T> {
|
||||
@@ -471,6 +507,18 @@ pub struct InsertItem {
|
||||
pub ts_local: TsMs,
|
||||
}
|
||||
|
||||
impl InsertItem {
|
||||
pub fn string_short(&self) -> String {
|
||||
format!(
|
||||
"{} {} {} {}",
|
||||
self.series.id(),
|
||||
self.ts_msp.ms(),
|
||||
self.ts_lsp.ms(),
|
||||
self.val.string_short()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TimeBinSimpleF32 {
|
||||
pub series: SeriesId,
|
||||
@@ -682,6 +730,7 @@ pub async fn insert_item(
|
||||
I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
|
||||
Enum(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
|
||||
I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?,
|
||||
I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?,
|
||||
F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?,
|
||||
F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?,
|
||||
String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, &data_store).await?,
|
||||
@@ -751,6 +800,7 @@ pub fn insert_item_fut(
|
||||
I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy),
|
||||
I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
|
||||
I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy),
|
||||
I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy),
|
||||
F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy),
|
||||
F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy),
|
||||
Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
|
||||
|
||||
@@ -6,6 +6,7 @@ pub mod futbatch;
|
||||
pub mod futbatchgen;
|
||||
pub mod futinsert;
|
||||
pub mod futinsertloop;
|
||||
pub mod insertqueues;
|
||||
pub mod insertworker;
|
||||
pub mod iteminsertqueue;
|
||||
pub mod ratelimit;
|
||||
|
||||
@@ -75,7 +75,7 @@ impl SeriesWriter {
|
||||
channel: String,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
tsnow: SystemTime,
|
||||
stnow: SystemTime,
|
||||
) -> Result<Self, Error> {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let item = ChannelInfoQuery {
|
||||
@@ -89,7 +89,7 @@ impl SeriesWriter {
|
||||
worker_tx.send(item).await?;
|
||||
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
|
||||
let cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
|
||||
Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, tsnow).await
|
||||
Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, stnow).await
|
||||
}
|
||||
|
||||
pub async fn establish_with_cssid(
|
||||
|
||||
Reference in New Issue
Block a user