Configure replication factor 1 keyspace
This commit is contained in:
@@ -371,9 +371,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.5.31"
|
||||
version = "4.5.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
|
||||
checksum = "6088f3ae8c3608d19260cd7445411865a485688711b78b5be70d78cd96136f83"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
@@ -381,9 +381,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.5.31"
|
||||
version = "4.5.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
|
||||
checksum = "22a7ef7f676155edfb82daa97f99441f3ebf4a58d5e32f295a56259f1b6facc8"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
@@ -393,9 +393,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "4.5.28"
|
||||
version = "4.5.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
|
||||
checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
@@ -713,7 +713,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "daqingest"
|
||||
version = "0.2.7-aa.6"
|
||||
version = "0.2.7-aa.7"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"autoerr",
|
||||
@@ -1152,9 +1152,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "1.2.0"
|
||||
version = "1.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea"
|
||||
checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@@ -1173,12 +1173,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "http-body-util"
|
||||
version = "0.1.2"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
|
||||
checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"futures-core",
|
||||
"http",
|
||||
"http-body",
|
||||
"pin-project-lite",
|
||||
@@ -1198,9 +1198,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.1.0"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
||||
checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f"
|
||||
|
||||
[[package]]
|
||||
name = "humantime-serde"
|
||||
@@ -1417,9 +1417,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.7.1"
|
||||
version = "2.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652"
|
||||
checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown 0.15.2",
|
||||
@@ -1492,9 +1492,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.170"
|
||||
version = "0.2.171"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828"
|
||||
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
|
||||
|
||||
[[package]]
|
||||
name = "litemap"
|
||||
@@ -1695,9 +1695,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.20.3"
|
||||
version = "1.21.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e"
|
||||
checksum = "cde51589ab56b20a6f686b2c68f7a0bd6add753d697abf720d63f8db3ab7b1ad"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
@@ -1849,11 +1849,11 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.20"
|
||||
version = "0.2.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
|
||||
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
|
||||
dependencies = [
|
||||
"zerocopy 0.7.35",
|
||||
"zerocopy 0.8.23",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1867,9 +1867,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.39"
|
||||
version = "1.0.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801"
|
||||
checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
@@ -2135,9 +2135,9 @@ checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.218"
|
||||
version = "1.0.219"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60"
|
||||
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
@@ -2154,9 +2154,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.218"
|
||||
version = "1.0.219"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b"
|
||||
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -2391,9 +2391,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.99"
|
||||
version = "2.0.100"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2"
|
||||
checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -2538,9 +2538,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.43.0"
|
||||
version = "1.44.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
|
||||
checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
@@ -3102,7 +3102,6 @@ version = "0.7.35"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"zerocopy-derive 0.7.35",
|
||||
]
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "daqingest"
|
||||
version = "0.2.7-aa.6"
|
||||
version = "0.2.7-aa.7"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2024"
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
|
||||
pass: k.pg_pass,
|
||||
name: k.pg_name,
|
||||
};
|
||||
let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, "DUMMY");
|
||||
let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, 3);
|
||||
// scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short)
|
||||
// .await
|
||||
// .map_err(Error::from_string)?;
|
||||
@@ -170,6 +170,7 @@ async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(),
|
||||
opts.scylla_config_st(),
|
||||
opts.scylla_config_mt(),
|
||||
opts.scylla_config_lt(),
|
||||
opts.scylla_config_st_rf1(),
|
||||
],
|
||||
do_change,
|
||||
)
|
||||
|
||||
@@ -807,6 +807,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
|
||||
opts.scylla_config_st(),
|
||||
opts.scylla_config_mt(),
|
||||
opts.scylla_config_lt(),
|
||||
opts.scylla_config_st_rf1(),
|
||||
],
|
||||
false,
|
||||
)
|
||||
|
||||
@@ -1320,6 +1320,7 @@ impl CaConn {
|
||||
shape,
|
||||
ch.conf.min_quiets(),
|
||||
ch.conf.is_polled(),
|
||||
ch.conf.replication(),
|
||||
&|| CaWriterValueState::new(st.series_status, chinfo.series.to_series()),
|
||||
)?;
|
||||
self.handle_writer_establish_inner(cid, writer)?;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::Database;
|
||||
use netpod::log::*;
|
||||
use regex::Regex;
|
||||
use scywr::config::ScyllaIngestConfig;
|
||||
use serde::Deserialize;
|
||||
@@ -32,6 +32,7 @@ pub struct CaIngestOpts {
|
||||
scylla_st: ScyllaIngestConfig,
|
||||
scylla_mt: ScyllaIngestConfig,
|
||||
scylla_lt: ScyllaIngestConfig,
|
||||
scylla_st_rf1: ScyllaIngestConfig,
|
||||
array_truncate: Option<u64>,
|
||||
insert_worker_count: Option<usize>,
|
||||
insert_worker_concurrency: Option<usize>,
|
||||
@@ -80,6 +81,10 @@ impl CaIngestOpts {
|
||||
&self.scylla_lt
|
||||
}
|
||||
|
||||
pub fn scylla_config_st_rf1(&self) -> &ScyllaIngestConfig {
|
||||
&self.scylla_st_rf1
|
||||
}
|
||||
|
||||
pub fn search(&self) -> &Vec<String> {
|
||||
&self.search
|
||||
}
|
||||
@@ -358,11 +363,11 @@ fn bool_true() -> bool {
|
||||
mod serde_ingest_config_archiving {
|
||||
use super::ChannelReadConfigApiFormat;
|
||||
use super::IngestConfigArchiving;
|
||||
use serde::Deserializer;
|
||||
use serde::Serializer;
|
||||
use serde::de;
|
||||
use serde::ser;
|
||||
use serde::ser::SerializeMap;
|
||||
use serde::Deserializer;
|
||||
use serde::Serializer;
|
||||
use std::fmt;
|
||||
|
||||
impl ser::Serialize for IngestConfigArchiving {
|
||||
@@ -421,9 +426,9 @@ mod serde_ChannelReadConfigApiFormat {
|
||||
}
|
||||
|
||||
mod serde_replication_bool {
|
||||
use serde::de;
|
||||
use serde::Deserializer;
|
||||
use serde::Serializer;
|
||||
use serde::de;
|
||||
use std::fmt;
|
||||
|
||||
pub fn serialize<S>(v: &bool, ser: S) -> Result<S::Ok, S::Error>
|
||||
@@ -484,9 +489,9 @@ mod serde_replication_bool {
|
||||
|
||||
mod serde_option_channel_read_config {
|
||||
use super::ChannelReadConfig;
|
||||
use serde::de;
|
||||
use serde::Deserializer;
|
||||
use serde::Serializer;
|
||||
use serde::de;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -568,11 +573,7 @@ pub enum ChannelReadConfig {
|
||||
|
||||
impl ChannelReadConfig {
|
||||
pub fn is_monitor(&self) -> bool {
|
||||
if let Self::Monitor = self {
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
if let Self::Monitor = self { true } else { false }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -680,6 +681,10 @@ impl ChannelConfig {
|
||||
self.arch.is_polled
|
||||
}
|
||||
|
||||
pub fn replication(&self) -> bool {
|
||||
self.arch.replication
|
||||
}
|
||||
|
||||
pub fn poll_conf(&self) -> Option<(u64,)> {
|
||||
if self.is_polled() {
|
||||
if let Some(ChannelReadConfig::Poll(x)) = self.arch.short_term {
|
||||
|
||||
@@ -4,21 +4,20 @@ use serde::Deserialize;
|
||||
pub struct ScyllaIngestConfig {
|
||||
hosts: Vec<String>,
|
||||
keyspace: String,
|
||||
keyspace_rf1: Option<String>,
|
||||
rf: u8,
|
||||
}
|
||||
|
||||
impl ScyllaIngestConfig {
|
||||
pub fn new<I, H, K1, K2>(hosts: I, ks_rf3: K1, ks_rf1: K2) -> Self
|
||||
pub fn new<I, H, K1>(hosts: I, ks: K1, rf: u8) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = H>,
|
||||
H: Into<String>,
|
||||
K1: Into<String>,
|
||||
K2: Into<String>,
|
||||
{
|
||||
Self {
|
||||
hosts: hosts.into_iter().map(Into::into).collect(),
|
||||
keyspace: ks_rf3.into(),
|
||||
keyspace_rf1: Some(ks_rf1.into()),
|
||||
keyspace: ks.into(),
|
||||
rf,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +29,7 @@ impl ScyllaIngestConfig {
|
||||
&self.keyspace
|
||||
}
|
||||
|
||||
pub fn keyspace_rf1(&self) -> Option<&String> {
|
||||
self.keyspace_rf1.as_ref()
|
||||
pub fn rf(&self) -> u8 {
|
||||
self.rf
|
||||
}
|
||||
}
|
||||
|
||||
@@ -527,44 +527,24 @@ async fn migrate_scylla_data_schema(
|
||||
let scy2 = create_session_no_ks(scyconf).await?;
|
||||
let scy = &scy2;
|
||||
let durable = true;
|
||||
let ks = scyconf.keyspace();
|
||||
|
||||
if !has_keyspace(scyconf.keyspace(), scy).await? {
|
||||
// TODO
|
||||
let replication = 3;
|
||||
if !has_keyspace(ks, scy).await? {
|
||||
let replication = scyconf.rf();
|
||||
let cql = format!(
|
||||
concat!(
|
||||
"create keyspace {}",
|
||||
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
|
||||
" and durable_writes = {};"
|
||||
),
|
||||
scyconf.keyspace(),
|
||||
replication,
|
||||
durable
|
||||
ks, replication, durable
|
||||
);
|
||||
info!("scylla create keyspace {cql}");
|
||||
chs.add_todo(cql);
|
||||
} else {
|
||||
info!("scylla has keyspace {ks}");
|
||||
}
|
||||
|
||||
if let Some(ks) = scyconf.keyspace_rf1() {
|
||||
if !has_keyspace(ks, scy).await? {
|
||||
let replication = 1;
|
||||
let cql = format!(
|
||||
concat!(
|
||||
"create keyspace {}",
|
||||
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
|
||||
" and durable_writes = {};"
|
||||
),
|
||||
scyconf.keyspace(),
|
||||
replication,
|
||||
durable
|
||||
);
|
||||
info!("scylla create keyspace {cql}");
|
||||
chs.add_todo(cql);
|
||||
}
|
||||
}
|
||||
|
||||
let ks = scyconf.keyspace();
|
||||
|
||||
scy.use_keyspace(ks, true).await?;
|
||||
|
||||
check_event_tables(ks, rett.clone(), chs, scy).await?;
|
||||
@@ -726,18 +706,23 @@ async fn migrate_scylla_data_schema(
|
||||
}
|
||||
|
||||
pub async fn migrate_scylla_data_schema_all_rt(
|
||||
scyconfs: [&ScyllaIngestConfig; 3],
|
||||
scyconfs: [&ScyllaIngestConfig; 4],
|
||||
do_change: bool,
|
||||
) -> Result<(), Error> {
|
||||
let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new()];
|
||||
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
|
||||
for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) {
|
||||
let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new(), Changeset::new()];
|
||||
let rts = [
|
||||
RetentionTime::Short,
|
||||
RetentionTime::Medium,
|
||||
RetentionTime::Long,
|
||||
RetentionTime::Short,
|
||||
];
|
||||
for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) {
|
||||
migrate_scylla_data_schema(scyconf, rt, chs).await?;
|
||||
}
|
||||
let todo = chsa.iter().any(|x| x.has_to_do());
|
||||
if do_change {
|
||||
if todo {
|
||||
for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) {
|
||||
for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) {
|
||||
if chs.has_to_do() {
|
||||
let scy2 = create_session_no_ks(scyconf).await?;
|
||||
let scy = &scy2;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::log::*;
|
||||
use crate::log;
|
||||
use crate::ratelimitwriter::RateLimitWriter;
|
||||
use crate::writer::EmittableType;
|
||||
use netpod::ScalarType;
|
||||
@@ -11,13 +11,7 @@ use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
macro_rules! trace_emit {
|
||||
($det:expr, $($arg:tt)*) => {
|
||||
if $det {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); }
|
||||
|
||||
autoerr::create_error_v1!(
|
||||
name(Error, "SerieswriterRtwriter"),
|
||||
@@ -86,6 +80,7 @@ where
|
||||
state_lt: State<ET>,
|
||||
min_quiets: MinQuiets,
|
||||
do_trace_detail: bool,
|
||||
do_st_rf1: bool,
|
||||
}
|
||||
|
||||
impl<ET> RtWriter<ET>
|
||||
@@ -98,6 +93,7 @@ where
|
||||
shape: Shape,
|
||||
min_quiets: MinQuiets,
|
||||
is_polled: bool,
|
||||
do_st_rf1: bool,
|
||||
emit_state_new: &dyn Fn() -> <ET as EmittableType>::State,
|
||||
) -> Result<Self, Error> {
|
||||
let state_st = {
|
||||
@@ -122,6 +118,7 @@ where
|
||||
state_lt,
|
||||
min_quiets,
|
||||
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
|
||||
do_st_rf1,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -162,7 +159,13 @@ where
|
||||
if !res_lt.accept {
|
||||
res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
|
||||
if !res_mt.accept {
|
||||
res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?;
|
||||
if self.do_st_rf1 {
|
||||
res_st =
|
||||
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)?;
|
||||
} else {
|
||||
res_st =
|
||||
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -191,7 +194,11 @@ where
|
||||
}
|
||||
|
||||
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
|
||||
self.state_st.writer.tick(&mut iqdqs.st_rf3_qu)?;
|
||||
if self.do_st_rf1 {
|
||||
self.state_st.writer.tick(&mut iqdqs.st_rf1_qu)?;
|
||||
} else {
|
||||
self.state_st.writer.tick(&mut iqdqs.st_rf3_qu)?;
|
||||
}
|
||||
self.state_mt.writer.tick(&mut iqdqs.mt_rf3_qu)?;
|
||||
self.state_lt.writer.tick(&mut iqdqs.lt_rf3_qu)?;
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user