Find handled channels by name
This commit is contained in:
@@ -28,6 +28,8 @@ hex = "0.4"
|
||||
libc = "0.2"
|
||||
regex = "1.5.5"
|
||||
axum = "0.5"
|
||||
http = "0.2"
|
||||
hyper = "0.14"
|
||||
log = { path = "../log" }
|
||||
stats = { path = "../stats" }
|
||||
err = { path = "../../daqbuffer/err" }
|
||||
|
||||
@@ -4,7 +4,9 @@ pub mod search;
|
||||
pub mod store;
|
||||
|
||||
use self::store::DataStore;
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::store::{CommonInsertItemQueue, QueryItem};
|
||||
use async_channel::Sender;
|
||||
use conn::CaConn;
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
@@ -13,7 +15,7 @@ use netpod::{Database, ScyllaConfig};
|
||||
use scylla::batch::Consistency;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
@@ -229,16 +231,35 @@ async fn spawn_scylla_insert_workers(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct CommandQueueSet {
|
||||
queues: tokio::sync::Mutex<VecDeque<Sender<ConnCommand>>>,
|
||||
}
|
||||
|
||||
impl CommandQueueSet {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
queues: tokio::sync::Mutex::new(VecDeque::<Sender<ConnCommand>>::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn queues(&self) -> &tokio::sync::Mutex<VecDeque<Sender<ConnCommand>>> {
|
||||
&self.queues
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let facility = "scylla";
|
||||
let insert_frac = Arc::new(AtomicU64::new(1000));
|
||||
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
|
||||
let opts = parse_config(opts.config).await?;
|
||||
let scyconf = opts.scyconf.clone();
|
||||
|
||||
let command_queue_set = Arc::new(CommandQueueSet::new());
|
||||
tokio::spawn(crate::metrics::start_metrics_service(
|
||||
opts.api_bind.clone(),
|
||||
insert_frac.clone(),
|
||||
insert_ivl_min.clone(),
|
||||
command_queue_set.clone(),
|
||||
));
|
||||
let d = Database {
|
||||
name: opts.pgconf.name.clone(),
|
||||
@@ -363,6 +384,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
conn.channel_add(c);
|
||||
}
|
||||
let stats2 = conn.stats();
|
||||
let conn_command_tx = conn.conn_command_tx();
|
||||
command_queue_set.queues().lock().await.push_back(conn_command_tx);
|
||||
let conn_block = async move {
|
||||
while let Some(item) = conn.next().await {
|
||||
match item {
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::ca::proto::{CreateChan, EventAdd, HeadInfo};
|
||||
use crate::ca::store::ChannelRegistry;
|
||||
use crate::series::{Existence, SeriesId};
|
||||
use crate::store::{CommonInsertItemQueueSender, InsertItem, IvlItem, MuteItem, QueryItem};
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesOrdered;
|
||||
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
|
||||
@@ -108,6 +109,26 @@ impl IdStore {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ConnCommandKind {
|
||||
FindChannel(String, Sender<(SocketAddrV4, Vec<String>)>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnCommand {
|
||||
kind: ConnCommandKind,
|
||||
}
|
||||
|
||||
impl ConnCommand {
|
||||
pub fn find_channel(pattern: String) -> (ConnCommand, async_channel::Receiver<(SocketAddrV4, Vec<String>)>) {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let cmd = Self {
|
||||
kind: ConnCommandKind::FindChannel(pattern, tx),
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub struct CaConn {
|
||||
state: CaConnState,
|
||||
@@ -134,6 +155,8 @@ pub struct CaConn {
|
||||
stats: Arc<CaConnStats>,
|
||||
insert_queue_max: usize,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
conn_command_tx: async_channel::Sender<ConnCommand>,
|
||||
conn_command_rx: async_channel::Receiver<ConnCommand>,
|
||||
}
|
||||
|
||||
impl CaConn {
|
||||
@@ -146,6 +169,7 @@ impl CaConn {
|
||||
insert_queue_max: usize,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
) -> Self {
|
||||
let (cq_tx, cq_rx) = async_channel::bounded(32);
|
||||
Self {
|
||||
state: CaConnState::Unconnected,
|
||||
proto: None,
|
||||
@@ -170,6 +194,45 @@ impl CaConn {
|
||||
stats: Arc::new(CaConnStats::new()),
|
||||
insert_queue_max,
|
||||
insert_ivl_min,
|
||||
conn_command_tx: cq_tx,
|
||||
conn_command_rx: cq_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn conn_command_tx(&self) -> async_channel::Sender<ConnCommand> {
|
||||
self.conn_command_tx.clone()
|
||||
}
|
||||
|
||||
fn handle_conn_command(&mut self, cx: &mut Context) {
|
||||
// TODO if this loops for too long time, interrupt and make sure we get called waken up again.
|
||||
use Poll::*;
|
||||
loop {
|
||||
match self.conn_command_rx.poll_next_unpin(cx) {
|
||||
Ready(Some(a)) => match a.kind {
|
||||
ConnCommandKind::FindChannel(pattern, tx) => {
|
||||
info!("Search for {pattern:?}");
|
||||
let mut res = Vec::new();
|
||||
for name in self.name_by_cid.values() {
|
||||
if !pattern.is_empty() && name.contains(&pattern) {
|
||||
res.push(name.clone());
|
||||
}
|
||||
}
|
||||
let msg = (self.remote_addr_dbg.clone(), res);
|
||||
match tx.try_send(msg) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
error!("response channel full or closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
error!("Command queue closed");
|
||||
}
|
||||
Pending => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -723,6 +786,7 @@ impl Stream for CaConn {
|
||||
self.poll_count += 1;
|
||||
// TODO factor out the inner loop:
|
||||
let ret = 'outer: loop {
|
||||
self.handle_conn_command(cx);
|
||||
let q = self.handle_insert_futs(cx);
|
||||
let ts2 = Instant::now();
|
||||
self.stats
|
||||
|
||||
@@ -1,12 +1,28 @@
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::ca::CommandQueueSet;
|
||||
use log::*;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>, insert_ivl_min: Arc<AtomicU64>) {
|
||||
let app = axum::Router::new()
|
||||
async fn get_empty() -> String {
|
||||
format!("")
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(
|
||||
bind_to: String,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
command_queue_set: Arc<CommandQueueSet>,
|
||||
) {
|
||||
use axum::extract::Query;
|
||||
use axum::routing::{get, post, put};
|
||||
use axum::{extract, Router};
|
||||
use http::request::Parts;
|
||||
let app = Router::new()
|
||||
.route(
|
||||
"/metrics",
|
||||
axum::routing::get(|| async {
|
||||
get(|| async {
|
||||
let stats = crate::ca::get_metrics();
|
||||
match stats {
|
||||
Some(s) => {
|
||||
@@ -20,17 +36,88 @@ pub async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>,
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/daqingest/find/channel",
|
||||
get(|Query(params): Query<HashMap<String, String>>| async move {
|
||||
let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string();
|
||||
let g = command_queue_set.queues().lock().await;
|
||||
let mut rxs = Vec::new();
|
||||
for tx in g.iter() {
|
||||
let (cmd, rx) = ConnCommand::find_channel(pattern.clone());
|
||||
rxs.push(rx);
|
||||
if let Err(_) = tx.send(cmd).await {
|
||||
error!("can not send command");
|
||||
}
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for rx in rxs {
|
||||
let item = rx.recv().await.unwrap();
|
||||
let item = (item.0.to_string(), item.1);
|
||||
if item.1.len() > 0 {
|
||||
res.push(item);
|
||||
}
|
||||
}
|
||||
serde_json::to_string(&res).unwrap()
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/insert_frac",
|
||||
axum::routing::put(|v: axum::extract::Json<u64>| async move {
|
||||
get(get_empty).put(|v: extract::Json<u64>| async move {
|
||||
insert_frac.store(v.0, Ordering::Release);
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/insert_ivl_min",
|
||||
axum::routing::put(|v: axum::extract::Json<u64>| async move {
|
||||
put(|v: extract::Json<u64>| async move {
|
||||
insert_ivl_min.store(v.0, Ordering::Release);
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/status/buildinfo",
|
||||
get(|| async {
|
||||
let res = serde_json::json!({
|
||||
"status": "success",
|
||||
"data": {
|
||||
"version": "2.37",
|
||||
"revision": "aaaaaaaaaaaaaaaaaaaaaaaaa",
|
||||
"branch": "dev",
|
||||
"buildUser": "empty",
|
||||
"buildDate": "2022-07-14",
|
||||
"goVersion": "go1"
|
||||
}
|
||||
});
|
||||
serde_json::to_string(&res).unwrap()
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/query",
|
||||
post(
|
||||
|Query(params): Query<HashMap<String, String>>, parts: Parts| async move {
|
||||
info!("/api/v1/query params {params:?} {parts:?}");
|
||||
},
|
||||
),
|
||||
)
|
||||
.fallback(
|
||||
get(|parts: Parts, body: extract::RawBody<hyper::Body>| async move {
|
||||
let bytes = hyper::body::to_bytes(body.0).await.unwrap();
|
||||
let s = String::from_utf8_lossy(&bytes);
|
||||
info!("GET {parts:?} body: {s:?}");
|
||||
})
|
||||
.post(|parts: Parts, body: extract::RawBody<hyper::Body>| async move {
|
||||
let bytes = hyper::body::to_bytes(body.0).await.unwrap();
|
||||
let s = String::from_utf8_lossy(&bytes);
|
||||
info!("POST {parts:?} body: {s:?}");
|
||||
})
|
||||
.put(|parts: Parts, body: extract::RawBody<hyper::Body>| async move {
|
||||
let bytes = hyper::body::to_bytes(body.0).await.unwrap();
|
||||
let s = String::from_utf8_lossy(&bytes);
|
||||
info!("PUT {parts:?} body: {s:?}");
|
||||
})
|
||||
.delete(|parts: Parts, body: extract::RawBody<hyper::Body>| async move {
|
||||
let bytes = hyper::body::to_bytes(body.0).await.unwrap();
|
||||
let s = String::from_utf8_lossy(&bytes);
|
||||
info!("DELETE {parts:?} body: {s:?}");
|
||||
}),
|
||||
);
|
||||
axum::Server::bind(&bind_to.parse().unwrap())
|
||||
.serve(app.into_make_service())
|
||||
|
||||
Reference in New Issue
Block a user