合作机构:阿里云 / 腾讯云 / 亚马逊云 / DreamHost / NameSilo / INWX / GODADDY / 百度统计
Kafka[1]是Apache基金会开源的一个分布式事件流处理平台,是Java阵营(最初为Scala)中的一款杀手级应用,其提供的高可靠性、高吞吐量和低延迟的数据传输能力,让其到目前为止依旧是现代企业级应用系统以及云原生应用系统中使用的重要中间件。
在日常开发Go程序时,我们经常会遇到一些依赖Kafka的代码[2],如何对这些代码进行测试,尤其是单测是摆在Go开发者前面的一个现实问题!
有人说用mock,是个路子。但看过我的《单测时尽量用fake object[3]》一文的童鞋估计已经走在了寻找kafka fake object的路上了!Kafka虽好,但身形硕大,不那么灵巧。找到一个合适的fake object不容易。在这篇文章中,我们就来聊聊如何测试那些依赖kafka的代码,再往本质一点说,就是和大家以找找那些合适的kafka fake object。
在《单测时尽量用fake object[4]》一文中,我们提到过,如果测试的依赖提供了tiny版本或某些简化版,我们可以直接使用这些版本作为fake object的候选,就像etcd提供了用于测试的自身简化版的实现(embed)[5]那样。
但Kafka并没有提供tiny版本,我们也只能选择《单测时尽量用fake object[6]》一文提到的另外一个策略,那就是利用容器来充当fake object,这是目前能搞到任意依赖的fake object的最简单路径了。也许以后WASI(WebAssembly System Interface)[7]成熟了,让wasm脱离浏览器并可以在本地系统上飞起,到时候换用wasm也不迟。
下面我们就按照使用容器的策略来找一找适合的kafka container。
我们第一站就来到了testcontainers-go[8]。testcontainers-go是一个Go语言开源项目,专门用于简化创建和清理基于容器的依赖项,常用于Go项目的单元测试、自动化集成或冒烟测试中。通过testcontainers-go提供的易于使用的API,开发人员能够以编程方式定义作为测试的一部分而运行的容器,并在测试完成时清理这些资源。
注:testcontainers[9]不仅提供Go API,它还覆盖了主流的编程语言,包括:Java、.NET、Python、Node.js、Rust[10]等。
在几个月之前,testcontainers-go[11]项目还没有提供对Kafka的直接支持,我们需要自己使用testcontainers.GenericContainer来自定义并启动kafka容器。2023年9月,以KRaft模式运行的Kafka容器才被首次引入testcontainers-go项目[12]。
目前testcontainers-go使用的kafka镜像版本是confluentinc/confluent-local:7.5.0[13]。Confluent[14]是在kafka背后的那家公司,基于kafka提供商业化支持。今年初,Confluent还收购了Immerok,将apache的另外一个明星项目Flink招致麾下。
confluent-local[15]并不是一个流行的kafka镜像,它只是一个使用KRaft模式的零配置的、包含Confluent Community RestProxy的Apache Kafka,并且镜像是实验性的,仅应用于本地开发工作流,不应该用在支持生产工作负载。
生产中最常用的开源kafka镜像是confluentinc/cp-kafka镜像[16],它是基于开源Kafka项目构建的,但在此基础上添加了一些额外的功能和工具,以提供更丰富的功能和更易于部署和管理的体验。cp-kafka镜像的版本号并非kafka的版本号,其对应关系需要cp-kafka镜像官网查询。
另外一个开发领域常用的kafka镜像是bitnami的kafka镜像。Bitnami是一个提供各种开源软件的预打包镜像和应用程序栈的公司。Bitnami Kafka镜像是基于开源Kafka项目构建的,是一个可用于快速部署和运行Kafka的Docker镜像。Bitnami Kafka镜像与其内部的Kakfa的版本号保持一致。
下面我们就来看看如何使用testcontainers-go的kafka来作为依赖kafka的Go单元测试用例的fake object。
这第一个测试示例改编自testcontainers-go/kafka module的example_test.go:
// testcontainers/kafka_setup/kafka_test.go
package main
import (
"context"
"fmt"
"testing"
"github.com/testcontainers/testcontainers-go/modules/kafka"
)
func TestKafkaSetup(t *testing.T) {
ctx := context.Background()
kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))
if err != nil {
panic(err)
}
// Clean up the container
defer func() {
if err := kafkaContainer.Terminate(ctx); err != nil {
panic(err)
}
}()
state, err := kafkaContainer.State(ctx)
if err != nil {
panic(err)
}
if kafkaContainer.ClusterID != "test-cluster" {
t.Errorf("want test-cluster, actual %s", kafkaContainer.ClusterID)
}
if state.Running != true {
t.Errorf("want true, actual %t", state.Running)
}
brokers, _ := kafkaContainer.Brokers(ctx)
fmt.Printf("%q\n", brokers)
}
TOP