diff --git a/crates/httpret/src/api4/binwriteindex.rs b/crates/httpret/src/api4/binwriteindex.rs index cc7d0b7..7fc2152 100644 --- a/crates/httpret/src/api4/binwriteindex.rs +++ b/crates/httpret/src/api4/binwriteindex.rs @@ -193,7 +193,7 @@ async fn binned_instrumented( if accepts_json_or_all(&head.headers) { Ok(binned_json_single(res2, ctx, ncc).await?) } else { - let ret = error_response(format!("Unsupported Accept: {:?}", &head.headers), ctx.reqid()); + let ret = error_response(format!("unsupported accept: {:?}", &head.headers), ctx.reqid()); Ok(ret) } } @@ -243,8 +243,7 @@ async fn binned_json_single( ) -> Result { // TODO unify with binned_json_framed debug!("binned_json_single"); - let rt1 = res2.query.retention_time_1(); - let rt2 = res2.query.retention_time_2(); + let rt1 = res2.query.retention_time(); let pbp = res2.query.prebinned_partitioning(); // let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; // for rt in rts { @@ -252,7 +251,6 @@ async fn binned_json_single( { let mut stream = scyllaconn::binwriteindex::BinWriteIndexRtStream::new( rt1, - rt2, SeriesId::new(res2.ch_conf.series().unwrap()), pbp.clone(), res2.query.range().to_time().unwrap(), diff --git a/crates/scyllaconn/src/binwriteindex.rs b/crates/scyllaconn/src/binwriteindex.rs index 84b20ec..a1f5357 100644 --- a/crates/scyllaconn/src/binwriteindex.rs +++ b/crates/scyllaconn/src/binwriteindex.rs @@ -39,7 +39,6 @@ type Fut2 = #[derive(Debug)] pub struct BinWriteIndexEntry { - pub rt: u16, pub lsp: u32, pub binlen: u32, } @@ -54,7 +53,6 @@ pub struct BinWriteIndexSet { #[derive(Debug)] pub struct BinWriteIndexRtStream { rt1: RetentionTime, - rt2: RetentionTime, series: SeriesId, scyqueue: ScyllaQueue, pbp: PrebinnedPartitioning, @@ -72,7 +70,6 @@ impl BinWriteIndexRtStream { pub fn new( rt1: RetentionTime, - rt2: RetentionTime, series: SeriesId, pbp: PrebinnedPartitioning, range: NanoRange, @@ -84,7 +81,6 @@ impl BinWriteIndexRtStream { let (msp_end, lsp_end) = pbp.msp_lsp(range.end_ts().add_dt_nano(pbp.bin_len().dt_ns()).to_ts_ms()); BinWriteIndexRtStream { rt1, - rt2, series, scyqueue, pbp, @@ -99,7 +95,6 @@ impl BinWriteIndexRtStream { async fn next_query_fut( scyqueue: &ScyllaQueue, rt1: RetentionTime, - rt2: RetentionTime, series: SeriesId, pbp: PrebinnedPartitioning, msp: u32, @@ -108,7 +103,7 @@ impl BinWriteIndexRtStream { ) -> Result<(u32, u32, u32, VecDeque), crate::worker::Error> { debug!("make_next_query_fut msp {} lsp {} {}", msp, lsp_min, lsp_max); let res = scyqueue - .bin_write_index_read(rt1, rt2, series, pbp, MspU32(msp), lsp_min, lsp_max) + .bin_write_index_read(rt1, series, pbp, MspU32(msp), lsp_min, lsp_max) .await?; Ok((msp, lsp_min, lsp_max, res)) } @@ -128,7 +123,6 @@ impl BinWriteIndexRtStream { let fut = Self::next_query_fut( scyqueue, self.rt1.clone(), - self.rt2.clone(), self.series.clone(), self.pbp.clone(), msp, @@ -160,7 +154,10 @@ impl Stream for BinWriteIndexRtStream { }; Ready(Some(Ok(item))) } - Ready(Err(e)) => Ready(Some(Err(e.into()))), + Ready(Err(e)) => { + self.fut1 = None; + Ready(Some(Err(e.into()))) + } Pending => Pending, } } else if let Some(fut) = self.as_mut().make_next_query_fut(cx) { diff --git a/crates/scyllaconn/src/binwriteindex/bwxcmb.rs b/crates/scyllaconn/src/binwriteindex/bwxcmb.rs index cbd8c03..00267b9 100644 --- a/crates/scyllaconn/src/binwriteindex/bwxcmb.rs +++ b/crates/scyllaconn/src/binwriteindex/bwxcmb.rs @@ -56,7 +56,6 @@ impl BinWriteIndexStream { let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; for rt in rts { let s = BinWriteIndexRtStream::new( - rt.clone(), rt.clone(), series.clone(), PrebinnedPartitioning::Day1, diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index de9b0f7..30eaf58 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -70,7 +70,7 @@ impl EventReadOpts { Self { one_before, with_values, - qucap: qucap.unwrap_or(1), + qucap: qucap.unwrap_or(3), } } diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index c31b6d8..087713b 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -236,9 +236,9 @@ async fn make_prebinned_f32(ks: &str, rt: &RetentionTime, scy: &Session) -> Resu async fn make_bin_write_index_read(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { let cql = format!( concat!( - "select rt, lsp, binlen", - " from {}.{}bin_write_index_v03", - " where series = ? and pbp = ? and msp = ? and rt = ?", + "select lsp, binlen", + " from {}.{}bin_write_index_v04", + " where series = ? and pbp = ? and msp = ?", " and lsp >= ? and lsp < ?", ), ks, diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 19804e3..d9b83b0 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -71,7 +71,6 @@ struct ReadPrebinnedF32 { #[derive(Debug)] struct BinWriteIndexRead { rt1: RetentionTime, - rt2: RetentionTime, series: SeriesId, pbp: PrebinnedPartitioning, msp: MspU32, @@ -99,7 +98,6 @@ impl BinWriteIndexRead { self.series.id() as i64, self.pbp.db_ix() as i16, self.msp.0 as i32, - self.rt2.to_index_db_i32() as i16, self.lsp_min as i32, self.lsp_max as i32, ); @@ -107,11 +105,10 @@ impl BinWriteIndexRead { let res = scy .execute_iter(stmts.rt(&self.rt1).bin_write_index_read().clone(), params) .await?; - let mut it = res.rows_stream::<(i16, i32, i32)>()?; + let mut it = res.rows_stream::<(i32, i32)>()?; let mut all = VecDeque::new(); - while let Some((rt, lsp, binlen)) = it.try_next().await? { + while let Some((lsp, binlen)) = it.try_next().await? { let v = BinWriteIndexEntry { - rt: rt as u16, lsp: lsp as u32, binlen: binlen as u32, }; @@ -274,7 +271,6 @@ impl ScyllaQueue { pub async fn bin_write_index_read( &self, rt1: RetentionTime, - rt2: RetentionTime, series: SeriesId, pbp: PrebinnedPartitioning, msp: MspU32, @@ -284,7 +280,6 @@ impl ScyllaQueue { let (tx, rx) = async_channel::bounded(1); let job = BinWriteIndexRead { rt1, - rt2, series, pbp, msp,