Simplify channel cache update

This commit is contained in:
Dominik Werder
2023-06-06 09:10:20 +02:00
parent 0971454746
commit 9c0062d27c
2 changed files with 189 additions and 143 deletions

View File

@@ -163,18 +163,15 @@ impl Stream for FindChannelNamesFromConfigReadDir {
}
}
async fn find_channel_names_from_config<F, Fut>(base_dir: impl AsRef<Path>, mut cb: F) -> Result<(), Error>
where
F: FnMut(&str) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
async fn find_channel_names_from_config(base_dir: impl AsRef<Path>) -> Result<Vec<String>, Error> {
let mut ret = Vec::new();
let path2: PathBuf = base_dir.as_ref().join("config");
let mut rd = tokio::fs::read_dir(&path2).await?;
while let Ok(Some(entry)) = rd.next_entry().await {
let fname = String::from_utf8(entry.file_name().into_vec())?;
cb(&fname).await?;
ret.push(fname);
}
Ok(())
Ok(ret)
}
#[allow(unused)]
@@ -199,68 +196,97 @@ pub struct UpdatedDbWithChannelNames {
count: u32,
}
async fn update_db_with_channel_names_inner(
tx: async_channel::Sender<Result<UpdatedDbWithChannelNames, Error>>,
node_config: NodeConfigCached,
db_config: Database,
) -> Result<(), Error> {
let dbc = create_connection(&db_config).await?;
info!("update_db_with_channel_names connection done");
let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
info!("update_db_with_channel_names get_node_disk_ident done");
{
let sql = concat!(
"insert into channels (facility, name) select facility, name from (values ($1::bigint, $2::text)) v1 (facility, name)",
" where not exists (select 1 from channels t1 where t1.facility = v1.facility and t1.name = v1.name)",
" on conflict do nothing",
);
let fac: i64 = 1;
let ch = format!("tmp_dummy_04");
let ret = dbc
.query(sql, &[&fac, &ch])
.await
.err_conv()
.map_err(|e| format!("in channel name insert: {e}"));
info!("DUMMY INSERT ATTEMPT: {ret:?}");
}
let mut c1 = 0;
dbc.query("begin", &[]).await.err_conv()?;
let dbc = Arc::new(dbc);
let tx = Arc::new(tx);
let base_path = &node_config
.node
.sf_databuffer
.as_ref()
.ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
.data_base_path;
let channel_names = find_channel_names_from_config(base_path).await?;
for ch in channel_names {
let fac = node_disk_ident.facility;
crate::delay_io_short().await;
let sql = concat!(
"insert into channels (facility, name) select facility, name from (values ($1::bigint, $2::text)) v1 (facility, name)",
" where not exists (select 1 from channels t1 where t1.facility = v1.facility and t1.name = v1.name)",
" on conflict do nothing",
);
let ret = dbc
.query(sql, &[&fac, &ch])
.await
.err_conv()
.map_err(|e| format!("in channel name insert: {e}"));
let ret = match ret {
Ok(x) => x,
Err(e) => {
error!("failed insert attempt {e}");
Err(e)?
}
};
{
let n = ret.len();
if n > 0 {
info!("insert n {n}");
}
}
c1 += 1;
if c1 % 200 == 0 {
dbc.query("commit", &[]).await.err_conv()?;
let ret = UpdatedDbWithChannelNames {
msg: format!("current {}", ch),
count: c1,
};
tx.send(Ok(ret)).await.err_conv()?;
delay_io_medium().await;
dbc.query("begin", &[]).await.err_conv()?;
}
}
dbc.query("commit", &[]).await.err_conv()?;
let ret = UpdatedDbWithChannelNames {
msg: format!("all done"),
count: c1,
};
tx.send(Ok(ret)).await.err_conv()?;
Ok::<_, Error>(())
}
pub async fn update_db_with_channel_names(
node_config: NodeConfigCached,
db_config: &Database,
) -> Result<Receiver<Result<UpdatedDbWithChannelNames, Error>>, Error> {
info!("update_db_with_channel_names");
let (tx, rx) = bounded(16);
let tx2 = tx.clone();
let db_config = db_config.clone();
let block1 = async move {
let dbc = create_connection(&db_config).await?;
let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
let c1 = Arc::new(RwLock::new(0u32));
dbc.query("begin", &[]).await.err_conv()?;
let dbc = Arc::new(dbc);
let tx = Arc::new(tx);
let base_path = &node_config
.node
.sf_databuffer
.as_ref()
.ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
.data_base_path;
find_channel_names_from_config(base_path, |ch| {
let ch = ch.to_owned();
let dbc = dbc.clone();
let c1 = c1.clone();
let tx = tx.clone();
let fac = node_disk_ident.facility;
async move {
crate::delay_io_short().await;
dbc.query(
"insert into channels (facility, name) select facility, name from (values ($1, $2)) v1 (facility, name) where not exists (select 1 from channels t1 where t1.facility = v1.facility and t1.name = v1.name) on conflict do nothing",
&[&fac, &ch],
)
.await
.err_conv()?;
let c2 = {
let mut g = c1.write()?;
*g += 1;
*g
};
if c2 % 200 == 0 {
dbc.query("commit", &[]).await.err_conv()?;
let ret = UpdatedDbWithChannelNames {
msg: format!("current {}", ch),
count: c2,
};
tx.send(Ok(ret)).await.err_conv()?;
delay_io_medium().await;
dbc.query("begin", &[]).await.err_conv()?;
}
Ok(())
}
})
.await?;
dbc.query("commit", &[]).await.err_conv()?;
let c2 = *c1.read()?;
let ret = UpdatedDbWithChannelNames {
msg: format!("all done"),
count: c2,
};
tx.send(Ok(ret)).await.err_conv()?;
Ok::<_, Error>(())
};
let block1 = update_db_with_channel_names_inner(tx, node_config.clone(), db_config.clone());
let block2 = async move {
match block1.await {
Ok(_) => {}
@@ -309,78 +335,85 @@ pub struct UpdatedDbWithAllChannelConfigs {
count: u32,
}
async fn update_db_with_all_channel_configs_inner(
tx: async_channel::Sender<Result<UpdatedDbWithAllChannelConfigs, Error>>,
node_config: NodeConfigCached,
) -> Result<(), Error> {
let node_config = &node_config;
let dbc = create_connection(&node_config.node_config.cluster.database).await?;
let dbc = Arc::new(dbc);
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
let rows = dbc
.query(
"select rowid, facility, name from channels where facility = $1 order by facility, name",
&[&node_disk_ident.facility],
)
.await
.err_conv()?;
let mut c1 = 0;
dbc.query("begin", &[]).await.err_conv()?;
let mut count_inserted = 0;
let mut count_updated = 0;
let mut count_config_not_found = 0;
for row in rows {
let rowid: i64 = row.try_get(0).err_conv()?;
let _facility: i64 = row.try_get(1).err_conv()?;
let channel: String = row.try_get(2).err_conv()?;
match update_db_with_channel_config(
node_config,
node_disk_ident,
rowid,
&channel,
dbc.clone(),
&mut count_inserted,
&mut count_updated,
)
.await
{
Err(e) => {
error!("{:?}", e);
delay_io_medium().await;
// TODO recover, open new transaction, test recovery.
return Err(e);
}
Ok(UpdateChannelConfigResult::NotFound) => {
//warn!("can not find channel config {}", channel);
count_config_not_found += 1;
delay_io_short().await;
}
Ok(UpdateChannelConfigResult::Done) => {
c1 += 1;
if c1 % 200 == 0 {
dbc.query("commit", &[]).await.err_conv()?;
let msg = format!(
"channel no {:6} inserted {:6} updated {:6}",
c1, count_inserted, count_updated
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await.err_conv()?;
dbc.query("begin", &[]).await.err_conv()?;
}
delay_io_short().await;
}
}
}
dbc.query("commit", &[]).await.err_conv()?;
let msg = format!(
"ALL DONE channel no {:6} inserted {:6} updated {:6} not_found {:6}",
c1, count_inserted, count_updated, count_config_not_found,
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await.err_conv()?;
Ok::<_, Error>(())
}
pub async fn update_db_with_all_channel_configs(
node_config: NodeConfigCached,
) -> Result<Receiver<Result<UpdatedDbWithAllChannelConfigs, Error>>, Error> {
let (tx, rx) = bounded(16);
let tx = Arc::new(tx);
let tx2 = tx.clone();
let tx3 = tx.clone();
let block1 = async move {
let node_config = &node_config;
let dbc = create_connection(&node_config.node_config.cluster.database).await?;
let dbc = Arc::new(dbc);
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
let rows = dbc
.query(
"select rowid, facility, name from channels where facility = $1 order by facility, name",
&[&node_disk_ident.facility],
)
.await
.err_conv()?;
let mut c1 = 0;
dbc.query("begin", &[]).await.err_conv()?;
let mut count_inserted = 0;
let mut count_updated = 0;
for row in rows {
let rowid: i64 = row.try_get(0).err_conv()?;
let _facility: i64 = row.try_get(1).err_conv()?;
let channel: String = row.try_get(2).err_conv()?;
match update_db_with_channel_config(
node_config,
node_disk_ident,
rowid,
&channel,
dbc.clone(),
&mut count_inserted,
&mut count_updated,
)
.await
{
Err(e) => {
error!("{:?}", e);
delay_io_medium().await;
}
Ok(UpdateChannelConfigResult::NotFound) => {
warn!("can not find channel config {}", channel);
delay_io_medium().await;
}
Ok(UpdateChannelConfigResult::Done) => {
c1 += 1;
if c1 % 200 == 0 {
dbc.query("commit", &[]).await.err_conv()?;
let msg = format!(
"channel no {:6} inserted {:6} updated {:6}",
c1, count_inserted, count_updated
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await.err_conv()?;
dbc.query("begin", &[]).await.err_conv()?;
}
delay_io_short().await;
}
}
}
dbc.query("commit", &[]).await.err_conv()?;
let msg = format!(
"ALL DONE channel no {:6} inserted {:6} updated {:6}",
c1, count_inserted, count_updated
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await.err_conv()?;
Ok::<_, Error>(())
}
.then({
let block1 = update_db_with_all_channel_configs_inner(tx, node_config).then({
|item| async move {
match item {
Ok(_) => {}
@@ -410,7 +443,10 @@ pub async fn update_db_with_all_channel_configs(
pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<bool, Error> {
let dbc = create_connection(&node_config.node_config.cluster.database).await?;
dbc.query("select update_cache()", &[]).await.err_conv()?;
dbc.query("select update_cache()", &[])
.await
.err_conv()
.map_err(|e| format!("error update_search_cache: {e}"))?;
Ok(true)
}
@@ -460,18 +496,20 @@ async fn update_db_with_channel_config(
if rows.len() > 1 {
return Err(Error::with_msg("more than one row"));
}
let (config_id, do_parse) = if rows.len() > 0 {
let row = &rows[0];
let (config_id, do_parse) = if let Some(row) = rows.first() {
let rowid: i64 = row.get(0);
let file_size: u32 = row.get::<_, i64>(1) as u32;
let parsed_until: u32 = row.get::<_, i64>(2) as u32;
let _channel_id = row.get::<_, i64>(2) as i64;
if meta.len() < file_size as u64 || meta.len() < parsed_until as u64 {
let sql = concat!(
"insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) ",
"select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1"
"insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) ",
"select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1"
);
dbc.query(sql, &[&rowid]).await.err_conv()?;
dbc.query(sql, &[&rowid])
.await
.err_conv()
.map_err(|e| format!("on config history insert {e}"))?;
}
//ensure!(meta.len() >= parsed_until as u64, ConfigFileOnDiskShrunk{path});
(Some(rowid), true)
@@ -484,7 +522,20 @@ async fn update_db_with_channel_config(
match config_id {
None => {
dbc.query(
"insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5)",
"insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5) on conflict (node, channel) do update set fileSize = $3, parsedUntil = $4, config = $5",
&[
&node_disk_ident.rowid(),
&channel_id,
&(meta.len() as i64),
&(buf.len() as i64),
&serde_json::to_value(config)?,
],
).await.err_conv().map_err(|e| format!("on config insert {e}"))?;
*count_inserted += 1;
}
Some(_config_id) => {
dbc.query(
"update configs set fileSize = $3, parsedUntil = $4, config = $5 where node = $1 and channel = $2",
&[
&node_disk_ident.rowid(),
&channel_id,
@@ -494,14 +545,8 @@ async fn update_db_with_channel_config(
],
)
.await
.err_conv()?;
*count_inserted += 1;
}
Some(_config_id_2) => {
dbc.query(
"insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5) on conflict (node, channel) do update set fileSize = $3, parsedUntil = $4, config = $5",
&[&node_disk_ident.rowid(), &channel_id, &(meta.len() as i64), &(buf.len() as i64), &serde_json::to_value(config)?],
).await.err_conv()?;
.err_conv()
.map_err(|e| format!("on config update {e}"))?;
*count_updated += 1;
}
}

View File

@@ -510,6 +510,7 @@ async fn update_db_with_channel_names(
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
info!("httpret::update_db_with_channel_names");
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),