package org.apache.druid.indexing.common.task.batch.parallel;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.stream.Stream;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionParallelIndexTaskRunner.class */
/**
 * Phase runner for the "partial dimension distribution" phase of parallel indexing.
 *
 * <p>Each sub-task report carries a map of {@link Interval} to {@link StringDistribution}. Instead of
 * keeping those distributions in the collected report, this runner forwards a report holding only the
 * task id to the superclass and writes every distribution to a JSON file under a temporary directory,
 * using a dedicated single-threaded executor. {@link #getIntervalToPartitionBoundaries} later reads the
 * files back, merges them per interval, and derives the partition boundaries.
 *
 * <p>A {@link Phaser} tracks reports whose distributions are still being written: one party is registered
 * up front for the caller of {@link #getIntervalToPartitionBoundaries}, and one more per submitted report.
 */
public class PartialDimensionDistributionParallelIndexTaskRunner extends InputSourceSplitParallelIndexTaskRunner<PartialDimensionDistributionTask, DimensionDistributionReport> {
    private static final Logger log = new Logger(PartialDimensionDistributionParallelIndexTaskRunner.class);
    private static final String PHASE_NAME = "partial dimension distribution";

    /** Single-threaded executor on which report distributions are written to disk. */
    private final ExecutorService executor;

    /** Counts in-flight report extractions; the initial party belongs to the thread that awaits them. */
    private final Phaser allReportsProcessedPhaser;

    /** For every interval, the ids of the tasks whose distribution file has been written. */
    private final Map<Interval, Set<String>> intervalToTaskIds;

    /** Root directory holding one sub-directory per interval. */
    private final File tempDistributionsDir;

    /**
     * Creates the runner and eagerly creates the temporary distributions directory.
     *
     * <p>All arguments are passed unchanged to the superclass constructor.
     *
     * @throws ISE if the temporary distributions directory cannot be created
     */
    public PartialDimensionDistributionParallelIndexTaskRunner(TaskToolbox taskToolbox, String str, String str2, String str3, ParallelIndexIngestionSpec parallelIndexIngestionSpec, Map<String, Object> map) {
        super(taskToolbox, str, str2, str3, parallelIndexIngestionSpec, map);
        this.executor = Execs.singleThreaded("DimDistributionWriter-%s");
        this.allReportsProcessedPhaser = new Phaser(1);
        this.intervalToTaskIds = new HashMap<>();
        this.tempDistributionsDir = createDistributionsDir();
    }

    /**
     * @return the human-readable name of this phase
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public String getName() {
        return PHASE_NAME;
    }

    /**
     * Records a sub-task report and schedules its distributions to be written to disk.
     *
     * @param dimensionDistributionReport report received from a sub-task
     * @throws ISE if the writer executor has already been shut down
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexPhaseRunner, org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public void collectReport(DimensionDistributionReport dimensionDistributionReport) {
        // The superclass only gets the task id (null distributions); the distributions themselves go to disk.
        // NOTE(review): presumably done to keep the collected reports small in memory - confirm.
        super.collectReport(new DimensionDistributionReport(dimensionDistributionReport.getTaskId(), null));
        if (this.executor.isShutdown()) {
            throw new ISE("Executor is already shutdown. Cannot process more reports.");
        }
        // Register before submitting so the matching arriveAndDeregister() always has a party to release.
        this.allReportsProcessedPhaser.register();
        this.executor.submit(() -> extractDistributionsFromReport(dimensionDistributionReport));
    }

    /**
     * Merges the stored distributions of each interval and computes its partition boundaries.
     *
     * <p>Blocks until all pending reports have been written, shuts the writer executor down, and deletes the
     * temporary distributions directory once the boundaries have been computed. Only distributions from
     * tasks that are present in the superclass's collected reports are merged.
     *
     * @param dimensionRangePartitionsSpec supplies the target rows per segment or, when that is
     *                                     {@code null}, the max rows per segment
     * @return partition boundaries keyed by interval
     * @throws ISE if this runner has been stopped, or if a distribution file cannot be read
     */
    public Map<Interval, PartitionBoundaries> getIntervalToPartitionBoundaries(DimensionRangePartitionsSpec dimensionRangePartitionsSpec) {
        waitToProcessPendingReports();
        if (getStopReason() != null) {
            throw new ISE("DimensionDistributionPhaseRunner has been stopped. %s", getStopReason());
        }

        final Set<String> reportedTaskIds = super.getReports().keySet();
        final Map<Interval, PartitionBoundaries> intervalToBoundaries = new HashMap<>();

        this.intervalToTaskIds.forEach((interval, taskIds) -> {
            final File intervalDir = getIntervalDistributionDir(interval);
            final StringSketchMerger merger = new StringSketchMerger();
            taskIds.stream()
                    .filter(reportedTaskIds::contains)
                    .map(taskId -> readDistributionFromFile(intervalDir, taskId))
                    .forEach(merger::merge);

            final StringDistribution merged = merger.getResult();
            final Integer targetRowsPerSegment = dimensionRangePartitionsSpec.getTargetRowsPerSegment();
            final PartitionBoundaries boundaries;
            if (targetRowsPerSegment == null) {
                boundaries = merged.getEvenPartitionsByMaxSize(dimensionRangePartitionsSpec.getMaxRowsPerSegment().intValue());
            } else {
                boundaries = merged.getEvenPartitionsByTargetSize(targetRowsPerSegment.intValue());
            }
            intervalToBoundaries.put(interval, boundaries);
        });

        cleanupDistributionsDir();
        return intervalToBoundaries;
    }

    /**
     * Writes every distribution contained in the report to its interval directory. Runs on the writer
     * executor. A (interval, task id) pair that has already been written is skipped.
     *
     * <p>The phaser party registered for this report is always released, even when writing fails.
     */
    private void extractDistributionsFromReport(DimensionDistributionReport report) {
        try {
            final String taskId = report.getTaskId();
            log.debug("Started writing distributions from task [%s]", taskId);
            final Map<Interval, StringDistribution> intervalToDistribution = report.getIntervalToDistribution();
            if (intervalToDistribution == null || intervalToDistribution.isEmpty()) {
                log.debug("No dimension distribution received from task [%s]", taskId);
                return;
            }

            intervalToDistribution.forEach((interval, distribution) -> {
                final Set<String> writtenTaskIds = this.intervalToTaskIds.computeIfAbsent(interval, key -> new HashSet<>());
                if (writtenTaskIds.contains(taskId)) {
                    return;
                }
                // Mark the task as written only after the file has been written successfully.
                writeDistributionToFile(interval, taskId, distribution);
                writtenTaskIds.add(taskId);
            });
            log.debug("Finished writing distributions from task [%s]", taskId);
        } finally {
            this.allReportsProcessedPhaser.arriveAndDeregister();
        }
    }

    /**
     * Serializes one distribution as JSON into the file named after the task inside the interval directory.
     *
     * @throws ISE if the file cannot be written; the runner is asked to stop gracefully first
     */
    private void writeDistributionToFile(Interval interval, String taskId, StringDistribution distribution) {
        try {
            final File intervalDir = getIntervalDistributionDir(interval);
            FileUtils.mkdirp(intervalDir);
            getToolbox().getJsonMapper().writeValue(getDistributionJsonFile(intervalDir, taskId), distribution);
        } catch (IOException e) {
            final String errorMsg = StringUtils.format("Exception while writing distribution file for interval [%s], task [%s]", interval, taskId);
            stopGracefully(errorMsg);
            throw new ISE(e, errorMsg);
        }
    }

    /**
     * Reads back the distribution a task stored in the given interval directory.
     *
     * @throws ISE if the file cannot be read or parsed
     */
    private StringDistribution readDistributionFromFile(File intervalDir, String taskId) {
        try {
            return getToolbox().getJsonMapper().readValue(getDistributionJsonFile(intervalDir, taskId), StringDistribution.class);
        } catch (IOException e) {
            throw new ISE(e, "Error while reading distribution for interval [%s], task [%s]", intervalDir.getName(), taskId);
        }
    }

    /** Returns the sub-directory of the temporary directory used for the given interval. */
    private File getIntervalDistributionDir(Interval interval) {
        return new File(this.tempDistributionsDir, toIntervalString(interval));
    }

    /** Returns the file, named after the task id, inside the given interval directory. */
    private File getDistributionJsonFile(File intervalDir, String taskId) {
        return new File(intervalDir, taskId);
    }

    /**
     * Blocks until every submitted report has been processed, then shuts the writer executor down.
     *
     * @throws ISE wrapping any exception raised while waiting
     */
    private void waitToProcessPendingReports() {
        log.info("Waiting to extract distributions from sub-task reports.");
        try {
            this.allReportsProcessedPhaser.arriveAndAwaitAdvance();
            this.executor.shutdownNow();
        } catch (Exception e) {
            throw new ISE(e, "Exception while waiting to extract distributions.");
        }
    }

    /**
     * Creates the "dimension_distributions" directory under this task's temp directory.
     *
     * @throws ISE if the directory cannot be created
     */
    private File createDistributionsDir() {
        final File distributionsDir = new File(getToolbox().getConfig().getTaskTempDir(getTaskId()), "dimension_distributions");
        try {
            FileUtils.mkdirp(distributionsDir);
            return distributionsDir;
        } catch (IOException e) {
            throw new ISE(e, "Could not create temp distribution directory.");
        }
    }

    /** Deletes the temporary distributions directory; a failure is logged as a warning, not thrown. */
    private void cleanupDistributionsDir() {
        try {
            FileUtils.deleteDirectory(this.tempDistributionsDir);
        } catch (IOException e) {
            log.warn(e, "Could not delete temp distribution directory.");
        }
    }

    /** Builds the directory name for an interval as {@code <start>_<end>}, using the interval's chronology. */
    private String toIntervalString(Interval interval) {
        final DateTime start = new DateTime(interval.getStartMillis(), interval.getChronology());
        final DateTime end = new DateTime(interval.getEndMillis(), interval.getChronology());
        return start + "_" + end;
    }

    /**
     * Builds the spec from which {@link PartialDimensionDistributionTask} sub-tasks are created.
     *
     * <p>The first five arguments are passed to the {@link SubTaskSpec} constructor; {@code str} and the
     * ingestion spec are additionally passed to every sub-task the spec creates.
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.InputSourceSplitParallelIndexTaskRunner
    SubTaskSpec<PartialDimensionDistributionTask> createSubTaskSpec(final String str, String str2, String str3, Map<String, Object> map, InputSplit inputSplit, final ParallelIndexIngestionSpec parallelIndexIngestionSpec) {
        return new SubTaskSpec<PartialDimensionDistributionTask>(str, str2, str3, map, inputSplit) {
            /**
             * Creates a new sub-task; {@code i} is forwarded to the task constructor as-is.
             */
            @Override // org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec
            public PartialDimensionDistributionTask newSubTask(int i) {
                return new PartialDimensionDistributionTask(null, getGroupId(), null, getSupervisorTaskId(), str, i, parallelIndexIngestionSpec, getContext());
            }
        };
    }
}
