Emit accounting for rt
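In brief: per-channel accounting moves from a single account_count/account_bytes pair on CreatedState to one AccountingInfo accumulator per retention time (short, medium, long), each emitted through the new InsertDeques::emit_accounting_item into the insert queue of the matching retention time, on a time grid snapped by EMIT_ACCOUNTING_SNAP. A condensed sketch of the types involved, simplified from the diff below (names match the diff; the surrounding machinery is omitted):

    // Sketch only: the accounting types this commit introduces, reduced
    // to their essentials. TsMs stands in for the real netpod::TsMs.
    #[derive(Debug, Clone, Copy, PartialEq)]
    struct TsMs(u64);

    #[derive(Debug, Default, Clone)]
    struct StorageUsage {
        count: u64,
        bytes: u64,
    }

    impl StorageUsage {
        // Every written event is charged its payload plus 16 bytes of overhead.
        fn push_written(&mut self, payload_len: u32) {
            self.count += 1;
            self.bytes += 16 + payload_len as u64;
        }
    }

    // One accumulator per retention time (acc_st, acc_mt, acc_lt on CreatedState).
    #[derive(Debug, Clone)]
    struct AccountingInfo {
        usage: StorageUsage,
        beg: TsMs, // start of the current accounting window, snapped to a grid
    }

    impl AccountingInfo {
        // Once the grid timestamp msp has moved past beg and usage is nonzero,
        // the caller emits an Accounting row and starts the next window here.
        fn reset(&mut self, msp: TsMs) {
            self.beg = msp;
            self.usage = StorageUsage::default();
        }
    }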
.gitignore (vendored, 1 change)
@@ -2,3 +2,4 @@
 /Cargo.lock
 /tmpdoc
 /.vscode
+/tmp

@@ -1,6 +1,6 @@
 [package]
 name = "daqingest"
-version = "0.2.1-aa.1"
+version = "0.2.1"
 authors = ["Dominik Werder <dominik.werder@gmail.com>"]
 edition = "2021"
 
@@ -14,7 +14,7 @@ clap = { version = "4.5.1", features = ["derive", "cargo"] }
 tracing = "0.1"
 serde = { version = "1.0", features = ["derive"] }
 tokio-postgres = "0.7.10"
-async-channel = "2.2.0"
+async-channel = "2.3.1"
 futures-util = "0.3"
 chrono = "0.4"
 bytes = "1.6.0"

@@ -610,6 +610,9 @@ impl Daemon {
             self.iqtx
                 .take()
                 .ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?,
+            self.ingest_opts.scylla_config_st().clone(),
+            self.ingest_opts.scylla_config_mt().clone(),
+            self.ingest_opts.scylla_config_lt().clone(),
         );
         let rres = Arc::new(rres);
         let metrics_jh = {

@@ -15,10 +15,10 @@ serde_yaml = "0.9.16"
 ciborium = "0.2.2"
 tokio-stream = { version = "0.1", features = ["fs"] }
 tracing = "0.1.37"
-async-channel = "2.0.0"
-bytes = "1.5"
+async-channel = "2.3.1"
+bytes = "1.6"
 arrayref = "0.3"
-byteorder = "1.4"
+byteorder = "1.5"
 futures-util = "0.3"
 md-5 = "0.10.5"
 hex = "0.4.3"
@@ -26,7 +26,7 @@ regex = "1.8.4"
 axum = "0.7.5"
 http-body = "1"
 url = "2.2"
-hyper = "0.14"
+#hyper = "1.3.1"
 chrono = "0.4"
 humantime = "2.1.0"
 humantime-serde = "1.1.1"

@@ -4,6 +4,7 @@ use super::proto::ReadNotify;
 use crate::ca::proto::ChannelClose;
 use crate::ca::proto::EventCancel;
 use crate::conf::ChannelConfig;
+use crate::metrics::status::StorageUsage;
 use crate::throttletrace::ThrottleTrace;
 use async_channel::Receiver;
 use async_channel::Sender;
@@ -18,6 +19,7 @@ use futures_util::StreamExt;
 use hashbrown::HashMap;
 use log::*;
 use netpod::timeunits::*;
+use netpod::ttl::RetentionTime;
 use netpod::ScalarType;
 use netpod::Shape;
 use netpod::TsMs;
@@ -193,6 +195,7 @@ pub struct ChannelStateInfo {
 
 mod ser_instant {
     use super::*;
+    use netpod::DATETIME_FMT_3MS;
     use serde::Deserializer;
     use serde::Serializer;
 
@@ -221,9 +224,7 @@ mod ser_instant {
                     .unwrap();
                 now.checked_add_signed(dur2).unwrap()
             };
-            //info!("formatting {:?}", t1);
-            let s = t1.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
-            //info!("final string {:?}", s);
+            let s = t1.format(DATETIME_FMT_3MS).to_string();
             ser.serialize_str(&s)
         }
         None => ser.serialize_none(),
@@ -351,6 +352,34 @@ enum ReadingState {
     Polling(PollingState),
 }
 
+#[derive(Debug, Clone)]
+struct AccountingInfo {
+    usage: StorageUsage,
+    beg: TsMs,
+}
+
+impl AccountingInfo {
+    fn new(beg: TsMs) -> Self {
+        Self {
+            usage: StorageUsage::new(),
+            beg,
+        }
+    }
+
+    fn push_written(&mut self, payload_len: u32) {
+        self.usage.push_written(payload_len);
+    }
+
+    fn usage(&self) -> &StorageUsage {
+        &self.usage
+    }
+
+    fn reset(&mut self, msp: TsMs) {
+        self.beg = msp;
+        self.usage.reset();
+    }
+}
+
 #[derive(Debug, Clone)]
 struct CreatedState {
     cssid: ChannelStatusSeriesId,
@@ -377,9 +406,9 @@ struct CreatedState {
     stwin_ts: u64,
     stwin_count: u32,
     stwin_bytes: u32,
-    account_emit_last: TsMs,
-    account_count: u64,
-    account_bytes: u64,
+    acc_st: AccountingInfo,
+    acc_mt: AccountingInfo,
+    acc_lt: AccountingInfo,
     dw_st_last: SystemTime,
     dw_mt_last: SystemTime,
     dw_lt_last: SystemTime,
@@ -389,6 +418,7 @@ impl CreatedState {
     fn dummy() -> Self {
         let tsnow = Instant::now();
         let stnow = SystemTime::now();
+        let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP);
         Self {
             cssid: ChannelStatusSeriesId::new(0),
             cid: Cid(0),
@@ -412,9 +442,9 @@ impl CreatedState {
             stwin_ts: 0,
             stwin_count: 0,
             stwin_bytes: 0,
-            account_emit_last: TsMs(0),
-            account_count: 0,
-            account_bytes: 0,
+            acc_st: AccountingInfo::new(acc_msp),
+            acc_mt: AccountingInfo::new(acc_msp),
+            acc_lt: AccountingInfo::new(acc_msp),
             dw_st_last: SystemTime::UNIX_EPOCH,
             dw_mt_last: SystemTime::UNIX_EPOCH,
             dw_lt_last: SystemTime::UNIX_EPOCH,
@@ -829,7 +859,7 @@ pub struct CaConn {
     cid_by_name: BTreeMap<String, Cid>,
     cid_by_subid: HashMap<Subid, Cid>,
     cid_by_sid: HashMap<Sid, Cid>,
-    channel_status_emit_last: Instant,
+    channel_status_emit_next: Instant,
     tick_last_writer: Instant,
     init_state_count: u64,
     iqdqs: InsertDeques,
@@ -896,7 +926,7 @@ impl CaConn {
             cid_by_name: BTreeMap::new(),
             cid_by_subid: HashMap::new(),
             cid_by_sid: HashMap::new(),
-            channel_status_emit_last: tsnow,
+            channel_status_emit_next: tsnow + Self::channel_status_emit_ivl(&mut rng),
             tick_last_writer: tsnow,
             iqdqs: InsertDeques::new(),
             remote_addr_dbg,
@@ -931,6 +961,10 @@ impl CaConn {
         IOC_PING_IVL * 100 / (70 + (rng.next_u32() % 60))
     }
 
+    fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration {
+        Duration::from_millis(6000 + (rng.next_u32() & 0x7ff) as u64)
+    }
+
     fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
         Box::pin(tokio::time::sleep(Duration::from_millis(500)))
    }
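For reference: `rng.next_u32() & 0x7ff` is a value in 0..=2047, so `channel_status_emit_ivl` yields a jittered interval of 6000..=8047 ms; together with the `channel_status_emit_next` change further down, this replaces the fixed 3000 ms cadence and keeps connections from emitting status and accounting in lockstep.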
@@ -1664,25 +1698,23 @@ impl CaConn {
         let ts_diff = ts.abs_diff(ts_local);
         stats.ca_ts_off().ingest((ts_diff / MS) as u32);
         {
-            {
-                crst.account_count += 1;
-                // TODO how do we account for bytes? Here, we also add 8 bytes for the timestamp.
-                crst.account_bytes += 8 + payload_len as u64;
-                crst.muted_before = 0;
-                crst.insert_item_ivl_ema.tick(tsnow);
-            }
             Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
+            crst.muted_before = 0;
+            crst.insert_item_ivl_ema.tick(tsnow);
             {
                 let val: DataValue = value.data.into();
                 let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
                 if dwst {
                     crst.dw_st_last = stnow;
+                    crst.acc_st.push_written(payload_len);
                 }
                 if dwmt {
                     crst.dw_mt_last = stnow;
+                    crst.acc_mt.push_written(payload_len);
                 }
                 if dwlt {
                     crst.dw_lt_last = stnow;
+                    crst.acc_lt.push_written(payload_len);
                 }
             }
         }
@@ -1751,7 +1783,7 @@ impl CaConn {
             Ready(Some(k)) => match k {
                 Ok(k) => match k {
                     CaItem::Empty => {
-                        info!("CaItem::Empty");
+                        debug!("CaItem::Empty");
                         Ready(Some(Ok(())))
                     }
                     CaItem::Msg(msg) => match msg.ty {
@@ -2183,6 +2215,7 @@ impl CaConn {
         let ca_dbr_type = k.data_type + 14;
         let scalar_type = ScalarType::from_ca_id(k.data_type)?;
         let shape = Shape::from_ca_count(k.data_count)?;
+        let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP);
         let channel = CreatedState {
             cssid,
             cid,
@@ -2206,9 +2239,9 @@ impl CaConn {
             stwin_ts: 0,
             stwin_count: 0,
             stwin_bytes: 0,
-            account_emit_last: TsMs::from_ms_u64(0),
-            account_count: 0,
-            account_bytes: 0,
+            acc_st: AccountingInfo::new(acc_msp),
+            acc_mt: AccountingInfo::new(acc_msp),
+            acc_lt: AccountingInfo::new(acc_msp),
             dw_st_last: SystemTime::UNIX_EPOCH,
             dw_mt_last: SystemTime::UNIX_EPOCH,
             dw_lt_last: SystemTime::UNIX_EPOCH,
@@ -2230,7 +2263,7 @@ impl CaConn {
     }
 
     fn handle_channel_close_res(&mut self, k: proto::ChannelCloseRes, tsnow: Instant) -> Result<(), Error> {
-        info!("{:?}", k);
+        debug!("{:?}", k);
         Ok(())
     }
 
@@ -2283,7 +2316,7 @@ impl CaConn {
             }
             Ok(Err(e)) => {
                 use std::io::ErrorKind;
-                info!("error connect to {addr} {e}");
+                debug!("error connect to {addr} {e}");
                 let addr = addr.clone();
                 self.iqdqs
                     .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
@@ -2300,7 +2333,7 @@ impl CaConn {
             }
             Err(e) => {
                 // TODO log with exponential backoff
-                info!("timeout connect to {addr} {e}");
+                debug!("timeout connect to {addr} {e}");
                 let addr = addr.clone();
                 self.iqdqs
                     .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
@@ -2415,8 +2448,8 @@ impl CaConn {
         self.check_channels_state_poll(tsnow, cx)?;
         self.check_channels_alive(tsnow, cx)?;
         // TODO add some random variation
-        if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow {
-            self.channel_status_emit_last = tsnow;
+        if self.channel_status_emit_next <= tsnow {
+            self.channel_status_emit_next = tsnow + Self::channel_status_emit_ivl(&mut self.rng);
             self.emit_channel_status()?;
             self.emit_accounting()?;
         }
@@ -2476,22 +2509,25 @@ impl CaConn {
         match st0 {
             ChannelState::Writable(st1) => {
                 let ch = &mut st1.channel;
-                if ch.account_emit_last != msp {
-                    if ch.account_count != 0 {
-                        let series = st1.writer.sid();
-                        let count = ch.account_count as i64;
-                        let bytes = ch.account_bytes as i64;
-                        ch.account_count = 0;
-                        ch.account_bytes = 0;
-                        let item = QueryItem::Accounting(Accounting {
-                            part: (series.id() & 0xff) as i32,
-                            ts: msp,
-                            series,
-                            count,
-                            bytes,
-                        });
-                        self.iqdqs.emit_status_item(item)?;
-                        ch.account_emit_last = msp;
+                for (acc, rt) in [&mut ch.acc_st, &mut ch.acc_mt, &mut ch.acc_lt].into_iter().zip([
+                    RetentionTime::Short,
+                    RetentionTime::Medium,
+                    RetentionTime::Long,
+                ]) {
+                    if acc.beg != msp {
+                        if acc.usage().count() != 0 {
+                            let series = st1.writer.sid();
+                            let item = Accounting {
+                                part: (series.id() & 0xff) as i32,
+                                ts: acc.beg,
+                                series,
+                                count: acc.usage().count() as _,
+                                bytes: acc.usage().bytes() as _,
+                            };
+                            //info!("EMIT ITEM {rt:?} {item:?}");
+                            self.iqdqs.emit_accounting_item(rt, item)?;
+                            acc.reset(msp);
+                        }
                     }
                 }
             }
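Reading the loop above: for each retention time an `Accounting` row is emitted only when the channel's window start `acc.beg` differs from the current grid timestamp `msp` and at least one event was written in that window; `acc.reset(msp)` then clears the counters and opens the next window. The partition is still derived from the series id (`series.id() & 0xff`), exactly as in the removed single-queue version.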
@@ -1,4 +1,5 @@
 #![allow(unused)]
+pub mod delete;
 pub mod ingest;
 pub mod postingest;
 pub mod status;
@@ -26,6 +27,7 @@ use http::Request;
 use http::StatusCode;
 use http_body::Body;
 use log::*;
+use scywr::config::ScyllaIngestConfig;
 use scywr::insertqueues::InsertQueuesTx;
 use scywr::iteminsertqueue::QueryItem;
 use serde::Deserialize;
@@ -312,14 +314,27 @@ pub struct RoutesResources {
     backend: String,
     worker_tx: Sender<ChannelInfoQuery>,
     iqtx: InsertQueuesTx,
+    scyconf_st: ScyllaIngestConfig,
+    scyconf_mt: ScyllaIngestConfig,
+    scyconf_lt: ScyllaIngestConfig,
 }
 
 impl RoutesResources {
-    pub fn new(backend: String, worker_tx: Sender<ChannelInfoQuery>, iqtx: InsertQueuesTx) -> Self {
+    pub fn new(
+        backend: String,
+        worker_tx: Sender<ChannelInfoQuery>,
+        iqtx: InsertQueuesTx,
+        scyconf_st: ScyllaIngestConfig,
+        scyconf_mt: ScyllaIngestConfig,
+        scyconf_lt: ScyllaIngestConfig,
+    ) -> Self {
         Self {
             backend,
             worker_tx,
             iqtx,
+            scyconf_st,
+            scyconf_mt,
+            scyconf_lt,
         }
     }
 }
@@ -381,6 +396,13 @@ fn make_routes(
                 let dcom = dcom.clone();
                 |Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
             }),
         )
+        .route(
+            "/remove",
+            get({
+                let dcom = dcom.clone();
+                |Query(params): Query<HashMap<String, String>>| channel_remove(params, dcom)
+            }),
+        ),
     )
     .nest(
@@ -396,6 +418,33 @@ fn make_routes(
                 )| { ingest::post_v01((headers, params, body), rres) }
             }),
         ),
     )
+    .nest(
+        "/private",
+        Router::new()
+            .nest(
+                "/channel",
+                Router::new().route(
+                    "/delete",
+                    post({
+                        let rres = rres.clone();
+                        move |(headers, params, body): (
+                            HeaderMap,
+                            Query<HashMap<String, String>>,
+                            axum::body::Body,
+                        )| {
+                            delete::delete((headers, params, body), rres)
+                        }
+                    }),
+                ),
+            )
+            .route(
+                "/channel/states",
+                get({
+                    let tx = connset_cmd_tx.clone();
+                    |Query(params): Query<HashMap<String, String>>| private_channel_states(params, tx)
+                }),
+            ),
+    ),
     )
     .route(
@@ -416,20 +465,6 @@ fn make_routes(
             |Query(params): Query<HashMap<String, String>>| find_channel(params, dcom)
         }),
     )
-    .route(
-        "/daqingest/private/channel/states",
-        get({
-            let tx = connset_cmd_tx.clone();
-            |Query(params): Query<HashMap<String, String>>| private_channel_states(params, tx)
-        }),
-    )
-    .route(
-        "/daqingest/channel/remove",
-        get({
-            let dcom = dcom.clone();
-            |Query(params): Query<HashMap<String, String>>| channel_remove(params, dcom)
-        }),
-    )
     .route(
         "/daqingest/store_workers_rate",
         get({

netfetch/src/metrics/delete.rs (new file, 245 lines)

@@ -0,0 +1,245 @@
+use super::RoutesResources;
+use axum::extract::FromRequest;
+use axum::extract::Query;
+use axum::handler::Handler;
+use axum::http::HeaderMap;
+use axum::Json;
+use bytes::Bytes;
+use chrono::DateTime;
+use chrono::Utc;
+use core::fmt;
+use err::thiserror;
+use err::ThisError;
+use netpod::log::*;
+use netpod::ttl::RetentionTime;
+use netpod::ScalarType;
+use netpod::TsMs;
+use netpod::TsNano;
+use scylla::Session as ScySession;
+use scywr::config::ScyllaIngestConfig;
+use scywr::insertqueues::InsertDeques;
+use scywr::iteminsertqueue::ArrayValue;
+use scywr::iteminsertqueue::DataValue;
+use scywr::iteminsertqueue::QueryItem;
+use scywr::iteminsertqueue::ScalarValue;
+use scywr::scylla;
+use scywr::scylla::prepared_statement::PreparedStatement;
+use serde::Deserialize;
+use series::SeriesId;
+use serieswriter::writer::SeriesWriter;
+use std::collections::HashMap;
+use std::collections::VecDeque;
+use std::io::Cursor;
+use std::sync::Arc;
+use std::time::Duration;
+use std::time::SystemTime;
+use streams::framed_bytes::FramedBytesStream;
+use taskrun::tokio::time::timeout;
+
+#[allow(unused)]
+macro_rules! debug_cql {
+    ($($arg:tt)*) => {
+        if true {
+            debug!($($arg)*);
+        }
+    };
+}
+
+#[derive(Debug, ThisError)]
+pub enum Error {
+    Logic,
+    MissingRetentionTime,
+    MissingSeriesId,
+    MissingScalarType,
+    MissingBegDate,
+    MissingEndDate,
+    ScyllaTransport(#[from] scylla::transport::errors::NewSessionError),
+    ScyllaQuery(#[from] scylla::transport::errors::QueryError),
+    ScyllaRowType(#[from] scylla::transport::query_result::RowsExpectedError),
+    ScyllaRowError(#[from] scylla::cql_to_rust::FromRowError),
+    InvalidTimestamp,
+}
+
+pub async fn delete(
+    (headers, Query(params), body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body),
+    rres: Arc<RoutesResources>,
+) -> Json<serde_json::Value> {
+    match delete_try(headers, params, body, rres).await {
+        Ok(k) => k,
+        Err(e) => Json(serde_json::json!({
+            "error": e.to_string(),
+        })),
+    }
+}
+
+fn st_to_ns(v: DateTime<Utc>) -> Result<TsNano, Error> {
+    let sec = v.timestamp();
+    if sec < 0 {
+        Err(Error::InvalidTimestamp)
+    } else if sec > 18446744073 {
+        Err(Error::InvalidTimestamp)
+    } else {
+        let w = 1000000000 * sec as u64 + v.timestamp_subsec_nanos() as u64;
+        Ok(TsNano::from_ns(w))
+    }
+}
+
+// select * from sf_lt.lt_account_00 where token(part, ts) > -100000000 and series = 8554946496751499549 allow filtering
+
+async fn delete_try(
+    headers: HeaderMap,
+    params: HashMap<String, String>,
+    body: axum::body::Body,
+    rres: Arc<RoutesResources>,
+) -> Result<Json<serde_json::Value>, Error> {
+    let rt: RetentionTime = params
+        .get("retentionTime")
+        .ok_or(Error::MissingRetentionTime)
+        .and_then(|x| x.parse().map_err(|_| Error::MissingRetentionTime))?;
+    let series = params
+        .get("series")
+        .ok_or(Error::MissingSeriesId)
+        .and_then(|x| x.parse().map_err(|_| Error::MissingSeriesId))
+        .map(SeriesId::new)?;
+    let beg: DateTime<Utc> = params
+        .get("begDate")
+        .ok_or(Error::MissingBegDate)
+        .and_then(|x| x.parse().map_err(|_| Error::MissingBegDate))?;
+    let end: DateTime<Utc> = params
+        .get("endDate")
+        .ok_or(Error::MissingEndDate)
+        .and_then(|x| x.parse().map_err(|_| Error::MissingEndDate))?;
+    let scalar_type: ScalarType = params
+        .get("scalarType")
+        .ok_or(Error::MissingScalarType)
+        .and_then(|x| ScalarType::from_variant_str(x).map_err(|_| Error::MissingScalarType))?;
+    debug_cql!("delete params {rt:?} {series:?} {beg:?} {end:?}");
+    let beg = st_to_ns(beg)?;
+    let end = st_to_ns(end)?;
+    let scyconf = &rres.scyconf_st;
+    let scy = scy_connect(scyconf).await?;
+    let qu = {
+        let cql = format!(
+            concat!("select ts_msp from {}.{}ts_msp where series = ?"),
+            scyconf.keyspace(),
+            rt.table_prefix(),
+        );
+        scy.prepare(scylla::query::Query::new(cql).with_page_size(4)).await?
+    };
+    let qu_delete_val = {
+        let _cql = format!(
+            concat!(
+                "select ts_lsp from {}.{}events_scalar_{}",
+                " where series = ? and ts_msp = ?",
+                " and ts_lsp >= ? and ts_lsp < ?",
+            ),
+            scyconf.keyspace(),
+            rt.table_prefix(),
+            scalar_type.to_scylla_table_name_id(),
+        );
+        let cql = format!(
+            concat!(
+                "delete from {}.{}events_scalar_{}",
+                " where series = ? and ts_msp = ?",
+                " and ts_lsp >= ? and ts_lsp < ?",
+            ),
+            scyconf.keyspace(),
+            rt.table_prefix(),
+            scalar_type.to_scylla_table_name_id(),
+        );
+        scy.prepare(scylla::query::Query::new(cql).with_page_size(100)).await?
+    };
+    let mut pst = None;
+    let mut i = 0;
+    loop {
+        // debug_cql!("query iteration {i}");
+        let z = scy.execute_paged(&qu, (series.to_i64(),), pst).await?;
+        pst = z.paging_state.clone();
+        for x in z.rows_typed::<(i64,)>()? {
+            let (msp,) = x?;
+            let msp = TsMs::from_ms_u64(msp as _);
+            let msp_ns = msp.ns_u64();
+            delete_val(series.clone(), msp, beg, end, &qu_delete_val, &scy).await?;
+        }
+        if pst.is_none() {
+            debug_cql!("last page");
+            break;
+        }
+        i += 1;
+        if false {
+            if i > 20 {
+                debug_cql!("loop limit");
+                break;
+            }
+        }
+    }
+    Ok(Json(serde_json::Value::Null))
+}
+
+async fn delete_val(
+    series: SeriesId,
+    msp: TsMs,
+    beg: TsNano,
+    end: TsNano,
+    qu_delete_val: &PreparedStatement,
+    scy: &ScySession,
+) -> Result<(), Error> {
+    let msp_ns = msp.ns_u64();
+    if msp_ns >= end.ns() {
+        // debug_cql!("  return early msp {msp} after range");
+        return Ok(());
+    }
+    let r1 = if msp_ns >= beg.ns() { 0 } else { beg.ns() - msp_ns };
+    let r2 = end.ns() - msp_ns;
+    let o0 = DateTime::from_timestamp_millis((msp.ms() + 0 / 1000000) as i64).unwrap();
+    let o1 = DateTime::from_timestamp_millis((msp.ms() + r1 / 1000000) as i64).unwrap();
+    let o2 = DateTime::from_timestamp_millis((msp.ms() + r2 / 1000000) as i64).unwrap();
+    debug_cql!("  sub query {o0:?} {o1:?} {o2:?}");
+    let mut pst = None;
+    let mut i = 0;
+    loop {
+        // debug_cql!("  sub query iteration {i}");
+        let params = (series.to_i64(), msp.ms() as i64, r1 as i64, r2 as i64);
+        let z = scy.execute_paged(&qu_delete_val, params, pst).await?;
+        pst = z.paging_state.clone();
+        if z.rows_num().is_ok() {
+            for (i, x) in z.rows_typed::<(i64,)>()?.enumerate() {
+                let (lsp,) = x?;
+                if false && i < 4 {
+                    debug_cql!("  lsp {lsp}");
+                }
+            }
+        }
+        if pst.is_none() {
+            // debug_cql!("  last page");
+            break;
+        }
+        i += 1;
+        if false {
+            if i > 20 {
+                debug_cql!("  loop limit");
+                break;
+            }
+        }
+    }
+    Ok(())
+}
+
+async fn scy_connect(scyconf: &ScyllaIngestConfig) -> Result<Arc<ScySession>, Error> {
+    use scylla::execution_profile::ExecutionProfileBuilder;
+    use scylla::statement::Consistency;
+    use scylla::transport::session::PoolSize;
+    use scylla::transport::session_builder::GenericSessionBuilder;
+    let profile = ExecutionProfileBuilder::default()
+        .consistency(Consistency::Quorum)
+        .build()
+        .into_handle();
+    let scy = GenericSessionBuilder::new()
+        .pool_size(PoolSize::default())
+        .known_nodes(scyconf.hosts())
+        .default_execution_profile_handle(profile)
+        .build()
+        .await?;
+    let scy = Arc::new(scy);
+    Ok(scy)
+}
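A note on the bound in `st_to_ns` above: 18446744073 is floor(u64::MAX / 10^9), the largest whole-second value for which `1000000000 * sec` still fits in a `u64` (the added sub-second nanoseconds can, in principle, still push the very top of that range over the limit). A quick sanity check, not part of the commit:

    // u64::MAX = 18_446_744_073_709_551_615
    assert_eq!(u64::MAX / 1_000_000_000, 18_446_744_073);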
@@ -14,6 +14,7 @@ use items_2::eventsdim0::EventsDim0NoPulse;
 use items_2::eventsdim1::EventsDim1;
 use items_2::eventsdim1::EventsDim1NoPulse;
 use netpod::log::*;
+use netpod::ttl::RetentionTime;
 use netpod::ScalarType;
 use netpod::Shape;
 use netpod::TsNano;
@@ -118,13 +119,16 @@ async fn post_v01_try(
     let s = params.get("scalarType").ok_or(Error::MissingScalarType)?;
     let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?;
     let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?;
-    debug_setup!("parsed scalar_type {scalar_type:?}");
-    debug_setup!("parsed shape {shape:?}");
+    let rt: RetentionTime = params
+        .get("retentionTime")
+        .and_then(|x| x.parse().ok())
+        .unwrap_or(RetentionTime::Short);
     debug_setup!(
-        "establishing series writer for {:?} {:?} {:?}",
+        "establishing series writer for {:?} {:?} {:?} {:?}",
         channel,
         scalar_type,
-        shape
+        shape,
+        rt
     );
     let mut writer =
         SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?;
@@ -137,7 +141,7 @@ async fn post_v01_try(
         let x = match x {
             Ok(x) => x,
             Err(_) => {
-                tick_writers(&mut writer, &mut iqdqs)?;
+                tick_writers(&mut writer, &mut iqdqs, rt.clone())?;
                 continue;
             }
         };
@@ -149,7 +153,7 @@ async fn post_v01_try(
             }
         };
         trace_input!("got frame len {}", frame.len());
-        let deque = &mut iqdqs.st_rf3_rx;
+        let deque = iqdqs.deque(rt.clone());
         match &shape {
             Shape::Scalar => match &scalar_type {
                 ScalarType::U8 => {
@@ -240,14 +244,14 @@ async fn post_v01_try(
         trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary());
         iqtx.send_all(&mut iqdqs).await?;
         trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary());
-        tick_writers(&mut writer, &mut iqdqs)?;
+        tick_writers(&mut writer, &mut iqdqs, rt.clone())?;
         trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary());
     }
 
     trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary());
     iqtx.send_all(&mut iqdqs).await?;
     trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary());
-    finish_writers(&mut writer, &mut iqdqs)?;
+    finish_writers(&mut writer, &mut iqdqs, rt.clone())?;
     trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary());
 
     let ret = Json(serde_json::json!({}));
@@ -306,12 +310,12 @@ where
     Ok(())
 }
 
-fn tick_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> {
-    writer.tick(&mut deque.st_rf3_rx)?;
+fn tick_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
+    writer.tick(deques.deque(rt))?;
     Ok(())
 }
 
-fn finish_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> {
-    writer.tick(&mut deque.st_rf3_rx)?;
+fn finish_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
+    writer.tick(deques.deque(rt))?;
     Ok(())
 }

@@ -3,6 +3,8 @@ use crate::ca::connset::ChannelStatusesRequest;
 use crate::ca::connset::ConnSetCmd;
 use crate::conf::ChannelConfig;
 use async_channel::Sender;
+use chrono::DateTime;
+use chrono::Utc;
 use serde::Serialize;
 use std::collections::BTreeMap;
 use std::collections::HashMap;
@@ -11,9 +13,42 @@ use std::time::SystemTime;
 
 #[derive(Debug, Serialize)]
 pub struct ChannelStates {
+    running_since: DateTime<Utc>,
+    // #[serde(with = "humantime_serde")]
+    // running_since_2: SystemTime,
     channels: BTreeMap<String, ChannelState>,
 }
 
+#[derive(Debug, Clone, Serialize)]
+pub struct StorageUsage {
+    count: u64,
+    bytes: u64,
+}
+
+impl StorageUsage {
+    pub fn new() -> Self {
+        Self { count: 0, bytes: 0 }
+    }
+
+    pub fn reset(&mut self) {
+        self.count = 0;
+        self.bytes = 0;
+    }
+
+    pub fn push_written(&mut self, payload_len: u32) {
+        self.count += 1;
+        self.bytes += 16 + payload_len as u64;
+    }
+
+    pub fn count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn bytes(&self) -> u64 {
+        self.bytes
+    }
+}
+
 #[derive(Debug, Serialize)]
 struct ChannelState {
     ioc_address: Option<SocketAddr>,
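Worth noting: the inline accounting removed from the connection code charged `8 + payload_len` bytes per event (8 bytes for the timestamp, per the removed TODO), whereas `StorageUsage::push_written` charges `16 + payload_len`; the assumed per-event overhead doubles with this change.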
@@ -50,11 +85,7 @@ enum ConnectionState {
 // BTreeMap<String, ChannelState>
 pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSetEvent>) -> axum::Json<ChannelStates> {
     let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
-    let limit = params
-        .get("limit")
-        .map(|x| x.parse().ok())
-        .unwrap_or(None)
-        .unwrap_or(40);
+    let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(40);
     let (tx2, rx2) = async_channel::bounded(1);
     let req = ChannelStatusesRequest { name, limit, tx: tx2 };
     let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req));
@@ -62,6 +93,7 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
     tx.send(item).await.unwrap();
     let res = rx2.recv().await.unwrap();
     let mut states = ChannelStates {
+        running_since: Utc::now(),
        channels: BTreeMap::new(),
     };
     for (k, st1) in res.channels_ca_conn_set {

@@ -39,7 +39,7 @@ Each CBOR object must contain the timestamps (integer nanoseconds) and the value
 
 The `shape` URL parameter indicates whether the data is scalar or 1-dimensional,
 for example `shape=[]` indicates a scalar and `shape=[4096]` indicates an array
-with 4096 elements.
+with 4096 elements. Without a `shape` parameter the default is scalar.
 
 The shape nowadays only distinguishes between scalar and 1-dimensional, but the actual length of
 the array dimension may vary from event to event and is therefore not meaningful.
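For reference, this documented default matches the ingest handler earlier in this diff, which substitutes `"[]"` when the parameter is absent: `let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?;`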
@@ -6,12 +6,12 @@ edition = "2021"
 
 [dependencies]
 futures-util = "0.3.28"
-async-channel = "2.0.0"
+async-channel = "2.3.1"
 scylla = "0.11.0"
 smallvec = "1.11.0"
 pin-project = "1.1.5"
 stackfuture = "0.3.0"
-bytes = "1.5.0"
+bytes = "1.6.0"
 serde = { version = "1", features = ["derive"] }
 log = { path = "../log" }
 stats = { path = "../stats" }

scywr/src/delete.rs (new file, 1 line)

@@ -0,0 +1 @@
+

@@ -1,3 +1,4 @@
+use crate::iteminsertqueue::Accounting;
 use crate::iteminsertqueue::QueryItem;
 use crate::senderpolling::SenderPolling;
 use async_channel::Receiver;
@@ -140,9 +141,24 @@ impl InsertDeques {
     // Should be used only for connection and channel status items.
     // It encapsulates the decision to which queue(s) we want to send these kind of items.
     pub fn emit_status_item(&mut self, item: QueryItem) -> Result<(), Error> {
-        self.lt_rf3_rx.push_back(item);
+        self.deque(RetentionTime::Long).push_back(item);
         Ok(())
     }
 
+    // Should be used only for connection and channel status items.
+    // It encapsulates the decision to which queue(s) we want to send these kind of items.
+    pub fn emit_accounting_item(&mut self, rt: RetentionTime, item: Accounting) -> Result<(), Error> {
+        self.deque(rt).push_back(QueryItem::Accounting(item));
+        Ok(())
+    }
+
+    pub fn deque(&mut self, rt: RetentionTime) -> &mut VecDeque<QueryItem> {
+        match rt {
+            RetentionTime::Short => &mut self.st_rf3_rx,
+            RetentionTime::Medium => &mut self.mt_rf3_rx,
+            RetentionTime::Long => &mut self.lt_rf3_rx,
+        }
+    }
 }
 
 pub struct InsertDequesSummary<'a> {

@@ -1,5 +1,6 @@
 pub mod access;
 pub mod config;
+pub mod delete;
 pub mod err;
 pub mod fut;
 pub mod futbatch;