package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.class */
public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner<ParallelIndexSubTask> {
    private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class);
    private final TaskToolbox toolbox;
    private final String taskId;
    private final String groupId;
    private final ParallelIndexIngestionSpec ingestionSchema;
    private final Map<String, Object> context;
    private final FiniteFirehoseFactory<?, ?> baseFirehoseFactory;
    // Upper bound on concurrently running sub tasks; read from the tuning config in the constructor.
    private final int maxNumTasks;
    private final IndexingServiceClient indexingServiceClient;
    // Set to true by run() once the run has either succeeded or hit a failed sub task.
    private volatile boolean stopped;
    // Created in run(); the accessor methods treat a null monitor as "not running yet".
    private volatile TaskMonitor<ParallelIndexSubTask> taskMonitor;
    // Completion events offered by the callbacks registered in submitNewTask() and polled by run().
    private final BlockingQueue<TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask>> taskCompleteEvents = new LinkedBlockingDeque<>();
    // Reports stored by collectReport(), keyed by the reporting task's id.
    private final ConcurrentHashMap<String, PushedSegmentsReport> segmentsMap = new ConcurrentHashMap<>();
    // Suffix counter for the sub task spec ids built in newTaskSpec().
    private int nextSpecId = 0;

    /* renamed from: org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner$2.class */
    /*
     * Lookup table indexed by TaskState ordinal; run() switches on the values stored here.
     * SUCCESS maps to 1 and FAILED maps to 2. Every other state keeps the int-array default
     * of 0 and therefore reaches the default branch of that switch.
     * NOTE(review): this looks like the switch map javac synthesises for a switch over an enum,
     * as surfaced by the decompiler - confirm before editing by hand.
     */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$druid$indexer$TaskState = new int[TaskState.values().length];

        static {
            try {
                $SwitchMap$org$apache$druid$indexer$TaskState[TaskState.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: presumably tolerates an enum constant missing at runtime.
            }
            try {
                $SwitchMap$org$apache$druid$indexer$TaskState[TaskState.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored, same reasoning as above.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a runner for the given ingestion spec.
     *
     * @param taskToolbox                toolbox kept for publishing segments at the end of the run
     * @param str                        task id; used as the prefix of every generated sub task spec id
     * @param str2                       group id passed on to every sub task spec
     * @param parallelIndexIngestionSpec spec supplying the firehose factory and the tuning config
     * @param map                        context passed on to every sub task spec
     * @param indexingServiceClient      client handed to the task monitor; must not be null
     * @throws NullPointerException if {@code indexingServiceClient} is null
     */
    public SinglePhaseParallelIndexTaskRunner(
            TaskToolbox taskToolbox,
            String str,
            String str2,
            ParallelIndexIngestionSpec parallelIndexIngestionSpec,
            Map<String, Object> map,
            IndexingServiceClient indexingServiceClient) {
        toolbox = taskToolbox;
        taskId = str;
        groupId = str2;
        ingestionSchema = parallelIndexIngestionSpec;
        context = map;
        baseFirehoseFactory = parallelIndexIngestionSpec.m33getIOConfig().getFirehoseFactory();
        maxNumTasks = parallelIndexIngestionSpec.m32getTuningConfig().getMaxNumSubTasks();
        this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
    }

    /**
     * Runs every sub task and publishes the reported segments once all of them have succeeded.
     *
     * <p>Up to {@code maxNumTasks} sub tasks are submitted up front. The method then polls
     * {@code taskCompleteEvents}, waiting at most the configured status-check period per poll:
     * <ul>
     *   <li>a SUCCESS event must have a matching entry in {@code segmentsMap}; it either lets the
     *       next spec be submitted or, when nothing is left to run, triggers {@code publish};</li>
     *   <li>a FAILED event stops the loop and makes the run fail;</li>
     *   <li>any other state is rejected with an {@link ISE}.</li>
     * </ul>
     * On every exit path the event queue is cleared and the task monitor is stopped; unless the run
     * succeeded, the remaining sub tasks are killed as well.
     *
     * @return {@link TaskState#SUCCESS} after publishing, {@link TaskState#FAILED} after a failed
     *         sub task, or {@link TaskState#RUNNING} when the loop ended only because the current
     *         thread was interrupted
     * @throws Exception if a report or last status is missing, the monitor does not report overall
     *                   success, publishing fails, or polling the event queue is interrupted
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public TaskState run() throws Exception {
        final Iterator<ParallelIndexSubTaskSpec> specs = subTaskSpecIterator().iterator();
        final long taskStatusCheckPeriodMs = this.ingestionSchema.m32getTuningConfig().getTaskStatusCheckPeriodMs();
        this.taskMonitor = new TaskMonitor<>(
                Preconditions.checkNotNull(this.indexingServiceClient, "indexingServiceClient"),
                this.ingestionSchema.m32getTuningConfig().getMaxRetry(),
                this.baseFirehoseFactory.getNumSplits());
        TaskState taskState = TaskState.RUNNING;
        this.taskMonitor.start(taskStatusCheckPeriodMs);
        try {
            log.info("Submitting initial tasks");
            while (isRunning() && specs.hasNext() && this.taskMonitor.getNumRunningTasks() < this.maxNumTasks) {
                submitNewTask(this.taskMonitor, specs.next());
            }
            log.info("Waiting for subTasks to be completed");
            while (isRunning()) {
                final TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask> event =
                        this.taskCompleteEvents.poll(taskStatusCheckPeriodMs, TimeUnit.MILLISECONDS);
                if (event == null) {
                    // Nothing completed within the poll period; re-check isRunning() and wait again.
                    continue;
                }
                final TaskState lastState = event.getLastState();
                switch (lastState) {
                    case SUCCESS: {
                        final TaskStatusPlus lastStatus = event.getLastStatus();
                        if (lastStatus == null) {
                            throw new ISE("Last status of complete task is missing!");
                        }
                        if (!this.segmentsMap.containsKey(lastStatus.getId())) {
                            throw new ISE("Missing reports from task[%s]!", lastStatus.getId());
                        }
                        if (specs.hasNext()) {
                            // A slot may have been freed; only submit while below the concurrency limit.
                            if (this.taskMonitor.getNumRunningTasks() < this.maxNumTasks) {
                                submitNewTask(this.taskMonitor, specs.next());
                            }
                        } else if (this.taskMonitor.getNumRunningTasks() == 0 && this.taskCompleteEvents.isEmpty()) {
                            // No specs left, nothing running, nothing queued: the run is over.
                            this.stopped = true;
                            if (!this.taskMonitor.isSucceeded()) {
                                final SinglePhaseParallelIndexingProgress progress = this.taskMonitor.getProgress();
                                throw new ISE(
                                        "Expected for [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks",
                                        progress.getExpectedSucceeded(),
                                        progress.getSucceeded(),
                                        progress.getFailed());
                            }
                            publish(this.toolbox);
                            taskState = TaskState.SUCCESS;
                        }
                        break;
                    }
                    case FAILED: {
                        taskState = TaskState.FAILED;
                        this.stopped = true;
                        final TaskStatusPlus lastStatus = event.getLastStatus();
                        if (lastStatus == null) {
                            // No status to name the task, so log the input splits of the failed spec instead.
                            log.error(
                                    "Failed to run sub tasks for inputSplits[%s]",
                                    getSplitsIfSplittable(((ParallelIndexSubTaskSpec) event.getSpec()).getIngestionSpec().m33getIOConfig().getFirehoseFactory()));
                        } else {
                            log.error("Failed because of the failed sub task[%s]", lastStatus.getId());
                        }
                        break;
                    }
                    default:
                        throw new ISE("spec[%s] is in an invalid state[%s]", event.getSpec().getId(), lastState);
                }
            }
        } finally {
            // Single cleanup path shared by normal completion and every exception.
            log.info("Cleaning up resources");
            this.taskCompleteEvents.clear();
            this.taskMonitor.stop();
            if (taskState != TaskState.SUCCESS) {
                log.info(
                        "This task is finished with [%s] state. Killing [%d] remaining subtasks.",
                        taskState,
                        this.taskMonitor.getNumRunningTasks());
                this.taskMonitor.killAll();
            }
        }
        return taskState;
    }

    /**
     * Tells whether the main loop of {@code run()} should keep going.
     *
     * @return false once {@code stopped} is set or the current thread is interrupted, true otherwise
     */
    private boolean isRunning() {
        if (this.stopped) {
            return false;
        }
        return !Thread.currentThread().isInterrupted();
    }

    /** Returns the toolbox this runner was constructed with. */
    @VisibleForTesting
    TaskToolbox getToolbox() {
        return toolbox;
    }

    /** Returns the ingestion spec this runner was constructed with. */
    @VisibleForTesting
    ParallelIndexIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    /**
     * Stores the report under its task id, replacing any earlier report from the same task.
     *
     * @param pushedSegmentsReport report to record
     * @throws IllegalStateException if the task already reported a different set of segments
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public void collectReport(PushedSegmentsReport pushedSegmentsReport) {
        final String reportingTaskId = pushedSegmentsReport.getTaskId();
        this.segmentsMap.compute(reportingTaskId, (id, previousReport) -> {
            if (previousReport == null) {
                return pushedSegmentsReport;
            }
            // A repeated report is accepted only when it lists the same segments as before.
            final boolean sameSegments = previousReport.getSegments().equals(pushedSegmentsReport.getSegments());
            Preconditions.checkState(
                    sameSegments,
                    "task[%s] sent two or more reports and previous report[%s] is different from the current one[%s]",
                    id,
                    previousReport,
                    pushedSegmentsReport);
            return pushedSegmentsReport;
        });
    }

    /**
     * Reports the progress tracked by the task monitor.
     *
     * @return the monitor's progress, or the "not running" progress while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public SinglePhaseParallelIndexingProgress getProgress() {
        if (this.taskMonitor == null) {
            return SinglePhaseParallelIndexingProgress.notRunning();
        }
        return this.taskMonitor.getProgress();
    }

    /**
     * Lists the ids of the tasks the monitor currently reports as running.
     *
     * @return the monitor's running task ids, or an empty set while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public Set<String> getRunningTaskIds() {
        if (this.taskMonitor == null) {
            return Collections.emptySet();
        }
        return this.taskMonitor.getRunningTaskIds();
    }

    /**
     * Returns every sub task spec known to the monitor, running or complete, without duplicates.
     *
     * <p>Specs are merged by id. When the same id shows up in both lists, the entry from the
     * complete list wins because it is inserted last.
     *
     * @return a new list with one spec per id, or an empty list while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public List<SubTaskSpec<ParallelIndexSubTask>> getSubTaskSpecs() {
        if (this.taskMonitor == null) {
            return Collections.emptyList();
        }
        final List<SubTaskSpec<ParallelIndexSubTask>> runningSubTaskSpecs = this.taskMonitor.getRunningSubTaskSpecs();
        final List<SubTaskSpec<ParallelIndexSubTask>> completeSubTaskSpecs = this.taskMonitor.getCompleteSubTaskSpecs();
        final Map<String, SubTaskSpec<ParallelIndexSubTask>> specsById =
                new HashMap<>(runningSubTaskSpecs.size() + completeSubTaskSpecs.size());
        // Key by spec id so a spec present in both lists is returned only once.
        runningSubTaskSpecs.forEach(spec -> specsById.put(spec.getId(), spec));
        completeSubTaskSpecs.forEach(spec -> specsById.put(spec.getId(), spec));
        return new ArrayList<>(specsById.values());
    }

    /**
     * Lists the specs of the sub tasks the monitor currently reports as running.
     *
     * @return the monitor's running specs, or an empty list while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public List<SubTaskSpec<ParallelIndexSubTask>> getRunningSubTaskSpecs() {
        if (this.taskMonitor == null) {
            return Collections.emptyList();
        }
        return this.taskMonitor.getRunningSubTaskSpecs();
    }

    /**
     * Lists the specs of the sub tasks the monitor reports as complete.
     *
     * @return the monitor's complete specs, or an empty list while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public List<SubTaskSpec<ParallelIndexSubTask>> getCompleteSubTaskSpecs() {
        if (this.taskMonitor == null) {
            return Collections.emptyList();
        }
        return this.taskMonitor.getCompleteSubTaskSpecs();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Looks up a sub task spec by id, preferring the running entry over the completed history.
     *
     * @param str id of the sub task spec to look up
     * @return the spec of the running entry if there is one, otherwise the spec from the complete
     *         history, or null when neither exists or no monitor exists yet
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    @Nullable
    public SubTaskSpec<ParallelIndexSubTask> getSubTaskSpec(String str) {
        if (this.taskMonitor == null) {
            return null;
        }
        final TaskMonitor<ParallelIndexSubTask>.MonitorEntry runningEntry = this.taskMonitor.getRunningTaskMonitorEntry(str);
        final TaskHistory<ParallelIndexSubTask> completeHistory = this.taskMonitor.getCompleteSubTaskSpecHistory(str);
        if (runningEntry != null) {
            return runningEntry.getSpec();
        }
        return completeHistory != null ? completeHistory.getSpec() : null;
    }

    /**
     * Builds the status of a sub task spec from the monitor's running entry or complete history.
     *
     * @param str id of the sub task spec to look up
     * @return a status built from the running entry if there is one; otherwise a status built from
     *         a non-empty complete history, with a null running status; otherwise null. Also null
     *         while no monitor exists.
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    @Nullable
    public ParallelIndexTaskRunner.SubTaskSpecStatus getSubTaskState(String str) {
        if (this.taskMonitor == null) {
            return null;
        }
        final TaskMonitor<ParallelIndexSubTask>.MonitorEntry runningEntry = this.taskMonitor.getRunningTaskMonitorEntry(str);
        final TaskHistory<ParallelIndexSubTask> completeHistory = this.taskMonitor.getCompleteSubTaskSpecHistory(str);
        if (runningEntry != null) {
            return new ParallelIndexTaskRunner.SubTaskSpecStatus(
                    (ParallelIndexSubTaskSpec) runningEntry.getSpec(),
                    runningEntry.getRunningStatus(),
                    runningEntry.getTaskHistory());
        }
        if (completeHistory == null || completeHistory.isEmpty()) {
            return null;
        }
        // Nothing is running for this spec any more, so the running status is left null.
        return new ParallelIndexTaskRunner.SubTaskSpecStatus(
                (ParallelIndexSubTaskSpec) completeHistory.getSpec(),
                null,
                completeHistory.getAttemptHistory());
    }

    /**
     * Fetches the monitor's complete history for the given sub task spec id.
     *
     * @param str id of the sub task spec
     * @return whatever the monitor returns for that id, or null while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    @Nullable
    public TaskHistory<ParallelIndexSubTask> getCompleteSubTaskSpecAttemptHistory(String str) {
        return this.taskMonitor == null ? null : this.taskMonitor.getCompleteSubTaskSpecHistory(str);
    }

    /**
     * Publishes the segments of all collected reports in one transactional insert action.
     *
     * <p>An empty segment set counts as published without submitting anything. If the insert does
     * not succeed, the used-segment checker is asked for the same segment ids; the publish is
     * treated as done when its answer equals the set that was to be published.
     *
     * @param taskToolbox toolbox whose task action client submits the insert and backs the checker
     * @throws IOException declared for the publisher and checker calls made here
     * @throws ISE if the insert failed and the used segments do not equal the set to publish
     */
    private void publish(TaskToolbox taskToolbox) throws IOException {
        final TransactionalSegmentPublisher publisher = (segments, commitMetadata) ->
                (SegmentPublishResult) taskToolbox.getTaskActionClient().submit(new SegmentTransactionalInsertAction(segments));
        final ActionBasedUsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(taskToolbox.getTaskActionClient());
        final Set segmentsToPublish = (Set) this.segmentsMap.values().stream()
                .flatMap(report -> report.getSegments().stream())
                .collect(Collectors.toSet());

        final boolean published = segmentsToPublish.isEmpty()
                || publisher.publishSegments(segmentsToPublish, (Object) null).isSuccess();
        if (published) {
            log.info("Published [%d] segments", Integer.valueOf(segmentsToPublish.size()));
            return;
        }

        log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
        final Set segmentIds = (Set) this.segmentsMap.values().stream()
                .flatMap(report -> report.getSegments().stream())
                .map(SegmentIdWithShardSpec::fromDataSegment)
                .collect(Collectors.toSet());
        if (usedSegmentChecker.findUsedSegments(segmentIds).equals(segmentsToPublish)) {
            log.info("Our segments really do exist, awaiting handoff.");
        } else {
            throw new ISE("Failed to publish segments[%s]", new Object[]{segmentsToPublish});
        }
    }

    /**
     * Submits one sub task spec to the monitor and routes its outcome into {@code taskCompleteEvents}.
     *
     * @param taskMonitor              monitor the spec is submitted to
     * @param parallelIndexSubTaskSpec spec to run
     */
    private void submitNewTask(TaskMonitor<ParallelIndexSubTask> taskMonitor, final ParallelIndexSubTaskSpec parallelIndexSubTaskSpec) {
        log.info(
                "Submit a new task for spec[%s] and inputSplit[%s]",
                parallelIndexSubTaskSpec.getId(),
                parallelIndexSubTaskSpec.getInputSplit());
        final FutureCallback<TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask>> completionCallback =
                new FutureCallback<TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask>>() {
                    @Override
                    public void onSuccess(TaskMonitor.SubTaskCompleteEvent<ParallelIndexSubTask> completeEvent) {
                        // The monitor produced an event; hand it to run() unchanged.
                        taskCompleteEvents.offer(completeEvent);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        // The future itself failed; log it and enqueue a failure event for this spec.
                        log.error(cause, "Error while running a task for subTaskSpec[%s]", parallelIndexSubTaskSpec);
                        taskCompleteEvents.offer(TaskMonitor.SubTaskCompleteEvent.fail(parallelIndexSubTaskSpec, cause));
                    }
                };
        Futures.addCallback(taskMonitor.submit(parallelIndexSubTaskSpec), completionCallback);
    }

    /**
     * Hands out the current spec id counter and advances it by one.
     *
     * @return the counter value before the increment
     */
    @VisibleForTesting
    int getAndIncrementNextSpecId() {
        return this.nextSpecId++;
    }

    /**
     * Maps every split of the base firehose factory to a sub task spec.
     *
     * @return a stream with one spec per split
     * @throws IOException declared for the call that obtains the splits
     */
    @VisibleForTesting
    Stream<ParallelIndexSubTaskSpec> subTaskSpecIterator() throws IOException {
        return this.baseFirehoseFactory.getSplits().map(split -> newTaskSpec(split));
    }

    /**
     * Builds the sub task spec for one input split.
     *
     * <p>The spec id is the task id, an underscore and the next spec id counter value. The spec's
     * ingestion spec reuses this runner's data schema and tuning config, with an IO config whose
     * firehose factory is narrowed to the given split.
     *
     * @param inputSplit split the sub task should read
     * @return the new sub task spec
     */
    @VisibleForTesting
    ParallelIndexSubTaskSpec newTaskSpec(InputSplit inputSplit) {
        final String specId = this.taskId + "_" + getAndIncrementNextSpecId();
        final ParallelIndexIOConfig splitIoConfig = new ParallelIndexIOConfig(
                this.baseFirehoseFactory.withSplit(inputSplit),
                Boolean.valueOf(this.ingestionSchema.m33getIOConfig().isAppendToExisting()));
        final ParallelIndexIngestionSpec splitIngestionSpec = new ParallelIndexIngestionSpec(
                this.ingestionSchema.getDataSchema(),
                splitIoConfig,
                this.ingestionSchema.m32getTuningConfig());
        return new ParallelIndexSubTaskSpec(specId, this.groupId, this.taskId, splitIngestionSpec, this.context, inputSplit);
    }

    /**
     * Collects the splits of a firehose factory into a list.
     *
     * @param firehoseFactory factory to read the splits from
     * @return the factory's splits as a list
     * @throws IOException declared for the call that obtains the splits
     * @throws ISE if the factory is not a {@link FiniteFirehoseFactory}
     */
    private static List<InputSplit> getSplitsIfSplittable(FirehoseFactory firehoseFactory) throws IOException {
        if (!(firehoseFactory instanceof FiniteFirehoseFactory)) {
            throw new ISE("firehoseFactory[%s] is not splittable", new Object[]{firehoseFactory.getClass().getSimpleName()});
        }
        final FiniteFirehoseFactory splittableFactory = (FiniteFirehoseFactory) firehoseFactory;
        return (List) splittableFactory.getSplits().collect(Collectors.toList());
    }
}
