Better accounting data retrieval

Dominik Werder
2024-06-27 11:03:57 +02:00
parent 8e286b455d
commit 21259e6591
14 changed files with 456 additions and 220 deletions

View File

@@ -19,13 +19,13 @@ pub struct ChannelInfo {
pub kind: u16,
}
pub async fn info_for_series_ids(series_ids: &[u64], pg: &Client) -> Result<Vec<(u64, Option<ChannelInfo>)>, Error> {
pub async fn info_for_series_ids(series_ids: &[u64], pg: &Client) -> Result<Vec<Option<ChannelInfo>>, Error> {
let (ord, seriess) = series_ids
.iter()
.enumerate()
.fold((Vec::new(), Vec::new()), |mut a, x| {
a.0.push(x.0 as i32);
a.1.push(*x.1 as i64);
.fold((Vec::new(), Vec::new()), |mut a, (i, &series)| {
a.0.push(i as i32);
a.1.push(series as i64);
a
});
let sql = concat!(
@@ -62,9 +62,9 @@ pub async fn info_for_series_ids(series_ids: &[u64], pg: &Client) -> Result<Vec<
shape,
kind,
};
ret.push((series, Some(e)));
ret.push(Some(e));
} else {
ret.push((series, None));
ret.push(None);
}
}
Ok(ret)
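Note: the return type no longer carries the series id per entry; results stay in query order, so a caller is expected to zip them back with the ids it passed in. A minimal caller-side sketch (the ids and printing are illustrative, not from this commit):

// Sketch: the result is positional, so pair it with the input ids again.
let series_ids: Vec<u64> = vec![101, 102, 103];
let infos = info_for_series_ids(&series_ids, &pg).await?;
assert_eq!(infos.len(), series_ids.len());
for (series, info) in series_ids.iter().zip(infos.iter()) {
    match info {
        Some(ci) => println!("series {series} -> {}", ci.name),
        None => println!("series {series} unresolved"),
    }
}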

View File

@@ -29,6 +29,10 @@ impl err::ToErr for Error {
enum Job {
ChConfBestMatchingNameRange(String, String, NanoRange, Sender<Result<ChConf, Error>>),
ChConfForSeries(String, u64, Sender<Result<ChConf, Error>>),
InfoForSeriesIds(
Vec<u64>,
Sender<Result<Vec<Option<crate::channelinfo::ChannelInfo>>, crate::channelinfo::Error>>,
),
}
#[derive(Debug, Clone)]
@@ -59,6 +63,16 @@ impl PgQueue {
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
pub async fn info_for_series_ids(
&self,
series_ids: Vec<u64>,
) -> Result<Receiver<Result<Vec<Option<crate::channelinfo::ChannelInfo>>, crate::channelinfo::Error>>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::InfoForSeriesIds(series_ids, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
}
#[derive(Debug)]
@@ -106,6 +120,12 @@ impl PgWorker {
// TODO count for stats
}
}
Job::InfoForSeriesIds(ids, tx) => {
let res = crate::channelinfo::info_for_series_ids(&ids, &self.pg).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
}
}
}
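The new queue method hands back a Receiver, so consumers await twice: once to enqueue the job and once for the worker's answer. A rough usage sketch under that assumption (expect() only to keep it short; real callers map the errors):

// Sketch: enqueue the lookup, then wait for the PgWorker result.
let rx = pgqueue
    .info_for_series_ids(vec![101, 102])
    .await
    .expect("job channel closed");
let infos = rx.recv().await.expect("worker dropped").expect("query failed");
for (series, info) in [101u64, 102].iter().zip(infos) {
    if let Some(ci) = info {
        assert_eq!(ci.series, *series); // same positional pairing as above
    }
}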

View File

@@ -2,6 +2,8 @@ use crate::bodystream::response;
use crate::err::Error;
use crate::requests::accepts_json_or_all;
use crate::ReqCtx;
use crate::ServiceSharedResources;
use dbconn::worker::PgQueue;
use err::ToPublicError;
use futures_util::StreamExt;
use http::Method;
@@ -17,30 +19,122 @@ use items_0::Extendable;
use items_2::accounting::AccountingEvents;
use netpod::log::*;
use netpod::req_uri_to_url;
use netpod::ttl::RetentionTime;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::Shape;
use netpod::TsMs;
use query::api4::AccountingIngestedBytesQuery;
use query::api4::AccountingToplistQuery;
use scyllaconn::accounting::toplist::UsageData;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
pub struct AccountingIngestedBytes {}
#[derive(Debug, Serialize, Deserialize)]
pub struct Toplist {
dim0: AccountedIngested,
dim1: AccountedIngested,
infos_count_total: usize,
infos_missing_count: usize,
top1_usage_len: usize,
scalar_count: usize,
wave_count: usize,
found: usize,
mismatch_count: usize,
}
impl AccountingIngestedBytes {
impl Toplist {
fn new() -> Self {
Self {
dim0: AccountedIngested::new(),
dim1: AccountedIngested::new(),
infos_count_total: 0,
infos_missing_count: 0,
top1_usage_len: 0,
scalar_count: 0,
wave_count: 0,
found: 0,
mismatch_count: 0,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountedIngested {
names: Vec<String>,
counts: Vec<u64>,
bytes: Vec<u64>,
}
impl AccountedIngested {
fn new() -> Self {
Self {
names: Vec::new(),
counts: Vec::new(),
bytes: Vec::new(),
}
}
fn push(&mut self, name: String, counts: u64, bytes: u64) {
self.names.push(name);
self.counts.push(counts);
self.bytes.push(bytes);
}
fn sort_by_counts(&mut self) {
let mut tmp: Vec<_> = self
.counts
.iter()
.map(|&x| x)
.enumerate()
.map(|(i, x)| (x, i))
.collect();
tmp.sort_unstable();
let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
self.reorder_by_index_list(&tmp);
}
fn sort_by_bytes(&mut self) {
let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect();
tmp.sort_unstable();
let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
self.reorder_by_index_list(&tmp);
}
fn reorder_by_index_list(&mut self, tmp: &[usize]) {
self.names = tmp.iter().map(|&x| self.names[x].clone()).collect();
self.counts = tmp.iter().map(|&x| self.counts[x]).collect();
self.bytes = tmp.iter().map(|&x| self.bytes[x]).collect();
}
fn truncate(&mut self, len: usize) {
self.names.truncate(len);
self.counts.truncate(len);
self.bytes.truncate(len);
}
}
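sort_by_counts and sort_by_bytes both build a descending index permutation and then apply it to all three parallel vectors via reorder_by_index_list. A self-contained sketch of that pattern (data is illustrative):

// Sketch: sort indices by value ascending, reverse for descending order.
let counts = vec![5u64, 42, 7];
let mut order: Vec<(u64, usize)> = counts.iter().copied().enumerate().map(|(i, c)| (c, i)).collect();
order.sort_unstable();
let order: Vec<usize> = order.into_iter().rev().map(|(_, i)| i).collect();
assert_eq!(order, vec![1, 2, 0]); // 42, 7, 5
// names, counts and bytes are then all reordered by this same index list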
pub struct AccountingIngested {}
impl AccountingIngested {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with("/api/4/accounting/ingested/bytes") {
if req.uri().path().starts_with("/api/4/accounting/ingested") {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
pub async fn handle(
&self,
req: Requ,
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
if accepts_json_or_all(req.headers()) {
match self.handle_get(req, ctx, ncc).await {
match self.handle_get(req, ctx, shared_res, ncc).await {
Ok(x) => Ok(x),
Err(e) => {
error!("{e}");
@@ -57,64 +151,44 @@ impl AccountingIngestedBytes {
}
}
async fn handle_get(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
let q = AccountingIngestedBytesQuery::from_url(&url)?;
let res = self.fetch_data(q, ctx, ncc).await?;
let body = ToJsonBody::from(&res).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
async fn fetch_data(
async fn handle_get(
&self,
q: AccountingIngestedBytesQuery,
_ctx: &ReqCtx,
req: Requ,
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<AccountingEvents, Error> {
let scyco = ncc
.node_config
.cluster
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let mut stream = scyllaconn::accounting::totals::AccountingStreamScylla::new(q.range().try_into()?, scy);
let mut ret = AccountingEvents::empty();
while let Some(item) = stream.next().await {
let mut item = item?;
ret.extend_from(&mut item);
) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
let qu = AccountingToplistQuery::from_url(&url)?;
let res = fetch_data(qu.rt(), qu.ts().to_ts_ms(), ctx, shared_res, ncc).await?;
let mut ret = AccountedIngested::new();
for e in res.dim0.names {
ret.names.push(e)
}
Ok(ret)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Toplist {
dim0: Vec<(String, u64, u64)>,
dim1: Vec<(String, u64, u64)>,
infos_count_total: usize,
infos_missing_count: usize,
top1_usage_len: usize,
scalar_count: usize,
wave_count: usize,
found: usize,
incomplete_count: usize,
mismatch_count: usize,
}
impl Toplist {
fn new() -> Self {
Self {
dim0: Vec::new(),
dim1: Vec::new(),
infos_count_total: 0,
infos_missing_count: 0,
top1_usage_len: 0,
scalar_count: 0,
wave_count: 0,
found: 0,
incomplete_count: 0,
mismatch_count: 0,
for e in res.dim0.counts {
ret.counts.push(e)
}
for e in res.dim0.bytes {
ret.bytes.push(e)
}
for e in res.dim1.names {
ret.names.push(e)
}
for e in res.dim1.counts {
ret.counts.push(e)
}
for e in res.dim1.bytes {
ret.bytes.push(e)
}
if let Some(sort) = qu.sort() {
if sort == "counts" {
// ret.sort_by_counts();
} else if sort == "bytes" {
// ret.sort_by_bytes();
}
}
let body = ToJsonBody::from(&ret).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
}
@@ -129,10 +203,16 @@ impl AccountingToplistCounts {
}
}
pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
pub async fn handle(
&self,
req: Requ,
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
if accepts_json_or_all(req.headers()) {
match self.handle_get(req, ctx, ncc).await {
match self.handle_get(req, ctx, shared_res, ncc).await {
Ok(x) => Ok(x),
Err(e) => {
error!("{e}");
@@ -149,99 +229,102 @@ impl AccountingToplistCounts {
}
}
async fn handle_get(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
async fn handle_get(
&self,
req: Requ,
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
let qu = AccountingToplistQuery::from_url(&url)?;
let res = self.fetch_data(qu, ctx, ncc).await?;
let res = fetch_data(qu.rt(), qu.ts().to_ts_ms(), ctx, shared_res, ncc).await?;
let body = ToJsonBody::from(&res).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
}
async fn fetch_data(
&self,
qu: AccountingToplistQuery,
_ctx: &ReqCtx,
ncc: &NodeConfigCached,
) -> Result<Toplist, Error> {
let list_len_max = qu.limit() as usize;
// TODO assumes that accounting data is in the LT keyspace
let scyco = ncc
.node_config
.cluster
.scylla_lt()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no lt scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let pgconf = &ncc.node_config.cluster.database;
let (pg, pgjh) = dbconn::create_connection(&pgconf).await?;
let mut top1 = scyllaconn::accounting::toplist::read_ts(qu.ts().ns(), scy).await?;
top1.sort_by_counts();
let mut ret = Toplist::new();
let top1_usage = top1.usage();
ret.top1_usage_len = top1_usage.len();
let usage_map_0: BTreeMap<u64, (u64, u64)> = top1_usage.iter().map(|x| (x.0, (x.1, x.2))).collect();
let mut usage_it = usage_map_0.iter();
loop {
let mut series_ids = Vec::new();
let mut usages = Vec::new();
while let Some(u) = usage_it.next() {
series_ids.push(*u.0);
usages.push(u.1.clone());
if series_ids.len() >= 200 {
break;
}
}
if series_ids.len() == 0 {
break;
}
let infos = dbconn::channelinfo::info_for_series_ids(&series_ids, &pg)
.await
.map_err(Error::from_to_string)?;
for (_series, info_res) in &infos {
if let Some(info) = info_res {
match &info.shape {
Shape::Scalar => {
ret.scalar_count += 1;
}
Shape::Wave(_) => {
ret.wave_count += 1;
}
_ => {}
}
}
}
if usages.len() > infos.len() {
ret.incomplete_count += usages.len() - infos.len();
}
if infos.len() > usages.len() {
ret.incomplete_count += infos.len() - usages.len();
}
for ((series2, info_res), usage) in infos.into_iter().zip(usages.into_iter()) {
if let Some(info) = info_res {
if series2 != info.series {
ret.mismatch_count += 1;
}
ret.infos_count_total += 1;
// if info.name == "SINSB04-RMOD:PULSE-I-WF" {
// ret.found += 1;
// }
match &info.shape {
Shape::Scalar => {
ret.dim0.push((info.name, usage.0, usage.1));
}
Shape::Wave(_) => {
ret.dim1.push((info.name, usage.0, usage.1));
}
Shape::Image(_, _) => {}
}
} else {
ret.infos_missing_count += 1;
}
}
}
ret.dim0.sort_by_cached_key(|x| u64::MAX - x.1);
ret.dim1.sort_by_cached_key(|x| u64::MAX - x.1);
ret.dim0.truncate(list_len_max);
ret.dim1.truncate(list_len_max);
async fn fetch_data(
rt: RetentionTime,
ts: TsMs,
_ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
_ncc: &NodeConfigCached,
) -> Result<Toplist, Error> {
let list_len_max = 10000000;
if let Some(scyqu) = &shared_res.scyqueue {
let x = scyqu
.accounting_read_ts(rt, ts)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut ret = resolve_usages(x, &shared_res.pgqueue).await?;
// ret.dim0.sort_by_bytes();
// ret.dim1.sort_by_bytes();
// ret.dim0.truncate(list_len_max);
// ret.dim1.truncate(list_len_max);
Ok(ret)
} else {
Err(Error::with_public_msg_no_trace("not a scylla backend"))
}
}
async fn resolve_usages(usage: UsageData, pgqu: &PgQueue) -> Result<Toplist, Error> {
let mut ret = Toplist::new();
let mut series_id_it = usage.series().iter().map(|&x| x);
let mut usage_skip = 0;
loop {
let mut series_ids = Vec::new();
while let Some(u) = series_id_it.next() {
series_ids.push(u);
if series_ids.len() >= 1000 {
break;
}
}
if series_ids.len() == 0 {
break;
}
let infos = pgqu
.info_for_series_ids(series_ids.clone())
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
.recv()
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
if infos.len() != series_ids.len() {
return Err(Error::with_msg_no_trace("database result len mismatch"));
}
let nn = series_ids.len();
for ((series, info_res), (counts, bytes)) in series_ids.into_iter().zip(infos.into_iter()).zip(
usage
.counts()
.iter()
.skip(usage_skip)
.map(|&x| x)
.zip(usage.bytes().iter().skip(usage_skip).map(|&x| x)),
) {
if let Some(info) = info_res {
if series != info.series {
return Err(Error::with_msg_no_trace("lookup mismatch"));
}
ret.infos_count_total += 1;
match &info.shape {
Shape::Scalar => {
ret.scalar_count += 1;
ret.dim0.push(info.name, counts, bytes);
}
Shape::Wave(_) => {
ret.wave_count += 1;
ret.dim1.push(info.name, counts, bytes);
}
Shape::Image(_, _) => {}
}
} else {
ret.infos_missing_count += 1;
ret.dim0.push("UNRESOLVEDSERIES".into(), counts, bytes);
}
}
usage_skip += nn;
}
Ok(ret)
}
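resolve_usages walks the series ids in batches of at most 1000 and advances usage_skip by hand so the zipped counts and bytes stay aligned with each batch. A sketch of an equivalent formulation with chunks(), which keeps the three slices aligned by construction (not the committed code, error handling shortened to expect()):

// Sketch: same batching expressed via parallel chunk iterators.
for ((ids, counts), bytes) in usage
    .series()
    .chunks(1000)
    .zip(usage.counts().chunks(1000))
    .zip(usage.bytes().chunks(1000))
{
    let infos = pgqu
        .info_for_series_ids(ids.to_vec())
        .await
        .expect("job channel closed")
        .recv()
        .await
        .expect("worker dropped")
        .expect("query failed");
    for ((id, info), (count, nbytes)) in ids.iter().zip(infos).zip(counts.iter().zip(bytes.iter())) {
        // per-entry handling identical to the loop above
        let _ = (id, info, count, nbytes);
    }
}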

View File

@@ -376,10 +376,10 @@ async fn http_service_inner(
Ok(h.handle(req, &shared_res, &node_config).await?)
} else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = api4::accounting::AccountingIngested::handler(&req) {
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = api4::accounting::AccountingToplistCounts::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
} else if let Some(h) = api4::accounting::AccountingIngestedBytes::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if path == "/api/4/prebinned" {
if req.method() == Method::GET {
Ok(prebinned(req, ctx, &node_config).await?)

View File

@@ -197,6 +197,8 @@ async fn proxy_http_service_inner(
h.handle(req, ctx, &proxy_config).await
} else if let Some(h) = api4::events::EventsHandler::handler(&req) {
h.handle(req, ctx, &proxy_config).await
} else if path == "/api/4/accounting/ingested" {
Ok(proxy_backend_query::<MapQuery>(req, ctx, proxy_config).await?)
} else if path == "/api/4/accounting/toplist/counts" {
Ok(proxy_backend_query::<MapQuery>(req, ctx, proxy_config).await?)
} else if path == "/api/4/status/connection/events" {
@@ -209,8 +211,6 @@ async fn proxy_http_service_inner(
Ok(proxy_backend_query::<MapPulseQuery>(req, ctx, proxy_config).await?)
} else if path == "/api/4/binned" {
Ok(proxy_backend_query::<BinnedQuery>(req, ctx, proxy_config).await?)
} else if let Some(_) = crate::api4::accounting::AccountingIngestedBytes::handler(&req) {
Ok(proxy_backend_query::<query::api4::AccountingIngestedBytesQuery>(req, ctx, proxy_config).await?)
} else if path == "/api/4/channel/config" {
Ok(proxy_backend_query::<ChannelConfigQuery>(req, ctx, proxy_config).await?)
} else if path.starts_with("/api/4/test/http/204") {

View File

@@ -9,6 +9,8 @@ use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::ProxyConfig;
use netpod::ReqCtx;
use netpod::ServiceVersion;
use std::collections::BTreeMap;
pub struct BackendListHandler {}
@@ -21,21 +23,19 @@ impl BackendListHandler {
}
}
pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, _node_config: &ProxyConfig) -> Result<StreamResponse, Error> {
pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, cfg: &ProxyConfig) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
if accepts_json_or_all(req.headers()) {
let mut list = Vec::new();
if let Some(g) = &cfg.announce_backends {
for j in g {
let mut map = BTreeMap::new();
map.insert("name", j.clone());
list.push(map);
}
}
let res = serde_json::json!({
"backends_available": [
{
"name": "sf-databuffer",
},
{
"name": "sf-imagebuffer",
},
{
"name": "sf-archiver",
},
]
"backends_available": list,
});
let body = serde_json::to_string(&res)?;
Ok(response(StatusCode::OK).body(body_string(body))?)
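With the hard-coded list gone, the payload now mirrors announce_backends from the proxy config. A sketch of the resulting JSON for a two-entry config (the names are reused from the removed hard-coded list):

// Sketch: what the handler above produces for two announced backends.
let announce_backends = vec!["sf-databuffer".to_string(), "sf-archiver".to_string()];
let list: Vec<_> = announce_backends
    .iter()
    .map(|name| serde_json::json!({ "name": name }))
    .collect();
let res = serde_json::json!({ "backends_available": list });
assert_eq!(
    res.to_string(),
    r#"{"backends_available":[{"name":"sf-databuffer"},{"name":"sf-archiver"}]}"#
);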

View File

@@ -287,7 +287,7 @@ impl ScalarType {
BOOL => "bool",
STRING => "string",
Enum => "enum",
ChannelStatus => "ChannelStatus",
ChannelStatus => "channelstatus",
}
}
@@ -307,7 +307,7 @@ impl ScalarType {
"bool" => BOOL,
"string" => STRING,
"enum" => Enum,
"ChannelStatus" => ChannelStatus,
"channelstatus" => ChannelStatus,
_ => {
return Err(Error::with_msg_no_trace(format!(
"from_bsread_str can not understand bsread {:?}",
@@ -469,6 +469,10 @@ impl ScalarType {
}
}
pub fn to_scylla_table_name_id(&self) -> &'static str {
self.to_variant_str()
}
pub fn to_scylla_i32(&self) -> i32 {
self.index() as i32
}
@@ -2942,6 +2946,7 @@ pub struct ProxyConfig {
pub port: u16,
pub backends: Vec<ProxyBackend>,
pub status_subs: Vec<StatusSub>,
pub announce_backends: Option<Vec<String>>,
}
pub trait HasBackend {

View File

@@ -8,6 +8,7 @@ use err::Error;
use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::ttl::RetentionTime;
use netpod::AppendToUrl;
use netpod::FromUrl;
use netpod::HasBackend;
@@ -82,12 +83,18 @@ impl AppendToUrl for AccountingIngestedBytesQuery {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccountingToplistQuery {
rt: RetentionTime,
backend: String,
ts: TsNano,
limit: u32,
sort: Option<String>,
}
impl AccountingToplistQuery {
pub fn rt(&self) -> RetentionTime {
self.rt.clone()
}
pub fn ts(&self) -> TsNano {
self.ts.clone()
}
@@ -95,6 +102,10 @@ impl AccountingToplistQuery {
pub fn limit(&self) -> u32 {
self.limit
}
pub fn sort(&self) -> Option<&str> {
self.sort.as_ref().map(|x| x.as_str())
}
}
impl HasBackend for AccountingToplistQuery {
@@ -135,12 +146,20 @@ impl FromUrl for AccountingToplistQuery {
Ok::<_, Error>(TsNano::from_ns(w.to_nanos()))
};
let ret = Self {
rt: pairs
.get("retentionTime")
.ok_or_else(|| Error::with_public_msg_no_trace("missing retentionTime"))
.and_then(|x| {
x.parse()
.map_err(|_| Error::with_public_msg_no_trace("missing retentionTime"))
})?,
backend: pairs
.get("backend")
.ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))?
.to_string(),
ts: fn1(pairs)?,
limit: pairs.get("limit").map_or(None, |x| x.parse().ok()).unwrap_or(20),
sort: pairs.get("sort").map(ToString::to_string),
};
Ok(ret)
}

View File

@@ -4,6 +4,7 @@ use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::range::evrange::SeriesRange;
use netpod::ttl::RetentionTime;
use netpod::AppendToUrl;
use netpod::ByteSize;
use netpod::FromUrl;
@@ -40,6 +41,10 @@ pub struct BinnedQuery {
pub merger_out_len_max: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
test_do_wasm: Option<String>,
#[serde(default)]
log_level: String,
#[serde(default)]
use_rt: Option<RetentionTime>,
}
impl BinnedQuery {
@@ -56,6 +61,8 @@ impl BinnedQuery {
timeout: None,
merger_out_len_max: None,
test_do_wasm: None,
log_level: String::new(),
use_rt: None,
}
}
@@ -150,8 +157,11 @@ impl BinnedQuery {
}
pub fn log_level(&self) -> &str {
// TODO take from query
""
&self.log_level
}
pub fn use_rt(&self) -> Option<RetentionTime> {
self.use_rt.clone()
}
}
@@ -211,6 +221,12 @@ impl FromUrl for BinnedQuery {
.get("mergerOutLenMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)),
log_level: pairs.get("log_level").map_or(String::new(), String::from),
use_rt: pairs.get("useRt").map_or(Ok(None), |k| {
k.parse()
.map(Some)
.map_err(|_| Error::with_public_msg_no_trace(format!("can not parse useRt: {}", k)))
})?,
};
debug!("BinnedQuery::from_url {:?}", ret);
Ok(ret)
@@ -248,5 +264,11 @@ impl AppendToUrl for BinnedQuery {
if let Some(x) = &self.test_do_wasm {
g.append_pair("testDoWasm", &x);
}
if self.log_level.len() != 0 {
g.append_pair("log_level", &self.log_level);
}
if let Some(x) = self.use_rt.as_ref() {
g.append_pair("useRt", &x.to_string());
}
}
}

View File

@@ -450,7 +450,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
// TODO add to query
queue_len_disk_io: None,
create_errors: Vec::new(),
use_rt: None,
use_rt: value.use_rt(),
}
}
}

View File

@@ -1,76 +1,136 @@
use crate::errconv::ErrConv;
use err::Error;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
use netpod::EMIT_ACCOUNTING_SNAP;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session as ScySession;
use std::sync::Arc;
#[derive(Debug, ThisError)]
pub enum Error {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
UsageDataMalformed,
}
#[derive(Debug)]
pub struct UsageData {
ts: u64,
// (series, count, bytes)
usage: Vec<(u64, u64, u64)>,
ts: TsMs,
series: Vec<u64>,
counts: Vec<u64>,
bytes: Vec<u64>,
}
impl UsageData {
pub fn new(ts: u64) -> Self {
Self { ts, usage: Vec::new() }
pub fn new(ts: TsMs) -> Self {
Self {
ts,
series: Vec::new(),
counts: Vec::new(),
bytes: Vec::new(),
}
}
pub fn ts(&self) -> u64 {
pub fn len(&self) -> usize {
self.series.len()
}
pub fn ts(&self) -> TsMs {
self.ts
}
pub fn usage(&self) -> &[(u64, u64, u64)] {
&self.usage
pub fn series(&self) -> &[u64] {
&self.series
}
pub fn counts(&self) -> &[u64] {
&self.counts
}
pub fn bytes(&self) -> &[u64] {
&self.bytes
}
pub fn sort_by_counts(&mut self) {
self.usage.sort_unstable_by(|a, b| b.1.cmp(&a.1))
let mut tmp: Vec<_> = self
.counts
.iter()
.map(|&x| x)
.enumerate()
.map(|(i, x)| (x, i))
.collect();
tmp.sort_unstable();
let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
self.reorder_by_index_list(&tmp);
}
pub fn sort_by_bytes(&mut self) {
self.usage.sort_unstable_by(|a, b| b.2.cmp(&a.2))
let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect();
tmp.sort_unstable();
let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
self.reorder_by_index_list(&tmp);
}
fn reorder_by_index_list(&mut self, tmp: &[usize]) {
self.series = tmp.iter().map(|&x| self.series[x]).collect();
self.counts = tmp.iter().map(|&x| self.counts[x]).collect();
self.bytes = tmp.iter().map(|&x| self.bytes[x]).collect();
}
fn verify(&self) -> Result<(), Error> {
if self.counts.len() != self.series.len() {
Err(Error::UsageDataMalformed)
} else if self.bytes.len() != self.series.len() {
Err(Error::UsageDataMalformed)
} else {
Ok(())
}
}
}
pub async fn read_ts(ts: u64, scy: Arc<ScySession>) -> Result<UsageData, Error> {
pub async fn read_ts(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession) -> Result<UsageData, Error> {
// TODO toplist::read_ts refactor
info!("TODO toplist::read_ts refactor");
let snap = EMIT_ACCOUNTING_SNAP.ms() / 1000;
info!("ts {ts} snap {snap:?}");
let ts = ts / timeunits::SEC / snap * snap;
let ret = read_ts_inner(ts, scy).await?;
let snap = EMIT_ACCOUNTING_SNAP.ms();
let ts = TsMs::from_ms_u64(ts.ms() / snap * snap);
let ret = read_ts_inner(ks, rt, ts, scy).await?;
Ok(ret)
}
async fn read_ts_inner(ts: u64, scy: Arc<ScySession>) -> Result<UsageData, Error> {
async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession) -> Result<UsageData, Error> {
type RowType = (i64, i64, i64);
let cql = concat!("select series, count, bytes from lt_account_00 where part = ? and ts = ?");
let qu = prep(cql, scy.clone()).await?;
let cql = format!(
concat!(
"select series, count, bytes",
" from {}.{}account_00",
" where part = ? and ts = ?"
),
ks,
rt.table_prefix()
);
let qu = prep(&cql, scy).await?;
let ts_sec = ts.ms() as i64 / 1000;
let mut ret = UsageData::new(ts);
for part in 0..255_u32 {
let mut res = scy
.execute_iter(qu.clone(), (part as i32, ts as i64))
.await
.err_conv()?
.execute_iter(qu.clone(), (part as i32, ts_sec))
.await?
.into_typed::<RowType>();
while let Some(row) = res.next().await {
let row = row.map_err(Error::from_string)?;
let row = row?;
let series = row.0 as u64;
let count = row.1 as u64;
let bytes = row.2 as u64;
ret.usage.push((series, count, bytes));
ret.series.push(series);
ret.counts.push(count);
ret.bytes.push(bytes);
}
}
ret.verify()?;
Ok(ret)
}
async fn prep(cql: &str, scy: Arc<ScySession>) -> Result<PreparedStatement, Error> {
scy.prepare(cql)
.await
.map_err(|e| Error::with_msg_no_trace(format!("cql error {e}")))
async fn prep(cql: &str, scy: &ScySession) -> Result<PreparedStatement, Error> {
Ok(scy.prepare(cql).await?)
}
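read_ts now snaps the requested timestamp down to the start of its accounting window in milliseconds (the old code divided into seconds first). A worked sketch of the flooring, assuming a 60 s EMIT_ACCOUNTING_SNAP (the actual constant is not shown in this diff):

// Sketch: integer division floors ts to the accounting window boundary.
let snap: u64 = 60_000; // assumed snap interval in ms
let ts_ms: u64 = 1_719_480_123_456;
let snapped = ts_ms / snap * snap;
assert_eq!(snapped, 1_719_480_120_000);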

View File

@@ -229,17 +229,15 @@ impl Stream for EventsStreamRt {
);
let mut r = items_2::merger::Mergeable::new_empty(&item);
match items_2::merger::Mergeable::find_highest_index_lt(&item, self.ts_seen_max) {
Some(ix) => {
match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, ix)) {
Ok(()) => {}
Err(e) => {
self.state = State::Done;
break Ready(Some(Err(e.into())));
}
Some(ix) => match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, 1 + ix)) {
Ok(()) => {
// TODO count for metrics
}
// self.state = State::Done;
// break Ready(Some(Err(Error::Unordered)));
}
Err(e) => {
self.state = State::Done;
break Ready(Some(Err(e.into())));
}
},
None => {
self.state = State::Done;
break Ready(Some(Err(Error::TruncateLogic)));
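The drain range end moves from ix to 1 + ix so the element at the index returned by find_highest_index_lt is itself included. Assuming drain_into takes a half-open (start, end) range like Vec::drain, the difference looks like this:

// Sketch with Vec::drain standing in for Mergeable::drain_into.
let mut v = vec![10u64, 20, 30, 40];
let ix = 2; // e.g. highest index still below ts_seen_max
let taken: Vec<u64> = v.drain(0..ix + 1).collect(); // end-exclusive, so index ix is included
assert_eq!(taken, vec![10, 20, 30]);
assert_eq!(v, vec![40]);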

View File

@@ -27,6 +27,7 @@ pub enum Error {
ChannelSend,
ChannelRecv,
Join,
Toplist(#[from] crate::accounting::toplist::Error),
}
#[derive(Debug)]
@@ -40,6 +41,11 @@ enum Job {
Sender<Result<VecDeque<TsMs>, Error>>,
),
ReadNextValues(ReadNextValues),
AccountingReadTs(
RetentionTime,
TsMs,
Sender<Result<crate::accounting::toplist::UsageData, crate::accounting::toplist::Error>>,
),
}
struct ReadNextValues {
@@ -98,6 +104,18 @@ impl ScyllaQueue {
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res)
}
pub async fn accounting_read_ts(
&self,
rt: RetentionTime,
ts: TsMs,
) -> Result<crate::accounting::toplist::UsageData, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::AccountingReadTs(rt, ts, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res)
}
}
#[derive(Debug)]
@@ -160,6 +178,17 @@ impl ScyllaWorker {
// TODO count for stats
}
}
Job::AccountingReadTs(rt, ts, tx) => {
let ks = match &rt {
RetentionTime::Short => &self.scyconf_st.keyspace,
RetentionTime::Medium => &self.scyconf_mt.keyspace,
RetentionTime::Long => &self.scyconf_lt.keyspace,
};
let res = crate::accounting::toplist::read_ts(&ks, rt, ts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
}
}
}

View File

@@ -8,7 +8,7 @@ edition = "2021"
path = "src/taskrun.rs"
[dependencies]
tokio = { version = "1.37.0", features = ["full", "tracing", "time"] }
tokio = { version = "1.38.0", features = ["full", "tracing", "time"] }
futures-util = "0.3.28"
tracing = "0.1.40"
tracing-log = "0.2.0"