Интеграция Dispatcher Service и Kafka
Avro классы сгенерированы, мы можем приступить к реализации логики вычитыванияAvroOrderPlacedEvent и отправки OrderDispatchedEvent.
Примените в ветке master патч 2_kafka_integration_1.patch
Настраиваем Kafka Producer
В Spring Boot приложениях сообщения отправляются в Kafka с помощью класса
Посмотрите интерфейс
С другими настройками Producer можно ознакомиться в документации.
KafkaTemplate<Key, Value>.Посмотрите интерфейс
KafkaOperations,
который он реализует (удобно его смотреть прямо в коде IDEA через Ctrl+N).
Настройка Kafka Producer через application.yml подразумевает, что мы прописываем все параметры продъюсера в конфигурационном файле, а Spring Boot на их
основе настраивает бины:
ProducerFactory<KEY, VALUE>- фабрика для создания экземпляров продъюсеровKafkaTemplate<KEY, VALUE>- сам продъюсер.
application.yml нам обязательно необходимо указать:
spring.kafka.bootstrap-servers- список адресов для связи с брокерами Kafka, указывается в формате:broker1:port,broker2:portи т.д. Эта настройка будет использована как для Producer, так и для Consumer.spring.kafka.producer.key-serializer- определяет сериализатор для ключей отправляемых в Kafka сообщений. В нашем случае будем использоватьorg.apache.kafka.common.serialization.StringSerializer. Обычно в качестве ключей сообщений используются примитивные типы данных.spring.kafka.producer.value-serializer- определяет сериализатор значений сообщений, отправляемых в Kafka. Мы используем Apache Avro, поэтому в качестве сериализатора указываемio.confluent.kafka.serializers.KafkaAvroSerializer.spring.kafka.producer.properties.schema.registry.url- так как мы в проекте используем Confluent Schema Registry, необходимо указать URL для связи с ней.
- Каждому продюсеру присваивается уникальный ID, который используется для идентификации в рамках кластера Kafka.
- Каждому сообщению, отправляемому продюсером, присваивается уникальный номер в рамках своего PID. Это позволяет брокеру определять и фильтровать дубликаты.
- Kafka хранит состояние последнего успешно зафиксированного сообщения для каждого PID и может соответственно обработать повторные попытки отправки.
spring.kafka.producer.properties.enable.idempotence = truespring.kafka.producer.properties.acks = all- продъюсер ждет подтверждения доставки сообщения от всех синхронизированных реплик
v1.orders_dispatch и обновлять статус заказа в Postgres, что также является
идемпотентной операцией,
поэтому в итоге мы получим end-to-end идемпотентность.
По умолчанию остаются настройки:
spring.kafka.producer.properties.max.in.flight.requests.per.connection- количество сообщений, которые могут быть "в полете", то есть отправлены, но еще не подтверждены брокером Kafka в рамках одного соединения с брокером, по умолчанию - 5. Для обеспечения идемпотентности продъюсера необходимо, чтобы этот параметр был от 1 до 5.spring.kafka.producer.properties.retries- количество повторных попыток отправить сообщение в случае ошибки. По умолчаниюInteger.MAX_VALUE, то есть неограниченное количество ретраев. Для обеспечения идемпотентности продъюсера необходимо, чтобы этот параметр был больше 1.compression.type- тип сжатия данных. Если отправляемые продъюсером сообщения имеют большой размер, а в приложении важную роль занимает пропускная способность, то можно применить сжатие данных. По умолчанию, она равнаnone, то есть сжатие не производится. Мы не будем сжимать данные, так как наши сообщения небольшие по размеру, кроме того, такую оптимизацию лучше проводить на основе реальных данных мониторинга потребления сетевых и дисковых ресурсов. Другие варианты этой настройки:Gzip- лучше всего сжимает данные, но потребляет много CPU и, соответственно, требует больше всего времени. Однако значительно снижает потребление сетевых ресурсов.SnappyиZstd- имеют средние показатели сжатия и потребления CPU.Lz4- сжимает данные быстро, но не так эффективно, как остальные алгоритмы, соответственно, потребление сетевых ресурсов выше.
batch.size- размер пакета в байтах, по умолчанию - 16384 байт. Producer отправляет сообщения в Kafka-брокер не по одному, а пакетами, размер которых контролируется этой настройкой. Увеличение размера отправляемого пакета приведет к повышению пропускной способности, однако может также увеличить время ожидания отправки сообщений и их обработки консьюмером.max.request.size, по умолчанию это 1048576 байт. В одном запросе на отправку сообщений может быть несколько пакетов, размер всего отправляемого запроса в байтах контролируется этой настройкой.linger.ms- время ожидания Producer-ом, пока сообщения в пакете скопятся до нужного размера, по умолчанию - 0. То есть по умолчанию продьюсер вообще не будет ждать, пока наберется пакет нужного размера. Мы оставим эту настройку такой, чтобы быть уверенными, что наши тестовые сообщения будут быстро попадать в топики Kafka.
batch.size, max.request.size и linger.ms в prod-среде определяются на основе данных мониторинга и зависят от
нагрузки на продъюсера.
Посмотрим конфигурацию application.yml, котороя относится к продъюсеру:
spring:
...
kafka:
bootstrap-servers: localhost:9097
listener:
ack-mode: manual
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081
enable.idempotence: true
acks: all
kafkaprops:
...
order-dispatch-topic: v1.orders_dispatch
replication-factor: 1
partitions-count: 3
В классе KafkaTopicConfig создаем топик v1.orders_dispatch. Топик создастся в Kafka при первом запуске приложения с
помощью созданного в KafkaAutoConfiguration
клиента KafkaAdmin, для конфигурации которого необходима настройка spring.kafka.bootstrap-servers.
@Configuration
public class KafkaTopicConfig {
@Value("${kafkaprops.order-dispatch-topic}")
private String ordersDispatchTopic;
@Value("${kafkaprops.replication-factor}")
private Integer replicationFactor;
@Value("${kafkaprops.partitions-count}")
private Integer partitionsCount;
@Bean
public NewTopic ordersDispatchTopic() {
return TopicBuilder
.name(ordersDispatchTopic)
.replicas(replicationFactor)
.partitions(partitionsCount)
.build();
}
}
Пример настройки Kafka Producer в классах @Configuration можно посмотреть в документации Spring.С другими настройками Producer можно ознакомиться в документации.
Настраиваем Kafka Consumer
Kafka-консьюмер может быть реализован в любом зарегистрированном в контексте спринга бине с помощью аннотации
Так как в настройках консьюмера мы указали
Для отправки сообщений в топик Kafka используется
До отправки первого сообщения в Kafka продъюсер регистрирует его схему в Schema Registry, доступ к которой указан в настройке
@KafkaListener,
которую необходимо поставить над публичным методом этого бина. В качестве параметров метод может принимать:
- одно сообщение типа
ConsumerRecord<KEY, VALUE> - несколько таких сообщений, обернутых в
ConsumerRecords<KEY, VALUE> - само значение сообщения (в нашем случае это
AvroOrderPlacedEvent) - список значений, например,
List<AvroOrderPlacedEvent> - ряд заголовков, которые передаются в ответе от брокера:
KafkaHeaders.OFFSET,KafkaHeaders.RECEIVED_KEY(если ключ отсутствует в сообщении, то передается null),KafkaHeaders.RECEIVED_TOPIC,KafkaHeaders.RECEIVED_PARTITION,KafkaHeaders.RECEIVED_TIMESTAMP
application.yml (см. Apache Kafka Support)
или настроив бины в классе @Configuration (см. Spring Kafka
Receiving Messages).
Настройка Kafka Consumer через application.yml подразумевает, что мы прописываем все параметры консьюмера в конфигурационном файле,
а Spring Boot на их основе настраивает необходимые бины:
ConsumerFactory<KEY, VALUE>- сущность, которая непосредственно создает экземпляры консьюмеров на основе переданных настроек. Реализация данного интерфейса -DefaultKafkaConsumerFactory.KafkaListenerContainerFactory- фабрика для контейнеров типаMessageListenerContainer, в которых реализуется логика консьюмеров. В Spring Boot реализовано два типаMessageListenerContainer:KafkaMessageListenerContainer- контейнер для консьюмера, который обрабатывает все сообщения из всех топиков и партиций, указанных в настройках, в одном потоке.ConcurrentMessageListenerContainer- контейнер, который создает необходимое количествоKafkaMessageListenerContainer, чтобы обрабатывать сообщения в многопоточном режиме.
@KafkaListener по умолчанию настраивается фабрика ConcurrentKafkaListenerContainerFactory,
которая создает контейнеры типа ConcurrentMessageListenerContainer. В параметрах аннтотации @KafkaListener
можно выставить нужный уровень параллельности (concurrency), чтобы в рамках одного приложения вычитывать данные из партиций топика в несколько потоков.
В нашем учебном проекте это не требуется, однако стоит знать о такой возможности.
В application.yml мы задаем следующие конфигурации для spring.kafka.consumer:
group-id- уникальный строковый идентификатор Consumer Group для того, чтобы инстансы приложения выступали в качестве одной группы потребителей.key-deserializer- определяем десериализатор для ключей сообщений. Мы получаем сообщения из топикаv1.public.orders_outboxключи с типомString, поэтому используемorg.apache.kafka.common.serialization.StringDeserializer.value-deserializer- определяем десериализатор для значений сообщений. Значения имеют типAvroOrderPlacedEvent, поэтому используемio.confluent.kafka.serializers.KafkaAvroDeserializer.spring.kafka.consumer.properties- дополнительная специфическая конфигурация:schema.registry.url- указываем url для связи с Confluent Schema Registry, так как схема значений сообщений будет храниться именно там.specific.avro.reader- этот параметр выставляем вtrue, так как мы планируем вычитывать сообщение с конкретным типомAvroOrderPlacedEvent. В противном случае нам придется обрабатывать обобщенные записи типаGenericRecord, в котором не будет нужных нам полей в явном виде, что не очень удобно.
-
spring.kafka.consumer.enable-auto-commit- указывает, должен ли Kafka Consumer автоматически коммитить оффсеты прочитанных сообщений. ПриtrueConsumer автоматически коммитит оффсеты в фоне с интервалом, задаваемым параметромauto.commit.interval.ms(по умолчанию 5000, то есть 5 секунд). Приfalseответственность за коммит оффсетов ложится на разработчика, и подтверждение должно происходить вручную в коде приложения. Мы хотим, чтобы оффсет был закоммичен только после успешной отправки сообщения в топикv1.orders_dispatch. Если по какой-то причине произойдет ошибка во время отправки сообщения, консьюмер запросит еще раз пачку сообщений у брокера, начиная с первого незакоммиченного оффсета. Для этого мы будем самостоятельно контролировать процесс фиксации оффсетов (подтверждение чтения). spring.kafka.listener.ack-mode- настройка, требуемая для организации ручного управления коммитами оффсетов. Допустимые значения:batch- коммитим оффсеты после обработки всего пакета данных.count- коммитим оффсеты после обработкиack-count(параметр консьюмера) количества сообщений.count_time- коммитим оффсеты после обработкиack-countколичества сообщений или после истечения времени, установленного вack-time(параметр консьюмера).manual- вручную коммитим оффсеты, при этом коммиты будут собираться в очередь по мере обработки сообщений из полученного пакета. После обработки всех сообщений из пакета оффсеты из очереди (для разных партиций) будут закоммичены в одной операции.manual_immediate- вручную коммитим оффсеты в брокере сразу после обработки сообщения, еслиAcknowledgment.acknowledge()вызывается в том же потоке, что и консьюмер. В противном случае коммиты собираются в очередь по мере обработки сообщений из пакета.record- коммитим оффсет после обработки каждого сообщения.time- коммитим оффсеты после истечения указанного вack-timeвремени.
-
auto-offset-reset- определяет, как консьюмер должен реагировать, когда для топика нет закоммиченных оффсетов (например, консьюмер впервые начал вычитывать сообщения из топика) или последний закоммиченный консьюмером оффсет уже удален из брокера из-за настроек сохранения данных. В Kafka есть три основных значения для этой настройки:earliest: консьюмер начнет чтение с начального оффсета в топике, то есть с самого старого доступного сообщения. Мы выбрали это значение, так как нам необходимо гарантировать обработку всех сообщений о создании заказа, даже если произойдет сбой и оффсеты будут утеряны. В этом случае будут дубликаты сообщений, однако мы полагаемся на end-to-end идемпотентность, которая достигается за счет идемпотентного продъюсера и идемпотентной операции обновления статуса заказа в базе данных. Кроме того,AvroOrderPlacedEventсодержит вспомогательную информацию в виде LSN и времени совершения транзакции, которая также может быть использована для дедуплицирования на стороне Dispatcher Service, если в дальнейшем изменится логика обработки заказа.latest: консьюмер начнет чтение с самого последнего оффсета.none: если в топике, на которые подписан консьюмер, отсутствуют закоммиченные оффсеты, он выбросит ошибку, сообщая о невозможности его найти. Это потребует от приложения явной обработки такой ситуации. При отсуствии закоммиченных оффсетов (на старте) необходимо будет обработать исключениеNoOffsetForPartitionException.
fetch.min.bytes- минимальное количество байт, которые консьюмер ожидает получить от брокера. По умолчанию - 1 байт. Увеличение этой настройки приведет к снижению количества сетевых запросов, однако может увеличить время простоя консьюмера, так как он будет ждать, когда на брокере будет достаточно информации для передачи.fetch.max.bytes- максимальное количество байт, которое консьюмер ждет от брокера. По умолчанию - 57671680 байт (55 мебибайт). Увеличение этой настройки также может привести к снижению количества сетевых запросов, однако повысит расход памяти приложения.session.timeout.ms- время, в течение которого брокер ждет хартбита от консьюмера, по умолчанию - 45000 миллисекунд или 45 секунд, после которого брокер считает консьюмера мертвым. После этого он производит ребалансировку группы консьюмеров, что повлияет на время обработки сообщений. В целом, мы стремимся минимизировать ребалансировки, однако менять значение стоит после мониторинга производительности консьюмера. Также важно знать, что это значение должно быть в итервале отgroup.min.session.timeout.ms(по умолчанию 6 секунд) доgroup.max.session.timeout.ms(по умолчанию 30 минут), которые установлены в брокере.hearbeat.interval.ms- интервал между посылаемыми консьюмером в брокер хартбитами (по умолчанию - 3 секунды). Должен быть нижеsession.timeout.ms, в типичных сценариях использования устанавливается в 1/3 отsession.timeout.ms. Более короткий интервал позволяет быстрее обнаруживать “упавших” консьюмеров, при этом повышает нагрузку на координатора группы.
application.yml прописаны указанные конфигурации, а также настройки, необходимые для создания топика v1.orders_dispatch и ручного
коммита оффсетов:
spring:
...
kafka:
bootstrap-servers: localhost:9097
consumer:
group-id: ${spring.application.name}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
specific.avro.reader: true
schema.registry.url: http://localhost:8081
auto-offset-reset: earliest
enable-auto-commit: false
listener:
ack-mode: manual
...
kafkaprops:
order-dispatch-topic: v1.orders_dispatch
order-placed-topic: v1.public.orders_outbox
...
nack-sleep-duration: 100ms
В классе DispatcherService реализована логика получения сообщений из топика v1.public.orders_outbox, их обработки и отправки сообщений в
топик v1.orders_dispatch.
@Slf4j
@RequiredArgsConstructor
@Service
public class DispatcherService {
private final KafkaTemplate<String, OrderDispatchedEvent> kafkaTemplate;
@Value("${kafkaprops.order-dispatch-topic}")
private String orderDispatchTopic;
@Value("${kafkaprops.nack-sleep-duration}")
private Duration nackSleepDuration;
@KafkaListener(topics = {"${kafkaprops.order-placed-topic}"})
public void consumeOrderPlacedEvent(AvroOrderPlacedEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION) Integer partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Acknowledgment acknowledgment) {
log.info("Consuming message from Kafka: {}. Key: {}. Partition: {}. Topic: {}",
event, key, partition, topic);
var orderDispatchEvent = processEvent(event);
try {
kafkaTemplate.send(orderDispatchTopic, key, orderDispatchEvent).get();
log.info("Successfully processed and sent OrderDispatchedEvent {} to Kafka.",
orderDispatchEvent);
// commit the offset
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Failed to send OrderDispatchedEvent {} to Kafka", orderDispatchEvent);
// don't commit the offset
acknowledgment.nack(nackSleepDuration);
}
}
private OrderDispatchedEvent processEvent(AvroOrderPlacedEvent event) {
log.info("Processing OrderPlacedEvent: {}", event);
OrderDispatchStatus status = event.getOrderId() % 2 == 0 ?
OrderDispatchStatus.ACCEPTED : OrderDispatchStatus.REJECTED;
return OrderDispatchedEvent.newBuilder()
.setOrderId(event.getOrderId())
.setStatus(status)
.build();
}
}
Метод, в котором происходит чтение данных из топика Kafka, помечен аннотацией @KafkaListener.Так как в настройках консьюмера мы указали
enable-auto-commit=false, управление коммитами оффсетов осуществляется вручную с помощью сущности Acknowledgment.Для отправки сообщений в топик Kafka используется
KafkaTemplate<String, OrderDispatchedEvent> kafkaTemplate.До отправки первого сообщения в Kafka продъюсер регистрирует его схему в Schema Registry, доступ к которой указан в настройке
schema.registry.url.
После отправки сообщения мы дожидаемся результата с помощью блокирующего CompletableFuture.get(), затем коммитим оффсет с помощью
Acknowledgment.acknowledge(). Таким образом мы гарантируем, что оффсеты будут закоммичены только после отправки сообщения в топик.
В случае ошибки мы не коммитим оффсет, а вызываем метод Acknowledgment.nack(Duration), который фактически отбрасывает все остальные сообщения из пачки,
заставляет поток консьюмера уснуть на время, указанное в параметре Duration, а после этого перечитывает пачку сообщений, начиная с незакоммиченного
оффсета.
Тестируем интеграцию с Kafka. TestContainers Kafka
Примените в ветке master патч 2_kafka_integration_2.patch
SchemaRegistryContainer расширяет GenericContainer и реализует логику поднятия контейнера Schema Registry.
В конструкторе класса мы ожидаем, пока реестр схем будет доступен с помощью механизма waitFor() и стратегии ожидания
Wait.forHttp(”/subjects”).forStatusCode(200).В методе
SchemaRegistryContainer.withKafka Schema Registry инициализируется параметрами, необходимыми для связи с тестовым контейнером Kafka.
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
public static final String SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry";
public static final int SCHEMA_REGISTRY_PORT = 8081;
public SchemaRegistryContainer(String version) {
super(SCHEMA_REGISTRY_IMAGE + ":" + version);
waitingFor(Wait.forHttp("/subjects").forStatusCode(200));
withExposedPorts(SCHEMA_REGISTRY_PORT);
}
public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
}
public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
withNetwork(network);
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081");
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
return self();
}
}
В классе DispatcherServiceTest тестируются сценарии одобрения и отклонения заказа.
Инициализируем контейнеры Kafka и Schema Registry:
public static final String CONFLUENT_VERSION = "7.5.2";
...
private static final Network NETWORK = Network.newNetwork();
public static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_VERSION))
.withKraft()
.withNetwork(NETWORK);
public static final SchemaRegistryContainer SCHEMA_REGISTRY = new SchemaRegistryContainer(CONFLUENT_VERSION);
@BeforeAll
static void setup() {
KAFKA.start();
SCHEMA_REGISTRY.withKafka(KAFKA).start();
System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers());
System.setProperty("spring.kafka.consumer.properties.schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
System.setProperty("spring.kafka.producer.properties.schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
}
Для тестирования нам необходимо подготовить продьюсера, с помощью которого мы сможем отправить сообщение в топик v1.public.orders_outbox:
@Autowired
private KafkaTemplate<String, AvroOrderPlacedEvent> kafkaTemplate;
Настройки продъюсера подтянуться из application.yml.Затем это сообщение вычитает настроенный в нашем сервисе консьюмер, а продъюсер, объявленный там же, отправит обработанное сообщение в топик
v1.orders_dispatch.
Вычитать отправленное сообщение для проверки мы сможем с помощью другого консьюмера, который придется настроить самостоятельно:
private static final String ORDER_DISPATCH_TOPIC = "v1.orders_dispatch";
...
private Consumer<String, OrderDispatchedEvent> outputConsumer;
@BeforeEach
void setupConsumer() {
Map<String, Object> consumerProps = new HashMap<>(KafkaTestUtils.consumerProps(KAFKA.getBootstrapServers(), "testConsumer", "true"));
consumerProps.put("key.deserializer", StringDeserializer.class);
consumerProps.put("value.deserializer", KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://localhost:" + SCHEMA_REGISTRY.getFirstMappedPort());
consumerProps.put("specific.avro.reader", "true");
outputConsumer = new DefaultKafkaConsumerFactory<String, OrderDispatchedEvent>(consumerProps).createConsumer();
outputConsumer.subscribe(List.of(ORDER_DISPATCH_TOPIC));
outputConsumer.poll(Duration.ofMillis(0));
}
После создания консьюмера мы подписываемся на топик v1.orders_dispatch и через poll непрерывно опрашиваем сообщения из брокера.
После каждого теста консьюмер закрывается:
@AfterEach
void closeConsumer() {
outputConsumer.close();
}
Тестируем сценарий, когда заказ отклоняется (идентификатор заказа нечетный). Для этого отправляем сообщение в топик v1.public.orders_outbox с помощью
подготовленного продъюсера, а затем с помощью
KafkaTestUtils и настроенного консьюмера вычитываем обработанное сообщение из топика v1.orders_dispatch:
@Test
void consumeOrderPlacedEvent_rejectsOrderWithOddId() {
Long toBeRejected = 1L;
var input = buildEvent(toBeRejected);
kafkaTemplate.send(ORDER_PLACED_TOPIC, "1", input);
ConsumerRecord<String, OrderDispatchedEvent> consumed = KafkaTestUtils.getSingleRecord(outputConsumer, ORDER_DISPATCH_TOPIC, Duration.ofSeconds(10));
assertThat(consumed.value().getOrderId()).isEqualTo(toBeRejected);
assertThat(consumed.value().getStatus()).isEqualTo(OrderDispatchStatus.REJECTED);
}
Тестирование сценария одобрения заказа происходит аналогичным способом:
@Test
void consumeOrderPlacedEvent_acceptsOrderWithEvenId() {
Long toBeAccepted = 2L;
var input = buildEvent(toBeAccepted);
kafkaTemplate.send(ORDER_PLACED_TOPIC, "2", input);
ConsumerRecord<String, OrderDispatchedEvent> consumed = KafkaTestUtils.getSingleRecord(outputConsumer, ORDER_DISPATCH_TOPIC, Duration.ofSeconds(10));
assertThat(consumed.value().getOrderId()).isEqualTo(toBeAccepted);
assertThat(consumed.value().getStatus()).isEqualTo(OrderDispatchStatus.ACCEPTED);
}
Запускаем тесты:
./gradlew test