package io.camunda.operate.archiver;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.operate.Metrics;
import io.camunda.operate.conditions.ElasticsearchCondition;
import io.camunda.operate.exceptions.OperateRuntimeException;
import io.camunda.operate.property.OperateProperties;
import io.camunda.operate.schema.templates.BatchOperationTemplate;
import io.camunda.operate.schema.templates.ListViewTemplate;
import io.camunda.operate.util.Either;
import io.camunda.operate.util.ElasticsearchUtil;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.metrics.TopHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Conditional;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

@Conditional({ElasticsearchCondition.class})
@Component
/* loaded from: input_file:io/camunda/operate/archiver/ElasticsearchArchiverRepository.class */
public class ElasticsearchArchiverRepository implements ArchiverRepository {
    public static final int INTERNAL_SCROLL_KEEP_ALIVE_MS = 30000;
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchArchiverRepository.class);

    @Autowired
    @Qualifier("archiverThreadPoolExecutor")
    protected ThreadPoolTaskScheduler archiverExecutor;

    @Autowired
    private BatchOperationTemplate batchOperationTemplate;

    @Autowired
    private ListViewTemplate processInstanceTemplate;

    @Autowired
    private OperateProperties operateProperties;

    @Autowired
    private Metrics metrics;

    @Autowired
    private RestHighLevelClient esClient;

    @Autowired
    private ObjectMapper objectMapper;

    private ArchiveBatch createArchiveBatch(SearchResponse searchResponse, String str, String str2) {
        List buckets = searchResponse.getAggregations().get(str).getBuckets();
        if (buckets.size() <= 0) {
            return null;
        }
        Histogram.Bucket bucket = (Histogram.Bucket) buckets.get(0);
        return new ArchiveBatch(bucket.getKeyAsString(), (ArrayList) Arrays.stream(bucket.getAggregations().get(str2).getHits().getHits()).collect(ArrayList::new, (arrayList, searchHit) -> {
            arrayList.add(searchHit.getId());
        }, (arrayList2, arrayList3) -> {
            arrayList2.addAll(arrayList3);
        }));
    }

    private CompletableFuture<SearchResponse> sendSearchRequest(SearchRequest searchRequest) {
        return ElasticsearchUtil.searchAsync(searchRequest, this.archiverExecutor, this.esClient);
    }

    private CompletableFuture<ArchiveBatch> searchAsync(SearchRequest searchRequest, Function<Throwable, String> function) {
        CompletableFuture<ArchiveBatch> completableFuture = new CompletableFuture<>();
        Timer.Sample start = Timer.start();
        sendSearchRequest(searchRequest).whenComplete((searchResponse, th) -> {
            start.stop(getArchiverQueryTimer());
            Either<Throwable, ArchiveBatch> handleSearchResponse = handleSearchResponse(searchResponse, th, function);
            Objects.requireNonNull(completableFuture);
            Consumer consumer = (v1) -> {
                r1.complete(v1);
            };
            Objects.requireNonNull(completableFuture);
            handleSearchResponse.ifRightOrLeft(consumer, completableFuture::completeExceptionally);
        });
        return completableFuture;
    }

    @Override // io.camunda.operate.archiver.ArchiverRepository
    public CompletableFuture<ArchiveBatch> getBatchOperationNextBatch() {
        return searchAsync(createFinishedBatchOperationsSearchRequest(createFinishedBatchOperationsAggregation(AbstractArchiverJob.DATES_AGG, AbstractArchiverJob.INSTANCES_AGG)), th -> {
            return String.format("Exception occurred, while obtaining finished batch operations: %s", th.getMessage());
        });
    }

    @Override // io.camunda.operate.archiver.ArchiverRepository
    public CompletableFuture<ArchiveBatch> getProcessInstancesNextBatch(List<Integer> list) {
        return searchAsync(createFinishedInstancesSearchRequest(createFinishedInstancesAggregation(AbstractArchiverJob.DATES_AGG, AbstractArchiverJob.INSTANCES_AGG), list), th -> {
            return String.format("Exception occurred, while obtaining finished batch operations: %s", th.getMessage());
        });
    }

    @Override // io.camunda.operate.archiver.ArchiverRepository
    public void setIndexLifeCycle(String str) {
        try {
            if (this.operateProperties.getArchiver().isIlmEnabled() && this.esClient.indices().exists(new GetIndexRequest(new String[]{str}), RequestOptions.DEFAULT)) {
                this.esClient.indices().putSettings(new UpdateSettingsRequest(new String[]{str}).settings(Settings.builder().put("index.lifecycle.name", "operate_delete_archived_indices").build()), RequestOptions.DEFAULT);
            }
        } catch (Exception e) {
            LOGGER.warn("Could not set ILM policy {} for index {}: {}", new Object[]{"operate_delete_archived_indices", str, e.getMessage()});
        }
    }

    @Override // io.camunda.operate.archiver.ArchiverRepository
    public CompletableFuture<Void> deleteDocuments(String str, String str2, List<Object> list) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        Timer.Sample start = Timer.start();
        ElasticsearchUtil.deleteAsyncWithConnectionRelease(this.archiverExecutor, str, str2, list, this.objectMapper, this.esClient).thenAccept(l -> {
            start.stop(getArchiverDeleteQueryTimer());
            completableFuture.complete(null);
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    @Override // io.camunda.operate.archiver.ArchiverRepository
    public CompletableFuture<Void> reindexDocuments(String str, String str2, String str3, List<Object> list) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ReindexRequest sourceQuery = createReindexRequestWithDefaults().setSourceIndices(new String[]{str}).setDestIndex(str2).setSourceQuery(QueryBuilders.termsQuery(str3, list));
        Timer.Sample start = Timer.start();
        ElasticsearchUtil.reindexAsyncWithConnectionRelease(this.archiverExecutor, sourceQuery, str, this.esClient).thenAccept(l -> {
            start.stop(getArchiverReindexQueryTimer());
            completableFuture.complete(null);
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    private SearchRequest createFinishedBatchOperationsSearchRequest(AggregationBuilder aggregationBuilder) {
        ConstantScoreQueryBuilder constantScoreQuery = QueryBuilders.constantScoreQuery(QueryBuilders.rangeQuery("endDate").lte(this.operateProperties.getArchiver().getArchivingTimepoint()));
        SearchRequest requestCache = new SearchRequest(new String[]{this.batchOperationTemplate.getFullQualifiedName()}).source(new SearchSourceBuilder().query(constantScoreQuery).aggregation(aggregationBuilder).fetchSource(false).size(0).sort("endDate", SortOrder.ASC)).requestCache(false);
        LOGGER.debug("Finished batch operations for archiving request: \n{}\n and aggregation: \n{}", constantScoreQuery.toString(), aggregationBuilder.toString());
        return requestCache;
    }

    private AggregationBuilder createFinishedBatchOperationsAggregation(String str, String str2) {
        return AggregationBuilders.dateHistogram(str).field("endDate").calendarInterval(new DateHistogramInterval(this.operateProperties.getArchiver().getRolloverInterval())).format(this.operateProperties.getArchiver().getElsRolloverDateFormat()).keyed(true).subAggregation(PipelineAggregatorBuilders.bucketSort("datesSortedAgg", Arrays.asList(new FieldSortBuilder("_key"))).size(1)).subAggregation(AggregationBuilders.topHits(str2).size(this.operateProperties.getArchiver().getRolloverBatchSize()).sort("id", SortOrder.ASC).fetchSource("id", (String) null));
    }

    private Either<Throwable, ArchiveBatch> handleSearchResponse(SearchResponse searchResponse, Throwable th, Function<Throwable, String> function) {
        return th != null ? Either.left(new OperateRuntimeException(function.apply(th), th)) : Either.right(createArchiveBatch(searchResponse, AbstractArchiverJob.DATES_AGG, AbstractArchiverJob.INSTANCES_AGG));
    }

    private Timer getArchiverQueryTimer() {
        return this.metrics.getTimer("operate.archiver.query", new String[0]);
    }

    private SearchRequest createFinishedInstancesSearchRequest(AggregationBuilder aggregationBuilder, List<Integer> list) {
        ConstantScoreQueryBuilder constantScoreQuery = QueryBuilders.constantScoreQuery(ElasticsearchUtil.joinWithAnd(new QueryBuilder[]{QueryBuilders.rangeQuery("endDate").lte(this.operateProperties.getArchiver().getArchivingTimepoint()), QueryBuilders.termQuery("joinRelation", "processInstance"), QueryBuilders.termsQuery("partitionId", list)}));
        SearchRequest requestCache = new SearchRequest(new String[]{this.processInstanceTemplate.getFullQualifiedName()}).source(new SearchSourceBuilder().query(constantScoreQuery).aggregation(aggregationBuilder).fetchSource(false).size(0).sort("endDate", SortOrder.ASC)).requestCache(false);
        LOGGER.debug("Finished process instances for archiving request: \n{}\n and aggregation: \n{}", constantScoreQuery.toString(), aggregationBuilder.toString());
        return requestCache;
    }

    private AggregationBuilder createFinishedInstancesAggregation(String str, String str2) {
        return AggregationBuilders.dateHistogram(str).field("endDate").calendarInterval(new DateHistogramInterval(this.operateProperties.getArchiver().getRolloverInterval())).format(this.operateProperties.getArchiver().getElsRolloverDateFormat()).keyed(true).subAggregation(PipelineAggregatorBuilders.bucketSort("datesSortedAgg", Arrays.asList(new FieldSortBuilder("_key"))).size(1)).subAggregation(AggregationBuilders.topHits(str2).size(this.operateProperties.getArchiver().getRolloverBatchSize()).sort("id", SortOrder.ASC).fetchSource("id", (String) null));
    }

    private ReindexRequest createReindexRequestWithDefaults() {
        return new ReindexRequest().setScroll(TimeValue.timeValueMillis(30000L)).setAbortOnVersionConflict(false).setSlices(0);
    }

    private Timer getArchiverReindexQueryTimer() {
        return this.metrics.getTimer("operate.archiver.reindex.query", new String[0]);
    }

    private Timer getArchiverDeleteQueryTimer() {
        return this.metrics.getTimer("operate.archiver.delete.query", new String[0]);
    }
}
