package org.apache.druid.frame.processor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.ibm.icu.text.DateFormat;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import it.unimi.dsi.fastutil.longs.LongRBTreeSet;
import it.unimi.dsi.fastutil.longs.LongSortedSet;
import java.io.File;
import java.io.IOException;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFileFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameFileChannel;
import org.apache.druid.frame.file.FrameFile;
import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CloseableUtils;

/* loaded from: input_file:org/apache/druid/frame/processor/SuperSorter.class */
public class SuperSorter {
    private static final Logger log = new Logger(SuperSorter.class);

    /** Value of {@code totalMergingLevels} while the number of merging levels has not been computed yet. */
    public static final int UNKNOWN_LEVEL = -1;

    /** Value of {@code totalInputFrames} (and of per-level merger counts) while the total is not known yet. */
    public static final long UNKNOWN_TOTAL = -1;

    // ---- Configuration; assigned once by the constructor. ----

    private final List<ReadableFrameChannel> inputChannels;
    private final FrameReader frameReader;
    private final ClusterBy clusterBy;
    private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
    private final FrameProcessorExecutor exec;
    private final File directory;
    private final OutputChannelFactory outputChannelFactory;
    private final Supplier<MemoryAllocator> innerFrameAllocatorMaker;
    private final int maxChannelsPerProcessor;
    private final int maxActiveProcessors;
    private final long rowLimit;
    private final String cancellationId;

    // ---- Mutable state; everything below is read and written while holding runWorkersLock. ----

    @GuardedBy("runWorkersLock")
    SuperSorterProgressTracker superSorterProgressTracker;
    private final Object runWorkersLock = new Object();

    /** True from the moment a batcher is launched until its result callback has finished. */
    @GuardedBy("runWorkersLock")
    private boolean batcherIsRunning = false;

    /** Indexes (into {@code inputChannels}) of channels still to be read; replaced by each batcher result. */
    @GuardedBy("runWorkersLock")
    private IntSet inputChannelsToRead = new IntOpenHashSet();

    /** Merging level -> ranks of the mergers of that level whose output is ready. */
    @GuardedBy("runWorkersLock")
    private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new Int2ObjectArrayMap<>();

    /** Final output channels, one slot per output partition; allocated when the first ultimate merger is started. */
    @GuardedBy("runWorkersLock")
    private List<OutputChannel> outputChannels = null;

    /** Number of workers that have been started and have not yet reported success. */
    @GuardedBy("runWorkersLock")
    private int activeProcessors = 0;

    /** Total number of input frames, or {@link #UNKNOWN_TOTAL} until all input has been read. */
    @GuardedBy("runWorkersLock")
    private long totalInputFrames = UNKNOWN_TOTAL;

    /** Total number of merging levels, or {@link #UNKNOWN_LEVEL} until all input has been read. */
    @GuardedBy("runWorkersLock")
    private int totalMergingLevels = UNKNOWN_LEVEL;

    /** Frames produced by the batcher that have not yet been handed to a level-zero merger. */
    @GuardedBy("runWorkersLock")
    private final Queue<Frame> inputBuffer = new ArrayDeque<>();

    @GuardedBy("runWorkersLock")
    private long inputFramesReadSoFar = 0;

    /** Doubles as the rank assigned to the next level-zero merger. */
    @GuardedBy("runWorkersLock")
    private long levelZeroMergersRunSoFar = 0;

    /** Doubles as the output partition number handled by the next ultimate merger. */
    @GuardedBy("runWorkersLock")
    private int ultimateMergersRunSoFar = 0;

    /** Open handles on penultimate-level output files, shared by all ultimate mergers; closed in cleanUp(). */
    @GuardedBy("runWorkersLock")
    private final Map<File, FrameFile> penultimateFrameFileCache = new HashMap<>();

    /** Result future; null until {@code run()} has been called. */
    @GuardedBy("runWorkersLock")
    private SettableFuture<OutputChannels> allDone = null;

    /** Test hook invoked when no worker is active and none could be started. */
    @GuardedBy("runWorkersLock")
    private Runnable noWorkRunnable = null;

    public SuperSorter(List<ReadableFrameChannel> list, FrameReader frameReader, ClusterBy clusterBy, ListenableFuture<ClusterByPartitions> listenableFuture, FrameProcessorExecutor frameProcessorExecutor, File file, OutputChannelFactory outputChannelFactory, Supplier<MemoryAllocator> supplier, int i, int i2, long j, @Nullable String str, SuperSorterProgressTracker superSorterProgressTracker) {
        this.inputChannels = list;
        this.frameReader = frameReader;
        this.clusterBy = clusterBy;
        this.outputPartitionsFuture = listenableFuture;
        this.exec = frameProcessorExecutor;
        this.directory = file;
        this.outputChannelFactory = outputChannelFactory;
        this.innerFrameAllocatorMaker = supplier;
        this.maxChannelsPerProcessor = i2;
        this.maxActiveProcessors = i;
        this.rowLimit = j;
        this.cancellationId = str;
        this.superSorterProgressTracker = superSorterProgressTracker;
        for (int i3 = 0; i3 < list.size(); i3++) {
            this.inputChannelsToRead.add(i3);
        }
        if (i < 1) {
            throw new IAE("maxActiveProcessors[%d] < 1", Integer.valueOf(i));
        }
        if (i2 < 2) {
            throw new IAE("maxChannelsPerProcessor[%d] < 2", Integer.valueOf(i2));
        }
    }

    /**
     * Starts the sort. May be called only once.
     *
     * @return a future for the sorted output channels; it is wrapped with "baggage" that runs
     *         {@code cleanUp()} if no processor is active at that point
     * @throws ISE if this method has already been called
     */
    public ListenableFuture<OutputChannels> run() {
        synchronized (runWorkersLock) {
            if (allDone != null) {
                throw new ISE("Cannot run() more than once.");
            }

            allDone = SettableFuture.create();
            runWorkersIfPossible();

            // Knowing the output partitions may allow further mergers to start, or the whole sort to finish.
            final Runnable onPartitionsFutureCompleted = () -> {
                synchronized (runWorkersLock) {
                    if (outputPartitionsFuture.isDone()) {
                        final int partitionCount = getOutputPartitions().size();
                        superSorterProgressTracker.setTotalMergersForUltimateLevel(partitionCount);
                    }
                    runWorkersIfPossible();
                    setAllDoneIfPossible();
                }
            };
            outputPartitionsFuture.addListener(onPartitionsFutureCompleted, exec.getExecutorService());

            return FutureUtils.futureWithBaggage(allDone, () -> {
                synchronized (runWorkersLock) {
                    // If workers are still active, the last one to finish performs the cleanup instead.
                    if (activeProcessors == 0) {
                        cleanUp();
                    }
                }
            });
        }
    }

    /**
     * Installs a hook that is run whenever the scheduler finds no active worker and nothing left to start.
     * Intended for tests only.
     *
     * @param runnable the hook, or null to remove it
     */
    @VisibleForTesting
    public void setNoWorkRunnable(Runnable runnable) {
        synchronized (runWorkersLock) {
            noWorkRunnable = runnable;
        }
    }

    /**
     * Bookkeeping after a worker has completed successfully: releases its processor slot, tries to start
     * more work, completes the result future if possible, and cleans up once everything is done and idle.
     * Must be called while holding {@code runWorkersLock}.
     */
    @GuardedBy("runWorkersLock")
    public void workerFinished() {
        activeProcessors -= 1;

        if (log.isDebugEnabled()) {
            log.debug(stateString());
        }

        runWorkersIfPossible();
        setAllDoneIfPossible();

        final boolean finishedAndIdle = isAllDone() && activeProcessors == 0;
        if (finishedAndIdle) {
            cleanUp();
        }
    }

    /**
     * Starts as many workers as the processor limit allows. Candidates are tried in priority order:
     * ultimate merger, middle merger, level-zero merger, batcher. Does nothing once the result future
     * is done.
     *
     * <p>Any failure while starting a worker is reported through {@code allDone} instead of being thrown.
     * The {@code runNext*()} calls are the statements that can actually fail (for example when an
     * intermediate file cannot be opened), so the whole scheduling loop, including its condition, is
     * inside the try block.
     */
    @GuardedBy("runWorkersLock")
    private void runWorkersIfPossible() {
        if (isAllDone()) {
            // Nothing to schedule: the sort has already finished, failed or been cancelled.
            return;
        }

        try {
            while (activeProcessors < maxActiveProcessors
                   && (runNextUltimateMerger() || runNextMiddleMerger() || runNextLevelZeroMerger() || runNextBatcher())) {
                activeProcessors++;

                if (log.isDebugEnabled()) {
                    log.debug(stateString());
                }
            }
        } catch (Throwable th) {
            allDone.setException(th);
            return;
        }

        if (activeProcessors == 0 && noWorkRunnable != null) {
            log.debug("No active workers and no work left to start.");
            noWorkRunnable.run();
        }
    }

    /**
     * Completes {@code allDone} when the sort has finished. Either there were no input frames and the
     * partitions are known, in which case one nil channel per partition is returned, or every ultimate
     * merger has produced its output. Otherwise does nothing.
     */
    @GuardedBy("runWorkersLock")
    private void setAllDoneIfPossible() {
        if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
            // Empty input: there is nothing to merge, so hand out empty channels.
            final ClusterByPartitions partitions = getOutputPartitions();
            final List<OutputChannel> nilChannels = new ArrayList<>(partitions.size());

            for (int partitionNumber = 0; partitionNumber < partitions.size(); partitionNumber++) {
                nilChannels.add(outputChannelFactory.openNilChannel(partitionNumber));
            }

            allDone.set(OutputChannels.wrap(nilChannels));
        } else if (totalMergingLevels != UNKNOWN_LEVEL
                   && outputsReadyByLevel.containsKey(totalMergingLevels - 1)
                   && outputsReadyByLevel.get(totalMergingLevels - 1).size() == getOutputPartitions().size()) {
            // Every partition of the ultimate level has been written.
            try {
                allDone.set(OutputChannels.wrap(outputChannels));
            } catch (Throwable th) {
                allDone.setException(th);
            }
        }
    }

    /**
     * Starts a batcher that reads frames from the input channels, unless one is already running or no
     * channel is left to read.
     *
     * @return true if a batcher was started
     */
    @GuardedBy("runWorkersLock")
    @SuppressWarnings("unchecked")
    private boolean runNextBatcher() {
        final boolean nothingToDo = batcherIsRunning || inputChannelsToRead.isEmpty();
        if (nothingToDo) {
            return false;
        }

        batcherIsRunning = true;

        runWorker(new FrameChannelBatcher(inputChannels, maxChannelsPerProcessor), result -> {
            final List<Frame> batch = (List<Frame>) result.lhs;
            final IntSet channelsStillOpen = (IntSet) result.rhs;

            synchronized (runWorkersLock) {
                inputBuffer.addAll(batch);
                inputFramesReadSoFar += batch.size();
                inputChannelsToRead = channelsStillOpen;

                final boolean allChannelsExhausted = inputChannelsToRead.isEmpty();
                if (allChannelsExhausted) {
                    // The last frame has been read: the total is now known, so the level layout can be fixed.
                    inputChannels.forEach(ReadableFrameChannel::close);
                    setTotalInputFrames(inputFramesReadSoFar);
                }

                if (allChannelsExhausted || inputBuffer.size() >= maxChannelsPerProcessor) {
                    runWorkersIfPossible();
                }

                // Cleared last, so the scheduling pass above cannot start a second batcher.
                batcherIsRunning = false;
            }
        });

        return true;
    }

    /**
     * Starts a level-zero merger over up to {@code maxChannelsPerProcessor} buffered input frames. A
     * partial batch is merged only once all input has been read.
     *
     * @return true if a merger was started
     */
    @GuardedBy("runWorkersLock")
    private boolean runNextLevelZeroMerger() {
        if (inputBuffer.isEmpty()) {
            return false;
        }

        final boolean fullBatchAvailable = inputBuffer.size() >= maxChannelsPerProcessor;
        if (!fullBatchAvailable && !allInputRead()) {
            // More frames may still arrive; wait for a full batch.
            return false;
        }

        final List<ReadableFrameChannel> mergerInputs = new ArrayList<>();
        while (mergerInputs.size() < maxChannelsPerProcessor) {
            final Frame frame = inputBuffer.poll();
            if (frame == null) {
                break;
            }
            // Each frame becomes its own single-frame channel; -1 is passed as the partition number.
            mergerInputs.add(singleReadableFrameChannel(new FrameWithPartition(frame, -1)));
        }

        final long rank = levelZeroMergersRunSoFar++;
        runMerger(0, rank, mergerInputs, null);
        return true;
    }

    /**
     * Tries to start one merger that combines ready outputs of some level into the next level, for every
     * output level below the ultimate one (the ultimate level is handled by runNextUltimateMerger()).
     * Levels are examined from the highest input level down to level 0, and at most one merger is started
     * per call.
     *
     * @return true if a merger was started, false if no level currently has a mergeable group
     */
    @GuardedBy("runWorkersLock")
    private boolean runNextMiddleMerger() {
        ClusterByPartitions clusterByPartitions;
        // "size" is the input level being examined; "i" is the level the merger would write to.
        // NOTE(review): get(size) below assumes levels 0..size()-1 are all present as keys -- confirm.
        for (int size = this.outputsReadyByLevel.size() - 1; size >= 0; size--) {
            int i = size + 1;
            long totalMergersInLevel = getTotalMergersInLevel(size);
            LongSortedSet longSortedSet = this.outputsReadyByLevel.get(size);
            // Proceed only if (a) the level count is unknown or the output level is below the ultimate level, and
            // (b) the level count is known, or there are enough ready inputs that merging them would still leave
            // more than maxChannelsPerProcessor outputs (so this cannot turn out to be the penultimate level).
            if ((this.totalMergingLevels == -1 || i < this.totalMergingLevels - 1) && (this.totalMergingLevels != -1 || LongMath.divide(longSortedSet.size(), this.maxChannelsPerProcessor, RoundingMode.CEILING) > this.maxChannelsPerProcessor)) {
                if (this.totalMergingLevels == -1 || i != this.totalMergingLevels - 2) {
                    // Not writing the penultimate level: the merger is not given partition boundaries.
                    clusterByPartitions = null;
                } else if (this.outputPartitionsFuture.isDone()) {
                    // Writing the penultimate level: the merger is given the output partitions.
                    clusterByPartitions = getOutputPartitions();
                } else {
                    // Penultimate level, but partitions are not known yet; try the next lower level.
                    continue;
                }
                // Scan the ready ranks in ascending order looking for a complete group.
                // j  = rank at which the current candidate group starts (a multiple of maxChannelsPerProcessor), or -1.
                // j2 = offset within the group of the last consecutive rank seen so far, or -1.
                LongBidirectionalIterator it2 = longSortedSet.iterator();
                long j = -1;
                long j2 = -1;
                while (it2.hasNext()) {
                    long nextLong = it2.nextLong();
                    if (nextLong % this.maxChannelsPerProcessor == 0) {
                        // This rank begins a new group.
                        j = nextLong;
                        j2 = -1;
                    }
                    if (j >= 0) {
                        long j3 = nextLong - j;
                        // The group is complete when this rank directly follows the previous one and is either the
                        // last slot of the group or the last merger output of the whole input level.
                        if (j3 == j2 + 1 && (j3 == this.maxChannelsPerProcessor - 1 || (totalMergersInLevel != -1 && nextLong == totalMergersInLevel - 1))) {
                            ArrayList arrayList = new ArrayList();
                            long j4 = j;
                            // Claim every rank of the group that is present in the ready set and open its file.
                            while (true) {
                                long j5 = j4;
                                if (j5 >= j + this.maxChannelsPerProcessor) {
                                    // All slots visited: start the merger; its rank in the output level is the group index.
                                    runMerger(i, j / this.maxChannelsPerProcessor, arrayList, clusterByPartitions);
                                    return true;
                                }
                                if (longSortedSet.remove(j5)) {
                                    try {
                                        // The file is opened with DELETE_ON_CLOSE.
                                        arrayList.add(new ReadableFileFrameChannel(FrameFile.open(mergerOutputFile(size, j5), FrameFile.Flag.DELETE_ON_CLOSE)));
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                }
                                j4 = j5 + 1;
                            }
                        } else if (nextLong == j + j2 + 1) {
                            // Consecutive member of the current group, but the group is not complete yet.
                            j2++;
                        } else {
                            // Gap in the group: abandon it and wait for the next group start.
                            j = -1;
                            j2 = -1;
                        }
                    }
                }
            }
        }
        return false;
    }

    /**
     * Starts the merger for the next output partition, reading that partition's slice from every
     * penultimate-level output file. This is possible only once the level count and the output
     * partitions are known and all penultimate mergers have finished.
     *
     * @return true if a merger was started
     */
    @GuardedBy("runWorkersLock")
    private boolean runNextUltimateMerger() {
        if (totalMergingLevels == UNKNOWN_LEVEL) {
            return false;
        }
        if (!outputPartitionsFuture.isDone()) {
            return false;
        }
        if (ultimateMergersRunSoFar >= getOutputPartitions().size()) {
            // One ultimate merger per partition has already been started.
            return false;
        }

        final int inLevel = totalMergingLevels - 2;
        final int outLevel = inLevel + 1;
        final LongSortedSet inputsReady = outputsReadyByLevel.get(inLevel);
        if (inputsReady == null) {
            return false;
        }

        final int inputCount = inputsReady.size();
        if (inputCount != getTotalMergersInLevel(inLevel)) {
            // Some penultimate mergers have not finished yet.
            return false;
        }

        final List<ReadableFrameChannel> mergerInputs = new ArrayList<>(inputCount);
        for (long rank = 0; rank < inputCount; rank++) {
            // Each penultimate file is opened once and shared; every ultimate merger takes its own reference.
            final FrameFile cachedFile = penultimateFrameFileCache.computeIfAbsent(mergerOutputFile(inLevel, rank), file -> {
                try {
                    return FrameFile.open(file, FrameFile.Flag.DELETE_ON_CLOSE);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            final FrameFile reference = cachedFile.newReference();
            final int firstFrame = reference.getPartitionStartFrame(ultimateMergersRunSoFar);
            final int endFrame = reference.getPartitionStartFrame(ultimateMergersRunSoFar + 1);
            mergerInputs.add(new ReadableFileFrameChannel(reference, firstFrame, endFrame));
        }

        if (outputChannels == null) {
            // Fixed-size list with one (initially null) slot per output partition.
            outputChannels = Arrays.asList(new OutputChannel[getOutputPartitions().size()]);
        }

        runMerger(outLevel, ultimateMergersRunSoFar, mergerInputs, null);
        ultimateMergersRunSoFar++;
        return true;
    }

    /**
     * Starts a merger that merges {@code list} into output number {@code j} of level {@code i}. When it
     * completes, the output is recorded as ready for that level and progress is reported.
     *
     * <p>Mergers of the ultimate level write to a channel obtained from {@code outputChannelFactory}; its
     * read-only side is stored in {@code outputChannels}. All other mergers write to a newly created
     * intermediate file in the working directory.
     *
     * @param i                   level the merger writes to
     * @param j                   rank of the merger within its level; for the ultimate level this is the
     *                            output partition number and must fit in an int
     * @param list                channels to merge
     * @param clusterByPartitions partition boundaries handed to the merger, or null
     * @throws RuntimeException wrapping any {@link IOException} raised while opening the output
     */
    @GuardedBy("runWorkersLock")
    private void runMerger(int i, long j, List<ReadableFrameChannel> list, @Nullable ClusterByPartitions clusterByPartitions) {
        try {
            final MemoryAllocator frameAllocator;
            // Declared with the general channel type because the two branches produce different implementations.
            // Fully qualified because this file does not import WritableFrameChannel.
            final org.apache.druid.frame.channel.WritableFrameChannel mergerOutput;

            if (totalMergingLevels != UNKNOWN_LEVEL && i == totalMergingLevels - 1) {
                // Ultimate level: write straight to the final output channel of this partition.
                final int partitionNumber = Ints.checkedCast(j);
                final OutputChannel outputChannel = outputChannelFactory.openChannel(partitionNumber);
                outputChannels.set(partitionNumber, outputChannel.readOnly());
                mergerOutput = outputChannel.getWritableChannel();
                frameAllocator = outputChannel.getFrameMemoryAllocator();
            } else {
                // Any other level: write to an intermediate file. CREATE_NEW fails if the file already exists.
                frameAllocator = innerFrameAllocatorMaker.get();
                mergerOutput = new WritableFrameFileChannel(
                    FrameFileWriter.open(
                        Files.newByteChannel(mergerOutputFile(i, j).toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE),
                        ByteBuffer.allocate(Frame.compressionBufferSize(frameAllocator.capacity()))
                    )
                );
            }

            runWorker(
                new FrameChannelMerger(
                    list,
                    frameReader,
                    mergerOutput,
                    FrameWriters.makeFrameWriterFactory(FrameType.ROW_BASED, frameAllocator, frameReader.signature(), Collections.emptyList()),
                    clusterBy,
                    clusterByPartitions,
                    rowLimit
                ),
                l -> {
                    synchronized (runWorkersLock) {
                        outputsReadyByLevel.computeIfAbsent(i, i2 -> {
                            return new LongRBTreeSet();
                        }).add(j);
                        superSorterProgressTracker.addMergedBatchesForLevel(i, 1L);
                    }
                }
            );
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Submits a processor to the executor and wires up its completion handling.
     *
     * <p>On success the result is passed to {@code consumer} and then {@code workerFinished()} runs under
     * the lock. If the processor fails, or if the success handling itself throws, the error is set on
     * {@code allDone}.
     *
     * @param frameProcessor processor to run to completion
     * @param consumer       receives the processor's result; called outside {@code runWorkersLock}
     */
    private <T> void runWorker(FrameProcessor<T> frameProcessor, final Consumer<T> consumer) {
        final ListenableFuture<T> workerFuture = exec.runFully(frameProcessor, cancellationId);

        final FutureCallback<T> completionHandler = new FutureCallback<T>() {
            @Override
            public void onSuccess(T result) {
                try {
                    consumer.accept(result);
                    synchronized (runWorkersLock) {
                        workerFinished();
                    }
                } catch (Throwable th) {
                    fail(th);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                fail(th);
            }

            private void fail(Throwable th) {
                synchronized (runWorkersLock) {
                    allDone.setException(th);
                }
            }
        };

        Futures.addCallback(workerFuture, completionHandler, exec.getExecutorService());
    }

    /**
     * Records the final number of input frames, derives the number of merging levels from it, and reports
     * the per-level merger totals to the progress tracker.
     *
     * <p>The ultimate level is deliberately not reported here. Its merger count is the number of output
     * partitions (see {@code getTotalMergersInLevel}), which {@code run()} reports separately through
     * {@code setTotalMergersForUltimateLevel} once the partitions are known.
     *
     * @param j total number of input frames
     */
    @GuardedBy("runWorkersLock")
    private void setTotalInputFrames(long j) {
        totalInputFrames = j;

        if (j == 0) {
            // Nothing to sort.
            superSorterProgressTracker.markTriviallyComplete();
        }

        // Each level divides the number of channels by maxChannelsPerProcessor (rounding up). Keep adding
        // levels until what remains fits into a single merger.
        long mergersInLevel = j;
        int level = 0;
        while (mergersInLevel > maxChannelsPerProcessor) {
            mergersInLevel = LongMath.divide(mergersInLevel, maxChannelsPerProcessor, RoundingMode.CEILING);
            superSorterProgressTracker.setTotalMergersForLevel(level, mergersInLevel);
            level++;
        }

        // There are always at least three levels.
        totalMergingLevels = Math.max(level + 1, 3);

        // Remaining levels below the ultimate one each consist of a single merger.
        for (int remainingLevel = level; remainingLevel < totalMergingLevels - 1; remainingLevel++) {
            superSorterProgressTracker.setTotalMergersForLevel(remainingLevel, 1L);
        }

        superSorterProgressTracker.setTotalMergingLevels(totalMergingLevels);
    }

    /**
     * Returns the output partitions. May only be called once {@code outputPartitionsFuture} is done.
     *
     * @throws ISE              if the future is not done yet
     * @throws RuntimeException wrapping the cause if the future failed, or if the thread was interrupted
     *                          (in which case the interrupt flag is restored)
     */
    private ClusterByPartitions getOutputPartitions() {
        if (!outputPartitionsFuture.isDone()) {
            throw new ISE("Output partitions are not ready yet");
        }

        final ClusterByPartitions partitions;
        try {
            partitions = outputPartitionsFuture.get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        } catch (ExecutionException failed) {
            throw new RuntimeException(failed);
        }
        return partitions;
    }

    /**
     * Returns how many mergers level {@code i} consists of in total.
     *
     * @param i merging level
     * @return the merger count, or {@link #UNKNOWN_TOTAL} if it cannot be determined yet (input not fully
     *         read, or ultimate level requested before the output partitions are known)
     * @throws ISE if {@code i} is not below the total number of levels
     */
    @GuardedBy("runWorkersLock")
    private long getTotalMergersInLevel(int i) {
        if (totalInputFrames == UNKNOWN_TOTAL || totalMergingLevels == UNKNOWN_LEVEL) {
            return UNKNOWN_TOTAL;
        } else if (i >= totalMergingLevels) {
            throw new ISE("Invalid level %d", i);
        } else if (i == totalMergingLevels - 1) {
            // Ultimate level: one merger per output partition.
            return outputPartitionsFuture.isDone() ? getOutputPartitions().size() : UNKNOWN_TOTAL;
        }

        // Level n needs ceil(inputs / maxChannelsPerProcessor) mergers, applied once per level up to and including i.
        long mergers = totalInputFrames;
        int divisionsRemaining = i + 1;
        while (divisionsRemaining > 0) {
            mergers = LongMath.divide(mergers, maxChannelsPerProcessor, RoundingMode.CEILING);
            divisionsRemaining--;
        }
        return mergers;
    }

    /** Returns true once the total number of input frames has been recorded by setTotalInputFrames. */
    @GuardedBy("runWorkersLock")
    private boolean allInputRead() {
        final boolean totalStillUnknown = totalInputFrames == UNKNOWN_TOTAL;
        return !totalStillUnknown;
    }

    /**
     * Returns true if the result future has completed or been cancelled. Requires that {@code run()} has
     * been called, since {@code allDone} is null before that.
     */
    @GuardedBy("runWorkersLock")
    private boolean isAllDone() {
        if (allDone.isDone()) {
            return true;
        }
        return allDone.isCancelled();
    }

    /**
     * Releases everything the sorter still holds: pending bookkeeping, buffered frames, cached handles on
     * penultimate-level files, and any input channels that were not read to the end.
     *
     * <p>All closing is best-effort; failures are logged and suppressed so that the remaining resources
     * are still released. Each input channel is closed exactly once here.
     *
     * @throws ISE if called before the result future is done or while processors are still active
     */
    @GuardedBy("runWorkersLock")
    private void cleanUp() {
        if (!isAllDone() || activeProcessors != 0) {
            // Defensive check; callers are expected to verify both conditions first.
            throw new ISE("Improper cleanup");
        }

        if (log.isDebugEnabled()) {
            log.debug(stateString());
        }

        outputsReadyByLevel.clear();
        inputBuffer.clear();

        for (final FrameFile frameFile : penultimateFrameFileCache.values()) {
            CloseableUtils.closeAndSuppressExceptions(
                frameFile,
                th -> log.warn(th, "Could not close intermediate file [%s]", frameFile.file())
            );
        }
        penultimateFrameFileCache.clear();

        // If every channel was read to the end, the batcher callback has already closed them all.
        if (!inputChannelsToRead.isEmpty()) {
            for (final ReadableFrameChannel inputChannel : inputChannels) {
                CloseableUtils.closeAndSuppressExceptions(
                    inputChannel::close,
                    th -> log.warn(th, "Could not close input channel")
                );
            }
        }
        inputChannelsToRead.clear();
    }

    /**
     * Returns the intermediate file used for the output of merger {@code j} of level {@code i}. The file
     * is named {@code merged.<level>.<rank>} and lives in the working directory.
     */
    private File mergerOutputFile(int i, long j) {
        final String fileName = StringUtils.format("merged.%d.%d", i, j);
        return new File(directory, fileName);
    }

    /**
     * Returns a one-line snapshot of the sorter's state for debug logging: frames read / total, frames
     * buffered, number of levels, number of partitions (-1 while unknown), active / maximum processors,
     * channels still to read, ready outputs per level, and whether the sort is done ({@code y} or {@code n}).
     *
     * <p>Safe to call before {@code run()}; the sort is then reported as not done.
     */
    public String stateString() {
        synchronized (runWorkersLock) {
            final int partitionCount = outputPartitionsFuture.isDone()
                ? ((ClusterByPartitions) FutureUtils.getUncheckedImmediately(outputPartitionsFuture)).size()
                : -1;
            // allDone is null until run() has been called.
            final boolean done = allDone != null && isAllDone();

            return "frames-in=" + inputFramesReadSoFar + "/" + totalInputFrames
                   + " frames-buffered=" + inputBuffer.size()
                   + " lvls=" + totalMergingLevels
                   + " parts=" + partitionCount
                   + " p=" + activeProcessors + "/" + maxActiveProcessors
                   + " ch-pending=" + inputChannelsToRead
                   + " to-merge=" + outputsReadyByLevel
                   + " done=" + (done ? "y" : "n");
        }
    }

    /**
     * Wraps a single frame in a readable channel: the frame is written to a minimal in-memory channel,
     * which is then closed for writing so that a reader sees exactly one frame.
     *
     * @param frameWithPartition the frame to expose
     * @return the readable side of the channel holding the frame
     * @throws RuntimeException wrapping any {@link IOException} raised while writing or closing
     */
    private static ReadableFrameChannel singleReadableFrameChannel(FrameWithPartition frameWithPartition) {
        try {
            final BlockingQueueFrameChannel oneFrameChannel = BlockingQueueFrameChannel.minimal();
            oneFrameChannel.writable().write(frameWithPartition);
            oneFrameChannel.writable().close();
            return oneFrameChannel.readable();
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }
}
