kitex-微服务-grpc框架(字节跳动开源)
2026年1月15日 · 1385 字 · 7 分钟
Kitex,字节跳动内部的 Go 微服务 RPC 框架 ,包括但不限于脚手架构建代码、日志、服务注册服务发现、中间件、负载均衡、限流、熔断、服务降级
Kitex
Kitex[kaɪt’eks] 字节跳动内部的 Go 微服务 RPC 框架,具有高性能、强可扩展的特点,在字节内部已广泛使用。如果对微服务性能有要求,又希望定制扩展融入自己的治理体系,Kitex 会是一个不错的选择。
kitex序列化工具
安装
go install github.com/cloudwego/kitex/tool/cmd/kitex@latest
在项目引入包
go get -u github.com/cloudwego/kitex
my_kitex/idl/math.proto
syntax="proto3";
package idl; // proto文件互相引用时需要指定package。不需要相互引用时,这一行可以不写
option go_package="math_service"; //生成go文件后对应的package名
message AddRequest {
int32 left = 1;
int32 right = 2;
}
message AddResponse {
int32 sum = 1;
}
message SubRequest {
int32 left = 1;
int32 right = 2;
}
message SubResponse {
int32 diff = 1;
}
service Math {
rpc Add(AddRequest) returns (AddResponse);
rpc Sub(SubRequest) returns (SubResponse);
}
kitex -module myKitex idl/math.proto
会生成一个kitex_gen目录,import语句里需要用到module名称
math.pb.go是官方的pb工具生成的,math.pb.fast.go是字节自家的fastpb工具生成的,math.pb.fast.go是对math.pb.go的补充
执行一下go mod tidy
PS:Kitex的序列化和反序列化工具比默认grpc序列化反序列化快了很多
服务脚手架代码
生成服务端代码
| 参数 | 作用 | 示例 |
|---|---|---|
| kitex | 工具名 | - |
| -module | Go 模块名 | myKitex |
| -service | 生成服务端代码(服务名称,任意字符串) | easyimpr.math |
| -use | 引用已有的 kitex_gen | myKitex/kitex_gen |
| IDL文件 | 接口定义文件 | ../idl/math.proto |
cd /my_kitex/server/
kitex -module myKitex -service easyimpr.math -use myKitex/kitex_gen ../idl/math.proto
接下来我们只需要就该server下面的handler.go实现具体的逻辑即可
// Add implements the MathImpl interface.
func (s *MathImpl) Add(ctx context.Context, req *math_service.AddRequest) (resp *math_service.AddResponse, err error) {
// TODO: Your code here...
resp = &math_service.AddResponse{
Sum: req.Left + req.Right,
}
return resp, nil
}
// Sub implements the MathImpl interface.
func (s *MathImpl) Sub(ctx context.Context, req *math_service.SubRequest) (resp *math_service.SubResponse, err error) {
// TODO: Your code here...
resp = &math_service.SubResponse{
Diff: req.Left - req.Right,
}
return resp, nil
}
kitex还是很贴心的,连编译和运行的脚本都写好了,由于我是mac,没怎么用过windows,不知道windows是否能正常运行(好像是用wsl?)
当然,我们不可能只用默认的8888端口号
我们修改main.go
addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:5678")
svr := math_service.NewServer(new(MathImpl), server.WithServiceAddr(addr))
err := svr.Run()
if err != nil {
log.Println(err.Error())
}
完成客户端代码
cd ./my_kitex/client/
新建main.go
import (
"context"
"fmt"
"log"
"myKitex/kitex_gen/math_service"
"myKitex/kitex_gen/math_service/math"
"github.com/cloudwego/kitex/client"
)
func main() {
//这个函数是之前生成的脚手架代码中的函数,用于创建一个客户端,前面的是服务名称,后面的是地址
client, err := math.NewClient("easyimpr.math", client.WithHostPorts("127.0.0.1:5678"))
if err != nil {
log.Fatalf("failed to create client: %v", err)
}
//定义请求参数
request := math_service.SubRequest{Left: 9, Right: 1}
//调用方法
response, err := client.Sub(context.Background(), &request)
if err != nil {
log.Fatalf("failed to call sub: %v", err)
}
//打印结果
fmt.Println(response.Diff)
}
kitex日志工具
需要的包
"github.com/cloudwego/kitex/pkg/klog"
正常打印在终端
klog.SetLevel(klog.LevelInfo) //大于等于info的日志会被打印出来
klog.Debug("我是debug")
klog.Info("我是info")
klog.Warn("我是warn")
klog.Error("我是error")
如果想要打印到文件
fout, err := os.OpenFile("./log/server.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
panic(err)
}
defer fout.Close()
klog.SetOutput(fout)
服务注册与发现
我们在服务端,把server的ip、端口号、服务名称注册到服务中心中,客户端访问服务中心,根据服务名称获得一个ip和端口号。所以一个server可以有多个ip和端口号,服务中心根据负载均衡,在客户端调用时,发配一个ip和端口号。
当服务端每多一台服务器或者少一台服务器,服务中心都可以感知到
在这里演示了Kitex通过etcd实现服务注册与发现
server服务注册
首先需要导包
etcd "github.com/kitex-contrib/registry-etcd" //起了个别名为etcd
addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:5678")
reg, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:2379"}) //线上需要使用密码,用NewEtcdRegistryWithAuth
if err != nil {
klog.Fatal("failed to create registry: %v", err)
}
svr := math_service.NewServer(new(MathImpl),
server.WithServiceAddr(addr),
server.WithRegistry(reg), //注册服务
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
ServiceName: "easyimpr.math",
}),//在这里就必须写上服务的名称了,因为etcd得知道
)
err = svr.Run()
client调用服务
resolver, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"})
if err != nil {
log.Fatalf("failed to create resolver: %v", err)
}
//这里就不需要指定端口号和ip地址了
client, err := math.NewClient("easyimpr.math", client.WithResolver(resolver))
if err != nil {
log.Fatalf("failed to create client: %v", err)
}
中间件
计时中间件
服务端
middleware
func TimeMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
begin := time.Now()
err = next(ctx, req, resp) //这里指运行剩下的中间件,以及核心业务接口
elapsed := time.Since(begin)
//可以获取执行的业务
method, _ := kitexutil.GetMethod(ctx)
//可以获取调用方地址
addr, _ := kitexutil.GetCallerAddr(ctx)
//可以获取上游server
server, _ := kitexutil.GetCaller(ctx)
fmt.Printf("method %s from %s took %s 上游 %s\n", method, addr, elapsed, server)
return err
}
}
svr := math_service.NewServer(new(MathImpl),
server.WithServiceAddr(addr),
server.WithRegistry(reg), //注册服务
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
ServiceName: "easyimpr.math",
}),
server.WithMiddleware(middleware.TimeMiddleware), //注册中间件
)
客户端
middleware
func TimeMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
begin := time.Now()
err = next(ctx, req, resp) //这里指运行剩下的中间件,以及核心业务接口
elapsed := time.Since(begin)
//可以获取执行的业务
method, _ := kitexutil.GetMethod(ctx)
//可以获取下游的service name
serviceName, _ := kitexutil.GetIDLServiceName(ctx)
//也可以这么写
// var serviceName, method string
// ri := rpcinfo.GetRPCInfo(ctx)
// if ivk := ri.Invocation(); ivk != nil {
// serviceName = ivk.ServiceName()
// method = ivk.MethodName()
// }
fmt.Printf("method %s 下游 %s 耗时 %s\n", method, serviceName, elapsed)
return err
}
}
client, err := math.NewClient("easyimpr.math",
client.WithResolver(resolver),
client.WithMiddleware(middleware.TimeMiddleware), //注册中间件
)
在中间件里获取请求和响应数据
在刚刚的计时中间件,我们并没有用请求和响应数据,而是直接用了next,我们这里演示获取请求和响应数据(使用断言)
就是一直在类型断言,还挺好玩的
func RecordRequestAndResponse(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
//打印请求参数
if arg, ok := req.(utils.KitexArgs); ok { //类型断言
//调用断言后的函数方法后,发现返回了一个空接口,又要类型断言
switch request := arg.GetFirstArgument().(type) {
case *math_service.AddRequest:
fmt.Printf("request: left %d, right %d\n", request.Left, request.Right)
case *math_service.SubRequest:
fmt.Printf("request: left %d, right %d\n", request.Left, request.Right)
default:
fmt.Printf("unknown request type %v\n", request)
}
}
//执行接下来的中间件与函数
err = next(ctx, req, resp)
//打印响应参数
if ri := rpcinfo.GetRPCInfo(ctx); ri != nil { //获取rpcinfo
if stats := ri.Stats(); stats != nil { //rpc正常执行
if result, ok := resp.(utils.KitexResult); ok { //类型断言
switch response := result.GetResult().(type) { //调用断言后的函数方法后,发现返回了一个空接口,又要类型断言
case *math_service.AddResponse:
if response.Sum != 0 {
fmt.Printf("response: sum %d\n", response.Sum)
}
case *math_service.SubResponse:
if response.Diff != 0 {
fmt.Printf("response: diff %d\n", response.Diff)
}
default:
fmt.Printf("unknown response type %v\n", result.GetResult())
}
}
}
}
return err
}
}
服务端panic处理
假如我们在服务端的一个方法中故意写成调用必定会panic
0不能做分母,所以会panic
func (s *MathImpl) Sub(ctx context.Context, req *math_service.SubRequest) (resp *math_service.SubResponse, err error) {
// TODO: Your code here...
a := 0
b := 10 / a
resp = &math_service.SubResponse{
Diff: int32(b),
}
return resp, nil
}
客户端调用后可以看到,服务端没有因为panic而挂掉,因为kitex自带了recover,并且打印错误信息
我们如何自定义错误信息呢,那么我们就要完成服务端的中间件了
//打印响应参数
if ri := rpcinfo.GetRPCInfo(ctx); ri != nil { //获取rpcinfo
if stats := ri.Stats(); stats != nil { //rpc正常执行
//打印panic信息
if panicHappened, panicInfo := stats.Panicked(); panicHappened { //panicInfo就是recover()的返回值
klog.Errorf("panic info %s", panicInfo)//打印错误信息到日志
return fmt.Errorf("服务内部发生panic") //原始panic信息对client隐藏
} else {
................
................
}
}
}
客户端接口超时控制
作为客户端,当我们调用一个接口的时候如果时间过长,我们不可能一直去等待他结束,所以我们需要加入超时控制
client, err := math.NewClient("easyimpr.math",
client.WithResolver(resolver),
client.WithMiddleware(middleware.TimeMiddleware), //注册中间件
client.WithConnectTimeout(100*time.Millisecond), //连接超时
client.WithRPCTimeout(2*time.Second), //rpc超时
)
每个grpc的方法肯定都是不一样的,那么需要的耗时肯定也是不一样的,所以我们可以单独为某个grpc的调用单独增加超时管理,这个的优先级是高于刚刚设置的全局设置
response, err := client.Sub(
context.Background(),
&request,
callopt.WithConnectTimeout(100*time.Millisecond), //连接超时
callopt.WithRPCTimeout(200*time.Millisecond),
)
当我们第一次调用client的时候可能会超时,因为要与服务中心建立好连接等等,所以我们可以预热一下
client, err := math.NewClient("easyimpr.math",
client.WithResolver(resolver),
client.WithMiddleware(middleware.TimeMiddleware), //注册中间件
client.WithConnectTimeout(100*time.Millisecond), //连接超时
client.WithRPCTimeout(2*time.Second), //rpc超时
client.WithWarmingUp(&warmup.ClientOption{ //预先初始化服务发现和连接池的相关组件,避免在首次请求时产生较大的延迟
ResolverOption: &warmup.ResolverOption{
Dests: []*rpcinfo.EndpointBasicInfo{
{
ServiceName: "easyimpr.math", //服务名称
},
},
},
}),
)
客户端fail重试机制
//重试机制
failurePolicy := retry.NewFailurePolicy()
failurePolicy.WithMaxRetryTimes(2) //最多重试2次(不包含首次),不是立即重试,中间要休息一段时间
client, err := math.NewClient("easyimpr.math",
client.WithResolver(resolver),
............
............
client.WithFailureRetry(failurePolicy),//注册一下重试机制
)
负载均衡
我们如何实现均衡的调用微服务呢
首先我们在服务注册的时候可以设置权重
"github.com/cloudwego/kitex/pkg/registry"
server.WithRegistryInfo(®istry.Info{
Weight: 100, // 必须是正数
}),
可以在客户端调用的时候,选择负载均衡的模式
这里粘贴几种实现客户端负载均衡的方法:
WeightedRoundRobin
该 LoadBalancer 使用的是基于权重的轮询策略,也是 Kitex 的默认策略。
该 LoadBalancer 能让所有下游实例拥有最小的同时 inflight 请求数,以减少下游过载情况的发生。
如果所有的实例的权重都一样,会使用一个纯轮询的实现,来避免加权计算的一些额外开销。
使用方式:
cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()))
InterleavedWeightedRoundRobin
与 WeightedRoundRobin 相同, 该 LoadBalancer 使用的也是基于权重的轮询策略。
区别在于 WeightedRoundRobin 的空间复杂度是将所有实例按权重选择一遍的最小正周期(所有实例权重的和除以所有实例权重的最大公约数), 而该 LoadBalancer 的空间复杂度是下游实例数,在下游实例数权重总和非常大时更节省空间。
使用方式:
cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewInterleavedWeightedRoundRobinBalancer()))
WeightedRandom
顾名思义,这个 LoadBalancer 使用的是基于权重的随机策略。
这个 LoadBalancer 会依据实例的权重进行加权随机,并保证每个实例分配到的负载和自己的权重成比例。
如果所有的实例的权重都一样,会使用一个纯随机的实现,来避免加权计算的一些额外开销。
使用方式:
cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewWeightedRandomBalancer()))
Alias Method
使用别名方法的 LoadBalancer ,具体来说实现的是 Darts, Dice, and Coins 中的 Vose’s Alias Method
使用 O(n) 时间生成别名表,之后在 O(1) 时间内选取实例,选取效率比 WeightedRandom 更高
cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewWeightedRandomWithAliasMethodBalancer()))
接口限流
kitex的接口限流并不支持grpc协议,所以我们要自己一个中间件实现限流
关于接口限流我单独写了一个博客
熔断
用于上游服务调用下游服务总是失败时的保护机制,熔断是由调用方(客户端)设置的。
熔断的作用
当下游服务出现问题(如响应缓慢、频繁报错)时,继续调用不仅浪费资源,还可能导致雪崩效应。熔断器可以:
- 快速失败,避免等待超时
- 保护下游服务,给它恢复的时间
- 提高整体系统的可用性
熔断器的三种状态
- 关闭状态(Closed):正常调用,统计错误率
- 开启状态(Open):所有请求直接被拒绝(冷却期)
- 半开启状态(Half-Open):放过部分请求试探下游是否恢复
关闭 ──错误率超过阈值──> 开启 ──冷却期结束──> 半开启 ──连续成功若干次──> 关闭
↑ │
└────────────────────────────────────────────────────────────┘
熔断器粒度
Kitex 支持两种粒度的熔断控制:
1. 服务粒度熔断
针对整个服务进行熔断控制(如 easyimpr.math 服务)
import "github.com/cloudwego/kitex/pkg/circuitbreak"
// 服务粒度:根据服务名称进行熔断
serviceCBSuit := circuitbreak.NewCBSuite(func(ri rpcinfo.RPCInfo) string {
return ri.To().ServiceName()
})
2. 方法粒度熔断
针对服务中的某个具体方法进行熔断控制(如 Math/Add、Math/Sub)
// 方法粒度:根据服务名称+方法名进行熔断
methodCBSuit := circuitbreak.NewCBSuite(func(ri rpcinfo.RPCInfo) string {
return ri.To().ServiceName() + "/" + ri.To().Method()
})
注册熔断器
在创建客户端时注册熔断器:
client, err := math.NewClient("easyimpr.math",
client.WithResolver(resolver),
client.WithCircuitBreaker(serviceCBSuit), // 服务粒度熔断
client.WithCircuitBreaker(methodCBSuit), // 方法粒度熔断
)
配置熔断策略
使用 UpdateServiceCBConfig 方法配置熔断参数:
// 方法粒度配置:Math/Add 方法
methodCBSuit.UpdateServiceCBConfig("Math/Add", circuitbreak.CBConfig{
Enable: true, // 是否启用熔断
ErrRate: 0.3, // 错误率阈值(30%)
MinSample: 200, // 最小采样数(至少200个请求后才开始统计)
})
// 方法粒度配置:Math/Sub 方法
methodCBSuit.UpdateServiceCBConfig("Math/Sub", circuitbreak.CBConfig{
Enable: true,
ErrRate: 0.4, // 错误率阈值(40%)
MinSample: 200,
})
// 服务粒度配置:整个 Math 服务
serviceCBSuit.UpdateServiceCBConfig("Math", circuitbreak.CBConfig{
Enable: true,
ErrRate: 0.3,
MinSample: 500, // 服务级别需要更多样本
})
熔断配置参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| Enable | bool | 是否启用熔断 |
| ErrRate | float64 | 错误率阈值(0.0-1.0),超过此值触发熔断 |
| MinSample | int64 | 最小采样数,样本数不足时不会触发熔断 |
示例说明:
ErrRate: 0.3, MinSample: 200:当至少有 200 个请求,且错误率超过 30% 时触发熔断- 如果只有 100 个请求,即使错误率 100% 也不会触发熔断(样本数不足)
熔断的工作流程
-
统计阶段(关闭状态)
- 正常处理请求
- 在固定时间窗口内统计请求数和失败数
- 当
失败数/总请求数 > ErrRate且总请求数 >= MinSample时,触发熔断
-
冷却期(开启状态)
- 所有请求直接被拒绝,快速失败
- 给下游服务恢复的时间
- 冷却期持续一段时间后进入半开启状态
-
试探阶段(半开启状态)
- 放过部分请求测试下游是否恢复
- 如果连续成功若干次,熔断器关闭,恢复正常调用
- 如果仍然失败,重新进入开启状态
动态更新熔断策略
可以在任意位置调用 UpdateServiceCBConfig 更新熔断策略,更新后对后续调用立即生效:
// 运行时动态调整
methodCBSuit.UpdateServiceCBConfig("Math/Add", circuitbreak.CBConfig{
Enable: true,
ErrRate: 0.5, // 调整阈值为 50%
MinSample: 100, // 降低采样数
})
服务降级
假如我们调用下游服务失败(超时、熔断、业务错误等),为了保证系统可用性,我们可以返回默认值、缓存数据或从数据库读取备用信息返回给前端,这就是服务降级。
降级的作用
- 保证系统可用性:即使下游服务不可用,也能给用户返回有意义的响应
- 提升用户体验:避免直接返回错误,提供降级后的备用数据
- 防止雪崩:配合熔断使用,避免故障传播
降级 vs 熔断
| 对比项 | 熔断 | 降级 |
|---|---|---|
| 触发条件 | 错误率超过阈值 | 任何失败场景 |
| 处理方式 | 快速失败,拒绝请求 | 返回备用数据 |
| 目的 | 保护下游服务 | 保证系统可用性 |
| 设置位置 | 客户端 | 客户端 |
通常配合使用:熔断器触发后,降级策略提供备用数据。
降级策略类型
Kitex 提供两种降级策略:
1. ErrorFallback(业务错误降级)
针对业务 error 的降级策略,当调用返回错误时触发。
2. TimeoutAndCBFallback(超时和熔断降级)
针对超时和熔断的降级策略,当调用超时或触发熔断时执行。
基本用法
需要导入的包:
import (
"github.com/cloudwego/kitex/client/callopt"
"github.com/cloudwego/kitex/pkg/fallback"
)
接口级别降级策略
在单次调用时设置降级策略(优先级高于全局配置):
response, err := client.Add(
context.Background(),
&request,
callopt.WithFallback( // 接口级别的降级策略
fallback.NewFallbackPolicy(
fallback.UnwrapHelper(
func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
if err != nil { // 发生了 error(业务错误、超时、熔断等)
fbResp = &math_service.AddResponse{Sum: 0} // 返回默认值
fbErr = nil // 清除错误,对上游隐藏失败
} else { // 没有发生 error,直接使用原始结果
fbResp = resp
}
return
},
),
),
),
)
分类降级策略
可以针对不同的失败场景设置不同的降级策略:
callopt.WithFallback(
fallback.NewFallbackPolicy(
// 1. 业务错误降级
fallback.ErrorFallback(func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
// 处理业务逻辑错误
klog.Errorf("business error: %v", err)
fbResp = &math_service.AddResponse{Sum: -1} // 返回特殊值表示业务错误
fbErr = nil
return
}),
// 2. 超时和熔断降级
fallback.TimeoutAndCBFallback(func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
// 处理超时和熔断情况
klog.Errorf("timeout or circuit breaker: %v", err)
fbResp = &math_service.AddResponse{Sum: -2} // 返回特殊值表示超时/熔断
fbErr = nil
return
}),
),
)
降级函数参数说明
func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error)
| 参数 | 类型 | 说明 |
|---|---|---|
| ctx | context.Context | 上下文,可以传递链路信息 |
| req | interface{} | 原始请求参数 |
| resp | interface{} | 原始响应(可能为 nil) |
| err | error | 错误信息 |
| fbResp | interface{} | 降级后的响应 |
| fbErr | error | 降级后的错误(通常返回 nil) |
全局降级策略(客户端初始化时配置)
可以在创建客户端时设置全局降级策略:
client, err := math.NewClient("easyimpr.math",
client.WithResolver(resolver),
client.WithFallback(
fallback.NewFallbackPolicy(
fallback.UnwrapHelper(
func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
if err != nil {
klog.Errorf("fallback triggered: %v", err)
// 全局默认降级策略
fbResp = getDefaultResponse(req)
fbErr = nil
} else {
fbResp = resp
}
return
},
),
),
),
)