/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingProgress;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;

/**
 * Runs the sub tasks of a parallel index task in a single phase.
 *
 * <p>One {@link ParallelIndexSubTaskSpec} is created per input split of the base firehose factory
 * (see {@link #subTaskSpecIterator()}). Specs are submitted through a {@link TaskMonitor} while the
 * number of running sub tasks is below {@code maxNumTasks}. Sub tasks hand their pushed segments to
 * {@link #collectReport(PushedSegmentsReport)}; once every spec has been processed successfully,
 * all reported segments are published in a single transactional insert.
 */
public class SinglePhaseParallelIndexTaskRunner
implements ParallelIndexTaskRunner<ParallelIndexSubTask> {
    private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class);

    private final TaskToolbox toolbox;
    private final String taskId;
    private final String groupId;
    private final ParallelIndexIngestionSpec ingestionSchema;
    private final Map<String, Object> context;
    private final FiniteFirehoseFactory<?, ?> baseFirehoseFactory;
    private final int maxNumTasks;
    private final IndexingServiceClient indexingServiceClient;

    /** Completion events pushed by the sub task futures and drained by the loop in {@link #run()}. */
    private final BlockingQueue<TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask>> taskCompleteEvents = new LinkedBlockingDeque<>();

    /** Latest pushed-segments report per sub task id. */
    private final ConcurrentHashMap<String, PushedSegmentsReport> segmentsMap = new ConcurrentHashMap<>();

    private volatile boolean stopped;
    /** Stays {@code null} until {@link #run()} creates the monitor; the status getters check for that. */
    private volatile TaskMonitor<ParallelIndexSubTask> taskMonitor;
    private int nextSpecId = 0;

    /**
     * @param toolbox               toolbox whose task action client is used for publishing
     * @param taskId                id of the supervising task; also the prefix of generated spec ids
     * @param groupId               group id passed to every generated sub task spec
     * @param ingestionSchema       ingestion spec; its firehose factory is cast to {@link FiniteFirehoseFactory}
     * @param context               context passed to every generated sub task spec
     * @param indexingServiceClient client handed to the task monitor; must not be null
     * @throws NullPointerException if {@code indexingServiceClient} is null
     */
    SinglePhaseParallelIndexTaskRunner(TaskToolbox toolbox, String taskId, String groupId, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context, IndexingServiceClient indexingServiceClient) {
        this.toolbox = toolbox;
        this.taskId = taskId;
        this.groupId = groupId;
        this.ingestionSchema = ingestionSchema;
        this.context = context;
        this.baseFirehoseFactory = (FiniteFirehoseFactory) ingestionSchema.getIOConfig().getFirehoseFactory();
        this.maxNumTasks = ingestionSchema.getTuningConfig().getMaxNumSubTasks();
        this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
    }

    /**
     * Submits sub tasks for all input splits, waits for them to complete and publishes the segments
     * they reported.
     *
     * <p>Whatever the outcome, the pending event queue is cleared and the monitor is stopped before
     * this method exits. Unless the final state is {@link TaskState#SUCCESS}, all remaining sub tasks
     * are killed as well.
     *
     * @return {@link TaskState#SUCCESS} when there is no input split or when all sub tasks succeeded
     *         and their segments were published; {@link TaskState#FAILED} when a sub task failed;
     *         {@link TaskState#RUNNING} when the loop ended because the current thread was interrupted
     * @throws Exception if waiting for an event is interrupted, a succeeded task has no status or
     *                   report, the monitor does not report overall success, publishing fails, or an
     *                   event carries an unexpected state
     */
    @Override
    public TaskState run() throws Exception {
        final int numSplits = baseFirehoseFactory.getNumSplits();
        if (numSplits == 0) {
            // Without any split no sub task is ever submitted, so no completion event would ever
            // arrive and the wait loop below would poll forever. Nothing to do means success.
            log.warn("There's no input split to process");
            return TaskState.SUCCESS;
        }

        final Iterator<ParallelIndexSubTaskSpec> subTaskSpecIterator = subTaskSpecIterator().iterator();
        final long taskStatusCheckingPeriod = ingestionSchema.getTuningConfig().getTaskStatusCheckPeriodMs();

        taskMonitor = new TaskMonitor<>(
            Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"),
            ingestionSchema.getTuningConfig().getMaxRetry(),
            numSplits
        );
        TaskState state = TaskState.RUNNING;
        taskMonitor.start(taskStatusCheckingPeriod);

        try {
            log.info("Submitting initial tasks");
            while (isRunning() && subTaskSpecIterator.hasNext() && taskMonitor.getNumRunningTasks() < maxNumTasks) {
                submitNewTask(taskMonitor, subTaskSpecIterator.next());
            }

            log.info("Waiting for subTasks to be completed");
            while (isRunning()) {
                final TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask> taskCompleteEvent =
                    taskCompleteEvents.poll(taskStatusCheckingPeriod, TimeUnit.MILLISECONDS);
                if (taskCompleteEvent == null) {
                    // Timed out without an event; re-check isRunning() and poll again.
                    continue;
                }

                final TaskState completeState = taskCompleteEvent.getLastState();
                switch (completeState) {
                    case SUCCESS: {
                        final TaskStatusPlus completeStatus = taskCompleteEvent.getLastStatus();
                        if (completeStatus == null) {
                            throw new ISE("Last status of complete task is missing!");
                        }
                        // A succeeded sub task must have delivered its report via collectReport().
                        if (!segmentsMap.containsKey(completeStatus.getId())) {
                            throw new ISE("Missing reports from task[%s]!", completeStatus.getId());
                        }

                        if (!subTaskSpecIterator.hasNext()) {
                            // Every spec has been submitted. Finish only once nothing is running
                            // and no further completion event is waiting to be handled.
                            if (taskMonitor.getNumRunningTasks() == 0 && taskCompleteEvents.isEmpty()) {
                                stopped = true;
                                if (taskMonitor.isSucceeded()) {
                                    publish(toolbox);
                                    state = TaskState.SUCCESS;
                                } else {
                                    final SinglePhaseParallelIndexingProgress monitorStatus = taskMonitor.getProgress();
                                    throw new ISE(
                                        "Expected for [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks",
                                        monitorStatus.getExpectedSucceeded(),
                                        monitorStatus.getSucceeded(),
                                        monitorStatus.getFailed()
                                    );
                                }
                            }
                        } else if (taskMonitor.getNumRunningTasks() < maxNumTasks) {
                            // A slot is free: submit the next spec.
                            submitNewTask(taskMonitor, subTaskSpecIterator.next());
                        }
                        break;
                    }
                    case FAILED: {
                        // One failed sub task fails the whole run; the finally block kills the rest.
                        state = TaskState.FAILED;
                        stopped = true;
                        final TaskStatusPlus lastStatus = taskCompleteEvent.getLastStatus();
                        if (lastStatus != null) {
                            log.error("Failed because of the failed sub task[%s]", lastStatus.getId());
                        } else {
                            final ParallelIndexSubTaskSpec spec = (ParallelIndexSubTaskSpec) taskCompleteEvent.getSpec();
                            log.error(
                                "Failed to run sub tasks for inputSplits[%s]",
                                getSplitsIfSplittable(spec.getIngestionSpec().getIOConfig().getFirehoseFactory())
                            );
                        }
                        break;
                    }
                    default:
                        throw new ISE("spec[%s] is in an invalid state[%s]", taskCompleteEvent.getSpec().getId(), completeState);
                }
            }
        }
        finally {
            log.info("Cleaning up resources");
            taskCompleteEvents.clear();
            taskMonitor.stop();
            if (state != TaskState.SUCCESS) {
                log.info("This task is finished with [%s] state. Killing [%d] remaining subtasks.", state, taskMonitor.getNumRunningTasks());
                taskMonitor.killAll();
            }
        }
        return state;
    }

    /** Returns true until this runner is marked stopped or the current thread is interrupted. */
    private boolean isRunning() {
        return !stopped && !Thread.currentThread().isInterrupted();
    }

    @VisibleForTesting
    TaskToolbox getToolbox() {
        return toolbox;
    }

    @VisibleForTesting
    ParallelIndexIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    /**
     * Stores the pushed-segments report of a sub task, keyed by the report's task id.
     *
     * <p>A task may report more than once, but a repeated report must list the same segments as the
     * one stored before.
     *
     * @param report report to store
     * @throws IllegalStateException if an earlier report of the same task lists different segments
     */
    @Override
    public void collectReport(PushedSegmentsReport report) {
        segmentsMap.compute(report.getTaskId(), (reportingTaskId, prevReport) -> {
            if (prevReport != null) {
                Preconditions.checkState(
                    prevReport.getSegments().equals(report.getSegments()),
                    "task[%s] sent two or more reports and previous report[%s] is different from the current one[%s]",
                    reportingTaskId, prevReport, report
                );
            }
            return report;
        });
    }

    /** Returns the monitor's progress, or the "not running" progress before the monitor exists. */
    @Override
    public SinglePhaseParallelIndexingProgress getProgress() {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        return monitor == null ? SinglePhaseParallelIndexingProgress.notRunning() : monitor.getProgress();
    }

    /** Returns the ids of the running sub tasks, or an empty set before the monitor exists. */
    @Override
    public Set<String> getRunningTaskIds() {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        return monitor == null ? Collections.emptySet() : monitor.getRunningTaskIds();
    }

    /**
     * Returns the running and complete sub task specs, with at most one entry per spec id; returns
     * an empty list before the monitor exists.
     */
    @Override
    public List<SubTaskSpec<ParallelIndexSubTask>> getSubTaskSpecs() {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        if (monitor == null) {
            return Collections.emptyList();
        }
        final List<SubTaskSpec<ParallelIndexSubTask>> runningSubTaskSpecs = monitor.getRunningSubTaskSpecs();
        final List<SubTaskSpec<ParallelIndexSubTask>> completeSubTaskSpecs = monitor.getCompleteSubTaskSpecs();
        // Keyed by spec id so a spec present in both lists appears once; the complete one wins.
        final Map<String, SubTaskSpec<ParallelIndexSubTask>> subTaskSpecMap =
            new HashMap<>(runningSubTaskSpecs.size() + completeSubTaskSpecs.size());
        runningSubTaskSpecs.forEach(spec -> subTaskSpecMap.put(spec.getId(), spec));
        completeSubTaskSpecs.forEach(spec -> subTaskSpecMap.put(spec.getId(), spec));
        return new ArrayList<>(subTaskSpecMap.values());
    }

    /** Returns the specs of running sub tasks, or an empty list before the monitor exists. */
    @Override
    public List<SubTaskSpec<ParallelIndexSubTask>> getRunningSubTaskSpecs() {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        return monitor == null ? Collections.emptyList() : monitor.getRunningSubTaskSpecs();
    }

    /** Returns the specs of complete sub tasks, or an empty list before the monitor exists. */
    @Override
    public List<SubTaskSpec<ParallelIndexSubTask>> getCompleteSubTaskSpecs() {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        return monitor == null ? Collections.emptyList() : monitor.getCompleteSubTaskSpecs();
    }

    /**
     * Looks up a sub task spec by id, preferring the running entry over the completed history.
     *
     * @return the spec, or {@code null} if the monitor does not exist yet or knows no such spec
     */
    @Override
    @Nullable
    public SubTaskSpec<ParallelIndexSubTask> getSubTaskSpec(String subTaskSpecId) {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        if (monitor == null) {
            return null;
        }
        final TaskMonitor.MonitorEntry monitorEntry = monitor.getRunningTaskMonitorEntry(subTaskSpecId);
        final TaskHistory<ParallelIndexSubTask> taskHistory = monitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
        if (monitorEntry != null) {
            return monitorEntry.getSpec();
        }
        return taskHistory != null ? taskHistory.getSpec() : null;
    }

    /**
     * Builds the status of a sub task spec: from the running entry if there is one, otherwise from a
     * non-empty completed history (in which case the running status is {@code null}).
     *
     * @return the status, or {@code null} if the monitor does not exist yet or neither source applies
     */
    @Override
    @Nullable
    public ParallelIndexTaskRunner.SubTaskSpecStatus getSubTaskState(String subTaskSpecId) {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        if (monitor == null) {
            return null;
        }
        final TaskMonitor.MonitorEntry monitorEntry = monitor.getRunningTaskMonitorEntry(subTaskSpecId);
        final TaskHistory<ParallelIndexSubTask> taskHistory = monitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
        if (monitorEntry != null) {
            return new ParallelIndexTaskRunner.SubTaskSpecStatus(
                (ParallelIndexSubTaskSpec) monitorEntry.getSpec(),
                monitorEntry.getRunningStatus(),
                monitorEntry.getTaskHistory()
            );
        }
        if (taskHistory != null && !taskHistory.isEmpty()) {
            return new ParallelIndexTaskRunner.SubTaskSpecStatus(
                (ParallelIndexSubTaskSpec) taskHistory.getSpec(),
                null,
                taskHistory.getAttemptHistory()
            );
        }
        return null;
    }

    /**
     * Returns the completed history of a sub task spec as the monitor reports it, or {@code null}
     * before the monitor exists.
     */
    @Override
    @Nullable
    public TaskHistory<ParallelIndexSubTask> getCompleteSubTaskSpecAttemptHistory(String subTaskSpecId) {
        final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
        return monitor == null ? null : monitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
    }

    /**
     * Publishes the union of all reported segments with one transactional insert action.
     *
     * <p>An empty segment set counts as published. If the insert reports failure, the used segments
     * are looked up; the failure is tolerated only when they equal the set we tried to publish.
     *
     * @throws IOException if submitting the action or the used-segment lookup fails with it
     * @throws ISE         if the insert failed and the used segments differ from ours
     */
    private void publish(TaskToolbox toolbox) throws IOException {
        final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
            final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
            return toolbox.getTaskActionClient().submit(action);
        };
        final ActionBasedUsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());
        // Left raw on purpose: the element type returned by PushedSegmentsReport.getSegments() is
        // not among this file's imports.
        final Set segmentsToPublish = segmentsMap.values()
            .stream()
            .flatMap(report -> report.getSegments().stream())
            .collect(Collectors.toSet());
        final boolean published = segmentsToPublish.isEmpty() || publisher.publishSegments(segmentsToPublish, null).isSuccess();

        if (published) {
            log.info("Published [%d] segments", segmentsToPublish.size());
            return;
        }

        log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
        final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsMap.values()
            .stream()
            .flatMap(report -> report.getSegments().stream())
            .map(SegmentIdWithShardSpec::fromDataSegment)
            .collect(Collectors.toSet());
        if (!usedSegmentChecker.findUsedSegments(segmentsIdentifiers).equals(segmentsToPublish)) {
            throw new ISE("Failed to publish segments[%s]", segmentsToPublish);
        }
        log.info("Our segments really do exist, awaiting handoff.");
    }

    /**
     * Submits {@code spec} to the monitor and arranges for its completion, successful or not, to be
     * offered to {@link #taskCompleteEvents}; a failed future becomes a failure event for the spec.
     */
    private void submitNewTask(TaskMonitor<ParallelIndexSubTask> taskMonitor, final ParallelIndexSubTaskSpec spec) {
        log.info("Submit a new task for spec[%s] and inputSplit[%s]", spec.getId(), spec.getInputSplit());
        final ListenableFuture<TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask>> future = taskMonitor.submit(spec);
        Futures.addCallback(future, new FutureCallback<TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask>>() {
            @Override
            public void onSuccess(TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask> completeEvent) {
                taskCompleteEvents.offer(completeEvent);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error(t, "Error while running a task for subTaskSpec[%s]", spec);
                taskCompleteEvents.offer(TaskMonitor.SubTaskCompleteEvent.fail(spec, t));
            }
        });
    }

    /** Returns the current spec sequence number and advances it; not atomic. */
    @VisibleForTesting
    int getAndIncrementNextSpecId() {
        return nextSpecId++;
    }

    /**
     * Returns a stream that maps each split of the base firehose factory to a new sub task spec.
     *
     * @throws IOException if the firehose factory fails to produce its splits
     */
    @VisibleForTesting
    Stream<ParallelIndexSubTaskSpec> subTaskSpecIterator() throws IOException {
        return baseFirehoseFactory.getSplits().map(this::newTaskSpec);
    }

    /**
     * Creates the spec for one split. Its id is {@code taskId + "_" + sequence number}, and its
     * ingestion spec reuses the data schema, the append flag and the tuning config of this runner
     * with the firehose factory narrowed to {@code split}.
     */
    @VisibleForTesting
    ParallelIndexSubTaskSpec newTaskSpec(InputSplit split) {
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
            (FirehoseFactory) baseFirehoseFactory.withSplit(split),
            ingestionSchema.getIOConfig().isAppendToExisting()
        );
        final ParallelIndexIngestionSpec subTaskIngestionSpec = new ParallelIndexIngestionSpec(
            ingestionSchema.getDataSchema(),
            ioConfig,
            ingestionSchema.getTuningConfig()
        );
        return new ParallelIndexSubTaskSpec(
            taskId + "_" + getAndIncrementNextSpecId(),
            groupId,
            taskId,
            subTaskIngestionSpec,
            context,
            split
        );
    }

    /**
     * Collects all splits of {@code firehoseFactory} into a list.
     *
     * @throws IOException if the factory fails to produce its splits
     * @throws ISE         if the factory is not a {@link FiniteFirehoseFactory}
     */
    private static List<InputSplit> getSplitsIfSplittable(FirehoseFactory firehoseFactory) throws IOException {
        if (firehoseFactory instanceof FiniteFirehoseFactory) {
            // Wildcard-typed (not raw) so that collect() yields a List rather than Object.
            final FiniteFirehoseFactory<?, ?> finiteFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory;
            return finiteFirehoseFactory.getSplits().collect(Collectors.toList());
        }
        throw new ISE("firehoseFactory[%s] is not splittable", firehoseFactory.getClass().getSimpleName());
    }
}

