Improve bin index write

This commit is contained in:
Dominik Werder
2025-03-05 15:25:08 +01:00
parent b3cd133bbd
commit 502f1febb0
8 changed files with 302 additions and 241 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.7-aa.4"
version = "0.2.7-aa.5"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -7,7 +7,6 @@ use async_channel::Receiver;
use async_channel::Sender;
use ca_proto::ca::proto;
use ca_proto_tokio::tcpasyncwriteread::TcpAsyncWriteRead;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use enumfetch::ConnFuture;
@@ -17,16 +16,16 @@ use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use netpod::timeunits::*;
use netpod::ttl::RetentionTime;
use netpod::EMIT_ACCOUNTING_SNAP;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use netpod::EMIT_ACCOUNTING_SNAP;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use netpod::timeunits::*;
use netpod::ttl::RetentionTime;
use proto::CaDataValue;
use proto::CaEventValue;
use proto::CaItem;
@@ -60,19 +59,20 @@ use serieswriter::fixgridwriter::ChannelStatusWriteState;
use serieswriter::msptool::MspSplit;
use serieswriter::rtwriter::RtWriter;
use serieswriter::writer::EmittableType;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IntervalEma;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
@@ -91,60 +91,17 @@ const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis
const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200);
const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
const DO_RATE_CHECK: bool = false;
const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60);
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace4 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
#[allow(unused)]
macro_rules! trace4 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace_flush_queue { ($($arg:expr),*) => ( if false { trace3!($($arg),*); } ); }
#[allow(unused)]
macro_rules! trace_flush_queue {
($($arg:tt)*) => {
if false {
trace3!($($arg)*);
}
};
}
macro_rules! trace_event_incoming { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
#[allow(unused)]
macro_rules! trace_event_incoming {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_monitor_stale {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace_monitor_stale { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
if let Some(name) = conn.name_by_cid(cid) {
@@ -272,7 +229,7 @@ mod ser_instant {
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Cid(pub u32);
pub struct Cid(pub u32);
impl fmt::Display for Cid {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
@@ -636,11 +593,7 @@ impl ChannelState {
let item_recv_ivl_ema = match self {
ChannelState::Writable(s) => {
let ema = s.channel.item_recv_ivl_ema.ema();
if ema.update_count() == 0 {
None
} else {
Some(ema.ema())
}
if ema.update_count() == 0 { None } else { Some(ema.ema()) }
}
_ => None,
};
@@ -1091,6 +1044,7 @@ pub struct CaConn {
trace_channel_poll: bool,
ca_msg_recv_count: u64,
ca_version_recv_count: u64,
ts_channel_status_pong_last: Instant,
}
impl Drop for CaConn {
@@ -1157,6 +1111,7 @@ impl CaConn {
trace_channel_poll: false,
ca_msg_recv_count: 0,
ca_version_recv_count: 0,
ts_channel_status_pong_last: tsnow,
}
}
@@ -1195,6 +1150,14 @@ impl CaConn {
Ioid(self.ioid)
}
fn channel_status_qu(iqdqs: &mut InsertDeques) -> &mut VecDeque<QueryItem> {
&mut iqdqs.lt_rf3_qu
}
fn channel_status_pong_qu(iqdqs: &mut InsertDeques) -> &mut VecDeque<QueryItem> {
&mut iqdqs.st_rf3_qu
}
pub fn conn_command_tx(&self) -> Sender<ConnCommand> {
self.conn_command_tx.as_ref().get_ref().clone()
}
@@ -1422,7 +1385,8 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
};
conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
conf.wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
}
if let Some((ivl,)) = conf_poll_conf {
let ivl = Duration::from_millis(ivl);
@@ -1449,10 +1413,7 @@ impl CaConn {
self.cid_by_subid.insert(subid, cid);
trace!(
"new {:?} for {:?} chst {:?} {:?}",
subid,
cid,
st2.channel.cid,
st2.channel.sid
subid, cid, st2.channel.cid, st2.channel.sid
);
subid
};
@@ -1532,7 +1493,6 @@ impl CaConn {
debug!("channel_close {}", name);
let tsnow = Instant::now();
let stnow = SystemTime::now();
let cid = if let Some(x) = self.cid_by_name.get(&name) {
x.clone()
} else {
@@ -1540,22 +1500,16 @@ impl CaConn {
return;
};
self.cid_by_name.remove(&name);
if let Some(conf) = self.channels.get_mut(&cid) {
let mut item_deque = VecDeque::new();
let item = ChannelStatusItem {
ts: stnow,
cssid: conf.state.cssid(),
status: ChannelStatus::Closed(ChannelStatusClosedReason::ChannelRemove),
};
let deque = &mut item_deque;
if conf.wrst.emit_channel_status_item(item, deque).is_err() {
let qu = Self::channel_status_qu(&mut self.iqdqs);
if conf.wrst.emit_channel_status_item(item, qu).is_err() {
self.stats.logic_error().inc();
}
for x in item_deque {
self.iqdqs.st_rf3_qu.push_back(x);
}
// TODO shutdown the internal writer structures.
if let Some(cst) = conf.state.created_state() {
if let Some(proto) = self.proto.as_mut() {
@@ -1567,7 +1521,6 @@ impl CaConn {
proto.push_out(item);
}
}
{
let mut it = self.cid_by_subid.extract_if(|_, v| *v == cid);
if let Some((subid, _cid)) = it.next() {
@@ -1589,14 +1542,11 @@ impl CaConn {
} else {
debug!("channel_close {} no channel block", name);
};
{
let it = self.cid_by_sid.extract_if(|_, v| *v == cid);
it.count();
}
self.channels.remove(&cid);
// TODO emit CaConn item to let CaConnSet know that we have closed the channel.
// TODO may be too full
let value = CaConnEventValue::ChannelRemoved(name);
@@ -1656,21 +1606,17 @@ impl CaConn {
// TODO can I reuse emit_channel_info_insert_items ?
trace!("channel_state_on_shutdown channels {}", self.channels.len());
let stnow = self.tmp_ts_poll;
let mut item_deque = VecDeque::new();
let status_qu = Self::channel_status_qu(&mut self.iqdqs);
for (_cid, conf) in &mut self.channels {
let item = ChannelStatusItem {
ts: stnow,
cssid: conf.state.cssid(),
status: ChannelStatus::Closed(channel_reason.clone()),
};
let deque = &mut item_deque;
if conf.wrst.emit_channel_status_item(item, deque).is_err() {
if conf.wrst.emit_channel_status_item(item, status_qu).is_err() {
self.stats.logic_error().inc();
}
}
for x in item_deque {
self.iqdqs.st_rf3_qu.push_back(x);
}
for (_cid, conf) in &mut self.channels {
if series::dbg::dbg_chn(conf.conf.name()) {
info!("channel_state_on_shutdown {:?}", conf);
@@ -2046,7 +1992,7 @@ impl CaConn {
self.read_ioids.remove(&st3.ioid);
let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow);
if self.trace_channel_poll {
trace!("make next poll idle at {next:?} tsnow {tsnow:?}");
trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow);
}
st2.tick = PollTickState::Idle(PollTickStateIdle { next });
let iqdqs = &mut self.iqdqs;
@@ -2085,7 +2031,7 @@ impl CaConn {
} else {
self.stats.recv_read_notify_state_read_pending.inc();
}
let read_expected = if let Some(cid) = self.read_ioids.remove(&ioid) {
let read_expected = if let Some(_cid) = self.read_ioids.remove(&ioid) {
true
} else {
false
@@ -2100,14 +2046,16 @@ impl CaConn {
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadResultExpected,
};
ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
ch_wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
} else {
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadResultUnexpected,
};
ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
ch_wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
}
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
@@ -2224,8 +2172,10 @@ impl CaConn {
cssid: crst.cssid,
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
let deque = &mut iqdqs.st_rf3_qu;
if wrst.emit_channel_status_item(item, deque).is_err() {
if wrst
.emit_channel_status_item(item, Self::channel_status_qu(iqdqs))
.is_err()
{
stats.logic_error().inc();
}
}
@@ -2463,7 +2413,8 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadStart,
};
conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
conf.wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
}
}
}
@@ -2485,7 +2436,8 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadTimeout,
};
conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
conf.wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
}
if false {
// Here we try to close the channel at hand.
@@ -3204,8 +3156,16 @@ impl CaConn {
cssid: st1.channel.cssid,
status: ChannelStatus::Pong,
};
let deque = &mut self.iqdqs.st_rf3_qu;
if ch.wrst.emit_channel_status_item(item, deque).is_err() {
let dt = self
.poll_tsnow
.saturating_duration_since(self.ts_channel_status_pong_last);
let qu = if dt >= CHANNEL_STATUS_PONG_QUIET {
self.ts_channel_status_pong_last = self.poll_tsnow;
Self::channel_status_qu(&mut self.iqdqs)
} else {
Self::channel_status_pong_qu(&mut self.iqdqs)
};
if ch.wrst.emit_channel_status_item(item, qu).is_err() {
self.stats.logic_error().inc();
}
}

View File

@@ -46,12 +46,12 @@ use statemap::ConnectionState;
use statemap::ConnectionStateValue;
use statemap::WithStatusSeriesIdState;
use statemap::WithStatusSeriesIdStateInner;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IocFinderStats;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::rand_xoshiro::rand_core::RngCore;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
@@ -418,9 +418,12 @@ pub struct CaConnSet {
find_ioc_res_rx: Pin<Box<Receiver<VecDeque<FindIocRes>>>>,
find_ioc_queue_set: QueueSet<ChannelName>,
iqtx: Pin<Box<InsertQueuesTx>>,
storage_insert_queue_l1: VecDeque<QueryItem>,
storage_insert_queue: VecDeque<VecDeque<QueryItem>>,
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
storage_insert_st_qu: VecDeque<VecDeque<QueryItem>>,
storage_insert_st_qu_l1: VecDeque<QueryItem>,
storage_insert_st_tx: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
storage_insert_lt_qu: VecDeque<VecDeque<QueryItem>>,
storage_insert_lt_qu_l1: VecDeque<QueryItem>,
storage_insert_lt_tx: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
ca_conn_res_tx: Pin<Box<Sender<(SocketAddr, CaConnEvent)>>>,
ca_conn_res_rx: Pin<Box<Receiver<(SocketAddr, CaConnEvent)>>>,
connset_out_queue: VecDeque<CaConnSetItem>,
@@ -486,12 +489,13 @@ impl CaConnSet {
find_ioc_res_rx: Box::pin(find_ioc_res_rx),
find_ioc_queue_set: QueueSet::new(),
iqtx: Box::pin(iqtx.clone()),
storage_insert_queue_l1: VecDeque::new(),
storage_insert_queue: VecDeque::new(),
storage_insert_st_qu: VecDeque::new(),
storage_insert_st_qu_l1: VecDeque::new(),
storage_insert_st_tx: Box::pin(SenderPolling::new(iqtx.st_rf3_tx.clone())),
// TODO simplify for all combinations
storage_insert_sender: Box::pin(SenderPolling::new(iqtx.st_rf3_tx.clone())),
storage_insert_lt_qu: VecDeque::new(),
storage_insert_lt_qu_l1: VecDeque::new(),
storage_insert_lt_tx: Box::pin(SenderPolling::new(iqtx.lt_rf3_tx.clone())),
ca_conn_res_tx: Box::pin(ca_conn_res_tx),
ca_conn_res_rx: Box::pin(ca_conn_res_rx),
shutdown_stopping: false,
@@ -808,7 +812,7 @@ impl CaConnSet {
let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
let state = &mut writer_status_state;
let ts_net = Instant::now();
let deque = &mut self.storage_insert_queue_l1;
let deque = &mut self.storage_insert_lt_qu_l1;
writer_status.write(item, state, ts_net, ts, deque)?;
}
*chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState {
@@ -873,7 +877,7 @@ impl CaConnSet {
let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
let state = &mut writer_status_state;
let ts_net = Instant::now();
let deque = &mut self.storage_insert_queue_l1;
let deque = &mut self.storage_insert_lt_qu_l1;
writer_status.write(item, state, ts_net, ts, deque)?;
}
*st3 = WithStatusSeriesIdState {
@@ -1013,8 +1017,9 @@ impl CaConnSet {
Ok(())
} else {
if false {
// TODO
self.thr_msg_storage_len
.trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]);
.trigger("connset handle_check_health", &[&self.storage_insert_st_tx.len()]);
}
self.check_channel_states(tsnow, stnow)?;
let item = CaConnSetItem::Healthy;
@@ -1136,8 +1141,7 @@ impl CaConnSet {
} else {
trace!(
"handle_channel_create_fail {:?} {:?} set to MaybeWrongAddress",
ch,
addr
ch, addr
);
}
bump_backoff(&mut st3.addr_find_backoff);
@@ -1482,7 +1486,8 @@ impl CaConnSet {
} else {
self.channel_states.range_mut(..)
};
let mut item_deque = VecDeque::new();
let mut st_qu_2 = VecDeque::new();
let mut lt_qu_2 = VecDeque::new();
for (i, (ch, st)) in it.enumerate() {
match &mut st.value {
ChannelStateValue::Active(st2) => match st2 {
@@ -1556,7 +1561,7 @@ impl CaConnSet {
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew);
let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
let (tsev, val) = item.to_ts_val();
let deque = &mut item_deque;
let deque = &mut lt_qu_2;
st3.writer_status.as_mut().unwrap().write(
serieswriter::fixgridwriter::ChannelStatusWriteValue::new(tsev, val),
st3.writer_status_state.as_mut().unwrap(),
@@ -1642,7 +1647,8 @@ impl CaConnSet {
}
};
}
self.storage_insert_queue.push_back(item_deque);
self.storage_insert_st_qu.push_back(st_qu_2);
self.storage_insert_lt_qu.push_back(lt_qu_2);
for (addr, ch) in cmd_remove_channel {
if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
let cmd = ConnCommand::channel_close(ch.name().into());
@@ -1793,9 +1799,15 @@ impl CaConnSet {
}
self.handle_check_health()?;
{
if self.storage_insert_queue_l1.len() != 0 {
let a = core::mem::replace(&mut self.storage_insert_queue_l1, VecDeque::new());
self.storage_insert_queue.push_back(a);
if self.storage_insert_st_qu_l1.len() != 0 {
let a = std::mem::replace(&mut self.storage_insert_st_qu_l1, VecDeque::new());
self.storage_insert_st_qu.push_back(a);
}
}
{
if self.storage_insert_lt_qu_l1.len() != 0 {
let a = std::mem::replace(&mut self.storage_insert_lt_qu_l1, VecDeque::new());
self.storage_insert_lt_qu.push_back(a);
}
}
Ok(())
@@ -1907,7 +1919,7 @@ impl Stream for CaConnSet {
self.stats.storage_insert_tx_len.set(self.iqtx.st_rf3_tx.len() as _);
self.stats
.storage_insert_queue_len
.set(self.storage_insert_queue.len() as _);
.set(self.storage_insert_st_qu.len() as _);
self.stats
.channel_info_query_queue_len
.set(self.channel_info_query_qu.len() as _);
@@ -1978,8 +1990,20 @@ impl Stream for CaConnSet {
{
let this = self.as_mut().get_mut();
let qu = &mut this.storage_insert_queue;
let tx = this.storage_insert_sender.as_mut();
let qu = &mut this.storage_insert_st_qu;
let tx = this.storage_insert_st_tx.as_mut();
let counter = this.stats.storage_insert_queue_send();
let x = sender_polling_send(qu, tx, cx, || {
counter.inc();
});
if let Err(e) = merge_pending_progress(x, &mut penpro) {
break Ready(Some(CaConnSetItem::Error(e)));
}
}
{
let this = self.as_mut().get_mut();
let qu = &mut this.storage_insert_lt_qu;
let tx = this.storage_insert_lt_tx.as_mut();
let counter = this.stats.storage_insert_queue_send();
let x = sender_polling_send(qu, tx, cx, || {
counter.inc();

View File

@@ -1,7 +1,7 @@
use crate::config::ScyllaIngestConfig;
use crate::iteminsertqueue::Accounting;
use crate::iteminsertqueue::AccountingRecv;
use crate::iteminsertqueue::BinWriteIndexV01;
use crate::iteminsertqueue::BinWriteIndexV03;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::MspItem;
@@ -273,11 +273,11 @@ where
prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::BinWriteIndexV01(item) => {
QueryItem::BinWriteIndexV03(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_bin_write_index_v01_insert_futs(item, &data_store, &stats, tsnow)
prepare_bin_write_index_v03_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::Accounting(item) => {
@@ -321,8 +321,8 @@ fn inspect_items(
QueryItem::TimeBinSimpleF32V02(_) => {
trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02");
}
QueryItem::BinWriteIndexV01(_) => {
trace_item_execute!("execute {worker_name} BinWriteIndexV01");
QueryItem::BinWriteIndexV03(_) => {
trace_item_execute!("execute {worker_name} BinWriteIndexV03");
}
QueryItem::Accounting(_) => {
trace_item_execute!("execute {worker_name} Accounting {item:?}");
@@ -420,26 +420,18 @@ fn prepare_timebin_v02_insert_futs(
futs
}
fn prepare_bin_write_index_v01_insert_futs(
item: BinWriteIndexV01,
fn prepare_bin_write_index_v03_insert_futs(
item: BinWriteIndexV03,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
let params = (
item.series,
item.dv1,
item.dv2,
item.quo,
item.rem,
item.rt,
item.binlen,
);
let params = (item.series, item.pbp, item.msp, item.rt, item.lsp, item.binlen);
// TODO would be better to count inserts only on completed insert
stats.inserted_binned().inc();
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_bin_write_index_v01.clone(),
data_store.qu_insert_bin_write_index_v03.clone(),
params,
tsnow,
stats.clone(),

View File

@@ -551,13 +551,12 @@ pub struct TimeBinSimpleF32V02 {
}
#[derive(Debug, Clone)]
pub struct BinWriteIndexV01 {
pub struct BinWriteIndexV03 {
pub series: i64,
pub dv1: i32,
pub dv2: i32,
pub quo: i64,
pub rem: i32,
pub rt: i32,
pub pbp: i16,
pub msp: i32,
pub rt: i16,
pub lsp: i32,
pub binlen: i32,
}
@@ -567,7 +566,7 @@ pub enum QueryItem {
Insert(InsertItem),
Msp(MspItem),
TimeBinSimpleF32V02(TimeBinSimpleF32V02),
BinWriteIndexV01(BinWriteIndexV01),
BinWriteIndexV03(BinWriteIndexV03),
Accounting(Accounting),
AccountingRecv(AccountingRecv),
}

View File

@@ -636,18 +636,17 @@ async fn migrate_scylla_data_schema(
let tab = GenTwcsTab::new(
ks,
rett.table_prefix(),
"bin_write_index_v01",
"bin_write_index_v03",
&[
("series", "bigint"),
("dv1", "int"),
("dv2", "int"),
("quo", "bigint"),
("rem", "int"),
("rt", "int"),
("pbp", "smallint"),
("msp", "int"),
("rt", "smallint"),
("lsp", "int"),
("binlen", "int"),
],
["series", "dv1", "dv2", "quo"],
["rem", "rt", "binlen"],
["series", "pbp", "msp"],
["rt", "lsp", "binlen"],
rett.ttl_binned(),
);
tab.setup(chs, scy).await?;
@@ -711,6 +710,18 @@ async fn migrate_scylla_data_schema(
);
tab.setup(chs, scy).await?;
}
{
let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v00");
if has_table(&tn, scy).await? {
chs.add_todo(format!("drop table {}.{}", ks, tn));
}
}
{
let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v01");
if has_table(&tn, scy).await? {
chs.add_todo(format!("drop table {}.{}", ks, tn));
}
}
Ok(())
}

View File

@@ -45,7 +45,7 @@ pub struct DataStore {
pub qu_insert_array_f64: Arc<PreparedStatement>,
pub qu_insert_array_bool: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
pub qu_insert_bin_write_index_v01: Arc<PreparedStatement>,
pub qu_insert_bin_write_index_v03: Arc<PreparedStatement>,
pub qu_account_00: Arc<PreparedStatement>,
pub qu_account_recv_00: Arc<PreparedStatement>,
pub qu_dummy: Arc<PreparedStatement>,
@@ -157,10 +157,10 @@ impl DataStore {
scy
);
let qu_insert_bin_write_index_v01 = prep_qu_ins_c!(
"bin_write_index_v01",
"series, dv1, dv2, quo, rem, rt, binlen",
"?, ?, ?, ?, ?, ?, ?",
let qu_insert_bin_write_index_v03 = prep_qu_ins_c!(
"bin_write_index_v03",
"series, pbp, msp, rt, lsp, binlen",
"?, ?, ?, ?, ?, ?",
rett,
scy
);
@@ -219,7 +219,7 @@ impl DataStore {
qu_insert_array_f64,
qu_insert_array_bool,
qu_insert_binned_scalar_f32_v02,
qu_insert_bin_write_index_v01,
qu_insert_bin_write_index_v03,
qu_account_00,
qu_account_recv_00,
qu_dummy,

View File

@@ -16,7 +16,7 @@ use netpod::Shape;
use netpod::TsNano;
use netpod::ttl::RetentionTime;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::BinWriteIndexV01;
use scywr::iteminsertqueue::BinWriteIndexV03;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use series::ChannelStatusSeriesId;
@@ -84,25 +84,26 @@ impl WriteCntZero {
}
#[derive(Debug)]
struct IndexWritten {
last: Option<(PrebinnedPartitioning, u64, u32)>,
enum IndexWritten {
None,
Last(u32, u32),
}
impl IndexWritten {
fn new() -> Self {
Self { last: None }
IndexWritten::None
}
fn should_write(&self, _div: PrebinnedPartitioning, quo: u64, rem: u32) -> bool {
if let Some((_div0, quo0, rem0)) = &self.last {
*quo0 != quo || *rem0 != rem
fn should_write(&self, msp: u32, lsp: u32) -> bool {
if let IndexWritten::Last(lmsp, llsp) = self {
*lmsp != msp || *llsp != lsp
} else {
true
}
}
fn mark_written(&mut self, div: PrebinnedPartitioning, quo: u64, rem: u32) {
self.last = Some((div, quo, rem));
fn mark_written(&mut self, msp: u32, lsp: u32) {
*self = IndexWritten::Last(msp, lsp);
}
}
@@ -119,14 +120,17 @@ pub struct BinWriter {
BinnedEventsTimeweight<f32>,
WriteCntZero,
PrebinnedPartitioning,
IndexWritten,
Option<IndexWritten>,
)>,
binner_others: Vec<(
RetentionTime,
BinnedBinsTimeweight<f32, f32>,
WriteCntZero,
PrebinnedPartitioning,
IndexWritten,
Option<IndexWritten>,
)>,
index_written: IndexWritten,
trd: bool,
}
@@ -151,13 +155,13 @@ impl BinWriter {
let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()];
let mut binner_1st = None;
let mut binner_others = Vec::new();
let mut has_monitor = false;
let mut has_monitor = None;
let mut combs: Vec<_> = rts
.into_iter()
.zip(quiets.into_iter().map(|x| DtMs::from_ms_u64(x.as_millis() as u64)))
.inspect(|x| {
if x.1 <= DUR_ZERO {
has_monitor = true;
has_monitor = Some(x.0.clone());
}
})
.filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX)
@@ -181,9 +185,42 @@ impl BinWriter {
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
}
} else {
match &has_monitor {
Some(RetentionTime::Short) => {
combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable));
combs.push((
RetentionTime::Medium,
PrebinnedPartitioning::Hour1,
WriteCntZero::Disable,
));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
Some(RetentionTime::Medium) => {
combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable));
combs.push((
RetentionTime::Medium,
PrebinnedPartitioning::Hour1,
WriteCntZero::Disable,
));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
Some(RetentionTime::Long) => {
combs.push((
RetentionTime::Medium,
PrebinnedPartitioning::Min1,
WriteCntZero::Disable,
));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
None => {
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
}
}
debug_init!(trd, "combs B {:?}", combs);
if !is_polled && !has_monitor && combs.len() > 1 {
if combs.len() > 1 && has_monitor.is_none() && is_polled {
combs.remove(0);
}
let combs = combs;
@@ -196,15 +233,24 @@ impl BinWriter {
if let WriteCntZero::Enable = write_zero {
binner.cnt_zero_enable();
}
binner_1st = Some((rt, binner, write_zero, pbp));
let iw2 = if pbp.uses_index_min10() {
Some(IndexWritten::new())
} else {
None
};
binner_1st = Some((rt, binner, write_zero, pbp, IndexWritten::new(), iw2));
} else {
let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len());
let binner = BinnedBinsTimeweight::new(range);
let mut binner = BinnedBinsTimeweight::new(range);
if let WriteCntZero::Enable = write_zero {
// TODO
// binner.cnt_zero_enable();
binner.cnt_zero_enable();
}
binner_others.push((rt, binner, write_zero, pbp));
let iw2 = if pbp.uses_index_min10() {
Some(IndexWritten::new())
} else {
None
};
binner_others.push((rt, binner, write_zero, pbp, IndexWritten::new(), iw2));
}
}
let ret = Self {
@@ -216,7 +262,6 @@ impl BinWriter {
evbuf: ContainerEvents::new(),
binner_1st,
binner_others,
index_written: IndexWritten::new(),
trd,
};
let _ = ret.cssid;
@@ -278,6 +323,8 @@ impl BinWriter {
let write_zero = ee.2.clone();
let binner = &mut ee.1;
let pbp = ee.3.clone();
let index_written = &mut ee.4;
let iw2 = &mut ee.5;
// TODO avoid boxing
let bufbox = Box::new(buf);
use items_0::timebin::IngestReport;
@@ -297,18 +344,20 @@ impl BinWriter {
trace_bin!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(
self.trd,
true,
self.sid,
rt,
&bins,
write_zero,
&mut self.index_written,
index_written,
iw2,
pbp,
iqdqs,
)?;
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
let (rt, binner, write_zero, pbp) = &mut self.binner_others[i];
let (rt, binner, write_zero, pbp, index_written, iw2) = &mut self.binner_others[i];
let write_zero = write_zero.clone();
binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = binner.output()?;
@@ -319,11 +368,13 @@ impl BinWriter {
if let Some(bb2) = bb.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
Self::handle_output_ready(
self.trd,
false,
self.sid,
rt.clone(),
&bb2,
write_zero,
&mut self.index_written,
index_written,
iw2,
pbp.clone(),
iqdqs,
)?;
@@ -348,17 +399,21 @@ impl BinWriter {
Ok(())
}
} else {
// TODO should rather make the top-level binner non-optional
self.evbuf.clear();
Ok(())
}
}
fn handle_output_ready(
trd: bool,
is_from_events: bool,
series: SeriesId,
rt: RetentionTime,
bins: &ContainerBins<f32, f32>,
write_zero: WriteCntZero,
index_written: &mut IndexWritten,
iw1: &mut IndexWritten,
iw2: &mut Option<IndexWritten>,
pbp: PrebinnedPartitioning,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
@@ -373,71 +428,91 @@ impl BinWriter {
if fnl == false {
info!("non final bin {:?}", series);
} else if cnt == 0 && !write_zero.enabled() {
info!("zero count bin {:?}", series);
if is_from_events {
info!("zero count bin from events {:?}", series);
} else {
info!("zero count bin from bins {:?}", series);
}
} else {
if bin_len != pbp.bin_len() {
let e = Error::UnexpectedBinLen(bin_len, pbp);
return Err(e);
}
let div = pbp.msp_div();
if div.ns() % bin_len.ns() != 0 {
let e = Error::UnsupportedGridDiv(bin_len, div);
return Err(e);
}
let msp = ts1.ms() / div.ms();
let off = (ts1.ms() - div.ms() * msp) / bin_len.ms();
let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
series,
binlen: bin_len.ms() as i32,
msp: msp as i64,
off: off as i32,
cnt: cnt as i64,
min,
max,
avg,
dev: f32::NAN,
lst,
});
if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
}
match rt {
RetentionTime::Short => {
iqdqs.st_rf3_qu.push_back(item);
{
let (msp, lsp) = pbp.msp_lsp(ts1.to_ts_ms());
let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
series,
binlen: bin_len.ms() as i32,
msp: msp as i64,
off: lsp as i32,
cnt: cnt as i64,
min,
max,
avg,
dev: f32::NAN,
lst,
});
if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
}
RetentionTime::Medium => {
iqdqs.mt_rf3_qu.push_back(item);
}
RetentionTime::Long => {
iqdqs.lt_rf3_qu.push_back(item);
let qu = iqdqs.deque(rt.clone());
qu.push_back(item);
}
if pbp.uses_index_min10() {
let pbp_ix = PrebinnedPartitioning::Min10;
let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
debug_bin!(
trd,
"handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
series,
pbp_ix,
pbp,
rt,
msp,
lsp
);
let iw = iw2.as_mut().unwrap();
if iw.should_write(msp, lsp) {
iw.mark_written(msp, lsp);
let item = BinWriteIndexV03 {
series: series.id() as i64,
pbp: pbp_ix.db_ix() as i16,
msp: msp as i32,
rt: rt.index_db_i32() as i16,
lsp: lsp as i32,
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV03(item);
iqdqs.deque(rt.clone()).push_back(item);
}
}
let div = PrebinnedPartitioning::Day1;
let (quo, rem, dv1, dv2) = div.quo_rem(ts1.to_ts_ms());
if index_written.should_write(div.clone(), quo, rem) {
index_written.mark_written(div.clone(), quo, rem);
let item = BinWriteIndexV01 {
series: series.id() as i64,
dv1: dv1 as i32,
dv2: dv2 as i32,
quo: quo as i64,
rem: rem as i32,
rt: rt.index_db_i32(),
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV01(item);
match rt {
RetentionTime::Short => {
iqdqs.st_rf3_qu.push_back(item);
}
RetentionTime::Medium => {
iqdqs.mt_rf3_qu.push_back(item);
}
RetentionTime::Long => {
iqdqs.lt_rf3_qu.push_back(item);
}
if true {
let pbp_ix = PrebinnedPartitioning::Day1;
let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
debug_bin!(
trd,
"handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
series,
pbp_ix,
pbp,
rt,
msp,
lsp
);
// let iw = iw1;
if iw1.should_write(msp, lsp) {
iw1.mark_written(msp, lsp);
let item = BinWriteIndexV03 {
series: series.id() as i64,
pbp: pbp_ix.db_ix() as i16,
msp: msp as i32,
rt: rt.index_db_i32() as i16,
lsp: lsp as i32,
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV03(item);
iqdqs.deque(rt.clone()).push_back(item);
}
} else {
}
}
}