package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.camunda.zeebe.broker.exporter.util.ControlledTestExporter;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionEvaluationListener;
import org.awaitility.core.EvaluatedCondition;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Checks that exporter positions and metadata recorded by the active exporter director show up in
 * the state of a passive exporter director that shares the same {@link
 * SimplePartitionMessageService}.
 *
 * <p>Both directors are started with the same two spied {@link ControlledTestExporter}s. The tests
 * drive the active director's controlled clock forward in steps of {@link #DISTRIBUTION_INTERVAL}
 * and then inspect the passive director's {@link ExportersState}.
 */
public final class ExporterDirectorDistributionTest {
    private static final String EXPORTER_ID_1 = "exporter-1";
    private static final String EXPORTER_ID_2 = "exporter-2";
    private static final DirectBuffer EXPORTER_METADATA_1 = BufferUtil.wrapString("e1");
    private static final DirectBuffer EXPORTER_METADATA_2 = BufferUtil.wrapString("e2");
    private static final UnsafeBuffer NO_METADATA = new UnsafeBuffer();
    private static final Duration DISTRIBUTION_INTERVAL = Duration.ofSeconds(15);

    // Declared before the rules because both rule initializers below read it.
    private final SimplePartitionMessageService simplePartitionMessageService =
        new SimplePartitionMessageService();

    @Rule
    public final ExporterRule activeExporters =
        ExporterRule.activeExporter()
            .withPartitionMessageService(simplePartitionMessageService)
            .withDistributionInterval(DISTRIBUTION_INTERVAL);

    @Rule
    public final ExporterRule passiveExporters =
        ExporterRule.passiveExporter().withPartitionMessageService(simplePartitionMessageService);

    private final List<ControlledTestExporter> exporters = new ArrayList<>();
    private final List<ExporterDescriptor> exporterDescriptors = new ArrayList<>();

    /** Recreates the two test exporters and their descriptors before every test. */
    @Before
    public void init() {
        exporters.clear();
        exporterDescriptors.clear();
        createExporter(EXPORTER_ID_1, EXPORTER_METADATA_1);
        createExporter(EXPORTER_ID_2, EXPORTER_METADATA_2);
    }

    /** Closes both exporter directors, the active one first. */
    @After
    public void tearDown() throws Exception {
        activeExporters.closeExporterDirector();
        passiveExporters.closeExporterDirector();
    }

    /**
     * Builds a spied exporter together with a spied descriptor that always hands out that very
     * exporter instance, and registers both in the per-test lists.
     *
     * @param str the exporter id given to the descriptor
     * @param directBuffer the metadata the exporter reports along with every exported position
     */
    private void createExporter(String str, DirectBuffer directBuffer) {
        final ControlledTestExporter exporter = Mockito.spy(new ControlledTestExporter());
        final ExporterDescriptor descriptor =
            Mockito.spy(new ExporterDescriptor(str, exporter.getClass(), Map.of()));

        // Return the spy itself so that the test keeps a handle on the exporter in use.
        Mockito.doAnswer(invocation -> exporter).when(descriptor).newInstance();

        // Copy the metadata once; the same bytes are reported for every exported record.
        final byte[] metadataBytes = BufferUtil.bufferAsArray(directBuffer);
        exporter.onExport(
            exportedRecord ->
                exporter
                    .getController()
                    .updateLastExportedRecordPosition(exportedRecord.getPosition(), metadataBytes));

        exporters.add(exporter);
        exporterDescriptors.add(descriptor);
    }

    /**
     * Starts the active and then the passive exporter director with the given descriptors.
     *
     * @param list the exporter descriptors both directors are started with
     */
    private void startExporters(List<ExporterDescriptor> list) {
        activeExporters.startExporterDirector(list);
        passiveExporters.startExporterDirector(list);
    }

    /**
     * Asserts that both exporters are recorded at {@code expectedPosition} with their respective
     * metadata in the given state.
     */
    private static void assertExportedUpTo(ExportersState state, long expectedPosition) {
        Assertions.assertThat(state.getPosition(EXPORTER_ID_1)).isEqualTo(expectedPosition);
        Assertions.assertThat(state.getExporterMetadata(EXPORTER_ID_1)).isEqualTo(EXPORTER_METADATA_1);
        Assertions.assertThat(state.getPosition(EXPORTER_ID_2)).isEqualTo(expectedPosition);
        Assertions.assertThat(state.getExporterMetadata(EXPORTER_ID_2)).isEqualTo(EXPORTER_METADATA_2);
    }

    /** Asserts that the given state holds position {@code -1} and empty metadata for both exporters. */
    private static void assertNothingExported(ExportersState state) {
        Assertions.assertThat(state.getPosition(EXPORTER_ID_1)).isEqualTo(-1L);
        Assertions.assertThat(state.getExporterMetadata(EXPORTER_ID_1)).isEqualTo(NO_METADATA);
        Assertions.assertThat(state.getPosition(EXPORTER_ID_2)).isEqualTo(-1L);
        Assertions.assertThat(state.getExporterMetadata(EXPORTER_ID_2)).isEqualTo(NO_METADATA);
    }

    /** Moves the active director's controlled clock forward by one distribution interval. */
    private void advanceActiveClock() {
        activeExporters.getClock().addTime(DISTRIBUTION_INTERVAL);
    }

    /** Creates a listener that keeps advancing the active director's clock while awaiting. */
    private ClockShifter activeClockShifter() {
        return new ClockShifter(activeExporters.getClock());
    }

    @Test
    public void shouldDistributeExporterState() {
        startExporters(exporterDescriptors);
        final long position =
            activeExporters.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());

        final ExportersState activeState = activeExporters.getExportersState();
        Awaitility.await("Director has read all records and update the positions.")
            .untilAsserted(() -> assertExportedUpTo(activeState, position));

        // Before the clock is advanced, the passive side must not have seen anything yet.
        final ExportersState passiveState = passiveExporters.getExportersState();
        assertNothingExported(passiveState);

        advanceActiveClock();
        Awaitility.await("Active Director has distributed positions and passive has received it")
            .conditionEvaluationListener(activeClockShifter())
            .untilAsserted(() -> assertExportedUpTo(passiveState, position));
    }

    @Test
    public void shouldNotResetExporterPositionWhenOldPositionReceived() {
        startExporters(exporterDescriptors);
        Awaitility.await("Exporter has recovered and started exporting.")
            .untilAsserted(
                () ->
                    Assertions.assertThat(activeExporters.getDirector().getPhase().join())
                        .isEqualTo(ExporterPhase.EXPORTING));

        // First round: both exporters sit at position 10 on the active side.
        activeExporters.getExportersState().setPosition(EXPORTER_ID_1, 10L);
        activeExporters.getExportersState().setPosition(EXPORTER_ID_2, 10L);
        advanceActiveClock();

        final ExportersState passiveState = passiveExporters.getExportersState();
        Awaitility.await("Active Director has distributed positions and passive has received it")
            .conditionEvaluationListener(activeClockShifter())
            .untilAsserted(
                () -> {
                    Assertions.assertThat(passiveState.getPosition(EXPORTER_ID_1)).isEqualTo(10L);
                    Assertions.assertThat(passiveState.getPosition(EXPORTER_ID_2)).isEqualTo(10L);
                });

        // Second round: exporter 1 moves backwards (9 < 10) while exporter 2 moves forwards.
        activeExporters.getExportersState().setPosition(EXPORTER_ID_1, 9L);
        activeExporters.getExportersState().setPosition(EXPORTER_ID_2, 11L);
        advanceActiveClock();

        // Exporter 2 reaching 11 signals that the second round has arrived on the passive side.
        Awaitility.await("Active Director has distributed positions and passive has received it")
            .conditionEvaluationListener(activeClockShifter())
            .untilAsserted(
                () -> Assertions.assertThat(passiveState.getPosition(EXPORTER_ID_2)).isEqualTo(11L));

        // The older position for exporter 1 must not have overwritten the newer one.
        Assertions.assertThat(passiveState.getPosition(EXPORTER_ID_1)).isEqualTo(10L);
    }

    /**
     * Awaitility listener that adds {@link #DISTRIBUTION_INTERVAL} to the wrapped clock every time
     * it is notified of a condition evaluation.
     *
     * @param clock the controlled clock to advance
     */
    private record ClockShifter(ControlledActorClock clock)
        implements ConditionEvaluationListener<Void> {

        @Override
        public void conditionEvaluated(EvaluatedCondition<Void> evaluatedCondition) {
            clock.addTime(DISTRIBUTION_INTERVAL);
        }
    }
}
