/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.optimize.service.importing;

import com.google.common.collect.ImmutableMap;
import io.camunda.optimize.AbstractCCSMIT;
import io.camunda.optimize.dto.optimize.query.process.FlowNodeInstanceDto;
import io.camunda.optimize.dto.zeebe.ZeebeRecordDto;
import io.camunda.optimize.dto.zeebe.process.ZeebeProcessInstanceRecordDto;
import io.camunda.optimize.service.importing.PositionBasedImportIndexHandler;
import io.camunda.optimize.service.importing.zeebe.fetcher.es.AbstractZeebeRecordFetcherES;
import io.camunda.optimize.service.importing.zeebe.fetcher.os.AbstractZeebeRecordFetcherOS;
import io.camunda.optimize.util.ZeebeBpmnModels;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.github.netmikey.logunit.api.LogCapturer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.text.StringSubstitutor;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.LoggingEvent;

/**
 * Integration tests for the state tracked by {@link PositionBasedImportIndexHandler}s while Zeebe
 * records are imported: the persisted position, the persisted/pending sequence, the timestamps and
 * the "has seen sequence field" flag.
 *
 * <p>The tests rely on the fixtures inherited from {@link AbstractCCSMIT} ({@code
 * embeddedOptimizeExtension}, {@code databaseIntegrationTestExtension}, {@code zeebeExtension}) and
 * on two {@link LogCapturer}s to verify which log messages were emitted during the import.
 */
public class ZeebePositionBasedImportIndexIT
extends AbstractCCSMIT {
    /** Timestamp the handlers are expected to report before any import and after a reset. */
    public static final OffsetDateTime BEGINNING_OF_TIME = OffsetDateTime.ofInstant(Instant.EPOCH, ZoneId.systemDefault());

    private static final String PROCESS_INSTANCE_RECORD_INDEX = "process-instance";
    private static final String SEQUENCE_FIELD = "sequence";
    private static final String START_EVENT_FLOW_NODE_TYPE = "startEvent";
    private static final String POSITION_QUERY_LOG_MESSAGE = "Using the position query to see if there are new records in the process-instance index";
    private static final String REVERT_TO_POSITION_QUERY_LOG_MESSAGE = "that can't be imported by the current sequence query. Will revert to position query for the next fetch attempt";

    @RegisterExtension
    @Order(1)
    private final LogCapturer positionBasedHandlerLogs = LogCapturer.create().captureForType(PositionBasedImportIndexHandler.class);
    @RegisterExtension
    @Order(2)
    private final LogCapturer zeebeFetcherLogs = LogCapturer.create().captureForType(AbstractZeebeRecordFetcherES.class).captureForType(AbstractZeebeRecordFetcherOS.class);

    /** Without any import, every handler reports zeroed indexes and the initial timestamps. */
    @Test
    public void importPositionIsZeroIfNothingIsImportedYet() {
        // when
        final List<PositionBasedImportIndexHandler> positionBasedHandlers = embeddedOptimizeExtension.getAllPositionBasedImportHandlers();

        // then
        Assertions.assertThat(positionBasedHandlers)
            .hasSize(10)
            .allSatisfy(handler -> {
                Assertions.assertThat(handler.getPersistedPositionOfLastEntity()).isZero();
                Assertions.assertThat(handler.getPendingSequenceOfLastEntity()).isZero();
                Assertions.assertThat(handler.getTimestampOfLastPersistedEntity()).isEqualTo(BEGINNING_OF_TIME);
                Assertions.assertThat(handler.getLastImportExecutionTimestamp()).isEqualTo(BEGINNING_OF_TIME);
                Assertions.assertThat(handler.isHasSeenSequenceField()).isFalse();
            });
    }

    /** Positions and timestamps stored before a restart are reported again by the new instance. */
    @Test
    public void latestPositionImportIndexesAreRestoredAfterRestartOfOptimize() {
        // given
        deployZeebeData();
        importAllZeebeEntitiesFromScratch();
        storeImportIndexesAndRefreshIndices();
        final List<Long> positionsBeforeRestart = getCurrentHandlerPositions();
        final List<OffsetDateTime> lastImportedEntityTimestamps = getLastImportedEntityTimestamps();

        // when
        restartOptimizeAndReloadZeebeImport();

        // then at least one handler has progressed and all values match the pre-restart state
        Assertions.assertThat(getCurrentHandlerPositions())
            .anySatisfy(position -> Assertions.assertThat(position).isPositive())
            .isEqualTo(positionsBeforeRestart);
        Assertions.assertThat(getLastImportedEntityTimestamps()).isEqualTo(lastImportedEntityTimestamps);
    }

    /** Sequences and timestamps stored before a restart are reported again by the new instance. */
    @Test
    public void latestSequenceImportIndexesAreRestoredAfterRestartOfOptimize() {
        // given
        deployZeebeData();
        importAllZeebeEntitiesFromScratch();
        storeImportIndexesAndRefreshIndices();
        final List<Long> sequencesBeforeRestart = getCurrentHandlerSequences();
        final List<OffsetDateTime> lastImportedEntityTimestamps = getLastImportedEntityTimestamps();

        // when
        restartOptimizeAndReloadZeebeImport();

        // then
        Assertions.assertThat(getCurrentHandlerSequences())
            .anySatisfy(sequence -> Assertions.assertThat(sequence).isPositive())
            .isEqualTo(sequencesBeforeRestart);
        Assertions.assertThat(getLastImportedEntityTimestamps()).isEqualTo(lastImportedEntityTimestamps);
        // among the handlers with a persisted sequence, at least one remembers having seen the field
        Assertions.assertThat(embeddedOptimizeExtension.getAllPositionBasedImportHandlers())
            .filteredOn(handler -> handler.getPersistedSequenceOfLastEntity() > 0L)
            .anySatisfy(handler -> Assertions.assertThat(handler.isHasSeenSequenceField()).isTrue());
    }

    /** Resetting the start indexes zeroes positions, sequences, timestamps and the sequence flag. */
    @Test
    public void importIndexCanBeReset() {
        // given
        deployZeebeData();
        importAllZeebeEntitiesFromScratch();
        storeImportIndexesAndRefreshIndices();

        // when
        embeddedOptimizeExtension.resetPositionBasedImportStartIndexes();
        storeImportIndexesAndRefreshIndices();

        // then
        Assertions.assertThat(getCurrentHandlerPositions())
            .allSatisfy(position -> Assertions.assertThat(position).isZero());
        Assertions.assertThat(getCurrentHandlerSequences())
            .allSatisfy(sequence -> Assertions.assertThat(sequence).isZero());
        Assertions.assertThat(getLastImportedEntityTimestamps())
            .allSatisfy(timestamp -> Assertions.assertThat(timestamp).isEqualTo(BEGINNING_OF_TIME));
        Assertions.assertThat(embeddedOptimizeExtension.getAllPositionBasedImportHandlers())
            .allSatisfy(handler -> Assertions.assertThat(handler.isHasSeenSequenceField()).isFalse());
    }

    /**
     * Imports a mix of records with and without a sequence field, page by page, and checks that the
     * handler logs the switch to sequence based fetching once the first record carrying a sequence
     * has been imported.
     */
    @Test
    public void recordsAreFetchedWithSequenceOrPosition() {
        // given an import page size of 1 and an instance whose service task was completed
        embeddedOptimizeExtension.getConfigurationService().getConfiguredZeebe().setMaxImportPageSize(1);
        embeddedOptimizeExtension.reloadConfiguration();
        deployAndStartInstanceForProcess(ZeebeBpmnModels.createSimpleServiceTaskProcess("aProcess"));
        zeebeExtension.completeTaskForInstanceWithJobType("service_task");
        waitUntilMinimumDataExportedCount(3, PROCESS_INSTANCE_RECORD_INDEX, getQueryForProcessableProcessInstanceEvents());
        // records of element type PROCESS lose their sequence field and the completed start event
        // records get the position 9999
        removeSequenceFieldOfProcessRecords();
        updatePositionOfStartEventCompletedRecords();
        databaseIntegrationTestExtension.refreshAllOptimizeIndices();

        // when
        importAllZeebeEntitiesFromScratch();

        // then the instance exists but has no flow node instances yet
        Assertions.assertThat(databaseIntegrationTestExtension.getAllProcessInstances())
            .singleElement()
            .satisfies(instance -> Assertions.assertThat(instance.getFlowNodeInstances()).isEmpty());

        // when
        importFromLastIndexRepeatedly(2);

        // then the start event has been imported without an end date
        Assertions.assertThat(databaseIntegrationTestExtension.getAllProcessInstances())
            .singleElement()
            .satisfies(instance -> Assertions.assertThat(instance.getFlowNodeInstances())
                .singleElement()
                .extracting(FlowNodeInstanceDto::getFlowNodeType, FlowNodeInstanceDto::getEndDate)
                .containsExactly(START_EVENT_FLOW_NODE_TYPE, null));
        positionBasedHandlerLogs.assertContains("First Zeebe record with sequence field for import type zeebeProcessInstanceImportIndex has been imported. Zeebe records will now be fetched based on sequence.");

        // when
        importFromLastIndexRepeatedly(3);
        storeImportIndexesAndRefreshIndices();

        // then the record moved to position 9999 has been imported and completes the start event
        Assertions.assertThat(getCurrentHandlerPositions()).contains(9999L);
        Assertions.assertThat(databaseIntegrationTestExtension.getAllProcessInstances())
            .singleElement()
            .satisfies(instance -> {
                Assertions.assertThat(instance.getFlowNodeInstances())
                    .singleElement()
                    .extracting(FlowNodeInstanceDto::getFlowNodeType)
                    .isEqualTo(START_EVENT_FLOW_NODE_TYPE);
                Assertions.assertThat(instance.getFlowNodeInstances())
                    .singleElement()
                    .extracting(FlowNodeInstanceDto::getEndDate)
                    .isNotNull();
            });
    }

    /**
     * Rewrites the sequences of the exported process instance records (all 5000, lowest position 1,
     * highest position 5100) and verifies via the fetcher logs that the position query is used once
     * before the remaining records are imported and the instance is reported as COMPLETED.
     */
    @Test
    public void dynamicRecordQueryingIsUsedToFetchNewUnreachableData() {
        // given
        embeddedOptimizeExtension.getConfigurationService().getConfiguredZeebe().getImportConfig().setMaxEmptyPagesToImport(3);
        embeddedOptimizeExtension.reloadConfiguration();
        deployAndStartInstanceForProcess(ZeebeBpmnModels.createStartEndProcess("aProcess"));
        waitUntilInstanceRecordWithElementTypeAndIntentExported(BpmnElementType.PROCESS, (Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        final List<ZeebeProcessInstanceRecordDto> allProcessInstanceRecords = getZeebeExportedProcessInstances();
        // fail with a clear message instead of an IndexOutOfBoundsException below
        Assertions.assertThat(allProcessInstanceRecords).isNotEmpty();
        allProcessInstanceRecords.sort(Comparator.comparing(ZeebeRecordDto::getPosition));
        final ZeebeProcessInstanceRecordDto firstRecord = allProcessInstanceRecords.get(0);
        final ZeebeProcessInstanceRecordDto lastRecord = allProcessInstanceRecords.get(allProcessInstanceRecords.size() - 1);
        updateSequenceOfAllProcessInstanceRecords(5000L);
        updateSequenceOfRecordWithPosition(firstRecord.getPosition(), 1L);
        updateSequenceOfRecordWithPosition(lastRecord.getPosition(), 5100L);
        databaseIntegrationTestExtension.refreshAllOptimizeIndices();
        embeddedOptimizeExtension.getConfigurationService().getConfiguredZeebe().setMaxImportPageSize(1);
        embeddedOptimizeExtension.reloadConfiguration();

        // when
        importAllZeebeEntitiesFromScratch();

        // then
        assertSingleActiveInstanceWithoutFlowNodes();

        // when the page size is raised and three further import rounds are run
        storeImportIndexesAndRefreshIndices();
        embeddedOptimizeExtension.getConfigurationService().getConfiguredZeebe().setMaxImportPageSize(20);
        embeddedOptimizeExtension.reloadConfiguration();
        importFromLastIndexRepeatedly(3);

        // then nothing new has been imported
        assertSingleActiveInstanceWithoutFlowNodes();

        // when
        importFromLastIndexRepeatedly(1);

        // then each of the two fetcher messages has been logged exactly once
        final List<String> allLoggedEvents = zeebeFetcherLogs.getEvents().stream().map(LoggingEvent::getMessage).collect(Collectors.toList());
        Assertions.assertThat(allLoggedEvents)
            .filteredOn(loggedMessage -> loggedMessage.contains(POSITION_QUERY_LOG_MESSAGE))
            .hasSize(1);
        Assertions.assertThat(allLoggedEvents)
            .filteredOn(loggedMessage -> loggedMessage.contains(REVERT_TO_POSITION_QUERY_LOG_MESSAGE))
            .hasSize(1);

        // when
        importFromLastIndexRepeatedly(1);

        // then
        Assertions.assertThat(databaseIntegrationTestExtension.getAllProcessInstances())
            .singleElement()
            .satisfies(instance -> {
                Assertions.assertThat(instance.getState()).isEqualTo("COMPLETED");
                Assertions.assertThat(instance.getFlowNodeInstances()).hasSize(2);
            });
    }

    /**
     * With unmodified records, further import rounds after a complete import still log the position
     * query, together with the message that empty pages are currently expected.
     */
    @Test
    public void dynamicRecordQueryingIsUsedToFetchNewUnreachableData_noUnreachableData() {
        // given
        embeddedOptimizeExtension.getConfigurationService().getConfiguredZeebe().setMaxImportPageSize(1000);
        embeddedOptimizeExtension.getConfigurationService().getConfiguredZeebe().getImportConfig().setMaxEmptyPagesToImport(3);
        embeddedOptimizeExtension.reloadConfiguration();
        deployAndStartInstanceForProcess(ZeebeBpmnModels.createStartEndProcess("aProcess"));
        waitUntilInstanceRecordWithElementTypeAndIntentExported(BpmnElementType.PROCESS, (Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);

        // when
        importAllZeebeEntitiesFromScratch();

        // then
        Assertions.assertThat(databaseIntegrationTestExtension.getAllProcessInstances())
            .singleElement()
            .satisfies(instance -> Assertions.assertThat(instance.getState()).isEqualTo("COMPLETED"));

        // when
        importFromLastIndexRepeatedly(4);

        // then
        Assertions.assertThat(zeebeFetcherLogs.getEvents())
            .extracting(LoggingEvent::getMessage)
            .anyMatch(eventLog -> eventLog.contains(POSITION_QUERY_LOG_MESSAGE));
        Assertions.assertThat(zeebeFetcherLogs.getEvents())
            .extracting(LoggingEvent::getMessage)
            .anyMatch(eventLog -> eventLog.contains("There are no newer records to process, so empty pages of records are currently expected"));
    }

    /** Deploys and starts two service task processes and waits for their exported records. */
    private void deployZeebeData() {
        deployAndStartInstanceForProcess(ZeebeBpmnModels.createSimpleServiceTaskProcess("firstProcess"));
        deployAndStartInstanceForProcess(ZeebeBpmnModels.createSimpleServiceTaskProcess("secondProcess"));
        waitUntilMinimumDataExportedCount(8, PROCESS_INSTANCE_RECORD_INDEX, getQueryForProcessableProcessInstanceEvents());
    }

    /** Stores the current import indexes and refreshes all Optimize indices afterwards. */
    private void storeImportIndexesAndRefreshIndices() {
        embeddedOptimizeExtension.storeImportIndexesToElasticsearch();
        databaseIntegrationTestExtension.refreshAllOptimizeIndices();
    }

    /** Switches to a new Optimize instance, sets up the Zeebe import and refreshes the indices. */
    private void restartOptimizeAndReloadZeebeImport() {
        startAndUseNewOptimizeInstance();
        setupZeebeImportAndReloadConfiguration();
        databaseIntegrationTestExtension.refreshAllOptimizeIndices();
    }

    /**
     * Runs the given number of consecutive import rounds, each continuing from the last index.
     *
     * @param rounds how often {@code importAllZeebeEntitiesFromLastIndex()} is invoked
     */
    private void importFromLastIndexRepeatedly(final int rounds) {
        for (int round = 0; round < rounds; round++) {
            importAllZeebeEntitiesFromLastIndex();
        }
    }

    /** Asserts that exactly one process instance exists, in state ACTIVE and without flow nodes. */
    private void assertSingleActiveInstanceWithoutFlowNodes() {
        Assertions.assertThat(databaseIntegrationTestExtension.getAllProcessInstances())
            .singleElement()
            .satisfies(instance -> {
                Assertions.assertThat(instance.getState()).isEqualTo("ACTIVE");
                Assertions.assertThat(instance.getFlowNodeInstances()).isEmpty();
            });
    }

    /** Removes the sequence field from the exported records of element type PROCESS. */
    private void removeSequenceFieldOfProcessRecords() {
        final StringSubstitutor substitutor = new StringSubstitutor(ImmutableMap.of("sequenceFieldName", SEQUENCE_FIELD));
        databaseIntegrationTestExtension.updateZeebeProcessRecordsOfBpmnElementTypeForPrefix(
            zeebeExtension.getZeebeRecordPrefix(),
            BpmnElementType.PROCESS,
            substitutor.replace("ctx._source.remove(\"${sequenceFieldName}\");"));
    }

    /** Sets the position of START_EVENT records with intent ELEMENT_COMPLETED to 9999. */
    private void updatePositionOfStartEventCompletedRecords() {
        final StringSubstitutor substitutor = new StringSubstitutor(ImmutableMap.of("positionField", "position", "intentField", "intent"));
        databaseIntegrationTestExtension.updateZeebeProcessRecordsOfBpmnElementTypeForPrefix(
            zeebeExtension.getZeebeRecordPrefix(),
            BpmnElementType.START_EVENT,
            substitutor.replace("if (ctx._source.${intentField}.equals('ELEMENT_COMPLETED')) { ctx._source.${positionField} = 9999; }"));
    }

    /**
     * Applies the sequence update script to the process-instance records of the current prefix.
     *
     * @param sequence the sequence value written by the script
     */
    private void updateSequenceOfAllProcessInstanceRecords(long sequence) {
        databaseIntegrationTestExtension.updateZeebeRecordsForPrefix(
            zeebeExtension.getZeebeRecordPrefix(), PROCESS_INSTANCE_RECORD_INDEX, getUpdateScript(sequence));
    }

    /**
     * Applies the sequence update script to the process-instance records with the given position.
     *
     * @param position the position identifying the records to update
     * @param sequence the sequence value written by the script
     */
    private void updateSequenceOfRecordWithPosition(long position, long sequence) {
        databaseIntegrationTestExtension.updateZeebeRecordsWithPositionForPrefix(
            zeebeExtension.getZeebeRecordPrefix(), PROCESS_INSTANCE_RECORD_INDEX, position, getUpdateScript(sequence));
    }

    /**
     * Builds the update script that assigns the given value to the sequence field of a record.
     *
     * @param sequence the sequence value to assign
     * @return the script, e.g. {@code ctx._source.sequence = 5000;}
     */
    private String getUpdateScript(long sequence) {
        final StringSubstitutor substitutor = new StringSubstitutor(ImmutableMap.of("fieldName", SEQUENCE_FIELD, "sequence", String.valueOf(sequence)));
        return substitutor.replace("ctx._source.${fieldName} = ${sequence};");
    }

    /** Returns the persisted position of every position based import handler, in handler order. */
    private List<Long> getCurrentHandlerPositions() {
        return embeddedOptimizeExtension.getAllPositionBasedImportHandlers().stream()
            .map(PositionBasedImportIndexHandler::getPersistedPositionOfLastEntity)
            .collect(Collectors.toList());
    }

    /** Returns the persisted sequence of every position based import handler, in handler order. */
    private List<Long> getCurrentHandlerSequences() {
        return embeddedOptimizeExtension.getAllPositionBasedImportHandlers().stream()
            .map(PositionBasedImportIndexHandler::getPersistedSequenceOfLastEntity)
            .collect(Collectors.toList());
    }

    /** Returns the last persisted entity timestamp of every handler, in handler order. */
    private List<OffsetDateTime> getLastImportedEntityTimestamps() {
        return embeddedOptimizeExtension.getAllPositionBasedImportHandlers().stream()
            .map(PositionBasedImportIndexHandler::getTimestampOfLastPersistedEntity)
            .collect(Collectors.toList());
    }

    /** Reads all documents of the prefixed process-instance record index. */
    private List<ZeebeProcessInstanceRecordDto> getZeebeExportedProcessInstances() {
        final String expectedIndex = zeebeExtension.getZeebeRecordPrefix() + "-process-instance";
        return databaseIntegrationTestExtension.getAllDocumentsOfIndexAs(expectedIndex, ZeebeProcessInstanceRecordDto.class);
    }
}

