Support omitting the initial channels list

This commit is contained in:
Dominik Werder
2023-11-26 17:22:28 +01:00
parent e2d8f389b4
commit 3ae555565a
12 changed files with 367 additions and 287 deletions

View File

@@ -43,17 +43,15 @@ use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use stats::rand_xoshiro::Xoshiro128StarStar;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IntervalEma;
use stats::XorShift32;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::ops::ControlFlow;
use std::pin::pin;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
@@ -322,13 +320,13 @@ fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
}
struct CidStore {
rng: XorShift32,
rng: Xoshiro128PlusPlus,
}
impl CidStore {
fn new(seed: u32) -> Self {
Self {
rng: XorShift32::new(seed),
rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
}
}
@@ -342,18 +340,18 @@ impl CidStore {
}
fn next(&mut self) -> Cid {
Cid(self.rng.next())
Cid(self.rng.next_u32())
}
}
struct SubidStore {
rng: XorShift32,
rng: Xoshiro128PlusPlus,
}
impl SubidStore {
fn new(seed: u32) -> Self {
Self {
rng: XorShift32::new(seed),
rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
}
}
@@ -367,7 +365,7 @@ impl SubidStore {
}
fn next(&mut self) -> Subid {
Subid(self.rng.next())
Subid(self.rng.next_u32())
}
}
@@ -376,6 +374,8 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 {
(dt.as_secs() / 60 * 60) as u32
}
/// Sender over which the outcome of a command (`Ok(())` or an `Error`) is reported back.
pub type CmdResTx = Sender<Result<(), Error>>;
#[derive(Debug)]
pub enum ConnCommandKind {
SeriesLookupResult(Result<ChannelInfoResult, dbpg::seriesbychannel::Error>),
@@ -552,7 +552,7 @@ pub struct CaConn {
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
weird_count: usize,
rng: Xoshiro128StarStar,
rng: Xoshiro128PlusPlus,
}
#[cfg(DISABLED)]
@@ -614,7 +614,7 @@ impl CaConn {
}
}
fn ioc_ping_ivl_rng(rng: &mut Xoshiro128StarStar) -> Duration {
fn ioc_ping_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
IOC_PING_IVL * 100 / (70 + (rng.next_u32() % 60))
}
@@ -668,7 +668,7 @@ impl CaConn {
}
fn cmd_check_health(&mut self) {
debug!("cmd_check_health");
trace!("cmd_check_health");
match self.check_channels_alive() {
Ok(_) => {}
Err(e) => {
@@ -743,8 +743,6 @@ impl CaConn {
fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) {
self.channel_add(name, cssid);
// TODO return the result
//self.stats.caconn_command_can_not_reply.inc();
}
fn cmd_channel_remove(&mut self, name: String) {
@@ -2168,9 +2166,9 @@ impl Stream for CaConn {
Pending
} else {
// TODO error
error!("logic error");
error!("shutting down, queues not flushed, no progress, no pending");
self.stats.logic_error().inc();
let e = Error::with_msg_no_trace("shutdown, not done, no progress, no pending");
let e = Error::with_msg_no_trace("shutting down, queues not flushed, no progress, no pending");
Ready(Some(Err(e)))
}
}

View File

@@ -46,6 +46,8 @@ use statemap::ChannelStateValue;
use statemap::WithStatusSeriesIdState;
use statemap::WithStatusSeriesIdStateInner;
use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaProtoStats;
@@ -134,7 +136,6 @@ impl CaConnRes {
pub struct ChannelAddWithAddr {
backend: String,
name: String,
local_epics_hostname: String,
cssid: ChannelStatusSeriesId,
addr: SocketAddr,
}
@@ -143,7 +144,6 @@ pub struct ChannelAddWithAddr {
pub struct ChannelAddWithStatusId {
backend: String,
name: String,
local_epics_hostname: String,
cssid: ChannelStatusSeriesId,
}
@@ -151,7 +151,7 @@ pub struct ChannelAddWithStatusId {
pub struct ChannelAdd {
backend: String,
name: String,
local_epics_hostname: String,
restx: crate::ca::conn::CmdResTx,
}
#[derive(Debug, Clone)]
@@ -196,7 +196,7 @@ impl fmt::Debug for ChannelStatusesRequest {
pub enum ConnSetCmd {
ChannelAdd(ChannelAdd),
ChannelRemove(ChannelRemove),
CheckHealth(Instant),
CheckHealth(u32, Instant),
Shutdown,
ChannelStatuses(ChannelStatusesRequest),
}
@@ -213,7 +213,7 @@ impl CaConnSetEvent {
#[derive(Debug, Clone)]
pub enum CaConnSetItem {
Error(Error),
Healthy(Instant, Instant),
Healthy(u32, Instant, Instant),
}
pub struct CaConnSetCtrl {
@@ -224,9 +224,15 @@ pub struct CaConnSetCtrl {
ca_proto_stats: Arc<CaProtoStats>,
ioc_finder_stats: Arc<IocFinderStats>,
jh: JoinHandle<Result<(), Error>>,
rng: Xoshiro128PlusPlus,
idcnt: u32,
}
impl CaConnSetCtrl {
/// Placeholder constructor: not implemented, calling it panics via `todo!()`.
pub fn new() -> Self {
todo!()
}
pub fn sender(&self) -> Sender<CaConnSetEvent> {
self.tx.clone()
}
@@ -235,12 +241,13 @@ impl CaConnSetCtrl {
self.rx.clone()
}
pub async fn add_channel(&self, backend: String, name: String, local_epics_hostname: String) -> Result<(), Error> {
let cmd = ChannelAdd {
backend,
name,
local_epics_hostname,
};
pub async fn add_channel(
&self,
backend: String,
name: String,
restx: crate::ca::conn::CmdResTx,
) -> Result<(), Error> {
let cmd = ChannelAdd { backend, name, restx };
let cmd = ConnSetCmd::ChannelAdd(cmd);
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
Ok(())
@@ -259,16 +266,17 @@ impl CaConnSetCtrl {
Ok(())
}
pub async fn check_health(&self) -> Result<(), Error> {
let cmd = ConnSetCmd::CheckHealth(Instant::now());
pub async fn check_health(&mut self) -> Result<u32, Error> {
let id = self.make_id();
let cmd = ConnSetCmd::CheckHealth(id, Instant::now());
let n = self.tx.len();
if n > 0 {
debug!("check_health self.tx.len() {:?}", n);
}
let s = format!("{:?}", cmd);
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
debug!("check_health enqueued {s}");
Ok(())
trace!("check_health enqueued {s}");
Ok(id)
}
pub async fn join(self) -> Result<(), Error> {
@@ -291,6 +299,12 @@ impl CaConnSetCtrl {
pub fn ioc_finder_stats(&self) -> &Arc<IocFinderStats> {
&self.ioc_finder_stats
}
/// Creates a fresh request id (used by `check_health`).
///
/// The upper 16 bits carry the low 16 bits of the running counter `idcnt`,
/// the lower 16 bits are taken from the random generator.
fn make_id(&mut self) -> u32 {
    let id = self.idcnt;
    // Wrap explicitly: a plain `+= 1` panics on overflow in debug builds,
    // while only the low 16 bits of the counter end up in the id anyway.
    self.idcnt = self.idcnt.wrapping_add(1);
    (self.rng.next_u32() & 0xffff) | (id << 16)
}
}
#[derive(Debug)]
@@ -441,6 +455,8 @@ impl CaConnSet {
ca_proto_stats,
ioc_finder_stats,
jh,
idcnt: 0,
rng: stats::xoshiro_from_time(),
}
}
@@ -482,51 +498,13 @@ impl CaConnSet {
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x),
// ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
// ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1),
ConnSetCmd::CheckHealth(id, ts1) => self.handle_check_health(id, ts1),
ConnSetCmd::Shutdown => self.handle_shutdown(),
ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x),
},
}
}
fn handle_ca_conn_event(&mut self, addr: SocketAddr, ev: CaConnEvent) -> Result<(), Error> {
match ev.value {
CaConnEventValue::None => Ok(()),
CaConnEventValue::EchoTimeout => Ok(()),
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::QueryItem(item) => {
todo!("remove this insert case");
// self.storage_insert_queue.push_back(item);
Ok(())
}
CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x),
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::ConnectFail => self.handle_connect_fail(addr),
}
}
fn handle_series_lookup_result(&mut self, res: Result<ChannelInfoResult, Error>) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
trace3!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
let add = ChannelAddWithStatusId {
backend: res.backend,
name: res.channel,
local_epics_hostname: self.local_epics_hostname.clone(),
cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()),
};
self.handle_add_channel_with_status_id(add)?;
}
Err(e) => {
warn!("TODO handle error {e}");
}
}
Ok(())
}
fn handle_add_channel(&mut self, cmd: ChannelAdd) -> Result<(), Error> {
if self.shutdown_stopping {
trace3!("handle_add_channel but shutdown_stopping");
@@ -555,6 +533,46 @@ impl CaConnSet {
tx: Box::pin(SeriesLookupSender { tx }),
};
self.channel_info_query_queue.push_back(item);
if let Err(_) = cmd.restx.try_send(Ok(())) {
self.stats.command_reply_fail().inc();
}
Ok(())
}
/// Dispatches the event `ev`, associated with `addr`, to the matching handler.
///
/// `None` and `EchoTimeout` are accepted without any further action.
fn handle_ca_conn_event(&mut self, addr: SocketAddr, ev: CaConnEvent) -> Result<(), Error> {
match ev.value {
CaConnEventValue::None => Ok(()),
CaConnEventValue::EchoTimeout => Ok(()),
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::QueryItem(item) => {
// NOTE(review): this arm panics via `todo!` if a `QueryItem` ever arrives here;
// the `Ok(())` below is unreachable and `item` is unused.
todo!("remove this insert case");
// self.storage_insert_queue.push_back(item);
Ok(())
}
CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x),
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::ConnectFail => self.handle_connect_fail(addr),
}
}
/// Handles the result of a series lookup.
///
/// Returns immediately with `Ok(())` while `shutdown_stopping` is set.
/// On a successful lookup a `ChannelAddWithStatusId` is built from the result
/// and handed to `handle_add_channel_with_status_id`, whose error is propagated.
/// A failed lookup is only logged as a warning and still yields `Ok(())`.
fn handle_series_lookup_result(&mut self, res: Result<ChannelInfoResult, Error>) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
trace3!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
let add = ChannelAddWithStatusId {
backend: res.backend,
name: res.channel,
// The status series id is constructed from the id of the looked-up series.
cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()),
};
self.handle_add_channel_with_status_id(add)?;
}
Err(e) => {
warn!("TODO handle error {e}");
}
}
Ok(())
}
@@ -721,7 +739,6 @@ impl CaConnSet {
name: res.channel,
addr: SocketAddr::V4(addr),
cssid: status_series_id.clone(),
local_epics_hostname: self.local_epics_hostname.clone(),
};
self.handle_add_channel_with_addr(cmd)?;
}
@@ -747,8 +764,8 @@ impl CaConnSet {
Ok(())
}
fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> {
trace2!("handle_check_health");
fn handle_check_health(&mut self, id: u32, ts1: Instant) -> Result<(), Error> {
trace2!("handle_check_health {id:08x}");
if self.shutdown_stopping {
return Ok(());
}
@@ -771,7 +788,7 @@ impl CaConnSet {
}
let ts2 = Instant::now();
let item = CaConnSetItem::Healthy(ts1, ts2);
let item = CaConnSetItem::Healthy(id, ts1, ts2);
self.connset_out_queue.push_back(item);
Ok(())
}
@@ -968,7 +985,7 @@ impl CaConnSet {
opts,
add.backend.clone(),
addr_v4,
add.local_epics_hostname,
self.local_epics_hostname.clone(),
self.storage_insert_tx.as_ref().get_ref().clone(),
self.channel_info_query_tx
.clone()
@@ -1281,7 +1298,6 @@ impl CaConnSet {
let cmd = ChannelAddWithAddr {
backend: self.backend.clone(),
name: ch.id().into(),
local_epics_hostname: self.local_epics_hostname.clone(),
cssid: status_series_id.clone(),
addr: SocketAddr::V4(*addr_v4),
};

View File

@@ -14,7 +14,7 @@ use tokio::io::AsyncReadExt;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CaIngestOpts {
backend: String,
channels: PathBuf,
channels: Option<PathBuf>,
api_bind: String,
search: Vec<String>,
#[serde(default)]
@@ -30,7 +30,6 @@ pub struct CaIngestOpts {
insert_worker_count: Option<usize>,
insert_worker_concurrency: Option<usize>,
insert_scylla_sessions: Option<usize>,
insert_queue_max: Option<usize>,
insert_item_queue_cap: Option<usize>,
local_epics_hostname: Option<String>,
store_workers_rate: Option<u64>,
@@ -88,10 +87,6 @@ impl CaIngestOpts {
self.insert_scylla_sessions.unwrap_or(1)
}
pub fn insert_queue_max(&self) -> usize {
self.insert_queue_max.unwrap_or(64)
}
/// Returns the configured `array_truncate` value, or 512 when none is set.
pub fn array_truncate(&self) -> u64 {
    match self.array_truncate {
        Some(limit) => limit,
        None => 512,
    }
}
@@ -163,7 +158,7 @@ scylla:
"###;
let res: Result<CaIngestOpts, _> = serde_yaml::from_slice(conf.as_bytes());
let conf = res.unwrap();
assert_eq!(conf.channels, PathBuf::from("/some/path/file.txt"));
assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt")));
assert_eq!(&conf.api_bind, "0.0.0.0:3011");
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
@@ -192,7 +187,7 @@ fn test_duration_parse() {
assert_eq!(a.dur, Duration::from_millis(3170));
}
pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Vec<String>), Error> {
pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option<Vec<String>>), Error> {
let mut file = OpenOptions::new().read(true).open(config).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
@@ -200,27 +195,32 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Vec<String>)
drop(file);
let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-whitelisted--".into()))?;
let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-blacklisted--".into()))?;
let mut file = OpenOptions::new().read(true).open(&conf.channels).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
let lines = buf.split(|&x| x == 0x0a);
let mut channels = Vec::new();
for line in lines {
let line = String::from_utf8_lossy(line);
let line = line.trim();
let use_line = if line.is_empty() {
false
} else if let Some(_cs) = re_p.captures(&line) {
true
} else if re_n.is_match(&line) {
false
} else {
true
};
if use_line {
channels.push(line.into());
let channels = if let Some(fname) = conf.channels.as_ref() {
let mut file = OpenOptions::new().read(true).open(fname).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
let lines = buf.split(|&x| x == 0x0a);
let mut channels = Vec::new();
for line in lines {
let line = String::from_utf8_lossy(line);
let line = line.trim();
let use_line = if line.is_empty() {
false
} else if let Some(_cs) = re_p.captures(&line) {
true
} else if re_n.is_match(&line) {
false
} else {
true
};
if use_line {
channels.push(line.into());
}
}
}
info!("Parsed {} channels", channels.len());
info!("Parsed {} channels", channels.len());
Some(channels)
} else {
None
};
Ok((conf, channels))
}

View File

@@ -20,7 +20,7 @@ impl Channel {
#[derive(Debug, Clone)]
pub enum DaemonEvent {
TimerTick(u32, Sender<u32>),
ChannelAdd(Channel),
ChannelAdd(Channel, crate::ca::conn::CmdResTx),
ChannelRemove(Channel),
CaConnSetItem(CaConnSetItem),
Shutdown,
@@ -31,7 +31,7 @@ impl DaemonEvent {
use DaemonEvent::*;
match self {
TimerTick(_, _) => format!("TimerTick"),
ChannelAdd(x) => format!("ChannelAdd {x:?}"),
ChannelAdd(x, _) => format!("ChannelAdd {x:?}"),
ChannelRemove(x) => format!("ChannelRemove {x:?}"),
CaConnSetItem(_) => format!("CaConnSetItem"),
Shutdown => format!("Shutdown"),

View File

@@ -9,8 +9,12 @@ use async_channel::Receiver;
use async_channel::Sender;
use async_channel::WeakSender;
use axum::extract::Query;
use axum::http;
use axum::response::IntoResponse;
use axum::response::Response;
use err::Error;
use http::Request;
use http::StatusCode;
use log::*;
use scywr::iteminsertqueue::QueryItem;
use serde::Deserialize;
@@ -33,6 +37,41 @@ use std::sync::Arc;
use std::time::Duration;
use taskrun::tokio;
/// Newtype around a message `String`, produced by `ToPublicErrorMsg` and convertible into a response.
// NOTE(review): presumably holds only text that is safe to show to API clients — confirm.
struct PublicErrorMsg(String);
/// Conversion of a value into a `PublicErrorMsg`.
trait ToPublicErrorMsg {
/// Builds the `PublicErrorMsg` for `self` without consuming it.
fn to_public_err_msg(&self) -> PublicErrorMsg;
}
// NOTE(review): still a stub. Every caller of `to_public_err_msg` on an `err::Error`
// (e.g. the error path of `channel_add`, and `always_error`) panics here via `todo!()`.
impl ToPublicErrorMsg for err::Error {
fn to_public_err_msg(&self) -> PublicErrorMsg {
todo!()
}
}
impl IntoResponse for PublicErrorMsg {
    /// Turns the message into an HTTP response with status 500 and the
    /// message text as the body, instead of panicking in a `todo!()` stub.
    fn into_response(self) -> axum::response::Response {
        (axum::http::StatusCode::INTERNAL_SERVER_ERROR, self.0).into_response()
    }
}
/// Newtype around an already-built `Response`; constructible from any `ToPublicErrorMsg` value.
struct CustomErrorResponse(Response);
impl<T> From<T> for CustomErrorResponse
where
    T: ToPublicErrorMsg,
{
    /// Builds the wrapped response from the public error message of `value`
    /// (previously an unimplemented `todo!()` stub that panicked).
    fn from(value: T) -> Self {
        Self(value.to_public_err_msg().into_response())
    }
}
impl IntoResponse for CustomErrorResponse {
    /// Returns the wrapped `Response` unchanged (previously a `todo!()` stub
    /// that panicked).
    fn into_response(self) -> Response {
        self.0
    }
}
pub struct StatsSet {
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
@@ -79,6 +118,12 @@ impl ExtraInsertsConf {
}
}
async fn always_error(params: HashMap<String, String>) -> Result<axum::Json<bool>, Response> {
Err(Error::with_public_msg_no_trace("The-public-message")
.to_public_err_msg()
.into_response())
}
async fn find_channel(
params: HashMap<String, String>,
dcom: Arc<DaemonComm>,
@@ -91,20 +136,26 @@ async fn find_channel(
}
async fn channel_add_inner(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> Result<(), Error> {
if let (Some(backend), Some(name)) = (params.get("backend"), params.get("name")) {
error!("TODO channel_add_inner");
Err(Error::with_msg_no_trace(format!("TODO channel_add_inner")))
if let Some(name) = params.get("name") {
let ch = crate::daemon_common::Channel::new(name.into());
let (tx, rx) = async_channel::bounded(1);
let ev = DaemonEvent::ChannelAdd(ch, tx);
dcom.tx.send(ev).await?;
match rx.recv().await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(Error::with_msg_no_trace(format!("{e}"))),
Err(e) => Err(Error::with_msg_no_trace(format!("{e}"))),
}
} else {
Err(Error::with_msg_no_trace(format!("wrong parameters given")))
}
}
async fn channel_add(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> axum::Json<bool> {
let ret = match channel_add_inner(params, dcom).await {
Ok(_) => true,
Err(_) => false,
};
axum::Json(ret)
async fn channel_add(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> Result<axum::Json<bool>, Response> {
match channel_add_inner(params, dcom).await {
Ok(_) => Ok(axum::Json::from(true)),
Err(e) => Err(e.to_public_err_msg().into_response()),
}
}
async fn channel_remove(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> axum::Json<serde_json::Value> {
@@ -198,7 +249,8 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
.route(
"/path2",
get(|qu: Query<DummyQuery>| async move { (StatusCode::OK, format!("{qu:?}")) }),
),
)
.route("/path3/", get(|| async { (StatusCode::OK, format!("Hello there!")) })),
)
.route(
"/metrics",
@@ -236,6 +288,10 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
}
}),
)
.route(
"/daqingest/always-error/",
get(|Query(params): Query<HashMap<String, String>>| always_error(params)),
)
.route(
"/daqingest/find/channel",
get({
@@ -326,11 +382,7 @@ pub async fn metrics_service(
.unwrap()
}
pub async fn metrics_agg_task(
query_item_chn: WeakSender<QueryItem>,
local_stats: Arc<CaConnStats>,
store_stats: Arc<CaConnStats>,
) -> Result<(), Error> {
pub async fn metrics_agg_task(local_stats: Arc<CaConnStats>, store_stats: Arc<CaConnStats>) -> Result<(), Error> {
let mut agg_last = CaConnStatsAgg::new();
loop {
tokio::time::sleep(Duration::from_millis(671)).await;
@@ -353,11 +405,6 @@ pub async fn metrics_agg_task(
agg.push(g.stats());
}
}
{
warn!("TODO provide metrics with a weak ref to the query_item_channel");
let nitems = query_item_chn.upgrade().map_or(0, |x| x.len());
agg.store_worker_recv_queue_len.__set(nitems as u64);
}
#[cfg(DISABLED)]
{
let mut m = METRICS.lock().unwrap();