Switch to enable bin writer

This commit is contained in:
Dominik Werder
2025-05-14 16:29:55 +02:00
parent 4a84127705
commit fbd9d1cf0e
3 changed files with 10 additions and 4 deletions

View File

@@ -723,7 +723,7 @@ dependencies = [
[[package]]
name = "daqingest"
version = "0.3.0-aa.2"
version = "0.3.0-aa.3"
dependencies = [
"async-channel",
"autoerr",

View File

@@ -93,6 +93,7 @@ const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
const DO_RATE_CHECK: bool = false;
const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60);
const METRICS_EMIT_IVL: Duration = Duration::from_millis(1000 * 1);
const USE_BIN_WRITER: bool = false;
macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
@@ -2337,7 +2338,9 @@ impl CaConn {
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
if USE_BIN_WRITER {
binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
}
{
let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
@@ -3401,7 +3404,9 @@ impl CaConn {
if let ChannelState::Writable(st2) = chst {
let iqdqs = &mut self.iqdqs;
st2.writer.tick(iqdqs)?;
st2.binwriter.tick(iqdqs)?;
if USE_BIN_WRITER {
st2.binwriter.tick(iqdqs)?;
}
}
}
Ok(())

View File

@@ -706,7 +706,8 @@ async fn migrate_scylla_data_schema(
chs.add_todo(format!("drop table {}.{}", ks, tn));
}
}
{
// TODO enable delete only after all old executables are replaced.
if false {
let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v03");
if has_table(&ks, &tn, scy).await? {
chs.add_todo(format!("drop table {}.{}", ks, tn));