package io.camunda.operate.zeebeimport.elasticsearch;

import io.camunda.operate.Metrics;
import io.camunda.operate.conditions.ElasticsearchCondition;
import io.camunda.operate.entities.HitEntity;
import io.camunda.operate.entities.meta.ImportPositionEntity;
import io.camunda.operate.exceptions.NoSuchIndexException;
import io.camunda.operate.exceptions.OperateRuntimeException;
import io.camunda.operate.property.OperateProperties;
import io.camunda.operate.util.BackoffIdleStrategy;
import io.camunda.operate.util.ElasticsearchUtil;
import io.camunda.operate.util.NumberThrottleable;
import io.camunda.operate.util.ThreadUtil;
import io.camunda.operate.zeebe.ImportValueType;
import io.camunda.operate.zeebeimport.ImportBatch;
import io.camunda.operate.zeebeimport.ImportJob;
import io.camunda.operate.zeebeimport.ImportListener;
import io.camunda.operate.zeebeimport.ImportPositionHolder;
import io.camunda.operate.zeebeimport.RecordsReader;
import jakarta.annotation.PostConstruct;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Conditional({ElasticsearchCondition.class})
@Component
/* loaded from: input_file:io/camunda/operate/zeebeimport/elasticsearch/ElasticsearchRecordsReader.class */
public class ElasticsearchRecordsReader implements RecordsReader {
    /** Logger shared by all reader instances. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchRecordsReader.class);
    /** Partition this reader works on; used as routing value and partition filter in queries. */
    private final int partitionId;
    /** Record type this reader imports; supplies the alias name and alias template. */
    private final ImportValueType importValueType;
    /** Bounded queue of import job callables waiting to run; capacity is set in the constructor. */
    private final BlockingQueue<Callable<Boolean>> importJobs;
    /** Lock taken by the withReschedulingImportJobLock helpers. */
    private final ReentrantLock schedulingImportJobLock = new ReentrantLock();
    /** Batch size provider, created in postConstruct and throttled when a response is too long. */
    private NumberThrottleable batchSizeThrottle;
    /** Callable most recently polled from importJobs by executeNext; null if the queue was empty. */
    private Callable<Boolean> active;
    /** Import job that could not be queued and is kept for a later rescheduling attempt. */
    private ImportJob pendingImportJob;
    /** True between startRescheduling() and completeRescheduling(). */
    private boolean ongoingRescheduling;
    /** Upper sequence bound used for a read once the maximum number of empty runs is reached. */
    private long maxPossibleSequence;
    /** Number of consecutive sequence-based reads that returned no hits. */
    private int countEmptyRuns;
    /** Backoff applied before rescheduling the reader after an unexpected exception. */
    private BackoffIdleStrategy errorStrategy;

    /** Executor the import job callables are submitted to. */
    @Autowired
    @Qualifier("importThreadPoolExecutor")
    private ThreadPoolTaskExecutor importExecutor;

    /** Scheduler used to run this reader again, immediately or at a later point in time. */
    @Autowired
    @Qualifier("recordsReaderThreadPoolExecutor")
    private ThreadPoolTaskScheduler readersExecutor;

    /** Source of the latest scheduled import position for this alias template and partition. */
    @Autowired
    private ImportPositionHolder importPositionHolder;

    /** Application configuration (importer and Zeebe Elasticsearch settings are read from it). */
    @Autowired
    private OperateProperties operateProperties;

    /** Elasticsearch client used for the search and scroll requests issued by this reader. */
    @Autowired
    @Qualifier("zeebeEsClient")
    private RestHighLevelClient zeebeEsClient;

    /** Used to obtain ImportJob beans for a batch and its position. */
    @Autowired
    private BeanFactory beanFactory;

    /** Provides the timers and the queue-size gauge recorded by this reader. */
    @Autowired
    private Metrics metrics;

    /** Optional listeners notified when a batch is scheduled; null when none are injected. */
    @Autowired(required = false)
    private List<ImportListener> importListeners;

    /**
     * Creates a reader for one partition and one record type.
     *
     * @param i id of the partition to read from
     * @param importValueType record type to import
     * @param i2 capacity of the bounded queue that holds import jobs waiting to run
     */
    public ElasticsearchRecordsReader(int i, ImportValueType importValueType, int i2) {
        this.partitionId = i;
        this.importValueType = importValueType;
        // Diamond instead of the raw type: keeps the queue's element type checked by the compiler.
        this.importJobs = new LinkedBlockingQueue<>(i2);
    }

    /**
     * Initializes the state that depends on injected configuration: the batch size throttle, the
     * upper sequence bound, the empty-run counter and the error backoff strategy.
     */
    @PostConstruct
    private void postConstruct() {
        final var configuredBatchSize = this.operateProperties.getZeebeElasticsearch().getBatchSize();
        this.batchSizeThrottle = new NumberThrottleable.DivideNumberThrottle(configuredBatchSize);

        // The bound is one less than sequence(partitionId + 1, 0).
        final var sequenceOfNextPartition = sequence(this.partitionId + 1, 0L);
        this.maxPossibleSequence = sequenceOfNextPartition - 1;

        this.countEmptyRuns = 0;

        final var initialBackoff = this.operateProperties.getImporter().getReaderBackoff();
        this.errorStrategy = new BackoffIdleStrategy(initialBackoff, 1.2f, 10000L);
    }

    /** Runs one read cycle; the reader reschedules itself afterwards. */
    @Override
    public void run() {
        // Same as the no-arg readAndScheduleNextBatch(): always auto-continue.
        readAndScheduleNextBatch(true);
    }

    /** Reads and schedules the next batch with automatic continuation enabled. */
    private void readAndScheduleNextBatch() {
        final boolean autoContinue = true;
        readAndScheduleNextBatch(autoContinue);
    }

    /**
     * Reads the next batch of records for this partition and value type and queues an import job
     * for it.
     *
     * <p>The batch is read by sequence when position-only mode is off and the latest scheduled
     * position has a positive sequence; otherwise, if a position is known, it is read by position.
     * An empty or missing batch makes the next run wait for the configured reader backoff.
     *
     * @param z when {@code true} the reader reschedules itself after the run (also after errors);
     *     when {@code false} nothing is rescheduled and a job that could not be queued is not kept
     *     as pending
     */
    @Override // io.camunda.operate.zeebeimport.RecordsReader
    public void readAndScheduleNextBatch(boolean z) {
        final int readerBackoff = this.operateProperties.getImporter().getReaderBackoff();
        final boolean isUseOnlyPosition = this.operateProperties.getImporter().isUseOnlyPosition();
        try {
            this.metrics.registerGaugeQueueSize("operate.import.queue.size", this.importJobs, new String[]{"partition", String.valueOf(this.partitionId), "type", this.importValueType.name()});
            final ImportPositionEntity latestScheduledPosition = this.importPositionHolder.getLatestScheduledPosition(this.importValueType.getAliasTemplate(), this.partitionId);

            final ImportBatch importBatch;
            if (!isUseOnlyPosition && latestScheduledPosition != null && latestScheduledPosition.getSequence() > 0) {
                LOGGER.debug("Use import for {} ( {} ) by sequence", this.importValueType.name(), Integer.valueOf(this.partitionId));
                importBatch = readNextBatchBySequence(Long.valueOf(latestScheduledPosition.getSequence()));
            } else if (latestScheduledPosition != null) {
                LOGGER.debug("Use import for {} ( {} ) by position", this.importValueType.name(), Integer.valueOf(this.partitionId));
                importBatch = readNextBatchByPositionAndPartition(latestScheduledPosition.getPosition(), null);
            } else {
                LOGGER.debug("latestPosition is null, importBatch was not initialized");
                importBatch = null;
            }

            // null means "run again immediately"; a value is the delay in milliseconds.
            Integer nextRunDelay = null;
            if (importBatch == null || importBatch.getHits() == null || importBatch.getHits().isEmpty()) {
                nextRunDelay = Integer.valueOf(readerBackoff);
            } else if (!scheduleImportJob(createImportJob(latestScheduledPosition, importBatch), !z)) {
                // The job could not be queued. The reader is resubmitted once the pending job
                // has been rescheduled, so it must not reschedule itself here.
                return;
            }
            this.errorStrategy.reset();
            if (z) {
                rescheduleReader(nextRunDelay);
            }
        } catch (NoSuchIndexException e) {
            // Must be caught before the generic handler, otherwise this clause is unreachable.
            // A missing index is not an error: simply retry after the regular backoff.
            if (z) {
                rescheduleReader(Integer.valueOf(readerBackoff));
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            if (z) {
                this.errorStrategy.idle();
                rescheduleReader(Integer.valueOf((int) this.errorStrategy.idleTime()));
            }
        }
    }

    /**
     * Reads the records whose {@code sequence} lies in the range {@code (l, upper]}, sorted
     * ascending, routed to this reader's partition.
     *
     * <p>When {@code l2} is a positive value it is used as the upper bound and up to twice the
     * span is requested. Otherwise the upper bound is {@code l + batchSize}, or the maximum
     * possible sequence once the configured number of empty runs has been reached. Requests for
     * 10000 hits or more are served via scrolling.
     *
     * @param l exclusive lower sequence bound
     * @param l2 inclusive upper sequence bound, or {@code null} / non-positive to derive it
     * @return the batch built from the hits that were read
     * @throws NoSuchIndexException if Elasticsearch reports "no such index"
     * @throws OperateRuntimeException for any other failure except a too-long response, which is
     *     retried with a throttled batch size
     */
    @Override // io.camunda.operate.zeebeimport.RecordsReader
    public ImportBatch readNextBatchBySequence(Long l, Long l2) throws NoSuchIndexException {
        final String aliasName = this.importValueType.getAliasName(this.operateProperties.getZeebeElasticsearch().getPrefix());
        final int batchSize = this.batchSizeThrottle.get();
        if (batchSize != this.batchSizeThrottle.getOriginal()) {
            LOGGER.warn("Use new batch size {} (original {})", Integer.valueOf(batchSize), Integer.valueOf(this.batchSizeThrottle.getOriginal()));
        }

        final int maxNumberOfHits;
        final long lessThanEqualsSequence;
        if (l2 != null && l2.longValue() > 0) {
            // Clamp before narrowing so a very large span cannot wrap around to a bogus size.
            final long doubledSpan = (l2.longValue() - l.longValue()) * 2;
            maxNumberOfHits = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, doubledSpan));
            lessThanEqualsSequence = l2.longValue();
            LOGGER.debug("Import batch reread was called. Data type {}, partitionId {}, sequence {}, lastSequence {}, maxNumberOfHits {}.", new Object[]{this.importValueType, Integer.valueOf(this.partitionId), l, Long.valueOf(lessThanEqualsSequence), Integer.valueOf(maxNumberOfHits)});
        } else {
            maxNumberOfHits = batchSize;
            if (this.countEmptyRuns == this.operateProperties.getImporter().getMaxEmptyRuns()) {
                lessThanEqualsSequence = this.maxPossibleSequence;
                this.countEmptyRuns = 0;
                LOGGER.debug("Max empty runs reached. Data type {}, partitionId {}, sequence {}, lastSequence {}, maxNumberOfHits {}.", new Object[]{this.importValueType, Integer.valueOf(this.partitionId), l, Long.valueOf(lessThanEqualsSequence), Integer.valueOf(maxNumberOfHits)});
            } else {
                lessThanEqualsSequence = l.longValue() + batchSize;
            }
        }

        final boolean useScroll = maxNumberOfHits >= 10000;
        final SearchSourceBuilder source = new SearchSourceBuilder()
            .sort("sequence", SortOrder.ASC)
            .query(QueryBuilders.rangeQuery("sequence").gt(l).lte(Long.valueOf(lessThanEqualsSequence)))
            .size(useScroll ? 10000 : maxNumberOfHits);
        final SearchRequest searchRequest = new SearchRequest(new String[]{aliasName})
            .source(source)
            .routing(String.valueOf(this.partitionId))
            .requestCache(false);

        try {
            final HitEntity[] hits = withTimerSearchHits(() -> read(searchRequest, useScroll));
            if (hits.length == 0) {
                this.countEmptyRuns++;
            } else {
                this.countEmptyRuns = 0;
            }
            return createImportBatch(hits);
        } catch (ElasticsearchStatusException e) {
            // The specific handler has to come before catch (Exception) to be reachable.
            final String message = e.getMessage();
            if (message != null && message.contains("no such index")) {
                throw new NoSuchIndexException();
            }
            throw new OperateRuntimeException(String.format("Exception occurred for alias [%s], while obtaining next Zeebe records batch: %s", aliasName, message), e);
        } catch (Exception e) {
            // getMessage() may be null; treat that as "not the too-long case" instead of an NPE.
            final String message = e.getMessage();
            if (message == null || !message.contains("entity content is too long")) {
                throw new OperateRuntimeException(String.format("Exception occurred for alias [%s], while obtaining next Zeebe records batch: %s", aliasName, message), e);
            }
            LOGGER.info("{}. Will decrease batch size for {}-{}", new Object[]{message, this.importValueType.name(), Integer.valueOf(this.partitionId)});
            this.batchSizeThrottle.throttle();
            return readNextBatchBySequence(l, l2);
        }
    }

    /**
     * Reads the records of this partition whose {@code position} is greater than {@code j} and,
     * if {@code l} is given, not greater than {@code l}.
     *
     * @param j exclusive lower position bound
     * @param l inclusive upper position bound, or {@code null} for an open-ended read limited by
     *     the current batch size
     * @return the batch built from the search response
     * @throws NoSuchIndexException if Elasticsearch reports "no such index"
     * @throws OperateRuntimeException for any other failure except a too-long response, which is
     *     retried with a throttled batch size
     */
    @Override // io.camunda.operate.zeebeimport.RecordsReader
    public ImportBatch readNextBatchByPositionAndPartition(long j, Long l) throws NoSuchIndexException {
        final String aliasName = this.importValueType.getAliasName(this.operateProperties.getZeebeElasticsearch().getPrefix());
        try {
            final SearchRequest searchRequest = createSearchQuery(aliasName, j, l);
            final SearchResponse response = withTimer(() -> this.zeebeEsClient.search(searchRequest, RequestOptions.DEFAULT));
            checkForFailedShards(response);
            return createImportBatch(response);
        } catch (ElasticsearchStatusException e) {
            // getMessage() may be null; guard so the handler cannot fail with an NPE of its own.
            final String message = e.getMessage();
            if (message != null && message.contains("no such index")) {
                throw new NoSuchIndexException();
            }
            throw new OperateRuntimeException(String.format("Exception occurred for alias [%s], while obtaining next Zeebe records batch: %s", aliasName, message), e);
        } catch (Exception e) {
            final String message = e.getMessage();
            if (message == null || !message.contains("entity content is too long")) {
                throw new OperateRuntimeException(String.format("Exception occurred for alias [%s], while obtaining next Zeebe records batch: %s", aliasName, message), e);
            }
            // Response was too large: shrink the batch size and try again.
            this.batchSizeThrottle.throttle();
            return readNextBatchByPositionAndPartition(j, l);
        }
    }

    /**
     * @return the id of the partition this reader was created for
     */
    @Override
    public int getPartitionId() {
        return partitionId;
    }

    /**
     * @return the record type this reader was created for
     */
    @Override
    public ImportValueType getImportValueType() {
        return importValueType;
    }

    /**
     * @return the queue of import job callables owned by this reader (the live instance, not a copy)
     */
    @Override
    public BlockingQueue<Callable<Boolean>> getImportJobs() {
        return importJobs;
    }

    /**
     * Reads the next batch after sequence {@code l} without an explicit upper bound.
     *
     * @param l exclusive lower sequence bound
     * @return the batch that was read
     * @throws NoSuchIndexException if Elasticsearch reports "no such index"
     */
    private ImportBatch readNextBatchBySequence(Long l) throws NoSuchIndexException {
        final Long noExplicitLastSequence = null;
        return readNextBatchBySequence(l, noExplicitLastSequence);
    }

    /**
     * Executes the search request and converts all hits to {@link HitEntity} instances.
     *
     * <p>With {@code z} set, the request is turned into a scroll search and further pages are
     * fetched until a page comes back empty. The scroll context is cleared afterwards, whether the
     * read succeeded or failed.
     *
     * @param searchRequest request to execute; its scroll keep-alive is set when {@code z} is true
     * @param z whether to read all pages through the scroll API
     * @return all hits collected, in the order they were returned
     * @throws IOException if a request to Elasticsearch fails
     * @throws OperateRuntimeException if any response reports failed shards
     */
    private HitEntity[] read(SearchRequest searchRequest, boolean z) throws IOException {
        String scrollId = null;
        try {
            final List<HitEntity> collected = new ArrayList<>();
            if (z) {
                searchRequest.scroll(TimeValue.timeValueMillis(60000L));
            }
            SearchResponse response = this.zeebeEsClient.search(searchRequest, ElasticsearchUtil.requestOptions);
            checkForFailedShards(response);
            collected.addAll(Arrays.stream(response.getHits().getHits()).map(this::searchHitToOperateHit).toList());
            if (z) {
                scrollId = response.getScrollId();
                do {
                    final SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    scrollRequest.scroll(TimeValue.timeValueMillis(60000L));
                    response = this.zeebeEsClient.scroll(scrollRequest, ElasticsearchUtil.requestOptions);
                    checkForFailedShards(response);
                    // Each response carries the scroll id to use for the next page.
                    scrollId = response.getScrollId();
                    collected.addAll(Arrays.stream(response.getHits().getHits()).map(this::searchHitToOperateHit).toList());
                } while (response.getHits().getHits().length != 0);
            }
            return collected.toArray(new HitEntity[0]);
        } finally {
            // Single cleanup path: runs exactly once on success and on failure.
            if (scrollId != null) {
                ElasticsearchUtil.clearScroll(scrollId, this.zeebeEsClient);
            }
        }
    }

    /**
     * Converts an Elasticsearch hit into a {@link HitEntity} carrying the hit's index name and its
     * source as a string.
     *
     * @param searchHit hit to convert
     * @return the resulting entity
     */
    private HitEntity searchHitToOperateHit(SearchHit searchHit) {
        final HitEntity withIndex = new HitEntity().setIndex(searchHit.getIndex());
        return withIndex.setSourceAsString(searchHit.getSourceAsString());
    }

    /**
     * Hands this reader back to the readers executor.
     *
     * @param num delay in milliseconds before the next run, or {@code null} to submit the reader
     *     without a delay
     */
    private void rescheduleReader(Integer num) {
        if (num == null) {
            this.readersExecutor.submit(this);
            return;
        }
        final OffsetDateTime nextRun = OffsetDateTime.now().plus(num.intValue(), (TemporalUnit) ChronoUnit.MILLIS);
        final Date startTime = Date.from(nextRun.toInstant());
        this.readersExecutor.schedule(this, startTime);
    }

    /**
     * Obtains an {@link ImportJob} bean from the bean factory, passing the batch and the position
     * as bean arguments (in that order).
     *
     * @param importPositionEntity position the batch was read from
     * @param importBatch batch to import
     * @return the import job bean
     */
    private ImportJob createImportJob(ImportPositionEntity importPositionEntity, ImportBatch importBatch) {
        final Object[] beanArguments = {importBatch, importPositionEntity};
        return this.beanFactory.getBean(ImportJob.class, beanArguments);
    }

    /**
     * Tries to queue the import job and, on success, performs the post-scheduling bookkeeping.
     *
     * @param importJob job to queue
     * @param z passed on to {@code tryToScheduleImportJob}; when {@code true} a job that could not
     *     be queued is not kept as pending
     * @return {@code true} if the job was queued
     */
    private boolean scheduleImportJob(ImportJob importJob, boolean z) {
        final boolean scheduled = tryToScheduleImportJob(importJob, z);
        if (scheduled) {
            importJobScheduledSucceeded(importJob);
        }
        return scheduled;
    }

    /**
     * Bookkeeping after a job was queued: records the time between job creation and now in the
     * "operate.import.job.scheduled" timer, stamps the batch with its scheduled time, notifies the
     * import listeners and lets the job record its latest scheduled position.
     *
     * @param importJob job that has just been queued
     */
    private void importJobScheduledSucceeded(ImportJob importJob) {
        final String[] timerTags = {"type", this.importValueType.name(), "partition", String.valueOf(this.partitionId)};
        final var scheduledTimer = this.metrics.getTimer("operate.import.job.scheduled", timerTags);
        final Duration timeToSchedule = Duration.between(importJob.getCreationTime(), OffsetDateTime.now());
        scheduledTimer.record(timeToSchedule);

        final ImportBatch batch = importJob.getImportBatch();
        batch.setScheduledTime(OffsetDateTime.now());
        notifyImportListenersAsScheduled(batch);
        importJob.recordLatestScheduledPosition();
    }

    /**
     * Calls {@code scheduled(importBatch)} on every registered import listener; does nothing when
     * no listeners were injected.
     *
     * @param importBatch batch that has been scheduled
     */
    private void notifyImportListenersAsScheduled(ImportBatch importBatch) {
        if (this.importListeners == null) {
            return;
        }
        for (final ImportListener listener : this.importListeners) {
            listener.scheduled(importBatch);
        }
    }

    /**
     * Rejects a search response that reports at least one failed shard.
     *
     * @param searchResponse response to check
     * @throws OperateRuntimeException if the response has failed shards
     */
    private void checkForFailedShards(SearchResponse searchResponse) {
        final boolean allShardsSucceeded = searchResponse.getFailedShards() <= 0;
        if (allShardsSucceeded) {
            return;
        }
        throw new OperateRuntimeException("Some ES shards failed. Ignoring search response and retrying, to prevent data loss.");
    }

    /**
     * Builds an import batch from a search response.
     *
     * @param searchResponse response whose hits form the batch
     * @return batch holding the converted hits and the index name of the last hit ({@code null}
     *     when there are no hits)
     */
    private ImportBatch createImportBatch(SearchResponse searchResponse) {
        final SearchHit[] searchHits = searchResponse.getHits().getHits();
        final String lastRecordIndexName = searchHits.length > 0 ? searchHits[searchHits.length - 1].getIndex() : null;
        final List<HitEntity> hitEntities = Arrays.stream(searchHits)
            .map(hit -> new HitEntity().setIndex(hit.getIndex()).setSourceAsString(hit.getSourceAsString()))
            .toList();
        return new ImportBatch(this.partitionId, this.importValueType, hitEntities, lastRecordIndexName);
    }

    /**
     * Builds an import batch from already converted hits.
     *
     * @param hitEntityArr hits forming the batch; the batch's list is a view backed by this array
     * @return batch holding the hits and the index name of the last hit ({@code null} when the
     *     array is empty)
     */
    private ImportBatch createImportBatch(HitEntity[] hitEntityArr) {
        final int count = hitEntityArr.length;
        final String lastRecordIndexName = count > 0 ? hitEntityArr[count - 1].getIndex() : null;
        return new ImportBatch(this.partitionId, this.importValueType, Arrays.asList(hitEntityArr), lastRecordIndexName);
    }

    /**
     * Builds the position-based search request for this reader's partition.
     *
     * <p>The query selects records with {@code position > j} (and {@code position <= l} when
     * {@code l} is given) that belong to this partition, sorted by position ascending. Without an
     * upper bound the current batch size is used as page size; with an upper bound the page size
     * is the span between the two positions, capped at 10000.
     *
     * @param str alias to search
     * @param j exclusive lower position bound
     * @param l inclusive upper position bound, or {@code null} for none
     * @return the search request, routed to the partition and with the request cache disabled
     */
    private SearchRequest createSearchQuery(String str, long j, Long l) {
        RangeQueryBuilder positionRange = QueryBuilders.rangeQuery("position").gt(Long.valueOf(j));
        if (l != null) {
            positionRange = positionRange.lte(l);
        }
        SearchSourceBuilder source = new SearchSourceBuilder()
            .query(ElasticsearchUtil.joinWithAnd(new QueryBuilder[]{positionRange, QueryBuilders.termQuery(RecordsReader.PARTITION_ID_FIELD_NAME, this.partitionId)}))
            .sort("position", SortOrder.ASC);
        if (l == null) {
            source = source.size(this.batchSizeThrottle.get());
        } else {
            LOGGER.debug("Import batch reread was called. Data type {}, partitionId {}, positionFrom {}, positionTo {}.", new Object[]{this.importValueType, Integer.valueOf(this.partitionId), Long.valueOf(j), l});
            // Compare as long: narrowing to int first could wrap a huge span into a small
            // positive size and silently truncate the reread.
            final long span = l.longValue() - j;
            source = source.size((span <= 0 || span > 10000) ? 10000 : (int) span);
        }
        return new SearchRequest(new String[]{str}).source(source).routing(String.valueOf(this.partitionId)).requestCache(false);
    }

    /**
     * Runs the callable under the "operate.import.query" timer tagged with this reader's type and
     * partition.
     *
     * @param callable search to execute
     * @return the callable's result
     * @throws Exception whatever the callable throws
     */
    private SearchResponse withTimer(Callable<SearchResponse> callable) throws Exception {
        final String[] timerTags = {"type", this.importValueType.name(), "partition", String.valueOf(this.partitionId)};
        final var queryTimer = this.metrics.getTimer("operate.import.query", timerTags);
        return (SearchResponse) queryTimer.recordCallable(callable);
    }

    /**
     * Runs the callable under the "operate.import.query" timer tagged with this reader's type and
     * partition.
     *
     * @param callable read operation to execute
     * @return the callable's result
     * @throws Exception whatever the callable throws
     */
    private HitEntity[] withTimerSearchHits(Callable<HitEntity[]> callable) throws Exception {
        final String[] timerTags = {"type", this.importValueType.name(), "partition", String.valueOf(this.partitionId)};
        final var queryTimer = this.metrics.getTimer("operate.import.query", timerTags);
        return (HitEntity[]) queryTimer.recordCallable(callable);
    }

    /**
     * Offers the job to the import queue, up to three times, while holding the scheduling lock.
     *
     * <p>If the job could not be queued and {@code z} is {@code false}, it is remembered as the
     * pending job; in every other case the pending job is cleared. When the job was queued and no
     * job is currently active, the next queued job is started.
     *
     * @param importJob job to queue
     * @param z when {@code true}, never keep the job as pending
     * @return {@code true} if the job was queued
     */
    private boolean tryToScheduleImportJob(ImportJob importJob, boolean z) {
        final Boolean outcome = withReschedulingImportJobLock(() -> {
            boolean scheduled = false;
            for (int attemptsLeft = 3; !scheduled && attemptsLeft > 0; attemptsLeft--) {
                scheduled = this.importJobs.offer(executeJob(importJob));
            }
            if (z || scheduled) {
                this.pendingImportJob = null;
            } else {
                this.pendingImportJob = importJob;
            }
            if (scheduled && this.active == null) {
                executeNext();
            }
            return Boolean.valueOf(scheduled);
        });
        return outcome.booleanValue();
    }

    /**
     * Wraps an import job in a callable that drives the queue.
     *
     * <p>When the job reports success, the next queued job is started and a pending job is
     * rescheduled if necessary. When it reports failure or throws, the callable sleeps for two
     * seconds and resubmits the currently active callable.
     *
     * @param importJob job to wrap
     * @return callable yielding the job's result, or {@code false} if the job threw
     */
    private Callable<Boolean> executeJob(ImportJob importJob) {
        return () -> {
            try {
                final Boolean succeeded = importJob.call();
                if (!succeeded.booleanValue()) {
                    // Not imported: pause, then run the active job again.
                    ThreadUtil.sleepFor(2000L);
                    execute(this.active);
                    return succeeded;
                }
                executeNext();
                rescheduleRecordsReaderIfNecessary();
                return succeeded;
            } catch (Exception e) {
                LOGGER.error("Exception occurred when importing data: " + e.getMessage(), e);
                ThreadUtil.sleepFor(2000L);
                execute(this.active);
                return false;
            }
        };
    }

    /**
     * Takes the next callable from the import queue, makes it the active one and submits it to
     * the import executor. When the queue is empty the active callable becomes {@code null}.
     */
    private void executeNext() {
        this.active = this.importJobs.poll();
        if (this.active == null) {
            return;
        }
        this.importExecutor.submit(this.active);
        LOGGER.debug("Submitted next job");
    }

    /**
     * Submits the given callable to the import executor; used to run a job again.
     *
     * @param callable callable to submit
     */
    private void execute(Callable<Boolean> callable) {
        importExecutor.submit(callable);
        LOGGER.debug("Submitted the same job");
    }

    /**
     * Under the scheduling lock: if a pending import job exists and no rescheduling is in
     * progress, marks rescheduling as started and submits the rescheduling task to the readers
     * executor.
     */
    private void rescheduleRecordsReaderIfNecessary() {
        withReschedulingImportJobLock(() -> {
            final boolean nothingToDo = !hasPendingImportJobToReschedule() || !shouldReschedulePendingImportJob();
            if (nothingToDo) {
                return;
            }
            startRescheduling();
            this.readersExecutor.submit(this::reschedulePendingImportJob);
        });
    }

    /**
     * Tries once more to queue the pending import job. Afterwards, regardless of the outcome, the
     * pending job is cleared, rescheduling is marked as complete and this reader is resubmitted
     * to the readers executor (all under the scheduling lock).
     */
    private void reschedulePendingImportJob() {
        try {
            final boolean keepAsPendingOnFailure = false;
            scheduleImportJob(this.pendingImportJob, keepAsPendingOnFailure);
        } finally {
            withReschedulingImportJobLock(() -> {
                this.pendingImportJob = null;
                completeRescheduling();
                // Resume regular reading now that the pending job has been dealt with.
                this.readersExecutor.submit(this);
            });
        }
    }

    /**
     * @return {@code true} if an import job is currently remembered as pending
     */
    private boolean hasPendingImportJobToReschedule() {
        final boolean noPendingJob = pendingImportJob == null;
        return !noPendingJob;
    }

    /**
     * @return {@code true} if no rescheduling of the pending job is currently in progress
     */
    private boolean shouldReschedulePendingImportJob() {
        final boolean reschedulingInProgress = ongoingRescheduling;
        return !reschedulingInProgress;
    }

    /** Marks a rescheduling of the pending import job as in progress. */
    private void startRescheduling() {
        ongoingRescheduling = true;
    }

    /** Marks the rescheduling of the pending import job as finished. */
    private void completeRescheduling() {
        ongoingRescheduling = false;
    }

    /**
     * Runs the given action while holding the scheduling lock.
     *
     * @param runnable action to run under the lock
     */
    private void withReschedulingImportJobLock(Runnable runnable) {
        // Adapt to the Callable overload, which does the actual locking.
        final Callable<Void> lockedAction = () -> {
            runnable.run();
            return null;
        };
        withReschedulingImportJobLock(lockedAction);
    }

    /**
     * Runs the given callable while holding the scheduling lock and returns its result.
     *
     * @param callable action to run under the lock
     * @param <T> result type of the action
     * @return the value returned by the callable
     * @throws OperateRuntimeException wrapping any exception thrown by the callable
     */
    private <T> T withReschedulingImportJobLock(Callable<T> callable) {
        // Acquire outside the try block: if lock() itself fails, the finally clause must not
        // attempt to unlock a lock this thread does not hold.
        this.schedulingImportJobLock.lock();
        try {
            return callable.call();
        } catch (Exception e) {
            throw new OperateRuntimeException(e);
        } finally {
            this.schedulingImportJobLock.unlock();
        }
    }
}
