diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index a5ab372..58d33a3 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.2-aa.3" +version = "0.2.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 441fb60..51c8e6d 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -3,6 +3,7 @@ use daqingest::opts::DaqIngestOpts; use err::Error; use log::*; use netfetch::conf::parse_config; +use netfetch::conf::CaIngestOpts; use netpod::Database; use scywr::config::ScyllaIngestConfig; use taskrun::TracingMode; @@ -30,7 +31,7 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> { } async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { - let buildmark = "+0007"; + let buildmark = "+0008"; use daqingest::opts::ChannelAccess; use daqingest::opts::SubCmd; match opts.subcmd { @@ -88,18 +89,27 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { } } } + SubCmd::ScyllaSchemaCheck(k) => { + info!("daqingest version {} {}", clap::crate_version!(), buildmark); + let (opts, _) = parse_config(k.config.into()).await?; + scylla_schema_check(opts, false).await?; + } + SubCmd::ScyllaSchemaChange(k) => { + info!("daqingest version {} {}", clap::crate_version!(), buildmark); + let (opts, _) = parse_config(k.config.into()).await?; + scylla_schema_check(opts, true).await?; + } SubCmd::ChannelAccess(k) => match k { - #[cfg(DISABLED)] - ChannelAccess::CaSearch(k) => { - info!("daqingest version {}", clap::crate_version!()); - let (conf, channels) = parse_config(k.config.into()).await?; - netfetch::ca::search::ca_search(conf, &channels).await? - } ChannelAccess::CaIngest(k) => { info!("daqingest version {} {}", clap::crate_version!(), buildmark); let (conf, channels_config) = parse_config(k.config.into()).await?; daqingest::daemon::run(conf, channels_config).await? } + ChannelAccess::CaSearch(_k) => { + // info!("daqingest version {}", clap::crate_version!()); + // let (conf, channels) = parse_config(k.config.into()).await?; + // netfetch::ca::search::ca_search(conf, &channels).await? + } }, #[cfg(feature = "bsread")] SubCmd::Bsread(k) => ingest_bsread::zmtp::zmtp_client(k.into()) @@ -128,3 +138,20 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { } Ok(()) } + +async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> { + let opstr = if do_change { "change" } else { "check" }; + info!("start scylla schema {}", opstr); + use netpod::ttl::RetentionTime; + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, do_change) + .await + .map_err(Error::from_string)?; + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, do_change) + .await + .map_err(Error::from_string)?; + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, do_change) + .await + .map_err(Error::from_string)?; + info!("stop scylla schema {}", opstr); + Ok(()) +} diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 700985c..e16f7c4 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -2,7 +2,6 @@ pub mod inserthook; use async_channel::Receiver; use async_channel::Sender; -use async_channel::WeakSender; use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use log::*; @@ -23,15 +22,10 @@ use scywr::config::ScyllaIngestConfig; use scywr::insertqueues::InsertQueuesRx; use scywr::insertqueues::InsertQueuesTx; use scywr::insertworker::InsertWorkerOpts; -use scywr::iteminsertqueue as scywriiq; -use scywriiq::QueryItem; use stats::rand_xoshiro::rand_core::RngCore; -use stats::rand_xoshiro::Xoshiro128PlusPlus; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; -use stats::SeriesWriterEstablishStats; -use std::collections::VecDeque; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; @@ -107,7 +101,7 @@ impl Daemon { let local_epics_hostname = ingest_linux::net::local_hostname(); - #[cfg(DISABLED)] + #[cfg(target_abi = "x32")] let query_item_rx = { // TODO only testing, remove tokio::spawn({ @@ -345,6 +339,7 @@ impl Daemon { } fn check_health_connset(&mut self, ts1: Instant) -> Result<(), Error> { + let _ = ts1; let dt = self.connset_status_last.elapsed(); if dt > CHECK_HEALTH_TIMEOUT { error!( @@ -358,7 +353,7 @@ impl Daemon { async fn handle_timer_tick(&mut self) -> Result<(), Error> { if self.shutting_down { let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire); - #[cfg(DISABLED)] + #[cfg(target_abi = "x32")] { let nitems = self .query_item_tx_weak @@ -424,7 +419,7 @@ impl Daemon { Ok(()) } - #[cfg(DISABLED)] + #[cfg(target_abi = "x32")] async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> { info!("handle_ca_conn_done {conn_addr:?}"); self.connection_states.remove(&conn_addr); @@ -509,7 +504,7 @@ impl Daemon { Ok(()) } - #[cfg(DISABLED)] + #[cfg(target_abi = "x32")] async fn handle_shutdown(&mut self) -> Result<(), Error> { warn!("received shutdown event"); if self.shutting_down { @@ -720,13 +715,13 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> warn!("scylla_disable config flag enabled"); } else { info!("start scylla schema check"); - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short) + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, false) .await .map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium) + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, false) .await .map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long) + scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, false) .await .map_err(Error::from_string)?; info!("stop scylla schema check"); diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index d349a7f..f518ae9 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -23,6 +23,8 @@ pub enum SubCmd { ListPulses, FetchEvents(FetchEvents), Db(Db), + ScyllaSchemaCheck(CaConfig), + ScyllaSchemaChange(CaConfig), #[command(subcommand)] ChannelAccess(ChannelAccess), #[cfg(feature = "bsread")] @@ -81,7 +83,6 @@ pub struct BsreadDump { #[derive(Debug, clap::Parser)] pub enum ChannelAccess { CaIngest(CaConfig), - #[cfg(DISABLED)] CaSearch(CaSearch), } @@ -115,6 +116,14 @@ pub struct Db { pub sub: DbSub, } +#[derive(Debug, clap::Parser)] +pub struct ScyllaDb { + #[arg(long)] + pub scylla_host: String, + #[arg(long)] + pub scylla_keyspace: String, +} + #[derive(Debug, clap::Parser)] pub enum DbSub { Data(DbData), diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index f1b8b9c..f8cb9d6 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -51,8 +51,11 @@ pub enum Error { ChannelError, #[error("DbConsistencySeries({0})")] DbConsistencySeries(String), - ScalarType, + #[error("ScalarType({0})")] + ScalarType(i32), Shape, + #[error("SeriesKind({0})")] + SeriesKind(i16), } impl From for Error { @@ -355,12 +358,14 @@ impl Worker { let series: i64 = row.get(1); let series = SeriesId::new(series as _); let shape_dims: Vec = row.get(3); - let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?; + let scalar_type = row.get(2); + let scalar_type = + ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::ScalarType(scalar_type))?; let shape_dims = Shape::from_scylla_shape_dims(shape_dims.as_slice()).map_err(|_| Error::Shape)?; let tscs: Vec> = row.get(4); let kind: i16 = row.get(5); - let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?; + let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::SeriesKind(kind))?; if true && job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000" || series == SeriesId::new(1605348259462543621) { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 33cbdc9..3e852c5 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -24,6 +24,7 @@ use futures_util::StreamExt; use hashbrown::HashMap; use log::*; use netpod::timeunits::*; +use netpod::trigger; use netpod::ttl::RetentionTime; use netpod::ScalarType; use netpod::SeriesKind; @@ -58,7 +59,6 @@ use series::SeriesId; use serieswriter::binwriter::BinWriter; use serieswriter::fixgridwriter::ChannelStatusSeriesWriter; use serieswriter::fixgridwriter::ChannelStatusWriteState; -use serieswriter::fixgridwriter::CHANNEL_STATUS_GRID; use serieswriter::msptool::MspSplit; use serieswriter::rtwriter::RtWriter; use serieswriter::writer::EmittableType; @@ -136,6 +136,15 @@ macro_rules! trace_event_incoming { }; } +#[allow(unused)] +macro_rules! trace_monitor_stale { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + fn dbg_chn_name(name: impl AsRef) -> bool { name.as_ref() == "SINSB02-KCOL-ACT:V-EY21700-MAN-ON-SP" } @@ -442,14 +451,10 @@ struct CreatedState { // Updated on monitoring, polling or when the channel config changes to reset the timeout ts_activity_last: Instant, st_activity_last: SystemTime, - ts_msp_last: u64, - ts_msp_grid_last: u32, - inserted_in_ts_msp: u64, insert_item_ivl_ema: IntervalEma, item_recv_ivl_ema: IntervalEma, insert_recv_ivl_last: Instant, muted_before: u32, - info_store_msp_last: u32, recv_count: u64, recv_bytes: u64, stwin_ts: u64, @@ -464,7 +469,6 @@ struct CreatedState { dw_lt_last: SystemTime, scalar_type: ScalarType, shape: Shape, - log_more: bool, name: String, enum_str_table: Option>, status_emit_count: u64, @@ -485,14 +489,10 @@ impl CreatedState { ts_alive_last: tsnow, ts_activity_last: tsnow, st_activity_last: stnow, - ts_msp_last: 0, - ts_msp_grid_last: 0, - inserted_in_ts_msp: 0, insert_item_ivl_ema: IntervalEma::new(), item_recv_ivl_ema: IntervalEma::new(), insert_recv_ivl_last: tsnow, muted_before: 0, - info_store_msp_last: 0, recv_count: 0, recv_bytes: 0, stwin_ts: 0, @@ -507,7 +507,6 @@ impl CreatedState { dw_lt_last: SystemTime::UNIX_EPOCH, scalar_type: ScalarType::I8, shape: Shape::Scalar, - log_more: false, name: String::new(), enum_str_table: None, status_emit_count: 0, @@ -1211,7 +1210,7 @@ impl CaConn { trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); match a.kind { ConnCommandKind::ChannelAdd(conf, cssid) => { - self.channel_add(conf, cssid); + self.channel_add(conf, cssid)?; Ok(Ready(Some(()))) } ConnCommandKind::ChannelClose(name) => { @@ -1237,7 +1236,6 @@ impl CaConn { use Poll::*; let mut have_progress = false; let mut have_pending = false; - let stnow = self.tmp_ts_poll; if self.is_shutdown() { Ok(Ready(None)) } else { @@ -1341,7 +1339,6 @@ impl CaConn { )?; self.stats.get_series_id_ok.inc(); { - info!("queued Opened {:?}", st2.channel.cssid); let item = ChannelStatusItem { ts: self.tmp_ts_poll, cssid: st2.channel.cssid.clone(), @@ -1420,13 +1417,17 @@ impl CaConn { pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> { if self.cid_by_name(conf.name()).is_some() { self.stats.channel_add_exists.inc(); - error!("logic error channel already exists {conf:?}"); + if trigger.contains(&conf.name()) { + error!("logic error channel already exists {conf:?}"); + } Ok(()) } else { let cid = self.cid_by_name_or_insert(conf.name())?; if self.channels.contains_key(&cid) { self.stats.channel_add_exists.inc(); - error!("logic error channel already exists {conf:?}"); + if trigger.contains(&conf.name()) { + error!("logic error channel already exists {conf:?}"); + } Ok(()) } else { let conf = ChannelConf::new(conf, cssid); @@ -1493,6 +1494,22 @@ impl CaConn { fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { // TODO can I reuse emit_channel_info_insert_items ? trace!("channel_state_on_shutdown channels {}", self.channels.len()); + let stnow = self.tmp_ts_poll; + let mut item_deque = VecDeque::new(); + for (_cid, conf) in &mut self.channels { + let item = ChannelStatusItem { + ts: stnow, + cssid: conf.state.cssid(), + status: ChannelStatus::Closed(channel_reason.clone()), + }; + let deque = &mut item_deque; + if conf.wrst.emit_channel_status_item(item, deque).is_err() { + self.stats.logic_error().inc(); + } + } + for x in item_deque { + self.iqdqs.st_rf3_qu.push_back(x); + } for (_cid, conf) in &mut self.channels { if dbg_chn_name(conf.conf.name()) { info!("channel_state_on_shutdown {:?}", conf); @@ -1515,23 +1532,13 @@ impl CaConn { *chst = ChannelState::Ended(st.channel.cssid.clone()); } ChannelState::Writable(st2) => { - let cssid = st2.channel.cssid.clone(); // TODO should call the proper channel-close handler which in turn emits the status item. // Make sure I record the reason for the "Close": user command, IOC error, etc.. - let item = ChannelStatusItem { - ts: self.tmp_ts_poll, - cssid: cssid.clone(), - status: ChannelStatus::Closed(channel_reason.clone()), - }; - let deque = &mut self.iqdqs.st_rf3_qu; - if conf.wrst.emit_channel_status_item(item, deque).is_err() { - self.stats.logic_error().inc(); - } + let cssid = st2.channel.cssid.clone(); *chst = ChannelState::Ended(cssid); } ChannelState::Error(..) => { - warn!("TODO emit error status"); - // *chst = ChannelState::Ended; + // Leave state unchanged. } ChannelState::Ended(_) => {} ChannelState::Closing(_) => {} @@ -1539,44 +1546,6 @@ impl CaConn { } } - fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> { - let timenow = self.tmp_ts_poll; - for (_, conf) in &mut self.channels { - let st = &mut conf.state; - match st { - ChannelState::Init(..) => { - // TODO need last-save-ts for this state. - } - ChannelState::FetchEnumDetails(..) => { - // TODO need last-save-ts for this state. - } - ChannelState::Creating(..) => { - // TODO need last-save-ts for this state. - } - ChannelState::FetchCaStatusSeries(..) => { - // TODO ? - } - ChannelState::MakingSeriesWriter(..) => { - // TODO ? - } - ChannelState::Writable(st) => { - let crst = &mut st.channel; - // TODO if we don't wave a series id yet, dont' save? write-ampl. - let msp = info_store_msp_from_time(timenow.clone()); - if msp != crst.info_store_msp_last { - crst.info_store_msp_last = msp; - } - } - ChannelState::Error(_) => { - // TODO need last-save-ts for this state. - } - ChannelState::Ended(_) => {} - ChannelState::Closing(_) => {} - } - } - Ok(()) - } - fn transition_to_polling(&mut self, subid: Subid, tsnow: Instant) -> Result<(), Error> { let cid = if let Some(x) = self.cid_by_subid.get(&subid) { *x @@ -2198,7 +2167,8 @@ impl CaConn { ReadingState::Monitoring(st3) => match &st3.mon2state { Monitoring2State::Passive(st4) => { if st4.tsbeg + conf.conf.manual_poll_on_quiet_after() < tsnow { - debug!("check_channels_state_poll Monitoring2State::Passive timeout"); + trace_monitor_stale!("check_channels_state_poll Monitoring2State::Passive timeout"); + self.stats.monitor_stale_read_begin().inc(); // TODO encapsulate and unify with Polling handler let ioid = Ioid(self.ioid); self.ioid = self.ioid.wrapping_add(1); @@ -2223,8 +2193,13 @@ impl CaConn { // Something is wrong with this channel. // Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only // this or a subset of the subscribed channels no longer give updates. + self.stats.monitor_stale_read_timeout().inc(); let name = conf.conf.name(); - warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid); + trace_monitor_stale!( + "channel monitor explicit read timeout {} ioid {:?}", + name, + ioid + ); if false { // Here we try to close the channel at hand. @@ -2277,8 +2252,6 @@ impl CaConn { if *x + Duration::from_millis(10000) <= tsnow { self.read_ioids.remove(ioid); self.stats.caget_timeout().inc(); - // warn!("channel caget timeout"); - // std::process::exit(1); st3.tick = PollTickState::Idle(tsnow); } } @@ -2309,7 +2282,7 @@ impl CaConn { if let Some(started) = self.ioc_ping_start { if started + TIMEOUT_PONG_WAIT < tsnow { self.stats.pong_timeout().inc(); - warn!("pong timeout {}", self.remote_addr_dbg); + info!("pong timeout {}", self.remote_addr_dbg); self.ioc_ping_start = None; let item = CaConnEvent { ts: tsnow, @@ -2328,7 +2301,7 @@ impl CaConn { proto.push_out(msg); } else { self.stats.ping_no_proto().inc(); - warn!("can not ping {} no proto", self.remote_addr_dbg); + info!("can not ping {} no proto", self.remote_addr_dbg); self.trigger_shutdown(ShutdownReason::ProtocolMissing); } } @@ -2537,19 +2510,6 @@ impl CaConn { let scalar_type = ScalarType::from_ca_id(k.data_type)?; let shape = Shape::from_ca_count(k.data_count)?; - let log_more = match &scalar_type { - ScalarType::Enum => { - if cssid.id() % 60 == 14 { - let name = conf.conf.name(); - // info!("ENUM {}", name); - true - } else { - false - } - } - _ => false, - }; - let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP); let created_state = CreatedState { cssid, @@ -2561,14 +2521,10 @@ impl CaConn { ts_alive_last: tsnow, ts_activity_last: tsnow, st_activity_last: stnow, - ts_msp_last: 0, - ts_msp_grid_last: 0, - inserted_in_ts_msp: u64::MAX, insert_item_ivl_ema: IntervalEma::new(), item_recv_ivl_ema: IntervalEma::new(), insert_recv_ivl_last: tsnow, muted_before: 0, - info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll), recv_count: 0, recv_bytes: 0, stwin_ts: 0, @@ -2583,7 +2539,6 @@ impl CaConn { dw_lt_last: SystemTime::UNIX_EPOCH, scalar_type: scalar_type.clone(), shape: shape.clone(), - log_more, name: conf.conf.name().into(), enum_str_table: None, status_emit_count: 0, diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 5ebae65..7b466a3 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -61,6 +61,7 @@ use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; +use netpod::trigger; use netpod::OnDrop; use scywr::insertqueues::InsertQueuesTx; use series::SeriesId; @@ -72,15 +73,6 @@ use std::time::Instant; use std::time::SystemTime; use taskrun::tokio; -#[allow(non_upper_case_globals)] -pub const trigger: [&'static str; 5] = [ - "S10-CMON-DIA1431:CURRENT-3-3", - "S10-CMON-DIA1431:CURRENT-5", - "S10-CMON-DIA1431:FAN-SPEED", - "S10-CMON-DIA1431:POWER-TOT", - "S10-CMON-MAG1721:TIN", -]; - const CHECK_CHANS_PER_TICK: usize = 10000000; pub const SEARCH_BATCH_MAX: usize = 64; pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 2; @@ -622,7 +614,7 @@ impl CaConnSet { } self.stats.channel_status_series_found().inc(); if trigger.contains(&name) { - debug!("handle_add_channel_with_status_id {cmd:?}"); + info!("handle_add_channel_with_status_id {cmd:?}"); } let ch = Channel::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { @@ -678,7 +670,7 @@ impl CaConnSet { return Err(Error::with_msg_no_trace("ipv4 for epics")); }; if trigger.contains(&name) { - debug!("handle_add_channel_with_addr {cmd:?}"); + info!("handle_add_channel_with_addr {cmd:?}"); } let ch = Channel::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { @@ -772,7 +764,7 @@ impl CaConnSet { for res in results { let ch = Channel::new(res.channel.clone()); if trigger.contains(&ch.name()) { - trace!("handle_ioc_query_result {res:?}"); + info!("handle_ioc_query_result {res:?}"); } if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ast) = &mut chst.value { @@ -938,7 +930,7 @@ impl CaConnSet { } fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> { - debug!("handle_ca_conn_eos {addr} {reason:?}"); + info!("handle_ca_conn_eos {addr} {reason:?}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { self.stats.ca_conn_eos_ok().inc(); self.await_ca_conn_jhs.push_back((addr, e.jh)); @@ -946,23 +938,26 @@ impl CaConnSet { self.stats.ca_conn_eos_unexpected().inc(); warn!("end-of-stream received for non-existent CaConn {addr}"); } - match reason { - EndOfStreamReason::UnspecifiedReason => { - warn!("EndOfStreamReason::UnspecifiedReason"); - self.handle_connect_fail(addr)? + { + use EndOfStreamReason::*; + match reason { + UnspecifiedReason => { + warn!("EndOfStreamReason::UnspecifiedReason"); + self.handle_connect_fail(addr)? + } + Error(e) => { + warn!("received error {addr} {e}"); + self.handle_connect_fail(addr)? + } + ConnectRefused => self.handle_connect_fail(addr)?, + ConnectTimeout => self.handle_connect_fail(addr)?, + OnCommand => { + // warn!("TODO make sure no channel is in state which could trigger health timeout") + } + RemoteClosed => self.handle_connect_fail(addr)?, + IocTimeout => self.handle_connect_fail(addr)?, + IoError => self.handle_connect_fail(addr)?, } - EndOfStreamReason::Error(e) => { - warn!("received error {addr} {e}"); - self.handle_connect_fail(addr)? - } - EndOfStreamReason::ConnectRefused => self.handle_connect_fail(addr)?, - EndOfStreamReason::ConnectTimeout => self.handle_connect_fail(addr)?, - EndOfStreamReason::OnCommand => { - // warn!("TODO make sure no channel is in state which could trigger health timeout") - } - EndOfStreamReason::RemoteClosed => self.handle_connect_fail(addr)?, - EndOfStreamReason::IocTimeout => self.handle_connect_fail(addr)?, - EndOfStreamReason::IoError => self.handle_connect_fail(addr)?, } // self.remove_channel_status_for_addr(addr)?; trace2!("still CaConn left {}", self.ca_conn_ress.len()); @@ -982,24 +977,40 @@ impl CaConnSet { ActiveChannelState::Init { since: _ } => {} ActiveChannelState::WaitForStatusSeriesId { since: _ } => {} ActiveChannelState::WithStatusSeriesId(st3) => { - if let WithStatusSeriesIdStateInner::WithAddress { - addr: addr_ch, - state: _st4, - } = &mut st3.inner - { - if SocketAddr::V4(*addr_ch) == addr { + use WithStatusSeriesIdStateInner::*; + match &mut st3.inner { + AddrSearchPending { since: _ } => {} + WithAddress { addr: addr2, state: _ } => { if trigger.contains(&ch.name()) { - self.connect_fail_count += 1; - debug!(" connect fail, maybe wrong address for {} {}", addr, ch.name()); + info!(" connect fail, maybe wrong address for {} {}", addr, ch.name()); } - if self.connect_fail_count > 400 { - std::process::exit(1); + if SocketAddr::V4(*addr2) == addr { + if trigger.contains(&ch.name()) { + info!("transition_channels_to_maybe_wrong_address AA {addr}"); + } + bump_backoff(&mut st3.addr_find_backoff); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( + MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff), + ); + if trigger.contains(&ch.name()) { + info!("transition_channels_to_maybe_wrong_address BB {:?}", st1); + } + } else { + if trigger.contains(&ch.name()) { + info!("transition_channels_to_maybe_wrong_address BB {addr}"); + } + bump_backoff(&mut st3.addr_find_backoff); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( + MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff), + ); + if trigger.contains(&ch.name()) { + info!("transition_channels_to_maybe_wrong_address BB {:?}", st1); + } } - bump_backoff(&mut st3.addr_find_backoff); - st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( - MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff), - ); } + UnknownAddress { since: _ } => {} + NoAddress { since: _ } => {} + MaybeWrongAddress(_) => {} } } }, @@ -1399,7 +1410,7 @@ impl CaConnSet { if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { trace!("try again channel after MaybeWrongAddress"); if trigger.contains(&ch.name()) { - debug!("issue ioc search for {}", ch.name()); + info!("issue ioc search for {}", ch.name()); } search_pending_count += 1; st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 800392d..125482e 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -207,11 +207,7 @@ async fn finder_worker_single( items.extend(to_add.into_iter()); let items = items; for e in &items { - if crate::ca::connset::trigger.contains(&e.channel.as_str()) { - debug!("found in database: {e:?}"); - } else { - trace!("found in database: {e:?}"); - } + trace!("found in database: {e:?}"); } let items_len = items.len(); if items_len != nbatch { diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 68168ce..ee6b6bc 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -460,9 +460,6 @@ impl FindIocStream { addr: Some(addr), dt, }; - if super::connset::trigger.contains(&res.channel.as_str()) { - debug!("Found via UDP {res:?}"); - } // trace!("udp search response {res:?}"); self.stats.ca_udp_recv_result().inc(); self.out_queue.push_back(res); diff --git a/readme.md b/readme.md index d64a9c9..84a2a90 100644 --- a/readme.md +++ b/readme.md @@ -10,6 +10,13 @@ The resulting binary is found at `target/release/daqingest` and dynamically link to the most basic linux system libraries. +## Create the Scylladb Schema + +``` +./daqingest scylla-schema-change +``` + + ## Run ``` diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 34b5134..f3978a1 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -23,6 +23,8 @@ pub enum Error { ScyllaNextRow(#[from] NextRowError), MissingData, AddColumnImpossible, + Msg(String), + BadSchema, } impl From for Error { @@ -184,9 +186,9 @@ impl GenTwcsTab { &self.name } - async fn setup(&self, scy: &ScySession) -> Result<(), Error> { + async fn setup(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> { self.create_if_missing(scy).await?; - self.check_table_options(scy).await?; + self.check_table_options(do_change, scy).await?; self.check_columns(scy).await?; Ok(()) } @@ -255,7 +257,8 @@ impl GenTwcsTab { map } - async fn check_table_options(&self, scy: &ScySession) -> Result<(), Error> { + async fn check_table_options(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> { + let mut differ = false; let cql = concat!( "select default_time_to_live, gc_grace_seconds, compaction", " from system_schema.tables where keyspace_name = ? and table_name = ?" @@ -270,27 +273,34 @@ impl GenTwcsTab { if let Some(row) = rows.get(0) { let mut set_opts = Vec::new(); if row.0 != self.default_time_to_live.as_secs() { - if false { + if do_change { set_opts.push(format!( "default_time_to_live = {}", self.default_time_to_live.as_secs() )); } else { - info!("mismatch default_time_to_live"); - info!( - "{:20} vs {:20} {:20} {:20}", + error!( + "mismatch default_time_to_live {:10} exp {:10} {} {}", row.0, self.default_time_to_live.as_secs(), self.keyspace, self.name, ); + differ = true; } } if row.1 != self.gc_grace.as_secs() { - if false { + if do_change { set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs())); } else { - info!("mismatch gc_grace_seconds"); + error!( + "mismatch gc_grace_seconds {:10} exp {:10} {} {}", + row.1, + self.gc_grace.as_secs(), + self.keyspace, + self.name, + ); + differ = true; } } if row.2 != self.compaction_options() { @@ -300,22 +310,34 @@ impl GenTwcsTab { .map(|(k, v)| format!("'{k}': '{v}'")) .collect(); let params = params.join(", "); - if false { + if do_change { set_opts.push(format!("compaction = {{ {} }}", params)); } else { - info!("mismatch compaction"); + error!( + "mismatch compaction {:?} exp {:?} {} {}", + row.2, + self.compaction_options(), + self.keyspace, + self.name, + ); + differ = true; } } - if set_opts.len() != 0 { - let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); - info!("{cql}"); - scy.query(cql, ()).await?; + if do_change { + if set_opts.len() != 0 { + let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); + info!("{cql}"); + scy.query(cql, ()).await?; + } } } else { - let e = Error::MissingData; - return Err(e); + return Err(Error::MissingData); + } + if differ { + Err(Error::BadSchema) + } else { + Ok(()) } - Ok(()) } async fn check_columns(&self, scy: &ScySession) -> Result<(), Error> { @@ -397,7 +419,12 @@ async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result Result<(), Error> { +async fn check_event_tables( + keyspace: &str, + rett: RetentionTime, + do_change: bool, + scy: &ScySession, +) -> Result<(), Error> { let stys = [ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", ]; @@ -423,7 +450,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio ["ts_lsp"], rett.ttl_events_d0(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -443,7 +470,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } } { @@ -462,7 +489,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -479,7 +506,7 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -496,37 +523,24 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } Ok(()) } -pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result<(), Error> { +pub async fn migrate_scylla_data_schema( + scyconf: &ScyllaIngestConfig, + rett: RetentionTime, + do_change: bool, +) -> Result<(), Error> { let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; let durable = true; if !has_keyspace(scyconf.keyspace(), scy).await? { - // TODO - let replication = 3; - let cql = format!( - concat!( - "create keyspace {}", - " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", - " and durable_writes = {};" - ), - scyconf.keyspace(), - replication, - durable - ); - info!("scylla create keyspace {cql}"); - scy.query_iter(cql, ()).await?; - info!("keyspace created"); - } - - if let Some(ks) = scyconf.keyspace_rf1() { - if !has_keyspace(ks, scy).await? { - let replication = 1; + if do_change { + // TODO + let replication = 3; let cql = format!( concat!( "create keyspace {}", @@ -540,6 +554,33 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete info!("scylla create keyspace {cql}"); scy.query_iter(cql, ()).await?; info!("keyspace created"); + } else { + error!("missing keyspace {:?}", scyconf.keyspace()); + return Err(Error::BadSchema); + } + } + + if let Some(ks) = scyconf.keyspace_rf1() { + if !has_keyspace(ks, scy).await? { + if do_change { + let replication = 1; + let cql = format!( + concat!( + "create keyspace {}", + " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", + " and durable_writes = {};" + ), + scyconf.keyspace(), + replication, + durable + ); + info!("scylla create keyspace {cql}"); + scy.query_iter(cql, ()).await?; + info!("keyspace created"); + } else { + error!("missing keyspace {:?}", scyconf.keyspace_rf1()); + return Err(Error::BadSchema); + } } } @@ -547,7 +588,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete scy.use_keyspace(ks, true).await?; - check_event_tables(ks, rett.clone(), scy).await?; + check_event_tables(ks, rett.clone(), do_change, scy).await?; { let tab = GenTwcsTab::new( @@ -559,7 +600,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["ts_msp"], rett.ttl_ts_msp(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -576,7 +617,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["ts_lsp"], rett.ttl_channel_status(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -593,7 +634,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["ts_lsp"], rett.ttl_channel_status(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -610,7 +651,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["ts_lsp"], rett.ttl_channel_status(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -631,7 +672,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["off"], rett.ttl_binned(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -649,7 +690,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["series"], rett.ttl_channel_status(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } { let tab = GenTwcsTab::new( @@ -667,7 +708,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["series"], rett.ttl_channel_status(), ); - tab.setup(scy).await?; + tab.setup(do_change, scy).await?; } Ok(()) } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 7253b5b..25e37c3 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -357,6 +357,8 @@ stats_proc::stats_struct!(( recv_read_notify_while_enabling_monitoring, recv_read_notify_while_polling_idle, channel_not_alive_no_activity, + monitor_stale_read_begin, + monitor_stale_read_timeout, ), values(inter_ivl_ema, read_ioids_len, proto_out_len,), histolog2s(