Try optional h3 handler

This commit is contained in:
Dominik Werder
2025-02-17 13:37:30 +01:00
parent a967fbd08a
commit 8b1b14de4f
7 changed files with 95 additions and 79 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.5-aa.6"
version = "0.5.5-aa.7"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -66,7 +66,7 @@ pub async fn delay_io_medium() {
}
pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle<Result<(), Error>>), Error> {
warn!("create_connection creating postgres connection");
info!("create_connection creating postgres connection");
// TODO use a common already running worker pool for these queries:
let d = db_config;
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);

View File

@@ -15,9 +15,10 @@ http = "1.2.0"
http-body-util = { version = "0.1.2" }
hyper = { version = "1.6.0", features = ["http1", "http2", "client", "server"] }
hyper-util = { version = "0.1.10", features = ["http1", "http2", "client", "server"] }
h3 = { version = "0.0.6", optional = true }
h3-quinn = { version = "0.0.7", optional = true }
quinn = { version = "0.11.6", optional = true, default-features = false, features = ["log", "platform-verifier", "runtime-tokio", "rustls-ring"] }
rustls = { version = "0.23.22", optional = true, default-features = false, features = ["logging", "std", "tls12", "ring"] }
rustls = { version = "0.23.22", optional = true, default-features = false, features = ["logging", "std", "ring"] }
rustls-pki-types = { version = "1.11.0", optional = true }
pin-project = "1.1.9"
bytes = "1.10.0"
@@ -53,7 +54,7 @@ daqbuf-redis = { path = "../daqbuf-redis" }
httpclient = { path = "../httpclient" }
[features]
default = []
#default = ["http3"]
#default = []
default = ["http3"]
prometheus_endpoint = []
http3 = ["h3-quinn", "quinn", "rustls", "rustls-pki-types"]
http3 = ["h3", "h3-quinn", "quinn", "rustls", "rustls-pki-types"]

View File

@@ -1,3 +1,5 @@
use bytes::Bytes;
use http::StatusCode;
use quinn;
use quinn::crypto::rustls::QuicServerConfig;
use quinn::Endpoint;
@@ -12,6 +14,8 @@ use std::task::Context;
use std::task::Poll;
use taskrun::tokio;
const EARLY_DATA_MAX: u32 = u32::MAX;
macro_rules! info { ($($arg:expr),*) => ( if true { netpod::log::info!($($arg),*); } ); }
autoerr::create_error_v1!(
@@ -19,6 +23,12 @@ autoerr::create_error_v1!(
enum variants {
NoRuntime,
IO(#[from] std::io::Error),
H3(#[from] h3::Error),
Http(#[from] http::Error),
Pem(#[from] rustls::pki_types::pem::Error),
Rustls(#[from] rustls::Error),
NoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),
QuinnConnection(#[from] quinn::ConnectionError),
},
);
@@ -27,60 +37,63 @@ pub struct Http3Support {
}
impl Http3Support {
pub async fn new(bind_addr: SocketAddr) -> Result<Self, Error> {
let key = match PemObject::from_pem_file("key.pem") {
Ok(x) => x,
Err(e) => {
info!("key error {}", e);
return Ok(Self::dummy());
}
};
let cert = match PemObject::from_pem_file("cert.pem") {
Ok(x) => x,
Err(e) => {
info!("cert error {}", e);
return Ok(Self::dummy());
}
};
let conf = EndpointConfig::default();
let mut tls_conf = match rustls::ServerConfig::builder()
pub async fn new_or_dummy(bind_addr: SocketAddr) -> Result<Self, Error> {
Ok(Self::new(bind_addr).await.unwrap_or_else(|e| {
info!("error {}", e);
Self::dummy()
}))
}
fn dummy() -> Self {
Self { ep: None }
}
async fn new(bind_addr: SocketAddr) -> Result<Self, Error> {
let key = PemObject::from_pem_file("key.pem")?;
let cert = PemObject::from_pem_file("cert.pem")?;
let mut tls_conf = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert], key)
{
Ok(x) => x,
Err(e) => {
info!("tls config error {}", e);
return Ok(Self::dummy());
}
};
tls_conf.alpn_protocols = vec![b"h3".to_vec()];
tls_conf.max_early_data_size = u32::MAX;
.with_single_cert(vec![cert], key)?;
tls_conf.alpn_protocols = vec![b"HTTP/3".to_vec(), b"h3".to_vec()];
tls_conf.max_early_data_size = EARLY_DATA_MAX;
let tls_conf = tls_conf;
let v = match QuicServerConfig::try_from(tls_conf) {
Ok(x) => x,
Err(e) => {
info!("config error {}", e);
return Ok(Self::dummy());
}
};
let v = QuicServerConfig::try_from(tls_conf)?;
let quic_conf = Arc::new(v);
let conf_srv = quinn::ServerConfig::with_crypto(quic_conf);
let ep2 = Endpoint::server(conf_srv, bind_addr)?;
{
let ep = ep2.clone();
tokio::task::spawn(Self::accept(ep));
}
let ret = Self { ep: Some(ep2) };
Ok(ret)
}
async fn new_plain_quic(bind_addr: SocketAddr) -> Result<Self, Error> {
let key = PemObject::from_pem_file("key.pem")?;
let cert = PemObject::from_pem_file("cert.pem")?;
let conf = EndpointConfig::default();
let mut tls_conf = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert], key)?;
tls_conf.alpn_protocols = vec![b"h3".to_vec()];
tls_conf.max_early_data_size = EARLY_DATA_MAX;
let tls_conf = tls_conf;
let v = QuicServerConfig::try_from(tls_conf)?;
let quic_conf = Arc::new(v);
let conf_srv = quinn::ServerConfig::with_crypto(quic_conf);
let sock = std::net::UdpSocket::bind(bind_addr)?;
info!("h3 sock {:?}", sock);
let rt = quinn::default_runtime().ok_or_else(|| Error::NoRuntime)?;
let ep = Endpoint::new(conf, Some(conf_srv), sock, rt)?;
let ep1 = Endpoint::new(conf, Some(conf_srv.clone()), sock, rt)?;
{
let ep = ep.clone();
let ep = ep1.clone();
tokio::task::spawn(Self::accept(ep));
}
let ret = Self { ep: Some(ep) };
let ret = Self { ep: Some(ep1) };
Ok(ret)
}
fn dummy() -> Self {
Self { ep: None }
}
pub async fn wait_idle(self) -> () {
if let Some(ep) = self.ep.as_ref() {
ep.close(quinn::VarInt::from_u32(1), b"shutdown");
@@ -95,37 +108,39 @@ impl Http3Support {
}
}
async fn handle_incoming(inc: Incoming) {
info!("new incoming {:?}", inc.remote_address());
let conn = match inc.await {
Ok(x) => x,
async fn handle_incoming(inc: Incoming) -> Result<(), Error> {
match Self::handle_incoming_inner(inc).await {
Ok(x) => Ok(x),
Err(e) => {
info!("connection error {}", e);
return;
info!("error handle_connection {}", e);
Err(e)
}
};
let fut1 = {
let conn = conn.clone();
async move {
let bi = conn.accept_bi().await;
info!("got bi {:?}", bi);
match bi {
Ok(mut v) => {
v.0.write(b"some-data").await;
}
Err(e) => {}
}
}
};
let fut2 = {
let conn = conn.clone();
async move {
let uni = conn.accept_uni().await;
info!("got uni {:?}", uni);
}
};
tokio::spawn(fut1);
tokio::spawn(fut2);
}
}
async fn handle_incoming_inner(inc: Incoming) -> Result<(), Error> {
let addr_remote = inc.remote_address();
info!("new incoming {:?}", addr_remote);
let conn1 = inc.accept()?.await?;
let conn2 = h3_quinn::Connection::new(conn1);
let mut conn3 = h3::server::builder().build::<_, Bytes>(conn2).await?;
while let Some((req, mut stream)) = conn3.accept().await? {
let (head, _body) = req.into_parts();
info!(
"see request {} {:?} {:?} {:?}",
addr_remote, head.method, head.uri, head.headers
);
let res = http::Response::builder()
.version(http::Version::HTTP_3)
.status(StatusCode::OK)
.header("x-daqbuf-tmp", "8e4b217")
.body(())?;
stream.send_response(res).await?;
stream.send_data(Bytes::from_static(b"2025-02-05T16:37:12Z")).await?;
stream.finish().await?;
info!("response sent {}", addr_remote);
}
Ok(())
}
}

View File

@@ -160,7 +160,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
let shared_res = Arc::new(shared_res);
use std::str::FromStr;
let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?;
let http3 = http3::Http3Support::new(bind_addr.clone()).await?;
let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?;
// tokio::net::TcpSocket::new_v4()?.listen(200)?
let listener = TcpListener::bind(bind_addr).await?;
loop {

View File

@@ -80,7 +80,7 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -
status_board_init();
use std::str::FromStr;
let bind_addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
let http3 = http3::Http3Support::new(bind_addr.clone()).await?;
let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?;
let listener = TcpListener::bind(bind_addr).await?;
loop {
let (stream, addr) = match listener.accept().await {

View File

@@ -18,7 +18,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<ScySession
}
pub async fn create_scy_session_no_ks(scyconf: &ScyllaConfig) -> Result<ScySession, Error> {
warn!("creating scylla connection");
info!("creating scylla connection");
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.default_execution_profile_handle(