Testcontainers 系列专题:第 5 篇 实战案例 - 测试复杂系统

Testcontainers 系列专题 - 第 5 篇:实战案例 - 测试复杂系统

引言

在前四篇中,我们从 Testcontainers 的基础用法逐步过渡到 CI/CD 集成,掌握了其核心功能和优化技巧。然而,在实际项目中,系统往往包含多个依赖(如消息队列、分布式存储),测试这些组件的交互是一个挑战。本篇将通过两个实战案例——测试 Kafka 消息队列和模拟 Elasticsearch 分布式系统——展示 Testcontainers 的强大能力,并分析它与 Mock 的使用场景。


实战案例 1:测试消息队列(Kafka)

场景描述

假设你正在开发一个订单处理系统,订单通过 Kafka 消息队列传递给下游服务。你需要测试生产者和消费者的正确性。

实现步骤

  1. 添加依赖

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.19.7</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version>
    </dependency>
    
  2. 测试代码

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.jupiter.api.Test;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;
    import org.testcontainers.utility.DockerImageName;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    import static org.junit.jupiter.api.Assertions.assertEquals;
    
    @Testcontainers
    public class KafkaTest {
    
        @Container
        public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
    
        @Test
        public void testProducerAndConsumer() throws Exception {
            String bootstrapServers = kafka.getBootstrapServers();
            String topic = "orders";
    
            // 配置生产者
            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 发送消息
            KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
            producer.send(new ProducerRecord<>(topic, "order1", "Order #1")).get();
            producer.close();
    
            // 配置消费者
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
            consumer.subscribe(Collections.singletonList(topic));
    
            // 消费消息
            var records = consumer.poll(Duration.ofSeconds(5));
            consumer.close();
    
            assertEquals(1, records.count());
            records.forEach(record -> {
                assertEquals("order1", record.key());
                assertEquals("Order #1", record.value());
            });
        }
    }
    
  • 代码说明
    • KafkaContainer:启动一个 Kafka 实例,默认包含 Zookeeper。
    • getBootstrapServers():获取 Kafka 的连接地址。
    • 使用 Kafka Java 客户端发送和接收消息,验证消息传递的正确性。

测试结果

运行测试后,Testcontainers 会拉取 Kafka 镜像,启动容器,生产者发送消息,消费者接收并验证,确保整个流程正常工作。


实战案例 2:测试分布式系统(Elasticsearch + Kibana)

场景描述

假设你开发了一个日志分析系统,使用 Elasticsearch 存储日志,Kibana 提供可视化。你需要测试数据写入和查询功能。

实现步骤

  1. 添加依赖

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>1.19.7</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.17.9</version>
    </dependency>
    
  2. 测试代码

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.junit.jupiter.api.Test;
    import org.testcontainers.containers.GenericContainer;
    import org.testcontainers.containers.Network;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;
    
    import java.util.Map;
    
    import static org.junit.jupiter.api.Assertions.assertEquals;
    
    @Testcontainers
    public class ElasticsearchKibanaTest {
    
        private static final Network network = Network.newNetwork();
    
        @Container
        public GenericContainer<?> elasticsearch = new GenericContainer<>("docker.elastic.co/elasticsearch/elasticsearch:7.17.9")
                .withNetwork(network)
                .withNetworkAliases("elasticsearch")
                .withExposedPorts(9200)
                .withEnv("discovery.type", "single-node")
                .withEnv("xpack.security.enabled", "false");
    
        @Container
        public GenericContainer<?> kibana = new GenericContainer<>("docker.elastic.co/kibana/kibana:7.17.9")
                .withNetwork(network)
                .withExposedPorts(5601)
                .withEnv("ELASTICSEARCH_HOSTS", "http://elasticsearch:9200")
                .dependsOn(elasticsearch);
    
        @Test
        public void testElasticsearchIndexing() throws Exception {
            // 创建 Elasticsearch 客户端
            RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("localhost", elasticsearch.getMappedPort(9200), "http"))
            );
    
            // 索引数据
            IndexRequest indexRequest = new IndexRequest("logs")
                    .source(Map.of("message", "Test log", "level", "INFO"));
            client.index(indexRequest, RequestOptions.DEFAULT);
    
            // 搜索数据
            SearchRequest searchRequest = new SearchRequest("logs");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.matchQuery("message", "Test"));
            searchRequest.source(searchSourceBuilder);
    
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            assertEquals(1, response.getHits().getTotalHits().value);
    
            client.close();
        }
    }
    
  • 代码说明
    • Network:将 Elasticsearch 和 Kibana 放入同一网络。
    • withNetworkAliases:设置别名,Kibana 通过 elasticsearch 访问 Elasticsearch。
    • 使用 Elasticsearch 高级客户端写入和查询数据,验证功能。

测试结果

测试会启动 Elasticsearch 和 Kibana 容器,写入日志数据并搜索,验证分布式系统的基本功能。


Mock vs. Testcontainers

适用场景对比

  • Mock

    • 优点:轻量、快速,适合单元测试或隔离外部依赖。
    • 缺点:无法模拟真实服务的行为,可能掩盖集成问题。
    • 适用场景:测试纯逻辑代码,或外部服务不可控时。
  • Testcontainers

    • 优点:接近生产环境,能发现集成问题,测试更真实。
    • 缺点:启动容器需要时间,资源占用较高。
    • 适用场景:集成测试、端到端测试,或需要真实服务行为时。

示例场景

  • 仅测试业务逻辑:使用 Mock(如 Mockito)模拟 Kafka 客户端。
  • 验证 Kafka 集成:使用 Testcontainers 启动真实 Kafka,确保生产者与消费者正常通信。

建议

  • 小型单元测试优先使用 Mock。
  • 集成测试或关键功能验证使用 Testcontainers。
  • 结合使用:Mock 次要依赖,Testcontainers 测试核心依赖。

总结

通过 Kafka 和 Elasticsearch 的实战案例,本篇展示了 Testcontainers 在测试复杂系统中的能力。无论是消息队列还是分布式存储,Testcontainers 都能提供接近生产环境的测试支持,同时对比 Mock 帮助你选择合适的测试策略。下一篇文章将分享最佳实践与注意事项。

updatedupdated2025-03-312025-03-31