Improve writes to different retention times

This commit is contained in:
Dominik Werder
2024-05-09 13:33:09 +02:00
parent 4b2d648559
commit 0477504628
21 changed files with 968 additions and 393 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.0-aa.3"
version = "0.2.1-aa.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -89,7 +89,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {}", clap::crate_version!());
info!("daqingest version {} +0001", clap::crate_version!());
let (conf, channels_config) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels_config).await?
}

View File

@@ -44,7 +44,6 @@ const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000);
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500);
const RUN_WITHOUT_SCYLLA: bool = true;
pub struct DaemonOpts {
pgconf: Database,
@@ -104,9 +103,11 @@ impl Daemon {
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
let wrest_stats = Arc::new(SeriesWriterEstablishStats::new());
let (writer_establis_tx,) =
serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone(), wrest_stats.clone())
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (writer_establis_tx,) = serieswriter::establish_worker::start_writer_establish_worker(
channel_info_query_tx.clone(),
wrest_stats.clone(),
)
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let local_epics_hostname = ingest_linux::net::local_hostname();
@@ -199,26 +200,71 @@ impl Daemon {
let mut insert_worker_jhs = Vec::new();
if RUN_WITHOUT_SCYLLA {
if ingest_opts.scylla_disable() {
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf3_rx,
insert_worker_opts,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.mt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.lt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await?;
insert_worker_jhs.extend(jh);
} else {
let jh = scywr::insertworker::spawn_scylla_insert_workers(
// TODO does the worker actually need RETT? Yes, to use the correct table names.
RetentionTime::Short,
opts.scyconf_st.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count(),
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf3_rx.clone(),
insert_worker_opts,
iqrx.st_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
)
.await?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers(
RetentionTime::Medium,
opts.scyconf_mt.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count().min(2),
ingest_opts.insert_worker_concurrency().min(8),
iqrx.mt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
)
.await?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers(
RetentionTime::Long,
opts.scyconf_lt.clone(),
ingest_opts.insert_scylla_sessions(),
ingest_opts.insert_worker_count().min(2),
ingest_opts.insert_worker_concurrency().min(8),
iqrx.lt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
)
@@ -629,15 +675,15 @@ impl Daemon {
}
}
}
info!("wait for metrics handler");
debug!("wait for metrics handler");
self.metrics_shutdown_tx.send(1).await?;
if let Some(jh) = self.metrics_jh.take() {
jh.await??;
}
info!("joined metrics handler");
info!("\n\n\n-----------------------\n\n\nwait for postingest task");
debug!("joined metrics handler");
debug!("wait for postingest task");
worker_jh.await?.map_err(|e| Error::from_string(e))?;
info!("joined postingest task");
debug!("joined postingest task");
Ok(())
}
}
@@ -669,10 +715,13 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
.await
.map_err(Error::from_string)?;
dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?;
drop(pg);
jh.await?.map_err(Error::from_string)?;
}
if RUN_WITHOUT_SCYLLA {
if opts.scylla_disable() {
warn!("scylla_disable config flag enabled");
} else {
info!("start scylla schema check");
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short)
.await
.map_err(Error::from_string)?;
@@ -682,6 +731,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long)
.await
.map_err(Error::from_string)?;
info!("stop scylla schema check");
}
info!("database check done");

View File

@@ -7,10 +7,10 @@ use log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::ScalarValue;
use serieswriter::writer::SeriesWriter;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::Ipv4Addr;
use std::time::SystemTime;
@@ -36,7 +36,7 @@ pub async fn listen_beacons(
sock.set_broadcast(true).unwrap();
let mut buf = Vec::new();
buf.resize(1024 * 4, 0);
let mut iqdqs = InsertDeques::new();
let mut deque = VecDeque::new();
loop {
let bb = &mut buf;
let (n, remote) = taskrun::tokio::select! {
@@ -65,12 +65,12 @@ pub async fn listen_beacons(
let ts_local = ts;
let blob = addr_u32 as i64;
let val = DataValue::Scalar(ScalarValue::I64(blob));
writer.write(ts, ts_local, val, &mut iqdqs)?;
writer.write(ts, ts_local, val, &mut deque)?;
}
}
if iqdqs.len() != 0 {
if deque.len() != 0 {
// TODO deliver to insert queue
iqdqs.clear();
deque.clear();
}
}
Ok(())

View File

@@ -46,9 +46,9 @@ use scywriiq::ConnectionStatusItem;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::writer::EstablishWorkerJob;
use serieswriter::writer::JobId;
use serieswriter::writer::SeriesWriter;
use serieswriter::establish_worker::EstablishWorkerJob;
use serieswriter::establish_worker::JobId;
use serieswriter::rtwriter::RtWriter;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
@@ -72,11 +72,12 @@ use taskrun::tokio;
use tokio::net::TcpStream;
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000);
const IOC_PING_IVL: Duration = Duration::from_millis(80000);
const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 80);
const DO_RATE_CHECK: bool = false;
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(3000);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(3000);
const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(3000);
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(6000);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(8000);
const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(1000 * 68);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(10000);
#[allow(unused)]
macro_rules! trace2 {
@@ -90,7 +91,7 @@ macro_rules! trace2 {
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -120,7 +121,7 @@ pub enum Error {
ProtocolError,
IocIssue,
Protocol(#[from] crate::ca::proto::Error),
Writer(#[from] serieswriter::writer::Error),
RtWriter(#[from] serieswriter::rtwriter::Error),
// TODO remove false positive from ThisError derive
#[allow(private_interfaces)]
UnknownCid(Cid),
@@ -137,6 +138,7 @@ pub enum Error {
Error,
DurationOutOfBounds,
NoFreeCid,
InsertQueues(#[from] scywr::insertqueues::Error),
}
impl err::ToErr for Error {
@@ -324,7 +326,7 @@ enum PollTickState {
struct WritableState {
tsbeg: Instant,
channel: CreatedState,
writer: SeriesWriter,
writer: RtWriter,
reading: ReadingState,
}
@@ -344,7 +346,10 @@ struct CreatedState {
ca_dbr_type: u16,
ca_dbr_count: u32,
ts_created: Instant,
// Updated when we receive something via monitoring or polling
ts_alive_last: Instant,
// Updated on monitoring, polling or when the channel config changes to reset the timeout
ts_activity_last: Instant,
ts_msp_last: u64,
ts_msp_grid_last: u32,
inserted_in_ts_msp: u64,
@@ -374,6 +379,7 @@ impl CreatedState {
ca_dbr_count: 0,
ts_created: tsnow,
ts_alive_last: tsnow,
ts_activity_last: tsnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: 0,
@@ -417,6 +423,12 @@ struct ChannelConf {
state: ChannelState,
}
impl ChannelConf {
    /// Poll interval in milliseconds for this channel, if it is configured
    /// for polling. Thin delegation to [`ChannelConfig::poll_conf`].
    pub fn poll_conf(&self) -> Option<(u64,)> {
        self.conf.poll_conf()
    }
}
impl ChannelState {
fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo {
let channel_connected_info = match self {
@@ -701,6 +713,18 @@ impl CaConnEvent {
value,
}
}
/// Cheap, short descriptor of this event, used for low-noise trace logging
/// (avoids the full `Debug` dump of the event).
pub fn desc_short(&self) -> CaConnEventDescShort {
    CaConnEventDescShort {}
}
}

/// One-line description of a `CaConnEvent` for trace logging.
/// Currently carries no data — see the TODO in the `Display` impl.
pub struct CaConnEventDescShort {}

impl fmt::Display for CaConnEventDescShort {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // NOTE(review): still a stub; should summarize the actual event kind.
        write!(fmt, "CaConnEventDescShort {{ TODO-impl }}")
    }
}
#[derive(Debug)]
@@ -781,8 +805,8 @@ pub struct CaConn {
rng: Xoshiro128PlusPlus,
writer_establish_qu: VecDeque<EstablishWorkerJob>,
writer_establish_tx: Pin<Box<SenderPolling<EstablishWorkerJob>>>,
writer_tx: Sender<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>,
writer_rx: Pin<Box<Receiver<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>>>,
writer_tx: Sender<(JobId, Result<RtWriter, serieswriter::rtwriter::Error>)>,
writer_rx: Pin<Box<Receiver<(JobId, Result<RtWriter, serieswriter::rtwriter::Error>)>>>,
tmp_ts_poll: SystemTime,
poll_tsnow: Instant,
ioid: u32,
@@ -909,9 +933,10 @@ impl CaConn {
};
self.channel_state_on_shutdown(channel_reason);
let addr = self.remote_addr_dbg.clone();
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
// TODO handle Err:
let _ = self
.iqdqs
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
// TODO map to appropriate status
@@ -990,7 +1015,7 @@ impl CaConn {
}
}
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: SeriesWriter) -> Result<(), Error> {
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> {
trace!("handle_writer_establish_inner {cid:?}");
// At this point we have created the channel and created a writer for that type and sid.
// We do not yet monitor.
@@ -999,6 +1024,8 @@ impl CaConn {
// Create a monitor for the channel.
// NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled!
if let Some(conf) = self.channels.get_mut(&cid) {
// TODO refactor, should only execute this when required:
let conf_poll_conf = conf.poll_conf();
let chst = &mut conf.state;
if let ChannelState::MakingSeriesWriter(st2) = chst {
self.stats.get_series_id_ok.inc();
@@ -1008,21 +1035,21 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
});
self.iqdqs.lt_rf3_rx.push_back(item);
self.iqdqs.emit_status_item(item);
}
let name = conf.conf.name();
if name.starts_with("TEST:PEAKING:") {
if let Some((ivl,)) = conf_poll_conf {
let created_state = WritableState {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
// channel: st2.channel.clone(),
writer,
reading: ReadingState::Polling(PollingState {
tsbeg: self.poll_tsnow,
poll_ivl: Duration::from_millis(1000),
poll_ivl: Duration::from_millis(ivl),
tick: PollTickState::Idle(self.poll_tsnow),
}),
};
*chst = ChannelState::Writable(created_state);
conf.state = ChannelState::Writable(created_state);
Ok(())
} else {
let subid = {
@@ -1058,7 +1085,7 @@ impl CaConn {
subid,
}),
};
*chst = ChannelState::Writable(created_state);
conf.state = ChannelState::Writable(created_state);
Ok(())
}
} else {
@@ -1175,7 +1202,7 @@ impl CaConn {
cssid: cssid.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.iqdqs.lt_rf3_rx.push_back(item);
self.iqdqs.emit_status_item(item);
*chst = ChannelState::Ended(cssid);
}
ChannelState::Error(..) => {
@@ -1307,8 +1334,8 @@ impl CaConn {
ReadingState::Monitoring(x) => {
match x.mon2state {
// actually, no differing behavior needed so far.
Monitoring2State::Passive(_) => (),
Monitoring2State::ReadPending(ioid, since) => (),
Monitoring2State::Passive(_) => {}
Monitoring2State::ReadPending(ioid, since) => {}
}
Some(x.subid.clone())
}
@@ -1350,9 +1377,10 @@ impl CaConn {
Monitoring2State::Passive(st3) => {
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(ioid, since) => {
warn!("TODO we are waiting for a explicit caget, but received a monitor event");
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
Monitoring2State::ReadPending(_ioid, _since) => {
// Received EventAdd while still waiting for answer to explicit ReadNotify.
// This is fine.
self.stats.recv_event_add_while_wait_on_read_notify.inc();
}
}
let crst = &mut st.channel;
@@ -1377,9 +1405,11 @@ impl CaConn {
}
}
}
_ => {
// TODO count instead of print
error!("unexpected state: EventAddRes while having {ch_s:?}");
ChannelState::Creating(_) | ChannelState::Init(_) | ChannelState::MakingSeriesWriter(_) => {
self.stats.recv_read_notify_but_not_init_yet.inc();
}
ChannelState::Closing(_) | ChannelState::Ended(_) | ChannelState::Error(_) => {
self.stats.recv_read_notify_but_no_longer_ready.inc();
}
}
Ok(())
@@ -1479,7 +1509,7 @@ impl CaConn {
match &mut st.reading {
ReadingState::Polling(st2) => match &mut st2.tick {
PollTickState::Idle(_st3) => {
warn!("received ReadNotifyRes while in Wait state");
self.stats.recv_read_notify_while_polling_idle.inc();
}
PollTickState::Wait(st3, ioid) => {
let dt = tsnow.saturating_duration_since(*st3);
@@ -1492,21 +1522,26 @@ impl CaConn {
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::EnableMonitoring(..) => {
error!("TODO handle_read_notify_res handle EnableMonitoring");
ReadingState::EnableMonitoring(_) => {
self.stats.recv_read_notify_while_enabling_monitoring.inc();
}
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
self.read_ioids.remove(&ioid);
if self.read_ioids.remove(&ioid).is_some() {
self.stats.recv_read_notify_state_passive_found_ioid.inc();
} else {
self.stats.recv_read_notify_state_passive.inc();
}
st3.tsbeg = tsnow;
error!("ReadNotifyRes even though we do not expect one");
}
Monitoring2State::ReadPending(ioid2, _since) => {
trace!("\nhandle_read_notify_res received ReadNotify in Monitoring2State::ReadPending\n\n");
// We don't check again for `since` here. That's done in timeout checking.
// So we could be here a little beyond timeout but we don't care about that.
if ioid != *ioid2 {
warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
// warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
self.stats.recv_read_notify_state_read_pending_bad_ioid.inc();
} else {
self.stats.recv_read_notify_state_read_pending.inc();
}
self.read_ioids.remove(&ioid);
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
@@ -1550,7 +1585,7 @@ impl CaConn {
payload_len: u32,
value: CaEventValue,
crst: &mut CreatedState,
writer: &mut SeriesWriter,
writer: &mut RtWriter,
iqdqs: &mut InsertDeques,
tsnow: Instant,
stnow: SystemTime,
@@ -1558,6 +1593,7 @@ impl CaConn {
) -> Result<(), Error> {
// debug!("event_add_ingest payload_len {} value {:?}", payload_len, value);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
crst.recv_bytes += payload_len as u64;
@@ -1578,7 +1614,7 @@ impl CaConn {
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
}
Self::check_ev_value_data(&value.data, writer.scalar_type())?;
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
{
let val: DataValue = value.data.into();
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
@@ -1849,12 +1885,12 @@ impl CaConn {
Ok(())
}
fn check_channels_alive(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> {
trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
fn check_channels_alive(&mut self, tsnow: Instant, _cx: &mut Context) -> Result<(), Error> {
trace3!("check_channels_alive {}", self.remote_addr_dbg);
if let Some(started) = self.ioc_ping_start {
if started + Duration::from_millis(4000) < tsnow {
if started + TIMEOUT_PONG_WAIT < tsnow {
self.stats.pong_timeout().inc();
warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
warn!("pong timeout {}", self.remote_addr_dbg);
self.ioc_ping_start = None;
let item = CaConnEvent {
ts: tsnow,
@@ -1867,7 +1903,6 @@ impl CaConn {
if self.ioc_ping_next < tsnow {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
info!("start ping");
self.ioc_ping_start = Some(tsnow);
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
@@ -1889,8 +1924,8 @@ impl CaConn {
// TODO handle timeout check
}
ReadingState::Monitoring(st3) => match &st3.mon2state {
Monitoring2State::Passive(st4) => {}
Monitoring2State::ReadPending(_, tsbeg) => {
Monitoring2State::Passive(_st4) => {}
Monitoring2State::ReadPending(_, _) => {
// This is handled in check_channels_state_poll
// TODO should unify.
}
@@ -1898,14 +1933,14 @@ impl CaConn {
ReadingState::StopMonitoringForPolling(_) => {
// TODO handle timeout check
}
ReadingState::Polling(st3) => {
ReadingState::Polling(_st3) => {
// This is handled in check_channels_state_poll
// TODO should unify.
}
}
if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
warn!("TODO assume channel not alive because nothing received, but should do CAGET");
if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow {
not_alive_count += 1;
self.stats.channel_not_alive_no_activity.inc();
} else {
alive_count += 1;
}
@@ -2082,6 +2117,7 @@ impl CaConn {
ca_dbr_count: k.data_count,
ts_created: tsnow,
ts_alive_last: tsnow,
ts_activity_last: tsnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
@@ -2104,8 +2140,10 @@ impl CaConn {
JobId(cid.0 as _),
self.backend.clone(),
conf.conf.name().into(),
cssid,
scalar_type,
shape,
conf.conf.min_quiets(),
self.writer_tx.clone(),
self.tmp_ts_poll,
);
@@ -2149,12 +2187,11 @@ impl CaConn {
self.stats.tcp_connected.inc();
let addr = addr.clone();
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::Established,
}));
}))?;
self.backoff_reset();
let proto = CaProto::new(
tcp,
@@ -2167,29 +2204,27 @@ impl CaConn {
Ok(Ready(Some(())))
}
Ok(Err(e)) => {
debug!("error connect to {addr} {e}");
info!("error connect to {addr} {e}");
let addr = addr.clone();
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
}));
}))?;
self.trigger_shutdown(ShutdownReason::IoError);
Ok(Ready(Some(())))
}
Err(e) => {
// TODO log with exponential backoff
debug!("timeout connect to {addr} {e}");
info!("timeout connect to {addr} {e}");
let addr = addr.clone();
self.iqdqs
.lt_rf3_rx
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
}));
}))?;
self.trigger_shutdown(ShutdownReason::IocTimeout);
Ok(Ready(Some(())))
}
@@ -2372,7 +2407,7 @@ impl CaConn {
count,
bytes,
});
self.iqdqs.lt_rf3_rx.push_back(item);
self.iqdqs.emit_status_item(item)?;
}
}
}
@@ -2454,7 +2489,10 @@ impl CaConn {
use scywr::senderpolling::Error as SpErr;
match e {
SpErr::NoSendInProgress => return Err(Error::NotSending),
SpErr::Closed(_) => return Err(Error::ClosedSending),
SpErr::Closed(_) => {
error!("{self_name} queue closed id {:10}", id);
return Err(Error::ClosedSending);
}
}
}
Pending => {
@@ -2471,6 +2509,11 @@ impl CaConn {
Ok(Ready(None))
}
}
/// Log one-line summaries of the insert deques and the sender-polling state.
/// Called from the shutdown path when queues are still pending, to help
/// diagnose which queue is blocking the flush.
fn log_queues_summary(&self) {
    self.iqdqs.log_summary();
    self.iqsp.log_summary();
}
}
// $have is tuple (have_progress, have_pending))
@@ -2592,6 +2635,7 @@ impl Stream for CaConn {
cx,
stats_fn
);
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
stats2.iiq_batch_len().ingest(item.len() as u32);
@@ -2607,6 +2651,7 @@ impl Stream for CaConn {
cx,
stats_fn
);
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
stats2.iiq_batch_len().ingest(item.len() as u32);
@@ -2622,6 +2667,22 @@ impl Stream for CaConn {
cx,
stats_fn
);
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue_dqs!(
self,
lt_rf3_rx,
lt_rf3_sp_pin,
send_batched::<256, _>,
32,
(&mut have_progress, &mut have_pending),
"lt_rf3_rx",
cx,
stats_fn
);
}
let lts3 = Instant::now();
@@ -2728,6 +2789,7 @@ impl Stream for CaConn {
continue;
} else if have_pending {
debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg);
self.log_queues_summary();
self.stats.poll_pending().inc();
Pending
} else {

View File

@@ -40,7 +40,6 @@ use scywr::iteminsertqueue::QueryItem;
use scywr::senderpolling::SenderPolling;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use serieswriter::writer::EstablishWorkerJob;
use statemap::ActiveChannelState;
use statemap::CaConnStateValue;
use statemap::ChannelState;
@@ -64,6 +63,7 @@ use std::pin::Pin;
use netpod::OnDrop;
use scywr::insertqueues::InsertQueuesTx;
use serieswriter::establish_worker::EstablishWorkerJob;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
@@ -1102,7 +1102,7 @@ impl CaConnSet {
) -> Result<EndOfStreamReason, Error> {
let mut eos_reason = None;
while let Some(item) = conn.next().await {
trace!("ca_conn_item_merge_inner item {item:?}");
trace!("ca_conn_item_merge_inner item {}", item.desc_short());
if let Some(x) = eos_reason {
let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
error!("{e}");

View File

@@ -603,17 +603,17 @@ impl Stream for FindIocStream {
have_progress = true;
}
Ready(Err(e)) => {
error!("{e:?}");
error!("{e}");
}
Pending => {
g.clear_ready();
warn!("socket seemed ready for write, but is not");
// warn!("socket seemed ready for write, but is not");
have_progress = true;
}
},
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("{e:?}"));
error!("poll_write_ready {e:?}");
error!("poll_write_ready {e}");
let e = Error::from_string(e);
}
Pending => {}
}

View File

@@ -788,15 +788,7 @@ impl CaMsg {
};
CaMsg::from_ty_ts(CaMsgTy::Error(e), tsnow)
}
20 => {
let name = std::ffi::CString::new(payload)
.map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}")))
.unwrap_or_else(|e| format!("{e:?}"));
CaMsg::from_ty_ts(CaMsgTy::ClientNameRes(ClientNameRes { name }), tsnow)
}
// TODO make response type for host name:
21 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow),
6 => {
0x06 => {
if hi.payload_len() != 8 {
warn!("protocol error: search result is expected with fixed payload size 8");
}
@@ -815,29 +807,6 @@ impl CaMsg {
});
CaMsg::from_ty_ts(ty, tsnow)
}
18 => {
let ty = CaMsgTy::CreateChanRes(CreateChanRes {
data_type: hi.data_type,
// TODO what am I supposed to use here in case of extended header?
data_count: hi.data_count() as _,
cid: hi.param1,
sid: hi.param2,
});
CaMsg::from_ty_ts(ty, tsnow)
}
22 => {
// TODO use different structs for request and response:
let ty = CaMsgTy::AccessRightsRes(AccessRightsRes {
cid: hi.param1,
rights: hi.param2,
});
CaMsg::from_ty_ts(ty, tsnow)
}
26 => {
// TODO use different structs for request and response:
let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 });
CaMsg::from_ty_ts(ty, tsnow)
}
0x01 => {
if payload.len() < 12 {
if payload.len() == 0 {
@@ -897,7 +866,40 @@ impl CaMsg {
});
CaMsg::from_ty_ts(ty, tsnow)
}
0x11 => CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow),
0x12 => {
let ty = CaMsgTy::CreateChanRes(CreateChanRes {
data_type: hi.data_type,
// TODO what am I supposed to use here in case of extended header?
data_count: hi.data_count() as _,
cid: hi.param1,
sid: hi.param2,
});
CaMsg::from_ty_ts(ty, tsnow)
}
0x16 => {
let ty = CaMsgTy::AccessRightsRes(AccessRightsRes {
cid: hi.param1,
rights: hi.param2,
});
CaMsg::from_ty_ts(ty, tsnow)
}
0x17 => {
let ty = CaMsgTy::Echo;
CaMsg::from_ty_ts(ty, tsnow)
}
0x1a => {
// TODO use different structs for request and response:
let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 });
CaMsg::from_ty_ts(ty, tsnow)
}
0x14 => {
let name = std::ffi::CString::new(payload)
.map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}")))
.unwrap_or_else(|e| format!("{e:?}"));
CaMsg::from_ty_ts(CaMsgTy::ClientNameRes(ClientNameRes { name }), tsnow)
}
// TODO make response type for host name:
0x15 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow),
x => return Err(Error::CaCommandNotSupported(x)),
};
Ok(msg)

View File

@@ -5,6 +5,7 @@ use regex::Regex;
use scywr::config::ScyllaIngestConfig;
use serde::Deserialize;
use serde::Serialize;
use serieswriter::rtwriter::MinQuiets;
use std::collections::BTreeMap;
use std::path::Path;
use std::path::PathBuf;
@@ -39,6 +40,8 @@ pub struct CaIngestOpts {
insert_frac: Option<u64>,
use_rate_limit_queue: Option<bool>,
pub test_bsread_addr: Option<String>,
#[serde(default)]
scylla_disable: bool,
}
impl CaIngestOpts {
@@ -109,12 +112,16 @@ impl CaIngestOpts {
pub fn use_rate_limit_queue(&self) -> bool {
self.use_rate_limit_queue.unwrap_or(false)
}
/// Whether scylla is disabled via the `scylla_disable` config flag
/// (serde-defaulted to `false`). When set, dummy insert workers are spawned
/// and the scylla schema migration is skipped.
pub fn scylla_disable(&self) -> bool {
    self.scylla_disable
}
}
#[test]
fn parse_config_minimal() {
let conf = r###"
backend: scylla
backend: test_backend
timeout: 10m 3s 45ms
api_bind: "0.0.0.0:3011"
channels: /some/path/file.txt
@@ -127,7 +134,7 @@ postgresql:
user: USER
pass: PASS
name: NAME
scylla:
scylla_st:
hosts:
- sf-nube-11:19042
- sf-nube-12:19042
@@ -525,4 +532,75 @@ impl ChannelConfig {
pub fn name(&self) -> &str {
&self.name
}
/// True when the archiver config flags this channel as polled.
pub fn is_polled(&self) -> bool {
    self.arch.is_polled
}
/// Poll interval in milliseconds for a polled channel, `None` for a
/// non-polled one.
///
/// The first retention tier (short, then medium, then long) that carries an
/// explicit `Poll` interval wins; a polled channel with no explicit interval
/// anywhere falls back to 60.
pub fn poll_conf(&self) -> Option<(u64,)> {
    if !self.is_polled() {
        return None;
    }
    // Tier precedence: short term first, long term last.
    let tiers = [
        &self.arch.short_term,
        &self.arch.medium_term,
        &self.arch.long_term,
    ];
    let ivl_ms = tiers
        .into_iter()
        .find_map(|tier| match tier {
            Some(ChannelReadConfig::Poll(x)) => Some(x.as_millis() as u64),
            _ => None,
        })
        .unwrap_or(60);
    Some((ivl_ms,))
}
/// Only used when in monitoring mode. If we do not see activity for this Duration then
/// we issue a manual read to see if the channel is alive.
pub fn manual_poll_on_quiet(&self) -> Duration {
    // Fixed for now; could become a per-channel config value.
    Duration::from_secs(120)
}
/// Upper bound on how long this channel may stay silent before the liveness
/// check counts it as not alive (compared against `ts_activity_last`).
pub fn expect_activity_within(&self) -> Duration {
    let dur = if self.is_polled() {
        // It would be anyway invalid to be polled and specify a monitor record policy.
        // Tier precedence: short term, then medium term, then long term.
        match self.arch.short_term {
            Some(ChannelReadConfig::Poll(x)) => x,
            Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(),
            None => match self.arch.medium_term {
                Some(ChannelReadConfig::Poll(x)) => x,
                Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(),
                None => match self.arch.long_term {
                    Some(ChannelReadConfig::Poll(x)) => x,
                    Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet(),
                    None => {
                        // This is an invalid configuration, so just a fallback
                        self.manual_poll_on_quiet()
                    }
                },
            },
        }
    } else {
        self.manual_poll_on_quiet()
    };
    // Fixed 10 s grace period on top of the configured interval.
    dur + Duration::from_millis(1000 * 10)
}
pub fn min_quiets(&self) -> MinQuiets {
MinQuiets {
st: match self.arch.short_term {
Some(ChannelReadConfig::Monitor) => Duration::ZERO,
Some(ChannelReadConfig::Poll(x)) => x,
None => Duration::MAX,
},
mt: match self.arch.medium_term {
Some(ChannelReadConfig::Monitor) => Duration::ZERO,
Some(ChannelReadConfig::Poll(x)) => x,
None => Duration::MAX,
},
lt: match self.arch.long_term {
Some(ChannelReadConfig::Monitor) => Duration::ZERO,
Some(ChannelReadConfig::Poll(x)) => x,
None => Duration::MAX,
},
}
}
}

View File

@@ -55,10 +55,11 @@ pub async fn process_api_query_items(
#[allow(irrefutable_let_patterns)]
while let item = taskrun::tokio::time::timeout(Duration::from_millis(500), item_rx.recv()).await {
let deque = &mut iqdqs.st_rf3_rx;
let tsnow = Instant::now();
if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) {
sw_tick_last = tsnow;
tick_writers(mucache.all_ref_mut(), &mut iqdqs)?;
tick_writers(mucache.all_ref_mut(), deque)?;
}
let item = match item {
Ok(Ok(item)) => item,
@@ -83,23 +84,25 @@ pub async fn process_api_query_items(
stnow,
)
.await?;
sw.write(item.ts, item.ts, item.val, &mut iqdqs)?;
sw.write(item.ts, item.ts, item.val, deque)?;
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
}
finish_writers(mucache.all_ref_mut(), &mut iqdqs)?;
let deque = &mut iqdqs.st_rf3_rx;
finish_writers(mucache.all_ref_mut(), deque)?;
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
Ok(())
}
fn tick_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
/// Give every cached writer a periodic tick, collecting any items it emits
/// into `deque`. Stops at the first writer error.
fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
    for sw in sws {
        sw.tick(deque)?;
    }
    Ok(())
}
fn finish_writers(sws: Vec<&mut SeriesWriter>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(iqdqs)?;
sw.tick(deque)?;
}
Ok(())
}

View File

@@ -2,10 +2,18 @@ use crate::iteminsertqueue::QueryItem;
use crate::senderpolling::SenderPolling;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::pin::Pin;
/// Errors from the insert-queue helpers.
// NOTE(review): `ThisError` is re-exported from the project `err` crate; the
// variant carries no `#[error(...)]` message — confirm the derive provides a
// `Display` impl without one.
#[derive(Debug, ThisError)]
pub enum Error {
    // Failed to push an item onto one of the insert deques
    // (reserved; `emit_status_item` is currently infallible).
    QueuePush,
}
#[derive(Clone)]
pub struct InsertQueuesTx {
pub st_rf1_tx: Sender<VecDeque<QueryItem>>,
@@ -71,6 +79,32 @@ impl InsertDeques {
self.mt_rf3_rx.clear();
self.lt_rf3_rx.clear();
}
/// Log the current fill level of all four insert deques in one line,
/// via the `Debug` impl of `InsertDequesSummary`.
pub fn log_summary(&self) {
    let summ = InsertDequesSummary {
        st_rf1_len: self.st_rf1_rx.len(),
        st_rf3_len: self.st_rf3_rx.len(),
        mt_rf3_len: self.mt_rf3_rx.len(),
        lt_rf3_len: self.lt_rf3_rx.len(),
    };
    info!("{summ:?}");
}
// Should be used only for connection and channel status items.
// It encapsulates the decision to which queue(s) we want to send these kind of items.
pub fn emit_status_item(&mut self, item: QueryItem) -> Result<(), Error> {
    // Status items currently go only to the long-term rf3 deque.
    // Infallible today; the Result leaves room for bounded-queue behavior
    // (see `Error::QueuePush`).
    self.lt_rf3_rx.push_back(item);
    Ok(())
}
}
/// Debug-only snapshot of the four insert deque lengths, built and logged by
/// `InsertDeques::log_summary`.
#[derive(Debug)]
#[allow(unused)]
struct InsertDequesSummary {
    st_rf1_len: usize,
    st_rf3_len: usize,
    mt_rf3_len: usize,
    lt_rf3_len: usize,
}
#[pin_project]
@@ -81,6 +115,8 @@ pub struct InsertSenderPolling {
pub st_rf3_sp: SenderPolling<VecDeque<QueryItem>>,
#[pin]
pub mt_rf3_sp: SenderPolling<VecDeque<QueryItem>>,
#[pin]
pub lt_rf3_sp: SenderPolling<VecDeque<QueryItem>>,
}
impl InsertSenderPolling {
@@ -89,19 +125,15 @@ impl InsertSenderPolling {
st_rf1_sp: SenderPolling::new(iqtx.st_rf1_tx),
st_rf3_sp: SenderPolling::new(iqtx.st_rf3_tx),
mt_rf3_sp: SenderPolling::new(iqtx.mt_rf3_tx),
lt_rf3_sp: SenderPolling::new(iqtx.lt_rf3_tx),
}
}
pub fn is_idle(&self) -> bool {
self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle()
self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle() && self.lt_rf3_sp.is_idle()
}
/// Pinned projection to the short-term rf1 sender.
/// Uses the projection generated by `pin_project`, which is the safe
/// replacement for the manual unsafe re-pinning this method used to do
/// (the dead commented-out variant has been removed).
pub fn st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
    self.project().st_rf1_sp
}
@@ -112,4 +144,34 @@ impl InsertSenderPolling {
/// Pinned projection to the mid-term rf3 sender.
pub fn mt_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
self.project().mt_rf3_sp
}
/// Pinned projection to the long-term rf3 sender.
pub fn lt_rf3_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
self.project().lt_rf3_sp
}
/// Intentionally disabled manual-unsafe variant of `st_rf1_sp_pin`,
/// kept only for reference: it panics unconditionally when called,
/// so the unsafe re-pin below is unreachable. Prefer `st_rf1_sp_pin`.
pub fn __st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling<VecDeque<QueryItem>>> {
if true {
panic!("encapsulated by pin_project");
}
// Unreachable: superseded by the pin_project-generated projection.
unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) }
}
/// Log the idle state of every retention-time sender at info level.
pub fn log_summary(&self) {
    let st_rf1_idle = self.st_rf1_sp.is_idle();
    let st_rf3_idle = self.st_rf3_sp.is_idle();
    let mt_rf3_idle = self.mt_rf3_sp.is_idle();
    let lt_rf3_idle = self.lt_rf3_sp.is_idle();
    let summ = InsertSenderPollingSummary {
        st_rf1_idle,
        st_rf3_idle,
        mt_rf3_idle,
        lt_rf3_idle,
    };
    info!("{summ:?}");
}
}
/// Snapshot of the per-retention-time sender idle flags; used only for debug logging.
#[derive(Debug)]
#[allow(unused)]
struct InsertSenderPollingSummary {
/// Idle state of the short-term rf1 sender.
st_rf1_idle: bool,
/// Idle state of the short-term rf3 sender.
st_rf3_idle: bool,
/// Idle state of the mid-term rf3 sender.
mt_rf3_idle: bool,
/// Idle state of the long-term rf3 sender.
lt_rf3_idle: bool,
}

View File

@@ -50,6 +50,15 @@ macro_rules! trace3 {
};
}
#[allow(unused)]
// Trace one executed insert-queue item. Currently hard-enabled via the
// `if true` gate and forwarded at debug level; flip the gate to `false`
// to silence all call sites without removing them.
macro_rules! trace_item_execute {
($($arg:tt)*) => {
if true {
debug!($($arg)*);
}
};
}
fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::Error) {
use crate::iteminsertqueue::Error;
match err {
@@ -266,7 +275,10 @@ async fn worker_streamed(
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
let stream = item_inp;
let stream = inspect_items(stream);
let worker_name = data_store
.as_ref()
.map_or_else(|| format!("dummy"), |x| x.rett.debug_tag().to_string());
let stream = inspect_items(stream, worker_name.clone());
if let Some(data_store) = data_store {
let stream = transform_to_db_futures(stream, data_store, stats.clone());
let stream = stream
@@ -351,7 +363,10 @@ where
})
}
fn inspect_items(item_inp: Receiver<VecDeque<QueryItem>>) -> impl Stream<Item = VecDeque<QueryItem>> {
fn inspect_items(
item_inp: Receiver<VecDeque<QueryItem>>,
worker_name: String,
) -> impl Stream<Item = VecDeque<QueryItem>> {
trace!("transform_to_db_futures begin");
// TODO possible without box?
// let item_inp = Box::pin(item_inp);
@@ -359,19 +374,19 @@ fn inspect_items(item_inp: Receiver<VecDeque<QueryItem>>) -> impl Stream<Item =
for item in batch {
match &item {
QueryItem::ConnectionStatus(_) => {
trace2!("execute ConnectionStatus {item:?}");
trace_item_execute!("execute {worker_name} ConnectionStatus {item:?}");
}
QueryItem::ChannelStatus(_) => {
trace2!("execute ChannelStatus {item:?}");
trace_item_execute!("execute {worker_name} ChannelStatus {item:?}");
}
QueryItem::Insert(item) => {
trace3!("execute Insert {}", item.string_short());
trace_item_execute!("execute {worker_name} Insert {}", item.string_short());
}
QueryItem::TimeBinSimpleF32(_) => {
trace2!("execute TimeBinSimpleF32");
trace_item_execute!("execute {worker_name} TimeBinSimpleF32");
}
QueryItem::Accounting(_) => {
trace2!("execute Accounting {item:?}");
trace_item_execute!("execute {worker_name} Accounting {item:?}");
}
}
}

View File

@@ -7,6 +7,8 @@ use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
#[allow(unused)]
use netpod::log::*;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::Shape;
@@ -43,7 +45,7 @@ pub enum Error {
GetValHelpInnerTypeMismatch,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub enum ScalarValue {
I8(i8),
I16(i16),
@@ -86,7 +88,7 @@ impl ScalarValue {
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub enum ArrayValue {
I8(Vec<i8>),
I16(Vec<i16>),
@@ -207,7 +209,7 @@ impl ArrayValue {
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub enum DataValue {
Scalar(ScalarValue),
Array(ArrayValue),
@@ -620,6 +622,14 @@ impl InsertFut {
// let fut = StackFuture::from(fut);
Self { scy, qu, fut }
}
/// Placeholder `InsertFut` used where a real insert is no longer performed.
/// The stored future resolves immediately with an `InvalidMessage` error
/// instead of running any query.
pub fn dummy(scy: Arc<ScySession>, qu: Arc<PreparedStatement>) -> Self {
Self {
scy,
qu,
fut: Box::pin(async { Err(QueryError::InvalidMessage("no longer used".into())) }),
}
}
}
impl Future for InsertFut {
@@ -834,6 +844,16 @@ pub fn insert_item_fut(
}
}
#[cfg(DISABLED)]
pub fn insert_connection_status_fut(
item: ConnectionStatusItem,
data_store: &DataStore,
stats: Arc<InsertWorkerStats>,
) -> InsertFut {
warn!("separate connection status table no longer used");
InsertFut::dummy(data_store.scy.clone(), data_store.qu_dummy.clone())
}
pub fn insert_connection_status_fut(
item: ConnectionStatusItem,
data_store: &DataStore,
@@ -855,6 +875,16 @@ pub fn insert_connection_status_fut(
)
}
#[cfg(DISABLED)]
pub fn insert_channel_status_fut(
item: ChannelStatusItem,
data_store: &DataStore,
stats: Arc<InsertWorkerStats>,
) -> SmallVec<[InsertFut; 4]> {
warn!("separate channel status table no longer used");
SmallVec::new()
}
pub fn insert_channel_status_fut(
item: ChannelStatusItem,
data_store: &DataStore,
@@ -884,6 +914,12 @@ pub fn insert_channel_status_fut(
smallvec![fut1, fut2]
}
#[cfg(DISABLED)]
pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &DataStore) -> Result<(), Error> {
warn!("separate connection status table no longer used");
Ok(())
}
pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &DataStore) -> Result<(), Error> {
let ts = TsMs::from_system_time(item.ts);
let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV);
@@ -897,6 +933,12 @@ pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &D
Ok(())
}
#[cfg(DISABLED)]
pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> {
warn!("separate channel status table no longer used");
Ok(())
}
pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> {
let ts = TsMs::from_system_time(item.ts);
let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV);

View File

@@ -80,26 +80,6 @@ pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result<bool,
}
}
pub async fn create_table_ts_msp(table_name: &str, scy: &ScySession) -> Result<(), Error> {
use std::fmt::Write;
// seconds:
let default_time_to_live = 60 * 60 * 5;
// hours:
let twcs_window_index = 24 * 4;
let mut s = String::new();
s.write_str("create table ")?;
s.write_str(table_name)?;
s.write_str(" (series bigint, ts_msp bigint, primary key (series, ts_msp))")?;
write!(s, " with default_time_to_live = {}", default_time_to_live)?;
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy'")?;
s.write_str(", 'compaction_window_unit': 'HOURS'")?;
write!(s, ", 'compaction_window_size': {}", twcs_window_index)?;
s.write_str(" }")?;
eprintln!("create table cql {s}");
scy.query(s, ()).await?;
Ok(())
}
#[allow(unused)]
fn dhours(x: u64) -> Duration {
Duration::from_secs(60 * 60 * x)

View File

@@ -40,6 +40,7 @@ pub struct DataStore {
pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
pub qu_account_00: Arc<PreparedStatement>,
pub qu_dummy: Arc<PreparedStatement>,
}
macro_rules! prep_qu_ins_a {
@@ -57,6 +58,35 @@ macro_rules! prep_qu_ins_a {
}};
}
// Prepare an insert statement for a blob-valued events table with columns
// (series, ts_msp, ts_lsp, pulse, valueblob). `$rett.table_prefix()`
// selects the retention-time-specific table name; the prepared statement
// is returned wrapped in an Arc.
macro_rules! prep_qu_ins_b {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, pulse, valueblob)",
" values (?, ?, ?, ?, ?)"
),
$rett.table_prefix(),
$id1
);
let q = $scy.prepare(cql).await?;
Arc::new(q)
}};
}
// Prepare an insert statement with caller-supplied column list ($fields)
// and value placeholders ($values) for table `$rett.table_prefix() + $id1`.
// Generalizes prep_qu_ins_b to arbitrary column sets.
macro_rules! prep_qu_ins_c {
($id1:expr, $fields:expr, $values:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!("insert into {}{} ({})", " values ({})"),
$rett.table_prefix(),
$id1,
$fields,
$values,
);
let q = $scy.prepare(cql).await?;
Arc::new(q)
}};
}
impl DataStore {
pub async fn new(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result<Self, Error> {
let scy = create_session(scyconf).await.map_err(|_| Error::NewSession)?;
@@ -80,62 +110,63 @@ impl DataStore {
let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy);
// array
let cql = "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_array_i8 = Arc::new(q);
let cql = "insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_array_i16 = Arc::new(q);
let cql = "insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_array_i32 = Arc::new(q);
let cql = "insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_array_i64 = Arc::new(q);
let cql = "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_array_f32 = Arc::new(q);
let cql = "insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_array_f64 = Arc::new(q);
let cql = "insert into events_array_bool (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_array_bool = Arc::new(q);
let qu_insert_array_i8 = prep_qu_ins_b!("events_array_i8", rett, scy);
let qu_insert_array_i16 = prep_qu_ins_b!("events_array_i16", rett, scy);
let qu_insert_array_i32 = prep_qu_ins_b!("events_array_i32", rett, scy);
let qu_insert_array_i64 = prep_qu_ins_b!("events_array_i64", rett, scy);
let qu_insert_array_f32 = prep_qu_ins_b!("events_array_f32", rett, scy);
let qu_insert_array_f64 = prep_qu_ins_b!("events_array_f64", rett, scy);
let qu_insert_array_bool = prep_qu_ins_b!("events_array_bool", rett, scy);
// Connection status:
let cql = "insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_connection_status = Arc::new(q);
let cql = "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_channel_status = Arc::new(q);
let cql = "insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?)";
let q = scy.prepare(cql).await?;
let qu_insert_channel_status_by_ts_msp = Arc::new(q);
let cql = concat!(
"insert into binned_scalar_f32 (",
"series, bin_len_ms, ts_msp, off, count, min, max, avg)",
" values (?, ?, ?, ?, ?, ?, ?, ?)"
let qu_insert_connection_status = prep_qu_ins_c!(
"connection_status",
"ts_msp, ts_lsp, kind, addr",
"?, ?, ?, ?",
rett,
scy
);
let q = scy.prepare(cql).await?;
let qu_insert_binned_scalar_f32_v02 = Arc::new(q);
let cql = concat!(
"insert into account_00",
" (part, ts, series, count, bytes)",
" values (?, ?, ?, ?, ?)"
let qu_insert_channel_status = prep_qu_ins_c!(
"channel_status",
"series, ts_msp, ts_lsp, kind",
"?, ?, ?, ?",
rett,
scy
);
let q = scy.prepare(cql).await?;
let qu_account_00 = Arc::new(q);
let qu_insert_channel_status_by_ts_msp = prep_qu_ins_c!(
"channel_status_by_ts_msp",
"ts_msp, ts_lsp, series, kind",
"?, ?, ?, ?",
rett,
scy
);
let qu_insert_binned_scalar_f32_v02 = prep_qu_ins_c!(
"binned_scalar_f32",
"series, bin_len_ms, ts_msp, off, count, min, max, avg",
"?, ?, ?, ?, ?, ?, ?, ?",
rett,
scy
);
let qu_account_00 = prep_qu_ins_c!(
"account_00",
"part, ts, series, count, bytes",
"?, ?, ?, ?, ?",
rett,
scy
);
let q = scy
.prepare(format!(
concat!("select * from {}{} limit 1"),
rett.table_prefix(),
"ts_msp"
))
.await?;
let qu_dummy = Arc::new(q);
let ret = Self {
rett,
@@ -161,6 +192,7 @@ impl DataStore {
qu_insert_channel_status_by_ts_msp,
qu_insert_binned_scalar_f32_v02,
qu_account_00,
qu_dummy,
};
Ok(ret)
}

View File

@@ -0,0 +1,193 @@
use crate::rtwriter::MinQuiets;
use crate::rtwriter::RtWriter;
use crate::writer::SeriesWriter;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use futures_util::future;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::SeriesWriterEstablishStats;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
/// Errors of the writer-establish worker. All variants wrap errors from
/// lower layers (postgres, scylla, series writer) via `#[from]`.
#[derive(Debug, ThisError)]
pub enum Error {
Postgres(#[from] dbpg::err::Error),
PostgresSchema(#[from] dbpg::schema::Error),
ScyllaSession(#[from] scywr::session::Error),
ScyllaSchema(#[from] scywr::schema::Error),
SeriesWriter(#[from] crate::writer::Error),
SeriesByChannel(#[from] dbpg::seriesbychannel::Error),
}
/// Opaque id correlating an `EstablishWorkerJob` with the result message
/// sent back on the job's `restx` channel.
pub struct JobId(pub u64);
/// Background worker that consumes `EstablishWorkerJob`s and constructs
/// `RtWriter`s for them, reporting results over each job's channel.
pub struct EstablishWriterWorker {
/// Channel to the series-by-channel lookup workers.
worker_tx: Sender<ChannelInfoQuery>,
/// Incoming establish jobs.
jobrx: Receiver<EstablishWorkerJob>,
/// Counters for establish successes/failures.
stats: Arc<SeriesWriterEstablishStats>,
}
impl EstablishWriterWorker {
/// Bundle the channels and stats for a worker instance.
fn new(
worker_tx: Sender<ChannelInfoQuery>,
jobrx: Receiver<EstablishWorkerJob>,
stats: Arc<SeriesWriterEstablishStats>,
) -> Self {
Self {
worker_tx,
jobrx,
stats,
}
}
/// Main loop: turn each incoming job into an `RtWriter::new` future and
/// run up to 512 of them concurrently; each result is sent back on the
/// job's own `restx` channel together with its `JobId`.
async fn work(self) {
// Counter of completed establish attempts, shared with the logger task.
let cnt = Arc::new(AtomicU64::new(0));
taskrun::spawn({
let cnt = cnt.clone();
async move {
// Periodic progress logging is currently disabled by this gate;
// the loop below is dead code kept for easy re-enabling.
if true {
return Ok::<_, Error>(());
}
loop {
taskrun::tokio::time::sleep(Duration::from_millis(10000)).await;
debug!("EstablishWriterWorker cnt {}", cnt.load(atomic::Ordering::SeqCst));
}
// Unreachable: the loop above never exits normally.
Ok::<_, Error>(())
}
});
self.jobrx
.map(move |item| {
let wtx = self.worker_tx.clone();
let cnt = cnt.clone();
let stats = self.stats.clone();
async move {
let res = RtWriter::new(
wtx.clone(),
item.cssid,
item.backend,
item.channel,
item.scalar_type,
item.shape,
item.min_quiets,
item.tsnow,
)
.await;
cnt.fetch_add(1, atomic::Ordering::SeqCst);
// A closed result channel means the requester gave up;
// count it but do not fail the worker.
if item.restx.send((item.job_id, res)).await.is_err() {
stats.result_send_fail().inc();
trace!("can not send writer establish result");
}
}
})
.buffer_unordered(512)
.for_each(|_| future::ready(()))
.await;
}
}
/// One request to establish an `RtWriter` for a channel.
/// The result is delivered on `restx` together with `job_id` so the
/// requester can correlate it.
pub struct EstablishWorkerJob {
job_id: JobId,
backend: String,
channel: String,
cssid: ChannelStatusSeriesId,
scalar_type: ScalarType,
shape: Shape,
// Per-retention-tier minimum quiet periods forwarded to the RtWriter.
min_quiets: MinQuiets,
// Channel on which (job_id, establish result) is sent back.
restx: Sender<(JobId, Result<RtWriter, crate::rtwriter::Error>)>,
// Wall-clock time used when setting up the writers.
tsnow: SystemTime,
}
impl EstablishWorkerJob {
    /// Bundle the inputs for one writer-establish job.
    /// The result for this job will be delivered as `(job_id, result)` on `restx`.
    pub fn new(
        job_id: JobId,
        backend: String,
        channel: String,
        cssid: ChannelStatusSeriesId,
        scalar_type: ScalarType,
        shape: Shape,
        min_quiets: MinQuiets,
        restx: Sender<(JobId, Result<RtWriter, crate::rtwriter::Error>)>,
        tsnow: SystemTime,
    ) -> Self {
        Self {
            tsnow,
            restx,
            min_quiets,
            shape,
            scalar_type,
            cssid,
            channel,
            backend,
            job_id,
        }
    }
}
/// Spawn the background writer-establish worker and return the sender
/// used to submit `EstablishWorkerJob`s to it (queue bounded to 256).
pub fn start_writer_establish_worker(
    worker_tx: Sender<ChannelInfoQuery>,
    stats: Arc<SeriesWriterEstablishStats>,
) -> Result<(Sender<EstablishWorkerJob>,), Error> {
    let (jobtx, jobrx) = async_channel::bounded(256);
    taskrun::spawn(EstablishWriterWorker::new(worker_tx, jobrx, stats).work());
    Ok((jobtx,))
}
// Integration test: requires a live postgres (localhost:5432) and scylla
// (127.0.0.1:19042) instance; it is not a pure unit test.
// NOTE(review): several bindings (pgc handle tasks, scy, join handles) are
// created but not used further — presumably kept alive for side effects;
// confirm before cleaning up.
#[test]
fn write_00() {
use netpod::Database;
use scywr::config::ScyllaIngestConfig;
use stats::SeriesByChannelStats;
use std::sync::Arc;
let fut = async {
let dbconf = &Database {
name: "daqbuffer".into(),
host: "localhost".into(),
port: 5432,
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
};
let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1");
let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?;
dbpg::schema::schema_check(&pgc).await?;
scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?;
let scy = scywr::session::create_session(scyconf).await?;
let stats = SeriesByChannelStats::new();
let stats = Arc::new(stats);
let (tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers::<dbpg::seriesbychannel::SalterRandom>(1, dbconf, stats)
.await?;
let backend = "bck-test-00";
let channel = "chn-test-00";
let scalar_type = ScalarType::I16;
let shape = Shape::Scalar;
let tsnow = SystemTime::now();
let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?;
eprintln!("{writer:?}");
let mut iqdqs = InsertDeques::new();
// Write ten scalar i16 values one second apart into the short-term rf3 deque.
for i in 0..10 {
let ts = TsNano::from_ns(HOUR * 24 + SEC * i);
let ts_local = ts.clone();
let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _));
writer.write(ts, ts_local, val, &mut iqdqs.st_rf3_rx)?;
}
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}

View File

@@ -1,3 +1,5 @@
pub mod establish_worker;
pub mod patchcollect;
pub mod rtwriter;
pub mod timebin;
pub mod writer;

View File

@@ -0,0 +1,186 @@
use crate::writer::SeriesWriter;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::time::Duration;
use std::time::SystemTime;
/// Errors of the retention-time writer.
#[derive(Debug, ThisError)]
pub enum Error {
/// The series-by-channel lookup failed (send, receive, or lookup error).
SeriesLookupError,
SeriesWriter(#[from] crate::writer::Error),
}
/// Minimum quiet period between two writes for each retention tier:
/// a new value is only persisted if at least this much time passed since
/// the last persisted one (see `RtWriter::write`).
/// NOTE(review): st/mt/lt presumably mean short/mid/long-term — confirm.
#[derive(Debug)]
pub struct MinQuiets {
pub st: Duration,
pub mt: Duration,
pub lt: Duration,
}
/// Writer that fans one event stream out to three retention tiers
/// (st/mt/lt), each with its own SeriesWriter, last-written state and
/// minimum quiet period.
#[derive(Debug)]
pub struct RtWriter {
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
// Per-tier writer plus last persisted (ts, value).
state_st: State,
state_mt: State,
state_lt: State,
min_quiets: MinQuiets,
}
impl RtWriter {
/// Look up the series id for (backend, channel, scalar_type, shape) via
/// the channel-info workers, then establish one SeriesWriter per
/// retention tier. Any channel or lookup failure maps to
/// `Error::SeriesLookupError`.
pub async fn new(
channel_info_tx: Sender<ChannelInfoQuery>,
cssid: ChannelStatusSeriesId,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
min_quiets: MinQuiets,
stnow: SystemTime,
) -> Result<Self, Error> {
let sid = {
// One-shot channel for the lookup reply.
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
channel_info_tx.send(item).await.map_err(|_| Error::SeriesLookupError)?;
let res = rx
.recv()
.await
.map_err(|_| Error::SeriesLookupError)?
.map_err(|_| Error::SeriesLookupError)?;
res.series.to_series()
};
// One independent writer per retention tier, all on the same series.
// NOTE(review): cssid and sid are reused by value below — they appear
// to be Copy types; confirm.
let state_st = {
let writer =
SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?;
State { writer, last_ins: None }
};
let state_mt = {
let writer =
SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?;
State { writer, last_ins: None }
};
let state_lt = {
let writer =
SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?;
State { writer, last_ins: None }
};
let ret = Self {
sid,
scalar_type,
shape,
state_st,
state_mt,
state_lt,
min_quiets,
};
Ok(ret)
}
/// Series id this writer persists to.
pub fn sid(&self) -> SeriesId {
self.sid.clone()
}
/// Scalar type of the series.
pub fn scalar_type(&self) -> ScalarType {
self.scalar_type.clone()
}
/// Shape of the series.
pub fn shape(&self) -> Shape {
self.shape.clone()
}
/// Offer one event to all three retention tiers. Each tier writes only
/// if (a) the event is newer than the tier's last write, (b) at least
/// the tier's minimum quiet period elapsed since that write, OR the
/// value changed. Items are pushed onto the tier's deque in `iqdqs`.
/// NOTE(review): the three tier blocks below are near-identical — a
/// candidate for a shared helper. Also `last_ins` stores `ts` but the
/// comparisons use `ts_local`; confirm the intended clock.
pub fn write(
&mut self,
ts: TsNano,
ts_local: TsNano,
val: DataValue,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
// Decide whether we want to write.
{
let min_quiet = self.min_quiets.st;
let deque = &mut iqdqs.st_rf3_rx;
if self.state_st.last_ins.as_ref().map_or(true, |x| {
if x.0 >= ts_local {
// bad clock, ignore.
// TODO count in stats.
false
} else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() {
false
} else {
val != x.1
}
}) {
self.state_st.last_ins = Some((ts, val.clone()));
self.state_st.writer.write(ts, ts_local, val.clone(), deque)?;
}
}
{
let min_quiet = self.min_quiets.mt;
let deque = &mut iqdqs.mt_rf3_rx;
if self.state_mt.last_ins.as_ref().map_or(true, |x| {
if x.0 >= ts_local {
// bad clock, ignore.
// TODO count in stats.
false
} else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() {
false
} else {
val != x.1
}
}) {
self.state_mt.last_ins = Some((ts, val.clone()));
self.state_mt.writer.write(ts, ts_local, val.clone(), deque)?;
}
}
{
let min_quiet = self.min_quiets.lt;
let deque = &mut iqdqs.lt_rf3_rx;
if self.state_lt.last_ins.as_ref().map_or(true, |x| {
if x.0 >= ts_local {
// bad clock, ignore.
// TODO count in stats.
false
} else if ts_local.ms() - x.0.ms() < 1000 * min_quiet.as_secs() {
false
} else {
val != x.1
}
}) {
self.state_lt.last_ins = Some((ts, val.clone()));
self.state_lt.writer.write(ts, ts_local, val.clone(), deque)?;
}
}
Ok(())
}
/// Forward a periodic tick to each tier's writer with its own deque.
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
self.state_st.writer.tick(&mut iqdqs.st_rf3_rx)?;
self.state_mt.writer.tick(&mut iqdqs.mt_rf3_rx)?;
self.state_lt.writer.tick(&mut iqdqs.lt_rf3_rx)?;
Ok(())
}
}
/// Per-retention-tier writer state: the SeriesWriter itself and the
/// timestamp/value of the last persisted event (None before first write).
#[derive(Debug)]
struct State {
writer: SeriesWriter,
last_ins: Option<(TsNano, DataValue)>,
}

View File

@@ -388,8 +388,8 @@ fn store_bins(
// TODO check which RT we want to push into
iqdqs.st_rf3_rx.push_back(item.clone());
iqdqs.mt_rf3_rx.push_back(item.clone());
iqdqs.lt_rf3_rx.push_back(item);
// iqdqs.mt_rf3_rx.push_back(item.clone());
// iqdqs.lt_rf3_rx.push_back(item);
}
}
Ok(())

View File

@@ -1,12 +1,8 @@
use crate::timebin::ConnTimeBin;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use futures_util::future;
use futures_util::StreamExt;
use log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::ScalarType;
@@ -19,12 +15,7 @@ use scywr::iteminsertqueue::InsertItem;
use scywr::iteminsertqueue::QueryItem;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::SeriesWriterEstablishStats;
use std::collections::VecDeque;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
@@ -42,12 +33,13 @@ pub enum Error {
}
impl<T> From<async_channel::SendError<T>> for Error {
fn from(value: async_channel::SendError<T>) -> Self {
fn from(_value: async_channel::SendError<T>) -> Self {
Error::ChannelSendError
}
}
impl From<async_channel::RecvError> for Error {
fn from(value: async_channel::RecvError) -> Self {
fn from(_value: async_channel::RecvError) -> Self {
Error::ChannelRecvError
}
}
@@ -65,8 +57,7 @@ pub struct SeriesWriter {
msp_max_bytes: u32,
// TODO this should be in an Option:
ts_msp_grid_last: u32,
binner: ConnTimeBin,
written_last: Option<DataValue>,
binner: Option<ConnTimeBin>,
}
impl SeriesWriter {
@@ -94,13 +85,13 @@ impl SeriesWriter {
}
pub async fn establish_with_cssid(
worker_tx: Sender<ChannelInfoQuery>,
channel_info_tx: Sender<ChannelInfoQuery>,
cssid: ChannelStatusSeriesId,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
tsnow: SystemTime,
stnow: SystemTime,
) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
@@ -111,11 +102,23 @@ impl SeriesWriter {
shape: shape.clone(),
tx: Box::pin(tx),
};
worker_tx.send(item).await?;
channel_info_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.to_series();
Self::establish_with_cssid_sid(cssid, sid, scalar_type, shape, stnow).await
}
pub async fn establish_with_cssid_sid(
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
stnow: SystemTime,
) -> Result<Self, Error> {
let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10));
binner.setup_for(&scalar_type, &shape, tsnow)?;
binner.setup_for(&scalar_type, &shape, stnow)?;
let _ = binner;
let binner = None;
let res = Self {
cssid,
sid,
@@ -128,7 +131,6 @@ impl SeriesWriter {
msp_max_bytes: 1024 * 1024 * 20,
ts_msp_grid_last: 0,
binner,
written_last: None,
};
Ok(res)
}
@@ -150,10 +152,12 @@ impl SeriesWriter {
ts: TsNano,
ts_local: TsNano,
val: DataValue,
iqdqs: &mut InsertDeques,
deque: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
// TODO compute the binned data here as well and flush completed bins if needed.
self.binner.push(ts.clone(), &val)?;
if let Some(binner) = self.binner.as_mut() {
binner.push(ts.clone(), &val)?;
}
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
@@ -203,161 +207,15 @@ impl SeriesWriter {
ts_local: ts_local.to_ts_ms(),
};
// TODO decide on the path in the new deques struct
iqdqs.st_rf3_rx.push_back(QueryItem::Insert(item));
deque.push_back(QueryItem::Insert(item));
Ok(())
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
self.binner.tick(iqdqs)?;
pub fn tick(&mut self, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
if let Some(binner) = self.binner.as_mut() {
// TODO
//binner.tick(deque)?;
}
Ok(())
}
}
pub struct JobId(pub u64);
pub struct EstablishWriterWorker {
worker_tx: Sender<ChannelInfoQuery>,
jobrx: Receiver<EstablishWorkerJob>,
stats: Arc<SeriesWriterEstablishStats>,
}
impl EstablishWriterWorker {
fn new(
worker_tx: Sender<ChannelInfoQuery>,
jobrx: Receiver<EstablishWorkerJob>,
stats: Arc<SeriesWriterEstablishStats>,
) -> Self {
Self {
worker_tx,
jobrx,
stats,
}
}
async fn work(self) {
let cnt = Arc::new(AtomicU64::new(0));
taskrun::spawn({
let cnt = cnt.clone();
async move {
if true {
return Ok::<_, Error>(());
}
loop {
taskrun::tokio::time::sleep(Duration::from_millis(10000)).await;
debug!("EstablishWriterWorker cnt {}", cnt.load(atomic::Ordering::SeqCst));
}
Ok::<_, Error>(())
}
});
self.jobrx
.map(move |item| {
let wtx = self.worker_tx.clone();
let cnt = cnt.clone();
let stats = self.stats.clone();
async move {
let res = SeriesWriter::establish(
wtx.clone(),
item.backend,
item.channel,
item.scalar_type,
item.shape,
item.tsnow,
)
.await;
cnt.fetch_add(1, atomic::Ordering::SeqCst);
if item.restx.send((item.job_id, res)).await.is_err() {
stats.result_send_fail().inc();
trace!("can not send writer establish result");
}
}
})
.buffer_unordered(512)
.for_each(|_| future::ready(()))
.await;
}
}
pub struct EstablishWorkerJob {
job_id: JobId,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
restx: Sender<(JobId, Result<SeriesWriter, Error>)>,
tsnow: SystemTime,
}
impl EstablishWorkerJob {
pub fn new(
job_id: JobId,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
restx: Sender<(JobId, Result<SeriesWriter, Error>)>,
tsnow: SystemTime,
) -> Self {
Self {
job_id,
backend,
channel,
scalar_type,
shape,
restx,
tsnow,
}
}
}
pub fn start_writer_establish_worker(
worker_tx: Sender<ChannelInfoQuery>,
stats: Arc<SeriesWriterEstablishStats>,
) -> Result<(Sender<EstablishWorkerJob>,), Error> {
let (tx, rx) = async_channel::bounded(256);
let worker = EstablishWriterWorker::new(worker_tx, rx, stats);
taskrun::spawn(worker.work());
Ok((tx,))
}
#[test]
fn write_00() {
use netpod::Database;
use scywr::config::ScyllaIngestConfig;
use stats::SeriesByChannelStats;
use std::sync::Arc;
let fut = async {
let dbconf = &Database {
name: "daqbuffer".into(),
host: "localhost".into(),
port: 5432,
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
};
let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1");
let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?;
dbpg::schema::schema_check(&pgc).await?;
scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?;
let scy = scywr::session::create_session(scyconf).await?;
let stats = SeriesByChannelStats::new();
let stats = Arc::new(stats);
let (tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers::<dbpg::seriesbychannel::SalterRandom>(1, dbconf, stats)
.await?;
let backend = "bck-test-00";
let channel = "chn-test-00";
let scalar_type = ScalarType::I16;
let shape = Shape::Scalar;
let tsnow = SystemTime::now();
let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?;
eprintln!("{writer:?}");
let mut iqdqs = InsertDeques::new();
for i in 0..10 {
let ts = TsNano::from_ns(HOUR * 24 + SEC * i);
let ts_local = ts.clone();
let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _));
writer.write(ts, ts_local, val, &mut iqdqs)?;
}
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}

View File

@@ -347,6 +347,16 @@ stats_proc::stats_struct!((
transition_to_polling_already_in,
transition_to_polling_bad_state,
channel_add_exists,
recv_event_add_while_wait_on_read_notify,
recv_read_notify_state_passive_found_ioid,
recv_read_notify_state_passive,
recv_read_notify_state_read_pending_bad_ioid,
recv_read_notify_state_read_pending,
recv_read_notify_but_not_init_yet,
recv_read_notify_but_no_longer_ready,
recv_read_notify_while_enabling_monitoring,
recv_read_notify_while_polling_idle,
channel_not_alive_no_activity,
),
values(inter_ivl_ema, read_ioids_len, proto_out_len,),
histolog2s(