Testcontainers 系列专题 - 第 5 篇:实战案例 - 测试复杂系统
引言
在前四篇中,我们从 Testcontainers 的基础用法逐步过渡到 CI/CD 集成,掌握了其核心功能和优化技巧。然而,在实际项目中,系统往往包含多个依赖(如消息队列、分布式存储),测试这些组件的交互是一个挑战。本篇将通过两个实战案例——测试 Kafka 消息队列和模拟 Elasticsearch 分布式系统——展示 Testcontainers 的强大能力,并分析它与 Mock 的使用场景。
实战案例 1:测试消息队列(Kafka)
场景描述
假设你正在开发一个订单处理系统,订单通过 Kafka 消息队列传递给下游服务。你需要测试生产者和消费者的正确性。
实现步骤
添加依赖:
1 2 3 4 5 6 7 8 9 10 11
<dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.19.7</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.0</version> </dependency>
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.time.Duration; import java.util.Collections; import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; @Testcontainers public class KafkaTest { @Container public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")); @Test public void testProducerAndConsumer() throws Exception { String bootstrapServers = kafka.getBootstrapServers(); String topic = "orders"; // 配置生产者 Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 发送消息 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>(topic, "order1", "Order #1")).get(); producer.close(); // 配置消费者 Properties consumerProps = new Properties(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList(topic)); // 消费消息 var records = consumer.poll(Duration.ofSeconds(5)); consumer.close(); assertEquals(1, records.count()); records.forEach(record -> { assertEquals("order1", record.key()); assertEquals("Order #1", record.value()); }); } }
- 代码说明:
KafkaContainer
:启动一个 Kafka 实例,默认包含 Zookeeper。getBootstrapServers()
:获取 Kafka 的连接地址。- 使用 Kafka Java 客户端发送和接收消息,验证消息传递的正确性。
测试结果
运行测试后,Testcontainers 会拉取 Kafka 镜像,启动容器,生产者发送消息,消费者接收并验证,确保整个流程正常工作。
实战案例 2:测试分布式系统(Elasticsearch + Kibana)
场景描述
假设你开发了一个日志分析系统,使用 Elasticsearch 存储日志,Kibana 提供可视化。你需要测试数据写入和查询功能。
实现步骤
添加依赖:
1 2 3 4 5 6 7 8 9 10 11
<dependency> <groupId>org.testcontainers</groupId> <artifactId>elasticsearch</artifactId> <version>1.19.7</version> <scope>test</scope> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.17.9</version> </dependency>
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; @Testcontainers public class ElasticsearchKibanaTest { private static final Network network = Network.newNetwork(); @Container public GenericContainer<?> elasticsearch = new GenericContainer<>("docker.elastic.co/elasticsearch/elasticsearch:7.17.9") .withNetwork(network) .withNetworkAliases("elasticsearch") .withExposedPorts(9200) .withEnv("discovery.type", "single-node") .withEnv("xpack.security.enabled", "false"); @Container public GenericContainer<?> kibana = new GenericContainer<>("docker.elastic.co/kibana/kibana:7.17.9") .withNetwork(network) .withExposedPorts(5601) .withEnv("ELASTICSEARCH_HOSTS", "http://elasticsearch:9200") .dependsOn(elasticsearch); @Test public void testElasticsearchIndexing() throws Exception { // 创建 Elasticsearch 客户端 RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost("localhost", elasticsearch.getMappedPort(9200), "http")) ); // 索引数据 IndexRequest indexRequest = new IndexRequest("logs") .source(Map.of("message", "Test log", "level", "INFO")); client.index(indexRequest, RequestOptions.DEFAULT); // 搜索数据 SearchRequest searchRequest = new SearchRequest("logs"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("message", "Test")); searchRequest.source(searchSourceBuilder); SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); assertEquals(1, response.getHits().getTotalHits().value); client.close(); } }
- 代码说明:
Network
:将 Elasticsearch 和 Kibana 放入同一网络。withNetworkAliases
:设置别名,Kibana 通过elasticsearch
访问 Elasticsearch。- 使用 Elasticsearch 高级客户端写入和查询数据,验证功能。
测试结果
测试会启动 Elasticsearch 和 Kibana 容器,写入日志数据并搜索,验证分布式系统的基本功能。
Mock vs. Testcontainers
适用场景对比
Mock:
- 优点:轻量、快速,适合单元测试或隔离外部依赖。
- 缺点:无法模拟真实服务的行为,可能掩盖集成问题。
- 适用场景:测试纯逻辑代码,或外部服务不可控时。
Testcontainers:
- 优点:接近生产环境,能发现集成问题,测试更真实。
- 缺点:启动容器需要时间,资源占用较高。
- 适用场景:集成测试、端到端测试,或需要真实服务行为时。
示例场景
- 仅测试业务逻辑:使用 Mock(如 Mockito)模拟 Kafka 客户端。
- 验证 Kafka 集成:使用 Testcontainers 启动真实 Kafka,确保生产者与消费者正常通信。
建议
- 小型单元测试优先使用 Mock。
- 集成测试或关键功能验证使用 Testcontainers。
- 结合使用:Mock 次要依赖,Testcontainers 测试核心依赖。
总结
通过 Kafka 和 Elasticsearch 的实战案例,本篇展示了 Testcontainers 在测试复杂系统中的能力。无论是消息队列还是分布式存储,Testcontainers 都能提供接近生产环境的测试支持,同时对比 Mock 帮助你选择合适的测试策略。下一篇文章将分享最佳实践与注意事项。