/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ingest;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.DocWriteRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.update.UpdateRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.ClusterChangedEvent;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.ClusterStateApplier;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.Strings;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.metrics.CounterMetric;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.metrics.MeanMetric;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.VersionType;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ingest.IngestDocument;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ingest.IngestMetadata;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ingest.IngestStats;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ingest.Pipeline;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ingest.PipelineStore;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.threadpool.ThreadPool;

/**
 * Runs ingest pipelines against the index requests contained in a bulk operation and keeps
 * ingest statistics, both overall and per pipeline id.
 *
 * <p>As a {@link ClusterStateApplier} it watches the {@code "ingest"} custom metadata of each
 * applied cluster state and keeps the set of per-pipeline statistics holders in step with the
 * pipelines listed there.
 */
public class PipelineExecutionService implements ClusterStateApplier {
    private final PipelineStore store;
    private final ThreadPool threadPool;
    /** Statistics accumulated over every pipeline execution handled by this service. */
    private final StatsHolder totalStats = new StatsHolder();
    /**
     * Per-pipeline statistics keyed by pipeline id. The map is never mutated in place; it is
     * replaced wholesale by {@link #updatePipelineStats(IngestMetadata)}, and readers go through
     * this volatile reference.
     */
    private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();

    /**
     * @param store      source used to look pipelines up by id
     * @param threadPool pool whose {@code "write"} executor runs the bulk ingest work
     */
    public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) {
        this.store = store;
        this.threadPool = threadPool;
    }

    /**
     * Submits a task to the {@code "write"} executor that runs the configured pipeline for every
     * request in {@code actionRequests} that carries one.
     *
     * <p>Index requests are processed directly. For update requests the processed document is
     * {@code doc()} when {@code docAsUpsert()} is set, otherwise {@code upsertRequest()}.
     * Requests of any other kind, or whose resolved index request is {@code null} or has no
     * pipeline text, are skipped.
     *
     * @param actionRequests     the requests to inspect
     * @param itemFailureHandler receives the index request and the exception whenever processing
     *                           a single request fails; the loop then moves on to the next request
     * @param completionHandler  receives {@code null} once every request has been visited, or the
     *                           exception reported through {@code onFailure} of the submitted task
     */
    public void executeBulkRequest(final Iterable<DocWriteRequest> actionRequests, final BiConsumer<IndexRequest, Exception> itemFailureHandler, final Consumer<Exception> completionHandler) {
        this.threadPool.executor("write").execute(new AbstractRunnable() {

            @Override
            public void onFailure(Exception e) {
                completionHandler.accept(e);
            }

            @Override
            protected void doRun() throws Exception {
                for (DocWriteRequest actionRequest : actionRequests) {
                    IndexRequest indexRequest = null;
                    if (actionRequest instanceof IndexRequest) {
                        indexRequest = (IndexRequest) actionRequest;
                    } else if (actionRequest instanceof UpdateRequest) {
                        UpdateRequest updateRequest = (UpdateRequest) actionRequest;
                        indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
                    }
                    if (indexRequest == null || !Strings.hasText(indexRequest.getPipeline())) {
                        continue;
                    }
                    try {
                        // A missing pipeline surfaces here as IllegalArgumentException and is
                        // reported as a per-item failure like any other processing error.
                        PipelineExecutionService.this.innerExecute(indexRequest, PipelineExecutionService.this.getPipeline(indexRequest.getPipeline()));
                        // Cleared only after a successful run; presumably so the same pipeline is
                        // not applied to this request again downstream - confirm against callers.
                        indexRequest.setPipeline(null);
                    } catch (Exception e) {
                        itemFailureHandler.accept(indexRequest, e);
                    }
                }
                completionHandler.accept(null);
            }
        });
    }

    /**
     * Builds a snapshot of the current statistics.
     *
     * @return the total statistics together with one entry per pipeline id currently tracked
     */
    public IngestStats stats() {
        // Read the volatile field once so the size hint and the iteration see the same map.
        Map<String, StatsHolder> holders = this.statsHolderPerPipeline;
        Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<String, IngestStats.Stats>(holders.size());
        for (Map.Entry<String, StatsHolder> entry : holders.entrySet()) {
            statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
        }
        return new IngestStats(this.totalStats.createStats(), statsPerPipeline);
    }

    /**
     * Refreshes the per-pipeline statistics holders from the {@code "ingest"} custom metadata of
     * the applied cluster state. Does nothing when that metadata is absent.
     */
    @Override
    public void applyClusterState(ClusterChangedEvent event) {
        IngestMetadata ingestMetadata = (IngestMetadata) event.state().getMetaData().custom("ingest");
        if (ingestMetadata != null) {
            this.updatePipelineStats(ingestMetadata);
        }
    }

    /**
     * Aligns the tracked statistics holders with the pipelines listed in {@code ingestMetadata}:
     * holders for ids that are no longer listed are dropped, new ids get a fresh holder, and
     * holders for ids that are still listed are kept as they are. The published map is only
     * replaced when at least one id was added or removed.
     *
     * @param ingestMetadata metadata whose pipeline ids define which holders should exist
     */
    void updatePipelineStats(IngestMetadata ingestMetadata) {
        // Only the keys matter here, so the value type is left as a wildcard.
        Map<String, ?> configuredPipelines = ingestMetadata.getPipelines();
        Map<String, StatsHolder> newStatsPerPipeline = new HashMap<String, StatsHolder>(this.statsHolderPerPipeline);

        // removeIf reports whether anything was removed, which is exactly the "changed" signal.
        boolean changed = newStatsPerPipeline.keySet().removeIf(pipelineId -> !configuredPipelines.containsKey(pipelineId));

        for (String pipelineId : configuredPipelines.keySet()) {
            if (newStatsPerPipeline.containsKey(pipelineId)) {
                continue;
            }
            newStatsPerPipeline.put(pipelineId, new StatsHolder());
            changed = true;
        }
        if (changed) {
            this.statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
        }
    }

    /**
     * Runs {@code pipeline} on the document described by {@code indexRequest} and writes the
     * resulting metadata and source back into the request.
     *
     * <p>A pipeline without processors returns immediately and leaves the statistics untouched.
     * Otherwise the in-flight counter is incremented before the run, and the elapsed time is
     * recorded and the counter decremented afterwards whether or not the run succeeded. A failure
     * additionally increments the failure counters before being rethrown.
     *
     * @param indexRequest request supplying the document and receiving the processed result
     * @param pipeline     pipeline to execute
     * @throws Exception whatever the pipeline execution or the write-back raises
     */
    private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
        if (pipeline.getProcessors().isEmpty()) {
            return;
        }
        long startTimeInNanos = System.nanoTime();
        // Absent when no holder is tracked for this pipeline id; then only totals are updated.
        Optional<StatsHolder> pipelineStats = Optional.ofNullable(this.statsHolderPerPipeline.get(pipeline.getId()));
        try {
            this.totalStats.preIngest();
            pipelineStats.ifPresent(StatsHolder::preIngest);
            String index = indexRequest.index();
            String type = indexRequest.type();
            String id = indexRequest.id();
            String routing = indexRequest.routing();
            String parent = indexRequest.parent();
            Long version = indexRequest.version();
            VersionType versionType = indexRequest.versionType();
            Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
            IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, version, versionType, sourceAsMap);
            pipeline.execute(ingestDocument);

            // Copy the metadata back onto the request after the pipeline has run.
            Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
            indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
            indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
            indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
            indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
            indexRequest.parent((String) metadataMap.get(IngestDocument.MetaData.PARENT));
            // NOTE(review): VERSION is dereferenced unconditionally, unlike VERSION_TYPE below;
            // a null entry here would fail the item with a NullPointerException.
            indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
            Object newVersionType = metadataMap.get(IngestDocument.MetaData.VERSION_TYPE);
            if (newVersionType != null) {
                indexRequest.versionType(VersionType.fromString((String) newVersionType));
            }
            indexRequest.source(ingestDocument.getSourceAndMetadata());
        } catch (Exception e) {
            this.totalStats.ingestFailed();
            pipelineStats.ifPresent(StatsHolder::ingestFailed);
            throw e;
        } finally {
            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
            this.totalStats.postIngest(ingestTimeInMillis);
            pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
        }
    }

    /**
     * Looks a pipeline up in the store.
     *
     * @param pipelineId id of the pipeline to fetch
     * @return the pipeline, never {@code null}
     * @throws IllegalArgumentException if the store has no pipeline with that id
     */
    private Pipeline getPipeline(String pipelineId) {
        Pipeline pipeline = this.store.get(pipelineId);
        if (pipeline == null) {
            throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
        }
        return pipeline;
    }

    /**
     * Groups the metrics tracked for one scope (either the total or a single pipeline): a mean
     * metric fed with ingest durations, a counter of executions currently in flight, and a
     * counter of failed executions.
     */
    static class StatsHolder {
        private final MeanMetric ingestMetric = new MeanMetric();
        private final CounterMetric ingestCurrent = new CounterMetric();
        private final CounterMetric ingestFailed = new CounterMetric();

        StatsHolder() {
        }

        /** Marks the start of an execution by incrementing the in-flight counter. */
        void preIngest() {
            this.ingestCurrent.inc();
        }

        /**
         * Marks the end of an execution: decrements the in-flight counter and records the duration.
         *
         * @param ingestTimeInMillis elapsed time of the execution in milliseconds
         */
        void postIngest(long ingestTimeInMillis) {
            this.ingestCurrent.dec();
            this.ingestMetric.inc(ingestTimeInMillis);
        }

        /** Increments the failure counter. */
        void ingestFailed() {
            this.ingestFailed.inc();
        }

        /**
         * @return a stats object built from the mean metric's count and sum, the in-flight count
         *         and the failure count, in that order
         */
        IngestStats.Stats createStats() {
            return new IngestStats.Stats(this.ingestMetric.count(), this.ingestMetric.sum(), this.ingestCurrent.count(), this.ingestFailed.count());
        }
    }
}

