Reduce output
This commit is contained in:
@@ -11,6 +11,7 @@ use items::eventvalues::EventValues;
|
|||||||
use items::{RangeCompletableItem, StreamItem};
|
use items::{RangeCompletableItem, StreamItem};
|
||||||
use netpod::timeunits::SEC;
|
use netpod::timeunits::SEC;
|
||||||
use netpod::{log::*, ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DataHeaderPos, FilePos, Nanos};
|
use netpod::{log::*, ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DataHeaderPos, FilePos, Nanos};
|
||||||
|
use serde::Serialize;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::io::{self, SeekFrom};
|
use std::io::{self, SeekFrom};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@@ -322,7 +323,7 @@ pub async fn read_file_basics(file: &mut File) -> Result<IndexFileBasics, Error>
|
|||||||
} else {
|
} else {
|
||||||
panic!();
|
panic!();
|
||||||
};
|
};
|
||||||
info!("read_file_basics w/o hashposs {:?}", ret);
|
trace!("read_file_basics w/o hashposs {:?}", ret);
|
||||||
{
|
{
|
||||||
if ret.name_hash_anchor_len > 2000 {
|
if ret.name_hash_anchor_len > 2000 {
|
||||||
return Err(Error::with_msg_no_trace(format!(
|
return Err(Error::with_msg_no_trace(format!(
|
||||||
@@ -881,7 +882,7 @@ async fn read_data_1(file: &mut File, datafile_header: &DatafileHeader) -> Resul
|
|||||||
let res = match &datafile_header.dbr_type {
|
let res = match &datafile_header.dbr_type {
|
||||||
DbrType::DbrTimeDouble => {
|
DbrType::DbrTimeDouble => {
|
||||||
if datafile_header.dbr_count == 1 {
|
if datafile_header.dbr_count == 1 {
|
||||||
info!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble");
|
trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble");
|
||||||
let mut evs = EventValues {
|
let mut evs = EventValues {
|
||||||
tss: vec![],
|
tss: vec![],
|
||||||
values: vec![],
|
values: vec![],
|
||||||
@@ -915,11 +916,16 @@ async fn read_data_1(file: &mut File, datafile_header: &DatafileHeader) -> Resul
|
|||||||
let item = EventsItem::Plain(plain);
|
let item = EventsItem::Plain(plain);
|
||||||
item
|
item
|
||||||
} else {
|
} else {
|
||||||
// 1d shape
|
let msg = format!("dbr_count {:?} not yet supported", datafile_header.dbr_count);
|
||||||
err::todoval()
|
error!("{}", msg);
|
||||||
|
return Err(Error::with_msg_no_trace(msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => err::todoval(),
|
_ => {
|
||||||
|
let msg = format!("Type {:?} not yet supported", datafile_header.dbr_type);
|
||||||
|
error!("{}", msg);
|
||||||
|
return Err(Error::with_msg_no_trace(msg));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
@@ -979,7 +985,9 @@ where
|
|||||||
match tx.send(Err(e)).await {
|
match tx.send(Err(e)).await {
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("wrap_task can not forward error: {:?}", e);
|
if false {
|
||||||
|
error!("wrap_task can not forward error: {:?}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -988,7 +996,14 @@ where
|
|||||||
taskrun::spawn(task);
|
taskrun::spawn(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn list_all_channels(node: &ChannelArchiver) -> Receiver<Result<String, Error>> {
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct ListChannelItem {
|
||||||
|
name: String,
|
||||||
|
index_path: String,
|
||||||
|
matches: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list_all_channels(node: &ChannelArchiver) -> Receiver<Result<ListChannelItem, Error>> {
|
||||||
let node = node.clone();
|
let node = node.clone();
|
||||||
let (tx, rx) = async_channel::bounded(4);
|
let (tx, rx) = async_channel::bounded(4);
|
||||||
let tx2 = tx.clone();
|
let tx2 = tx.clone();
|
||||||
@@ -996,18 +1011,31 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver<Result<String, Erro
|
|||||||
let mut ixf = list_index_files(&node);
|
let mut ixf = list_index_files(&node);
|
||||||
while let Some(f) = ixf.next().await {
|
while let Some(f) = ixf.next().await {
|
||||||
let index_path = f?;
|
let index_path = f?;
|
||||||
if !index_path.to_str().unwrap().contains("archive_X02DA_LO/20130101/index") {
|
//info!("try to read for {:?}", index_path);
|
||||||
//continue;
|
let channels = channel_list(index_path.clone()).await?;
|
||||||
}
|
//info!("list_all_channels emit {} channels", channels.len());
|
||||||
info!("try to read for {:?}", index_path);
|
|
||||||
//continue;
|
|
||||||
let channels = channel_list(index_path).await?;
|
|
||||||
info!("list_all_channels emit {} channels", channels.len());
|
|
||||||
for ch in channels {
|
for ch in channels {
|
||||||
tx.send(Ok(ch)).await?;
|
let mm = match ch.split("-").next() {
|
||||||
|
Some(k) => {
|
||||||
|
let dname = index_path.parent().unwrap().file_name().unwrap().to_str().unwrap();
|
||||||
|
if dname.starts_with(&format!("archive_{}", k)) {
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
let item = ListChannelItem {
|
||||||
|
name: ch,
|
||||||
|
index_path: index_path.to_str().unwrap().into(),
|
||||||
|
matches: mm,
|
||||||
|
};
|
||||||
|
tx.send(Ok(item)).await?;
|
||||||
|
//info!("{:?} parent {:?} channel {}", index_path, index_path.parent(), ch);
|
||||||
|
//break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("list_all_channels DONE");
|
|
||||||
Ok::<_, Error>(())
|
Ok::<_, Error>(())
|
||||||
};
|
};
|
||||||
wrap_task(task, tx2);
|
wrap_task(task, tx2);
|
||||||
|
|||||||
@@ -32,7 +32,9 @@ async fn datablock_stream(
|
|||||||
Err(e) => match tx.send(Err(e)).await {
|
Err(e) => match tx.send(Err(e)).await {
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("can not forward error: {:?}", e);
|
if false {
|
||||||
|
error!("can not send. error: {}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -55,6 +57,7 @@ async fn datablock_stream_inner(
|
|||||||
"search for {:?} with basename: {} in path {:?}",
|
"search for {:?} with basename: {} in path {:?}",
|
||||||
channel, basename, base
|
channel, basename, base
|
||||||
);
|
);
|
||||||
|
// TODO need to try both:
|
||||||
let index_path = base.join(format!("archive_{}_SH", basename)).join("index");
|
let index_path = base.join(format!("archive_{}_SH", basename)).join("index");
|
||||||
let res = open_read(index_path.clone()).await;
|
let res = open_read(index_path.clone()).await;
|
||||||
debug!("tried to open index file: {:?}", res);
|
debug!("tried to open index file: {:?}", res);
|
||||||
|
|||||||
@@ -682,11 +682,9 @@ pub async fn update_search_cache(req: Request<Body>, node_config: &NodeConfigCac
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
info!("channel_config");
|
|
||||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||||
//let pairs = get_url_query_pairs(&url);
|
//let pairs = get_url_query_pairs(&url);
|
||||||
let q = ChannelConfigQuery::from_url(&url)?;
|
let q = ChannelConfigQuery::from_url(&url)?;
|
||||||
info!("ChannelConfigQuery {:?}", q);
|
|
||||||
let conf = if let Some(conf) = &node_config.node.channel_archiver {
|
let conf = if let Some(conf) = &node_config.node.channel_archiver {
|
||||||
archapp_wrap::archapp::archeng::channel_config(&q, conf).await?
|
archapp_wrap::archapp::archeng::channel_config(&q, conf).await?
|
||||||
} else if let Some(conf) = &node_config.node.archiver_appliance {
|
} else if let Some(conf) = &node_config.node.archiver_appliance {
|
||||||
|
|||||||
Reference in New Issue
Block a user