diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index f0dbfc6..927c792 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,20 +1,31 @@ -use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto}; +use super::proto; +use super::proto::CaItem; +use super::proto::CaMsg; +use super::proto::CaMsgTy; +use super::proto::CaProto; use super::store::DataStore; -use super::{ExtraInsertsConf, IngestCommons}; +use super::ExtraInsertsConf; +use super::IngestCommons; use crate::bsread::ChannelDescDecoded; -use crate::ca::proto::{CreateChan, EventAdd}; +use crate::ca::proto::CreateChan; +use crate::ca::proto::EventAdd; use crate::ca::store::ChannelRegistry; use crate::series::{Existence, SeriesId}; -use crate::store::{ - ChannelInfoItem, ChannelStatus, ChannelStatusItem, CommonInsertItemQueueSender, ConnectionStatus, - ConnectionStatusItem, InsertItem, IvlItem, MuteItem, QueryItem, -}; +use crate::store::ChannelInfoItem; +use crate::store::ChannelStatus; +use crate::store::ChannelStatusItem; +use crate::store::CommonInsertItemQueueSender; +use crate::store::ConnectionStatus; +use crate::store::ConnectionStatusItem; +use crate::store::{InsertItem, IvlItem, MuteItem, QueryItem}; use async_channel::Sender; use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; use log::*; use netpod::timeunits::*; +use netpod::TS_MSP_GRID_SPACING; +use netpod::TS_MSP_GRID_UNIT; use netpod::{ScalarType, Shape}; use serde::Serialize; use stats::{CaConnStats, IntervalEma}; @@ -930,7 +941,7 @@ impl CaConn { let ts_msp_last = st.ts_msp_last; let inserted_in_ts_msp = st.inserted_in_ts_msp; // TODO get event timestamp from channel access field - let ts_msp_grid = (ts / (SEC * 10 * 6 * 2)) as u32 * (6 * 2); + let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid { st.ts_msp_grid_last = ts_msp_grid; Some(ts_msp_grid) diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 5eaa4fb..abbdfef 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -208,14 +208,8 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Vec) file.read_to_end(&mut buf).await?; let lines = buf.split(|&x| x == 0x0a); let mut channels = Vec::new(); - let mut i = 0; for line in lines { let line = String::from_utf8_lossy(line); - if i < 50 { - eprintln!("line has {}", line.len()); - eprintln!("parse line {line}"); - } - i += 1; let use_line = if let Some(_cs) = re_p.captures(&line) { true } else if re_n.is_match(&line) { diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index d6bd6db..637027d 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -16,7 +16,7 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; use tokio::sync::Mutex as TokMx; -pub const CONNECTION_STATUS_DIV: u64 = netpod::timeunits::DAY; +pub use netpod::CONNECTION_STATUS_DIV; #[derive(Debug)] pub enum Error {