Fix warnings

This commit is contained in:
Dominik Werder
2021-11-05 15:53:07 +01:00
parent 96e0473392
commit daf3f6c14c
8 changed files with 27 additions and 15 deletions

View File

@@ -19,7 +19,7 @@ use crate::wrap_task;
use async_channel::{Receiver, Sender};
use err::Error;
use futures_util::StreamExt;
use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen};
use items::{Sitemty, StatsItem, StreamItem, WithLen};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{

View File

@@ -16,6 +16,7 @@ pub struct BackReadBuf<F> {
seek_request: u64,
seek_done: u64,
read_done: u64,
bytes_read: u64,
}
impl<F> BackReadBuf<F>
@@ -33,6 +34,7 @@ where
seek_request: 0,
seek_done: 0,
read_done: 0,
bytes_read: 0,
};
ret.seek(pos).await?;
Ok(ret)
@@ -70,6 +72,7 @@ where
//debug!("I/O fill n {}", n);
self.wp += n;
self.read_done += 1;
self.bytes_read += n as u64;
Ok(n)
}
@@ -85,7 +88,6 @@ where
}
pub async fn seek(&mut self, pos: u64) -> Result<u64, Error> {
let dp = pos as i64 - self.abs as i64 - self.rp as i64;
if pos >= self.abs && pos < self.abs + self.buf.len() as u64 - 64 {
self.rp = (pos - self.abs) as usize;
self.seek_request += 1;
@@ -106,6 +108,14 @@ where
Ok(ret)
}
}
pub fn rp_abs(&self) -> u64 {
self.abs as u64 + self.rp as u64
}
pub fn bytes_read(&self) -> u64 {
self.bytes_read
}
}
impl<F> fmt::Debug for BackReadBuf<F> {
@@ -117,6 +127,7 @@ impl<F> fmt::Debug for BackReadBuf<F> {
.field("seek_request", &self.seek_request)
.field("seek_done", &self.seek_done)
.field("read_done", &self.read_done)
.field("bytes_read", &self.bytes_read)
.finish()
}
}

View File

@@ -91,8 +91,13 @@ impl BlockrefStream {
for row in rows {
self.paths.push_back(row.try_get(0)?);
}
self.steps = SetupNextPath;
Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("DBQUERY"))), self)))
if self.paths.len() == 0 {
self.steps = Done;
Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("NOMOREPATHS"))), self)))
} else {
self.steps = SetupNextPath;
Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("DBQUERY"))), self)))
}
}
SetupNextPath => {
let stats = &StatsChannel::dummy();

View File

@@ -406,9 +406,10 @@ fn _format_debug_2(evs: WaveEvents<i32>) -> Result<(), Error> {
pub async fn read_data2(
rb: &mut RingBuf<File>,
datafile_header: &DatafileHeader,
range: NanoRange,
_range: NanoRange,
_expand: bool,
) -> Result<EventsItem, Error> {
// TODO handle range
// TODO handle expand mode
{
let dpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64;

View File

@@ -1,13 +1,12 @@
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{
format_hex_block, name_hash, open_read, readu16, readu32, readu64, seek, StatsChannel, EPICS_EPOCH_OFFSET,
format_hex_block, name_hash, open_read, readu16, readu32, readu64, StatsChannel, EPICS_EPOCH_OFFSET,
};
use err::Error;
use netpod::{log::*, NanoRange};
use netpod::{timeunits::SEC, FilePos, Nanos};
use std::collections::VecDeque;
use std::fmt;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tokio::fs::File;

View File

@@ -2,7 +2,6 @@ use crate::archeng::{read, seek, StatsChannel};
use err::Error;
use netpod::log::*;
use std::fmt;
use std::mem::ManuallyDrop;
use std::{borrow::BorrowMut, io::SeekFrom};
use tokio::fs::File;

View File

@@ -1,12 +1,11 @@
use crate::response;
use disk::events::PlainEventsJsonQuery;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use http::{header, Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::query::RawEventsQuery;
use netpod::{get_url_query_pairs, log::*, Channel, NanoRange};
use netpod::log::*;
use netpod::{get_url_query_pairs, Channel, NanoRange};
use netpod::{NodeConfigCached, APP_JSON_LINES};
use serde::Serialize;
use serde_json::Value as JsVal;
@@ -342,9 +341,8 @@ impl BlockStream {
let read_queue = pairs.get("readQueue").unwrap_or(&"1".to_string()).parse()?;
let channel_name = pairs.get("channelName").map(String::from).unwrap_or("NONE".into());
let channel = Channel {
backend: "".into(),
backend: node_config.node.backend.clone(),
name: channel_name,
//name: "ARIDI-PCT:CURRENT".into(),
};
use archapp_wrap::archapp::archeng;
let s = archeng::blockrefstream::blockref_stream(channel, range.clone(), conf.clone());
@@ -354,7 +352,7 @@ impl BlockStream {
Ok(item) => {
use archeng::blockstream::BlockItem;
match item {
BlockItem::EventsItem(item) => Ok(JsVal::String("EventsItem".into())),
BlockItem::EventsItem(_item) => Ok(JsVal::String("EventsItem".into())),
BlockItem::JsVal(jsval) => Ok(jsval),
}
}

View File

@@ -83,7 +83,6 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
.await?;
Ok(ret)
} else {
info!("bad accept: {:?}", head.headers.get(header::ACCEPT));
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::from(format!("{:?}", proxy_config.name)))?)
}
}