Receive hello message from ca server
This commit is contained in:
@@ -21,4 +21,5 @@ netpod = { path = "../netpod" }
|
||||
dbconn = { path = "../dbconn" }
|
||||
disk = { path = "../disk" }
|
||||
parse = { path = "../parse" }
|
||||
netfetch = { path = "../netfetch" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
|
||||
@@ -198,6 +198,12 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
} else {
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
} else if path == "/api/4/ca_connect_1" {
|
||||
if req.method() == Method::GET {
|
||||
Ok(ca_connect_1(req, &node_config).await?)
|
||||
} else {
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
} else if path == "/api/4/channel/config" {
|
||||
if req.method() == Method::GET {
|
||||
Ok(channel_config(req, &node_config).await?)
|
||||
@@ -532,3 +538,20 @@ pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached)
|
||||
.body(Body::from(serde_json::to_string(&res)?))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
let (head, _body) = req.into_parts();
|
||||
let params = netpod::query_params(head.uri.query());
|
||||
let addr = params.get("addr").unwrap().into();
|
||||
let res = netfetch::ca_connect_1(addr, node_config).await?;
|
||||
let ret = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, "application/jsonlines")
|
||||
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
|
||||
Ok(mut item) => {
|
||||
item.push('\n');
|
||||
Ok(item)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
})))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
20
netfetch/Cargo.toml
Normal file
20
netfetch/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "netfetch"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
serde_cbor = "0.11.1"
|
||||
tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||
tokio-stream = {version = "0.1.5", features = ["fs"]}
|
||||
async-channel = "1.6"
|
||||
bytes = "1.0.1"
|
||||
arrayref = "0.3.6"
|
||||
byteorder = "1.4.3"
|
||||
futures-core = "0.3.14"
|
||||
futures-util = "0.3.14"
|
||||
err = { path = "../err" }
|
||||
netpod = { path = "../netpod" }
|
||||
62
netfetch/src/lib.rs
Normal file
62
netfetch/src/lib.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use async_channel::{bounded, Receiver};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use err::Error;
|
||||
use futures_util::FutureExt;
|
||||
use netpod::NodeConfigCached;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Message {
|
||||
cmd: u16,
|
||||
payload_len: u16,
|
||||
type_type: u16,
|
||||
data_len: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum FetchItem {
|
||||
Log(String),
|
||||
Message(Message),
|
||||
}
|
||||
|
||||
pub async fn ca_connect_1(
|
||||
addr: String,
|
||||
_node_config: &NodeConfigCached,
|
||||
) -> Result<Receiver<Result<FetchItem, Error>>, Error> {
|
||||
let (tx, rx) = bounded(16);
|
||||
let tx2 = tx.clone();
|
||||
tokio::task::spawn(
|
||||
async move {
|
||||
let mut conn = tokio::net::TcpStream::connect(addr).await?;
|
||||
let (mut inp, mut out) = conn.split();
|
||||
tx.send(Ok(FetchItem::Log(format!("connected")))).await?;
|
||||
let mut b2 = BytesMut::with_capacity(128);
|
||||
b2.put_u16(0);
|
||||
b2.put_u16(0);
|
||||
b2.put_u16(0);
|
||||
b2.put_u16(0xb);
|
||||
b2.put_u32(0);
|
||||
b2.put_u32(0);
|
||||
out.write_all(&b2).await?;
|
||||
tx.send(Ok(FetchItem::Log(format!("written")))).await?;
|
||||
let mut buf = [0; 64];
|
||||
let n1 = inp.read(&mut buf).await?;
|
||||
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
|
||||
.await?;
|
||||
Ok::<_, Error>(())
|
||||
}
|
||||
.then({
|
||||
move |item| async move {
|
||||
match item {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))).await?;
|
||||
}
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
}
|
||||
}),
|
||||
);
|
||||
Ok(rx)
|
||||
}
|
||||
Reference in New Issue
Block a user