Dominik Werder
2023-12-22 21:31:00 +01:00
parent 71fa333f75
commit 4e758dc4b8
20 changed files with 529 additions and 320 deletions

View File

@@ -9,7 +9,7 @@ rustflags = [
#"-C", "inline-threshold=1000",
#"-Z", "time-passes=yes",
#"-Z", "time-llvm-passes=yes",
#"--cfg", "tokio_unstable",
"--cfg", "tokio_unstable",
]
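Enabling --cfg tokio_unstable (previously commented out) unlocks tokio's unstable instrumentation. A minimal sketch of what that allows, assuming tokio's "tracing" feature for task names; exact APIs depend on the tokio version:

use tokio::runtime::Handle;

// Named tasks become visible to tooling such as tokio-console.
async fn spawn_named_probe() -> std::io::Result<()> {
    tokio::task::Builder::new().name("health-probe").spawn(async {
        // Runtime metrics are only exposed under tokio_unstable.
        let m = Handle::current().metrics();
        println!("workers: {}", m.num_workers());
    })?;
    Ok(())
}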
rustdocflags = [

View File

@@ -3,67 +3,74 @@ use daqingest::opts::DaqIngestOpts;
use err::Error;
use log::*;
use netfetch::conf::parse_config;
use taskrun::TracingMode;
pub fn main() -> Result<(), Error> {
let opts = DaqIngestOpts::parse();
// TODO offer again function to get runtime and configure tracing in one call
let runtime = taskrun::get_runtime_opts(opts.worker_threads.unwrap_or(8), opts.blocking_threads.unwrap_or(256));
match taskrun::tracing_init() {
match taskrun::tracing_init(TracingMode::Production) {
Ok(()) => {}
Err(()) => return Err(Error::with_msg_no_trace("tracing init failed")),
}
let res = runtime.block_on(async move {
use daqingest::opts::ChannelAccess;
use daqingest::opts::SubCmd;
match opts.subcmd {
SubCmd::ListPkey => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::list_pkey(&scylla_conf).await?
}
SubCmd::ListPulses => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::list_pulses(&scylla_conf).await?
}
SubCmd::FetchEvents(k) => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await?
}
SubCmd::ChannelAccess(k) => match k {
#[cfg(DISABLED)]
ChannelAccess::CaSearch(k) => {
info!("daqingest version {}", clap::crate_version!());
let (conf, channels) = parse_config(k.config.into()).await?;
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {}", clap::crate_version!());
let (conf, channels) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels).await?
}
},
#[cfg(feature = "bsread")]
SubCmd::Bsread(k) => ingest_bsread::zmtp::zmtp_client(k.into())
.await
.map_err(|e| Error::from(e.to_string()))?,
#[cfg(feature = "bsread")]
SubCmd::BsreadDump(k) => {
let mut f = ingest_bsread::zmtp::dumper::BsreadDumper::new(k.source);
f.run().await.map_err(|e| Error::from(e.to_string()))?
}
SubCmd::Version => {
println!("{}", clap::crate_version!());
}
}
Ok(())
});
let res = runtime.block_on(main_run(opts));
match res {
Ok(k) => Ok(k),
Err(e) => {
error!("Catched: {:?}", e);
error!("catched: {:?}", e);
Err(e)
}
}
}
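For the TODO at the top of main, a hypothetical combined helper; the name and parameter types are illustrative, taskrun does not provide this yet:

fn get_runtime_with_tracing(
    worker_threads: usize,
    blocking_threads: usize,
    mode: TracingMode,
) -> Result<taskrun::tokio::runtime::Runtime, Error> {
    // Initialize tracing first so runtime startup is already captured.
    taskrun::tracing_init(mode).map_err(|()| Error::with_msg_no_trace("tracing init failed"))?;
    Ok(taskrun::get_runtime_opts(worker_threads, blocking_threads))
}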
async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> {
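// Awaiting the JoinHandle yields Result<Result<(), Error>, JoinError>;
// the `?` below maps a worker panic or cancellation into Error (this
// assumes a From<JoinError> impl) and the inner Result is returned.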
taskrun::tokio::spawn(main_run_inner(opts)).await?
}
async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
use daqingest::opts::ChannelAccess;
use daqingest::opts::SubCmd;
match opts.subcmd {
SubCmd::ListPkey => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::list_pkey(&scylla_conf).await?
}
SubCmd::ListPulses => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::list_pulses(&scylla_conf).await?
}
SubCmd::FetchEvents(k) => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await?
}
SubCmd::ChannelAccess(k) => match k {
#[cfg(DISABLED)]
ChannelAccess::CaSearch(k) => {
info!("daqingest version {}", clap::crate_version!());
let (conf, channels) = parse_config(k.config.into()).await?;
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {}", clap::crate_version!());
let (conf, channels) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels).await?
}
},
#[cfg(feature = "bsread")]
SubCmd::Bsread(k) => ingest_bsread::zmtp::zmtp_client(k.into())
.await
.map_err(|e| Error::from(e.to_string()))?,
#[cfg(feature = "bsread")]
SubCmd::BsreadDump(k) => {
let mut f = ingest_bsread::zmtp::dumper::BsreadDumper::new(k.source);
f.run().await.map_err(|e| Error::from(e.to_string()))?
}
SubCmd::Version => {
println!("{}", clap::crate_version!());
}
}
Ok(())
}

View File

@@ -35,17 +35,11 @@ use taskrun::tokio;
use tokio::task::JoinHandle;
const CHECK_HEALTH_IVL: Duration = Duration::from_millis(2000);
const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(1500);
const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000);
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500);
#[derive(Debug)]
enum CheckPeriodic {
Waiting(Instant),
Ongoing(u32, Instant),
}
pub struct DaemonOpts {
pgconf: Database,
scyconf: ScyllaConfig,
@@ -75,7 +69,7 @@ pub struct Daemon {
series_by_channel_stats: Arc<SeriesByChannelStats>,
shutting_down: bool,
connset_ctrl: CaConnSetCtrl,
connset_status_last: CheckPeriodic,
connset_status_last: Instant,
// TODO should be a stats object?
insert_workers_running: AtomicU64,
query_item_tx_weak: WeakSender<VecDeque<QueryItem>>,
@@ -231,7 +225,7 @@ impl Daemon {
series_by_channel_stats,
shutting_down: false,
connset_ctrl: conn_set_ctrl,
connset_status_last: CheckPeriodic::Waiting(Instant::now()),
connset_status_last: Instant::now(),
insert_workers_running: AtomicU64::new(0),
query_item_tx_weak,
connset_health_lat_ema: 0.,
@@ -243,23 +237,18 @@ impl Daemon {
&self.stats
}
async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> {
match &self.connset_status_last {
CheckPeriodic::Waiting(since) => {
if *since + CHECK_HEALTH_IVL <= ts1 {
let id = self.connset_ctrl.check_health().await?;
self.connset_status_last = CheckPeriodic::Ongoing(id, ts1);
}
}
CheckPeriodic::Ongoing(idexp, since) => {
let dt = ts1.saturating_duration_since(*since);
if dt > CHECK_HEALTH_TIMEOUT {
error!(
"CaConnSet has not reported health status since {:.0} idexp {idexp:08x}",
dt.as_secs_f32() * 1e3
);
}
}
async fn check_health(&mut self, ts1: Instant) -> Result<(), Error> {
self.check_health_connset(ts1)?;
Ok(())
}
fn check_health_connset(&mut self, ts1: Instant) -> Result<(), Error> {
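// Passive watchdog: CaConnSet now pushes Healthy items on its own ticker,
// so this only measures how long ago the last one arrived.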
let dt = self.connset_status_last.elapsed();
if dt > CHECK_HEALTH_TIMEOUT {
error!(
"CaConnSet has not reported health status since {:.0}",
dt.as_secs_f32() * 1e3
);
}
Ok(())
}
@@ -288,7 +277,7 @@ impl Daemon {
SIGTERM.store(2, atomic::Ordering::Release);
}
let ts1 = Instant::now();
self.check_caconn_chans(ts1).await?;
self.check_health(ts1).await?;
let dt = ts1.elapsed();
if dt > CHECK_CHANNEL_SLOW_WARN {
info!("slow check_chans {:.0} ms", dt.as_secs_f32() * 1e3);
@@ -376,30 +365,10 @@ impl Daemon {
async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> {
use CaConnSetItem::*;
match item {
Healthy(id, ts1, ts2) => {
Healthy => {
let tsnow = Instant::now();
let dt1 = tsnow.duration_since(ts1).as_secs_f32() * 1e3;
let dt2 = tsnow.duration_since(ts2).as_secs_f32() * 1e3;
match &self.connset_status_last {
CheckPeriodic::Waiting(_since) => {
error!("received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms");
}
CheckPeriodic::Ongoing(idexp, since) => {
if id != *idexp {
warn!("unexpected check health answer id {id:08x} idexp {idexp:08x}");
}
// TODO insert response time as series to scylla.
let dtsince = tsnow.duration_since(*since).as_secs_f32() * 1e3;
{
let v = &mut self.connset_health_lat_ema;
*v += (dtsince - *v) * 0.2;
self.stats.connset_health_lat_ema().set(*v as _);
}
trace!("received CaConnSet Healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms");
self.connset_status_last = CheckPeriodic::Waiting(tsnow);
self.stats.caconnset_health_response().inc();
}
}
self.connset_status_last = tsnow;
self.stats.caconnset_health_response().inc();
}
Error(e) => {
error!("error from CaConnSet: {e}");

View File

@@ -18,6 +18,30 @@ impl Error {
}
}
async fn has_table(table: &str, pgc: &PgClient) -> Result<bool, Error> {
let rows = pgc
.query(
"select count(*) as c from information_schema.tables where table_name = $1 and table_type = 'BASE TABLE' limit 10",
&[&table],
)
.await?;
if rows.len() == 1 {
let c: i64 = rows[0].get(0);
if c == 0 {
Ok(false)
} else if c == 1 {
Ok(true)
} else {
Err(Error::from_logic_msg(format!("has_table bad count {}", c)))
}
} else {
Err(Error::from_logic_msg(format!(
"has_columns bad row count {}",
rows.len()
)))
}
}
async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, Error> {
let rows = pgc
.query(
@@ -32,19 +56,51 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, E
} else if c == 1 {
Ok(true)
} else {
Err(Error::from_logic_msg(format!("has_columns bad count {}", c)))
Err(Error::from_logic_msg(format!("has_column bad count {}", c)))
}
} else if rows.len() == 0 {
Ok(false)
} else {
Err(Error::from_logic_msg(format!(
"has_columns bad row count {}",
"has_column bad row count {}",
rows.len()
)))
}
}
async fn migrate_00(pgc: &PgClient) -> Result<(), Error> {
if !has_table("ioc_by_channel_log", pgc).await? {
let _ = pgc
.execute(
"
create table if not exists ioc_by_channel_log (
facility text not null,
channel text not null,
tscreate timestamptz not null default now(),
tsmod timestamptz not null default now(),
archived int not null default 0,
queryaddr text,
responseaddr text,
addr text
)
",
&[],
)
.await;
let _ = pgc
.execute(
"
create index if not exists ioc_by_channel_log_channel on ioc_by_channel_log (
facility,
channel
)
",
&[],
)
.await;
}
Ok(())
}
async fn migrate_01(pgc: &PgClient) -> Result<(), Error> {
if !has_column("ioc_by_channel_log", "tscreate", pgc).await? {
pgc.execute(
"alter table ioc_by_channel_log add tscreate timestamptz not null default now()",
@@ -79,6 +135,7 @@ async fn migrate_00(pgc: &PgClient) -> Result<(), Error> {
pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> {
migrate_00(&pgc).await?;
migrate_01(&pgc).await?;
info!("schema_check done");
Ok(())
}
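The migration steps are idempotent: each checks information_schema before applying DDL. A hypothetical next step in the same pattern (the lastseen column is illustrative, not part of this commit):

async fn migrate_02(pgc: &PgClient) -> Result<(), Error> {
    if !has_column("ioc_by_channel_log", "lastseen", pgc).await? {
        pgc.execute(
            "alter table ioc_by_channel_log add lastseen timestamptz",
            &[],
        )
        .await?;
    }
    Ok(())
}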

View File

@@ -1,10 +1,8 @@
use log::*;
use std::ffi::CStr;
use std::mem::MaybeUninit;
use thiserror::Error;
#[derive(Debug, Error)]
#[error("{self}")]
pub enum Error {
SignalHandlerSet,
SignalHandlerUnset,

View File

@@ -32,6 +32,7 @@ pin-project = "1"
lazy_static = "1"
libc = "0.2"
slidebuf = "0.0.1"
dashmap = "5.5.3"
log = { path = "../log" }
series = { path = "../series" }
stats = { path = "../stats" }

View File

@@ -1,5 +1,4 @@
use super::proto;
use super::proto::CreateChanRes;
use super::ExtraInsertsConf;
use crate::senderpolling::SenderPolling;
use crate::throttletrace::ThrottleTrace;
@@ -105,7 +104,7 @@ pub enum ChannelConnectedInfo {
#[derive(Clone, Debug, Serialize)]
pub struct ChannelStateInfo {
pub name: String,
pub cssid: ChannelStatusSeriesId,
pub addr: SocketAddrV4,
pub series: Option<SeriesId>,
pub channel_connected_info: ChannelConnectedInfo,
@@ -159,7 +158,7 @@ struct Subid(pub u32);
#[derive(Clone, Debug)]
enum ChannelError {
CreateChanFail,
CreateChanFail(ChannelStatusSeriesId),
}
#[derive(Clone, Debug)]
@@ -213,18 +212,18 @@ enum ChannelState {
FetchingSeriesId(CreatedState),
Created(SeriesId, CreatedState),
Error(ChannelError),
Ended,
Ended(ChannelStatusSeriesId),
}
impl ChannelState {
fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo {
fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4) -> ChannelStateInfo {
let channel_connected_info = match self {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
ChannelState::FetchingSeriesId(..) => ChannelConnectedInfo::Connecting,
ChannelState::FetchingSeriesId(_) => ChannelConnectedInfo::Connecting,
ChannelState::Created(..) => ChannelConnectedInfo::Connected,
ChannelState::Error(..) => ChannelConnectedInfo::Error,
ChannelState::Ended => ChannelConnectedInfo::Ended,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
ChannelState::Ended(_) => ChannelConnectedInfo::Ended,
};
let scalar_type = match self {
ChannelState::Created(_series, s) => Some(s.scalar_type.clone()),
@@ -269,7 +268,7 @@ impl ChannelState {
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
ChannelStateInfo {
name,
cssid,
addr,
series,
channel_connected_info,
@@ -282,6 +281,19 @@ impl ChannelState {
interest_score,
}
}
fn cssid(&self) -> ChannelStatusSeriesId {
match self {
ChannelState::Init(cssid) => cssid.clone(),
ChannelState::Creating { cssid, .. } => cssid.clone(),
ChannelState::FetchingSeriesId(st) => st.cssid.clone(),
ChannelState::Created(_, st) => st.cssid.clone(),
ChannelState::Error(e) => match e {
ChannelError::CreateChanFail(cssid) => cssid.clone(),
},
ChannelState::Ended(cssid) => cssid.clone(),
}
}
}
enum CaConnState {
@@ -320,12 +332,14 @@ fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
}
struct CidStore {
cnt: u32,
rng: Xoshiro128PlusPlus,
}
impl CidStore {
fn new(seed: u32) -> Self {
Self {
cnt: 0,
rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
}
}
@@ -340,17 +354,25 @@ impl CidStore {
}
fn next(&mut self) -> Cid {
Cid(self.rng.next_u32())
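// New scheme, as read from the code below: a monotonic counter is shifted
// into the upper bits and OR-ed with a xor-folded random word. Note the
// random word is not masked to the low byte, so counter and random bits
// can overlap.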
let c = self.cnt << 8;
self.cnt += 1;
let r = self.rng.next_u32();
let r = r ^ (r >> 8);
let r = r ^ (r >> 8);
let r = r ^ (r >> 8);
Cid(c | r)
}
}
struct SubidStore {
cnt: u32,
rng: Xoshiro128PlusPlus,
}
impl SubidStore {
fn new(seed: u32) -> Self {
Self {
cnt: 0,
rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
}
}
@@ -365,7 +387,13 @@ impl SubidStore {
}
fn next(&mut self) -> Subid {
Subid(self.rng.next_u32())
let c = self.cnt << 8;
self.cnt += 1;
let r = self.rng.next_u32();
let r = r ^ (r >> 8);
let r = r ^ (r >> 8);
let r = r ^ (r >> 8);
Subid(c | r)
}
}
@@ -381,7 +409,6 @@ pub enum ConnCommandKind {
SeriesLookupResult(Result<ChannelInfoResult, dbpg::seriesbychannel::Error>),
ChannelAdd(String, ChannelStatusSeriesId),
ChannelRemove(String),
CheckHealth,
Shutdown,
}
@@ -413,13 +440,6 @@ impl ConnCommand {
}
}
pub fn check_health() -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::CheckHealth,
}
}
pub fn shutdown() -> Self {
Self {
id: Self::make_id(),
@@ -438,13 +458,13 @@ impl ConnCommand {
}
#[derive(Debug)]
pub struct CheckHealthResult {
pub channel_statuses: BTreeMap<String, ChannelStateInfo>,
pub struct ChannelStatusPartial {
pub channel_statuses: BTreeMap<ChannelStatusSeriesId, ChannelStateInfo>,
}
#[derive(Debug)]
pub enum ConnCommandResultKind {
CheckHealth(CheckHealthResult),
Unused,
}
#[derive(Debug)]
@@ -469,6 +489,7 @@ pub enum CaConnEventValue {
None,
EchoTimeout,
ConnCommandResult(ConnCommandResult),
ChannelStatus(ChannelStatusPartial),
QueryItem(QueryItem),
ChannelCreateFail(String),
EndOfStream,
@@ -526,10 +547,12 @@ pub struct CaConn {
cid_store: CidStore,
subid_store: SubidStore,
channels: HashMap<Cid, ChannelState>,
cid_by_name: HashMap<String, Cid>,
// BTreeMap because ordered iteration is required:
cid_by_name: BTreeMap<String, Cid>,
cid_by_subid: HashMap<Subid, Cid>,
name_by_cid: HashMap<Cid, String>,
time_binners: HashMap<Cid, ConnTimeBin>,
channel_status_last_done: Option<Cid>,
init_state_count: u64,
insert_item_queue: VecDeque<QueryItem>,
remote_addr_dbg: SocketAddrV4,
@@ -585,10 +608,11 @@ impl CaConn {
subid_store: SubidStore::new_from_time(),
init_state_count: 0,
channels: HashMap::new(),
cid_by_name: HashMap::new(),
cid_by_name: BTreeMap::new(),
cid_by_subid: HashMap::new(),
name_by_cid: HashMap::new(),
time_binners: HashMap::new(),
channel_status_last_done: None,
insert_item_queue: VecDeque::new(),
remote_addr_dbg,
local_epics_hostname,
@@ -668,7 +692,16 @@ impl CaConn {
}
fn cmd_check_health(&mut self) {
// TODO
// no longer in use.
// CaConn emits health updates by itself.
// Make sure the checks here also run at regular intervals.
trace!("cmd_check_health");
// TODO
// what actions are taken here?
// what status is modified here?
match self.check_channels_alive() {
Ok(_) => {}
Err(e) => {
@@ -676,25 +709,42 @@ impl CaConn {
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
}
}
// TODO return the result
// TODO
// Time this, is it fast enough?
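// Resumable cursor: continue the per-channel status sweep after the last
// cid reported (channel_status_last_done), walking cid_by_name in name
// order, instead of re-emitting every channel on each pass.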
let mut kit = self.cid_by_name.values();
if let Some(mut kk) = kit.next().map(Clone::clone) {
let mut start = Some(kk.clone());
if let Some(last) = self.channel_status_last_done.take() {
while kk <= last {
kk = if let Some(x) = kit.next().map(Clone::clone) {
start = Some(x.clone());
x
} else {
start = None;
break;
};
}
}
if let Some(mut kk) = start {
loop {
kk = if let Some(x) = kit.next().map(Clone::clone) {
x
} else {
break;
};
}
} else {
// Nothing to do, will continue on next call from front.
}
}
while let Some(kk) = kit.next() {}
let mut channel_statuses = BTreeMap::new();
for (k, v) in self.channels.iter() {
let name = self
.name_by_cid(*k)
.map_or_else(|| format!("{k:?}"), ToString::to_string);
let info = v.to_info(name.clone(), self.remote_addr_dbg);
channel_statuses.insert(name, info);
let info = v.to_info(v.cssid(), self.remote_addr_dbg);
channel_statuses.insert(v.cssid(), info);
}
let health = CheckHealthResult { channel_statuses };
let res = ConnCommandResult {
id: ConnCommandResult::make_id(),
kind: ConnCommandResultKind::CheckHealth(health),
};
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnCommandResult(res),
};
self.ca_conn_event_out_queue.push_back(item);
}
fn cmd_find_channel(&self, pattern: &str) {
@@ -713,7 +763,7 @@ impl CaConn {
fn cmd_channel_state(&self, name: String) {
let res = match self.cid_by_name.get(&name) {
Some(cid) => match self.channels.get(cid) {
Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())),
Some(state) => Some(state.to_info(state.cssid(), self.remote_addr_dbg.clone())),
None => None,
},
None => None,
@@ -730,11 +780,11 @@ impl CaConn {
.channels
.iter()
.map(|(cid, state)| {
let name = self
.name_by_cid
.get(cid)
.map_or("--unknown--".into(), |x| x.to_string());
state.to_info(name, self.remote_addr_dbg.clone())
// let name = self
// .name_by_cid
// .get(cid)
// .map_or("--unknown--".into(), |x| x.to_string());
state.to_info(state.cssid(), self.remote_addr_dbg.clone())
})
.collect();
let msg = (self.remote_addr_dbg.clone(), res);
@@ -832,10 +882,6 @@ impl CaConn {
self.cmd_channel_remove(name);
Ok(Ready(Some(())))
}
ConnCommandKind::CheckHealth => {
self.cmd_check_health();
Ok(Ready(Some(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ok(Ready(Some(())))
@@ -899,7 +945,7 @@ impl CaConn {
fn channel_remove_expl(
name: String,
channels: &mut HashMap<Cid, ChannelState>,
cid_by_name: &mut HashMap<String, Cid>,
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut HashMap<Cid, String>,
cid_store: &mut CidStore,
time_binners: &mut HashMap<Cid, ConnTimeBin>,
@@ -926,7 +972,7 @@ impl CaConn {
fn cid_by_name_expl(
name: &str,
cid_by_name: &mut HashMap<String, Cid>,
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut HashMap<Cid, String>,
cid_store: &mut CidStore,
) -> Cid {
@@ -955,17 +1001,18 @@ impl CaConn {
}
fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
// TODO can I reuse emit_channel_info_insert_items ?
trace!("channel_state_on_shutdown channels {}", self.channels.len());
for (_cid, chst) in &mut self.channels {
match chst {
ChannelState::Init(..) => {
*chst = ChannelState::Ended;
ChannelState::Init(cssid) => {
*chst = ChannelState::Ended(cssid.clone());
}
ChannelState::Creating { .. } => {
*chst = ChannelState::Ended;
ChannelState::Creating { cssid, .. } => {
*chst = ChannelState::Ended(cssid.clone());
}
ChannelState::FetchingSeriesId(..) => {
*chst = ChannelState::Ended;
ChannelState::FetchingSeriesId(st) => {
*chst = ChannelState::Ended(st.cssid.clone());
}
ChannelState::Created(series, st2) => {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
@@ -974,12 +1021,13 @@ impl CaConn {
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.insert_item_queue.push_back(item);
*chst = ChannelState::Ended;
*chst = ChannelState::Ended(st2.cssid.clone());
}
ChannelState::Error(..) => {
*chst = ChannelState::Ended;
warn!("TODO emit error status");
// *chst = ChannelState::Ended;
}
ChannelState::Ended => {}
ChannelState::Ended(cssid) => {}
}
}
}
@@ -1070,7 +1118,7 @@ impl CaConn {
ChannelState::Error(_) => {
// TODO need last-save-ts for this state.
}
ChannelState::Ended => {}
ChannelState::Ended(_) => {}
}
}
Ok(())
@@ -1261,6 +1309,17 @@ impl CaConn {
}
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
trace!("got EventAddRes: {ev:?}");
self.stats.event_add_res_recv.inc();
let res = Self::handle_event_add_res_inner(self, ev, tsnow);
let ts2 = Instant::now();
self.stats
.time_handle_event_add_res
.add((ts2.duration_since(tsnow) * MS as u32).as_secs());
res
}
fn handle_event_add_res_inner(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
let subid = Subid(ev.subid);
// TODO handle subid-not-found which can also be peer error:
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
@@ -1446,6 +1505,7 @@ impl CaConn {
}
CaItem::Msg(msg) => match msg.ty {
CaMsgTy::VersionRes(n) => {
debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg);
if n < 12 || n > 13 {
error!("See some unexpected version {n} channel search may not work.");
Ready(Some(Ok(())))
@@ -1553,18 +1613,7 @@ impl CaConn {
self.handle_create_chan_res(k, tsnow)?;
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => {
trace4!("got EventAddRes: {k:?}");
self.stats.event_add_res_recv.inc();
let res = Self::handle_event_add_res(self, k, tsnow);
let ts2 = Instant::now();
self.stats
.time_handle_event_add_res
.add((ts2.duration_since(ts1) * MS as u32).as_secs());
ts1 = ts2;
let _ = ts1;
res?
}
CaMsgTy::EventAddRes(k) => self.handle_event_add_res(k, tsnow)?,
CaMsgTy::Echo => {
// let addr = &self.remote_addr_dbg;
if let Some(started) = self.ioc_ping_start {
@@ -1649,7 +1698,7 @@ impl CaConn {
res.map_err(|e| Error::from(e.to_string()))
}
fn handle_create_chan_res(&mut self, k: CreateChanRes, tsnow: Instant) -> Result<(), Error> {
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
// TODO handle cid-not-found which can also indicate peer error.
let cid = Cid(k.cid);
let sid = k.sid;
@@ -1947,6 +1996,24 @@ impl CaConn {
Ok(())
}
fn emit_channel_status(&mut self) {
// TODO limit the queue length.
// Maybe factor the actual push item into new function.
// What to do if limit reached?
// Increase some error counter.
// if self.ca_conn_event_out_queue.len()>
let val = ChannelStatusPartial {
channel_statuses: Default::default(),
};
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ChannelStatus(val),
};
self.ca_conn_event_out_queue.push_back(item);
}
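A minimal sketch of the bounded push that the TODO above asks for; the cap constant and the dropped-item counter are hypothetical names:

fn push_conn_event_bounded(&mut self, item: CaConnEvent) {
    // Hypothetical cap; would need tuning against consumer throughput.
    const OUT_QUEUE_MAX: usize = 1024;
    if self.ca_conn_event_out_queue.len() >= OUT_QUEUE_MAX {
        // Drop and count instead of growing without bound.
        self.stats.conn_event_out_dropped().inc(); // hypothetical counter
    } else {
        self.ca_conn_event_out_queue.push_back(item);
    }
}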
fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> {
Ok(())
}
@@ -2071,10 +2138,13 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.stats.poll_count().inc();
let poll_ts1 = Instant::now();
self.stats.poll_count().inc();
self.stats.poll_fn_begin().inc();
let mut reloops: u32 = 0;
let ret = loop {
let lts1 = Instant::now();
self.stats.poll_loop_begin().inc();
let qlen = self.insert_item_queue.len();
if qlen >= self.opts.insert_queue_max * 2 / 3 {
@@ -2093,6 +2163,8 @@ impl Stream for CaConn {
break Ready(Some(Ok(item)));
}
let lts2 = Instant::now();
match self.as_mut().handle_own_ticker(cx) {
Ok(Ready(())) => {
have_progress = true;
@@ -2114,6 +2186,8 @@ impl Stream for CaConn {
Err(e) => break Ready(Some(Err(e))),
}
let lts3 = Instant::now();
match self.as_mut().attempt_flush_channel_info_query(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
@@ -2125,6 +2199,8 @@ impl Stream for CaConn {
Err(e) => break Ready(Some(Err(e))),
}
let lts2 = Instant::now();
match self.as_mut().handle_conn_command(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
@@ -2136,6 +2212,8 @@ impl Stream for CaConn {
Err(e) => break Ready(Some(Err(e))),
}
let lts4 = Instant::now();
match self.loop_inner(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
@@ -2151,6 +2229,26 @@ impl Stream for CaConn {
}
}
let lts5 = Instant::now();
let max = Duration::from_millis(14);
let dt = lts2.saturating_duration_since(lts1);
if dt > max {
debug!("LONG OPERATION 2 {dt:?}");
}
let dt = lts3.saturating_duration_since(lts2);
if dt > max {
debug!("LONG OPERATION 3 {dt:?}");
}
let dt = lts4.saturating_duration_since(lts3);
if dt > max {
debug!("LONG OPERATION 4 {dt:?}");
}
let dt = lts5.saturating_duration_since(lts4);
if dt > max {
debug!("LONG OPERATION 5 {dt:?}");
}
break if self.is_shutdown() {
if self.queues_out_flushed() {
// debug!("end of stream {}", self.remote_addr_dbg);
@@ -2160,6 +2258,7 @@ impl Stream for CaConn {
// debug!("queues_out_flushed false");
if have_progress {
self.stats.poll_reloop().inc();
reloops += 1;
continue;
} else if have_pending {
self.stats.poll_pending().inc();
@@ -2174,8 +2273,18 @@ impl Stream for CaConn {
}
} else {
if have_progress {
self.stats.poll_reloop().inc();
continue;
if poll_ts1.elapsed() > Duration::from_millis(5) {
self.stats.poll_wake_break().inc();
cx.waker().wake_by_ref();
break Ready(Some(Ok(CaConnEvent {
ts: poll_ts1,
value: CaConnEventValue::None,
})));
} else {
self.stats.poll_reloop().inc();
reloops += 1;
continue;
}
} else if have_pending {
self.stats.poll_pending().inc();
Pending
@@ -2186,13 +2295,20 @@ impl Stream for CaConn {
}
};
};
if reloops >= 512 {
self.stats.poll_reloops_512().inc();
} else if reloops >= 64 {
self.stats.poll_reloops_64().inc();
} else if reloops >= 8 {
self.stats.poll_reloops_8().inc();
}
let poll_ts2 = Instant::now();
let dt = poll_ts2.saturating_duration_since(poll_ts1);
if dt > Duration::from_millis(80) {
warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
} else if dt > Duration::from_millis(40) {
info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
} else if false && dt > Duration::from_millis(5) {
} else if dt > Duration::from_millis(14) {
debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
}
ret

View File

@@ -1,16 +1,6 @@
use super::conn::ChannelStateInfo;
use super::conn::CheckHealthResult;
use super::conn::ConnCommandResult;
use super::findioc::FindIocRes;
use super::statemap;
use super::statemap::ChannelState;
use super::statemap::ConnectionState;
use super::statemap::ConnectionStateValue;
use crate::ca::conn::CaConn;
use crate::ca::conn::CaConnEvent;
use crate::ca::conn::CaConnEventValue;
use crate::ca::conn::CaConnOpts;
use crate::ca::conn::ConnCommand;
use crate::ca::conn;
use crate::ca::statemap;
use crate::ca::statemap::CaConnState;
use crate::ca::statemap::WithAddressState;
use crate::conf::CaIngestOpts;
@@ -22,6 +12,14 @@ use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
use async_channel::Sender;
use atomic::AtomicUsize;
use conn::CaConn;
use conn::CaConnEvent;
use conn::CaConnEventValue;
use conn::CaConnOpts;
use conn::ChannelStateInfo;
use conn::ChannelStatusPartial;
use conn::ConnCommand;
use conn::ConnCommandResult;
use core::fmt;
use dbpg::seriesbychannel::BoxedSend;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
@@ -41,8 +39,11 @@ use serde::Serialize;
use series::ChannelStatusSeriesId;
use statemap::ActiveChannelState;
use statemap::CaConnStateValue;
use statemap::ChannelState;
use statemap::ChannelStateMap;
use statemap::ChannelStateValue;
use statemap::ConnectionState;
use statemap::ConnectionStateValue;
use statemap::WithStatusSeriesIdState;
use statemap::WithStatusSeriesIdStateInner;
use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
@@ -53,6 +54,7 @@ use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IocFinderStats;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
@@ -196,7 +198,6 @@ impl fmt::Debug for ChannelStatusesRequest {
pub enum ConnSetCmd {
ChannelAdd(ChannelAdd),
ChannelRemove(ChannelRemove),
CheckHealth(u32, Instant),
Shutdown,
ChannelStatuses(ChannelStatusesRequest),
}
@@ -213,7 +214,7 @@ impl CaConnSetEvent {
#[derive(Debug, Clone)]
pub enum CaConnSetItem {
Error(Error),
Healthy(u32, Instant, Instant),
Healthy,
}
pub struct CaConnSetCtrl {
@@ -266,19 +267,6 @@ impl CaConnSetCtrl {
Ok(())
}
pub async fn check_health(&mut self) -> Result<u32, Error> {
let id = self.make_id();
let cmd = ConnSetCmd::CheckHealth(id, Instant::now());
let n = self.tx.len();
if n > 0 {
debug!("check_health self.tx.len() {:?}", n);
}
let s = format!("{:?}", cmd);
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
trace!("check_health enqueued {s}");
Ok(id)
}
pub async fn join(self) -> Result<(), Error> {
self.jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
Ok(())
@@ -352,6 +340,7 @@ impl CanSendChannelInfoResult for SeriesLookupSender {
}
pub struct CaConnSet {
ticker: Pin<Box<tokio::time::Sleep>>,
backend: String,
local_epics_hostname: String,
ca_conn_ress: BTreeMap<SocketAddr, CaConnRes>,
@@ -384,9 +373,14 @@ pub struct CaConnSet {
ca_proto_stats: Arc<CaProtoStats>,
rogue_channel_count: u64,
connect_fail_count: usize,
name_by_cssid: HashMap<ChannelStatusSeriesId, String>,
}
impl CaConnSet {
pub fn self_name() -> &'static str {
std::any::type_name::<Self>()
}
pub fn start(
backend: String,
local_epics_hostname: String,
@@ -411,6 +405,7 @@ impl CaConnSet {
let ca_proto_stats = Arc::new(CaProtoStats::new());
let ca_conn_stats = Arc::new(CaConnStats::new());
let connset = Self {
ticker: Self::new_self_ticker(),
backend,
local_epics_hostname,
ca_conn_ress: BTreeMap::new(),
@@ -444,6 +439,7 @@ impl CaConnSet {
ca_proto_stats: ca_proto_stats.clone(),
rogue_channel_count: 0,
connect_fail_count: 0,
name_by_cssid: HashMap::new(),
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -460,6 +456,10 @@ impl CaConnSet {
}
}
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
Box::pin(tokio::time::sleep(Duration::from_millis(500)))
}
async fn run(mut this: CaConnSet) -> Result<(), Error> {
debug!("CaConnSet run begin");
loop {
@@ -498,7 +498,6 @@ impl CaConnSet {
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x),
// ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
// ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth(id, ts1) => self.handle_check_health(id, ts1),
ConnSetCmd::Shutdown => self.handle_shutdown(),
ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x),
},
@@ -517,13 +516,19 @@ impl CaConnSet {
self.stats.channel_add().inc();
// TODO should I add the transition through ActiveChannelState::Init as well?
let ch = Channel::new(cmd.name.clone());
let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
since: SystemTime::now(),
}),
running_cmd_id: None,
health_timeout_count: 0,
});
let _st = if let Some(e) = self.channel_states.get_mut(&ch) {
e
} else {
let item = ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
since: SystemTime::now(),
}),
running_cmd_id: None,
health_timeout_count: 0,
};
self.channel_states.insert(ch.clone(), item);
self.channel_states.get_mut(&ch).unwrap()
};
let tx = self.channel_info_res_tx.as_ref().get_ref().clone();
let item = ChannelInfoQuery {
backend: cmd.backend,
@@ -545,13 +550,17 @@ impl CaConnSet {
CaConnEventValue::EchoTimeout => Ok(()),
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::QueryItem(item) => {
todo!("remove this insert case");
error!("TODO remove this insert case");
// self.storage_insert_queue.push_back(item);
Ok(())
}
CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x),
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::ConnectFail => self.handle_connect_fail(addr),
CaConnEventValue::ChannelStatus(st) => {
error!("TODO handle_ca_conn_event update channel status view");
Ok(())
}
}
}
@@ -562,10 +571,12 @@ impl CaConnSet {
trace3!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id());
self.name_by_cssid.insert(cssid.clone(), res.channel.clone());
let add = ChannelAddWithStatusId {
backend: res.backend,
name: res.channel,
cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()),
cssid,
};
self.handle_add_channel_with_status_id(add)?;
}
@@ -587,7 +598,7 @@ impl CaConnSet {
debug!("handle_add_channel_with_status_id {cmd:?}");
}
let ch = Channel::new(cmd.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 {
*chst2 = ActiveChannelState::WithStatusSeriesId {
@@ -631,7 +642,7 @@ impl CaConnSet {
debug!("handle_add_channel_with_addr {cmd:?}");
}
let ch = Channel::new(cmd.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
@@ -669,7 +680,7 @@ impl CaConnSet {
return Ok(());
}
let ch = Channel::new(cmd.name);
if let Some(k) = self.channel_states.inner().get_mut(&ch) {
if let Some(k) = self.channel_states.get_mut(&ch) {
match &k.value {
ChannelStateValue::Active(j) => match j {
ActiveChannelState::Init { .. } => {
@@ -717,7 +728,7 @@ impl CaConnSet {
if trigger.contains(&ch.id()) {
debug!("handle_ioc_query_result {res:?}");
}
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id,
@@ -764,33 +775,20 @@ impl CaConnSet {
Ok(())
}
fn handle_check_health(&mut self, id: u32, ts1: Instant) -> Result<(), Error> {
trace2!("handle_check_health {id:08x}");
fn handle_check_health(&mut self) -> Result<(), Error> {
trace2!("handle_check_health");
if self.shutdown_stopping {
return Ok(());
Ok(())
} else {
if false {
self.thr_msg_storage_len
.trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]);
}
self.check_channel_states()?;
let item = CaConnSetItem::Healthy;
self.connset_out_queue.push_back(item);
Ok(())
}
if false {
self.thr_msg_storage_len
.trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]);
}
self.check_channel_states()?;
// Trigger already the next health check, but use the current data that we have.
// TODO do the full check before sending the reply to daemon.
for (_, res) in self.ca_conn_ress.iter_mut() {
let item = ConnCommand::check_health();
res.cmd_queue.push_back(item);
trace2!(
"handle_check_health pushed check command {:?} {:?}",
res.cmd_queue.len(),
res.sender.len()
);
}
let ts2 = Instant::now();
let item = CaConnSetItem::Healthy(id, ts1, ts2);
self.connset_out_queue.push_back(item);
Ok(())
}
fn handle_channel_statuses_req(&mut self, req: ChannelStatusesRequest) -> Result<(), Error> {
@@ -801,7 +799,6 @@ impl CaConnSet {
let reg1 = regex::Regex::new(&req.name)?;
let channels_ca_conn_set = self
.channel_states
.inner()
.iter()
.filter(|(k, _)| reg1.is_match(k.id()))
.map(|(k, v)| (k.id().to_string(), v.clone()))
@@ -835,17 +832,23 @@ impl CaConnSet {
fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> {
use crate::ca::conn::ConnCommandResultKind::*;
match res.kind {
CheckHealth(res) => self.apply_ca_conn_health_update(addr, res),
Unused => Ok(()),
//CheckHealth(res) => self.apply_ca_conn_health_update(addr, res),
}
}
fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> {
fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: ChannelStatusPartial) -> Result<(), Error> {
trace2!("apply_ca_conn_health_update {addr}");
let tsnow = SystemTime::now();
self.rogue_channel_count = 0;
for (k, v) in res.channel_statuses {
let ch = Channel::new(k);
if let Some(st1) = self.channel_states.inner().get_mut(&ch) {
let name = if let Some(x) = self.name_by_cssid.get(&v.cssid) {
x
} else {
return Err(Error::with_msg_no_trace(format!("unknown cssid {:?}", v.cssid)));
};
let ch = Channel::new(name.clone());
if let Some(st1) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
@@ -888,7 +891,7 @@ impl CaConnSet {
trace!("handle_channel_create_fail {addr} {name}");
let tsnow = SystemTime::now();
let ch = Channel::new(name);
if let Some(st1) = self.channel_states.inner().get_mut(&ch) {
if let Some(st1) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
@@ -921,7 +924,7 @@ impl CaConnSet {
fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> {
trace2!("handle_connect_fail {addr}");
let tsnow = SystemTime::now();
for (ch, st1) in self.channel_states.inner().iter_mut() {
for (ch, st1) in self.channel_states.iter_mut() {
match &mut st1.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { since: _ } => {}
@@ -1033,32 +1036,13 @@ impl CaConnSet {
break;
}
}
CaConnEventValue::None => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::EchoTimeout => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::ConnCommandResult(_) => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::ChannelCreateFail(_) => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::EndOfStream => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::ConnectFail => {
CaConnEventValue::None
| CaConnEventValue::EchoTimeout
| CaConnEventValue::ConnCommandResult(..)
| CaConnEventValue::ChannelCreateFail(..)
| CaConnEventValue::EndOfStream
| CaConnEventValue::ConnectFail
| CaConnEventValue::ChannelStatus(..) => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
@@ -1240,9 +1224,9 @@ impl CaConnSet {
let k = self.chan_check_next.take();
let it = if let Some(last) = k {
trace!("check_chans start at {:?}", last);
self.channel_states.inner().range_mut(last..)
self.channel_states.range_mut(last..)
} else {
self.channel_states.inner().range_mut(..)
self.channel_states.range_mut(..)
};
let tsnow = SystemTime::now();
@@ -1306,11 +1290,14 @@ impl CaConnSet {
}
}
Assigned(st4) => {
if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < tsnow {
warn!("soon health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
}
if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow {
self.stats.channel_health_timeout().inc();
trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
// TODO
error!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
std::process::exit(1);
let addr = SocketAddr::V4(*addr_v4);
cmd_remove_channel.push((addr, ch.clone()));
@@ -1385,7 +1372,7 @@ impl CaConnSet {
let mut connected = 0;
let mut maybe_wrong_address = 0;
let mut assigned_without_health_update = 0;
for (_ch, st) in self.channel_states.inner().iter() {
for (_ch, st) in self.channel_states.iter() {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {
@@ -1477,16 +1464,28 @@ impl CaConnSet {
}
Ok(())
}
fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
debug!("handle_own_ticker_tick {}", Self::self_name());
if !self.ready_for_end_of_stream() {
self.ticker = Self::new_self_ticker();
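// Poll the fresh ticker once so it registers this task's waker;
// without that, the next tick would never wake this stream.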
let _ = self.ticker.poll_unpin(cx);
// cx.waker().wake_by_ref();
}
self.handle_check_health()?;
Ok(())
}
}
impl Stream for CaConnSet {
type Item = CaConnSetItem;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
trace4!("CaConnSet poll begin");
use Poll::*;
trace4!("CaConnSet poll begin");
let poll_ts1 = Instant::now();
self.stats.poll_fn_begin().inc();
let res = loop {
let ret = loop {
trace4!("CaConnSet poll loop");
self.stats.poll_loop_begin().inc();
@@ -1519,6 +1518,22 @@ impl Stream for CaConnSet {
break Ready(Some(item));
}
match self.ticker.poll_unpin(cx) {
Ready(()) => match self.as_mut().handle_own_ticker_tick(cx) {
Ok(()) => {
have_progress = true;
}
Err(e) => {
have_progress = true;
error!("ticker {e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
},
Pending => {
have_pending = true;
}
}
if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() {
match jh.poll_unpin(cx) {
Ready(x) => {
@@ -1591,6 +1606,7 @@ impl Stream for CaConnSet {
}
if self.channel_info_query_sender.is_idle() {
// if self.channel_info_query_sender.len().unwrap_or(0) <= 10 {}
if let Some(item) = self.channel_info_query_queue.pop_front() {
self.channel_info_query_sender.as_mut().send_pin(item);
}
@@ -1690,6 +1706,15 @@ impl Stream for CaConnSet {
};
};
trace4!("CaConnSet poll done");
res
let poll_ts2 = Instant::now();
let dt = poll_ts2.saturating_duration_since(poll_ts1);
if dt > Duration::from_millis(80) {
warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
} else if dt > Duration::from_millis(40) {
info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
} else if dt > Duration::from_millis(5) {
debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
}
ret
}
}

View File

@@ -1,12 +1,9 @@
use super::connset::CaConnSetEvent;
use super::connset::IocAddrQuery;
use super::connset::CURRENT_SEARCH_PENDING_MAX;
use super::connset::SEARCH_BATCH_MAX;
use super::search::ca_search_workers_start;
use crate::ca::findioc::FindIocRes;
use crate::ca::findioc::FindIocStream;
use crate::conf::CaIngestOpts;
use crate::daemon_common::DaemonEvent;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::conn::make_pg_client;
@@ -14,16 +11,11 @@ use dbpg::iocindex::IocItem;
use dbpg::iocindex::IocSearchIndexWorker;
use dbpg::postgres::Row as PgRow;
use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use stats::IocFinderStats;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
@@ -31,10 +23,6 @@ use taskrun::tokio;
use tokio::task::JoinHandle;
const SEARCH_DB_PIPELINE_LEN: usize = 4;
const FINDER_JOB_QUEUE_LEN_MAX: usize = 10;
const FINDER_BATCH_SIZE: usize = 8;
const FINDER_IN_FLIGHT_MAX: usize = 800;
const FINDER_TIMEOUT: Duration = Duration::from_millis(100);
#[allow(unused)]
macro_rules! debug_batch {

View File

@@ -616,10 +616,10 @@ impl Stream for FindIocStream {
match batch.tgts.pop_front() {
Some(tgtix) => {
Self::serialize_batch(buf1, batch);
debug!("serialized for search {:?}", batch.channels);
match self.tgts.get(tgtix) {
Some(tgt) => {
let tgt = tgt.clone();
//info!("Serialize and queue {bid:?}");
self.send_addr = tgt.clone();
self.batch_send_queue.push_back(bid);
have_progress = true;

View File

@@ -1,9 +1,13 @@
use crate::ca::conn::ChannelStateInfo;
use crate::daemon_common::Channel;
use dashmap::DashMap;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use std::collections::btree_map::RangeMut;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddrV4;
use std::ops::RangeBounds;
use std::time::Instant;
use std::time::SystemTime;
@@ -117,14 +121,51 @@ pub struct ChannelState {
#[derive(Debug, Clone, Serialize)]
pub struct ChannelStateMap {
map: BTreeMap<Channel, ChannelState>,
#[serde(skip)]
map2: HashMap<Channel, ChannelState>,
// TODO implement same interface via dashmap and compare
#[serde(skip)]
map3: DashMap<Channel, ChannelState>,
}
impl ChannelStateMap {
pub fn new() -> Self {
Self { map: BTreeMap::new() }
Self {
map: BTreeMap::new(),
map2: HashMap::new(),
map3: DashMap::new(),
}
}
pub fn inner(&mut self) -> &mut BTreeMap<Channel, ChannelState> {
&mut self.map
pub fn insert(&mut self, k: Channel, v: ChannelState) -> Option<ChannelState> {
self.map.insert(k, v)
}
pub fn get_mut(&mut self, k: &Channel) -> Option<&mut ChannelState> {
self.map.get_mut(k)
}
pub fn iter(&self) -> impl Iterator<Item = (&Channel, &ChannelState)> {
self.map.iter()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&Channel, &mut ChannelState)> {
self.map.iter_mut()
}
pub fn iter_mut_dash(&mut self) -> ChannelStateIter {
todo!()
}
pub fn range_mut<R>(&mut self, range: R) -> RangeMut<Channel, ChannelState>
where
R: RangeBounds<Channel>,
{
self.map.range_mut(range)
}
}
pub struct ChannelStateIter<'a> {
_m1: &'a u32,
}
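For the dashmap TODO above, a minimal sketch of the same interface backed by DashMap. DashMap hands out shard guards rather than plain &mut and has no ordered range_mut, which is what a comparison against the BTreeMap would have to work around:

pub struct ChannelStateMapDash {
    map: DashMap<Channel, ChannelState>,
}

impl ChannelStateMapDash {
    pub fn new() -> Self {
        Self { map: DashMap::new() }
    }
    pub fn insert(&self, k: Channel, v: ChannelState) -> Option<ChannelState> {
        self.map.insert(k, v)
    }
    // Returns a shard guard, not &mut; hold it only briefly so other
    // accesses hashing to the same shard are not blocked.
    pub fn get_mut(&self, k: &Channel) -> Option<dashmap::mapref::one::RefMut<'_, Channel, ChannelState>> {
        self.map.get_mut(k)
    }
}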

View File

@@ -1,5 +1,4 @@
use err::Error;
use ingest_linux::net::local_hostname;
use netpod::log::*;
use netpod::Database;
use netpod::ScyllaConfig;

View File

@@ -2,7 +2,7 @@ use crate::ca::connset::CaConnSetItem;
use async_channel::Sender;
use serde::Serialize;
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub struct Channel {
id: String,
}

View File

@@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
futures-util = "0.3.28"
async-channel = "2.0.0"
scylla = "0.10.1"
scylla = "0.11.0"
smallvec = "1.11.0"
pin-project = "1.1.3"
stackfuture = "0.3.0"

View File

@@ -4,7 +4,7 @@ use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::*;
use scylla::batch::Batch;
use scylla::frame::value::BatchValues;
use scylla::serialize::batch::BatchValues;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;

View File

@@ -4,7 +4,7 @@ use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::*;
use scylla::batch::Batch;
use scylla::frame::value::BatchValues;
use scylla::serialize::batch::BatchValues;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;

View File

@@ -5,6 +5,7 @@ use futures_util::FutureExt;
use netpod::log::*;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::serialize::row::SerializeRow;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;
@@ -24,7 +25,7 @@ impl<'a> ScyInsertFut<'a> {
pub fn new<V>(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self
where
V: ValueList + Send + 'static,
V: ValueList + SerializeRow + Send + 'static,
{
let fut = scy.execute(query, values);
let fut = Box::pin(fut) as _;
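scylla 0.11 moves serialization into scylla::serialize: row values now need SerializeRow (single values SerializeCql), and this commit keeps the legacy ValueList/Value bounds alongside during the transition. A minimal sketch of a generic execute under the dual bound, mirroring ScyInsertFut::new (ScySession is the session alias already used in this code):

use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::serialize::row::SerializeRow;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;

async fn execute_prepared<V>(scy: &ScySession, qu: &PreparedStatement, values: V) -> Result<QueryResult, QueryError>
where
    V: ValueList + SerializeRow + Send + 'static,
{
    scy.execute(qu, values).await
}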

View File

@@ -10,7 +10,11 @@ use futures_util::FutureExt;
use netpod::timeunits::SEC;
use netpod::ScalarType;
use netpod::Shape;
use scylla::frame::value::Value;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::serialize::row::SerializeRow;
use scylla::serialize::value::SerializeCql;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
@@ -18,7 +22,6 @@ use series::ChannelStatusSeriesId;
use series::SeriesId;
use smallvec::smallvec;
use smallvec::SmallVec;
use stackfuture::StackFuture;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
@@ -278,7 +281,7 @@ struct InsParCom {
fn insert_scalar_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
where
ST: scylla::frame::value::Value + Send + 'static,
ST: Value + SerializeCql + Send + 'static,
{
let params = (
par.series as i64,
@@ -293,7 +296,7 @@ where
fn insert_array_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
where
ST: scylla::frame::value::Value + Send + 'static,
ST: Value + SerializeCql + Send + 'static,
{
let params = (
par.series as i64,
@@ -318,7 +321,7 @@ pub struct InsertFut {
}
impl InsertFut {
pub fn new<V: scylla::frame::value::ValueList + Send + 'static>(
pub fn new<V: ValueList + SerializeRow + Send + 'static>(
scy: Arc<ScySession>,
qu: Arc<PreparedStatement>,
params: V,
@@ -363,7 +366,7 @@ async fn insert_scalar_gen<ST>(
data_store: &DataStore,
) -> Result<(), Error>
where
ST: scylla::frame::value::Value,
ST: Value + SerializeCql,
{
let params = (
par.series as i64,
@@ -399,7 +402,7 @@ async fn insert_array_gen<ST>(
data_store: &DataStore,
) -> Result<(), Error>
where
ST: scylla::frame::value::Value,
ST: Value + SerializeCql,
{
if par.do_insert {
let params = (

View File

@@ -30,7 +30,7 @@ impl SeriesId {
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize)]
pub struct ChannelStatusSeriesId(u64);
impl ChannelStatusSeriesId {

View File

@@ -426,6 +426,10 @@ stats_proc::stats_struct!((
poll_reloop,
poll_pending,
poll_no_progress_no_pending,
poll_reloops_8,
poll_reloops_64,
poll_reloops_512,
poll_wake_break,
storage_queue_send,
storage_queue_pending,
storage_queue_above_8,