package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.utils.CollectionUtils;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.class */
public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler {
    /** Task type name; this is the value returned by {@code getType()}. */
    public static final String TYPE = "index_parallel";
    private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
    /** Format string for a phase-failure status message; the argument is the name of the failed runner. */
    private static final String TASK_PHASE_FAILURE_MSG = "Failed in phase[%s]. See task logs for details.";
    /** Shard count named in the warning that is logged when an estimated cardinality is zero or negative. */
    private static final long DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE = 7;
    /** The ingestion spec this task was created with; exposed as the "spec" JSON property. */
    private final ParallelIndexIngestionSpec ingestionSchema;
    /** Passed to every phase runner; falls back to this task's id when no explicit name is supplied. */
    private final String baseSubtaskSpecName;
    /** The non-null input source taken from the spec's IO config. */
    private final InputSource baseInputSource;
    /** True when the ingestion mode is not APPEND and the granularitySpec declares no input intervals. */
    private final boolean missingIntervalsInOverwriteMode;
    /** Timeout read from the tuning config; segment availability is only awaited when this is positive. */
    private final long awaitSegmentAvailabilityTimeoutMillis;
    /** Assigned from the toolbox at the start of runTask; not set before the task runs. */
    private AuthorizerMapper authorizerMapper;
    /** Holder for the currently executing runner or sub-task; created by initializeSubTaskCleaner. */
    private volatile CurrentSubTaskHolder currentSubTaskHolder;
    /** Toolbox captured in runTask, only when the task runs in parallel mode. */
    private volatile TaskToolbox toolbox;
    /** Row stats recorded after the segment-generate phase of a multi-phase run. */
    private volatile Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;
    /** NOT_STARTED at construction; BUILD_SEGMENTS during the single-phase run; COMPLETED once runTask exits. */
    private IngestionState ingestionState;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask$Partition.class */
    /**
     * Identifies one partition by its time interval and bucket id. Two partitions are equal when
     * both the bucket id and the interval match.
     */
    public static class Partition {
        final Interval interval;
        final int bucketId;

        Partition(Interval interval, int bucketId) {
            this.interval = interval;
            this.bucketId = bucketId;
        }

        /** Builds a partition from the interval and bucket id carried by the given stat. */
        public static Partition fromStat(PartitionStat partitionStat) {
            final Interval statInterval = partitionStat.getInterval();
            final int statBucketId = partitionStat.getBucketId();
            return new Partition(statInterval, statBucketId);
        }

        public int getBucketId() {
            return this.bucketId;
        }

        public Interval getInterval() {
            return this.interval;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final Partition other = (Partition) obj;
            if (getBucketId() != other.getBucketId()) {
                return false;
            }
            return Objects.equals(getInterval(), other.getInterval());
        }

        @Override
        public int hashCode() {
            return Objects.hash(getInterval(), getBucketId());
        }

        @Override
        public String toString() {
            return new StringBuilder("Partition{interval=")
                .append(this.interval)
                .append(", bucketId=")
                .append(this.bucketId)
                .append('}')
                .toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask$PartitionReport.class */
    /** Pairs a partition stat with the id of the sub-task it was reported for. */
    public static class PartitionReport {
        private final String subTaskId;
        private final PartitionStat partitionStat;

        PartitionReport(String subTaskId, PartitionStat partitionStat) {
            this.partitionStat = partitionStat;
            this.subTaskId = subTaskId;
        }

        /** @return the stat stored in this report */
        PartitionStat getPartitionStat() {
            return this.partitionStat;
        }

        /** @return the sub-task id stored in this report */
        String getSubTaskId() {
            return this.subTaskId;
        }
    }

    /**
     * JSON-facing constructor. Delegates to the full constructor with a {@code null} base subtask
     * spec name, in which case the full constructor uses this task's own id as that name.
     */
    @JsonCreator
    public ParallelIndexSupervisorTask(@JsonProperty("id") String str, @JsonProperty("groupId") @Nullable String str2, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("spec") ParallelIndexIngestionSpec parallelIndexIngestionSpec, @JsonProperty("context") Map<String, Object> map) {
        this(str, str2, taskResource, parallelIndexIngestionSpec, null, map);
    }

    public ParallelIndexSupervisorTask(String str, @Nullable String str2, TaskResource taskResource, ParallelIndexIngestionSpec parallelIndexIngestionSpec, @Nullable String str3, Map<String, Object> map) {
        super(getOrMakeId(str, TYPE, parallelIndexIngestionSpec.getDataSchema().getDataSource()), str2, taskResource, parallelIndexIngestionSpec.getDataSchema().getDataSource(), map, parallelIndexIngestionSpec.m44getTuningConfig().getMaxAllowedLockCount(), computeBatchIngestionMode(parallelIndexIngestionSpec.m45getIOConfig()));
        this.ingestionSchema = parallelIndexIngestionSpec;
        this.baseSubtaskSpecName = str3 == null ? getId() : str3;
        if (getIngestionMode() == AbstractTask.IngestionMode.REPLACE && parallelIndexIngestionSpec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
            throw new ISE("GranularitySpec's intervals cannot be empty when using replace.", new Object[0]);
        }
        if (isGuaranteedRollup(getIngestionMode(), parallelIndexIngestionSpec.m44getTuningConfig())) {
            checkPartitionsSpecForForceGuaranteedRollup(parallelIndexIngestionSpec.m44getTuningConfig().getGivenOrDefaultPartitionsSpec());
        }
        this.baseInputSource = parallelIndexIngestionSpec.m45getIOConfig().getNonNullInputSource();
        this.missingIntervalsInOverwriteMode = getIngestionMode() != AbstractTask.IngestionMode.APPEND && parallelIndexIngestionSpec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
        if (this.missingIntervalsInOverwriteMode) {
            addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
        }
        this.awaitSegmentAvailabilityTimeoutMillis = parallelIndexIngestionSpec.m44getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
        this.ingestionState = IngestionState.NOT_STARTED;
    }

    /**
     * Rejects a partitionsSpec that reports itself as incompatible with forced guaranteed rollup.
     *
     * @throws ISE carrying the spec's own incompatibility reason
     */
    private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec) {
        if (!partitionsSpec.isForceGuaranteedRollupCompatible()) {
            throw new ISE("forceGuaranteedRollup is incompatible with partitionsSpec: " + partitionsSpec.getForceGuaranteedRollupIncompatiblityReason(), new Object[0]);
        }
    }

    /** @return the task type constant {@link #TYPE} */
    @Override // org.apache.druid.indexing.common.task.Task
    public String getType() {
        return TYPE;
    }

    /**
     * Lists the resource actions needed to read this task's input source: one READ action on an
     * "EXTERNAL" resource per input source type. Returns an empty set when no input source is
     * configured.
     *
     * @throws RuntimeException (as produced by {@code getInputSecurityOnFirehoseUnsupportedError})
     *                          when the IO config uses a firehose factory
     */
    @Override // org.apache.druid.indexing.common.task.Task
    @Nonnull
    @JsonIgnore
    public Set<ResourceAction> getInputSourceResources() {
        if (getIngestionSchema().m45getIOConfig().getFirehoseFactory() != null) {
            throw getInputSecurityOnFirehoseUnsupportedError();
        }
        if (getIngestionSchema().m45getIOConfig().getInputSource() == null) {
            return ImmutableSet.of();
        }
        return (Set) getIngestionSchema().m45getIOConfig().getInputSource().getTypes().stream()
            .map(inputSourceType -> new ResourceAction(new Resource(inputSourceType, "EXTERNAL"), Action.READ))
            .collect(Collectors.toSet());
    }

    /** @return the ingestion spec this task was constructed with (serialized as "spec") */
    @JsonProperty("spec")
    public ParallelIndexIngestionSpec getIngestionSchema() {
        return this.ingestionSchema;
    }

    /**
     * @return the runner currently stored in the sub-task holder, or null when the task is not in
     *         parallel mode or the holder has not been created yet
     */
    @VisibleForTesting
    @Nullable
    ParallelIndexTaskRunner getCurrentRunner() {
        if (isParallelMode() && this.currentSubTaskHolder != null) {
            return (ParallelIndexTaskRunner) this.currentSubTaskHolder.getTask();
        }
        return null;
    }

    /**
     * Creates a runner with the given factory and installs it in the current sub-task holder.
     *
     * @return the new runner, or null when the holder refuses it ({@code setTask} returned false)
     */
    @VisibleForTesting
    @Nullable
    <T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(TaskToolbox taskToolbox, Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> function) {
        final ParallelIndexTaskRunner<T, R> runner = function.apply(taskToolbox);
        return this.currentSubTaskHolder.setTask(runner) ? runner : null;
    }

    /**
     * Runs the given phase runner and returns its final state. A null runner is treated as a stop
     * request: it is logged and reported as {@code FAILED}.
     */
    private static TaskState runNextPhase(@Nullable ParallelIndexTaskRunner parallelIndexTaskRunner) throws Exception {
        if (parallelIndexTaskRunner == null) {
            LOG.info("Task is asked to stop. Finish as failed", new Object[0]);
            return TaskState.FAILED;
        }
        return parallelIndexTaskRunner.run();
    }

    /**
     * Creates a {@code SinglePhaseParallelIndexTaskRunner} from this task's id, group id, base
     * subtask spec name, ingestion spec and context.
     */
    @VisibleForTesting
    SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox taskToolbox) {
        return new SinglePhaseParallelIndexTaskRunner(taskToolbox, getId(), getGroupId(), this.baseSubtaskSpecName, this.ingestionSchema, getContext());
    }

    /**
     * Creates a {@code PartialDimensionCardinalityParallelIndexTaskRunner} from this task's id,
     * group id, base subtask spec name, ingestion spec and context.
     */
    @VisibleForTesting
    PartialDimensionCardinalityParallelIndexTaskRunner createPartialDimensionCardinalityRunner(TaskToolbox taskToolbox) {
        return new PartialDimensionCardinalityParallelIndexTaskRunner(taskToolbox, getId(), getGroupId(), this.baseSubtaskSpecName, this.ingestionSchema, getContext());
    }

    /**
     * Creates a {@code PartialHashSegmentGenerateParallelIndexTaskRunner} for the given spec.
     *
     * @param map interval-to-shard-count mapping handed to the runner; may be null
     */
    @VisibleForTesting
    PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(TaskToolbox taskToolbox, ParallelIndexIngestionSpec parallelIndexIngestionSpec, @Nullable Map<Interval, Integer> map) {
        return new PartialHashSegmentGenerateParallelIndexTaskRunner(taskToolbox, getId(), getGroupId(), this.baseSubtaskSpecName, parallelIndexIngestionSpec, getContext(), map);
    }

    /**
     * Creates a {@code PartialDimensionDistributionParallelIndexTaskRunner} from this task's id,
     * group id, base subtask spec name, ingestion spec and context.
     */
    @VisibleForTesting
    PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistributionRunner(TaskToolbox taskToolbox) {
        return new PartialDimensionDistributionParallelIndexTaskRunner(taskToolbox, getId(), getGroupId(), this.baseSubtaskSpecName, this.ingestionSchema, getContext());
    }

    /**
     * Creates a {@code PartialRangeSegmentGenerateParallelIndexTaskRunner} for the given spec.
     *
     * @param map per-interval partition boundaries handed to the runner
     */
    @VisibleForTesting
    PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(TaskToolbox taskToolbox, Map<Interval, PartitionBoundaries> map, ParallelIndexIngestionSpec parallelIndexIngestionSpec) {
        return new PartialRangeSegmentGenerateParallelIndexTaskRunner(taskToolbox, getId(), getGroupId(), this.baseSubtaskSpecName, parallelIndexIngestionSpec, getContext(), map);
    }

    /**
     * Creates a {@code PartialGenericSegmentMergeParallelIndexTaskRunner} using the data schema and
     * tuning config of the given spec.
     *
     * @param list merge IO configs handed to the runner
     */
    @VisibleForTesting
    PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(TaskToolbox taskToolbox, List<PartialSegmentMergeIOConfig> list, ParallelIndexIngestionSpec parallelIndexIngestionSpec) {
        return new PartialGenericSegmentMergeParallelIndexTaskRunner(taskToolbox, getId(), getGroupId(), this.baseSubtaskSpecName, parallelIndexIngestionSpec.getDataSchema(), list, parallelIndexIngestionSpec.m44getTuningConfig(), getContext());
    }

    /**
     * Delegates to {@code determineLockGranularityAndTryLock} with the input intervals declared in
     * the spec's granularitySpec and returns its result.
     */
    @Override // org.apache.druid.indexing.common.task.Task
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        return determineLockGranularityAndTryLock(taskActionClient, this.ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals());
    }

    /** Delegates to {@code findInputSegments} for this task's datasource and the given intervals. */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> list) throws IOException {
        return findInputSegments(getDataSource(), taskActionClient, list);
    }

    /** @return true for every ingestion mode except APPEND */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    public boolean requireLockExistingSegments() {
        return getIngestionMode() != AbstractTask.IngestionMode.APPEND;
    }

    /** @return the result of {@code isGuaranteedRollup} for this task's ingestion mode and tuning config */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    public boolean isPerfectRollup() {
        return isGuaranteedRollup(getIngestionMode(), getIngestionSchema().m44getTuningConfig());
    }

    /**
     * @return the segment granularity of the spec's granularitySpec, or null when that spec is an
     *         {@code ArbitraryGranularitySpec}
     */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    @Nullable
    public Granularity getSegmentGranularity() {
        final GranularitySpec spec = this.ingestionSchema.getDataSchema().getGranularitySpec();
        return spec instanceof ArbitraryGranularitySpec ? null : spec.getSegmentGranularity();
    }

    /**
     * Entry point of the task. Logs warnings for tuning options that are not supported, registers
     * this task as a chat handler, then runs in multi-phase parallel, single-phase parallel, or
     * sequential mode. On exit the ingestion state is set to COMPLETED and the chat handler is
     * unregistered, whether or not the run succeeded.
     *
     * @throws ISE when the task is not in parallel mode although the input source is splittable and
     *             maxNumConcurrentSubTasks is greater than 1
     */
    @Override // org.apache.druid.indexing.common.task.AbstractTask
    public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
        final ParallelIndexTuningConfig tuningConfig = this.ingestionSchema.m44getTuningConfig();

        if (tuningConfig.getMaxSavedParseExceptions() != 0) {
            LOG.warn("maxSavedParseExceptions is not supported yet", new Object[0]);
        }
        if (tuningConfig.getMaxParseExceptions() != Integer.MAX_VALUE) {
            LOG.warn("maxParseExceptions is not supported yet", new Object[0]);
        }
        if (tuningConfig.isLogParseExceptions()) {
            LOG.warn("logParseExceptions is not supported yet", new Object[0]);
        }
        if (this.missingIntervalsInOverwriteMode) {
            LOG.warn("Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. Forced to use timeChunk lock.", new Object[0]);
        }

        // Fails fast with a NullPointerException when the toolbox has no chat handler provider.
        final ChatHandlerProvider provider = (ChatHandlerProvider) Preconditions.checkNotNull(taskToolbox.getChatHandlerProvider(), "chatHandlerProvider");
        LOG.debug("Found chat handler of class[%s]", new Object[]{provider.getClass().getName()});

        this.authorizerMapper = taskToolbox.getAuthorizerMapper();
        taskToolbox.getChatHandlerProvider().register(getId(), this, false);
        addToContextIfAbsent(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, false);

        try {
            initializeSubTaskCleaner();

            if (!isParallelMode()) {
                // Explain why the task falls back to sequential mode before running it.
                if (this.baseInputSource.isSplittable()) {
                    if (tuningConfig.getMaxNumConcurrentSubTasks() > 1) {
                        throw new ISE("Unknown reason for sequentail mode. Failing this task.", new Object[0]);
                    }
                    LOG.warn("maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode.", new Object[]{Integer.valueOf(tuningConfig.getMaxNumConcurrentSubTasks())});
                } else {
                    LOG.warn("firehoseFactory[%s] is not splittable. Running sequentially.", new Object[]{this.baseInputSource.getClass().getSimpleName()});
                }
                return runSequential(taskToolbox);
            }

            emitMetric(taskToolbox.getEmitter(), "ingest/count", 1);
            this.toolbox = taskToolbox;
            if (isGuaranteedRollup(getIngestionMode(), tuningConfig)) {
                return runMultiPhaseParallel(taskToolbox);
            }
            return runSinglePhaseParallel(taskToolbox);
        } finally {
            this.ingestionState = IngestionState.COMPLETED;
            taskToolbox.getChatHandlerProvider().unregister(getId());
        }
    }

    /**
     * Creates the sub-task holder with a cleanup callback that matches the execution mode (a phase
     * runner in parallel mode, an {@code IndexTask} otherwise) and registers the holder to be
     * closed on abnormal exit.
     */
    private void initializeSubTaskCleaner() {
        final CurrentSubTaskHolder holder;
        if (!isParallelMode()) {
            holder = new CurrentSubTaskHolder((subTask, config) -> ((IndexTask) subTask).stopGracefully(config));
        } else {
            holder = new CurrentSubTaskHolder((runner, config) -> ((ParallelIndexTaskRunner) runner).stopGracefully(null));
        }
        this.currentSubTaskHolder = holder;
        registerResourceCloserOnAbnormalExit(holder);
    }

    /**
     * Decides whether ingestion can run in parallel mode: the tuning config must be present, the
     * input source must be splittable, and maxNumConcurrentSubTasks must be at least 1 when range
     * partitioning is used, or at least 2 otherwise.
     */
    public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig parallelIndexTuningConfig) {
        if (parallelIndexTuningConfig == null) {
            return false;
        }
        if (!inputSource.isSplittable()) {
            return false;
        }
        final int configuredConcurrency = parallelIndexTuningConfig.getMaxNumConcurrentSubTasks();
        final int requiredConcurrency = useRangePartitions(parallelIndexTuningConfig) ? 1 : 2;
        return configuredConcurrency >= requiredConcurrency;
    }

    /** @return true when the given or default partitionsSpec is a {@code DimensionRangePartitionsSpec} */
    private static boolean useRangePartitions(ParallelIndexTuningConfig parallelIndexTuningConfig) {
        return parallelIndexTuningConfig.getGivenOrDefaultPartitionsSpec() instanceof DimensionRangePartitionsSpec;
    }

    /** Evaluates the static {@code isParallelMode} check against this task's input source and tuning config. */
    private boolean isParallelMode() {
        return isParallelMode(this.baseInputSource, this.ingestionSchema.m44getTuningConfig());
    }

    /**
     * Gathers the new segments from every report and waits for them to become available, using the
     * stored toolbox and the configured timeout.
     */
    private void waitForSegmentAvailability(Map<String, PushedSegmentsReport> map) {
        final List<DataSegment> segmentsToAwait = new ArrayList<>();
        for (PushedSegmentsReport report : map.values()) {
            segmentsToAwait.addAll(report.getNewSegments());
        }
        waitForSegmentAvailability(this.toolbox, segmentsToAwait, this.awaitSegmentAvailabilityTimeoutMillis);
    }

    /**
     * Runs the single-phase parallel ingestion: builds segments with the single-phase runner,
     * publishes them on success, optionally waits for segment availability, and writes the task
     * completion report.
     *
     * <p>{@code createRunner} may return null when the task has been asked to stop; in that case
     * {@code runNextPhase} reports FAILED and this method returns a failure status instead of
     * dereferencing the missing runner.
     *
     * @return a success status, or a failure status carrying the runner's stop reason when present
     */
    private TaskStatus runSinglePhaseParallel(TaskToolbox taskToolbox) throws Exception {
        this.ingestionState = IngestionState.BUILD_SEGMENTS;
        final ParallelIndexTaskRunner runner = createRunner(taskToolbox, this::createSinglePhaseTaskRunner);
        final TaskState state = runNextPhase(runner);

        final TaskStatus status;
        if (state.isSuccess()) {
            // A successful state implies a non-null runner: runNextPhase returns FAILED for null.
            publishSegments(taskToolbox, runner.getReports());
            if (this.awaitSegmentAvailabilityTimeoutMillis > 0) {
                waitForSegmentAvailability(runner.getReports());
            }
            status = TaskStatus.success(getId());
        } else {
            // Only success or failure is expected once the phase has finished.
            Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state);
            final String errorMessage;
            if (runner == null) {
                errorMessage = "Task was asked to stop before the phase could start.";
            } else if (runner.getStopReason() != null) {
                errorMessage = runner.getStopReason();
            } else {
                errorMessage = StringUtils.format(TASK_PHASE_FAILURE_MSG, new Object[]{runner.getName()});
            }
            status = TaskStatus.failure(getId(), errorMessage);
        }

        taskToolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(status, this.segmentAvailabilityConfirmationCompleted));
        return status;
    }

    /** Dispatches to the range-partition or hash-partition multi-phase flow based on the tuning config. */
    private TaskStatus runMultiPhaseParallel(TaskToolbox taskToolbox) throws Exception {
        if (useRangePartitions(this.ingestionSchema.m44getTuningConfig())) {
            return runRangePartitionMultiPhaseParallel(taskToolbox);
        }
        return runHashPartitionMultiPhaseParallel(taskToolbox);
    }

    /**
     * Returns the spec unchanged when its granularitySpec already declares input intervals;
     * otherwise returns a copy whose granularitySpec carries a list copy of the given intervals.
     */
    private static ParallelIndexIngestionSpec rewriteIngestionSpecWithIntervalsIfMissing(ParallelIndexIngestionSpec parallelIndexIngestionSpec, Collection<Interval> collection) {
        if (!parallelIndexIngestionSpec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
            return parallelIndexIngestionSpec;
        }
        return parallelIndexIngestionSpec.withDataSchema(
            parallelIndexIngestionSpec.getDataSchema().withGranularitySpec(
                parallelIndexIngestionSpec.getDataSchema().getGranularitySpec().withIntervals(new ArrayList<>(collection))
            )
        );
    }

    /**
     * Runs hash-partitioned multi-phase ingestion.
     *
     * <ol>
     *   <li>If numShards is not configured or the spec has no input intervals, runs the dimension
     *       cardinality phase to derive an interval-to-numShards mapping and, if needed, the
     *       intervals themselves.</li>
     *   <li>Runs the partial segment generate phase.</li>
     *   <li>Runs the partial segment merge phase, publishes the merged segments, optionally waits
     *       for availability, and writes the completion report.</li>
     * </ol>
     *
     * <p>{@code createRunner} may return null when the task has been asked to stop; every phase
     * failure message is therefore built without dereferencing a missing runner.
     *
     * @throws ISE when the configured partitionsSpec is not a {@code HashedPartitionsSpec}
     */
    @VisibleForTesting
    TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox taskToolbox) throws Exception {
        if (!(this.ingestionSchema.m44getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
            throw new ISE("forceGuaranteedRollup is set but partitionsSpec [%s] is not a single_dim or hash partition spec.", new Object[]{this.ingestionSchema.m44getTuningConfig().getPartitionsSpec()});
        }
        // Safe: the instanceof check above has already established the runtime type.
        final HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) this.ingestionSchema.m44getTuningConfig().getPartitionsSpec();

        ParallelIndexIngestionSpec ingestionSpecToUse = this.ingestionSchema;
        Map<Interval, Integer> intervalToNumShards = null;

        if (partitionsSpec.getNumShards() == null || ingestionSpecToUse.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
            LOG.info("Needs to determine intervals or numShards, beginning %s phase.", new Object[]{PartialDimensionCardinalityTask.TYPE});
            final ParallelIndexTaskRunner cardinalityRunner = createRunner(taskToolbox, this::createPartialDimensionCardinalityRunner);
            if (runNextPhase(cardinalityRunner).isFailure()) {
                return TaskStatus.failure(getId(), describeHashPhaseFailure(cardinalityRunner));
            }
            if (cardinalityRunner.getReports().isEmpty()) {
                LOG.warn("No valid rows for hash partitioning. All rows may have invalid timestamps or have been filtered out.", new Object[0]);
                return TaskStatus.success(getId(), "No valid rows for hash partitioning. All rows may have invalid timestamps or have been filtered out.");
            }
            if (partitionsSpec.getNumShards() == null) {
                final int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null ? 5000000 : partitionsSpec.getMaxRowsPerSegment().intValue();
                LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment, new Object[0]);
                intervalToNumShards = determineNumShardsFromCardinalityReport(cardinalityRunner.getReports().values(), effectiveMaxRowsPerSegment);
                LOG.debug("intervalToNumShards: %s", new Object[]{intervalToNumShards.toString()});
            } else {
                // numShards is fixed; the cardinality reports are only used to discover the intervals.
                intervalToNumShards = CollectionUtils.mapValues(mergeCardinalityReports(cardinalityRunner.getReports().values()), union -> {
                    return partitionsSpec.getNumShards();
                });
            }
            ingestionSpecToUse = rewriteIngestionSpecWithIntervalsIfMissing(ingestionSpecToUse, intervalToNumShards.keySet());
        }

        // Effectively-final copies for capture by the lambdas below.
        final ParallelIndexIngestionSpec phaseSpec = ingestionSpecToUse;
        final Map<Interval, Integer> phaseIntervalToNumShards = intervalToNumShards;

        final ParallelIndexTaskRunner<?, ?> generateRunner = createRunner(taskToolbox, phaseToolbox -> {
            return createPartialHashSegmentGenerateRunner(phaseToolbox, phaseSpec, phaseIntervalToNumShards);
        });
        if (runNextPhase(generateRunner).isFailure()) {
            return TaskStatus.failure(getId(), describeHashPhaseFailure(generateRunner));
        }
        this.indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(generateRunner, true);

        final List<PartialSegmentMergeIOConfig> mergeIOConfigs = createGenericMergeIOConfigs(this.ingestionSchema.m44getTuningConfig().getTotalNumMergeTasks(), getPartitionToLocations(generateRunner.getReports()));
        final ParallelIndexTaskRunner mergeRunner = createRunner(taskToolbox, phaseToolbox -> {
            return createPartialGenericSegmentMergeRunner(phaseToolbox, mergeIOConfigs, phaseSpec);
        });
        final TaskState mergeState = runNextPhase(mergeRunner);

        final TaskStatus status;
        if (mergeState.isSuccess()) {
            // A successful state implies a non-null runner: runNextPhase returns FAILED for null.
            publishSegments(taskToolbox, mergeRunner.getReports());
            if (this.awaitSegmentAvailabilityTimeoutMillis > 0) {
                waitForSegmentAvailability(mergeRunner.getReports());
            }
            status = TaskStatus.success(getId());
        } else {
            // Only success or failure is expected once the phase has finished.
            Preconditions.checkState(mergeState.isFailure(), "Unrecognized state after task is complete[%s]", mergeState);
            status = TaskStatus.failure(getId(), describeHashPhaseFailure(mergeRunner));
        }
        taskToolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(status, this.segmentAvailabilityConfirmationCompleted));
        return status;
    }

    /** Builds the failure message for a hash-partition phase; tolerates a runner that was never created. */
    private static String describeHashPhaseFailure(@Nullable ParallelIndexTaskRunner<?, ?> runner) {
        if (runner == null) {
            return "Task was asked to stop before the phase could start.";
        }
        return StringUtils.format(TASK_PHASE_FAILURE_MSG, new Object[]{runner.getName()});
    }

    /**
     * Runs range-partitioned multi-phase ingestion.
     *
     * <ol>
     *   <li>Runs the dimension distribution phase and derives per-interval partition boundaries.</li>
     *   <li>Runs the partial segment generate phase.</li>
     *   <li>Runs the partial segment merge phase, publishes the merged segments, optionally waits
     *       for availability, and writes the completion report.</li>
     * </ol>
     *
     * <p>Only the boundary computation is wrapped in the "Error creating partition boundaries."
     * handler; exceptions raised by the later phases propagate to the caller rather than being
     * reported under that message. {@code createRunner} may return null when the task has been
     * asked to stop; every phase failure message is built without dereferencing a missing runner.
     */
    @VisibleForTesting
    TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox taskToolbox) throws Exception {
        final PartialDimensionDistributionParallelIndexTaskRunner distributionRunner = (PartialDimensionDistributionParallelIndexTaskRunner) createRunner(taskToolbox, this::createPartialDimensionDistributionRunner);
        if (runNextPhase(distributionRunner).isFailure()) {
            return TaskStatus.failure(getId(), describeRangePhaseFailure(distributionRunner));
        }

        final Map<Interval, PartitionBoundaries> intervalToPartitionBoundaries;
        try {
            intervalToPartitionBoundaries = distributionRunner.getIntervalToPartitionBoundaries((DimensionRangePartitionsSpec) this.ingestionSchema.m44getTuningConfig().getGivenOrDefaultPartitionsSpec());
            if (intervalToPartitionBoundaries.isEmpty()) {
                LOG.warn("No valid rows for range partitioning. All rows may have invalid timestamps or multiple dimension values.", new Object[0]);
                return TaskStatus.success(getId(), "No valid rows for range partitioning. All rows may have invalid timestamps or multiple dimension values.");
            }
        } catch (Exception e) {
            String errorMessage = "Error creating partition boundaries.";
            if (distributionRunner.getStopReason() != null) {
                errorMessage = errorMessage + " " + distributionRunner.getStopReason();
            }
            LOG.error(e, errorMessage, new Object[0]);
            return TaskStatus.failure(getId(), errorMessage);
        }

        final ParallelIndexIngestionSpec phaseSpec = rewriteIngestionSpecWithIntervalsIfMissing(this.ingestionSchema, intervalToPartitionBoundaries.keySet());

        final ParallelIndexTaskRunner<?, ?> generateRunner = createRunner(taskToolbox, phaseToolbox -> {
            return createPartialRangeSegmentGenerateRunner(phaseToolbox, intervalToPartitionBoundaries, phaseSpec);
        });
        if (runNextPhase(generateRunner).isFailure()) {
            return TaskStatus.failure(getId(), describeRangePhaseFailure(generateRunner));
        }
        this.indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(generateRunner, true);

        final List<PartialSegmentMergeIOConfig> mergeIOConfigs = createGenericMergeIOConfigs(this.ingestionSchema.m44getTuningConfig().getTotalNumMergeTasks(), getPartitionToLocations(generateRunner.getReports()));
        final ParallelIndexTaskRunner mergeRunner = createRunner(taskToolbox, phaseToolbox -> {
            return createPartialGenericSegmentMergeRunner(phaseToolbox, mergeIOConfigs, phaseSpec);
        });
        final TaskState mergeState = runNextPhase(mergeRunner);

        final TaskStatus status;
        if (mergeState.isSuccess()) {
            // A successful state implies a non-null runner: runNextPhase returns FAILED for null.
            publishSegments(taskToolbox, mergeRunner.getReports());
            if (this.awaitSegmentAvailabilityTimeoutMillis > 0) {
                waitForSegmentAvailability(mergeRunner.getReports());
            }
            status = TaskStatus.success(getId());
        } else {
            // Only success or failure is expected once the phase has finished.
            Preconditions.checkState(mergeState.isFailure(), "Unrecognized state after task is complete[%s]", mergeState);
            status = TaskStatus.failure(getId(), describeRangePhaseFailure(mergeRunner));
        }
        taskToolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(status, this.segmentAvailabilityConfirmationCompleted));
        return status;
    }

    /** Builds the failure message for a range-partition phase; tolerates a runner that was never created. */
    private static String describeRangePhaseFailure(@Nullable ParallelIndexTaskRunner<?, ?> runner) {
        if (runner == null) {
            return "Task was asked to stop before the phase could start.";
        }
        return StringUtils.format(TASK_PHASE_FAILURE_MSG, new Object[]{runner.getName()});
    }

    /**
     * Merges the per-interval sketches of all reports into one {@code Union} per interval. Each
     * serialized sketch is wrapped and folded into the union for its interval; a union is created
     * on first use.
     */
    private static Map<Interval, Union> mergeCardinalityReports(Collection<DimensionCardinalityReport> collection) {
        final Map<Interval, Union> unionsByInterval = new HashMap<>();
        for (DimensionCardinalityReport report : collection) {
            for (Map.Entry<Interval, byte[]> sketchEntry : report.getIntervalToCardinalities().entrySet()) {
                // NOTE(review): 11 is presumably the sketch size parameter (lgMaxK) - confirm.
                final Union union = unionsByInterval.computeIfAbsent(sketchEntry.getKey(), ignored -> new Union(11));
                union.update(HllSketch.wrap(Memory.wrap(sketchEntry.getValue())));
            }
        }
        return unionsByInterval;
    }

    /**
     * Merges the given cardinality reports per interval and converts each merged estimate into a
     * shard count.
     *
     * @param collection cardinality reports to merge
     * @param i          maximum rows per segment used as the divisor for the estimate
     */
    @VisibleForTesting
    public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(Collection<DimensionCardinalityReport> collection, int i) {
        final Map<Interval, Union> mergedUnions = mergeCardinalityReports(collection);
        return computeIntervalToNumShards(i, mergedUnions);
    }

    /**
     * Converts each interval's merged sketch into a shard count.
     *
     * <p>The shard count is the estimated cardinality divided by {@code i}, rounded to the nearest
     * long and clamped to at least one. When the estimate is zero or negative the count falls back
     * to {@code DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE}, the same value the warning reports.
     *
     * @param i   divisor applied to the estimated cardinality (logged as maxRowsPerSegment)
     * @param map merged sketch union per interval
     * @return a map from interval to the number of shards, each value at least 1
     * @throws ISE if the rounded shard count does not fit in an {@code int}
     */
    @VisibleForTesting
    @Nonnull
    static Map<Interval, Integer> computeIntervalToNumShards(int i, Map<Interval, Union> map) {
        return CollectionUtils.mapValues(map, union -> {
            final double estimate = union.getEstimate();
            final long estimatedNumShards;
            if (estimate <= 0.0d) {
                // Use the named constant so the applied value can never drift from the logged one.
                estimatedNumShards = DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE;
                LOG.warn("Estimated cardinality for union of estimates is zero or less: %.2f, setting num shards to %d", new Object[]{Double.valueOf(estimate), Long.valueOf(DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE)});
            } else {
                estimatedNumShards = Math.round(estimate / i);
            }
            LOG.info("estimatedNumShards %d given estimated cardinality %.2f and maxRowsPerSegment %d", new Object[]{Long.valueOf(estimatedNumShards), Double.valueOf(estimate), Integer.valueOf(i)});
            if (estimatedNumShards == 1) {
                LOG.info("estimatedNumShards is ONE (%d) given estimated cardinality %.2f and maxRowsPerSegment %d", new Object[]{Long.valueOf(estimatedNumShards), Double.valueOf(estimate), Integer.valueOf(i)});
            }
            try {
                // toIntExact rejects values outside the int range; the result is clamped to at least 1.
                return Integer.valueOf(Math.max(Math.toIntExact(estimatedNumShards), 1));
            } catch (ArithmeticException e) {
                throw new ISE("Estimated numShards [%s] exceeds integer bounds.", new Object[]{Long.valueOf(estimatedNumShards)});
            }
        });
    }

    /**
     * Groups the partition stats of all sub task reports by partition and turns each group into
     * the list of locations for that partition.
     *
     * <p>Partitions are visited ordered by interval start, then interval end, then bucket id.
     * A counter that restarts at zero whenever the interval changes is passed to
     * {@code convert(...)} on the secondary partition of the first report in each group, and the
     * resulting shard spec is shared by every location of that partition.
     *
     * @param map generated partitions report per sub task id
     * @return a map from partition to the locations reported for it
     */
    static Map<Partition, List<PartitionLocation>> getPartitionToLocations(Map<String, GeneratedPartitionsReport> map) {
        // Explicitly typed so the comparator lambdas resolve against Partition rather than Object.
        final Map<Partition, List<PartitionReport>> reportsByPartition = new TreeMap<>(
            Comparator.comparingLong((Partition partition) -> partition.getInterval().getStartMillis())
                .thenComparingLong(partition -> partition.getInterval().getEndMillis())
                .thenComparingInt(Partition::getBucketId)
        );
        map.forEach((subTaskId, generatedPartitionsReport) ->
            generatedPartitionsReport.getPartitionStats().forEach(partitionStat ->
                reportsByPartition
                    .computeIfAbsent(Partition.fromStat(partitionStat), ignored -> new ArrayList<>())
                    .add(new PartitionReport(subTaskId, partitionStat))));

        final Map<Partition, List<PartitionLocation>> locationsByPartition = new HashMap<>();
        Interval previousInterval = null;
        int partitionId = 0;
        for (Map.Entry<Partition, List<PartitionReport>> entry : reportsByPartition.entrySet()) {
            final Partition partition = entry.getKey();
            final Interval interval = partition.getInterval();
            if (!interval.equals(previousInterval)) {
                // New interval: numbering restarts from zero.
                partitionId = 0;
                previousInterval = interval;
            }
            final List<PartitionReport> reports = entry.getValue();
            // Every group holds at least one report because groups are only created when adding one.
            final BuildingShardSpec shardSpec = reports.get(0).getPartitionStat().getSecondaryPartition().convert(partitionId++);
            final List<PartitionLocation> locations = reports.stream()
                .<PartitionLocation>map(report -> report.getPartitionStat().toPartitionLocation(report.getSubTaskId(), shardSpec))
                .collect(Collectors.toList());
            locationsByPartition.put(partition, locations);
        }
        return locationsByPartition;
    }

    /**
     * Builds merge IO configs whose payload type is {@link PartialSegmentMergeIOConfig}.
     *
     * @param i   requested total number of merge tasks
     * @param map locations per partition
     * @return one IO config per merge task
     */
    private static List<PartialSegmentMergeIOConfig> createGenericMergeIOConfigs(int i, Map<Partition, List<PartitionLocation>> map) {
        return createMergeIOConfigs(i, map, locations -> new PartialSegmentMergeIOConfig(locations));
    }

    /**
     * Distributes the partitions over merge tasks and creates one IO config per merge task.
     *
     * <p>The number of merge tasks is the smaller of {@code i} and the number of partitions.
     * Partitions are shuffled randomly and then split into contiguous slices, one per task; the
     * locations of all partitions in a slice are concatenated and handed to {@code function}.
     *
     * @param i        requested total number of merge tasks
     * @param map      locations per partition
     * @param function factory turning a list of locations into an IO config
     * @return the created IO configs, one per merge task
     */
    @VisibleForTesting
    static <M extends PartialSegmentMergeIOConfig, L extends PartitionLocation> List<M> createMergeIOConfigs(int i, Map<Partition, List<L>> map, Function<List<L>, M> function) {
        final int partitionCount = map.size();
        final int numMergeTasks = Math.min(i, partitionCount);
        LOG.info("Number of merge tasks is set to [%d] based on totalNumMergeTasks[%d] and number of partitions[%d]", numMergeTasks, i, partitionCount);

        final List<Partition> shuffledPartitions = new ArrayList<>(map.keySet());
        Collections.shuffle(shuffledPartitions, ThreadLocalRandom.current());

        final List<M> ioConfigs = new ArrayList<>(numMergeTasks);
        for (int taskIndex = 0; taskIndex < numMergeTasks; taskIndex++) {
            final Pair<Integer, Integer> bounds = getPartitionBoundaries(taskIndex, shuffledPartitions.size(), numMergeTasks);
            final List<L> locations = new ArrayList<>();
            for (Partition partition : shuffledPartitions.subList(bounds.lhs, bounds.rhs)) {
                locations.addAll(map.get(partition));
            }
            ioConfigs.add(function.apply(locations));
        }
        return ioConfigs;
    }

    /**
     * Computes the half-open slice {@code [start, end)} of {@code i2} items assigned to slot
     * {@code i} when the items are split as evenly as possible over {@code i3} slots.
     * The first {@code i2 % i3} slots receive one extra item.
     *
     * @param i  zero-based slot index
     * @param i2 total number of items
     * @param i3 number of slots; must not be zero
     * @return a pair of (start inclusive, end exclusive)
     */
    private static Pair<Integer, Integer> getPartitionBoundaries(int i, int i2, int i3) {
        final int baseSize = i2 / i3;
        final int remainder = i2 % i3;
        // Each preceding slot with an extra item shifts the start by one.
        final int start = (i * baseSize) + Math.min(i, remainder);
        final int size = i < remainder ? baseSize + 1 : baseSize;
        return Pair.of(Integer.valueOf(start), Integer.valueOf(start + size));
    }

    /**
     * Publishes the segments pushed by the sub tasks and emits segment/tombstone count metrics.
     *
     * <p>Old and new segments are collected from every report. When the ingestion mode is
     * {@code REPLACE}, tombstone intervals are computed, a segment id is allocated for each, and the
     * resulting tombstones are added to the set being published. Nothing is submitted when there
     * are no segments to publish.
     *
     * @param taskToolbox toolbox providing the task action client and the emitter
     * @param map         pushed segments report per sub task
     * @throws IOException declared by the original method signature
     * @throws ISE         if the publish call reports failure
     */
    private void publishSegments(TaskToolbox taskToolbox, Map<String, PushedSegmentsReport> map) throws IOException {
        final Set<DataSegment> replacedSegments = new HashSet<>();
        final Set<DataSegment> segmentsToPublish = new HashSet<>();
        for (PushedSegmentsReport report : map.values()) {
            replacedSegments.addAll(report.getOldSegments());
            segmentsToPublish.addAll(report.getNewSegments());
        }

        final boolean storeCompactionState = (Boolean) getContextValue(Tasks.STORE_COMPACTION_STATE_KEY, false);
        final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(storeCompactionState, taskToolbox, this.ingestionSchema);

        Set<DataSegment> tombstones = Collections.emptySet();
        if (getIngestionMode() == AbstractTask.IngestionMode.REPLACE) {
            final TombstoneHelper tombstoneHelper = new TombstoneHelper(taskToolbox.getTaskActionClient());
            final List<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervals(segmentsToPublish, this.ingestionSchema.getDataSchema());
            if (!tombstoneIntervals.isEmpty()) {
                final HashMap tombstoneIdsByInterval = new HashMap();
                for (Interval tombstoneInterval : tombstoneIntervals) {
                    tombstoneIdsByInterval.put(tombstoneInterval, allocateNewSegmentForTombstone(this.ingestionSchema, tombstoneInterval.getStart()));
                }
                tombstones = tombstoneHelper.computeTombstones(this.ingestionSchema.getDataSchema(), tombstoneIdsByInterval);
                segmentsToPublish.addAll(tombstones);
                LOG.debugSegments(tombstones, "To publish tombstones");
            }
        }

        final TaskLockType lockType = getTaskLockHelper().getLockTypeToUse();
        final TransactionalSegmentPublisher publisher = (toOverwrite, toPublish, commitMetadata) ->
            (SegmentPublishResult) taskToolbox.getTaskActionClient().submit(buildPublishAction(toOverwrite, toPublish, lockType));

        if (!segmentsToPublish.isEmpty()) {
            final boolean published = publisher.publishSegments(replacedSegments, segmentsToPublish, annotateFunction, (Object) null).isSuccess();
            if (!published) {
                throw new ISE("Failed to publish segments");
            }
        }

        LOG.info("Published [%d] segments", segmentsToPublish.size());
        emitMetric(taskToolbox.getEmitter(), "ingest/tombstones/count", Integer.valueOf(tombstones.size()));
        emitMetric(taskToolbox.getEmitter(), "ingest/segments/count", Integer.valueOf(segmentsToPublish.size()));
    }

    /**
     * Runs the ingestion as a single {@link IndexTask} built from this task's ingestion schema.
     *
     * <p>The index task is run only if it could be registered in the current sub task holder and
     * reports itself ready; otherwise a failure status is returned.
     *
     * @param taskToolbox toolbox handed to the index task
     * @return the status returned by the index task, or a failure status if the task was asked to stop
     * @throws Exception propagated from the readiness check or the run of the index task
     */
    private TaskStatus runSequential(TaskToolbox taskToolbox) throws Exception {
        final IndexTask sequentialTask = new IndexTask(
            getId(),
            getGroupId(),
            getTaskResource(),
            getDataSource(),
            this.baseSubtaskSpecName,
            new IndexTask.IndexIngestionSpec(
                getIngestionSchema().getDataSchema(),
                getIngestionSchema().m45getIOConfig(),
                convertToIndexTuningConfig(getIngestionSchema().m44getTuningConfig())
            ),
            getContext(),
            getIngestionSchema().m44getTuningConfig().getMaxAllowedLockCount(),
            false
        );

        final boolean canRun = this.currentSubTaskHolder.setTask(sequentialTask)
            && sequentialTask.isReady(taskToolbox.getTaskActionClient());
        if (!canRun) {
            final String stopMessage = "Task was asked to stop. Finish as failed";
            LOG.info(stopMessage);
            return TaskStatus.failure(getId(), stopMessage);
        }
        return sequentialTask.run(taskToolbox);
    }

    /**
     * Builds the completion report map for this task.
     *
     * @param taskStatus final status whose error message is copied into the report
     * @param z          flag forwarded to the report payload (callers pass the
     *                   segment-availability-confirmation flag)
     * @return task reports containing a single ingestion stats and errors report
     */
    private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus, boolean z) {
        final Pair<Map<String, Object>, Map<String, Object>> statsAndEvents = doGetRowStatsAndUnparseableEvents("true", true);
        final String taskId = getId();
        // Note the order: the payload takes the pair's rhs first, then its lhs.
        final IngestionStatsAndErrorsTaskReportData payload = new IngestionStatsAndErrorsTaskReportData(
            IngestionState.COMPLETED,
            (Map) statsAndEvents.rhs,
            (Map) statsAndEvents.lhs,
            taskStatus.getErrorMsg(),
            z,
            this.segmentAvailabilityWaitTimeMs
        );
        return TaskReport.buildTaskReports(new IngestionStatsAndErrorsTaskReport(taskId, payload));
    }

    /**
     * Creates an {@link IndexTask.IndexTuningConfig} from the values of a parallel tuning config.
     *
     * <p>The constructor is positional; arguments for which the parallel config supplies no value
     * are passed as {@code null}. The argument order below must not be changed.
     *
     * @param parallelIndexTuningConfig tuning config to copy values from
     * @return the equivalent tuning config for a sequential index task
     */
    private static IndexTask.IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig parallelIndexTuningConfig) {
        return new IndexTask.IndexTuningConfig(
            null,
            null,
            parallelIndexTuningConfig.getAppendableIndexSpec(),
            Integer.valueOf(parallelIndexTuningConfig.getMaxRowsInMemory()),
            Long.valueOf(parallelIndexTuningConfig.getMaxBytesInMemory()),
            Boolean.valueOf(parallelIndexTuningConfig.isSkipBytesInMemoryOverheadCheck()),
            null,
            null,
            null,
            null,
            parallelIndexTuningConfig.getPartitionsSpec(),
            parallelIndexTuningConfig.getIndexSpec(),
            parallelIndexTuningConfig.getIndexSpecForIntermediatePersists(),
            Integer.valueOf(parallelIndexTuningConfig.getMaxPendingPersists()),
            Boolean.valueOf(parallelIndexTuningConfig.isForceGuaranteedRollup()),
            Boolean.valueOf(parallelIndexTuningConfig.isReportParseExceptions()),
            null,
            Long.valueOf(parallelIndexTuningConfig.getPushTimeout()),
            parallelIndexTuningConfig.getSegmentWriteOutMediumFactory(),
            Boolean.valueOf(parallelIndexTuningConfig.isLogParseExceptions()),
            Integer.valueOf(parallelIndexTuningConfig.getMaxParseExceptions()),
            Integer.valueOf(parallelIndexTuningConfig.getMaxSavedParseExceptions()),
            Integer.valueOf(parallelIndexTuningConfig.getMaxColumnsToMerge()),
            Long.valueOf(parallelIndexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis())
        );
    }

    /**
     * {@code POST /segment/allocate}: allocates a new segment through the current single phase runner.
     *
     * <p>Depending on the {@code useLineageBasedSegmentAllocation} task context value, the request
     * body is converted either to a {@link SegmentAllocationRequest} or to a plain {@link DateTime}.
     *
     * @param obj                request body as deserialized by the framework
     * @param httpServletRequest request used for the authorization check
     * @return 200 with the serialized segment id; 503 when the toolbox is not set yet; 500 for
     *         {@link IOException} or {@link IllegalStateException}; 400 for
     *         {@link IllegalArgumentException} or {@link MaxAllowedLocksExceededException}
     * @throws ISE if the current runner is not a {@link SinglePhaseParallelIndexTaskRunner}
     */
    @Path("/segment/allocate")
    @Consumes({"application/x-jackson-smile"})
    @POST
    @Produces({"application/x-jackson-smile"})
    public Response allocateSegment(Object obj, @Context HttpServletRequest httpServletRequest) {
        ChatHandlers.authorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        if (this.toolbox == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }

        final ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner) Preconditions.checkNotNull(getCurrentRunner(), "runner");
        if (!(runner instanceof SinglePhaseParallelIndexTaskRunner)) {
            throw new ISE("Expected [%s], but [%s] is in use", SinglePhaseParallelIndexTaskRunner.class.getName(), runner.getClass().getName());
        }
        final SinglePhaseParallelIndexTaskRunner singlePhaseRunner = (SinglePhaseParallelIndexTaskRunner) runner;

        try {
            final Boolean useLineageBasedAllocation = (Boolean) Preconditions.checkNotNull(
                (Boolean) getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY),
                "useLineageBasedSegmentAllocation in taskContext"
            );
            final SegmentIdWithShardSpec segmentId;
            if (!useLineageBasedAllocation.booleanValue()) {
                final DateTime timestamp = (DateTime) this.toolbox.getJsonMapper().convertValue(obj, DateTime.class);
                segmentId = singlePhaseRunner.allocateNewSegment(getDataSource(), timestamp);
            } else {
                final SegmentAllocationRequest request = (SegmentAllocationRequest) this.toolbox.getJsonMapper().convertValue(obj, SegmentAllocationRequest.class);
                segmentId = singlePhaseRunner.allocateNewSegment(getDataSource(), request.getTimestamp(), request.getSequenceName(), request.getPrevSegmentId());
            }
            return Response.ok(this.toolbox.getJsonMapper().writeValueAsBytes(segmentId)).build();
        } catch (IOException | IllegalStateException serverSide) {
            return Response.serverError().entity(Throwables.getStackTraceAsString(serverSide)).build();
        } catch (IllegalArgumentException badRequest) {
            return Response.status(Response.Status.BAD_REQUEST).entity(Throwables.getStackTraceAsString(badRequest)).build();
        } catch (MaxAllowedLocksExceededException locksExceeded) {
            // Ask the current runner to stop, giving the exception message as the reason.
            getCurrentRunner().stopGracefully(locksExceeded.getMessage());
            return Response.status(Response.Status.BAD_REQUEST).entity(locksExceeded.getMessage()).build();
        }
    }

    /**
     * Returns the non-null input format of the IO config of the given ingestion spec.
     *
     * <p>Decompiler note (JADX): access modifiers changed from package-private.
     *
     * @param parallelIndexIngestionSpec ingestion spec whose IO config is consulted
     * @return the input format reported by {@code getNonNullInputFormat()}
     */
    public static InputFormat getInputFormat(ParallelIndexIngestionSpec parallelIndexIngestionSpec) {
        final InputFormat inputFormat = parallelIndexIngestionSpec.m45getIOConfig().getNonNullInputFormat();
        return inputFormat;
    }

    /**
     * {@code POST /report}: hands a sub task report to the runner held in the current sub task holder.
     *
     * @param subTaskReport      report sent by a sub task
     * @param httpServletRequest request used for the authorization check (WRITE action)
     * @return 200 once the report is collected, or 503 when no runner is registered yet
     */
    @POST
    @Path("/report")
    @Consumes({"application/x-jackson-smile"})
    public Response report(SubTaskReport subTaskReport, @Context HttpServletRequest httpServletRequest) {
        ChatHandlers.authorizationCheck(httpServletRequest, Action.WRITE, getDataSource(), this.authorizerMapper);
        final boolean runnerRegistered = this.currentSubTaskHolder != null && this.currentSubTaskHolder.getTask() != null;
        if (!runnerRegistered) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner) this.currentSubTaskHolder.getTask();
        runner.collectReport(subTaskReport);
        return Response.ok().build();
    }

    /**
     * {@code GET /mode}: reports whether this task runs in parallel or sequential mode.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with {@code "parallel"} or {@code "sequential"}
     */
    @GET
    @Produces({"application/json"})
    @Path("/mode")
    public Response getMode(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final String mode;
        if (isParallelMode()) {
            mode = "parallel";
        } else {
            mode = "sequential";
        }
        return Response.ok(mode).build();
    }

    /**
     * {@code GET /phase}: returns the name of the current runner.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the runner name; 400 in sequential mode; 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/phase")
    public Response getPhaseName(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        if (!isParallelMode()) {
            return Response.status(Response.Status.BAD_REQUEST).entity("task is running in the sequential mode").build();
        }
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running").build();
        }
        return Response.ok(runner.getName()).build();
    }

    /**
     * {@code GET /progress}: returns the progress reported by the current runner.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the runner's progress, or 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/progress")
    public Response getProgress(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getProgress()).build();
    }

    /**
     * {@code GET /subtasks/running}: returns the ids of the sub tasks the current runner reports as running.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the running task ids, or 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtasks/running")
    public Response getRunningTasks(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner != null) {
            return Response.ok(runner.getRunningTaskIds()).build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * {@code GET /subtaskspecs}: returns the sub task specs known to the current runner.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the sub task specs, or 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspecs")
    public Response getSubTaskSpecs(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getSubTaskSpecs()).build();
    }

    /**
     * {@code GET /subtaskspecs/running}: returns the specs of sub tasks the current runner reports as running.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the running sub task specs, or 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspecs/running")
    public Response getRunningSubTaskSpecs(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner != null) {
            return Response.ok(runner.getRunningSubTaskSpecs()).build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * {@code GET /subtaskspecs/complete}: returns the specs of sub tasks the current runner reports as complete.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the complete sub task specs, or 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspecs/complete")
    public Response getCompleteSubTaskSpecs(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getCompleteSubTaskSpecs()).build();
    }

    /**
     * {@code GET /subtaskspec/{id}}: returns the sub task spec with the given id.
     *
     * @param str                sub task spec id taken from the path
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the spec; 404 when the runner returns none; 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspec/{id}")
    public Response getSubTaskSpec(@PathParam("id") String str, @Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final SubTaskSpec spec = runner.getSubTaskSpec(str);
        if (spec == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(spec).build();
    }

    /**
     * {@code GET /subtaskspec/{id}/state}: returns the state of the sub task spec with the given id.
     *
     * @param str                sub task spec id taken from the path
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the state; 404 when the runner returns none; 503 when there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspec/{id}/state")
    public Response getSubTaskState(@PathParam("id") String str, @Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final ParallelIndexTaskRunner.SubTaskSpecStatus status = runner.getSubTaskState(str);
        if (status == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(status).build();
    }

    /**
     * {@code GET /subtaskspec/{id}/history}: returns the attempt history of a complete sub task spec.
     *
     * @param str                sub task spec id taken from the path
     * @param httpServletRequest request used for the datasource authorization check
     * @return 200 with the attempt history; 404 when the runner returns no history; 503 when
     *         there is no current runner
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspec/{id}/history")
    public Response getCompleteSubTaskSpecAttemptHistory(@PathParam("id") String str, @Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner runner = getCurrentRunner();
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final TaskHistory history = runner.getCompleteSubTaskSpecAttemptHistory(str);
        if (history == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(history.getAttemptHistory()).build();
    }

    /**
     * Normalizes the "buildSegments" row stats of a task report into {@link RowIngestionMetersTotals}.
     *
     * <p>The value is either already a {@link RowIngestionMetersTotals}, or a map holding the
     * numeric entries {@code processed}, {@code processedBytes}, {@code processedWithError},
     * {@code thrownAway} and {@code unparseable}.
     *
     * @param obj row stats object taken from a task report
     * @return the totals represented by {@code obj}
     * @throws RuntimeException if {@code obj} is {@code null} or of any other type
     */
    private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object obj) {
        if (obj instanceof RowIngestionMetersTotals) {
            return (RowIngestionMetersTotals) obj;
        }
        if (obj instanceof Map) {
            final Map<?, ?> stats = (Map<?, ?>) obj;
            // NOTE(review): a missing counter key still fails with a NullPointerException here.
            return new RowIngestionMetersTotals(
                ((Number) stats.get("processed")).longValue(),
                ((Number) stats.get("processedBytes")).longValue(),
                ((Number) stats.get("processedWithError")).longValue(),
                ((Number) stats.get("thrownAway")).longValue(),
                ((Number) stats.get("unparseable")).longValue()
            );
        }
        // Guard the null case: calling obj.getClass() on null would hide this diagnostic behind an NPE.
        throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + (obj == null ? null : obj.getClass()));
    }

    /**
     * Aggregates row stats (and optionally unparseable events) for a single phase run.
     *
     * <p>Totals from the reports of completed sub tasks are summed with totals fetched for the
     * sub tasks that are still running. Sub tasks with an empty report are logged and skipped.
     *
     * @param singlePhaseParallelIndexTaskRunner runner providing completed reports and running task ids
     * @param z                                  whether unparseable events are collected as well
     * @return a pair of (row stats, unparseable events)
     */
    private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(SinglePhaseParallelIndexTaskRunner singlePhaseParallelIndexTaskRunner, boolean z) {
        final SimpleRowIngestionMeters buildSegmentsTotals = new SimpleRowIngestionMeters();
        final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();

        for (PushedSegmentsReport completedReport : singlePhaseParallelIndexTaskRunner.getReports().values()) {
            final Map<String, TaskReport> taskReport = completedReport.getTaskReport();
            if (taskReport == null || taskReport.isEmpty()) {
                LOG.warn("Got an empty task report from subtask: " + completedReport.getTaskId());
                continue;
            }
            buildSegmentsTotals.addRowIngestionMetersTotals(getBuildSegmentsStatsFromTaskReport(taskReport, z, unparseableEvents));
        }

        final RowIngestionMetersTotals runningTotals = getRowStatsAndUnparseableEventsForRunningTasks(singlePhaseParallelIndexTaskRunner.getRunningTaskIds(), unparseableEvents, z);
        buildSegmentsTotals.addRowIngestionMetersTotals(runningTotals);
        return createStatsAndErrorsReport(buildSegmentsTotals.getTotals(), unparseableEvents);
    }

    /**
     * Aggregates row stats (and optionally unparseable events) for a multi phase run.
     *
     * <p>If stats were already captured in {@code indexGenerateRowStats}, those are returned.
     * Otherwise stats are only computed while the runner is named "partial segment generation";
     * any other runner yields empty maps. Sub tasks with an empty report are logged and skipped.
     *
     * @param parallelIndexTaskRunner current runner
     * @param z                       whether unparseable events are collected as well; honored for
     *                                cached stats, completed sub tasks and running sub tasks alike
     * @return a pair of (row stats, unparseable events)
     */
    private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(ParallelIndexTaskRunner<?, ?> parallelIndexTaskRunner, boolean z) {
        if (this.indexGenerateRowStats != null) {
            return Pair.of((Map) this.indexGenerateRowStats.lhs, z ? (Map) this.indexGenerateRowStats.rhs : ImmutableMap.of());
        }
        if (!parallelIndexTaskRunner.getName().equals("partial segment generation")) {
            return Pair.of(ImmutableMap.of(), ImmutableMap.of());
        }

        final Map<String, ?> completedReports = parallelIndexTaskRunner.getReports();
        final SimpleRowIngestionMeters buildSegmentsTotals = new SimpleRowIngestionMeters();
        final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
        for (Object rawReport : completedReports.values()) {
            final GeneratedPartitionsReport generatedPartitionsReport = (GeneratedPartitionsReport) rawReport;
            final Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
            if (taskReport == null || taskReport.isEmpty()) {
                LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId(), new Object[0]);
                continue;
            }
            // Pass the caller's flag instead of a hard-coded true, consistent with the single phase variant.
            buildSegmentsTotals.addRowIngestionMetersTotals(getBuildSegmentsStatsFromTaskReport(taskReport, z, unparseableEvents));
        }
        buildSegmentsTotals.addRowIngestionMetersTotals(getRowStatsAndUnparseableEventsForRunningTasks(parallelIndexTaskRunner.getRunningTaskIds(), unparseableEvents, z));
        return createStatsAndErrorsReport(buildSegmentsTotals.getTotals(), unparseableEvents);
    }

    /**
     * Sums the "buildSegments" row stats of the live reports of the given running sub tasks.
     *
     * <p>Fetching is best effort: a failure for one task is logged and the task is skipped.
     * If the calling thread is interrupted while fetching, the interrupt status is restored and
     * the remaining tasks are not queried, so the returned totals may be partial.
     *
     * @param set  ids of the running sub tasks
     * @param list receives the "buildSegments" unparseable events of each task when {@code z} is true
     * @param z    whether unparseable events are collected into {@code list}
     * @return the summed totals of all reports that could be fetched
     */
    private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(Set<String> set, List<ParseExceptionReport> list, boolean z) {
        final SimpleRowIngestionMeters runningTotals = new SimpleRowIngestionMeters();
        for (String taskId : set) {
            try {
                final Map<String, Object> taskReport = getTaskReport(this.toolbox.getOverlordClient(), taskId);
                if (taskReport == null || taskReport.isEmpty()) {
                    continue;
                }
                final Map payload = (Map) ((Map) taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)).get("payload");
                // Row stats are resolved before the list is touched, so a malformed report adds nothing.
                final Map buildSegmentsStats = (Map) ((Map) ((Map) payload.get("rowStats")).get("totals")).get("buildSegments");
                if (z) {
                    list.addAll((List) ((Map) payload.get("unparseableEvents")).get("buildSegments"));
                }
                runningTotals.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegmentsStats));
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag for the caller and stop polling.
                Thread.currentThread().interrupt();
                LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId, new Object[0]);
                break;
            } catch (Exception e) {
                LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId, new Object[0]);
            }
        }
        return runningTotals.getTotals();
    }

    /**
     * Wraps totals and unparseable events into the map shapes used by the task report.
     *
     * @param rowIngestionMetersTotals totals stored under {@code totals -> buildSegments}
     * @param list                     unparseable events stored under {@code buildSegments}
     * @return a pair of (row stats map, unparseable events map)
     */
    private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(RowIngestionMetersTotals rowIngestionMetersTotals, List<ParseExceptionReport> list) {
        final Map<String, Object> totals = new HashMap<>();
        totals.put("buildSegments", rowIngestionMetersTotals);

        final Map<String, Object> rowStats = new HashMap<>();
        rowStats.put("totals", totals);

        final Map<String, Object> unparseableEvents = ImmutableMap.<String, Object>of("buildSegments", list);
        return Pair.of(rowStats, unparseableEvents);
    }

    /**
     * Extracts the "buildSegments" totals from a completed sub task's report.
     *
     * @param map  task reports of a sub task, keyed by report key
     * @param z    whether the report's "buildSegments" unparseable events are appended to {@code list}
     * @param list receives the unparseable events when {@code z} is true
     * @return the "buildSegments" totals of the report
     */
    private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(Map<String, TaskReport> map, boolean z, List<ParseExceptionReport> list) {
        final IngestionStatsAndErrorsTaskReport statsReport = (IngestionStatsAndErrorsTaskReport) map.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
        final IngestionStatsAndErrorsTaskReportData payload = (IngestionStatsAndErrorsTaskReportData) statsReport.getPayload();
        // Totals are resolved first so that a failure here leaves the list untouched.
        final RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(payload.getRowStats().get("buildSegments"));
        if (!z) {
            return totals;
        }
        list.addAll((List) payload.getUnparseableEvents().get("buildSegments"));
        return totals;
    }

    /**
     * Collects row stats and unparseable events from whatever is currently registered in the
     * sub task holder.
     *
     * <p>In parallel mode the work is delegated to the multi phase or single phase variant,
     * depending on {@code isGuaranteedRollup(...)}; in sequential mode it is delegated to the
     * {@link IndexTask}. Empty maps are returned when nothing is registered.
     *
     * @param str value of the "full" option forwarded to the sequential index task
     * @param z   whether unparseable events are collected in parallel mode
     * @return a pair of (row stats, unparseable events)
     */
    private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(String str, boolean z) {
        if (this.currentSubTaskHolder == null) {
            return Pair.of(ImmutableMap.of(), ImmutableMap.of());
        }
        final Object currentTask = this.currentSubTaskHolder.getTask();
        if (currentTask == null) {
            return Pair.of(ImmutableMap.of(), ImmutableMap.of());
        }
        if (!isParallelMode()) {
            final IndexTask sequentialTask = (IndexTask) currentTask;
            return Pair.of(sequentialTask.doGetRowStats(str), sequentialTask.doGetUnparseableEvents(str));
        }
        if (isGuaranteedRollup(getIngestionMode(), this.ingestionSchema.m44getTuningConfig())) {
            return doGetRowStatsAndUnparseableEventsParallelMultiPhase((ParallelIndexTaskRunner) currentTask, z);
        }
        return doGetRowStatsAndUnparseableEventsParallelSinglePhase((SinglePhaseParallelIndexTaskRunner) currentTask, z);
    }

    /**
     * {@code GET /rowStats}: returns the current row stats without unparseable events.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @param str                value of the {@code full} query parameter
     * @return 200 with the row stats map
     */
    @GET
    @Produces({"application/json"})
    @Path("/rowStats")
    public Response getRowStats(@Context HttpServletRequest httpServletRequest, @QueryParam("full") String str) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final Pair<Map<String, Object>, Map<String, Object>> statsAndEvents = doGetRowStatsAndUnparseableEvents(str, false);
        return Response.ok(statsAndEvents.lhs).build();
    }

    /**
     * Builds the live report map: ingestion state, row stats and unparseable events, wrapped
     * under the ingestion stats and errors report key.
     *
     * <p>In parallel mode the ingestion state is this task's own; in sequential mode it comes from
     * the registered {@link IndexTask}, falling back to this task's state when none is registered.
     *
     * @param str value of the "full" option forwarded to the stats collection
     * @return the live reports keyed by report key
     */
    @VisibleForTesting
    public Map<String, Object> doGetLiveReports(String str) {
        final Pair<Map<String, Object>, Map<String, Object>> statsAndEvents = doGetRowStatsAndUnparseableEvents(str, true);

        IngestionState currentState = this.ingestionState;
        if (!isParallelMode()) {
            final IndexTask sequentialTask = (IndexTask) this.currentSubTaskHolder.getTask();
            if (sequentialTask != null) {
                currentState = sequentialTask.getIngestionState();
            }
        }

        final Map<String, Object> payload = new HashMap<>();
        payload.put("ingestionState", currentState);
        payload.put("unparseableEvents", statsAndEvents.rhs);
        payload.put("rowStats", statsAndEvents.lhs);

        final Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
        ingestionStatsAndErrors.put("taskId", getId());
        ingestionStatsAndErrors.put("payload", payload);
        ingestionStatsAndErrors.put(TaskAction.TYPE_FIELD, IngestionStatsAndErrorsTaskReport.REPORT_KEY);

        final Map<String, Object> liveReports = new HashMap<>();
        liveReports.put(IngestionStatsAndErrorsTaskReport.REPORT_KEY, ingestionStatsAndErrors);
        return liveReports;
    }

    /**
     * {@code GET /liveReports}: returns the live report map of this task.
     *
     * @param httpServletRequest request used for the datasource authorization check
     * @param str                value of the {@code full} query parameter
     * @return 200 with the live reports
     */
    @GET
    @Produces({"application/json"})
    @Path("/liveReports")
    public Response getLiveReports(@Context HttpServletRequest httpServletRequest, @QueryParam("full") String str) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        final Map<String, Object> liveReports = doGetLiveReports(str);
        return Response.ok(liveReports).build();
    }

    /**
     * Fetches the report of the given task as a map.
     *
     * @param overlordClient client used to request the report
     * @param str            id of the task whose report is requested
     * @return the report map, or {@code null} when the request failed with an HTTP 404 response
     * @throws InterruptedException if interrupted while waiting for the response
     * @throws ExecutionException   if the request failed for any reason other than HTTP 404
     */
    @VisibleForTesting
    @Nullable
    static Map<String, Object> getTaskReport(OverlordClient overlordClient, String str) throws InterruptedException, ExecutionException {
        try {
            return (Map) FutureUtils.get(overlordClient.taskReportAsMap(str), true);
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            // getResponse() is declared on HttpResponseException, not Throwable, so the cause must be cast.
            if (cause instanceof HttpResponseException
                && ((HttpResponseException) cause).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
                return null;
            }
            throw e;
        }
    }
}
