Version 0.2.2

This commit is contained in:
Dominik Werder
2024-08-13 15:13:35 +02:00
parent a5caec0591
commit 95acfd6061
12 changed files with 265 additions and 220 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.2-aa.3"
version = "0.2.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -3,6 +3,7 @@ use daqingest::opts::DaqIngestOpts;
use err::Error;
use log::*;
use netfetch::conf::parse_config;
use netfetch::conf::CaIngestOpts;
use netpod::Database;
use scywr::config::ScyllaIngestConfig;
use taskrun::TracingMode;
@@ -30,7 +31,7 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> {
}
async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
let buildmark = "+0007";
let buildmark = "+0008";
use daqingest::opts::ChannelAccess;
use daqingest::opts::SubCmd;
match opts.subcmd {
@@ -88,18 +89,27 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
}
}
}
SubCmd::ScyllaSchemaCheck(k) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (opts, _) = parse_config(k.config.into()).await?;
scylla_schema_check(opts, false).await?;
}
SubCmd::ScyllaSchemaChange(k) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (opts, _) = parse_config(k.config.into()).await?;
scylla_schema_check(opts, true).await?;
}
SubCmd::ChannelAccess(k) => match k {
#[cfg(DISABLED)]
ChannelAccess::CaSearch(k) => {
info!("daqingest version {}", clap::crate_version!());
let (conf, channels) = parse_config(k.config.into()).await?;
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (conf, channels_config) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels_config).await?
}
ChannelAccess::CaSearch(_k) => {
// info!("daqingest version {}", clap::crate_version!());
// let (conf, channels) = parse_config(k.config.into()).await?;
// netfetch::ca::search::ca_search(conf, &channels).await?
}
},
#[cfg(feature = "bsread")]
SubCmd::Bsread(k) => ingest_bsread::zmtp::zmtp_client(k.into())
@@ -128,3 +138,20 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
}
Ok(())
}
async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> {
let opstr = if do_change { "change" } else { "check" };
info!("start scylla schema {}", opstr);
use netpod::ttl::RetentionTime;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, do_change)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, do_change)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, do_change)
.await
.map_err(Error::from_string)?;
info!("stop scylla schema {}", opstr);
Ok(())
}

View File

@@ -2,7 +2,6 @@ pub mod inserthook;
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::WeakSender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use log::*;
@@ -23,15 +22,10 @@ use scywr::config::ScyllaIngestConfig;
use scywr::insertqueues::InsertQueuesRx;
use scywr::insertqueues::InsertQueuesTx;
use scywr::insertworker::InsertWorkerOpts;
use scywr::iteminsertqueue as scywriiq;
use scywriiq::QueryItem;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
use stats::SeriesWriterEstablishStats;
use std::collections::VecDeque;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
@@ -107,7 +101,7 @@ impl Daemon {
let local_epics_hostname = ingest_linux::net::local_hostname();
#[cfg(DISABLED)]
#[cfg(target_abi = "x32")]
let query_item_rx = {
// TODO only testing, remove
tokio::spawn({
@@ -345,6 +339,7 @@ impl Daemon {
}
fn check_health_connset(&mut self, ts1: Instant) -> Result<(), Error> {
let _ = ts1;
let dt = self.connset_status_last.elapsed();
if dt > CHECK_HEALTH_TIMEOUT {
error!(
@@ -358,7 +353,7 @@ impl Daemon {
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
if self.shutting_down {
let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire);
#[cfg(DISABLED)]
#[cfg(target_abi = "x32")]
{
let nitems = self
.query_item_tx_weak
@@ -424,7 +419,7 @@ impl Daemon {
Ok(())
}
#[cfg(DISABLED)]
#[cfg(target_abi = "x32")]
async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> {
info!("handle_ca_conn_done {conn_addr:?}");
self.connection_states.remove(&conn_addr);
@@ -509,7 +504,7 @@ impl Daemon {
Ok(())
}
#[cfg(DISABLED)]
#[cfg(target_abi = "x32")]
async fn handle_shutdown(&mut self) -> Result<(), Error> {
warn!("received shutdown event");
if self.shutting_down {
@@ -720,13 +715,13 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
warn!("scylla_disable config flag enabled");
} else {
info!("start scylla schema check");
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short)
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, false)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium)
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, false)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long)
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, false)
.await
.map_err(Error::from_string)?;
info!("stop scylla schema check");

View File

@@ -23,6 +23,8 @@ pub enum SubCmd {
ListPulses,
FetchEvents(FetchEvents),
Db(Db),
ScyllaSchemaCheck(CaConfig),
ScyllaSchemaChange(CaConfig),
#[command(subcommand)]
ChannelAccess(ChannelAccess),
#[cfg(feature = "bsread")]
@@ -81,7 +83,6 @@ pub struct BsreadDump {
#[derive(Debug, clap::Parser)]
pub enum ChannelAccess {
CaIngest(CaConfig),
#[cfg(DISABLED)]
CaSearch(CaSearch),
}
@@ -115,6 +116,14 @@ pub struct Db {
pub sub: DbSub,
}
#[derive(Debug, clap::Parser)]
pub struct ScyllaDb {
#[arg(long)]
pub scylla_host: String,
#[arg(long)]
pub scylla_keyspace: String,
}
#[derive(Debug, clap::Parser)]
pub enum DbSub {
Data(DbData),

View File

@@ -51,8 +51,11 @@ pub enum Error {
ChannelError,
#[error("DbConsistencySeries({0})")]
DbConsistencySeries(String),
ScalarType,
#[error("ScalarType({0})")]
ScalarType(i32),
Shape,
#[error("SeriesKind({0})")]
SeriesKind(i16),
}
impl From<crate::err::Error> for Error {
@@ -355,12 +358,14 @@ impl Worker {
let series: i64 = row.get(1);
let series = SeriesId::new(series as _);
let shape_dims: Vec<i32> = row.get(3);
let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?;
let scalar_type = row.get(2);
let scalar_type =
ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::ScalarType(scalar_type))?;
let shape_dims =
Shape::from_scylla_shape_dims(shape_dims.as_slice()).map_err(|_| Error::Shape)?;
let tscs: Vec<DateTime<Utc>> = row.get(4);
let kind: i16 = row.get(5);
let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?;
let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::SeriesKind(kind))?;
if true && job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
|| series == SeriesId::new(1605348259462543621)
{

View File

@@ -24,6 +24,7 @@ use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use netpod::timeunits::*;
use netpod::trigger;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
use netpod::SeriesKind;
@@ -58,7 +59,6 @@ use series::SeriesId;
use serieswriter::binwriter::BinWriter;
use serieswriter::fixgridwriter::ChannelStatusSeriesWriter;
use serieswriter::fixgridwriter::ChannelStatusWriteState;
use serieswriter::fixgridwriter::CHANNEL_STATUS_GRID;
use serieswriter::msptool::MspSplit;
use serieswriter::rtwriter::RtWriter;
use serieswriter::writer::EmittableType;
@@ -136,6 +136,15 @@ macro_rules! trace_event_incoming {
};
}
#[allow(unused)]
macro_rules! trace_monitor_stale {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
fn dbg_chn_name(name: impl AsRef<str>) -> bool {
name.as_ref() == "SINSB02-KCOL-ACT:V-EY21700-MAN-ON-SP"
}
@@ -442,14 +451,10 @@ struct CreatedState {
// Updated on monitoring, polling or when the channel config changes to reset the timeout
ts_activity_last: Instant,
st_activity_last: SystemTime,
ts_msp_last: u64,
ts_msp_grid_last: u32,
inserted_in_ts_msp: u64,
insert_item_ivl_ema: IntervalEma,
item_recv_ivl_ema: IntervalEma,
insert_recv_ivl_last: Instant,
muted_before: u32,
info_store_msp_last: u32,
recv_count: u64,
recv_bytes: u64,
stwin_ts: u64,
@@ -464,7 +469,6 @@ struct CreatedState {
dw_lt_last: SystemTime,
scalar_type: ScalarType,
shape: Shape,
log_more: bool,
name: String,
enum_str_table: Option<Vec<String>>,
status_emit_count: u64,
@@ -485,14 +489,10 @@ impl CreatedState {
ts_alive_last: tsnow,
ts_activity_last: tsnow,
st_activity_last: stnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: 0,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: tsnow,
muted_before: 0,
info_store_msp_last: 0,
recv_count: 0,
recv_bytes: 0,
stwin_ts: 0,
@@ -507,7 +507,6 @@ impl CreatedState {
dw_lt_last: SystemTime::UNIX_EPOCH,
scalar_type: ScalarType::I8,
shape: Shape::Scalar,
log_more: false,
name: String::new(),
enum_str_table: None,
status_emit_count: 0,
@@ -1211,7 +1210,7 @@ impl CaConn {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(conf, cssid) => {
self.channel_add(conf, cssid);
self.channel_add(conf, cssid)?;
Ok(Ready(Some(())))
}
ConnCommandKind::ChannelClose(name) => {
@@ -1237,7 +1236,6 @@ impl CaConn {
use Poll::*;
let mut have_progress = false;
let mut have_pending = false;
let stnow = self.tmp_ts_poll;
if self.is_shutdown() {
Ok(Ready(None))
} else {
@@ -1341,7 +1339,6 @@ impl CaConn {
)?;
self.stats.get_series_id_ok.inc();
{
info!("queued Opened {:?}", st2.channel.cssid);
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.channel.cssid.clone(),
@@ -1420,13 +1417,17 @@ impl CaConn {
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> {
if self.cid_by_name(conf.name()).is_some() {
self.stats.channel_add_exists.inc();
error!("logic error channel already exists {conf:?}");
if trigger.contains(&conf.name()) {
error!("logic error channel already exists {conf:?}");
}
Ok(())
} else {
let cid = self.cid_by_name_or_insert(conf.name())?;
if self.channels.contains_key(&cid) {
self.stats.channel_add_exists.inc();
error!("logic error channel already exists {conf:?}");
if trigger.contains(&conf.name()) {
error!("logic error channel already exists {conf:?}");
}
Ok(())
} else {
let conf = ChannelConf::new(conf, cssid);
@@ -1493,6 +1494,22 @@ impl CaConn {
fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
// TODO can I reuse emit_channel_info_insert_items ?
trace!("channel_state_on_shutdown channels {}", self.channels.len());
let stnow = self.tmp_ts_poll;
let mut item_deque = VecDeque::new();
for (_cid, conf) in &mut self.channels {
let item = ChannelStatusItem {
ts: stnow,
cssid: conf.state.cssid(),
status: ChannelStatus::Closed(channel_reason.clone()),
};
let deque = &mut item_deque;
if conf.wrst.emit_channel_status_item(item, deque).is_err() {
self.stats.logic_error().inc();
}
}
for x in item_deque {
self.iqdqs.st_rf3_qu.push_back(x);
}
for (_cid, conf) in &mut self.channels {
if dbg_chn_name(conf.conf.name()) {
info!("channel_state_on_shutdown {:?}", conf);
@@ -1515,23 +1532,13 @@ impl CaConn {
*chst = ChannelState::Ended(st.channel.cssid.clone());
}
ChannelState::Writable(st2) => {
let cssid = st2.channel.cssid.clone();
// TODO should call the proper channel-close handler which in turn emits the status item.
// Make sure I record the reason for the "Close": user command, IOC error, etc..
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: cssid.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
};
let deque = &mut self.iqdqs.st_rf3_qu;
if conf.wrst.emit_channel_status_item(item, deque).is_err() {
self.stats.logic_error().inc();
}
let cssid = st2.channel.cssid.clone();
*chst = ChannelState::Ended(cssid);
}
ChannelState::Error(..) => {
warn!("TODO emit error status");
// *chst = ChannelState::Ended;
// Leave state unchanged.
}
ChannelState::Ended(_) => {}
ChannelState::Closing(_) => {}
@@ -1539,44 +1546,6 @@ impl CaConn {
}
}
fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> {
let timenow = self.tmp_ts_poll;
for (_, conf) in &mut self.channels {
let st = &mut conf.state;
match st {
ChannelState::Init(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::FetchEnumDetails(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::Creating(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::FetchCaStatusSeries(..) => {
// TODO ?
}
ChannelState::MakingSeriesWriter(..) => {
// TODO ?
}
ChannelState::Writable(st) => {
let crst = &mut st.channel;
// TODO if we don't wave a series id yet, dont' save? write-ampl.
let msp = info_store_msp_from_time(timenow.clone());
if msp != crst.info_store_msp_last {
crst.info_store_msp_last = msp;
}
}
ChannelState::Error(_) => {
// TODO need last-save-ts for this state.
}
ChannelState::Ended(_) => {}
ChannelState::Closing(_) => {}
}
}
Ok(())
}
fn transition_to_polling(&mut self, subid: Subid, tsnow: Instant) -> Result<(), Error> {
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
@@ -2198,7 +2167,8 @@ impl CaConn {
ReadingState::Monitoring(st3) => match &st3.mon2state {
Monitoring2State::Passive(st4) => {
if st4.tsbeg + conf.conf.manual_poll_on_quiet_after() < tsnow {
debug!("check_channels_state_poll Monitoring2State::Passive timeout");
trace_monitor_stale!("check_channels_state_poll Monitoring2State::Passive timeout");
self.stats.monitor_stale_read_begin().inc();
// TODO encapsulate and unify with Polling handler
let ioid = Ioid(self.ioid);
self.ioid = self.ioid.wrapping_add(1);
@@ -2223,8 +2193,13 @@ impl CaConn {
// Something is wrong with this channel.
// Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only
// this or a subset of the subscribed channels no longer give updates.
self.stats.monitor_stale_read_timeout().inc();
let name = conf.conf.name();
warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid);
trace_monitor_stale!(
"channel monitor explicit read timeout {} ioid {:?}",
name,
ioid
);
if false {
// Here we try to close the channel at hand.
@@ -2277,8 +2252,6 @@ impl CaConn {
if *x + Duration::from_millis(10000) <= tsnow {
self.read_ioids.remove(ioid);
self.stats.caget_timeout().inc();
// warn!("channel caget timeout");
// std::process::exit(1);
st3.tick = PollTickState::Idle(tsnow);
}
}
@@ -2309,7 +2282,7 @@ impl CaConn {
if let Some(started) = self.ioc_ping_start {
if started + TIMEOUT_PONG_WAIT < tsnow {
self.stats.pong_timeout().inc();
warn!("pong timeout {}", self.remote_addr_dbg);
info!("pong timeout {}", self.remote_addr_dbg);
self.ioc_ping_start = None;
let item = CaConnEvent {
ts: tsnow,
@@ -2328,7 +2301,7 @@ impl CaConn {
proto.push_out(msg);
} else {
self.stats.ping_no_proto().inc();
warn!("can not ping {} no proto", self.remote_addr_dbg);
info!("can not ping {} no proto", self.remote_addr_dbg);
self.trigger_shutdown(ShutdownReason::ProtocolMissing);
}
}
@@ -2537,19 +2510,6 @@ impl CaConn {
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
let log_more = match &scalar_type {
ScalarType::Enum => {
if cssid.id() % 60 == 14 {
let name = conf.conf.name();
// info!("ENUM {}", name);
true
} else {
false
}
}
_ => false,
};
let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP);
let created_state = CreatedState {
cssid,
@@ -2561,14 +2521,10 @@ impl CaConn {
ts_alive_last: tsnow,
ts_activity_last: tsnow,
st_activity_last: stnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: tsnow,
muted_before: 0,
info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll),
recv_count: 0,
recv_bytes: 0,
stwin_ts: 0,
@@ -2583,7 +2539,6 @@ impl CaConn {
dw_lt_last: SystemTime::UNIX_EPOCH,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
log_more,
name: conf.conf.name().into(),
enum_str_table: None,
status_emit_count: 0,

View File

@@ -61,6 +61,7 @@ use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use netpod::trigger;
use netpod::OnDrop;
use scywr::insertqueues::InsertQueuesTx;
use series::SeriesId;
@@ -72,15 +73,6 @@ use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
#[allow(non_upper_case_globals)]
pub const trigger: [&'static str; 5] = [
"S10-CMON-DIA1431:CURRENT-3-3",
"S10-CMON-DIA1431:CURRENT-5",
"S10-CMON-DIA1431:FAN-SPEED",
"S10-CMON-DIA1431:POWER-TOT",
"S10-CMON-MAG1721:TIN",
];
const CHECK_CHANS_PER_TICK: usize = 10000000;
pub const SEARCH_BATCH_MAX: usize = 64;
pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 2;
@@ -622,7 +614,7 @@ impl CaConnSet {
}
self.stats.channel_status_series_found().inc();
if trigger.contains(&name) {
debug!("handle_add_channel_with_status_id {cmd:?}");
info!("handle_add_channel_with_status_id {cmd:?}");
}
let ch = Channel::new(name.into());
if let Some(chst) = self.channel_states.get_mut(&ch) {
@@ -678,7 +670,7 @@ impl CaConnSet {
return Err(Error::with_msg_no_trace("ipv4 for epics"));
};
if trigger.contains(&name) {
debug!("handle_add_channel_with_addr {cmd:?}");
info!("handle_add_channel_with_addr {cmd:?}");
}
let ch = Channel::new(name.into());
if let Some(chst) = self.channel_states.get_mut(&ch) {
@@ -772,7 +764,7 @@ impl CaConnSet {
for res in results {
let ch = Channel::new(res.channel.clone());
if trigger.contains(&ch.name()) {
trace!("handle_ioc_query_result {res:?}");
info!("handle_ioc_query_result {res:?}");
}
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
@@ -938,7 +930,7 @@ impl CaConnSet {
}
fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> {
debug!("handle_ca_conn_eos {addr} {reason:?}");
info!("handle_ca_conn_eos {addr} {reason:?}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
self.stats.ca_conn_eos_ok().inc();
self.await_ca_conn_jhs.push_back((addr, e.jh));
@@ -946,23 +938,26 @@ impl CaConnSet {
self.stats.ca_conn_eos_unexpected().inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
}
match reason {
EndOfStreamReason::UnspecifiedReason => {
warn!("EndOfStreamReason::UnspecifiedReason");
self.handle_connect_fail(addr)?
{
use EndOfStreamReason::*;
match reason {
UnspecifiedReason => {
warn!("EndOfStreamReason::UnspecifiedReason");
self.handle_connect_fail(addr)?
}
Error(e) => {
warn!("received error {addr} {e}");
self.handle_connect_fail(addr)?
}
ConnectRefused => self.handle_connect_fail(addr)?,
ConnectTimeout => self.handle_connect_fail(addr)?,
OnCommand => {
// warn!("TODO make sure no channel is in state which could trigger health timeout")
}
RemoteClosed => self.handle_connect_fail(addr)?,
IocTimeout => self.handle_connect_fail(addr)?,
IoError => self.handle_connect_fail(addr)?,
}
EndOfStreamReason::Error(e) => {
warn!("received error {addr} {e}");
self.handle_connect_fail(addr)?
}
EndOfStreamReason::ConnectRefused => self.handle_connect_fail(addr)?,
EndOfStreamReason::ConnectTimeout => self.handle_connect_fail(addr)?,
EndOfStreamReason::OnCommand => {
// warn!("TODO make sure no channel is in state which could trigger health timeout")
}
EndOfStreamReason::RemoteClosed => self.handle_connect_fail(addr)?,
EndOfStreamReason::IocTimeout => self.handle_connect_fail(addr)?,
EndOfStreamReason::IoError => self.handle_connect_fail(addr)?,
}
// self.remove_channel_status_for_addr(addr)?;
trace2!("still CaConn left {}", self.ca_conn_ress.len());
@@ -982,24 +977,40 @@ impl CaConnSet {
ActiveChannelState::Init { since: _ } => {}
ActiveChannelState::WaitForStatusSeriesId { since: _ } => {}
ActiveChannelState::WithStatusSeriesId(st3) => {
if let WithStatusSeriesIdStateInner::WithAddress {
addr: addr_ch,
state: _st4,
} = &mut st3.inner
{
if SocketAddr::V4(*addr_ch) == addr {
use WithStatusSeriesIdStateInner::*;
match &mut st3.inner {
AddrSearchPending { since: _ } => {}
WithAddress { addr: addr2, state: _ } => {
if trigger.contains(&ch.name()) {
self.connect_fail_count += 1;
debug!(" connect fail, maybe wrong address for {} {}", addr, ch.name());
info!(" connect fail, maybe wrong address for {} {}", addr, ch.name());
}
if self.connect_fail_count > 400 {
std::process::exit(1);
if SocketAddr::V4(*addr2) == addr {
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address AA {addr}");
}
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff),
);
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address BB {:?}", st1);
}
} else {
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address BB {addr}");
}
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff),
);
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address BB {:?}", st1);
}
}
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff),
);
}
UnknownAddress { since: _ } => {}
NoAddress { since: _ } => {}
MaybeWrongAddress(_) => {}
}
}
},
@@ -1399,7 +1410,7 @@ impl CaConnSet {
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
trace!("try again channel after MaybeWrongAddress");
if trigger.contains(&ch.name()) {
debug!("issue ioc search for {}", ch.name());
info!("issue ioc search for {}", ch.name());
}
search_pending_count += 1;
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };

View File

@@ -207,11 +207,7 @@ async fn finder_worker_single(
items.extend(to_add.into_iter());
let items = items;
for e in &items {
if crate::ca::connset::trigger.contains(&e.channel.as_str()) {
debug!("found in database: {e:?}");
} else {
trace!("found in database: {e:?}");
}
trace!("found in database: {e:?}");
}
let items_len = items.len();
if items_len != nbatch {

View File

@@ -460,9 +460,6 @@ impl FindIocStream {
addr: Some(addr),
dt,
};
if super::connset::trigger.contains(&res.channel.as_str()) {
debug!("Found via UDP {res:?}");
}
// trace!("udp search response {res:?}");
self.stats.ca_udp_recv_result().inc();
self.out_queue.push_back(res);

View File

@@ -10,6 +10,13 @@ The resulting binary is found at `target/release/daqingest` and dynamically link
to the most basic linux system libraries.
## Create the Scylladb Schema
```
./daqingest scylla-schema-change <CONFIG.YML>
```
## Run
```

View File

@@ -23,6 +23,8 @@ pub enum Error {
ScyllaNextRow(#[from] NextRowError),
MissingData,
AddColumnImpossible,
Msg(String),
BadSchema,
}
impl From<crate::session::Error> for Error {
@@ -184,9 +186,9 @@ impl GenTwcsTab {
&self.name
}
async fn setup(&self, scy: &ScySession) -> Result<(), Error> {
async fn setup(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> {
self.create_if_missing(scy).await?;
self.check_table_options(scy).await?;
self.check_table_options(do_change, scy).await?;
self.check_columns(scy).await?;
Ok(())
}
@@ -255,7 +257,8 @@ impl GenTwcsTab {
map
}
async fn check_table_options(&self, scy: &ScySession) -> Result<(), Error> {
async fn check_table_options(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> {
let mut differ = false;
let cql = concat!(
"select default_time_to_live, gc_grace_seconds, compaction",
" from system_schema.tables where keyspace_name = ? and table_name = ?"
@@ -270,27 +273,34 @@ impl GenTwcsTab {
if let Some(row) = rows.get(0) {
let mut set_opts = Vec::new();
if row.0 != self.default_time_to_live.as_secs() {
if false {
if do_change {
set_opts.push(format!(
"default_time_to_live = {}",
self.default_time_to_live.as_secs()
));
} else {
info!("mismatch default_time_to_live");
info!(
"{:20} vs {:20} {:20} {:20}",
error!(
"mismatch default_time_to_live {:10} exp {:10} {} {}",
row.0,
self.default_time_to_live.as_secs(),
self.keyspace,
self.name,
);
differ = true;
}
}
if row.1 != self.gc_grace.as_secs() {
if false {
if do_change {
set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs()));
} else {
info!("mismatch gc_grace_seconds");
error!(
"mismatch gc_grace_seconds {:10} exp {:10} {} {}",
row.1,
self.gc_grace.as_secs(),
self.keyspace,
self.name,
);
differ = true;
}
}
if row.2 != self.compaction_options() {
@@ -300,22 +310,34 @@ impl GenTwcsTab {
.map(|(k, v)| format!("'{k}': '{v}'"))
.collect();
let params = params.join(", ");
if false {
if do_change {
set_opts.push(format!("compaction = {{ {} }}", params));
} else {
info!("mismatch compaction");
error!(
"mismatch compaction {:?} exp {:?} {} {}",
row.2,
self.compaction_options(),
self.keyspace,
self.name,
);
differ = true;
}
}
if set_opts.len() != 0 {
let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and "));
info!("{cql}");
scy.query(cql, ()).await?;
if do_change {
if set_opts.len() != 0 {
let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and "));
info!("{cql}");
scy.query(cql, ()).await?;
}
}
} else {
let e = Error::MissingData;
return Err(e);
return Err(Error::MissingData);
}
if differ {
Err(Error::BadSchema)
} else {
Ok(())
}
Ok(())
}
async fn check_columns(&self, scy: &ScySession) -> Result<(), Error> {
@@ -397,7 +419,12 @@ async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result<Ve
Ok(ret)
}
async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySession) -> Result<(), Error> {
async fn check_event_tables(
keyspace: &str,
rett: RetentionTime,
do_change: bool,
scy: &ScySession,
) -> Result<(), Error> {
let stys = [
"u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string",
];
@@ -423,7 +450,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
["ts_lsp"],
rett.ttl_events_d0(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -443,7 +470,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
}
{
@@ -462,7 +489,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -479,7 +506,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -496,37 +523,24 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
Ok(())
}
pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result<(), Error> {
pub async fn migrate_scylla_data_schema(
scyconf: &ScyllaIngestConfig,
rett: RetentionTime,
do_change: bool,
) -> Result<(), Error> {
let scy2 = create_session_no_ks(scyconf).await?;
let scy = &scy2;
let durable = true;
if !has_keyspace(scyconf.keyspace(), scy).await? {
// TODO
let replication = 3;
let cql = format!(
concat!(
"create keyspace {}",
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
" and durable_writes = {};"
),
scyconf.keyspace(),
replication,
durable
);
info!("scylla create keyspace {cql}");
scy.query_iter(cql, ()).await?;
info!("keyspace created");
}
if let Some(ks) = scyconf.keyspace_rf1() {
if !has_keyspace(ks, scy).await? {
let replication = 1;
if do_change {
// TODO
let replication = 3;
let cql = format!(
concat!(
"create keyspace {}",
@@ -540,6 +554,33 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
info!("scylla create keyspace {cql}");
scy.query_iter(cql, ()).await?;
info!("keyspace created");
} else {
error!("missing keyspace {:?}", scyconf.keyspace());
return Err(Error::BadSchema);
}
}
if let Some(ks) = scyconf.keyspace_rf1() {
if !has_keyspace(ks, scy).await? {
if do_change {
let replication = 1;
let cql = format!(
concat!(
"create keyspace {}",
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
" and durable_writes = {};"
),
scyconf.keyspace(),
replication,
durable
);
info!("scylla create keyspace {cql}");
scy.query_iter(cql, ()).await?;
info!("keyspace created");
} else {
error!("missing keyspace {:?}", scyconf.keyspace_rf1());
return Err(Error::BadSchema);
}
}
}
@@ -547,7 +588,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
scy.use_keyspace(ks, true).await?;
check_event_tables(ks, rett.clone(), scy).await?;
check_event_tables(ks, rett.clone(), do_change, scy).await?;
{
let tab = GenTwcsTab::new(
@@ -559,7 +600,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
["ts_msp"],
rett.ttl_ts_msp(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -576,7 +617,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
["ts_lsp"],
rett.ttl_channel_status(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -593,7 +634,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
["ts_lsp"],
rett.ttl_channel_status(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -610,7 +651,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
["ts_lsp"],
rett.ttl_channel_status(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -631,7 +672,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
["off"],
rett.ttl_binned(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -649,7 +690,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
["series"],
rett.ttl_channel_status(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -667,7 +708,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
["series"],
rett.ttl_channel_status(),
);
tab.setup(scy).await?;
tab.setup(do_change, scy).await?;
}
Ok(())
}

View File

@@ -357,6 +357,8 @@ stats_proc::stats_struct!((
recv_read_notify_while_enabling_monitoring,
recv_read_notify_while_polling_idle,
channel_not_alive_no_activity,
monitor_stale_read_begin,
monitor_stale_read_timeout,
),
values(inter_ivl_ema, read_ioids_len, proto_out_len,),
histolog2s(