kitex-微服务-grpc框架(字节跳动开源)

2026年1月15日 · 1385 字 · 7 分钟

Kitex,字节跳动内部的 Go 微服务 RPC 框架 ,包括但不限于脚手架构建代码、日志、服务注册服务发现、中间件、负载均衡、限流、熔断、服务降级

Kitex

Kitex[kaɪt’eks] 字节跳动内部的 Go 微服务 RPC 框架,具有高性能、强可扩展的特点,在字节内部已广泛使用。如果对微服务性能有要求,又希望定制扩展融入自己的治理体系,Kitex 会是一个不错的选择。

kitex序列化工具

安装

go install github.com/cloudwego/kitex/tool/cmd/kitex@latest

在项目引入包

go get -u github.com/cloudwego/kitex

my_kitex/idl/math.proto

syntax="proto3";

package idl;    // proto文件互相引用时需要指定package。不需要相互引用时,这一行可以不写
option go_package="math_service";    //生成go文件后对应的package名

message AddRequest {
    int32 left = 1;
    int32 right = 2;
}

message AddResponse {
    int32 sum = 1;
}

message SubRequest {
    int32 left = 1;
    int32 right = 2;
}

message SubResponse {
    int32 diff = 1;
}

service Math {
    rpc Add(AddRequest) returns (AddResponse);
    rpc Sub(SubRequest) returns (SubResponse);
}
kitex -module myKitex idl/math.proto

会生成一个kitex_gen目录,import语句里需要用到module名称

math.pb.go是官方的pb工具生成的,math.pb.fast.go是字节自家的fastpb工具生成的,math.pb.fast.go是对math.pb.go的补充

执行一下go mod tidy

PS:Kitex的序列化和反序列化工具比默认grpc序列化反序列化快了很多

服务脚手架代码

生成服务端代码

参数 作用 示例
kitex 工具名 -
-module Go 模块名 myKitex
-service 生成服务端代码(服务名称,任意字符串) easyimpr.math
-use 引用已有的 kitex_gen myKitex/kitex_gen
IDL文件 接口定义文件 ../idl/math.proto
cd /my_kitex/server/
kitex -module myKitex -service easyimpr.math -use myKitex/kitex_gen  ../idl/math.proto

接下来我们只需要就该server下面的handler.go实现具体的逻辑即可

// Add implements the MathImpl interface.
func (s *MathImpl) Add(ctx context.Context, req *math_service.AddRequest) (resp *math_service.AddResponse, err error) {
	// TODO: Your code here...
	resp = &math_service.AddResponse{
		Sum: req.Left + req.Right,
	}
	return resp, nil
}

// Sub implements the MathImpl interface.
func (s *MathImpl) Sub(ctx context.Context, req *math_service.SubRequest) (resp *math_service.SubResponse, err error) {
	// TODO: Your code here...
	resp = &math_service.SubResponse{
		Diff: req.Left - req.Right,
	}
	return resp, nil
}

kitex还是很贴心的,连编译和运行的脚本都写好了,由于我是mac,没怎么用过windows,不知道windows是否能正常运行(好像是用wsl?)

当然,我们不可能只用默认的8888端口号

我们修改main.go

	addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:5678")
	svr := math_service.NewServer(new(MathImpl), server.WithServiceAddr(addr))
	err := svr.Run()
	if err != nil {
		log.Println(err.Error())
	}

完成客户端代码

cd ./my_kitex/client/

新建main.go

import (
	"context"
	"fmt"
	"log"
	"myKitex/kitex_gen/math_service"
	"myKitex/kitex_gen/math_service/math"

	"github.com/cloudwego/kitex/client"
)

func main() {
	//这个函数是之前生成的脚手架代码中的函数,用于创建一个客户端,前面的是服务名称,后面的是地址
	client, err := math.NewClient("easyimpr.math", client.WithHostPorts("127.0.0.1:5678"))
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}
	//定义请求参数
	request := math_service.SubRequest{Left: 9, Right: 1}
	//调用方法
	response, err := client.Sub(context.Background(), &request)
	if err != nil {
		log.Fatalf("failed to call sub: %v", err)
	}
	//打印结果
	fmt.Println(response.Diff)
}

kitex日志工具

需要的包

	"github.com/cloudwego/kitex/pkg/klog"

正常打印在终端

	klog.SetLevel(klog.LevelInfo) //大于等于info的日志会被打印出来
	klog.Debug("我是debug")
	klog.Info("我是info")
	klog.Warn("我是warn")
	klog.Error("我是error")

如果想要打印到文件

	fout, err := os.OpenFile("./log/server.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		panic(err)
	}
	defer fout.Close()
	klog.SetOutput(fout)

服务注册与发现

我们在服务端,把server的ip、端口号、服务名称注册到服务中心中,客户端访问服务中心,根据服务名称获得一个ip和端口号。所以一个server可以有多个ip和端口号,服务中心根据负载均衡,在客户端调用时,发配一个ip和端口号。

当服务端每多一台服务器或者少一台服务器,服务中心都可以感知到

在这里演示了Kitex通过etcd实现服务注册与发现

server服务注册

首先需要导包

etcd "github.com/kitex-contrib/registry-etcd" //起了个别名为etcd
addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:5678")
reg, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:2379"}) //线上需要使用密码,用NewEtcdRegistryWithAuth
	if err != nil {
		klog.Fatal("failed to create registry: %v", err)
	}
	svr := math_service.NewServer(new(MathImpl),
		server.WithServiceAddr(addr),
		server.WithRegistry(reg), //注册服务
		server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
			ServiceName: "easyimpr.math",
		}),//在这里就必须写上服务的名称了,因为etcd得知道
	)
	err = svr.Run()

client调用服务

resolver, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"})
	if err != nil {
		log.Fatalf("failed to create resolver: %v", err)
	}
	//这里就不需要指定端口号和ip地址了
	client, err := math.NewClient("easyimpr.math", client.WithResolver(resolver))
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

中间件

计时中间件

服务端

middleware

func TimeMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
	return func(ctx context.Context, req, resp interface{}) (err error) {
		begin := time.Now()
		err = next(ctx, req, resp) //这里指运行剩下的中间件,以及核心业务接口
		elapsed := time.Since(begin)
		//可以获取执行的业务
		method, _ := kitexutil.GetMethod(ctx)
		//可以获取调用方地址
		addr, _ := kitexutil.GetCallerAddr(ctx)
		//可以获取上游server
		server, _ := kitexutil.GetCaller(ctx)
		fmt.Printf("method %s from %s took %s 上游 %s\n", method, addr, elapsed, server)
		return err
	}
}
	svr := math_service.NewServer(new(MathImpl),
		server.WithServiceAddr(addr),
		server.WithRegistry(reg), //注册服务
		server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
			ServiceName: "easyimpr.math",
		}),
		server.WithMiddleware(middleware.TimeMiddleware), //注册中间件
	)

客户端

middleware

func TimeMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
	return func(ctx context.Context, req, resp interface{}) (err error) {
		begin := time.Now()
		err = next(ctx, req, resp) //这里指运行剩下的中间件,以及核心业务接口
		elapsed := time.Since(begin)
		//可以获取执行的业务
		method, _ := kitexutil.GetMethod(ctx)
		//可以获取下游的service name
		serviceName, _ := kitexutil.GetIDLServiceName(ctx)

		//也可以这么写
		// var serviceName, method string
		// ri := rpcinfo.GetRPCInfo(ctx)
		// if ivk := ri.Invocation(); ivk != nil {
		// 	serviceName = ivk.ServiceName()
		// 	method = ivk.MethodName()
		// }

		fmt.Printf("method %s 下游 %s 耗时 %s\n", method, serviceName, elapsed)
		return err
	}
}
	client, err := math.NewClient("easyimpr.math",
		client.WithResolver(resolver),
		client.WithMiddleware(middleware.TimeMiddleware), //注册中间件
	)

在中间件里获取请求和响应数据

在刚刚的计时中间件,我们并没有用请求和响应数据,而是直接用了next,我们这里演示获取请求和响应数据(使用断言)

就是一直在类型断言,还挺好玩的

func RecordRequestAndResponse(next endpoint.Endpoint) endpoint.Endpoint {
	return func(ctx context.Context, req, resp interface{}) (err error) {
		//打印请求参数
		if arg, ok := req.(utils.KitexArgs); ok { //类型断言
			//调用断言后的函数方法后,发现返回了一个空接口,又要类型断言
			switch request := arg.GetFirstArgument().(type) {
			case *math_service.AddRequest:
				fmt.Printf("request: left %d, right %d\n", request.Left, request.Right)
			case *math_service.SubRequest:
				fmt.Printf("request: left %d, right %d\n", request.Left, request.Right)
			default:
				fmt.Printf("unknown request type %v\n", request)
			}
		}

		//执行接下来的中间件与函数
		err = next(ctx, req, resp)

		//打印响应参数
		if ri := rpcinfo.GetRPCInfo(ctx); ri != nil { //获取rpcinfo
			if stats := ri.Stats(); stats != nil { //rpc正常执行
				if result, ok := resp.(utils.KitexResult); ok { //类型断言
					switch response := result.GetResult().(type) { //调用断言后的函数方法后,发现返回了一个空接口,又要类型断言
					case *math_service.AddResponse:
						if response.Sum != 0 {
							fmt.Printf("response: sum %d\n", response.Sum)
						}
					case *math_service.SubResponse:
						if response.Diff != 0 {
							fmt.Printf("response: diff %d\n", response.Diff)
						}
					default:
						fmt.Printf("unknown response type %v\n", result.GetResult())
					}
				}
			}
		}
		return err
	}
}

服务端panic处理

假如我们在服务端的一个方法中故意写成调用必定会panic

0不能做分母,所以会panic

func (s *MathImpl) Sub(ctx context.Context, req *math_service.SubRequest) (resp *math_service.SubResponse, err error) {
	// TODO: Your code here...
	a := 0
	b := 10 / a
	resp = &math_service.SubResponse{
		Diff: int32(b),
	}
	return resp, nil
}

客户端调用后可以看到,服务端没有因为panic而挂掉,因为kitex自带了recover,并且打印错误信息

我们如何自定义错误信息呢,那么我们就要完成服务端的中间件了

//打印响应参数
		if ri := rpcinfo.GetRPCInfo(ctx); ri != nil { //获取rpcinfo
			if stats := ri.Stats(); stats != nil { //rpc正常执行
				//打印panic信息
				if panicHappened, panicInfo := stats.Panicked(); panicHappened { //panicInfo就是recover()的返回值
					klog.Errorf("panic info %s", panicInfo)//打印错误信息到日志
					return fmt.Errorf("服务内部发生panic") //原始panic信息对client隐藏
				} else {
					................
                    ................
				}
			}
		}

客户端接口超时控制

作为客户端,当我们调用一个接口的时候如果时间过长,我们不可能一直去等待他结束,所以我们需要加入超时控制

	client, err := math.NewClient("easyimpr.math",
		client.WithResolver(resolver),
		client.WithMiddleware(middleware.TimeMiddleware), //注册中间件
		client.WithConnectTimeout(100*time.Millisecond),  //连接超时
		client.WithRPCTimeout(2*time.Second),             //rpc超时
	)

每个grpc的方法肯定都是不一样的,那么需要的耗时肯定也是不一样的,所以我们可以单独为某个grpc的调用单独增加超时管理,这个的优先级是高于刚刚设置的全局设置

	response, err := client.Sub(
		context.Background(),
		&request,
		callopt.WithConnectTimeout(100*time.Millisecond), //连接超时
		callopt.WithRPCTimeout(200*time.Millisecond),
	)

当我们第一次调用client的时候可能会超时,因为要与服务中心建立好连接等等,所以我们可以预热一下

	client, err := math.NewClient("easyimpr.math",
		client.WithResolver(resolver),
		client.WithMiddleware(middleware.TimeMiddleware), //注册中间件
		client.WithConnectTimeout(100*time.Millisecond),  //连接超时
		client.WithRPCTimeout(2*time.Second),             //rpc超时
		client.WithWarmingUp(&warmup.ClientOption{ //预先初始化服务发现和连接池的相关组件,避免在首次请求时产生较大的延迟
			ResolverOption: &warmup.ResolverOption{
				Dests: []*rpcinfo.EndpointBasicInfo{
					{
						ServiceName: "easyimpr.math", //服务名称
					},
				},
			},
		}),
	)

客户端fail重试机制

	//重试机制
	failurePolicy := retry.NewFailurePolicy()
	failurePolicy.WithMaxRetryTimes(2) //最多重试2次(不包含首次),不是立即重试,中间要休息一段时间
	client, err := math.NewClient("easyimpr.math",
		client.WithResolver(resolver),
		............
        ............
		client.WithFailureRetry(failurePolicy),//注册一下重试机制
	)

负载均衡

我们如何实现均衡的调用微服务呢

首先我们在服务注册的时候可以设置权重

        "github.com/cloudwego/kitex/pkg/registry"

		server.WithRegistryInfo(&registry.Info{
			Weight: 100, // 必须是正数
		}),

可以在客户端调用的时候,选择负载均衡的模式

这里粘贴几种实现客户端负载均衡的方法:

WeightedRoundRobin

该 LoadBalancer 使用的是基于权重的轮询策略,也是 Kitex 的默认策略。

该 LoadBalancer 能让所有下游实例拥有最小的同时 inflight 请求数,以减少下游过载情况的发生。

如果所有的实例的权重都一样,会使用一个纯轮询的实现,来避免加权计算的一些额外开销。

使用方式:

cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()))

InterleavedWeightedRoundRobin

与 WeightedRoundRobin 相同, 该 LoadBalancer 使用的也是基于权重的轮询策略。

区别在于 WeightedRoundRobin 的空间复杂度是将所有实例按权重选择一遍的最小正周期(所有实例权重的和除以所有实例权重的最大公约数), 而该 LoadBalancer 的空间复杂度是下游实例数,在下游实例数权重总和非常大时更节省空间。

使用方式:

cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewInterleavedWeightedRoundRobinBalancer()))

WeightedRandom

顾名思义,这个 LoadBalancer 使用的是基于权重的随机策略。

这个 LoadBalancer 会依据实例的权重进行加权随机,并保证每个实例分配到的负载和自己的权重成比例。

如果所有的实例的权重都一样,会使用一个纯随机的实现,来避免加权计算的一些额外开销。

使用方式:

cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewWeightedRandomBalancer()))

Alias Method

使用别名方法的 LoadBalancer ,具体来说实现的是 Darts, Dice, and Coins 中的 Vose’s Alias Method

使用 O(n) 时间生成别名表,之后在 O(1) 时间内选取实例,选取效率比 WeightedRandom 更高

cli, err := echo.NewClient("echo", client.WithLoadBalancer(loadbalance.NewWeightedRandomWithAliasMethodBalancer()))

接口限流

kitex的接口限流并不支持grpc协议,所以我们要自己一个中间件实现限流

关于接口限流我单独写了一个博客

熔断

用于上游服务调用下游服务总是失败时的保护机制,熔断是由调用方(客户端)设置的

熔断的作用

当下游服务出现问题(如响应缓慢、频繁报错)时,继续调用不仅浪费资源,还可能导致雪崩效应。熔断器可以:

  • 快速失败,避免等待超时
  • 保护下游服务,给它恢复的时间
  • 提高整体系统的可用性

熔断器的三种状态

  1. 关闭状态(Closed):正常调用,统计错误率
  2. 开启状态(Open):所有请求直接被拒绝(冷却期)
  3. 半开启状态(Half-Open):放过部分请求试探下游是否恢复
关闭 ──错误率超过阈值──> 开启 ──冷却期结束──> 半开启 ──连续成功若干次──> 关闭
  ↑                                                            │
  └────────────────────────────────────────────────────────────┘

熔断器粒度

Kitex 支持两种粒度的熔断控制:

1. 服务粒度熔断

针对整个服务进行熔断控制(如 easyimpr.math 服务)

import "github.com/cloudwego/kitex/pkg/circuitbreak"

// 服务粒度:根据服务名称进行熔断
serviceCBSuit := circuitbreak.NewCBSuite(func(ri rpcinfo.RPCInfo) string { 
    return ri.To().ServiceName() 
})

2. 方法粒度熔断

针对服务中的某个具体方法进行熔断控制(如 Math/AddMath/Sub

// 方法粒度:根据服务名称+方法名进行熔断
methodCBSuit := circuitbreak.NewCBSuite(func(ri rpcinfo.RPCInfo) string { 
    return ri.To().ServiceName() + "/" + ri.To().Method() 
})

注册熔断器

在创建客户端时注册熔断器:

client, err := math.NewClient("easyimpr.math",
    client.WithResolver(resolver),
    client.WithCircuitBreaker(serviceCBSuit),  // 服务粒度熔断
    client.WithCircuitBreaker(methodCBSuit),   // 方法粒度熔断
)

配置熔断策略

使用 UpdateServiceCBConfig 方法配置熔断参数:

// 方法粒度配置:Math/Add 方法
methodCBSuit.UpdateServiceCBConfig("Math/Add", circuitbreak.CBConfig{
    Enable:    true,   // 是否启用熔断
    ErrRate:   0.3,    // 错误率阈值(30%)
    MinSample: 200,    // 最小采样数(至少200个请求后才开始统计)
})

// 方法粒度配置:Math/Sub 方法
methodCBSuit.UpdateServiceCBConfig("Math/Sub", circuitbreak.CBConfig{
    Enable:    true,
    ErrRate:   0.4,    // 错误率阈值(40%)
    MinSample: 200,
})

// 服务粒度配置:整个 Math 服务
serviceCBSuit.UpdateServiceCBConfig("Math", circuitbreak.CBConfig{
    Enable:    true,
    ErrRate:   0.3,
    MinSample: 500,    // 服务级别需要更多样本
})

熔断配置参数说明

参数 类型 说明
Enable bool 是否启用熔断
ErrRate float64 错误率阈值(0.0-1.0),超过此值触发熔断
MinSample int64 最小采样数,样本数不足时不会触发熔断

示例说明:

  • ErrRate: 0.3, MinSample: 200:当至少有 200 个请求,且错误率超过 30% 时触发熔断
  • 如果只有 100 个请求,即使错误率 100% 也不会触发熔断(样本数不足)

熔断的工作流程

  1. 统计阶段(关闭状态)

    • 正常处理请求
    • 在固定时间窗口内统计请求数和失败数
    • 失败数/总请求数 > ErrRate总请求数 >= MinSample 时,触发熔断
  2. 冷却期(开启状态)

    • 所有请求直接被拒绝,快速失败
    • 给下游服务恢复的时间
    • 冷却期持续一段时间后进入半开启状态
  3. 试探阶段(半开启状态)

    • 放过部分请求测试下游是否恢复
    • 如果连续成功若干次,熔断器关闭,恢复正常调用
    • 如果仍然失败,重新进入开启状态

动态更新熔断策略

可以在任意位置调用 UpdateServiceCBConfig 更新熔断策略,更新后对后续调用立即生效

// 运行时动态调整
methodCBSuit.UpdateServiceCBConfig("Math/Add", circuitbreak.CBConfig{
    Enable:    true,
    ErrRate:   0.5,    // 调整阈值为 50%
    MinSample: 100,    // 降低采样数
})

服务降级

假如我们调用下游服务失败(超时、熔断、业务错误等),为了保证系统可用性,我们可以返回默认值、缓存数据或从数据库读取备用信息返回给前端,这就是服务降级

降级的作用

  • 保证系统可用性:即使下游服务不可用,也能给用户返回有意义的响应
  • 提升用户体验:避免直接返回错误,提供降级后的备用数据
  • 防止雪崩:配合熔断使用,避免故障传播

降级 vs 熔断

对比项 熔断 降级
触发条件 错误率超过阈值 任何失败场景
处理方式 快速失败,拒绝请求 返回备用数据
目的 保护下游服务 保证系统可用性
设置位置 客户端 客户端

通常配合使用:熔断器触发后,降级策略提供备用数据。

降级策略类型

Kitex 提供两种降级策略:

1. ErrorFallback(业务错误降级)

针对业务 error 的降级策略,当调用返回错误时触发。

2. TimeoutAndCBFallback(超时和熔断降级)

针对超时和熔断的降级策略,当调用超时或触发熔断时执行。

基本用法

需要导入的包:

import (
    "github.com/cloudwego/kitex/client/callopt"
    "github.com/cloudwego/kitex/pkg/fallback"
)

接口级别降级策略

在单次调用时设置降级策略(优先级高于全局配置):

response, err := client.Add(
    context.Background(),
    &request,
    callopt.WithFallback( // 接口级别的降级策略
        fallback.NewFallbackPolicy(
            fallback.UnwrapHelper(
                func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
                    if err != nil { // 发生了 error(业务错误、超时、熔断等)
                        fbResp = &math_service.AddResponse{Sum: 0} // 返回默认值
                        fbErr = nil  // 清除错误,对上游隐藏失败
                    } else { // 没有发生 error,直接使用原始结果
                        fbResp = resp
                    }
                    return
                },
            ),
        ),
    ),
)

分类降级策略

可以针对不同的失败场景设置不同的降级策略:

callopt.WithFallback(
    fallback.NewFallbackPolicy(
        // 1. 业务错误降级
        fallback.ErrorFallback(func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
            // 处理业务逻辑错误
            klog.Errorf("business error: %v", err)
            fbResp = &math_service.AddResponse{Sum: -1} // 返回特殊值表示业务错误
            fbErr = nil
            return
        }),
        
        // 2. 超时和熔断降级
        fallback.TimeoutAndCBFallback(func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
            // 处理超时和熔断情况
            klog.Errorf("timeout or circuit breaker: %v", err)
            fbResp = &math_service.AddResponse{Sum: -2} // 返回特殊值表示超时/熔断
            fbErr = nil
            return
        }),
    ),
)

降级函数参数说明

func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error)
参数 类型 说明
ctx context.Context 上下文,可以传递链路信息
req interface{} 原始请求参数
resp interface{} 原始响应(可能为 nil)
err error 错误信息
fbResp interface{} 降级后的响应
fbErr error 降级后的错误(通常返回 nil)

全局降级策略(客户端初始化时配置)

可以在创建客户端时设置全局降级策略:

client, err := math.NewClient("easyimpr.math",
    client.WithResolver(resolver),
    client.WithFallback(
        fallback.NewFallbackPolicy(
            fallback.UnwrapHelper(
                func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
                    if err != nil {
                        klog.Errorf("fallback triggered: %v", err)
                        // 全局默认降级策略
                        fbResp = getDefaultResponse(req)
                        fbErr = nil
                    } else {
                        fbResp = resp
                    }
                    return
                },
            ),
        ),
    ),
)