From e1e930f453fae6979bd7337055607ca3b377b422 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 17 Sep 2021 20:38:20 +0200 Subject: [PATCH] Mark closed pulse map files --- daqbufp2/src/test/timeweightedjson.rs | 2 +- dbconn/src/lib.rs | 4 + disk/src/aggtest.rs | 1 - disk/src/eventchunker.rs | 7 +- disk/src/gen.rs | 25 ++-- disk/src/lib.rs | 28 +--- disk/src/paths.rs | 18 +-- httpret/src/api1.rs | 177 ++++++++++++++++---------- httpret/src/lib.rs | 7 +- httpret/src/pulsemap.rs | 145 +++++++++++++++------ netfetch/src/test.rs | 1 - netpod/src/lib.rs | 2 - nodenet/src/conn.rs | 6 +- taskrun/src/lib.rs | 1 - 14 files changed, 263 insertions(+), 161 deletions(-) diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 0be08c4..d0cb677 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -44,7 +44,7 @@ fn time_weighted_json_01() { 10, AggKind::DimXBins1, cluster, - 11, + 9, true, ) .await?; diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index fe598db..dd0a204 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -7,6 +7,10 @@ use tokio_postgres::{Client, NoTls}; pub mod scan; pub mod search; +pub mod pg { + pub use tokio_postgres::Client; +} + pub async fn delay_us(mu: u64) { tokio::time::sleep(Duration::from_micros(mu)).await; } diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 0f1c9a5..ea2026d 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -13,7 +13,6 @@ pub fn make_test_node(id: u32) -> Node { port_raw: 8800 + id as u16 + 100, data_base_path: format!("../tmpdata/node{:02}", id).into(), cache_base_path: format!("../tmpdata/node{:02}", id).into(), - split: id, ksprefix: "ks".into(), backend: "testbackend".into(), archiver_appliance: None, diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index e13ad02..96ed20e 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -375,21 +375,21 @@ impl EventChunker { let ts2 = Instant::now(); let dt = ts2.duration_since(ts1); self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); - decomp + Some(decomp) } Err(e) => { return Err(Error::with_msg(format!("decompression failed {:?}", e)))?; } } } else { - BytesMut::new() + None } }; ret.add_event( ts, pulse, buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), - Some(decomp), + decomp, ScalarType::from_dtype_index(type_index)?, is_big_endian, shape_this, @@ -401,6 +401,7 @@ impl EventChunker { Err(Error::with_msg(msg))?; } let vlen = len - p1 as u32 - 4; + // TODO in this case, decomp and comp is the same and not needed. let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]); ret.add_event( ts, diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 8820ad9..d245b36 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -120,7 +120,6 @@ pub async fn gen_test_data() -> Result<(), Error> { listen: "0.0.0.0".into(), port: 7780 + i1 as u16, port_raw: 7780 + i1 as u16 + 100, - split: i1, data_base_path: data_base_path.join(format!("node{:02}", i1)), cache_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), @@ -129,8 +128,8 @@ pub async fn gen_test_data() -> Result<(), Error> { }; ensemble.nodes.push(node); } - for node in &ensemble.nodes { - gen_node(node, &ensemble).await?; + for (split, node) in ensemble.nodes.iter().enumerate() { + gen_node(split as u32, node, &ensemble).await?; } Ok(()) } @@ -146,14 +145,14 @@ pub struct ChannelGenProps { gen_var: GenVar, } -async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> { +async fn gen_node(split: u32, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { for chn in &ensemble.channels { - gen_channel(chn, node, ensemble).await? + gen_channel(chn, split, node, ensemble).await? } Ok(()) } -async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { +async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { let config_path = node.data_base_path.join("config").join(&chn.config.channel.name); let channel_path = node .data_base_path @@ -175,6 +174,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> chn.time_spacing, &channel_path, &chn.config, + split, node, ensemble, &chn.gen_var, @@ -321,14 +321,13 @@ async fn gen_timebin( ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, + split: u32, node: &Node, ensemble: &Ensemble, gen_var: &GenVar, ) -> Result { let tb = ts.ns / config.time_bin_size.ns; - let path = channel_path - .join(format!("{:019}", tb)) - .join(format!("{:010}", node.split)); + let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", split)); tokio::fs::create_dir_all(&path).await?; let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size.ns / MS, 0)); let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size.ns / MS, 0)); @@ -363,20 +362,22 @@ async fn gen_timebin( }; while ts.ns < tsmax.ns { match gen_var { + // TODO + // Splits and nodes are not in 1-to-1 correspondence. GenVar::Default => { - if evix % ensemble.nodes.len() as u64 == node.split as u64 { + if evix % ensemble.nodes.len() as u64 == split as u64 { gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; } } GenVar::ConstRegular => { - if evix % ensemble.nodes.len() as u64 == node.split as u64 { + if evix % ensemble.nodes.len() as u64 == split as u64 { gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; } } GenVar::TimeWeight => { let m = evix % 20; if m == 0 || m == 1 { - if evix % ensemble.nodes.len() as u64 == node.split as u64 { + if evix % ensemble.nodes.len() as u64 == split as u64 { gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; } } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 2d89f59..c427ed6 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -4,6 +4,7 @@ use err::Error; use futures_core::Stream; use futures_util::future::FusedFuture; use futures_util::StreamExt; +use netpod::histo::HistoLog2; use netpod::{log::*, FileIoBufferSize}; use netpod::{ChannelConfig, Node, Shape}; use std::future::Future; @@ -36,8 +37,9 @@ pub mod paths; pub mod raw; pub mod streamlog; +// TODO transform this into a self-test or remove. pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result { - let path = paths::datapath(query.timebin as u64, &query.channel_config, &node); + let path = paths::datapath(query.timebin as u64, &query.channel_config, 0, &node); debug!("try path: {:?}", path); let fin = OpenOptions::new().read(true).open(path).await?; let meta = fin.metadata().await; @@ -270,7 +272,7 @@ pub struct NeedMinBuffer { inp: Pin> + Send>>, need_min: u32, left: Option, - buf_len_histo: [u32; 16], + buf_len_histo: HistoLog2, errored: bool, completed: bool, } @@ -281,7 +283,7 @@ impl NeedMinBuffer { inp: inp, need_min: 1, left: None, - buf_len_histo: Default::default(), + buf_len_histo: HistoLog2::new(8), errored: false, completed: false, } @@ -300,7 +302,7 @@ impl NeedMinBuffer { // TODO remove this again impl Drop for NeedMinBuffer { fn drop(&mut self) { - info!("NeedMinBuffer histo: {:?}", self.buf_len_histo); + info!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo); } } @@ -320,23 +322,7 @@ impl Stream for NeedMinBuffer { let mut again = false; let z = match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(fcr))) => { - const SUB: usize = 8; - let mut u = fcr.buf.len(); - let mut po = 0; - while u != 0 && po < 15 { - u = u >> 1; - po += 1; - } - let po = if po >= self.buf_len_histo.len() + SUB { - self.buf_len_histo.len() - 1 - } else { - if po > SUB { - po - SUB - } else { - 0 - } - }; - self.buf_len_histo[po] += 1; + self.buf_len_histo.ingest(fcr.buf.len() as u32); //info!("NeedMinBuffer got buf len {}", fcr.buf.len()); match self.left.take() { Some(mut lfcr) => { diff --git a/disk/src/paths.rs b/disk/src/paths.rs index 6a535e3..c7cce60 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -5,14 +5,14 @@ use netpod::timeunits::MS; use netpod::{ChannelConfig, Nanos, Node}; use std::path::PathBuf; -// TODO remove this -pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf { +// TODO remove/replace this +pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, split: u32, node: &Node) -> PathBuf { node.data_base_path .join(format!("{}_{}", node.ksprefix, config.keyspace)) .join("byTime") .join(config.channel.name.clone()) .join(format!("{:019}", timebin)) - .join(format!("{:010}", node.split)) + .join(format!("{:010}", split)) .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS)) } @@ -77,22 +77,22 @@ pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Ok(ret) } -pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result { +pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { let ret = channel_timebins_dir_path(channel_config, node)? .join(format!("{:019}", ts.ns / channel_config.time_bin_size.ns)) - .join(format!("{:010}", node.split)); + .join(format!("{:010}", split)); Ok(ret) } -pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result { +pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns / MS, 0); - let ret = data_dir_path(ts, channel_config, node)?.join(fname); + let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); Ok(ret) } -pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result { +pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns / MS, 0); - let ret = data_dir_path(ts, channel_config, node)?.join(fname); + let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); Ok(ret) } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 429484f..81d91f5 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -14,7 +14,9 @@ use netpod::{ log::*, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape, APP_OCTET, }; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; -use parse::channelconfig::{extract_matching_config_entry, read_local_config, Config, MatchingConfigEntry}; +use parse::channelconfig::{ + extract_matching_config_entry, read_local_config, Config, ConfigEntry, MatchingConfigEntry, +}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::future::Future; @@ -537,6 +539,93 @@ impl DataApiPython3DataStream { completed: false, } } + + fn convert_item( + b: EventFull, + channel: &Channel, + entry: &ConfigEntry, + header_out: &mut bool, + count_events: &mut usize, + ) -> Result { + let mut d = BytesMut::new(); + for i1 in 0..b.tss.len() { + if *count_events < 6 { + info!( + "deco len {:?} BE {} scalar-type {:?} shape {:?} comps {:?}", + b.decomps[i1].as_ref().map(|x| x.len()), + b.be[i1], + b.scalar_types[i1], + b.shapes[i1], + b.comps[i1], + ); + } + // TODO emit warning when we use a different setting compared to channel config. + if false { + let _compression = if let (Shape::Image(..), Some(..)) = (&b.shapes[i1], &b.comps[i1]) { + Some(1) + } else { + None + }; + }; + let compression = if let Some(_) = &b.comps[i1] { Some(1) } else { None }; + if !*header_out { + let head = Api1ChannelHeader { + name: channel.name.clone(), + ty: scalar_type_to_api3proto(&b.scalar_types[i1]).into(), + byte_order: if b.be[i1] { + "BIG_ENDIAN".into() + } else { + "LITTLE_ENDIAN".into() + }, + // The shape is inconsistent on the events. + // Seems like the config is to be trusted in this case. + shape: shape_to_api3proto(&entry.shape), + compression, + }; + let h = serde_json::to_string(&head)?; + info!("sending channel header {}", h); + let l1 = 1 + h.as_bytes().len() as u32; + d.put_u32(l1); + d.put_u8(0); + d.extend_from_slice(h.as_bytes()); + d.put_u32(l1); + *header_out = true; + } + { + match &b.shapes[i1] { + Shape::Image(_, _) => { + let l1 = 17 + b.blobs[i1].len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&b.blobs[i1]); + d.put_u32(l1); + } + Shape::Wave(_) => { + let l1 = 17 + b.blobs[i1].len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&b.blobs[i1]); + d.put_u32(l1); + } + _ => { + let l1 = 17 + b.blobs[i1].len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&b.blobs[i1]); + d.put_u32(l1); + } + } + } + *count_events += 1; + } + Ok(d) + } } impl Stream for DataApiPython3DataStream { @@ -583,9 +672,10 @@ impl Stream for DataApiPython3DataStream { } MatchingConfigEntry::Entry(entry) => entry.clone(), }; - warn!("found channel_config {:?}", entry); + let channel = self.channels[self.chan_ix - 1].clone(); + info!("found channel_config for {}: {:?}", channel.name, entry); let evq = RawEventsQuery { - channel: self.channels[self.chan_ix - 1].clone(), + channel, range: self.range.clone(), agg_kind: netpod::AggKind::EventBlobs, disk_io_buffer_size: self.file_io_buffer_size.0, @@ -626,70 +716,13 @@ impl Stream for DataApiPython3DataStream { Ok(b) => { let f = match b { StreamItem::DataItem(RangeCompletableItem::Data(b)) => { - let mut d = BytesMut::new(); - for i1 in 0..b.tss.len() { - if count_events < 6 { - info!( - "deco len {:?} BE {} scalar-type {:?} shape {:?} comps {:?}", - b.decomps[i1].as_ref().map(|x| x.len()), - b.be[i1], - b.scalar_types[i1], - b.shapes[i1], - b.comps[i1], - ); - } - let compression = if let (Shape::Image(..), Some(..)) = (&b.shapes[i1], &b.comps[i1]) { Some(1) } else { None }; - if !header_out { - let head = Api1ChannelHeader { - name: channel.name.clone(), - ty: scalar_type_to_api3proto(&b.scalar_types[i1]) - .into(), - byte_order: if b.be[i1] { - "BIG_ENDIAN".into() - } else { - "LITTLE_ENDIAN".into() - }, - // The shape is inconsistent on the events. - // Seems like the config is to be trusted in this case. - shape: shape_to_api3proto(&entry.shape), - compression, - }; - let h = serde_json::to_string(&head)?; - info!("sending channel header {}", h); - let l1 = 1 + h.as_bytes().len() as u32; - d.put_u32(l1); - d.put_u8(0); - d.extend_from_slice(h.as_bytes()); - d.put_u32(l1); - header_out = true; - } - { - match &b.shapes[i1] { - Shape::Image(_, _) => { - let l1 = 17 + b.blobs[i1].len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&b.blobs[i1]); - d.put_u32(l1); - } - _ => { - if let Some(deco) = &b.decomps[i1] { - let l1 = 17 + deco.len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&deco); - d.put_u32(l1); - } - } - } - } - count_events += 1; - } - d + Self::convert_item( + b, + &channel, + &entry, + &mut header_out, + &mut count_events, + )? } _ => BytesMut::new(), }; @@ -732,7 +765,7 @@ impl Stream for DataApiPython3DataStream { } pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("api1_binary_events headers: {:?}", req.headers()); + info!("api1_binary_events uri: {:?} headers: {:?}", req.uri(), req.headers()); let accept_def = ""; let accept = req .headers() @@ -741,8 +774,12 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach .to_owned(); let (_head, body) = req.into_parts(); let body_data = hyper::body::to_bytes(body).await?; - info!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec())); - let qu: Api1Query = serde_json::from_slice(&body_data)?; + let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) { + qu + } else { + error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec())); + return Err(Error::with_msg_no_trace("can not parse query")); + }; info!("got Api1Query: {:?}", qu); let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date); let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 7bab114..827d6df 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -149,6 +149,7 @@ macro_rules! static_http_api1 { } async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("http_service_try {:?}", req.uri()); let uri = req.uri().clone(); let path = uri.path(); if path == "/api/4/node_status" { @@ -251,10 +252,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> if req.method() == Method::POST { Ok(api1::api1_binary_events(req, &node_config).await?) } else { - Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if pulsemap::IndexFullHttpFunction::path_matches(path) { pulsemap::IndexFullHttpFunction::handle(req, &node_config).await + } else if pulsemap::MarkClosedHttpFunction::path_matches(path) { + pulsemap::MarkClosedHttpFunction::handle(req, &node_config).await } else if pulsemap::MapPulseLocalHttpFunction::path_matches(path) { pulsemap::MapPulseLocalHttpFunction::handle(req, &node_config).await } else if pulsemap::MapPulseHistoHttpFunction::path_matches(path) { @@ -387,7 +390,7 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result binned_binary(query, node_config).await, diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index ab84faf..3a06f36 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -36,6 +36,7 @@ const _MAP_INDEX_FAST_URL_PREFIX: &'static str = "/api/1/map/index/fast/"; const MAP_PULSE_HISTO_URL_PREFIX: &'static str = "/api/1/map/pulse/histo/"; const MAP_PULSE_URL_PREFIX: &'static str = "/api/1/map/pulse/"; const MAP_PULSE_LOCAL_URL_PREFIX: &'static str = "/api/1/map/pulse/local/"; +const MAP_PULSE_MARK_CLOSED_URL_PREFIX: &'static str = "/api/1/map/pulse/mark/closed/"; async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> { let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; @@ -217,53 +218,73 @@ impl IndexFullHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - Self::index(node_config).await + let ret = match Self::index(node_config).await { + Ok(msg) => response(StatusCode::OK).body(Body::from(msg))?, + Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?, + }; + Ok(ret) } - pub async fn index(node_config: &NodeConfigCached) -> Result, Error> { + pub async fn index_channel( + channel_name: String, + conn: &dbconn::pg::Client, + node_config: &NodeConfigCached, + ) -> Result { + let mut msg = format!("Index channel {}", channel_name); + let files = datafiles_for_channel(channel_name.clone(), node_config).await?; + msg = format!("{}\n{:?}", msg, files); + for path in files { + let splitted: Vec<_> = path.to_str().unwrap().split("/").collect(); + let timebin: u64 = splitted[splitted.len() - 3].parse()?; + let split: u64 = splitted[splitted.len() - 2].parse()?; + if false { + info!( + "hostname {} timebin {} split {}", + node_config.node.host, timebin, split + ); + } + let file = tokio::fs::OpenOptions::new().read(true).open(path).await?; + let (r2, file) = read_first_chunk(file).await?; + msg = format!("{}\n{:?}", msg, r2); + let (r3, _file) = read_last_chunk(file, r2.pos, r2.len).await?; + msg = format!("{}\n{:?}", msg, r3); + // TODO remove update of static columns when older clients are removed. + let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; + conn.execute( + sql, + &[ + &channel_name, + &(split as i32), + &(timebin as i32), + &(r2.pulse as i64), + &(r3.pulse as i64), + &node_config.node.host, + ], + ) + .await?; + } + Ok(msg) + } + + pub async fn index(node_config: &NodeConfigCached) -> Result { // TODO avoid double-insert on central storage. - // TODO Mark files as "closed". let mut msg = format!("LOG"); make_tables(node_config).await?; let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let chs = timer_channel_names(); for channel_name in &chs[..] { - info!("channel_name {}", channel_name); - let files = datafiles_for_channel(channel_name.clone(), node_config).await?; - msg = format!("\n{:?}", files); - for path in files { - let splitted: Vec<_> = path.to_str().unwrap().split("/").collect(); - //info!("splitted: {:?}", splitted); - let timebin: u64 = splitted[splitted.len() - 3].parse()?; - let split: u64 = splitted[splitted.len() - 2].parse()?; - if false { - info!( - "hostname {} timebin {} split {}", - node_config.node.host, timebin, split - ); + match Self::index_channel(channel_name.clone(), &conn, node_config).await { + Ok(m) => { + msg.push_str("\n"); + msg.push_str(&m); + } + Err(e) => { + error!("error while indexing {} {:?}", channel_name, e); + return Err(e); } - let file = tokio::fs::OpenOptions::new().read(true).open(path).await?; - let (r2, file) = read_first_chunk(file).await?; - msg = format!("{}\n{:?}", msg, r2); - let (r3, _file) = read_last_chunk(file, r2.pos, r2.len).await?; - msg = format!("{}\n{:?}", msg, r3); - // TODO remove update of static when older clients are removed. - let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; - conn.execute( - sql, - &[ - &channel_name, - &(split as i32), - &(timebin as i32), - &(r2.pulse as i64), - &(r3.pulse as i64), - &node_config.node.host, - ], - ) - .await?; } } - Ok(response(StatusCode::OK).body(Body::from(msg))?) + Ok(msg) } } @@ -526,6 +547,7 @@ impl MapPulseHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } + info!("MapPulseHttpFunction handle uri: {:?}", req.uri()); let urls = format!("{}", req.uri()); let pulse: u64 = urls[MAP_PULSE_URL_PREFIX.len()..].parse()?; let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; @@ -544,3 +566,54 @@ impl MapPulseHttpFunction { } } } + +pub struct MarkClosedHttpFunction {} + +impl MarkClosedHttpFunction { + pub fn path_matches(path: &str) -> bool { + path.starts_with(MAP_PULSE_MARK_CLOSED_URL_PREFIX) + } + + pub async fn handle(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + info!("MarkClosedHttpFunction handle uri: {:?}", req.uri()); + match MarkClosedHttpFunction::mark_closed(node_config).await { + Ok(_) => { + let ret = response(StatusCode::OK).body(Body::empty())?; + Ok(ret) + } + Err(e) => { + let msg = format!("{:?}", e); + let ret = response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(msg))?; + Ok(ret) + } + } + } + + pub async fn mark_closed(node_config: &NodeConfigCached) -> Result<(), Error> { + let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + let sql = "select distinct channel from map_pulse_files order by channel"; + let rows = conn.query(sql, &[]).await?; + let chns: Vec<_> = rows.iter().map(|r| r.get::<_, String>(0)).collect(); + for chn in &chns { + let sql = concat!( + "with q1 as (select channel, split, timebin from map_pulse_files", + " where channel = $1 and hostname = $2", + " order by timebin desc offset 2)", + " update map_pulse_files t2 set closed = 1 from q1", + " where t2.channel = q1.channel", + " and t2.closed = 0", + " and t2.split = q1.split", + " and t2.timebin = q1.timebin", + ); + let nmod = conn.execute(sql, &[&chn, &node_config.node.host]).await?; + info!( + "mark files mod {} chn {:?} host {:?}", + nmod, chn, node_config.node.host + ); + } + Ok(()) + } +} diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index 6a3352d..61d7a65 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -15,7 +15,6 @@ fn ca_connect_1() { port: 123, port_raw: 123, backend: "".into(), - split: 0, data_base_path: "".into(), cache_base_path: "".into(), listen: "".into(), diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index affb300..44f8353 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -129,7 +129,6 @@ pub struct Node { pub listen: String, pub port: u16, pub port_raw: u16, - pub split: u32, pub data_base_path: PathBuf, pub cache_base_path: PathBuf, pub ksprefix: String, @@ -144,7 +143,6 @@ impl Node { listen: "dummy".into(), port: 4444, port_raw: 4444, - split: 0, data_base_path: PathBuf::new(), cache_base_path: PathBuf::new(), ksprefix: "daqlocal".into(), diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 381b40e..e504783 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -5,6 +5,7 @@ use futures_core::Stream; use futures_util::StreamExt; use items::frame::{decode_frame, make_term_frame}; use items::{Framable, StreamItem}; +use netpod::histo::HistoLog2; use netpod::query::RawEventsQuery; use netpod::{log::*, AggKind}; use netpod::{EventQueryJsonStringFrame, NodeConfigCached, PerfOpts}; @@ -134,13 +135,13 @@ async fn events_conn_handler_inner_try( }, } }; - + let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = p1.next().await { //info!("conn.rs encode frame typeid {:x}", item.typeid()); let item = item.make_frame(); match item { Ok(buf) => { - info!("events_conn_handler send {} bytes", buf.len()); + buf_len_histo.ingest(buf.len() as u32); match netout.write_all(&buf).await { Ok(_) => {} Err(e) => return Err((e, netout))?, @@ -160,5 +161,6 @@ async fn events_conn_handler_inner_try( Ok(_) => (), Err(e) => return Err((e, netout))?, } + info!("events_conn_handler_inner_try buf_len_histo: {:?}", buf_len_histo); Ok(()) } diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 1c9a70c..8fd9009 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -117,7 +117,6 @@ pub fn test_cluster() -> netpod::Cluster { data_base_path: format!("../tmpdata/node{:02}", id).into(), cache_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), - split: id, backend: "testbackend".into(), archiver_appliance: None, })