package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.camunda.zeebe.broker.exporter.util.ControlledTestExporter;
import io.camunda.zeebe.broker.exporter.util.PojoConfigurationExporter;
import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.exporter.api.context.ScheduledTask;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.stream.impl.SkipPositionsFilter;
import io.camunda.zeebe.test.util.TestUtil;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterDirectorTest.class */
public final class ExporterDirectorTest {
    /** Id given to the first exporter registered in {@link #init()}. */
    private static final String EXPORTER_ID_1 = "exporter-1";
    /** Id given to the second exporter registered in {@link #init()}. */
    private static final String EXPORTER_ID_2 = "exporter-2";
    /** Upper bound, in milliseconds, used for timed waits and Mockito verifications. */
    private static final int TIMEOUT_MILLIS = 5000;
    /** Mockito verification mode that waits up to {@link #TIMEOUT_MILLIS} for the interaction. */
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);

    @Rule
    public final ExporterRule rule = ExporterRule.activeExporter();
    /** Spied exporters, in registration order (index 0 is {@link #EXPORTER_ID_1}). */
    private final List<ControlledTestExporter> exporters = new ArrayList<>();
    /** Descriptors matching {@link #exporters} index by index. */
    private final List<ExporterDescriptor> exporterDescriptors = new ArrayList<>();

    /**
     * Empties the fixture lists and registers the two spied exporters
     * ({@code exporter-1} with argument {@code x=1}, {@code exporter-2} with {@code y=2}).
     */
    @Before
    public void init() {
        exporterDescriptors.clear();
        exporters.clear();

        // Registration order matters: tests address the exporters by list index.
        createExporter(EXPORTER_ID_1, Collections.singletonMap("x", 1));
        createExporter(EXPORTER_ID_2, Collections.singletonMap("y", 2));
    }

    /**
     * Creates a spied {@link ControlledTestExporter} together with a spied descriptor and appends
     * both to the fixture lists.
     *
     * @param exporterId id passed to the {@link ExporterDescriptor}
     * @param arguments configuration arguments passed to the {@link ExporterDescriptor}
     */
    private void createExporter(String exporterId, Map<String, Object> arguments) {
        final ControlledTestExporter exporter = Mockito.spy(new ControlledTestExporter());
        final ExporterDescriptor descriptor =
            Mockito.spy(new ExporterDescriptor(exporterId, exporter.getClass(), arguments));

        // Stub instantiation so the descriptor hands back the very spy the test holds on to.
        Mockito.doAnswer(invocation -> exporter).when(descriptor).newInstance();

        exporters.add(exporter);
        exporterDescriptors.add(descriptor);
    }

    /** Starts the exporter director of {@link #rule} with the given descriptors. */
    private void startExporterDirector(List<ExporterDescriptor> descriptors) {
        rule.startExporterDirector(descriptors);
    }

    /**
     * Both exporters accept only DEPLOYMENT commands and are configured with
     * {@code shouldAutoUpdatePosition(false)}. A DEPLOYMENT event (not matching the filter) is
     * written before a matching command; once the second exporter holds one record, both stored
     * positions are expected to equal the event's position.
     */
    @Test
    public void shouldUpdatePositionWhenInitialRecordsAreSkipped() {
        // given
        final ControlledTestExporter tailingExporter = exporters.get(1);
        exporters.forEach(exporter ->
            exporter
                .onConfigure(withFilter(List.of(RecordType.COMMAND), List.of(ValueType.DEPLOYMENT)))
                .shouldAutoUpdatePosition(false));
        startExporterDirector(exporterDescriptors);
        final ExportersState state = rule.getExportersState();

        // when
        final long skippedRecordPosition = rule.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());
        rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());

        // then
        Awaitility.await("director has read all records until now")
            .atMost(Duration.ofSeconds(5L))
            .untilAsserted(() -> Assertions.assertThat(tailingExporter.getExportedRecords()).hasSize(1));
        Assertions.assertThat(state.getPosition(EXPORTER_ID_1)).isEqualTo(skippedRecordPosition);
        Assertions.assertThat(state.getPosition(EXPORTER_ID_2)).isEqualTo(skippedRecordPosition);
    }

    /**
     * Both exporters accept only DEPLOYMENT commands. The first exporter acknowledges the first
     * command, the second never acknowledges anything. After an INCIDENT command (not matching the
     * filter) and another DEPLOYMENT command, the first exporter's stored position is expected to
     * equal the INCIDENT command's position while the second one stays at {@code -1}.
     */
    @Test
    public void shouldUpdatePositionOfUpToDateExportersOnSkipRecord() {
        // given
        final ControlledTestExporter filteringExporter = exporters.get(0);
        final ControlledTestExporter tailingExporter = exporters.get(1);
        filteringExporter
            .onConfigure(withFilter(List.of(RecordType.COMMAND), List.of(ValueType.DEPLOYMENT)))
            .shouldAutoUpdatePosition(false);
        tailingExporter
            .onConfigure(withFilter(List.of(RecordType.COMMAND), List.of(ValueType.DEPLOYMENT)))
            .shouldAutoUpdatePosition(false);
        startExporterDirector(exporterDescriptors);
        final ExportersState state = rule.getExportersState();

        // when - only the filtering exporter acknowledges the first command
        final long firstPosition = rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());
        Awaitility.await("filteringExporter has exported the first record")
            .atMost(Duration.ofSeconds(5L))
            .untilAsserted(() -> Assertions.assertThat(filteringExporter.getExportedRecords()).hasSize(1));
        filteringExporter.getController().updateLastExportedRecordPosition(firstPosition);

        final long skippedRecordPosition = rule.writeCommand(IncidentIntent.CREATED, new IncidentRecord());
        rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());

        // then
        Awaitility.await("director has read all records until now")
            .atMost(Duration.ofSeconds(5L))
            .untilAsserted(() -> Assertions.assertThat(tailingExporter.getExportedRecords()).hasSize(2));
        Assertions.assertThat(state.getPosition(EXPORTER_ID_1)).isEqualTo(skippedRecordPosition);
        Assertions.assertThat(state.getPosition(EXPORTER_ID_2)).isEqualTo(-1L);
    }

    /**
     * The first exporter accepts only DEPLOYMENT commands, the second accepts DEPLOYMENT commands
     * and events. A DEPLOYMENT event followed by a command is written; the first exporter's stored
     * position is expected to equal the event's position, the second one to remain {@code -1}.
     */
    @Test
    public void shouldUpdateIfSkippingInitialRecordForSingleExporter() {
        // given
        final ControlledTestExporter filteringExporter = exporters.get(0);
        final ControlledTestExporter tailingExporter = exporters.get(1);
        filteringExporter
            .onConfigure(withFilter(List.of(RecordType.COMMAND), List.of(ValueType.DEPLOYMENT)))
            .shouldAutoUpdatePosition(false);
        tailingExporter
            .onConfigure(withFilter(List.of(RecordType.COMMAND, RecordType.EVENT), List.of(ValueType.DEPLOYMENT)))
            .shouldAutoUpdatePosition(false);
        startExporterDirector(exporterDescriptors);
        final ExportersState state = rule.getExportersState();

        // when
        final long skippedRecordPosition = rule.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());
        rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());

        // then
        Awaitility.await("director has read all records until now")
            .atMost(Duration.ofSeconds(5L))
            .untilAsserted(() -> Assertions.assertThat(tailingExporter.getExportedRecords()).hasSize(2));
        Assertions.assertThat(state.getPosition(EXPORTER_ID_1)).isEqualTo(skippedRecordPosition);
        Assertions.assertThat(state.getPosition(EXPORTER_ID_2)).isEqualTo(-1L);
    }

    /**
     * Starts an exporter whose first {@code open} call throws and expects {@code open} to have
     * been invoked twice within ten seconds.
     */
    @Test
    public void shouldRetryOpenCallIfFails() throws Exception {
        // given
        final ControlledTestExporter exporter = startExporterWithFaultyOpenCall();

        // then
        Awaitility.await("exporter open has been retried")
            .atMost(Duration.ofSeconds(10L))
            .untilAsserted(() -> Mockito.verify(exporter, Mockito.times(2)).open(Mockito.any()));

        rule.closeExporterDirector();
    }

    /**
     * After the failing {@code open} call has been retried, two written events are expected to be
     * exported in order by that exporter.
     */
    @Test
    public void shouldExportAfterOpenRetried() throws Exception {
        // given
        final ControlledTestExporter exporter = startExporterWithFaultyOpenCall();
        Awaitility.await("exporter open has been retried")
            .atMost(Duration.ofSeconds(10L))
            .untilAsserted(() -> Mockito.verify(exporter, Mockito.times(2)).open(Mockito.any()));

        // when
        final long firstPosition = writeEvent();
        final long secondPosition = writeEvent();

        // then
        Awaitility.await("Exporter has exported all records")
            .untilAsserted(() ->
                Assertions.assertThat(exporter.getExportedRecords())
                    .extracting(Record::getPosition)
                    .containsExactly(firstPosition, secondPosition));

        rule.closeExporterDirector();
    }

    /**
     * Writes an event before starting an exporter whose first {@code open} call fails, then
     * verifies the interaction order on the spy: {@code open}, {@code open}, {@code export}.
     */
    @Test
    public void shouldNotStartExportingUntilExportersFinishOpening() throws Exception {
        // given
        writeEvent();

        // when
        final ControlledTestExporter exporter = startExporterWithFaultyOpenCall();
        Awaitility.await("Record has been exported")
            .until(() -> !exporter.getExportedRecords().isEmpty());

        // then
        final InOrder order = Mockito.inOrder(exporter);
        order.verify(exporter).open(Mockito.any());
        order.verify(exporter).open(Mockito.any());
        order.verify(exporter, Mockito.timeout(TIMEOUT_MILLIS)).export(Mockito.any());

        rule.closeExporterDirector();
    }

    /**
     * The first exporter accepts only DEPLOYMENT commands and acknowledges the first command; the
     * second accepts DEPLOYMENT commands and events and never acknowledges. After a subsequent
     * DEPLOYMENT event, the first exporter's stored position is expected to equal that event's
     * position while the second one stays at {@code -1}.
     */
    @Test
    public void shouldUpdateIfRecordSkipsSingleUpToDateExporter() {
        // given
        final ControlledTestExporter filteringExporter = exporters.get(0);
        final ControlledTestExporter tailingExporter = exporters.get(1);
        filteringExporter
            .onConfigure(withFilter(List.of(RecordType.COMMAND), List.of(ValueType.DEPLOYMENT)))
            .shouldAutoUpdatePosition(false);
        tailingExporter
            .onConfigure(withFilter(List.of(RecordType.COMMAND, RecordType.EVENT), List.of(ValueType.DEPLOYMENT)))
            .shouldAutoUpdatePosition(false);
        startExporterDirector(exporterDescriptors);
        final ExportersState state = rule.getExportersState();

        // when - the filtering exporter acknowledges the command it received
        final long firstPosition = rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());
        Awaitility.await("filteringExporter has exported the first record")
            .atMost(Duration.ofSeconds(5L))
            .untilAsserted(() -> Assertions.assertThat(filteringExporter.getExportedRecords()).hasSize(1));
        filteringExporter.getController().updateLastExportedRecordPosition(firstPosition);

        final long skippedRecordPosition = rule.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());

        // then
        Awaitility.await("director has read all records until now")
            .atMost(Duration.ofSeconds(5L))
            .untilAsserted(() -> Assertions.assertThat(tailingExporter.getExportedRecords()).hasSize(2));
        Assertions.assertThat(state.getPosition(EXPORTER_ID_1)).isEqualTo(skippedRecordPosition);
        Assertions.assertThat(state.getPosition(EXPORTER_ID_2)).isEqualTo(-1L);
    }

    /**
     * Starts the director and, once every exporter's open callback has fired, checks that each
     * exporter has a controller, logger and configuration, and that id, arguments and logger name
     * match what was registered in {@link #init()}.
     */
    @Test
    public void shouldConfigureAllExportersProperlyOnStart() throws InterruptedException {
        // given
        final CountDownLatch openedLatch = new CountDownLatch(exporters.size());
        exporters.forEach(exporter -> exporter.onOpen(controller -> openedLatch.countDown()));

        // when
        startExporterDirector(exporterDescriptors);

        // then
        Assertions.assertThat(openedLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).isTrue();
        Mockito.verify(exporters.get(0), TIMEOUT).open(Mockito.any());
        Mockito.verify(exporters.get(1), TIMEOUT).open(Mockito.any());

        for (final ControlledTestExporter exporter : exporters) {
            Assertions.assertThat(exporter.getController()).isNotNull();
            Assertions.assertThat(exporter.getContext().getLogger()).isNotNull();
            Assertions.assertThat(exporter.getContext().getConfiguration()).isNotNull();
        }

        final Context firstContext = exporters.get(0).getContext();
        Assertions.assertThat(firstContext.getConfiguration().getId()).isEqualTo(EXPORTER_ID_1);
        Assertions.assertThat(firstContext.getConfiguration().getArguments())
            .isEqualTo(Collections.singletonMap("x", 1));
        Assertions.assertThat(firstContext.getLogger().getName())
            .isEqualTo(Loggers.getExporterLogger(EXPORTER_ID_1).getName());

        final Context secondContext = exporters.get(1).getContext();
        Assertions.assertThat(secondContext.getConfiguration().getId()).isEqualTo(EXPORTER_ID_2);
        Assertions.assertThat(secondContext.getConfiguration().getArguments())
            .isEqualTo(Collections.singletonMap("y", 2));
        Assertions.assertThat(secondContext.getLogger().getName())
            .isEqualTo(Loggers.getExporterLogger(EXPORTER_ID_2).getName());
    }

    /** Closing the director is expected to invoke {@code close()} on both exporters. */
    @Test
    public void shouldCloseAllExportersOnClose() throws Exception {
        // given
        startExporterDirector(exporterDescriptors);

        // when
        rule.closeExporterDirector();

        // then
        Mockito.verify(exporters.get(0), TIMEOUT).close();
        Mockito.verify(exporters.get(1), TIMEOUT).close();
    }

    /**
     * Starts a {@link PojoConfigurationExporter} with a nested argument map and checks that the
     * configuration object it publishes through its static {@code configuration} field carries
     * the same values.
     */
    @Test
    public void shouldInstantiateConfigurationClass() {
        // given - typed maps instead of raw HashMap, avoiding an unchecked conversion
        final Map<String, Object> nested = new HashMap<>();
        nested.put("bar", "baz");
        nested.put("y", 32.12d);

        final Map<String, Object> arguments = new HashMap<>();
        arguments.put("foo", "bar");
        arguments.put("x", 123);
        arguments.put("nested", nested);

        final ExporterDescriptor descriptor =
            new ExporterDescriptor("instantiateConfiguration", PojoConfigurationExporter.class, arguments);

        // when
        startExporterDirector(Collections.singletonList(descriptor));

        // then
        TestUtil.waitUntil(() -> PojoConfigurationExporter.configuration != null);
        final PojoConfigurationExporter.PojoExporterConfiguration configuration =
            PojoConfigurationExporter.configuration;
        Assertions.assertThat(configuration.getFoo()).isEqualTo("bar");
        Assertions.assertThat(configuration.getX()).isEqualTo(123);
        Assertions.assertThat(configuration.getNested().getBar()).isEqualTo("baz");
        Assertions.assertThat(configuration.getNested().getY()).isEqualTo(32.12d);
    }

    /**
     * The first exporter accepts DEPLOYMENT commands and events; the second accepts DEPLOYMENT and
     * JOB events. After writing a deployment command, a deployment event, an incident event and a
     * job event, each exporter is expected to hold exactly the two records matching its filter.
     */
    @Test
    public void shouldApplyRecordFilter() {
        // given
        final ControlledTestExporter deploymentExporter = exporters.get(0);
        final ControlledTestExporter eventExporter = exporters.get(1);
        deploymentExporter.onConfigure(
            withFilter(
                Arrays.asList(RecordType.COMMAND, RecordType.EVENT),
                Collections.singletonList(ValueType.DEPLOYMENT)));
        eventExporter.onConfigure(
            withFilter(
                Collections.singletonList(RecordType.EVENT),
                Arrays.asList(ValueType.DEPLOYMENT, ValueType.JOB)));
        startExporterDirector(exporterDescriptors);

        // when
        final long deploymentCommand = rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());
        final long deploymentEvent = rule.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());
        rule.writeEvent(IncidentIntent.CREATED, new IncidentRecord());
        final long jobEvent = rule.writeEvent(JobIntent.CREATED, new JobRecord());

        // then
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() == 2);
        Assertions.assertThat(exporters.get(0).getExportedRecords())
            .extracting(Record::getPosition)
            .hasSize(2)
            .contains(deploymentCommand, deploymentEvent);
        Assertions.assertThat(exporters.get(1).getExportedRecords())
            .extracting(Record::getPosition)
            .hasSize(2)
            .contains(deploymentEvent, jobEvent);
    }

    /**
     * Installs a {@link SkipPositionsFilter} for position 1 on the rule and expects the second
     * exporter, which accepts DEPLOYMENT commands, to export only the record at position 2.
     */
    @Test
    public void shouldNotExportSkipRecordsFilter() {
        // given
        exporters.get(1)
            .onConfigure(withFilter(List.of(RecordType.COMMAND), List.of(ValueType.DEPLOYMENT)));
        rule.withPositionsToSkipFilter(SkipPositionsFilter.of(Set.of(1L)));
        startExporterDirector(exporterDescriptors);

        // when
        rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());
        rule.writeCommand(DeploymentIntent.CREATE, new DeploymentRecord());

        // then
        Awaitility.await("filteringExporter has exported only the second record")
            .atMost(Duration.ofSeconds(5L))
            .untilAsserted(() ->
                Assertions.assertThat(exporters.get(1).getExportedRecords())
                    .extracting(Record::getPosition)
                    .containsExactly(2L));
    }

    /**
     * The first exporter's export callback throws for its first three invocations. While the
     * controlled clock is advanced repeatedly, both exporters are expected to end up with both
     * written events, in order.
     */
    @Test
    public void shouldRetryExportingOnException() {
        // given - counter starts at 3 and is decremented on every export attempt
        final AtomicLong remainingFailures = new AtomicLong(3L);
        exporters.get(0).onExport(record -> {
            if (remainingFailures.getAndDecrement() > 0) {
                throw new RuntimeException("Export failed (expected)");
            }
        });
        startExporterDirector(exporterDescriptors);

        // when
        final long firstPosition = writeEvent();
        final long secondPosition = writeEvent();

        // Keep advancing the clock until the counter shows five export attempts (3 -> -2).
        TestUtil.doRepeatedly(() -> {
            rule.getClock().addTime(Duration.ofSeconds(1L));
        }).until(ignored -> remainingFailures.get() <= -2);

        // then
        Awaitility.await("Exporter %s has exported all records".formatted(EXPORTER_ID_1))
            .untilAsserted(() ->
                Assertions.assertThat(exporters.get(0).getExportedRecords())
                    .extracting(Record::getPosition)
                    .containsExactly(firstPosition, secondPosition));
        Awaitility.await("Exporter %s has exported all records".formatted(EXPORTER_ID_2))
            .untilAsserted(() ->
                Assertions.assertThat(exporters.get(1).getExportedRecords())
                    .extracting(Record::getPosition)
                    .containsExactly(firstPosition, secondPosition));
    }

    /**
     * The first exporter schedules a task with a ten second delay from its export callback. After
     * the callback has run and the controlled clock is advanced by that delay, the task is
     * expected to run within five seconds.
     */
    @Test
    public void shouldExecuteScheduledTask() throws Exception {
        // given
        final CountDownLatch taskTriggeredLatch = new CountDownLatch(1);
        final CountDownLatch taskScheduledLatch = new CountDownLatch(1);
        final Duration delay = Duration.ofSeconds(10L);

        exporters.get(0).onExport(record -> {
            exporters.get(0)
                .getController()
                .scheduleCancellableTask(delay, taskTriggeredLatch::countDown);
            taskScheduledLatch.countDown();
        });

        // when
        startExporterDirector(exporterDescriptors);
        writeEvent();

        // then
        Assertions.assertThat(taskScheduledLatch.await(5L, TimeUnit.SECONDS)).isTrue();
        rule.getClock().addTime(delay);
        Assertions.assertThat(taskTriggeredLatch.await(5L, TimeUnit.SECONDS)).isTrue();
    }

    /**
     * Same scenario as {@code shouldExecuteScheduledTask}, but the exporter is held in a local
     * variable: a cancellable task scheduled during export is expected to run once the controlled
     * clock has been advanced by its delay.
     */
    @Test
    public void shouldExecuteScheduledCancellableTask() throws InterruptedException {
        // given
        final CountDownLatch taskTriggeredLatch = new CountDownLatch(1);
        final CountDownLatch taskScheduledLatch = new CountDownLatch(1);
        final Duration delay = Duration.ofSeconds(10L);
        final ControlledTestExporter exporter = exporters.get(0);

        exporter.onExport(record -> {
            exporter.getController().scheduleCancellableTask(delay, taskTriggeredLatch::countDown);
            taskScheduledLatch.countDown();
        });

        // when
        startExporterDirector(exporterDescriptors);
        writeEvent();

        // then
        Assertions.assertThat(taskScheduledLatch.await(5L, TimeUnit.SECONDS)).isTrue();
        rule.getClock().addTime(delay);
        Assertions.assertThat(taskTriggeredLatch.await(5L, TimeUnit.SECONDS)).isTrue();
    }

    /**
     * During export two tasks with the same delay are scheduled and the first one is cancelled.
     * After the clock is advanced, the second task is expected to have run and the marker the
     * cancelled task would have set is expected to still be zero.
     */
    @Test
    public void shouldCancelScheduledCancellableTask() throws InterruptedException {
        // given
        final CountDownLatch tasksScheduledLatch = new CountDownLatch(1);
        final CountDownLatch secondTaskTriggeredLatch = new CountDownLatch(1);
        final Duration delay = Duration.ofSeconds(10L);
        final ControlledTestExporter exporter = exporters.get(0);
        final AtomicLong cancelledTaskMarker = new AtomicLong(0L);

        exporter.onExport(record -> {
            final ScheduledTask cancelledTask =
                exporter.getController().scheduleCancellableTask(delay, () -> cancelledTaskMarker.set(1L));
            // The second task, scheduled with the same delay, signals that the delay has elapsed.
            exporter.getController().scheduleCancellableTask(delay, secondTaskTriggeredLatch::countDown);
            cancelledTask.cancel();
            tasksScheduledLatch.countDown();
        });

        // when
        startExporterDirector(exporterDescriptors);
        writeEvent();

        // then
        Assertions.assertThat(tasksScheduledLatch.await(5L, TimeUnit.SECONDS)).isTrue();
        rule.getClock().addTime(delay);
        Assertions.assertThat(secondTaskTriggeredLatch.await(5L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(cancelledTaskMarker.get()).isZero();
    }

    /**
     * After two events, the first exporter acknowledges the second event and the second exporter
     * only the first. Following a director restart the first exporter is expected to receive
     * nothing and the second exporter only the second event.
     */
    @Test
    public void shouldRecoverPositionsFromState() throws Exception {
        // given
        startExporterDirector(exporterDescriptors);
        final long firstPosition = writeEvent();
        final long secondPosition = writeEvent();
        TestUtil.waitUntil(() -> exporters.get(0).getExportedRecords().size() == 2);
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() == 2);

        exporters.get(0).getController().updateLastExportedRecordPosition(secondPosition);
        exporters.get(1).getController().updateLastExportedRecordPosition(firstPosition);

        // when - restart with emptied record lists
        rule.closeExporterDirector();
        exporters.get(0).getExportedRecords().clear();
        exporters.get(1).getExportedRecords().clear();
        startExporterDirector(exporterDescriptors);

        // then
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() >= 1);
        Assertions.assertThat(exporters.get(0).getExportedRecords()).isEmpty();
        Assertions.assertThat(exporters.get(1).getExportedRecords())
            .extracting(Record::getPosition)
            .hasSize(1)
            .contains(secondPosition);
    }

    /**
     * Each exporter stores its own metadata together with an acknowledged position. After a
     * director restart, {@code readMetadata()} on each controller is expected to return the bytes
     * that exporter stored.
     */
    @Test
    public void shouldRecoverMetadataFromState() throws Exception {
        // given
        startExporterDirector(exporterDescriptors);
        final long firstPosition = writeEvent();
        final long secondPosition = writeEvent();
        // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
        final byte[] firstMetadata = "e1".getBytes(StandardCharsets.UTF_8);
        final byte[] secondMetadata = "e2".getBytes(StandardCharsets.UTF_8);

        Awaitility.await("wait until the exporters read the records")
            .until(() ->
                exporters.get(0).getExportedRecords().size() == 2
                    && exporters.get(1).getExportedRecords().size() == 2);

        exporters.get(0).getController().updateLastExportedRecordPosition(secondPosition, firstMetadata);
        exporters.get(1).getController().updateLastExportedRecordPosition(firstPosition, secondMetadata);

        // when - restart with emptied record lists
        rule.closeExporterDirector();
        exporters.get(0).getExportedRecords().clear();
        exporters.get(1).getExportedRecords().clear();
        startExporterDirector(exporterDescriptors);

        // then - the second exporter is behind, so a re-exported record signals the restart is done
        Awaitility.await("wait until the exporters are opened")
            .until(() -> exporters.get(1).getExportedRecords().size() >= 1);
        Assertions.assertThat(exporters.get(0).getController().readMetadata()).hasValue(firstMetadata);
        Assertions.assertThat(exporters.get(1).getController().readMetadata()).hasValue(secondMetadata);
    }

    /**
     * Acknowledges position 1 for the first exporter, waits until the stored position is greater
     * than {@code -1}, then acknowledges {@code -1} and expects the stored position to be
     * unchanged.
     */
    @Test
    public void shouldNotUpdatePositionToSmallerValue() throws Exception {
        // given
        final CountDownLatch openedLatch = new CountDownLatch(1);
        exporters.get(0).onOpen(controller -> openedLatch.countDown());
        startExporterDirector(exporterDescriptors);
        // Bounded wait: an unbounded await() would hang the whole run if the exporter never opens.
        Assertions.assertThat(openedLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).isTrue();

        exporters.get(0).getController().updateLastExportedRecordPosition(1L);
        final Long firstPosition =
            Awaitility.await()
                .until(
                    () -> rule.getExportersState().getPosition(EXPORTER_ID_1),
                    position -> position > -1);

        // when
        exporters.get(0).getController().updateLastExportedRecordPosition(-1L);

        // then
        Assertions.assertThat(rule.getExportersState().getPosition(EXPORTER_ID_1)).isEqualTo(firstPosition);
    }

    /**
     * The first exporter acknowledges the first event from within its close callback. After a
     * director restart it is expected to receive only the second event, while the second exporter
     * (which acknowledged nothing) receives both events again.
     */
    @Test
    public void shouldUpdateLastExportedPositionOnClose() throws Exception {
        // given
        startExporterDirector(exporterDescriptors);
        final long firstPosition = writeEvent();
        final long secondPosition = writeEvent();
        TestUtil.waitUntil(() -> exporters.get(0).getExportedRecords().size() == 2);
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() == 2);

        exporters.get(0).onClose(() ->
            exporters.get(0).getController().updateLastExportedRecordPosition(firstPosition));

        // when - restart with emptied record lists
        rule.closeExporterDirector();
        exporters.get(0).getExportedRecords().clear();
        exporters.get(1).getExportedRecords().clear();
        startExporterDirector(exporterDescriptors);

        // then
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() >= 2);
        Assertions.assertThat(exporters.get(0).getExportedRecords())
            .extracting(Record::getPosition)
            .hasSize(1)
            .contains(secondPosition);
        Assertions.assertThat(exporters.get(1).getExportedRecords())
            .extracting(Record::getPosition)
            .hasSize(2)
            .contains(firstPosition, secondPosition);
    }

    /**
     * Both exporters acknowledge an event; the director is then restarted with only the first
     * descriptor. The first exporter's position is expected to be kept, and the second exporter's
     * position is expected to eventually read {@code -1}.
     */
    @Test
    public void shouldRemoveExporterFromState() throws Exception {
        // given
        startExporterDirector(exporterDescriptors);
        final long position = writeEvent();
        TestUtil.waitUntil(() -> exporters.get(0).getExportedRecords().size() == 1);
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() == 1);
        exporters.get(0).getController().updateLastExportedRecordPosition(position);
        exporters.get(1).getController().updateLastExportedRecordPosition(position);

        // when - restart with the first exporter only
        rule.closeExporterDirector();
        startExporterDirector(Collections.singletonList(exporterDescriptors.get(0)));

        // then - open was called once per director start
        Mockito.verify(exporters.get(0), TIMEOUT.times(2)).open(Mockito.any());
        final ExportersState state = rule.getExportersState();
        Assertions.assertThat(state.getPosition(EXPORTER_ID_1)).isEqualTo(position);
        TestUtil.waitUntil(() -> state.getPosition(EXPORTER_ID_2) == -1);
    }

    /**
     * Only the second exporter acknowledges the written event. After a director restart the first
     * exporter is expected to receive that event again and the second exporter nothing.
     */
    @Test
    public void shouldRecoverFromStartWithNonUpdatingExporter() throws Exception {
        // given
        startExporterDirector(exporterDescriptors);
        final long position = writeEvent();
        TestUtil.waitUntil(() -> exporters.get(0).getExportedRecords().size() == 1);
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() == 1);
        exporters.get(1).getController().updateLastExportedRecordPosition(position);

        // when - restart with emptied record lists
        rule.closeExporterDirector();
        exporters.get(0).getExportedRecords().clear();
        exporters.get(1).getExportedRecords().clear();
        startExporterDirector(exporterDescriptors);

        // then
        TestUtil.waitUntil(() -> exporters.get(0).getExportedRecords().size() >= 1);
        Assertions.assertThat(exporters.get(0).getExportedRecords())
            .extracting(Record::getPosition)
            .containsExactly(position);
        Assertions.assertThat(exporters.get(1).getExportedRecords()).isEmpty();
    }

    /**
     * Starts the director in {@link ExporterPhase#SOFT_PAUSED}. Both exporters are expected to
     * still receive the two written events, while their stored positions stay at {@code -1} even
     * with {@code shouldAutoUpdatePosition(true)}.
     */
    @Test
    public void shouldStartContainersSoftPaused() {
        // given
        rule.startExporterDirector(exporterDescriptors, ExporterPhase.SOFT_PAUSED);

        // when
        writeEvent();
        writeEvent();
        exporters.get(0).shouldAutoUpdatePosition(true);
        exporters.get(1).shouldAutoUpdatePosition(true);

        // then
        TestUtil.waitUntil(() -> exporters.get(0).getExportedRecords().size() == 2);
        TestUtil.waitUntil(() -> exporters.get(1).getExportedRecords().size() == 2);
        Assertions.assertThat(rule.getExportersState().getPosition(EXPORTER_ID_1)).isEqualTo(-1L);
        Assertions.assertThat(rule.getExportersState().getPosition(EXPORTER_ID_2)).isEqualTo(-1L);
    }

    /**
     * Writes a {@code DeploymentIntent.CREATED} event with an empty {@link DeploymentRecord}.
     *
     * @return the value returned by the rule's {@code writeEvent}, used by the tests as the
     *     record's position
     */
    private long writeEvent() {
        final DeploymentRecord emptyDeployment = new DeploymentRecord();
        return rule.writeEvent(DeploymentIntent.CREATED, emptyDeployment);
    }

    /**
     * Builds a configure callback that installs a record filter on the exporter context.
     *
     * @param acceptedRecordTypes record types the filter accepts
     * @param acceptedValueTypes value types the filter accepts
     * @return a consumer that sets the filter on the {@link Context} it is given
     */
    private Consumer<Context> withFilter(List<RecordType> acceptedRecordTypes, List<ValueType> acceptedValueTypes) {
        // An anonymous implementation of the RecordFilter interface cannot take constructor
        // arguments, so none are passed here.
        return context ->
            context.setFilter(new Context.RecordFilter() {
                @Override
                public boolean acceptType(RecordType recordType) {
                    return acceptedRecordTypes.contains(recordType);
                }

                @Override
                public boolean acceptValue(ValueType valueType) {
                    return acceptedValueTypes.contains(valueType);
                }
            });
    }

    /**
     * Starts the director with a single spied exporter (id {@code exporter-failing}) whose first
     * {@code open} call throws a {@link RuntimeException} and whose later calls run the real
     * method.
     *
     * @return the spied exporter
     */
    private ControlledTestExporter startExporterWithFaultyOpenCall() {
        final ControlledTestExporter exporter = Mockito.spy(new ControlledTestExporter());
        Mockito.doThrow(new RuntimeException("open failed"))
            .doCallRealMethod()
            .when(exporter)
            .open(Mockito.any());

        final ExporterDescriptor descriptor =
            Mockito.spy(
                new ExporterDescriptor("exporter-failing", exporter.getClass(), Collections.singletonMap("x", 1)));
        // Stub instantiation so the director works with the spy created above.
        Mockito.doAnswer(invocation -> exporter).when(descriptor).newInstance();

        startExporterDirector(List.of(descriptor));
        return exporter;
    }
}
