package io.camunda.tasklist.zeebeimport;

import io.camunda.tasklist.Metrics;
import io.camunda.tasklist.entities.meta.ImportPositionEntity;
import io.camunda.tasklist.exceptions.NoSuchIndexException;
import io.camunda.tasklist.exceptions.TasklistRuntimeException;
import io.camunda.tasklist.property.TasklistProperties;
import io.camunda.tasklist.util.ThreadUtil;
import io.camunda.tasklist.zeebe.ImportValueType;
import io.camunda.zeebe.protocol.Protocol;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/* loaded from: input_file:BOOT-INF/lib/tasklist-importer-8.6.0-alpha1-rc1.jar:io/camunda/tasklist/zeebeimport/RecordsReaderAbstract.class */
public abstract class RecordsReaderAbstract implements RecordsReader, Runnable {
    public static final String PARTITION_ID_FIELD_NAME = "partitionId";
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) RecordsReaderAbstract.class);

    @Autowired
    protected TasklistProperties tasklistProperties;

    @Autowired
    protected Metrics metrics;
    protected final int partitionId;
    protected final ImportValueType importValueType;
    protected final long maxPossibleSequence;
    protected int countEmptyRuns;

    @Autowired
    private ImportPositionHolder importPositionHolder;

    @Autowired
    private BeanFactory beanFactory;

    @Autowired
    @Qualifier("recordsReaderThreadPoolExecutor")
    private ThreadPoolTaskScheduler readersExecutor;

    @Autowired
    @Qualifier("importThreadPoolExecutor")
    private ThreadPoolTaskExecutor importExecutor;
    private ImportJob pendingImportJob;
    private final ReentrantLock schedulingImportJobLock = new ReentrantLock();
    private boolean ongoingRescheduling;
    private final BlockingQueue<Callable<Boolean>> importJobs;
    private Callable<Boolean> active;

    public RecordsReaderAbstract(int i, ImportValueType importValueType, int i2) {
        this.partitionId = i;
        this.importValueType = importValueType;
        this.importJobs = new LinkedBlockingQueue(i2);
        this.maxPossibleSequence = Protocol.encodePartitionId(i + 1, 0L) - 1;
    }

    @Override // java.lang.Runnable
    public void run() {
        readAndScheduleNextBatch();
    }

    @Override // io.camunda.tasklist.zeebeimport.RecordsReader
    public int readAndScheduleNextBatch(boolean z) {
        ImportBatch readNextBatchByPositionAndPartition;
        int readerBackoff = this.tasklistProperties.getImporter().getReaderBackoff();
        boolean isUseOnlyPosition = this.tasklistProperties.getImporter().isUseOnlyPosition();
        try {
            ImportPositionEntity latestScheduledPosition = this.importPositionHolder.getLatestScheduledPosition(this.importValueType.getAliasTemplate(), this.partitionId);
            if (isUseOnlyPosition || latestScheduledPosition == null || latestScheduledPosition.getSequence() <= 0) {
                LOGGER.debug("Use import for {} ( {} ) by position", this.importValueType.name(), Integer.valueOf(this.partitionId));
                readNextBatchByPositionAndPartition = readNextBatchByPositionAndPartition(latestScheduledPosition.getPosition(), null);
            } else {
                LOGGER.debug("Use import for {} ( {} ) by sequence", this.importValueType.name(), Integer.valueOf(this.partitionId));
                readNextBatchByPositionAndPartition = readNextBatchBySequence(Long.valueOf(latestScheduledPosition.getSequence()));
            }
            Integer num = null;
            if (readNextBatchByPositionAndPartition.getHits().size() == 0) {
                num = Integer.valueOf(readerBackoff);
            } else {
                if (!scheduleImportJob(createImportJob(latestScheduledPosition, readNextBatchByPositionAndPartition), !z)) {
                    return 0;
                }
            }
            if (z) {
                rescheduleReader(num);
            }
            return readNextBatchByPositionAndPartition.getHits().size();
        } catch (NoSuchIndexException e) {
            if (!z) {
                return 0;
            }
            rescheduleReader(Integer.valueOf(readerBackoff));
            return 0;
        } catch (Exception e2) {
            LOGGER.error(e2.getMessage(), (Throwable) e2);
            if (!z) {
                return 0;
            }
            rescheduleReader(null);
            return 0;
        }
    }

    @Override // io.camunda.tasklist.zeebeimport.RecordsReader
    public int readAndScheduleNextBatch() {
        return readAndScheduleNextBatch(true);
    }

    @Override // io.camunda.tasklist.zeebeimport.RecordsReader
    public ImportBatch readNextBatchBySequence(Long l) throws NoSuchIndexException {
        return readNextBatchBySequence(l, null);
    }

    @Override // io.camunda.tasklist.zeebeimport.RecordsReader
    public boolean tryToScheduleImportJob(ImportJob importJob, boolean z) {
        return ((Boolean) withReschedulingImportJobLock(() -> {
            boolean z2 = false;
            int i = 3;
            while (true) {
                int i2 = i;
                if (z2 || i2 <= 0) {
                    break;
                }
                z2 = this.importJobs.offer(executeJob(importJob));
                i = i2 - 1;
            }
            this.pendingImportJob = (z || z2) ? null : importJob;
            if (z2 && this.active == null) {
                executeNext();
            }
            return Boolean.valueOf(z2);
        })).booleanValue();
    }

    @Override // io.camunda.tasklist.zeebeimport.RecordsReader
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override // io.camunda.tasklist.zeebeimport.RecordsReader
    public ImportValueType getImportValueType() {
        return this.importValueType;
    }

    @Override // io.camunda.tasklist.zeebeimport.RecordsReader
    public BlockingQueue<Callable<Boolean>> getImportJobs() {
        return this.importJobs;
    }

    private ImportJob createImportJob(ImportPositionEntity importPositionEntity, ImportBatch importBatch) {
        return (ImportJob) this.beanFactory.getBean(ImportJob.class, importBatch, importPositionEntity);
    }

    private void rescheduleReader(Integer num) {
        if (num != null) {
            this.readersExecutor.schedule(this, OffsetDateTime.now().plus(num.intValue(), (TemporalUnit) ChronoUnit.MILLIS).toInstant());
        } else {
            this.readersExecutor.submit(this);
        }
    }

    private boolean scheduleImportJob(ImportJob importJob, boolean z) {
        if (!tryToScheduleImportJob(importJob, z)) {
            return false;
        }
        importJobScheduledSucceeded(importJob);
        return true;
    }

    private void importJobScheduledSucceeded(ImportJob importJob) {
        this.metrics.getTimer(Metrics.TIMER_NAME_IMPORT_JOB_SCHEDULED_TIME, "type", this.importValueType.name(), Metrics.TAG_KEY_PARTITION, String.valueOf(this.partitionId)).record(Duration.between(importJob.getCreationTime(), OffsetDateTime.now()));
        importJob.recordLatestScheduledPosition();
    }

    private Callable<Boolean> executeJob(ImportJob importJob) {
        return () -> {
            try {
                Boolean call = importJob.call();
                if (call.booleanValue()) {
                    executeNext();
                    rescheduleRecordsReaderIfNecessary();
                } else {
                    ThreadUtil.sleepFor(2000L);
                    execute(this.active);
                }
                return call;
            } catch (Exception e) {
                LOGGER.error("Exception occurred when importing data: " + e.getMessage(), (Throwable) e);
                ThreadUtil.sleepFor(2000L);
                execute(this.active);
                return false;
            }
        };
    }

    private void executeNext() {
        this.active = this.importJobs.poll();
        if (this.active != null) {
            this.importExecutor.submit(this.active);
            LOGGER.debug("Submitted next job");
        }
    }

    private void execute(Callable<Boolean> callable) {
        this.importExecutor.submit(callable);
        LOGGER.debug("Submitted the same job");
    }

    private void rescheduleRecordsReaderIfNecessary() {
        withReschedulingImportJobLock(() -> {
            if (hasPendingImportJobToReschedule() && shouldReschedulePendingImportJob()) {
                startRescheduling();
                this.readersExecutor.submit(this::reschedulePendingImportJob);
            }
        });
    }

    private void reschedulePendingImportJob() {
        try {
            scheduleImportJob(this.pendingImportJob, false);
        } finally {
            withReschedulingImportJobLock(() -> {
                this.pendingImportJob = null;
                completeRescheduling();
                rescheduleReader(null);
            });
        }
    }

    private boolean hasPendingImportJobToReschedule() {
        return this.pendingImportJob != null;
    }

    private boolean shouldReschedulePendingImportJob() {
        return !this.ongoingRescheduling;
    }

    private void startRescheduling() {
        this.ongoingRescheduling = true;
    }

    private void completeRescheduling() {
        this.ongoingRescheduling = false;
    }

    private void withReschedulingImportJobLock(Runnable runnable) {
        withReschedulingImportJobLock(() -> {
            runnable.run();
            return null;
        });
    }

    private <T> T withReschedulingImportJobLock(Callable<T> callable) {
        try {
            try {
                this.schedulingImportJobLock.lock();
                T call = callable.call();
                this.schedulingImportJobLock.unlock();
                return call;
            } catch (Exception e) {
                throw new TasklistRuntimeException(e);
            }
        } catch (Throwable th) {
            this.schedulingImportJobLock.unlock();
            throw th;
        }
    }
}
