tonic由三部分组成:

  • 范型gRPC实现:用于支持任何http/2实现,通过一组范型trait进行编码。
  • http/2实现:基于hyper框架实现,至于hyper则是基于tokio技术栈的一个高性能http/1.1和http/2的客户端与服务端框架。
  • 基于prost的代码生成:包含基于protobuf定义生成代码的工具。

按照tonic提供的功能一一进行使用

  • 双向流
  • 高性能异步io
  • 基于rustls的TLS
  • 负载均衡
  • 自定义元数据
  • 认证
  • 健康检查

服务

按照客户端和服务端的通信方式,可以分为ping-pong,单向流、双向流的服务。以此举例说明。

ping-pong服务

即请求一次,响应一次的服务,也是最常见的服务。

定义proto文件

一个简单的proto文件

syntax = "proto3";
package helloworld;
 
service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply);
}
 
message HelloRequest {
   string name = 1;
}
 
message HelloReply {
    string message = 1;
}

添加项目依赖

除了基本的tonic依赖,这里还添加了tonic-build这个构建依赖,让我们用来将protobuf客户端和服务端代码的生成过程合并到应用程序的构建过程当中去。

[package]
name = "helloworld-tonic"
version = "0.1.0"
edition = "2021"
 
[dependencies]
tonic = "0.12"
prost = "0.13"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
 
[build-dependencies]
tonic-build = "0.12"

添加代码生成到构建过程

rust项目在构建过程中会执行项目build.rs中内容,所以我们基于tonic-build来将代码生成过程添加到项目构建过程当中去。 在build.rs中添加如下代码

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/helloworld.proto")?;
    Ok(())
}

tonic-build可以对生成的代码做很多配置,但是这里就不加介绍了,可以参考官方文档tonic-build,或者后续新增专题补充一下。

服务端实现

导入生成的代码,然后实现,就像其它语言那样。

pub mod hello_world {
    tonic::include_proto!("helloworld");
}
 
#[derive(Debug, Default)]
pub struct MyGreet {}
 
#[tonic::async_trait]
impl Greeter for MyGreet {
    async fn say_hello(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<HelloReply>, Status> {
	    println!("Got a request: {:?}", request);
        let reply = HelloReply {
            message: format!("Hello: {}!", request.into_inner().name),
        };
        Ok(Response::new(reply))
    }
}

注册这个grpc服务并启动服务端

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let greet = MyGreet::default();
 
    Server::builder()
        .add_service(GreeterServer::new(greet))
        .serve(addr)
        .await?;
 
    Ok(())
}

src/server.rs作为一个可执行程序加入到项目当中,在Cargo.toml中添加

[[bin]]
name = "helloworld-server"
path = "src/server.rs"

这样就可以通过执行cargo run --bin helloworld-server来启动服务端程序了。 现在还没有客户端可以与服务端进行通信,不过可以借助grpc工具,例如grpcurl来完成这一点。 grpcurl -plaintext -import-path ./proto -proto helloworld.proto -d '{"name": "Tonic"}' '[::1]:50051' helloworld.Greeter/SayHello

客户端实现

完成与服务端进行通信的客户端实现。相比于服务端,客户端实现要简单一些,总结下来就是

  1. 导入生成的代码
  2. 与服务端建立链接
  3. 构造请求并发送
pub mod hello_world {
    tonic::include_proto!("helloworld");
}
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接服务端
    let mut client =
        hello_world::greeter_client::GreeterClient::connect("http://[::1]:50051").await?;
 
    // 构造请求
    let request = tonic::Request::new(HelloRequest {
        name: "Tonic".into(),
    });
 
    // 发送请求
    let response = client.say_hello(request).await?;
 
    println!("Response: {:?}", response);
 
    Ok(())
}

同样可以将客户端作为二进制程序运行,在Cargo.toml中添加

[[bin]]
name = "helloworld-client"
path = "src/client.rs"

运行命令cargo run --bin helloworld-client即可看到客户端成功请求服务端并打印响应信息到控制台

流服务

单向流和双向流都在这里统一进行说明,因为它们的逻辑是统一的。

定义protobuf文件

下面定义了ping-pong模式,服务端单向流、客户端单向流和双向流的接口。

syntax = "proto3";
package echo;
 
service Echo {
    rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
    rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
    rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
    rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
 
message EchoRequest {
    string message = 1;
}
 
message EchoResponse {
    string message = 1;
}

添加代码生成到构建过程

fn main() -> Result<(), Box<dyn std::error::Error>> {
	...
    tonic_build::compile_protos("proto/echo.proto")?;
    Ok(())
}

服务端实现

相比于ping-pong模式,流式的服务比较复杂,无论是单向还是双向,流的一段操作的对应都是一个流,也就是一个可迭代的对象。 对于服务端流也就是需要返回流对象的接口,我们还需要额外引入tokio-stream这个crate来方便构造响应流,除此之外,还可以使用async-stream这个crate来构建流对象。

type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;
 
type ServerStreamingEchoStream = ResponseStream;
 
// server streaming
async fn server_streaming_echo(
	&self,
	request: Request<EchoRequest>,
) -> Result<Response<Self::ServerStreamingEchoStream>, Status> {
	println!("EchoServer::server_streaming_echo");
	println!("\tclient connected from: {:?}", request.remote_addr());
 
	let repeat = std::iter::repeat(EchoResponse {
		message: request.into_inner().message,
	});
 
	let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));
 
	let (tx, rx) = mpsc::channel(128);
	tokio::spawn(async move {
		while let Some(item) = stream.next().await {
			match tx.send(Result::<_, Status>::Ok(item)).await {
				Ok(_) => {}
				Err(_item) => {
					break;
				}
			}
		}
		println!("\tclient disconnected");
	});
 
	// 返回结果为一个流
	let output_stream = ReceiverStream::new(rx);
	Ok(Response::new(
		Box::pin(output_stream) as Self::ServerStreamingEchoStream
	))
}
 
type BidirectionalStreamingEchoStream = ResponseStream;
// 双向流
async fn bidirectional_streaming_echo(
	&self,
	request: Request<Streaming<EchoRequest>>,
) -> Result<Response<Self::BidirectionalStreamingEchoStream>, Status> {
	println!("EchoServer::bidirectional_streaming_echo");
 
	// 获取输入流
	let mut in_stream = request.into_inner();
	let (tx, rx) = mpsc::channel(128);
 
	// 启动一个异步协程,从输入流中读取数据,通过tx写入
	tokio::spawn(async move {
		while let Some(item) = in_stream.next().await {
			match item {
				Ok(v) => tx
					.send(Ok(EchoResponse { message: v.message }))
					.await
					.expect("working rx"),
				Err(err) => {
					break;
				}
			}
		}
		println!("\tstream ended");
	});
	// 返回一个流,负责从rx中读取数据
	let out_stream = ReceiverStream::new(rx);
	Ok(Response::new(
		Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
	))
}

客户端实现

客户端可以构造一个流式请求对象,也可以操作返回的流失响应对象

pub mod echo {
    tonic::include_proto!("echo");
}
 
async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let stream = client
        .server_streaming_echo(EchoRequest {
            message: "foo".into(),
        })
        .await
        .unwrap()
        .into_inner();
 
    let mut stream = stream.take(num);
    while let Some(item) = stream.next().await {
        println!("{}", item.unwrap().message);
    }
}
 
fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
    tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
        message: format!("msg {:02}", i),
    })
}
async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let in_stream = echo_requests_iter().take(num);
    let response = client
        .bidirectional_streaming_echo(in_stream)
        .await
        .unwrap();
 
    let mut resp_stream = response.into_inner();
 
    while let Some(item) = resp_stream.next().await {
        let received = item.unwrap();
        println!("{}", received.message)
    }
}
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = echo::echo_client::EchoClient::connect("http://[::1]:50051").await?;
    println!("Streaming echo");
    streaming_echo(&mut client, 5).await;
 
    println!("Bidirectional streaming echo");
    bidirectional_streaming_echo(&mut client, 17).await;
    Ok(())
}

TLS

不带客户端认证的TLS

也就是比较简单和常见的,只在服务端配置证书和密钥,客户端只设置ca证书负责检验证书有效性。

  • 服务端
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
	// 读取服务端证书和密钥文件
    let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]);
    let cert = std::fs::read_to_string(data_dir.join("tls/server.pem"))?;
    let key = std::fs::read_to_string(data_dir.join("tls/server.key"))?;
	// 身份认证
    let identity = Identity::from_pem(cert, key);
 
    let addr = "[::1]:50051".parse().unwrap();
    let server = EchoServer::default();
 
    Server::builder()
	    // 配置tls
        .tls_config(ServerTlsConfig::new().identity(identity))?
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve(addr)
        .await?;
 
    Ok(())
}
  • 客户端
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
	// 读取ca证书
    let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]);
    let pem = std::fs::read_to_string(data_dir.join("tls/ca.pem"))?;
    let ca = Certificate::from_pem(pem);
	// 创建客户端tls配置
    let tls = ClientTlsConfig::new()
        .ca_certificate(ca)
        .domain_name("example.com");
	// 建立tls的channel
    let channel = Channel::from_static("https://[::1]:50051")
        .tls_config(tls)?
        .connect()
        .await?;
 
    let mut client = EchoClient::new(channel);
    let request = tonic::Request::new(EchoRequest {
        message: "hello".into(),
    });
 
    let response = client.unary_echo(request).await?;
 
    println!("RESPONSE={:?}", response);
 
    Ok(())
}

tls的连接相关信息会保存在请求的扩展当中,在服务端的服务端实现当中可以从中取得需要的连接信息

let conn_info = request
            .extensions()
            .get::<TlsConnectInfo<TcpConnectInfo>>()
            .unwrap();
        println!(
            "Got a request from {:?} with info {:?}",
            request.remote_addr(),
            conn_info
        );

带客户端认证的TLS

其实就是tls变为双向的。服务端有证书和密钥,客户端有对应的ca证书进行验证;现在客户端也有自己的证书和密钥,服务端也有对应的ca证书进行校验。

更加简单来说,就是反向再配置一份tls的配置

  • 服务端
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]);
    let cert = std::fs::read_to_string(data_dir.join("tls/server.pem"))?;
    let key = std::fs::read_to_string(data_dir.join("tls/server.key"))?;
    let server_identity = Identity::from_pem(cert, key);
	// 服务端添加客户端证书的ca证书文件
    let client_ca_cert = std::fs::read_to_string(data_dir.join("tls/client_ca.pem"))?;
    let client_ca_cert = Certificate::from_pem(client_ca_cert);
 
    let addr = "[::1]:50051".parse().unwrap();
    let server = EchoServer::default();
 
    let tls = ServerTlsConfig::new()
        .identity(server_identity)
        // 这里多了一个client_ca_root的配置
        .client_ca_root(client_ca_cert);
 
    Server::builder()
        .tls_config(tls)?
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve(addr)
        .await?;
 
    Ok(())
}
  • 客户端
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]);
    let server_root_ca_cert = std::fs::read_to_string(data_dir.join("tls/ca.pem"))?;
    let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
    // 新增了客户端的证书和密钥文件
    let client_cert = std::fs::read_to_string(data_dir.join("tls/client1.pem"))?;
    let client_key = std::fs::read_to_string(data_dir.join("tls/client1.key"))?;
    let client_identity = Identity::from_pem(client_cert, client_key);
 
    let tls = ClientTlsConfig::new()
        .domain_name("localhost")
        .ca_certificate(server_root_ca_cert)
        // 多了一个客户端的认证
        .identity(client_identity);
 
    let channel = Channel::from_static("https://[::1]:50051")
        .tls_config(tls)?
        .connect()
        .await?;
 
    let mut client = EchoClient::new(channel);
 
    Ok(())
}

服务端要获取客户端连接的认证信息同样可以从请求扩展中获得,提供的有对应的方法

let certs = request
            .peer_certs()
            .expect("Client did not send its certs!");

使用rustls的TLS

同样是tls的配置,不过使用的是纯rust实现的rustls配置,不受平台环境影响。思路还是一样,生成tls配置,不过在代码使用上有些差异

  • 服务端
let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]);
    let certs = {
        let fd = std::fs::File::open(data_dir.join("tls/server.pem"))?;
        let mut buf = std::io::BufReader::new(&fd);
        rustls_pemfile::certs(&mut buf).collect::<Result<Vec<_>, _>>()?
    };
    let key = {
        let fd = std::fs::File::open(data_dir.join("tls/server.key"))?;
        let mut buf = std::io::BufReader::new(&fd);
        rustls_pemfile::private_key(&mut buf)?.unwrap()
    };
 
    let mut tls = ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(certs, key)?;
    tls.alpn_protocols = vec![b"h2".to_vec()];
 
  • 客户端
let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]);
    let fd = std::fs::File::open(data_dir.join("tls/ca.pem"))?;
 
    let mut roots = RootCertStore::empty();
 
    let mut buf = std::io::BufReader::new(&fd);
    let certs = rustls_pemfile::certs(&mut buf).collect::<Result<Vec<_>, _>>()?;
    roots.add_parsable_certificates(certs.into_iter());
 
    let tls = ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
 

选项设置

超时

超时分为客户端超时与服务端超时。客户端超时控制客户端等待响应的时间,避免长时间阻塞;服务端超时控制处理请求的时间,避免长时间占用资源。

客户端超时

这里引入了tower crate和和timeout feature来创建了一个超时控制的channel,基于这个channel与服务端通信可在达到超时时间仍为接收到影响的时候直接返回一个超时错误。 需要注意的是,这里的客户端超时返回,服务端可能仍在正常执行

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let channel = Channel::from_static("http://[::1]:50051").connect().await?;
    let timeout_channel = Timeout::new(channel, std::time::Duration::from_secs(10));
 
    let mut client = GreeterClient::new(timeout_channel);
 
    let request = Request::new(HelloRequest {
        name: "Tonic".into(),
    });
 
    let response = client.say_hello(request).await?;
 
    println!("RESPONSE={:?}", response);
    Ok(())
}

服务端超时

服务单超时直接配置超时选项即可,服务端超时之后会取消请求处理并返回超时错误给客户端

 Server::builder()
        .timeout(Duration::from_secs(5))
        .add_service(GreeterServer::new(greet))
        .serve(addr)
        .await?;

可取消的请求

在配置服务端超时之后会取消请求执行。现在将取消移动到业务内可以自行处理。 学习下面的示例,原来配置客户端超时之后,可能服务端仍然在执行,资源没有释放。这里提供一种可以让客户端取消服务端future执行的一种方式。

服务端

在服务端通过检查客户端是否取消以及通过select!来复用选择的异步结果。

  • 准备好要正常执行和接收到取消请求的处理函数
let remote_addr = request.remote_addr();
        // 正常请求处理
        let request_future = async move {
            println!("Got a request from {:?}", request.remote_addr());
 
            // Take a long time to complete request for the client to cancel early
            sleep(Duration::from_secs(10)).await;
 
            let reply = HelloReply {
                message: format!("Hello {}!", request.into_inner().name),
            };
 
            Ok(Response::new(reply))
        };
        // 收到客户端取消请求的处理
        let cancellation_future = async move {
            println!("Request from {:?} cancelled by client", remote_addr);
            // If this future is executed it means the request future was dropped,
            // so it doesn't actually matter what is returned here
            Err(Status::cancelled("Request cancelled by client"))
        };
        with_cancellation_handler(request_future, cancellation_future).await

with_cancellation_handler当中,会复用并获取到最先完成的future的结果,然后返回

async fn with_cancellation_handler(
    request_future: impl std::future::Future<Output = Result<Response<HelloReply>, Status>>
        + Send
        + 'static,
    cancellation_future: impl std::future::Future<Output = Result<Response<HelloReply>, Status>>
        + Send
        + 'static,
) -> Result<Response<HelloReply>, Status> {
    let token = CancellationToken::new();
    // // 将一个可以被取消的操作与token在select!关联起来,然后为客户端提供一个取消接口就可以实现取消操作
    let _drop_guard = token.clone().drop_guard();
 
    let select_task = tokio::spawn(async move {
        // 那个先执行完成返回那个的结果
        select! {
            res = request_future => res,
            _ = token.cancelled() => cancellation_future.await,
        }
    });
 
    select_task.await.unwrap()
}

这里还借助了tokio-util crate,用来检测客户端是否取消请求。

客户端

客户端还是借用超时实现的,也就是超时直接取消

let mut client = GreeterClient::connect("http://[::1]:50051").await?;
 
    let request = tonic::Request::new(HelloRequest {
        name: "Tonic".into(),
    });
 
    // Cancelling the request by dropping the request future after 1 second
    let response = match timeout(Duration::from_secs(1), client.say_hello(request)).await {
        Ok(response) => response?,
        Err(_) => {
            println!("Cancelled request after 1s");
            return Ok(());
        }
    };

压缩

可以将请求和响应进行压缩,配置对应的压缩选项即可

  • 服务端
let service = GreeterServer::new(greeter)
        .send_compressed(CompressionEncoding::Gzip)
        .accept_compressed(CompressionEncoding::Gzip);
  • 客户端
let mut client = GreeterClient::new(channel)
        .send_compressed(CompressionEncoding::Gzip)
        .accept_compressed(CompressionEncoding::Gzip)

负载均衡

负载均衡的主要逻辑是在客户端进行服务器实例的选择。

先在两个异步任务中分别启动服务

async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addrs = ["[::1]:50051", "[::1]:50052"];
 
    let (tx, mut rx) = mpsc::unbounded_channel();
    for addr in &addrs {
        let tx = tx.clone();
        let addr = addr.parse()?;
        let server = MyGreeter { addr };
        let server = Server::builder()
            .add_service(GreeterServer::new(server))
            .serve(addr);
 
        tokio::spawn(async move {
            if let Err(e) = server.await {
                eprintln!("server error: {}", e);
            }
            tx.send(()).unwrap();
        });
    }
    rx.recv().await;
    Ok(())
}

客户端负载均衡也很简单,就是创建一个可以负载均衡的channel,用来与后端通信,到这里创建的client用法就与前面没有区别了。

let endpoints = ["http://[::1]:50051", "http://[::1]:50052"]
        .iter()
        .map(|v| Channel::from_static(v));
 
    let channel = Channel::balance_list(endpoints);
    let mut client = GreeterClient::new(channel);

反射

由服务端将rpc服务提供给客户端,可以实现通用的grpc客户端而不需要生成的桩代码。

  • 首先,在生成代码的时候,配置生成反射描述文件。注意后缀是.bin
tonic_build::configure()
        .file_descriptor_set_path(out_dir.join("helloworld_descriptor.bin"))
        .compile(&["proto/helloworld.proto"], &["proto"])?;
  • 想引入生成代码一样,也将描述文件引入,tonic也提供了对应的宏
mod proto {
    tonic::include_proto!("helloworld");
 
    pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
        tonic::include_file_descriptor_set!("helloworld_descriptor");
}
  • tonic的tonic-reflection crate提供了一个专门的反射服务,将需要反射的服务的描述文件注册即可。
let service = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
        .build()
        .unwrap();
  • 最后将反射服务和其它服务都添加的grpc server当中,就能够对外提供反射能力了
Server::builder()
        .add_service(service)
        .add_service(proto::greeter_server::GreeterServer::new(greeter))
        .serve(addr)
        .await?;

健康检测

grpc定义的有健康检测协议,tonic提供了tonic-health crate来提供这一能力。

// reporter用于更新grpc服务的状态,server则是用于注册对外提供的grpc服务,与自定义实现的服务一样
let (mut health_reporter, health_server) = tonic_health::server::health_reporter();
 
// 设置服务的状态
health_reporter
	.set_serving::<GreeterServer<MyGreeter>>() // 可用
	// .set_not_serving::<GreeterServer<MyGreeter>>() // 不可用
	.await;
 
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();
 
Server::builder()
	.add_service(health_server)
	.add_service(GreeterServer::new(greeter))
	.serve(addr)
	.await?;

通过reporter可以设置后台服务的状态,通过grpc-health-probe可以检测服务是否可用。

#查询后端service的健康状态
grpc_health_probe -addr=[::1]:50051 -service=helloworld.Greeter

拦截器

可以在请求处理之前进行额外的处理。

服务端

  • 定义一个拦截处理器,为grpc请求添加额外信息。如下示例:将请求处理之前添加了一个额外对象到请求体当中
fn intercept(mut req: Request<()>) -> Result<Request<()>, Status> {
    println!("Intercepting request: {:?}", req);
 
    // Set an extension that can be retrieved by `say_hello`
    req.extensions_mut().insert(MyExtension {
        some_piece_of_data: "foo".to_string(),
    });
 
    Ok(req)
}
 
#[derive(Clone)]
struct MyExtension {
    some_piece_of_data: String,
}
  • 现在在请求处理函数的实现当中,我们可以读取到拦截器中添加的信息
#[derive(Default)]
pub struct MyGreeter {}
 
#[tonic::async_trait]
impl Greeter for MyGreeter {
    async fn say_hello(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<HelloReply>, Status> {
        let extension = request.extensions().get::<MyExtension>().unwrap();
        println!("extension data = {}", extension.some_piece_of_data);
 
        let reply = hello_world::HelloReply {
            message: format!("Hello {}!", request.into_inner().name),
        };
        Ok(Response::new(reply))
    }
}
  • 为了拦截器能够生效,需要在创建服务的时候,将拦截器添加上
let svc = GreeterServer::with_interceptor(greeter, intercept);

客户端

客户端的实现方式和服务端基本一样,也是创建一个连接器,然后创建客户端的时候添加进去即可

  • 实现拦截器
fn intercept(req: Request<()>) -> Result<Request<()>, Status> {
    println!("Intercepting request: {:?}", req);
    Ok(req)
}
  • 添加拦截器
let mut client = GreeterClient::with_interceptor(channel, intercept);

丰富的错误处理

tonic提供了ErrorDetails这个结构体,可以帮助丰富错误信息。比如添加help信息,国际化等等。一个简单的示例如下

 let mut err_details = ErrorDetails::new();
 
        // Add error details conditionally
        if name.is_empty() {
            err_details.add_bad_request_violation("name", "name cannot be empty");
        } else if name.len() > 20 {
            err_details.add_bad_request_violation("name", "name is too long");
        }
 
        if err_details.has_bad_request_violations() {
            // Add additional error details if necessary
            err_details
                .add_help_link("description of link", "https://resource.example.local")
                .set_localized_message("en-US", "message for the user");
 
            // Generate error status
            let status = Status::with_error_details(
                Code::InvalidArgument,
                "request contains invalid arguments",
                err_details,
            );
 
            return Err(status);

整合tower中间件

服务端

tonic可以直接服用tower丰富的中间件能力。

let layer = tower::ServiceBuilder::new()
        // Apply middleware from tower
        .timeout(Duration::from_secs(30))
        // Apply our own middleware
        .layer(MyMiddlewareLayer::default())
        // Interceptors can be also be applied as middleware
        .layer(tonic::service::interceptor(intercept))
        .into_inner();

作为layer添加到grpc server当中

Server::builder()
        // Wrap all services in the middleware stack
        .layer(layer)
        .add_service(svc)
        .serve(addr)
        .await?;

客户端

使用方式和服务端同理

let channel = ServiceBuilder::new()
        // Interceptors can be also be applied as middleware
        .layer(tonic::service::interceptor(intercept))
        .layer_fn(AuthSvc::new)
        .service(channel);
 
    let mut client = GreeterClient::new(channel);