Интеграция Dispatcher Service и Kafka

Avro классы сгенерированы, мы можем приступить к реализации логики вычитывания AvroOrderPlacedEvent и отправки OrderDispatchedEvent.
Примените в ветке master патч  2_kafka_integration_1.patch

Настраиваем Kafka Producer

В Spring Boot приложениях сообщения отправляются в Kafka с помощью класса KafkaTemplate<Key, Value>.
Посмотрите интерфейс KafkaOperations, который он реализует (удобно его смотреть прямо в коде IDEA через Ctrl+N).

Настройка Kafka Producer через application.yml подразумевает, что мы прописываем все параметры продъюсера в конфигурационном файле, а Spring Boot на их основе настраивает бины:
  • ProducerFactory<KEY, VALUE> - фабрика для создания экземпляров продъюсеров
  • KafkaTemplate<KEY, VALUE> - сам продъюсер.
В application.yml нам обязательно необходимо указать:
  • spring.kafka.bootstrap-servers - список адресов для связи с брокерами Kafka, указывается в формате: broker1:port, broker2:port и т.д. Эта настройка будет использована как для Producer, так и для Consumer.
  • spring.kafka.producer.key-serializer - определяет сериализатор для ключей отправляемых в Kafka сообщений. В нашем случае будем использовать org.apache.kafka.common.serialization.StringSerializer. Обычно в качестве ключей сообщений используются примитивные типы данных.
  • spring.kafka.producer.value-serializer - определяет сериализатор значений сообщений, отправляемых в Kafka. Мы используем Apache Avro, поэтому в качестве сериализатора указываем io.confluent.kafka.serializers.KafkaAvroSerializer.
  • spring.kafka.producer.properties.schema.registry.url - так как мы в проекте используем Confluent Schema Registry, необходимо указать URL для связи с ней.
Кроме того, мы настроим продъюсера так, чтобы он был идемпотентным, то есть сообщения, отправляемые в топик, не будут дублироваться. Это достигается за счет следующих механизмов:
  1. Каждому продюсеру присваивается уникальный ID, который используется для идентификации в рамках кластера Kafka.
  2. Каждому сообщению, отправляемому продюсером, присваивается уникальный номер в рамках своего PID. Это позволяет брокеру определять и фильтровать дубликаты.
  3. Kafka хранит состояние последнего успешно зафиксированного сообщения для каждого PID и может соответственно обработать повторные попытки отправки.
Для этого необходимо указать следующие конфигурации:
  • spring.kafka.producer.properties.enable.idempotence = true
  • spring.kafka.producer.properties.acks = all - продъюсер ждет подтверждения доставки сообщения от всех синхронизированных реплик
Идемпотентный Producer избавляет от дубликатов в случае ошибок, возникающих при отправке сообщений в Kafka, однако дубликаты все еще возможны, если упадет экземпляр самого Producer, так как после восстановления ему присвоится новый Producer ID, и Kafka брокер не будет знать о том, что отправляемые этим продъюсером сообщения могут являться дубликатами. Однако в дальнейшем Orders Service будет вычитывает сообщения из топика v1.orders_dispatch и обновлять статус заказа в Postgres, что также является идемпотентной операцией, поэтому в итоге мы получим end-to-end идемпотентность.

По умолчанию остаются настройки:
  • spring.kafka.producer.properties.max.in.flight.requests.per.connection - количество сообщений, которые могут быть "в полете", то есть отправлены, но еще не подтверждены брокером Kafka в рамках одного соединения с брокером, по умолчанию - 5. Для обеспечения идемпотентности продъюсера необходимо, чтобы этот параметр был от 1 до 5.
  • spring.kafka.producer.properties.retries - количество повторных попыток отправить сообщение в случае ошибки. По умолчанию Integer.MAX_VALUE, то есть неограниченное количество ретраев. Для обеспечения идемпотентности продъюсера необходимо, чтобы этот параметр был больше 1.
  • compression.type - тип сжатия данных. Если отправляемые продъюсером сообщения имеют большой размер, а в приложении важную роль занимает пропускная способность, то можно применить сжатие данных. По умолчанию, она равна none, то есть сжатие не производится. Мы не будем сжимать данные, так как наши сообщения небольшие по размеру, кроме того, такую оптимизацию лучше проводить на основе реальных данных мониторинга потребления сетевых и дисковых ресурсов. Другие варианты этой настройки:
    • Gzip - лучше всего сжимает данные, но потребляет много CPU и, соответственно, требует больше всего времени. Однако значительно снижает потребление сетевых ресурсов.
    • Snappy и Zstd - имеют средние показатели сжатия и потребления CPU.
    • Lz4 - сжимает данные быстро, но не так эффективно, как остальные алгоритмы, соответственно, потребление сетевых ресурсов выше.
  • batch.size - размер пакета в байтах, по умолчанию - 16384 байт. Producer отправляет сообщения в Kafka-брокер не по одному, а пакетами, размер которых контролируется этой настройкой. Увеличение размера отправляемого пакета приведет к повышению пропускной способности, однако может также увеличить время ожидания отправки сообщений и их обработки консьюмером.
  • max.request.size, по умолчанию это 1048576 байт. В одном запросе на отправку сообщений может быть несколько пакетов, размер всего отправляемого запроса в байтах контролируется этой настройкой.
  • linger.ms - время ожидания Producer-ом, пока сообщения в пакете скопятся до нужного размера, по умолчанию - 0. То есть по умолчанию продьюсер вообще не будет ждать, пока наберется пакет нужного размера. Мы оставим эту настройку такой, чтобы быть уверенными, что наши тестовые сообщения будут быстро попадать в топики Kafka.
Настройки batch.size, max.request.size и linger.ms в prod-среде определяются на основе данных мониторинга и зависят от нагрузки на продъюсера.

Посмотрим конфигурацию application.yml, котороя относится к продъюсеру:
spring:
  ...
  kafka:
    bootstrap-servers: localhost:9097
    listener:
      ack-mode: manual
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081
        enable.idempotence: true
        acks: all

kafkaprops:
  ...
  order-dispatch-topic: v1.orders_dispatch
  replication-factor: 1
  partitions-count: 3

В классе KafkaTopicConfig создаем топик v1.orders_dispatch. Топик создастся в Kafka при первом запуске приложения с помощью созданного в KafkaAutoConfiguration клиента KafkaAdmin, для конфигурации которого необходима настройка spring.kafka.bootstrap-servers.
@Configuration
public class KafkaTopicConfig {

    @Value("${kafkaprops.order-dispatch-topic}")
    private String ordersDispatchTopic;
    @Value("${kafkaprops.replication-factor}")
    private Integer replicationFactor;
    @Value("${kafkaprops.partitions-count}")
    private Integer partitionsCount;

    @Bean
    public NewTopic ordersDispatchTopic() {
        return TopicBuilder
                .name(ordersDispatchTopic)
                .replicas(replicationFactor)
                .partitions(partitionsCount)
                .build();
    }
}
Пример настройки Kafka Producer в классах @Configuration можно посмотреть в документации Spring.
С другими настройками Producer можно ознакомиться в документации.

Настраиваем Kafka Consumer

Kafka-консьюмер может быть реализован в любом зарегистрированном в контексте спринга бине с помощью аннотации @KafkaListener, которую необходимо поставить над публичным методом этого бина. В качестве параметров метод может принимать:
  • одно сообщение типа ConsumerRecord<KEY, VALUE>
  • несколько таких сообщений, обернутых в ConsumerRecords<KEY, VALUE>
  • само значение сообщения (в нашем случае это AvroOrderPlacedEvent)
  • список значений, например, List<AvroOrderPlacedEvent>
  • ряд заголовков, которые передаются в ответе от брокера: KafkaHeaders.OFFSET, KafkaHeaders.RECEIVED_KEY (если ключ отсутствует в сообщении, то передается null), KafkaHeaders.RECEIVED_TOPIC, KafkaHeaders.RECEIVED_PARTITION, KafkaHeaders.RECEIVED_TIMESTAMP
“Под капотом” Spring Boot создает Consumer, который будет читать данные из Kafka. Для использования этих абстракций нам необходимо их предварительно настроить. Сделать это можно в application.yml (см. Apache Kafka Support) или настроив бины в классе @Configuration (см. Spring Kafka Receiving Messages).

Настройка Kafka Consumer через application.yml подразумевает, что мы прописываем все параметры консьюмера в конфигурационном файле, а Spring Boot на их основе настраивает необходимые бины:
  • ConsumerFactory<KEY, VALUE> - сущность, которая непосредственно создает экземпляры консьюмеров на основе переданных настроек. Реализация данного интерфейса - DefaultKafkaConsumerFactory.
  • KafkaListenerContainerFactory - фабрика для контейнеров типа MessageListenerContainer, в которых реализуется логика консьюмеров. В Spring Boot реализовано два типа MessageListenerContainer:
    • KafkaMessageListenerContainer- контейнер для консьюмера, который обрабатывает все сообщения из всех топиков и партиций, указанных в настройках, в одном потоке.
    • ConcurrentMessageListenerContainer - контейнер, который создает необходимое количество KafkaMessageListenerContainer, чтобы обрабатывать сообщения в многопоточном режиме.
При использовании аннотации @KafkaListener по умолчанию настраивается фабрика ConcurrentKafkaListenerContainerFactory, которая создает контейнеры типа ConcurrentMessageListenerContainer. В параметрах аннтотации @KafkaListener можно выставить нужный уровень параллельности (concurrency), чтобы в рамках одного приложения вычитывать данные из партиций топика в несколько потоков. В нашем учебном проекте это не требуется, однако стоит знать о такой возможности.

В application.yml мы задаем следующие конфигурации для spring.kafka.consumer:
  • group-id - уникальный строковый идентификатор Consumer Group для того, чтобы инстансы приложения выступали в качестве одной группы потребителей.
  • key-deserializer - определяем десериализатор для ключей сообщений. Мы получаем сообщения из топика v1.public.orders_outbox ключи с типом String, поэтому используем org.apache.kafka.common.serialization.StringDeserializer.
  • value-deserializer - определяем десериализатор для значений сообщений. Значения имеют тип AvroOrderPlacedEvent, поэтому используем io.confluent.kafka.serializers.KafkaAvroDeserializer.
  • spring.kafka.consumer.properties - дополнительная специфическая конфигурация:
    • schema.registry.url - указываем url для связи с Confluent Schema Registry, так как схема значений сообщений будет храниться именно там.
    • specific.avro.reader - этот параметр выставляем в true, так как мы планируем вычитывать сообщение с конкретным типом AvroOrderPlacedEvent. В противном случае нам придется обрабатывать обобщенные записи типа GenericRecord, в котором не будет нужных нам полей в явном виде, что не очень удобно.
Также мы выставим следующие дополнительные настройки:
  • spring.kafka.consumer.enable-auto-commit - указывает, должен ли Kafka Consumer автоматически коммитить оффсеты прочитанных сообщений. При true Consumer автоматически коммитит оффсеты в фоне с интервалом, задаваемым параметром auto.commit.interval.ms (по умолчанию 5000, то есть 5 секунд). При false ответственность за коммит оффсетов ложится на разработчика, и подтверждение должно происходить вручную в коде приложения. Мы хотим, чтобы оффсет был закоммичен только после успешной отправки сообщения в топик v1.orders_dispatch. Если по какой-то причине произойдет ошибка во время отправки сообщения, консьюмер запросит еще раз пачку сообщений у брокера, начиная с первого незакоммиченного оффсета. Для этого мы будем самостоятельно контролировать процесс фиксации оффсетов (подтверждение чтения).
  • spring.kafka.listener.ack-mode - настройка, требуемая для организации ручного управления коммитами оффсетов. Допустимые значения:
    • batch - коммитим оффсеты после обработки всего пакета данных.
    • count - коммитим оффсеты после обработки ack-count (параметр консьюмера) количества сообщений.
    • count_time - коммитим оффсеты после обработки ack-count количества сообщений или после истечения времени, установленного в ack-time (параметр консьюмера).
    • manual - вручную коммитим оффсеты, при этом коммиты будут собираться в очередь по мере обработки сообщений из полученного пакета. После обработки всех сообщений из пакета оффсеты из очереди (для разных партиций) будут закоммичены в одной операции.
    • manual_immediate - вручную коммитим оффсеты в брокере сразу после обработки сообщения, если Acknowledgment.acknowledge() вызывается в том же потоке, что и консьюмер. В противном случае коммиты собираются в очередь по мере обработки сообщений из пакета.
    • record - коммитим оффсет после обработки каждого сообщения.
    • time - коммитим оффсеты после истечения указанного в ack-time времени.
  • auto-offset-reset - определяет, как консьюмер должен реагировать, когда для топика нет закоммиченных оффсетов (например, консьюмер впервые начал вычитывать сообщения из топика) или последний закоммиченный консьюмером оффсет уже удален из брокера из-за настроек сохранения данных. В Kafka есть три основных значения для этой настройки:
    • earliest: консьюмер начнет чтение с начального оффсета в топике, то есть с самого старого доступного сообщения. Мы выбрали это значение, так как нам необходимо гарантировать обработку всех сообщений о создании заказа, даже если произойдет сбой и оффсеты будут утеряны. В этом случае будут дубликаты сообщений, однако мы полагаемся на end-to-end идемпотентность, которая достигается за счет идемпотентного продъюсера и идемпотентной операции обновления статуса заказа в базе данных. Кроме того, AvroOrderPlacedEvent содержит вспомогательную информацию в виде LSN и времени совершения транзакции, которая также может быть использована для дедуплицирования на стороне Dispatcher Service, если в дальнейшем изменится логика обработки заказа.
    • latest: консьюмер начнет чтение с самого последнего оффсета.
    • none: если в топике, на которые подписан консьюмер, отсутствуют закоммиченные оффсеты, он выбросит ошибку, сообщая о невозможности его найти. Это потребует от приложения явной обработки такой ситуации. При отсуствии закоммиченных оффсетов (на старте) необходимо будет обработать исключение NoOffsetForPartitionException.
Остальные настройки оставим по умолчанию, однако стоит учесть, что если потребуется оптимизация консьюмеров, то можно обратить внимание на следующие конфигурации:
  • fetch.min.bytes - минимальное количество байт, которые консьюмер ожидает получить от брокера. По умолчанию - 1 байт. Увеличение этой настройки приведет к снижению количества сетевых запросов, однако может увеличить время простоя консьюмера, так как он будет ждать, когда на брокере будет достаточно информации для передачи.
  • fetch.max.bytes - максимальное количество байт, которое консьюмер ждет от брокера. По умолчанию - 57671680 байт (55 мебибайт). Увеличение этой настройки также может привести к снижению количества сетевых запросов, однако повысит расход памяти приложения.
  • session.timeout.ms - время, в течение которого брокер ждет хартбита от консьюмера, по умолчанию - 45000 миллисекунд или 45 секунд, после которого брокер считает консьюмера мертвым. После этого он производит ребалансировку группы консьюмеров, что повлияет на время обработки сообщений. В целом, мы стремимся минимизировать ребалансировки, однако менять значение стоит после мониторинга производительности консьюмера. Также важно знать, что это значение должно быть в итервале от group.min.session.timeout.ms (по умолчанию 6 секунд) до group.max.session.timeout.ms (по умолчанию 30 минут), которые установлены в брокере.
  • hearbeat.interval.ms - интервал между посылаемыми консьюмером в брокер хартбитами (по умолчанию - 3 секунды). Должен быть ниже session.timeout.ms, в типичных сценариях использования устанавливается в 1/3 от session.timeout.ms. Более короткий интервал позволяет быстрее обнаруживать “упавших” консьюмеров, при этом повышает нагрузку на координатора группы.
С другими настройками Consumer можно ознакомиться в документации.

В application.yml прописаны указанные конфигурации, а также настройки, необходимые для создания топика v1.orders_dispatch и ручного коммита оффсетов:
spring:
  ...
  kafka:
    bootstrap-servers: localhost:9097
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific.avro.reader: true
        schema.registry.url: http://localhost:8081
      auto-offset-reset: earliest
      enable-auto-commit: false
    listener:
      ack-mode: manual
    ...

kafkaprops:
  order-dispatch-topic: v1.orders_dispatch
  order-placed-topic: v1.public.orders_outbox
  ...
  nack-sleep-duration: 100ms
В классе DispatcherService реализована логика получения сообщений из топика v1.public.orders_outbox, их обработки и отправки сообщений в топик v1.orders_dispatch.
@Slf4j
@RequiredArgsConstructor
@Service
public class DispatcherService {

    private final KafkaTemplate<String, OrderDispatchedEvent> kafkaTemplate;
    @Value("${kafkaprops.order-dispatch-topic}")
    private String orderDispatchTopic;
    @Value("${kafkaprops.nack-sleep-duration}")
    private Duration nackSleepDuration;

    @KafkaListener(topics = {"${kafkaprops.order-placed-topic}"})
    public void consumeOrderPlacedEvent(AvroOrderPlacedEvent event,
                                        @Header(KafkaHeaders.RECEIVED_KEY) String key,
                                        @Header(KafkaHeaders.RECEIVED_PARTITION) Integer partition,
                                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                        Acknowledgment acknowledgment) {
        log.info("Consuming message from Kafka: {}. Key: {}. Partition: {}. Topic: {}",
                event, key, partition, topic);
        var orderDispatchEvent = processEvent(event);
        try {
            kafkaTemplate.send(orderDispatchTopic, key, orderDispatchEvent).get();
            log.info("Successfully processed and sent OrderDispatchedEvent {} to Kafka.",
                    orderDispatchEvent);
            // commit the offset
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to send OrderDispatchedEvent {} to Kafka", orderDispatchEvent);
            // don't commit the offset
            acknowledgment.nack(nackSleepDuration);
        }
    }

    private OrderDispatchedEvent processEvent(AvroOrderPlacedEvent event) {
        log.info("Processing OrderPlacedEvent: {}", event);
        OrderDispatchStatus status = event.getOrderId() % 2 == 0 ?
                    OrderDispatchStatus.ACCEPTED : OrderDispatchStatus.REJECTED;
        return OrderDispatchedEvent.newBuilder()
                .setOrderId(event.getOrderId())
                .setStatus(status)
                .build();
    }
}
Метод, в котором происходит чтение данных из топика Kafka, помечен аннотацией @KafkaListener.
Так как в настройках консьюмера мы указали enable-auto-commit=false, управление коммитами оффсетов осуществляется вручную с помощью сущности Acknowledgment.
Для отправки сообщений в топик Kafka используется KafkaTemplate<String, OrderDispatchedEvent> kafkaTemplate.
До отправки первого сообщения в Kafka продъюсер регистрирует его схему в Schema Registry, доступ к которой указан в настройке schema.registry.url. После отправки сообщения мы дожидаемся результата с помощью блокирующего CompletableFuture.get(), затем коммитим оффсет с помощью Acknowledgment.acknowledge(). Таким образом мы гарантируем, что оффсеты будут закоммичены только после отправки сообщения в топик. В случае ошибки мы не коммитим оффсет, а вызываем метод Acknowledgment.nack(Duration), который фактически отбрасывает все остальные сообщения из пачки, заставляет поток консьюмера уснуть на время, указанное в параметре Duration, а после этого перечитывает пачку сообщений, начиная с незакоммиченного оффсета.

Тестируем интеграцию с Kafka. TestContainers Kafka

Примените в ветке master патч  2_kafka_integration_2.patch
Так как помимо тестового контейнера с брокером Kafka мы используем Confluent Schema Registry для хранения схем отправляемых в Kafka сообщений, нам также необходимо поднять контейнер со Schema Registry и связать его с Kafka. Для этого используем механизм Network, предоставляемый библиотекой TestContainers.

Класс SchemaRegistryContainer расширяет GenericContainer и реализует логику поднятия контейнера Schema Registry. В конструкторе класса мы ожидаем, пока реестр схем будет доступен с помощью механизма waitFor() и стратегии ожидания Wait.forHttp(”/subjects”).forStatusCode(200).
В методе SchemaRegistryContainer.withKafka Schema Registry инициализируется параметрами, необходимыми для связи с тестовым контейнером Kafka.
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;

public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
    public static final String SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry";
    public static final int SCHEMA_REGISTRY_PORT = 8081;

    public SchemaRegistryContainer(String version) {
        super(SCHEMA_REGISTRY_IMAGE + ":" + version);
        waitingFor(Wait.forHttp("/subjects").forStatusCode(200));
        withExposedPorts(SCHEMA_REGISTRY_PORT);
    }

    public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
        return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
    }

    public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
        withNetwork(network);
        withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
        withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081");
        withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
        return self();
    }
}
В классе DispatcherServiceTest тестируются сценарии одобрения и отклонения заказа. Инициализируем контейнеры Kafka и Schema Registry:
public static final String CONFLUENT_VERSION = "7.5.2";
...
private static final Network NETWORK = Network.newNetwork();

public static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_VERSION))
        .withKraft()
        .withNetwork(NETWORK);

public static final SchemaRegistryContainer SCHEMA_REGISTRY = new SchemaRegistryContainer(CONFLUENT_VERSION);

@BeforeAll
static void setup() {
    KAFKA.start();
    SCHEMA_REGISTRY.withKafka(KAFKA).start();

    System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers());
    System.setProperty("spring.kafka.consumer.properties.schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
    System.setProperty("spring.kafka.producer.properties.schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
}
Для тестирования нам необходимо подготовить продьюсера, с помощью которого мы сможем отправить сообщение в топик v1.public.orders_outbox:
@Autowired
private KafkaTemplate<String, AvroOrderPlacedEvent> kafkaTemplate;
Настройки продъюсера подтянуться из application.yml.
Затем это сообщение вычитает настроенный в нашем сервисе консьюмер, а продъюсер, объявленный там же, отправит обработанное сообщение в топик v1.orders_dispatch. Вычитать отправленное сообщение для проверки мы сможем с помощью другого консьюмера, который придется настроить самостоятельно:
private static final String ORDER_DISPATCH_TOPIC = "v1.orders_dispatch";

...
private Consumer<String, OrderDispatchedEvent> outputConsumer;

@BeforeEach
void setupConsumer() {
    Map<String, Object> consumerProps = new HashMap<>(KafkaTestUtils.consumerProps(KAFKA.getBootstrapServers(), "testConsumer", "true"));
    consumerProps.put("key.deserializer", StringDeserializer.class);
    consumerProps.put("value.deserializer", KafkaAvroDeserializer.class);
    consumerProps.put("schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
    consumerProps.put("specific.avro.reader", "true");

    outputConsumer = new DefaultKafkaConsumerFactory<String, OrderDispatchedEvent>(consumerProps).createConsumer();
    outputConsumer.subscribe(List.of(ORDER_DISPATCH_TOPIC));
    outputConsumer.poll(Duration.ofMillis(0));
}
После создания консьюмера мы подписываемся на топик v1.orders_dispatch и через poll непрерывно опрашиваем сообщения из брокера.

После каждого теста консьюмер закрывается:
@AfterEach
void closeConsumer() {
    outputConsumer.close();
}
Тестируем сценарий, когда заказ отклоняется (идентификатор заказа нечетный). Для этого отправляем сообщение в топик v1.public.orders_outbox с помощью подготовленного продъюсера, а затем с помощью KafkaTestUtils и настроенного консьюмера вычитываем обработанное сообщение из топика v1.orders_dispatch:
@Test
void consumeOrderPlacedEvent_rejectsOrderWithOddId() {
    Long toBeRejected = 1L;
    var input = buildEvent(toBeRejected);

    kafkaTemplate.send(ORDER_PLACED_TOPIC, "1", input);
    ConsumerRecord<String, OrderDispatchedEvent> consumed = KafkaTestUtils.getSingleRecord(outputConsumer, ORDER_DISPATCH_TOPIC, Duration.ofSeconds(10));

    assertThat(consumed.value().getOrderId()).isEqualTo(toBeRejected);
    assertThat(consumed.value().getStatus()).isEqualTo(OrderDispatchStatus.REJECTED);
}
Тестирование сценария одобрения заказа происходит аналогичным способом:
@Test
void consumeOrderPlacedEvent_acceptsOrderWithEvenId() {
    Long toBeAccepted = 2L;
    var input = buildEvent(toBeAccepted);

    kafkaTemplate.send(ORDER_PLACED_TOPIC, "2", input);
    ConsumerRecord<String, OrderDispatchedEvent> consumed = KafkaTestUtils.getSingleRecord(outputConsumer, ORDER_DISPATCH_TOPIC, Duration.ofSeconds(10));

    assertThat(consumed.value().getOrderId()).isEqualTo(toBeAccepted);
    assertThat(consumed.value().getStatus()).isEqualTo(OrderDispatchStatus.ACCEPTED);
}
Запускаем тесты:
./gradlew test
Интеграция с Kafka II > Config Server, Контейнеризация, GitHub Actions