Factor CA event handler

Dominik Werder
2022-07-26 12:23:25 +02:00
parent f50a1513b3
commit 37204cdad8
2 changed files with 215 additions and 162 deletions

@@ -273,7 +273,6 @@ pub struct CaConn {
init_state_count: u64,
cid_by_name: BTreeMap<String, u32>,
cid_by_subid: BTreeMap<u32, u32>,
ts_msp_last_by_series: BTreeMap<SeriesId, u64>,
name_by_cid: BTreeMap<u32, String>,
poll_count: usize,
data_store: Arc<DataStore>,
@@ -313,7 +312,6 @@ impl CaConn {
init_state_count: 0,
cid_by_name: BTreeMap::new(),
cid_by_subid: BTreeMap::new(),
ts_msp_last_by_series: BTreeMap::new(),
name_by_cid: BTreeMap::new(),
poll_count: 0,
data_store,
@@ -336,8 +334,8 @@ impl CaConn {
self.conn_command_tx.clone()
}
fn handle_conn_command(&mut self, cx: &mut Context) {
// TODO if this loops for too long time, interrupt and make sure we get called waken up again.
fn handle_conn_command(&mut self, cx: &mut Context) -> Option<Poll<()>> {
// TODO if this loops for too long, yield and make sure we get woken up again.
use Poll::*;
loop {
match self.conn_command_rx.poll_next_unpin(cx) {
@@ -422,7 +420,7 @@ impl CaConn {
error!("Command queue closed");
}
Pending => {
break;
break Some(Pending);
}
}
}
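
The new `Option<Poll<()>>` return value lets the caller see whether the command channel hit `Pending` (and so registered the waker) or simply ran dry. A minimal sketch of that drain-until-Pending pattern, with a hypothetical `process` callback standing in for the per-command match:

use futures_util::stream::{Stream, StreamExt};
use std::task::{Context, Poll};

// Sketch only: `rx` is any unpinned stream of commands; `process` is a
// hypothetical stand-in for the command handling in handle_conn_command.
fn drain_commands<S, T>(rx: &mut S, cx: &mut Context, mut process: impl FnMut(T)) -> Option<Poll<()>>
where
    S: Stream<Item = T> + Unpin,
{
    loop {
        match rx.poll_next_unpin(cx) {
            Poll::Ready(Some(cmd)) => process(cmd),
            // Sender side closed; no more commands will arrive.
            Poll::Ready(None) => break None,
            // The stream registered the waker; report Pending to the caller.
            Poll::Pending => break Some(Poll::Pending),
        }
    }
}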
@@ -476,8 +474,7 @@ impl CaConn {
Ready(Err(_)) => break Ready(Err(Error::with_msg_no_trace(format!("cannot send the item")))),
Pending => {
if false {
// Drop the item and continue.
// TODO this causes performance degradation in the channel.
// TODO test this case.
self.stats.inserts_queue_drop_inc();
self.insert_item_send_fut = None;
} else {
@@ -502,125 +499,124 @@ impl CaConn {
}
}
fn handle_get_series_futs(&mut self, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
while self.fut_get_series.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
self.stats.get_series_id_ok_inc();
let cid = k.0;
let sid = k.1;
let data_type = k.2;
let data_count = k.3;
let series = match k.4 {
Existence::Created(k) => k,
Existence::Existing(k) => k,
};
if series.id() == 0 {
warn!("Weird series id: {series:?}");
}
if data_type > 6 {
error!("data type of series unexpected: {}", data_type);
}
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
// TODO convert first to CaDbrType, set to `Time`, then convert to ix:
let data_type_asked = data_type + 14;
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid,
data_type: data_type_asked,
data_count,
subid,
}),
};
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
// TODO handle error better! Transition channel to Error state?
scalar_type: ScalarType::from_ca_id(data_type)?,
shape: Shape::from_ca_count(data_count)?,
ts_created: Instant::now(),
state: MonitoringState::AddingEvent(series),
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: Instant::now(),
insert_next_earliest: Instant::now(),
muted_before: 0,
});
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
let _cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
cx.waker().wake_by_ref();
}
Ready(Some(Err(e))) => error!("series error: {e:?}"),
Ready(None) => {}
Pending => break,
}
fn channel_to_evented(
&mut self,
cid: u32,
sid: u32,
data_type: u16,
data_count: u16,
series: Existence<SeriesId>,
cx: &mut Context,
) -> Result<(), Error> {
self.stats.get_series_id_ok_inc();
let series = match series {
Existence::Created(k) => k,
Existence::Existing(k) => k,
};
if series.id() == 0 {
warn!("Weird series id: {series:?}");
}
if data_type > 6 {
error!("data type of series unexpected: {}", data_type);
}
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
// TODO convert first to CaDbrType, set to `Time`, then convert to ix:
let data_type_asked = data_type + 14;
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid,
data_type: data_type_asked,
data_count,
subid,
}),
};
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
// TODO handle error better! Transition channel to Error state?
scalar_type: ScalarType::from_ca_id(data_type)?,
shape: Shape::from_ca_count(data_count)?,
ts_created: Instant::now(),
state: MonitoringState::AddingEvent(series),
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: Instant::now(),
insert_next_earliest: Instant::now(),
muted_before: 0,
});
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
let _cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
cx.waker().wake_by_ref();
Ok(())
}
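
The `data_type + 14` above selects the DBR_TIME_* variant of the native type: in Channel Access the plain DBR ids run 0..=6 and their time-stamped counterparts start at 14, which is what the TODO about converting via a `Time` DBR type alludes to. A small sketch of that mapping, with the same guard the surrounding code applies:

// Channel Access DBR ids: plain types are 0..=6; the DBR_TIME_* variants of
// the same types start at 14, so "plain + 14" asks for value plus timestamp.
fn to_dbr_time(data_type: u16) -> Result<u16, String> {
    if data_type > 6 {
        Err(format!("data type of series unexpected: {data_type}"))
    } else {
        Ok(data_type + 14)
    }
}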
fn handle_get_series_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
while self.fut_get_series.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Ready(Some(Ok((cid, sid, data_type, data_count, series)))) => {
match self.channel_to_evented(cid, sid, data_type, data_count, series, cx) {
Ok(_) => {}
Err(e) => {
return Ready(Err(e));
}
}
}
Ready(Some(Err(e))) => return Ready(Err(e)),
Ready(None) => return Ready(Err(Error::with_msg_no_trace("series lookup stream should never end"))),
Pending => break,
}
}
return Pending;
}
fn event_add_insert(
&mut self,
st: &mut CreatedState,
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
ts: u64,
ev: proto::EventAddRes,
cid: u32,
item_queue: &mut VecDeque<QueryItem>,
ts_msp_last: u64,
inserted_in_ts_msp: u64,
ts_msp_grid: Option<u32>,
stats: Arc<CaConnStats>,
) -> Result<(), Error> {
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
let ts_msp = if inserted_in_ts_msp > 20000 {
let (ts_msp, ts_msp_changed) = if inserted_in_ts_msp >= 20000 {
let ts_msp = ts / (10 * SEC) * (10 * SEC);
if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() {
if ts_msp == st.ts_msp_last {
(ts_msp, false)
} else {
st.ts_msp_last = ts_msp;
st.inserted_in_ts_msp = 1;
} else {
error!("logic error expect ChannelState::Created");
(ts_msp, true)
}
ts_msp
} else {
if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() {
st.inserted_in_ts_msp += 1;
} else {
error!("logic error expect ChannelState::Created");
}
ts_msp_last
st.inserted_in_ts_msp += 1;
(ts_msp_last, false)
};
let ts_lsp = ts - ts_msp;
let ts_msp_changed = if let Some(ts_msp_cur) = self.ts_msp_last_by_series.get_mut(&series) {
if ts_msp != *ts_msp_cur {
*ts_msp_cur = ts_msp;
true
} else {
false
}
} else {
self.ts_msp_last_by_series.insert(series.clone(), ts_msp);
true
};
let item_queue = &mut self.insert_item_queue;
let item = InsertItem {
series: series.id(),
ts_msp,
@@ -633,7 +629,55 @@ impl CaConn {
ts_msp_grid,
};
item_queue.push_back(QueryItem::Insert(item));
self.stats.insert_item_create_inc();
stats.insert_item_create_inc();
Ok(())
}
fn do_event_insert(
st: &mut CreatedState,
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
ts: u64,
ev: proto::EventAddRes,
tsnow: Instant,
item_queue: &mut VecDeque<QueryItem>,
insert_ivl_min: Arc<AtomicU64>,
stats: Arc<CaConnStats>,
) -> Result<(), Error> {
st.muted_before = 0;
st.insert_item_ivl_ema.tick(tsnow);
let em = st.insert_item_ivl_ema.ema();
let ema = em.ema();
let ivl_min = insert_ivl_min.load(Ordering::Acquire);
let ivl_min = (ivl_min as f32) * 1e-6;
let dt = (ivl_min - ema).max(0.) / em.k();
st.insert_next_earliest = tsnow
.checked_add(Duration::from_micros((dt * 1e6) as u64))
.ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?;
let ts_msp_last = st.ts_msp_last;
let inserted_in_ts_msp = st.inserted_in_ts_msp;
// TODO get event timestamp from channel access field
let ts_msp_grid = (ts / (SEC * 10 * 6 * 2)) as u32 * (6 * 2);
let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid {
st.ts_msp_grid_last = ts_msp_grid;
Some(ts_msp_grid)
} else {
None
};
Self::event_add_insert(
st,
series,
scalar_type,
shape,
ts,
ev,
item_queue,
ts_msp_last,
inserted_in_ts_msp,
ts_msp_grid,
stats,
)?;
Ok(())
}
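
The msp/lsp handling above rounds the event timestamp down to a 10 s boundary for the most significant part, keeps the remainder as the least significant part, and only opens a new msp once roughly 20000 rows have been written under the current one. A self-contained sketch of that split, assuming `SEC` is nanoseconds per second as the code above suggests:

const SEC: u64 = 1_000_000_000; // assumed: timestamps are in nanoseconds

// Returns (ts_msp, ts_lsp, ts_msp_changed) for one event timestamp, given the
// msp currently in use and how many rows were already inserted under it.
fn split_ts(ts: u64, ts_msp_last: u64, inserted_in_ts_msp: u64) -> (u64, u64, bool) {
    if inserted_in_ts_msp >= 20000 {
        // Round down to a 10 s boundary; as the comment above notes, the exact
        // msp is arbitrary as long as only one writer is active.
        let ts_msp = ts / (10 * SEC) * (10 * SEC);
        (ts_msp, ts - ts_msp, ts_msp != ts_msp_last)
    } else {
        // Keep filling the current msp.
        (ts_msp_last, ts - ts_msp_last, false)
    }
}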
@@ -691,36 +735,19 @@ impl CaConn {
self.stats.ca_ts_off_1_inc();
}
if tsnow >= st.insert_next_earliest {
st.muted_before = 0;
st.insert_item_ivl_ema.tick(tsnow);
let em = st.insert_item_ivl_ema.ema();
let ema = em.ema();
let ivl_min = self.insert_ivl_min.load(Ordering::Acquire);
let ivl_min = (ivl_min as f32) * 1e-6;
let dt = (ivl_min - ema).max(0.) / em.k();
st.insert_next_earliest = tsnow
.checked_add(Duration::from_micros((dt * 1e6) as u64))
.ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?;
let ts_msp_last = st.ts_msp_last;
let inserted_in_ts_msp = st.inserted_in_ts_msp;
// TODO get event timestamp from channel access field
let ts_msp_grid = (ts / (SEC * 10 * 6 * 2)) as u32 * (6 * 2);
let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid {
st.ts_msp_grid_last = ts_msp_grid;
Some(ts_msp_grid)
} else {
None
};
self.event_add_insert(
//let channel_state = self.channels.get_mut(&cid).unwrap();
let item_queue = &mut self.insert_item_queue;
Self::do_event_insert(
st,
series,
scalar_type,
shape,
ts,
ev,
cid,
ts_msp_last,
inserted_in_ts_msp,
ts_msp_grid,
tsnow,
item_queue,
self.insert_ivl_min.clone(),
self.stats.clone(),
)?;
} else {
self.stats.channel_fast_item_drop_inc();
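
The rate limiting that `do_event_insert` applies here tracks an exponential moving average of the per-channel insert interval and, when it falls below the configured minimum, pushes `insert_next_earliest` into the future by the shortfall divided by the EMA gain; events arriving before that instant are dropped and counted via `channel_fast_item_drop_inc`. A rough sketch of the computation, where the `Ema` struct only mirrors the `ema()`/`k()` accessors used above and is otherwise an assumption:

use std::time::{Duration, Instant};

// Assumed shape of the interval EMA: `ema()` is the averaged interval in
// seconds, `k()` the smoothing gain applied on each tick.
struct Ema { ema: f32, k: f32 }
impl Ema {
    fn ema(&self) -> f32 { self.ema }
    fn k(&self) -> f32 { self.k }
}

// Earliest instant at which the next insert may be accepted, given the
// minimum interval in microseconds and the current interval EMA.
fn next_earliest(tsnow: Instant, ivl_min_us: u64, em: &Ema) -> Option<Instant> {
    let ivl_min = (ivl_min_us as f32) * 1e-6; // microseconds -> seconds
    // Shortfall below the minimum, stretched by 1/k so the average itself,
    // not just a single interval, converges to ivl_min.
    let dt = (ivl_min - em.ema()).max(0.) / em.k();
    tsnow.checked_add(Duration::from_micros((dt * 1e6) as u64))
}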
@@ -1069,7 +1096,7 @@ impl Stream for CaConn {
},
CaConnState::PeerReady => {
{
self.handle_get_series_futs(cx)?;
let _ = self.handle_get_series_futs(cx)?;
let ts2 = Instant::now();
self.stats
.poll_time_get_series_futs

@@ -1,5 +1,7 @@
use crate::ca::conn::ConnCommand;
use crate::ca::{CommandQueueSet, IngestCommons};
use axum::extract::Query;
use http::request::Parts;
use log::*;
use serde::Deserialize;
use std::collections::HashMap;
@@ -7,14 +9,6 @@ use std::net::SocketAddrV4;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Debug, Deserialize)]
struct QueryForm {
query: String,
time: Option<f64>,
#[allow(unused)]
timeout: Option<String>,
}
#[allow(unused)]
#[derive(Debug, Deserialize)]
struct PromLabels {
@@ -146,6 +140,60 @@ async fn channel_remove(
}
}
async fn prom_query(
Query(params): Query<HashMap<String, String>>,
parts: Parts,
body: bytes::Bytes,
) -> axum::Json<serde_json::Value> {
use axum::Json;
info!("/api/v1/query params {:?} {:?}", params, parts);
let url = url::Url::parse(&format!("dummy://{}", &parts.uri));
info!("/api/v1/query parsed url: {:?}", url);
let body_str = String::from_utf8_lossy(&body);
info!("/api/v1/query body_str: {:?}", body_str);
let formurl = url::Url::parse(&format!("dummy:///?{}", body_str));
info!("/api/v1/query formurl: {:?}", formurl);
let res = serde_json::json!({
"status": "success",
"data": {
"resultType": "scalar",
"result": [40, "2"]
}
});
Json(res)
}
async fn prom_query_range(
Query(params): Query<HashMap<String, String>>,
parts: Parts,
body: bytes::Bytes,
) -> axum::Json<serde_json::Value> {
use axum::Json;
info!("/api/v1/query_range {:?} Query(params) {:?}", parts, params);
let url = url::Url::parse(&format!("dummy://{}", &parts.uri));
info!("/api/v1/query_range parsed url: {:?}", url);
let body_str = String::from_utf8_lossy(&body);
info!("/api/v1/query_range body_str: {:?}", body_str);
let formurl = url::Url::parse(&format!("dummy:///?{}", body_str));
info!("/api/v1/query_range formurl: {:?}", formurl);
let res = serde_json::json!({
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"__name__": "series1",
},
"values": [
]
}
]
}
});
Json(res)
}
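
Both handlers return stub payloads in the shape of the Prometheus HTTP API (a scalar result is `[<unix_ts>, "<value>"]`, a matrix result is a list of `{metric, values}` objects), so a Prometheus-speaking client can parse them. A minimal sketch of wiring the two handlers into a router, mirroring the `.route(...)` registrations in `start_metrics_service` below; the standalone server and bind address are made up for the example:

use axum::routing::post;
use axum::Router;

// Sketch: expose the two Prometheus-compatible endpoints on their own server.
async fn serve_prom_api() {
    let app = Router::new()
        .route("/api/v1/query", post(prom_query))
        .route("/api/v1/query_range", post(prom_query_range));
    axum::Server::bind(&"127.0.0.1:9090".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}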
pub async fn start_metrics_service(
bind_to: String,
insert_frac: Arc<AtomicU64>,
@@ -153,11 +201,9 @@ pub async fn start_metrics_service(
command_queue_set: Arc<CommandQueueSet>,
ingest_commons: Arc<IngestCommons>,
) {
use axum::extract::Query;
use axum::routing::{get, post, put};
use axum::Form;
use axum::{extract, Router};
use http::request::Parts;
let app = Router::new()
.route(
"/metrics",
@@ -299,28 +345,8 @@ pub async fn start_metrics_service(
serde_json::to_string(&res).unwrap()
}),
)
.route(
"/api/v1/query",
post(
|Form(form): Form<QueryForm>, Query(params): Query<HashMap<String, String>>, parts: Parts| async move {
info!("/api/v1/query form {form:?} params {params:?} {parts:?}");
let res = if form.query == "1+1" {
serde_json::json!({
"status": "success",
"data": {
"resultType": "scalar",
"result": [form.time.unwrap_or(0.0), "2"]
}
})
} else {
serde_json::json!({
"status": "success"
})
};
serde_json::to_string(&res).unwrap()
},
),
)
.route("/api/v1/query", post(prom_query))
.route("/api/v1/query_range", post(prom_query_range))
.route(
"/api/v1/labels",
post(|Form(_form): Form<PromLabels>| async move {