Интеграция Dispatcher Service и Kafka
Avro классы сгенерированы, мы можем приступить к реализации логики вычитыванияAvroOrderPlacedEvent
и отправки OrderDispatchedEvent
.
Примените в ветке master патч 2_kafka_integration_1.patch
Настраиваем Kafka Producer
В Spring Boot приложениях сообщения отправляются в Kafka с помощью класса
Посмотрите интерфейс
С другими настройками Producer можно ознакомиться в документации.
KafkaTemplate<Key, Value>
.Посмотрите интерфейс
KafkaOperations
,
который он реализует (удобно его смотреть прямо в коде IDEA через Ctrl+N).
Настройка Kafka Producer через application.yml
подразумевает, что мы прописываем все параметры продъюсера в конфигурационном файле, а Spring Boot на их
основе настраивает бины:
ProducerFactory<KEY, VALUE>
- фабрика для создания экземпляров продъюсеровKafkaTemplate<KEY, VALUE>
- сам продъюсер.
application.yml
нам обязательно необходимо указать:
spring.kafka.bootstrap-servers
- список адресов для связи с брокерами Kafka, указывается в формате:broker1:port
,broker2:port
и т.д. Эта настройка будет использована как для Producer, так и для Consumer.spring.kafka.producer.key-serializer
- определяет сериализатор для ключей отправляемых в Kafka сообщений. В нашем случае будем использоватьorg.apache.kafka.common.serialization.StringSerializer
. Обычно в качестве ключей сообщений используются примитивные типы данных.spring.kafka.producer.value-serializer
- определяет сериализатор значений сообщений, отправляемых в Kafka. Мы используем Apache Avro, поэтому в качестве сериализатора указываемio.confluent.kafka.serializers.KafkaAvroSerializer
.spring.kafka.producer.properties.schema.registry.url
- так как мы в проекте используем Confluent Schema Registry, необходимо указать URL для связи с ней.
- Каждому продюсеру присваивается уникальный ID, который используется для идентификации в рамках кластера Kafka.
- Каждому сообщению, отправляемому продюсером, присваивается уникальный номер в рамках своего PID. Это позволяет брокеру определять и фильтровать дубликаты.
- Kafka хранит состояние последнего успешно зафиксированного сообщения для каждого PID и может соответственно обработать повторные попытки отправки.
spring.kafka.producer.properties.enable.idempotence = true
spring.kafka.producer.properties.acks = all
- продъюсер ждет подтверждения доставки сообщения от всех синхронизированных реплик
v1.orders_dispatch
и обновлять статус заказа в Postgres, что также является
идемпотентной операцией,
поэтому в итоге мы получим end-to-end идемпотентность.
По умолчанию остаются настройки:
spring.kafka.producer.properties.max.in.flight.requests.per.connection
- количество сообщений, которые могут быть "в полете", то есть отправлены, но еще не подтверждены брокером Kafka в рамках одного соединения с брокером, по умолчанию - 5. Для обеспечения идемпотентности продъюсера необходимо, чтобы этот параметр был от 1 до 5.spring.kafka.producer.properties.retries
- количество повторных попыток отправить сообщение в случае ошибки. По умолчаниюInteger.MAX_VALUE
, то есть неограниченное количество ретраев. Для обеспечения идемпотентности продъюсера необходимо, чтобы этот параметр был больше 1.compression.type
- тип сжатия данных. Если отправляемые продъюсером сообщения имеют большой размер, а в приложении важную роль занимает пропускная способность, то можно применить сжатие данных. По умолчанию, она равнаnone
, то есть сжатие не производится. Мы не будем сжимать данные, так как наши сообщения небольшие по размеру, кроме того, такую оптимизацию лучше проводить на основе реальных данных мониторинга потребления сетевых и дисковых ресурсов. Другие варианты этой настройки:Gzip
- лучше всего сжимает данные, но потребляет много CPU и, соответственно, требует больше всего времени. Однако значительно снижает потребление сетевых ресурсов.Snappy
иZstd
- имеют средние показатели сжатия и потребления CPU.Lz4
- сжимает данные быстро, но не так эффективно, как остальные алгоритмы, соответственно, потребление сетевых ресурсов выше.
batch.size
- размер пакета в байтах, по умолчанию - 16384 байт. Producer отправляет сообщения в Kafka-брокер не по одному, а пакетами, размер которых контролируется этой настройкой. Увеличение размера отправляемого пакета приведет к повышению пропускной способности, однако может также увеличить время ожидания отправки сообщений и их обработки консьюмером.max.request.size
, по умолчанию это 1048576 байт. В одном запросе на отправку сообщений может быть несколько пакетов, размер всего отправляемого запроса в байтах контролируется этой настройкой.linger.ms
- время ожидания Producer-ом, пока сообщения в пакете скопятся до нужного размера, по умолчанию - 0. То есть по умолчанию продьюсер вообще не будет ждать, пока наберется пакет нужного размера. Мы оставим эту настройку такой, чтобы быть уверенными, что наши тестовые сообщения будут быстро попадать в топики Kafka.
batch.size
, max.request.size
и linger.ms
в prod-среде определяются на основе данных мониторинга и зависят от
нагрузки на продъюсера.
Посмотрим конфигурацию application.yml
, котороя относится к продъюсеру:
spring:
...
kafka:
bootstrap-servers: localhost:9097
listener:
ack-mode: manual
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081
enable.idempotence: true
acks: all
kafkaprops:
...
order-dispatch-topic: v1.orders_dispatch
replication-factor: 1
partitions-count: 3
В классе KafkaTopicConfig
создаем топик v1.orders_dispatch
. Топик создастся в Kafka при первом запуске приложения с
помощью созданного в KafkaAutoConfiguration
клиента KafkaAdmin
, для конфигурации которого необходима настройка spring.kafka.bootstrap-servers
.
@Configuration
public class KafkaTopicConfig {
@Value("${kafkaprops.order-dispatch-topic}")
private String ordersDispatchTopic;
@Value("${kafkaprops.replication-factor}")
private Integer replicationFactor;
@Value("${kafkaprops.partitions-count}")
private Integer partitionsCount;
@Bean
public NewTopic ordersDispatchTopic() {
return TopicBuilder
.name(ordersDispatchTopic)
.replicas(replicationFactor)
.partitions(partitionsCount)
.build();
}
}
Пример настройки Kafka Producer в классах @Configuration
можно посмотреть в документации Spring.С другими настройками Producer можно ознакомиться в документации.
Настраиваем Kafka Consumer
Kafka-консьюмер может быть реализован в любом зарегистрированном в контексте спринга бине с помощью аннотации
Так как в настройках консьюмера мы указали
Для отправки сообщений в топик Kafka используется
До отправки первого сообщения в Kafka продъюсер регистрирует его схему в Schema Registry, доступ к которой указан в настройке
@KafkaListener
,
которую необходимо поставить над публичным методом этого бина. В качестве параметров метод может принимать:
- одно сообщение типа
ConsumerRecord<KEY, VALUE>
- несколько таких сообщений, обернутых в
ConsumerRecords<KEY, VALUE>
- само значение сообщения (в нашем случае это
AvroOrderPlacedEvent
) - список значений, например,
List<AvroOrderPlacedEvent>
- ряд заголовков, которые передаются в ответе от брокера:
KafkaHeaders.OFFSET
,KafkaHeaders.RECEIVED_KEY
(если ключ отсутствует в сообщении, то передается null),KafkaHeaders.RECEIVED_TOPIC
,KafkaHeaders.RECEIVED_PARTITION
,KafkaHeaders.RECEIVED_TIMESTAMP
application.yml
(см. Apache Kafka Support)
или настроив бины в классе @Configuration
(см. Spring Kafka
Receiving Messages).
Настройка Kafka Consumer через application.yml
подразумевает, что мы прописываем все параметры консьюмера в конфигурационном файле,
а Spring Boot на их основе настраивает необходимые бины:
ConsumerFactory<KEY, VALUE>
- сущность, которая непосредственно создает экземпляры консьюмеров на основе переданных настроек. Реализация данного интерфейса -DefaultKafkaConsumerFactory
.KafkaListenerContainerFactory
- фабрика для контейнеров типаMessageListenerContainer
, в которых реализуется логика консьюмеров. В Spring Boot реализовано два типаMessageListenerContainer
:KafkaMessageListenerContainer
- контейнер для консьюмера, который обрабатывает все сообщения из всех топиков и партиций, указанных в настройках, в одном потоке.ConcurrentMessageListenerContainer
- контейнер, который создает необходимое количествоKafkaMessageListenerContainer
, чтобы обрабатывать сообщения в многопоточном режиме.
@KafkaListener
по умолчанию настраивается фабрика ConcurrentKafkaListenerContainerFactory
,
которая создает контейнеры типа ConcurrentMessageListenerContainer
. В параметрах аннтотации @KafkaListener
можно выставить нужный уровень параллельности (concurrency), чтобы в рамках одного приложения вычитывать данные из партиций топика в несколько потоков.
В нашем учебном проекте это не требуется, однако стоит знать о такой возможности.
В application.yml
мы задаем следующие конфигурации для spring.kafka.consumer
:
group-id
- уникальный строковый идентификатор Consumer Group для того, чтобы инстансы приложения выступали в качестве одной группы потребителей.key-deserializer
- определяем десериализатор для ключей сообщений. Мы получаем сообщения из топикаv1.public.orders_outbox
ключи с типомString
, поэтому используемorg.apache.kafka.common.serialization.StringDeserializer
.value-deserializer
- определяем десериализатор для значений сообщений. Значения имеют типAvroOrderPlacedEvent
, поэтому используемio.confluent.kafka.serializers.KafkaAvroDeserializer
.spring.kafka.consumer.properties
- дополнительная специфическая конфигурация:schema.registry.url
- указываем url для связи с Confluent Schema Registry, так как схема значений сообщений будет храниться именно там.specific.avro.reader
- этот параметр выставляем вtrue
, так как мы планируем вычитывать сообщение с конкретным типомAvroOrderPlacedEvent
. В противном случае нам придется обрабатывать обобщенные записи типаGenericRecord
, в котором не будет нужных нам полей в явном виде, что не очень удобно.
-
spring.kafka.consumer.enable-auto-commit
- указывает, должен ли Kafka Consumer автоматически коммитить оффсеты прочитанных сообщений. Приtrue
Consumer автоматически коммитит оффсеты в фоне с интервалом, задаваемым параметромauto.commit.interval.ms
(по умолчанию 5000, то есть 5 секунд). Приfalse
ответственность за коммит оффсетов ложится на разработчика, и подтверждение должно происходить вручную в коде приложения. Мы хотим, чтобы оффсет был закоммичен только после успешной отправки сообщения в топикv1.orders_dispatch
. Если по какой-то причине произойдет ошибка во время отправки сообщения, консьюмер запросит еще раз пачку сообщений у брокера, начиная с первого незакоммиченного оффсета. Для этого мы будем самостоятельно контролировать процесс фиксации оффсетов (подтверждение чтения). spring.kafka.listener.ack-mode
- настройка, требуемая для организации ручного управления коммитами оффсетов. Допустимые значения:batch
- коммитим оффсеты после обработки всего пакета данных.count
- коммитим оффсеты после обработкиack-count
(параметр консьюмера) количества сообщений.count_time
- коммитим оффсеты после обработкиack-count
количества сообщений или после истечения времени, установленного вack-time
(параметр консьюмера).manual
- вручную коммитим оффсеты, при этом коммиты будут собираться в очередь по мере обработки сообщений из полученного пакета. После обработки всех сообщений из пакета оффсеты из очереди (для разных партиций) будут закоммичены в одной операции.manual_immediate
- вручную коммитим оффсеты в брокере сразу после обработки сообщения, еслиAcknowledgment.acknowledge()
вызывается в том же потоке, что и консьюмер. В противном случае коммиты собираются в очередь по мере обработки сообщений из пакета.record
- коммитим оффсет после обработки каждого сообщения.time
- коммитим оффсеты после истечения указанного вack-time
времени.
-
auto-offset-reset
- определяет, как консьюмер должен реагировать, когда для топика нет закоммиченных оффсетов (например, консьюмер впервые начал вычитывать сообщения из топика) или последний закоммиченный консьюмером оффсет уже удален из брокера из-за настроек сохранения данных. В Kafka есть три основных значения для этой настройки:earliest
: консьюмер начнет чтение с начального оффсета в топике, то есть с самого старого доступного сообщения. Мы выбрали это значение, так как нам необходимо гарантировать обработку всех сообщений о создании заказа, даже если произойдет сбой и оффсеты будут утеряны. В этом случае будут дубликаты сообщений, однако мы полагаемся на end-to-end идемпотентность, которая достигается за счет идемпотентного продъюсера и идемпотентной операции обновления статуса заказа в базе данных. Кроме того,AvroOrderPlacedEvent
содержит вспомогательную информацию в виде LSN и времени совершения транзакции, которая также может быть использована для дедуплицирования на стороне Dispatcher Service, если в дальнейшем изменится логика обработки заказа.latest
: консьюмер начнет чтение с самого последнего оффсета.none
: если в топике, на которые подписан консьюмер, отсутствуют закоммиченные оффсеты, он выбросит ошибку, сообщая о невозможности его найти. Это потребует от приложения явной обработки такой ситуации. При отсуствии закоммиченных оффсетов (на старте) необходимо будет обработать исключениеNoOffsetForPartitionException
.
fetch.min.bytes
- минимальное количество байт, которые консьюмер ожидает получить от брокера. По умолчанию - 1 байт. Увеличение этой настройки приведет к снижению количества сетевых запросов, однако может увеличить время простоя консьюмера, так как он будет ждать, когда на брокере будет достаточно информации для передачи.fetch.max.bytes
- максимальное количество байт, которое консьюмер ждет от брокера. По умолчанию - 57671680 байт (55 мебибайт). Увеличение этой настройки также может привести к снижению количества сетевых запросов, однако повысит расход памяти приложения.session.timeout.ms
- время, в течение которого брокер ждет хартбита от консьюмера, по умолчанию - 45000 миллисекунд или 45 секунд, после которого брокер считает консьюмера мертвым. После этого он производит ребалансировку группы консьюмеров, что повлияет на время обработки сообщений. В целом, мы стремимся минимизировать ребалансировки, однако менять значение стоит после мониторинга производительности консьюмера. Также важно знать, что это значение должно быть в итервале отgroup.min.session.timeout.ms
(по умолчанию 6 секунд) доgroup.max.session.timeout.ms
(по умолчанию 30 минут), которые установлены в брокере.hearbeat.interval.ms
- интервал между посылаемыми консьюмером в брокер хартбитами (по умолчанию - 3 секунды). Должен быть нижеsession.timeout.ms
, в типичных сценариях использования устанавливается в 1/3 отsession.timeout.ms
. Более короткий интервал позволяет быстрее обнаруживать “упавших” консьюмеров, при этом повышает нагрузку на координатора группы.
application.yml
прописаны указанные конфигурации, а также настройки, необходимые для создания топика v1.orders_dispatch
и ручного
коммита оффсетов:
spring:
...
kafka:
bootstrap-servers: localhost:9097
consumer:
group-id: ${spring.application.name}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
specific.avro.reader: true
schema.registry.url: http://localhost:8081
auto-offset-reset: earliest
enable-auto-commit: false
listener:
ack-mode: manual
...
kafkaprops:
order-dispatch-topic: v1.orders_dispatch
order-placed-topic: v1.public.orders_outbox
...
nack-sleep-duration: 100ms
В классе DispatcherService
реализована логика получения сообщений из топика v1.public.orders_outbox
, их обработки и отправки сообщений в
топик v1.orders_dispatch
.
@Slf4j
@RequiredArgsConstructor
@Service
public class DispatcherService {
private final KafkaTemplate<String, OrderDispatchedEvent> kafkaTemplate;
@Value("${kafkaprops.order-dispatch-topic}")
private String orderDispatchTopic;
@Value("${kafkaprops.nack-sleep-duration}")
private Duration nackSleepDuration;
@KafkaListener(topics = {"${kafkaprops.order-placed-topic}"})
public void consumeOrderPlacedEvent(AvroOrderPlacedEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION) Integer partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Acknowledgment acknowledgment) {
log.info("Consuming message from Kafka: {}. Key: {}. Partition: {}. Topic: {}",
event, key, partition, topic);
var orderDispatchEvent = processEvent(event);
try {
kafkaTemplate.send(orderDispatchTopic, key, orderDispatchEvent).get();
log.info("Successfully processed and sent OrderDispatchedEvent {} to Kafka.",
orderDispatchEvent);
// commit the offset
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Failed to send OrderDispatchedEvent {} to Kafka", orderDispatchEvent);
// don't commit the offset
acknowledgment.nack(nackSleepDuration);
}
}
private OrderDispatchedEvent processEvent(AvroOrderPlacedEvent event) {
log.info("Processing OrderPlacedEvent: {}", event);
OrderDispatchStatus status = event.getOrderId() % 2 == 0 ?
OrderDispatchStatus.ACCEPTED : OrderDispatchStatus.REJECTED;
return OrderDispatchedEvent.newBuilder()
.setOrderId(event.getOrderId())
.setStatus(status)
.build();
}
}
Метод, в котором происходит чтение данных из топика Kafka, помечен аннотацией @KafkaListener
.Так как в настройках консьюмера мы указали
enable-auto-commit=false
, управление коммитами оффсетов осуществляется вручную с помощью сущности Acknowledgment
.Для отправки сообщений в топик Kafka используется
KafkaTemplate<String, OrderDispatchedEvent> kafkaTemplate
.До отправки первого сообщения в Kafka продъюсер регистрирует его схему в Schema Registry, доступ к которой указан в настройке
schema.registry.url
.
После отправки сообщения мы дожидаемся результата с помощью блокирующего CompletableFuture.get()
, затем коммитим оффсет с помощью
Acknowledgment.acknowledge()
. Таким образом мы гарантируем, что оффсеты будут закоммичены только после отправки сообщения в топик.
В случае ошибки мы не коммитим оффсет, а вызываем метод Acknowledgment.nack(Duration)
, который фактически отбрасывает все остальные сообщения из пачки,
заставляет поток консьюмера уснуть на время, указанное в параметре Duration
, а после этого перечитывает пачку сообщений, начиная с незакоммиченного
оффсета.
Тестируем интеграцию с Kafka. TestContainers Kafka
Примените в ветке master патч 2_kafka_integration_2.patch
SchemaRegistryContainer
расширяет GenericContainer
и реализует логику поднятия контейнера Schema Registry.
В конструкторе класса мы ожидаем, пока реестр схем будет доступен с помощью механизма waitFor()
и стратегии ожидания
Wait.forHttp(”/subjects”).forStatusCode(200)
.В методе
SchemaRegistryContainer.withKafka
Schema Registry инициализируется параметрами, необходимыми для связи с тестовым контейнером Kafka.
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
public static final String SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry";
public static final int SCHEMA_REGISTRY_PORT = 8081;
public SchemaRegistryContainer(String version) {
super(SCHEMA_REGISTRY_IMAGE + ":" + version);
waitingFor(Wait.forHttp("/subjects").forStatusCode(200));
withExposedPorts(SCHEMA_REGISTRY_PORT);
}
public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
}
public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
withNetwork(network);
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081");
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
return self();
}
}
В классе DispatcherServiceTest
тестируются сценарии одобрения и отклонения заказа.
Инициализируем контейнеры Kafka и Schema Registry:
public static final String CONFLUENT_VERSION = "7.5.2";
...
private static final Network NETWORK = Network.newNetwork();
public static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_VERSION))
.withKraft()
.withNetwork(NETWORK);
public static final SchemaRegistryContainer SCHEMA_REGISTRY = new SchemaRegistryContainer(CONFLUENT_VERSION);
@BeforeAll
static void setup() {
KAFKA.start();
SCHEMA_REGISTRY.withKafka(KAFKA).start();
System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers());
System.setProperty("spring.kafka.consumer.properties.schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
System.setProperty("spring.kafka.producer.properties.schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
}
Для тестирования нам необходимо подготовить продьюсера, с помощью которого мы сможем отправить сообщение в топик v1.public.orders_outbox
:
@Autowired
private KafkaTemplate<String, AvroOrderPlacedEvent> kafkaTemplate;
Настройки продъюсера подтянуться из application.yml
.Затем это сообщение вычитает настроенный в нашем сервисе консьюмер, а продъюсер, объявленный там же, отправит обработанное сообщение в топик
v1.orders_dispatch
.
Вычитать отправленное сообщение для проверки мы сможем с помощью другого консьюмера, который придется настроить самостоятельно:
private static final String ORDER_DISPATCH_TOPIC = "v1.orders_dispatch";
...
private Consumer<String, OrderDispatchedEvent> outputConsumer;
@BeforeEach
void setupConsumer() {
Map<String, Object> consumerProps = new HashMap<>(KafkaTestUtils.consumerProps(KAFKA.getBootstrapServers(), "testConsumer", "true"));
consumerProps.put("key.deserializer", StringDeserializer.class);
consumerProps.put("value.deserializer", KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
consumerProps.put("specific.avro.reader", "true");
outputConsumer = new DefaultKafkaConsumerFactory<String, OrderDispatchedEvent>(consumerProps).createConsumer();
outputConsumer.subscribe(List.of(ORDER_DISPATCH_TOPIC));
outputConsumer.poll(Duration.ofMillis(0));
}
После создания консьюмера мы подписываемся на топик v1.orders_dispatch
и через poll
непрерывно опрашиваем сообщения из брокера.
После каждого теста консьюмер закрывается:
@AfterEach
void closeConsumer() {
outputConsumer.close();
}
Тестируем сценарий, когда заказ отклоняется (идентификатор заказа нечетный). Для этого отправляем сообщение в топик v1.public.orders_outbox
с помощью
подготовленного продъюсера, а затем с помощью
KafkaTestUtils
и настроенного консьюмера вычитываем обработанное сообщение из топика v1.orders_dispatch
:
@Test
void consumeOrderPlacedEvent_rejectsOrderWithOddId() {
Long toBeRejected = 1L;
var input = buildEvent(toBeRejected);
kafkaTemplate.send(ORDER_PLACED_TOPIC, "1", input);
ConsumerRecord<String, OrderDispatchedEvent> consumed = KafkaTestUtils.getSingleRecord(outputConsumer, ORDER_DISPATCH_TOPIC, Duration.ofSeconds(10));
assertThat(consumed.value().getOrderId()).isEqualTo(toBeRejected);
assertThat(consumed.value().getStatus()).isEqualTo(OrderDispatchStatus.REJECTED);
}
Тестирование сценария одобрения заказа происходит аналогичным способом:
@Test
void consumeOrderPlacedEvent_acceptsOrderWithEvenId() {
Long toBeAccepted = 2L;
var input = buildEvent(toBeAccepted);
kafkaTemplate.send(ORDER_PLACED_TOPIC, "2", input);
ConsumerRecord<String, OrderDispatchedEvent> consumed = KafkaTestUtils.getSingleRecord(outputConsumer, ORDER_DISPATCH_TOPIC, Duration.ofSeconds(10));
assertThat(consumed.value().getOrderId()).isEqualTo(toBeAccepted);
assertThat(consumed.value().getStatus()).isEqualTo(OrderDispatchStatus.ACCEPTED);
}
Запускаем тесты:
./gradlew test