From f52d941ca21bdb0101b25b3aa68f89f22a9f843d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 6 Mar 2024 12:28:47 +0100 Subject: [PATCH] Care about larger payloads --- daqingest/Cargo.toml | 6 +- daqingest/src/daemon.rs | 6 + netfetch/src/ca/conn.rs | 14 +- netfetch/src/ca/findioc.rs | 6 +- netfetch/src/ca/proto.rs | 265 ++++++++++++++++++----------------- netfetch/src/conf.rs | 2 +- scywr/src/insertworker.rs | 53 +++---- scywr/src/iteminsertqueue.rs | 2 +- scywr/src/schema.rs | 139 +++++------------- scywr/src/session.rs | 1 + scywr/src/store.rs | 78 ++++++----- 11 files changed, 256 insertions(+), 316 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index a3e0d59..b68e179 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.0-alpha.1" +version = "0.2.0-aa.2" authors = ["Dominik Werder "] edition = "2021" @@ -10,11 +10,11 @@ default = [] bsread = [] [dependencies] -clap = { version = "4.4.4", features = ["derive", "cargo"] } +clap = { version = "4.5.1", features = ["derive", "cargo"] } tracing = "0.1" serde = { version = "1.0", features = ["derive"] } tokio-postgres = "0.7.10" -async-channel = "2.0.0" +async-channel = "2.2.0" futures-util = "0.3" chrono = "0.4" bytes = "1.5.0" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 647be7d..7a31d14 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -165,7 +165,13 @@ impl Daemon { array_truncate: Arc::new(AtomicU64::new(ingest_opts.array_truncate())), }; let insert_worker_opts = Arc::new(insert_worker_opts); + + debug!("TODO RetentionTime"); + + let rett = RetentionTime::Short; + let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers( + rett, opts.scyconf.clone(), ingest_opts.insert_scylla_sessions(), ingest_opts.insert_worker_count(), diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 74ad137..74083d7 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,7 +1,6 @@ use super::proto; use super::proto::CaEventValue; use super::proto::ReadNotify; -use super::ExtraInsertsConf; use crate::ca::proto::EventCancel; use crate::conf::ChannelConfig; use crate::senderpolling::SenderPolling; @@ -42,7 +41,6 @@ use scywriiq::ConnectionStatusItem; use scywriiq::IvlItem; use scywriiq::MuteItem; use scywriiq::QueryItem; -use serde::Deserialize; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; @@ -296,7 +294,7 @@ struct CreatedState { cid: Cid, sid: Sid, ca_dbr_type: u16, - ca_dbr_count: u16, + ca_dbr_count: u32, ts_created: Instant, ts_alive_last: Instant, ts_msp_last: u64, @@ -694,7 +692,7 @@ impl Default for CaConnOpts { fn default() -> Self { Self { insert_queue_max: 20000, - array_truncate: 2000, + array_truncate: 2000000, } } } @@ -1350,6 +1348,10 @@ impl CaConn { // debug!("handle_event_add_res {ev:?}"); match ch_s { ChannelState::Writable(st) => { + // debug!( + // "CaConn sees data_count {} payload_len {}", + // ev.data_count, ev.payload_len + // ); let stnow = self.tmp_ts_poll; let crst = &mut st.channel; let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 4; @@ -1571,6 +1573,7 @@ impl CaConn { stnow: SystemTime, stats: &CaConnStats, ) -> Result<(), Error> { + // debug!("event_add_ingest payload_len {} value {:?}", payload_len, value); crst.ts_alive_last = tsnow; crst.item_recv_ivl_ema.tick(tsnow); crst.recv_count += 1; @@ -1977,8 +1980,7 @@ impl CaConn { cid, sid, ca_dbr_type, - // TODO for extended epics messages, can be u32! - ca_dbr_count: k.data_count as u16, + ca_dbr_count: k.data_count, ts_created: tsnow, ts_alive_last: tsnow, ts_msp_last: 0, diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 0a6a746..4ce8658 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -342,16 +342,16 @@ impl FindIocStream { } else { info!("cmdid {} payload {}", hi.cmdid(), hi.payload_len()); } - if nb.data().len() < hi.payload_len() { + if nb.data().len() < hi.payload_len() as usize { error!("incomplete message, missing payload"); break; } let msg = CaMsg::from_proto_infos(&hi, nb.data(), tsnow, 32).map_err(|e| e.to_string())?; - nb.adv(hi.payload_len()).map_err(|e| e.to_string())?; + nb.adv(hi.payload_len() as usize).map_err(|e| e.to_string())?; msgs.push(msg); accounted += 16 + hi.payload_len(); } - if accounted != ec as usize { + if accounted != ec as u32 { stats.ca_udp_unaccounted_data().inc(); debug!("unaccounted data ec {} accounted {}", ec, accounted); } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index ed14fb9..793e785 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -13,6 +13,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::Duration; use std::time::Instant; use taskrun::tokio; use tokio::io::AsyncRead; @@ -45,10 +46,13 @@ pub enum Error { NoReadBufferSpace, NeitherPendingNorProgress, OutputBufferTooSmall, + LogicError, } -const CA_PROTO_VERSION: u16 = 13; +const CA_PROTO_VERSION: u32 = 13; const EPICS_EPOCH_OFFSET: u64 = 631152000; +const PAYLOAD_LEN_MAX: u32 = 1024 * 1024 * 32; +const PROTO_INPUT_BUF_CAP: u32 = 1024 * 1024 * 40; #[derive(Debug)] pub struct Search { @@ -104,7 +108,7 @@ pub struct AccessRightsRes { #[derive(Debug)] pub struct EventAdd { pub data_type: u16, - pub data_count: u16, + pub data_count: u32, pub sid: u32, pub subid: u32, } @@ -112,7 +116,7 @@ pub struct EventAdd { #[derive(Debug)] pub struct EventCancel { pub data_type: u16, - pub data_count: u16, + pub data_count: u32, pub sid: u32, pub subid: u32, } @@ -145,7 +149,7 @@ pub struct EventAddResEmpty { #[derive(Debug)] pub struct ReadNotify { pub data_type: u16, - pub data_count: u16, + pub data_count: u32, pub sid: u32, pub ioid: u32, } @@ -343,7 +347,11 @@ impl CaMsgTy { } fn len(&self) -> usize { - 16 + self.payload_len() + if self.payload_len() <= 0x3ff0 && self.data_count() <= 0xffff { + 16 + self.payload_len() + } else { + 24 + self.payload_len() + } } fn payload_len(&self) -> usize { @@ -407,7 +415,7 @@ impl CaMsgTy { } } - fn data_count(&self) -> u16 { + fn data_count(&self) -> u32 { use CaMsgTy::*; match self { Version => CA_PROTO_VERSION, @@ -607,21 +615,19 @@ impl CaMsg { } fn place_into(&self, buf: &mut [u8]) { - //info!("place_into given {} bytes buffer", buf.len()); - if self.ty.payload_len() > 0x4000 - 16 { - error!("TODO emit for larger payloads"); - panic!(); - } else { + if self.ty.payload_len() <= 0x3ff0 && self.ty.data_count() <= 0xffff { + let pls = self.ty.payload_len() as u16; + let cnt = self.ty.data_count() as u16; let t = self.ty.cmdid().to_be_bytes(); buf[0] = t[0]; buf[1] = t[1]; - let t = (self.ty.payload_len() as u16).to_be_bytes(); + let t = pls.to_be_bytes(); buf[2] = t[0]; buf[3] = t[1]; let t = self.ty.data_type().to_be_bytes(); buf[4] = t[0]; buf[5] = t[1]; - let t = self.ty.data_count().to_be_bytes(); + let t = cnt.to_be_bytes(); buf[6] = t[0]; buf[7] = t[1]; let t = self.ty.param1().to_be_bytes(); @@ -635,6 +641,40 @@ impl CaMsg { buf[14] = t[2]; buf[15] = t[3]; self.ty.place_payload_into(&mut buf[16..]); + } else { + let pls = self.ty.payload_len(); + let cnt = self.ty.data_count(); + let t = self.ty.cmdid().to_be_bytes(); + buf[0] = t[0]; + buf[1] = t[1]; + buf[2] = 0xff; + buf[3] = 0xff; + let t = self.ty.data_type().to_be_bytes(); + buf[4] = t[0]; + buf[5] = t[1]; + buf[6] = 0x00; + buf[7] = 0x00; + let t = self.ty.param1().to_be_bytes(); + buf[8] = t[0]; + buf[9] = t[1]; + buf[10] = t[2]; + buf[11] = t[3]; + let t = self.ty.param2().to_be_bytes(); + buf[12] = t[0]; + buf[13] = t[1]; + buf[14] = t[2]; + buf[15] = t[3]; + let t = pls.to_be_bytes(); + buf[16] = t[0]; + buf[17] = t[1]; + buf[18] = t[2]; + buf[19] = t[3]; + let t = cnt.to_be_bytes(); + buf[20] = t[0]; + buf[21] = t[1]; + buf[22] = t[2]; + buf[23] = t[3]; + self.ty.place_payload_into(&mut buf[24..]); } } @@ -686,7 +726,7 @@ impl CaMsg { array_truncate: usize, ) -> Result { let msg = match hi.cmdid { - 0x00 => CaMsg::from_ty_ts(CaMsgTy::VersionRes(hi.data_count), tsnow), + 0x00 => CaMsg::from_ty_ts(CaMsgTy::VersionRes(hi.data_count() as u16), tsnow), 0x0b => { let mut s = String::new(); s.extend(format!("{:?}", &payload[..payload.len().min(16)]).chars()); @@ -871,21 +911,20 @@ pub enum CaItem { #[derive(Clone, Debug)] pub struct HeadInfo { cmdid: u16, - payload_size: u16, + payload_size: u32, data_type: u16, - data_count: u16, + data_count: u32, param1: u32, param2: u32, - ext_payload_size: u32, - ext_data_count: u32, + is_ext: bool, } impl HeadInfo { pub fn from_netbuf(buf: &mut SlideBuf) -> Result { let command = buf.read_u16_be()?; - let payload_size = buf.read_u16_be()?; + let payload_size = buf.read_u16_be()? as u32; let data_type = buf.read_u16_be()?; - let data_count = buf.read_u16_be()?; + let data_count = buf.read_u16_be()? as u32; let param1 = buf.read_u32_be()?; let param2 = buf.read_u32_be()?; let hi = HeadInfo { @@ -895,15 +934,15 @@ impl HeadInfo { data_count, param1, param2, - ext_payload_size: 0, - ext_data_count: 0, + is_ext: false, }; Ok(hi) } fn with_ext(mut self, payload: u32, datacount: u32) -> Self { - self.ext_payload_size = payload; - self.ext_data_count = datacount; + self.is_ext = true; + self.payload_size = payload; + self.data_count = datacount; self } @@ -911,20 +950,12 @@ impl HeadInfo { self.cmdid } - pub fn payload_len(&self) -> usize { - if self.payload_size == 0xffff { - self.ext_payload_size as _ - } else { - self.payload_size as _ - } + pub fn payload_len(&self) -> u32 { + self.payload_size } - pub fn data_count(&self) -> usize { - if self.payload_size == 0xffff { - self.ext_data_count as _ - } else { - self.data_count as _ - } + pub fn data_count(&self) -> u32 { + self.data_count } // only for debug purpose @@ -947,7 +978,7 @@ impl CaState { match self { StdHead => 16, ExtHead(_) => 8, - Payload(k) => k.payload_len(), + Payload(k) => k.payload_len() as usize, Done => 123, } } @@ -955,6 +986,7 @@ impl CaState { pub struct CaProto { tcp: TcpStream, + tcp_eof: bool, remote_addr_dbg: SocketAddrV4, state: CaState, buf: SlideBuf, @@ -969,10 +1001,11 @@ impl CaProto { pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize, stats: Arc) -> Self { Self { tcp, + tcp_eof: false, remote_addr_dbg, state: CaState::StdHead, - buf: SlideBuf::new(1024 * 1024 * 8), - outbuf: SlideBuf::new(1024 * 128), + buf: SlideBuf::new(PROTO_INPUT_BUF_CAP as usize), + outbuf: SlideBuf::new(1024 * 256), out: VecDeque::new(), array_truncate, stats, @@ -988,16 +1021,6 @@ impl CaProto { self.out.push_back(item); } - fn inpbuf_conn(&mut self, need_min: usize) -> Result<(&mut TcpStream, ReadBuf), Error> { - let buf = self.buf.available_writable_area(need_min)?; - let buf = ReadBuf::new(buf); - Ok((&mut self.tcp, buf)) - } - - fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) { - (&mut self.tcp, self.outbuf.data()) - } - fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> { if let Some(item) = self.out.front() { match self.outbuf.available_writable_area(item.len()) { @@ -1014,7 +1037,9 @@ impl CaProto { fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let (w, b) = self.outbuf_conn(); + let this = self.as_mut().get_mut(); + let w = &mut this.tcp; + let b = this.outbuf.data(); let w = Pin::new(w); match w.poll_write(cx, b) { Ready(k) => match k { @@ -1046,66 +1071,52 @@ impl CaProto { let g = self.outbuf.len(); self.stats.outbuf_len().ingest(g as u32); } - 'l1: while self.out.len() != 0 { - while let Some((msg, buf)) = self.out_msg_buf() { - let msglen = msg.len(); - if msglen > buf.len() { - error!("got output buffer but too small"); - let e = Error::OutputBufferTooSmall; - return Err(e); - } else { - msg.place_into(&mut buf[..msglen]); - self.outbuf.wadv(msglen)?; - self.out.pop_front(); - self.stats.out_msg_placed().inc(); - } - } - while self.outbuf.len() != 0 { - match Self::attempt_output(self.as_mut(), cx)? { - Ready(n) => { - if n != 0 { - have_progress = true; - } else { - // Should not occur to begin with. TODO restructure. - break 'l1; - } - } - Pending => { - have_pending = true; - break 'l1; - } - } + while let Some((msg, buf)) = self.out_msg_buf() { + let msglen = msg.len(); + if msglen > buf.len() { + break; } + msg.place_into(&mut buf[..msglen]); + self.outbuf.wadv(msglen)?; + self.out.pop_front(); + self.stats.out_msg_placed().inc(); } - 'l1: while self.outbuf.len() != 0 { + while self.outbuf.len() != 0 { match Self::attempt_output(self.as_mut(), cx)? { Ready(n) => { - if n != 0 { - have_progress = true; - } else { - // Should not occur to begin with. TODO restructure. - break 'l1; + if n == 0 { + let e = Error::LogicError; + return Err(e); } + have_progress = true; } Pending => { have_pending = true; - break 'l1; + break; } } } let need_min = self.state.need_min(); - if self.buf.cap() < need_min { - self.state = CaState::Done; - let e = Error::BufferTooSmallForNeedMin(self.buf.cap(), self.state.need_min()); - return Err(e); + { + let cap = self.buf.cap(); + if cap < need_min { + let e = Error::BufferTooSmallForNeedMin(cap, need_min); + warn!("{e}"); + return Err(e); + } } - if self.buf.len() < need_min { - let (w, mut rbuf) = self.inpbuf_conn(need_min)?; + loop { + if self.tcp_eof { + break; + } + let this = self.as_mut().get_mut(); + let tcp = Pin::new(&mut this.tcp); + let buf = this.buf.available_writable_area(need_min)?; + let mut rbuf = ReadBuf::new(buf); if rbuf.remaining() == 0 { return Err(Error::NoReadBufferSpace); } - let w = Pin::new(w); - match w.poll_read(cx, &mut rbuf) { + break match tcp.poll_read(cx, &mut rbuf) { Ready(k) => match k { Ok(()) => { let nf = rbuf.filled().len(); @@ -1116,23 +1127,22 @@ impl CaProto { self.remote_addr_dbg, self.state ); - // TODO may need another state, if not yet done when input is EOF. - self.state = CaState::Done; - have_progress = true; + self.tcp_eof = true; } else { if false { - info!("received {} bytes", rbuf.filled().len()); + debug!("received {} bytes", rbuf.filled().len()); let t = rbuf.filled().len().min(32); - info!("received data {:?}", &rbuf.filled()[0..t]); + debug!("received data {:?}", &rbuf.filled()[0..t]); } match self.buf.wadv(nf) { Ok(()) => { have_progress = true; self.stats.tcp_recv_bytes().add(nf as _); self.stats.tcp_recv_count().inc(); + continue; } Err(e) => { - error!("netbuf wadv fail nf {nf}"); + error!("netbuf wadv fail nf {nf} {e}"); return Err(e.into()); } } @@ -1145,12 +1155,16 @@ impl CaProto { Pending => { have_pending = true; } - } + }; } while self.resqu.len() < self.resqu.capacity() { - if let Some(item) = self.parse_item(tsnow)? { + if self.buf.len() >= self.state.need_min() { + if let Some(item) = self.parse_item(tsnow)? { + self.resqu.push_back(item); + } else { + // Nothing to do + } have_progress = true; - self.resqu.push_back(item); } else { break; } @@ -1160,29 +1174,19 @@ impl CaProto { } else if have_pending { Ok(Pending) } else { - Err(Error::NeitherPendingNorProgress) + if self.tcp_eof { + self.state = CaState::Done; + Ok(Ready(())) + } else { + Err(Error::NeitherPendingNorProgress) + } } } fn parse_item(&mut self, tsnow: Instant) -> Result, Error> { - if self.buf.len() < self.state.need_min() { - return Ok(None); - } match &self.state { CaState::StdHead => { let hi = HeadInfo::from_netbuf(&mut self.buf)?; - if hi.cmdid == 1 || hi.cmdid == 15 { - if hi.payload_size == 0xffff { - if hi.data_count != 0 { - warn!("protocol error: {hi:?}"); - return Err(Error::ExtendedHeaderBadCount); - } - } - if hi.payload_size == 0xffff { - } else if hi.payload_size > 16368 { - self.stats.payload_std_too_large().inc(); - } - } if hi.cmdid > 26 { // TODO count as logic error self.stats.protocol_issue().inc(); @@ -1191,7 +1195,6 @@ impl CaProto { self.state = CaState::ExtHead(hi); Ok(None) } else { - // For extended messages, ingest on receive of extended header self.stats.payload_size().ingest(hi.payload_len() as u32); if hi.payload_size == 0 { self.state = CaState::StdHead; @@ -1207,7 +1210,7 @@ impl CaProto { let payload_size = self.buf.read_u32_be()?; let data_count = self.buf.read_u32_be()?; self.stats.payload_size().ingest(hi.payload_len() as u32); - if payload_size > 1024 * 1024 * 32 { + if payload_size > PAYLOAD_LEN_MAX { self.stats.payload_ext_very_large().inc(); if false { warn!( @@ -1216,19 +1219,22 @@ impl CaProto { ); } } - if payload_size <= 16368 { + if payload_size <= 0x3ff0 { + // NOTE can happen even with zero payload, just because data-count exceeds u16. self.stats.payload_ext_but_small().inc(); - warn!( - "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", - hi.data_type - ); + if false { + warn!( + "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", + hi.data_type + ); + } } let hi = hi.clone().with_ext(payload_size, data_count); self.state = CaState::Payload(hi); Ok(None) } CaState::Payload(hi) => { - let g = self.buf.read_bytes(hi.payload_len())?; + let g = self.buf.read_bytes(hi.payload_len() as usize)?; let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?; // data-count is only reasonable for event messages if let CaMsgTy::EventAddRes(..) = &msg.ty { @@ -1257,7 +1263,10 @@ impl Stream for CaProto { match k { Ok(Ready(())) => continue, Ok(Pending) => Pending, - Err(e) => Ready(Some(Err(e))), + Err(e) => { + self.state = CaState::Done; + Ready(Some(Err(e))) + } } }; } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 4536f0f..271d4f5 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -93,7 +93,7 @@ impl CaIngestOpts { } pub fn array_truncate(&self) -> u64 { - self.array_truncate.unwrap_or(1024 * 64) + self.array_truncate.unwrap_or(1024 * 200) } pub fn insert_item_queue_cap(&self) -> usize { diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 066fed8..44d7937 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -17,6 +17,7 @@ use err::Error; use log::*; use netpod::timeunits::MS; use netpod::timeunits::SEC; +use netpod::ttl::RetentionTime; use smallvec::smallvec; use smallvec::SmallVec; use stats::InsertWorkerStats; @@ -91,6 +92,7 @@ pub struct InsertWorkerOpts { } pub async fn spawn_scylla_insert_workers( + rett: RetentionTime, scyconf: ScyllaIngestConfig, insert_scylla_sessions: usize, insert_worker_count: usize, @@ -108,7 +110,11 @@ pub async fn spawn_scylla_insert_workers( let mut jhs = Vec::new(); let mut data_stores = Vec::new(); for _ in 0..insert_scylla_sessions { - let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?); + let data_store = Arc::new( + DataStore::new(&scyconf, rett.clone()) + .await + .map_err(|e| Error::from(e.to_string()))?, + ); data_stores.push(data_store); } for worker_ix in 0..insert_worker_count { @@ -387,41 +393,20 @@ fn prepare_query_insert_futs( let ts_msp = item.ts_msp; let do_insert = true; let mut futs = smallvec![]; - - // TODO - if true || item_ts_local & 0x3f00000 < 0x0600000 { - let fut = insert_item_fut(item, &data_store, do_insert, stats); - futs.push(fut); - if msp_bump { - stats.inserts_msp().inc(); - let fut = insert_msp_fut( - series, - ts_msp, - item_ts_local, - data_store.scy.clone(), - data_store.qu_insert_ts_msp.clone(), - stats.clone(), - ); - futs.push(fut); - } - } - - #[cfg(DISABLED)] - if let Some(ts_msp_grid) = item.ts_msp_grid { - let params = ( - (item.series.id() as i32) & 0xff, - ts_msp_grid as i32, - if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, - item.scalar_type.to_scylla_i32(), - item.series.id() as i64, + let fut = insert_item_fut(item, &data_store, do_insert, stats); + futs.push(fut); + if msp_bump { + stats.inserts_msp().inc(); + let fut = insert_msp_fut( + series, + ts_msp, + item_ts_local, + data_store.scy.clone(), + data_store.qu_insert_ts_msp.clone(), + stats.clone(), ); - data_store - .scy - .execute(&data_store.qu_insert_series_by_ts_msp, params) - .await?; - stats.inserts_msp_grid().inc(); + futs.push(fut); } - futs } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index bc01cb8..a90688f 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -632,10 +632,10 @@ pub fn insert_item_fut( match val { I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy), I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), - Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy), F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy), F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy), + Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy), Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy), } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 85b03a8..7b2ed1a 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -99,10 +99,12 @@ pub async fn create_table_ts_msp(table_name: &str, scy: &ScySession) -> Result<( Ok(()) } +#[allow(unused)] fn dhours(x: u64) -> Duration { Duration::from_secs(60 * 60 * x) } +#[allow(unused)] fn ddays(x: u64) -> Duration { Duration::from_secs(60 * 60 * 24 * x) } @@ -320,89 +322,6 @@ impl GenTwcsTab { } } -fn table_param_compaction(compaction_window_size: Duration) -> String { - table_param_compaction_twcs(compaction_window_size) -} - -#[allow(unused)] -fn table_param_compaction_stcs() -> String { - format!(concat!( - "{{ 'class': 'SizeTieredCompactionStrategy'", - // ", 'min_sstable_size': 200", - // ", 'max_threshold': 10", - " }}" - )) -} - -#[allow(unused)] -fn table_param_compaction_twcs(compaction_window_size: Duration) -> String { - format!( - concat!( - "{{ 'class': 'TimeWindowCompactionStrategy'", - ", 'compaction_window_unit': 'HOURS'", - ", 'compaction_window_size': {}", - " }}" - ), - compaction_window_size.as_secs() / 60 / 60 - ) -} - -struct EvTabDim0 { - pre: String, - sty: String, - cqlsty: String, - // SCYLLA_TTL_EVENTS_DIM0 - default_time_to_live: Duration, - // TWCS_WINDOW_0D - compaction_window_size: Duration, -} - -impl EvTabDim0 { - fn name(&self) -> String { - format!("{}events_scalar_{}", self.pre, self.sty) - } - - fn cql_create(&self) -> String { - use std::fmt::Write; - let ttl = self.default_time_to_live.as_secs(); - let compaction = table_param_compaction(self.compaction_window_size); - let mut s = String::new(); - write!(s, "create table {}", self.name()).unwrap(); - write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap(); - write!(s, " with default_time_to_live = {}", ttl).unwrap(); - write!(s, " and compaction = {}", compaction).unwrap(); - s - } -} - -struct EvTabDim1 { - pre: String, - sty: String, - cqlsty: String, - // SCYLLA_TTL_EVENTS_DIM1 - default_time_to_live: Duration, - // TWCS_WINDOW_1D - compaction_window_size: Duration, -} - -impl EvTabDim1 { - fn name(&self) -> String { - format!("{}events_array_{}", self.pre, self.sty) - } - - fn cql_create(&self) -> String { - use std::fmt::Write; - let mut s = String::new(); - let ttl = self.default_time_to_live.as_secs(); - let compaction = table_param_compaction(self.compaction_window_size); - write!(s, "create table {}", self.name()).unwrap(); - write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap(); - write!(s, " with default_time_to_live = {}", ttl).unwrap(); - write!(s, " and compaction = {}", compaction).unwrap(); - s - } -} - #[allow(unused)] async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result, Error> { let mut ret = Vec::new(); @@ -430,29 +349,39 @@ async fn check_event_tables(rett: RetentionTime, scy: &ScySession) -> Result<(), "text", ]; for (sty, cqlsty) in stys.into_iter().zip(cqlstys) { - let desc = EvTabDim0 { - pre: rett.table_prefix().into(), - sty: sty.into(), - cqlsty: cqlsty.into(), - // ttl is set in actual data inserts - default_time_to_live: dhours(1), - compaction_window_size: dhours(48), - }; - if !has_table(&desc.name(), scy).await? { - info!("scylla create table {}", desc.name()); - scy.query(desc.cql_create(), ()).await?; + { + let tab = GenTwcsTab::new( + rett.table_prefix(), + format!("events_scalar_{}", sty), + &[ + ("series", "bigint"), + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("pulse", "bigint"), + ("value", cqlsty), + ], + ["series", "ts_msp"], + ["ts_lsp"], + rett.ttl_events_d0(), + ); + tab.setup(scy).await?; } - let desc = EvTabDim1 { - pre: rett.table_prefix().into(), - sty: sty.into(), - cqlsty: format!("frozen>", cqlsty), - // ttl is set in actual data inserts - default_time_to_live: dhours(1), - compaction_window_size: dhours(12), - }; - if !has_table(&desc.name(), scy).await? { - info!("scylla create table {}", desc.name()); - scy.query(desc.cql_create(), ()).await?; + { + let tab = GenTwcsTab::new( + rett.table_prefix(), + format!("events_array_{}", sty), + &[ + ("series", "bigint"), + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("pulse", "bigint"), + ("value", &format!("frozen>", cqlsty)), + ], + ["series", "ts_msp"], + ["ts_lsp"], + rett.ttl_events_d1(), + ); + tab.setup(scy).await?; } } Ok(()) diff --git a/scywr/src/session.rs b/scywr/src/session.rs index cefb966..d6d7d34 100644 --- a/scywr/src/session.rs +++ b/scywr/src/session.rs @@ -33,6 +33,7 @@ pub async fn create_session_no_ks(scyconf: &ScyllaIngestConfig) -> Result, pub qu_insert_ts_msp: Arc, pub qu_insert_series_by_ts_msp: Arc, @@ -41,49 +43,54 @@ pub struct DataStore { pub qu_account_00: Arc, } +macro_rules! prep_qu_ins_a { + ($id1:expr, $rett:expr, $scy:expr) => {{ + let cql = format!( + concat!( + "insert into {}{} (series, ts_msp, ts_lsp, pulse, value)", + " values (?, ?, ?, ?, ?)" + ), + $rett.table_prefix(), + $id1 + ); + let q = $scy.prepare(cql).await?; + Arc::new(q) + }}; +} + impl DataStore { - pub async fn new(scyconf: &ScyllaIngestConfig) -> Result { + pub async fn new(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result { let scy = create_session(scyconf).await.map_err(|_| Error::NewSession)?; - let q = scy.prepare("insert into ts_msp (series, ts_msp) values (?, ?)").await?; + let q = scy + .prepare(format!( + concat!("insert into {}{} (series, ts_msp) values (?, ?)"), + rett.table_prefix(), + "ts_msp" + )) + .await?; let qu_insert_ts_msp = Arc::new(q); - let cql = "insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?)"; + let cql = format!( + concat!( + "insert into {}{}", + " (part, ts_msp, shape_kind, scalar_type, series)", + " values (?, ?, ?, ?, ?)" + ), + rett.table_prefix(), + "series_by_ts_msp" + ); let q = scy.prepare(cql).await?; let qu_insert_series_by_ts_msp = Arc::new(q); - // scalar: - let cql = "insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_i8 = Arc::new(q); - - let cql = "insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_i16 = Arc::new(q); - - let cql = "insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_i32 = Arc::new(q); - - let cql = "insert into events_scalar_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_i64 = Arc::new(q); - - let cql = "insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_f32 = Arc::new(q); - - let cql = "insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_f64 = Arc::new(q); - - let cql = "insert into events_scalar_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_bool = Arc::new(q); - - let cql = "insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; - let q = scy.prepare(cql).await?; - let qu_insert_scalar_string = Arc::new(q); + let qu_insert_scalar_i8 = prep_qu_ins_a!("events_scalar_i8", rett, scy); + let qu_insert_scalar_i16 = prep_qu_ins_a!("events_scalar_i16", rett, scy); + let qu_insert_scalar_i32 = prep_qu_ins_a!("events_scalar_i32", rett, scy); + let qu_insert_scalar_i64 = prep_qu_ins_a!("events_scalar_i64", rett, scy); + let qu_insert_scalar_f32 = prep_qu_ins_a!("events_scalar_f32", rett, scy); + let qu_insert_scalar_f64 = prep_qu_ins_a!("events_scalar_f64", rett, scy); + let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy); + let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); // array let cql = "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; @@ -144,6 +151,7 @@ impl DataStore { let qu_account_00 = Arc::new(q); let ret = Self { + rett, scy, qu_insert_ts_msp, qu_insert_series_by_ts_msp,