From 9c0062d27ca275e4522e9751518fa9733ab40840 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 6 Jun 2023 09:10:20 +0200
Subject: [PATCH] Simplify channel cache update

---
 dbconn/src/scan.rs     | 331 +++++++++++++++++++++++------------------
 httpret/src/httpret.rs |   1 +
 2 files changed, 189 insertions(+), 143 deletions(-)

diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs
index def28c8..1afd5cb 100644
--- a/dbconn/src/scan.rs
+++ b/dbconn/src/scan.rs
@@ -163,18 +163,15 @@ impl Stream for FindChannelNamesFromConfigReadDir {
     }
 }
 
-async fn find_channel_names_from_config<F, Fut>(base_dir: impl AsRef<Path>, mut cb: F) -> Result<(), Error>
-where
-    F: FnMut(&str) -> Fut,
-    Fut: Future<Output = Result<(), Error>>,
-{
+async fn find_channel_names_from_config(base_dir: impl AsRef<Path>) -> Result<Vec<String>, Error> {
+    let mut ret = Vec::new();
     let path2: PathBuf = base_dir.as_ref().join("config");
     let mut rd = tokio::fs::read_dir(&path2).await?;
     while let Ok(Some(entry)) = rd.next_entry().await {
         let fname = String::from_utf8(entry.file_name().into_vec())?;
-        cb(&fname).await?;
+        ret.push(fname);
     }
-    Ok(())
+    Ok(ret)
 }
 
 #[allow(unused)]
@@ -199,68 +196,97 @@ pub struct UpdatedDbWithChannelNames {
     count: u32,
 }
 
+async fn update_db_with_channel_names_inner(
+    tx: async_channel::Sender<Result<UpdatedDbWithChannelNames, Error>>,
+    node_config: NodeConfigCached,
+    db_config: Database,
+) -> Result<(), Error> {
+    let dbc = create_connection(&db_config).await?;
+    info!("update_db_with_channel_names connection done");
+    let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
+    info!("update_db_with_channel_names get_node_disk_ident done");
+    {
+        let sql = concat!(
+            "insert into channels (facility, name) select facility, name from (values ($1::bigint, $2::text)) v1 (facility, name)",
+            " where not exists (select 1 from channels t1 where t1.facility = v1.facility and t1.name = v1.name)",
+            " on conflict do nothing",
+        );
+        let fac: i64 = 1;
+        let ch = format!("tmp_dummy_04");
+        let ret = dbc
+            .query(sql, &[&fac, &ch])
+            .await
+            .err_conv()
+            .map_err(|e| format!("in channel name insert: {e}"));
+        info!("DUMMY INSERT ATTEMPT: {ret:?}");
+    }
+    let mut c1 = 0;
+    dbc.query("begin", &[]).await.err_conv()?;
+    let dbc = Arc::new(dbc);
+    let tx = Arc::new(tx);
+    let base_path = &node_config
+        .node
+        .sf_databuffer
+        .as_ref()
+        .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
+        .data_base_path;
+    let channel_names = find_channel_names_from_config(base_path).await?;
+    for ch in channel_names {
+        let fac = node_disk_ident.facility;
+        crate::delay_io_short().await;
+        let sql = concat!(
+            "insert into channels (facility, name) select facility, name from (values ($1::bigint, $2::text)) v1 (facility, name)",
+            " where not exists (select 1 from channels t1 where t1.facility = v1.facility and t1.name = v1.name)",
+            " on conflict do nothing",
+        );
+        let ret = dbc
+            .query(sql, &[&fac, &ch])
+            .await
+            .err_conv()
+            .map_err(|e| format!("in channel name insert: {e}"));
+        let ret = match ret {
+            Ok(x) => x,
+            Err(e) => {
+                error!("failed insert attempt {e}");
+                Err(e)?
+            }
+        };
+        {
+            let n = ret.len();
+            if n > 0 {
+                info!("insert n {n}");
+            }
+        }
+        c1 += 1;
+        if c1 % 200 == 0 {
+            dbc.query("commit", &[]).await.err_conv()?;
+            let ret = UpdatedDbWithChannelNames {
+                msg: format!("current {}", ch),
+                count: c1,
+            };
+            tx.send(Ok(ret)).await.err_conv()?;
+            delay_io_medium().await;
+            dbc.query("begin", &[]).await.err_conv()?;
+        }
+    }
+    dbc.query("commit", &[]).await.err_conv()?;
+    let ret = UpdatedDbWithChannelNames {
+        msg: format!("all done"),
+        count: c1,
+    };
+    tx.send(Ok(ret)).await.err_conv()?;
+    Ok::<_, Error>(())
+}
+
 pub async fn update_db_with_channel_names(
     node_config: NodeConfigCached,
     db_config: &Database,
 ) -> Result<Receiver<Result<UpdatedDbWithChannelNames, Error>>, Error> {
+    info!("update_db_with_channel_names");
     let (tx, rx) = bounded(16);
     let tx2 = tx.clone();
     let db_config = db_config.clone();
-    let block1 = async move {
-        let dbc = create_connection(&db_config).await?;
-        let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
-        let c1 = Arc::new(RwLock::new(0u32));
-        dbc.query("begin", &[]).await.err_conv()?;
-        let dbc = Arc::new(dbc);
-        let tx = Arc::new(tx);
-        let base_path = &node_config
-            .node
-            .sf_databuffer
-            .as_ref()
-            .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
-            .data_base_path;
-        find_channel_names_from_config(base_path, |ch| {
-            let ch = ch.to_owned();
-            let dbc = dbc.clone();
-            let c1 = c1.clone();
-            let tx = tx.clone();
-            let fac = node_disk_ident.facility;
-            async move {
-                crate::delay_io_short().await;
-                dbc.query(
-                    "insert into channels (facility, name) select facility, name from (values ($1, $2)) v1 (facility, name) where not exists (select 1 from channels t1 where t1.facility = v1.facility and t1.name = v1.name) on conflict do nothing",
-                    &[&fac, &ch],
-                )
-                .await
-                .err_conv()?;
-                let c2 = {
-                    let mut g = c1.write()?;
-                    *g += 1;
-                    *g
-                };
-                if c2 % 200 == 0 {
-                    dbc.query("commit", &[]).await.err_conv()?;
-                    let ret = UpdatedDbWithChannelNames {
-                        msg: format!("current {}", ch),
-                        count: c2,
-                    };
-                    tx.send(Ok(ret)).await.err_conv()?;
-                    delay_io_medium().await;
-                    dbc.query("begin", &[]).await.err_conv()?;
-                }
-                Ok(())
-            }
-        })
-        .await?;
-        dbc.query("commit", &[]).await.err_conv()?;
-        let c2 = *c1.read()?;
-        let ret = UpdatedDbWithChannelNames {
-            msg: format!("all done"),
-            count: c2,
-        };
-        tx.send(Ok(ret)).await.err_conv()?;
-        Ok::<_, Error>(())
-    };
+    let block1 = update_db_with_channel_names_inner(tx, node_config.clone(), db_config.clone());
     let block2 = async move {
         match block1.await {
             Ok(_) => {}
@@ -309,78 +335,85 @@ pub struct UpdatedDbWithAllChannelConfigs {
     count: u32,
 }
 
+async fn update_db_with_all_channel_configs_inner(
+    tx: async_channel::Sender<Result<UpdatedDbWithAllChannelConfigs, Error>>,
+    node_config: NodeConfigCached,
+) -> Result<(), Error> {
+    let node_config = &node_config;
+    let dbc = create_connection(&node_config.node_config.cluster.database).await?;
+    let dbc = Arc::new(dbc);
+    let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
+    let rows = dbc
+        .query(
+            "select rowid, facility, name from channels where facility = $1 order by facility, name",
+            &[&node_disk_ident.facility],
+        )
+        .await
+        .err_conv()?;
+    let mut c1 = 0;
+    dbc.query("begin", &[]).await.err_conv()?;
+    let mut count_inserted = 0;
+    let mut count_updated = 0;
+    let mut count_config_not_found = 0;
+    for row in rows {
+        let rowid: i64 = row.try_get(0).err_conv()?;
+        let _facility: i64 = row.try_get(1).err_conv()?;
+        let channel: String = row.try_get(2).err_conv()?;
+        match update_db_with_channel_config(
+            node_config,
+            node_disk_ident,
+            rowid,
+            &channel,
+            dbc.clone(),
+            &mut count_inserted,
+            &mut count_updated,
+        )
+        .await
+        {
+            Err(e) => {
+                error!("{:?}", e);
+                delay_io_medium().await;
+                // TODO recover, open new transaction, test recovery.
+                return Err(e);
+            }
+            Ok(UpdateChannelConfigResult::NotFound) => {
+                //warn!("can not find channel config {}", channel);
+                count_config_not_found += 1;
+                delay_io_short().await;
+            }
+            Ok(UpdateChannelConfigResult::Done) => {
+                c1 += 1;
+                if c1 % 200 == 0 {
+                    dbc.query("commit", &[]).await.err_conv()?;
+                    let msg = format!(
+                        "channel no {:6} inserted {:6} updated {:6}",
+                        c1, count_inserted, count_updated
+                    );
+                    let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
+                    tx.send(Ok(ret)).await.err_conv()?;
+                    dbc.query("begin", &[]).await.err_conv()?;
+                }
+                delay_io_short().await;
+            }
+        }
+    }
+    dbc.query("commit", &[]).await.err_conv()?;
+    let msg = format!(
+        "ALL DONE channel no {:6} inserted {:6} updated {:6} not_found {:6}",
+        c1, count_inserted, count_updated, count_config_not_found,
+    );
+    let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
+    tx.send(Ok(ret)).await.err_conv()?;
+    Ok::<_, Error>(())
+}
+
 pub async fn update_db_with_all_channel_configs(
     node_config: NodeConfigCached,
 ) -> Result<Receiver<Result<UpdatedDbWithAllChannelConfigs, Error>>, Error> {
     let (tx, rx) = bounded(16);
-    let tx = Arc::new(tx);
     let tx2 = tx.clone();
     let tx3 = tx.clone();
-    let block1 = async move {
-        let node_config = &node_config;
-        let dbc = create_connection(&node_config.node_config.cluster.database).await?;
-        let dbc = Arc::new(dbc);
-        let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
-        let rows = dbc
-            .query(
-                "select rowid, facility, name from channels where facility = $1 order by facility, name",
-                &[&node_disk_ident.facility],
-            )
-            .await
-            .err_conv()?;
-        let mut c1 = 0;
-        dbc.query("begin", &[]).await.err_conv()?;
-        let mut count_inserted = 0;
-        let mut count_updated = 0;
-        for row in rows {
-            let rowid: i64 = row.try_get(0).err_conv()?;
-            let _facility: i64 = row.try_get(1).err_conv()?;
-            let channel: String = row.try_get(2).err_conv()?;
-            match update_db_with_channel_config(
-                node_config,
-                node_disk_ident,
-                rowid,
-                &channel,
-                dbc.clone(),
-                &mut count_inserted,
-                &mut count_updated,
-            )
-            .await
-            {
-                Err(e) => {
-                    error!("{:?}", e);
-                    delay_io_medium().await;
-                }
-                Ok(UpdateChannelConfigResult::NotFound) => {
-                    warn!("can not find channel config {}", channel);
-                    delay_io_medium().await;
-                }
-                Ok(UpdateChannelConfigResult::Done) => {
-                    c1 += 1;
-                    if c1 % 200 == 0 {
-                        dbc.query("commit", &[]).await.err_conv()?;
-                        let msg = format!(
-                            "channel no {:6} inserted {:6} updated {:6}",
-                            c1, count_inserted, count_updated
-                        );
-                        let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
-                        tx.send(Ok(ret)).await.err_conv()?;
-                        dbc.query("begin", &[]).await.err_conv()?;
-                    }
-                    delay_io_short().await;
-                }
-            }
-        }
-        dbc.query("commit", &[]).await.err_conv()?;
-        let msg = format!(
-            "ALL DONE channel no {:6} inserted {:6} updated {:6}",
-            c1, count_inserted, count_updated
-        );
-        let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
-        tx.send(Ok(ret)).await.err_conv()?;
-        Ok::<_, Error>(())
-    }
-    .then({
+    let block1 = update_db_with_all_channel_configs_inner(tx, node_config).then({
         |item| async move {
             match item {
                 Ok(_) => {}
@@ -410,7 +443,10 @@ pub async fn update_db_with_all_channel_configs(
 
 pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<bool, Error> {
     let dbc = create_connection(&node_config.node_config.cluster.database).await?;
-    dbc.query("select update_cache()", &[]).await.err_conv()?;
dbc.query("select update_cache()", &[]) + .await + .err_conv() + .map_err(|e| format!("error update_search_cache: {e}"))?; Ok(true) } @@ -460,18 +496,20 @@ async fn update_db_with_channel_config( if rows.len() > 1 { return Err(Error::with_msg("more than one row")); } - let (config_id, do_parse) = if rows.len() > 0 { - let row = &rows[0]; + let (config_id, do_parse) = if let Some(row) = rows.first() { let rowid: i64 = row.get(0); let file_size: u32 = row.get::<_, i64>(1) as u32; let parsed_until: u32 = row.get::<_, i64>(2) as u32; let _channel_id = row.get::<_, i64>(2) as i64; if meta.len() < file_size as u64 || meta.len() < parsed_until as u64 { let sql = concat!( - "insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) ", - "select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1" + "insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) ", + "select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1" ); - dbc.query(sql, &[&rowid]).await.err_conv()?; + dbc.query(sql, &[&rowid]) + .await + .err_conv() + .map_err(|e| format!("on config history insert {e}"))?; } //ensure!(meta.len() >= parsed_until as u64, ConfigFileOnDiskShrunk{path}); (Some(rowid), true) @@ -484,7 +522,20 @@ async fn update_db_with_channel_config( match config_id { None => { dbc.query( - "insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5)", + "insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5) on conflict (node, channel) do update set fileSize = $3, parsedUntil = $4, config = $5", + &[ + &node_disk_ident.rowid(), + &channel_id, + &(meta.len() as i64), + &(buf.len() as i64), + &serde_json::to_value(config)?, + ], + ).await.err_conv().map_err(|e| format!("on config insert {e}"))?; + *count_inserted += 1; + } + Some(_config_id) => { + dbc.query( + "update configs set fileSize = $3, parsedUntil = $4, config = $5 where node = $1 and channel = $2", &[ &node_disk_ident.rowid(), &channel_id, @@ -494,14 +545,8 @@ async fn update_db_with_channel_config( ], ) .await - .err_conv()?; - *count_inserted += 1; - } - Some(_config_id_2) => { - dbc.query( - "insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5) on conflict (node, channel) do update set fileSize = $3, parsedUntil = $4, config = $5", - &[&node_disk_ident.rowid(), &channel_id, &(meta.len() as i64), &(buf.len() as i64), &serde_json::to_value(config)?], - ).await.err_conv()?; + .err_conv() + .map_err(|e| format!("on config update {e}"))?; *count_updated += 1; } } diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index f5129ad..da7ccd4 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -510,6 +510,7 @@ async fn update_db_with_channel_names( _ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result, Error> { + info!("httpret::update_db_with_channel_names"); let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"),