package org.apache.druid.frame.processor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import it.unimi.dsi.fastutil.longs.LongRBTreeSet;
import it.unimi.dsi.fastutil.longs.LongSortedSet;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.RoundingMode;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CloseableUtils;

/* loaded from: input_file:org/apache/druid/frame/processor/SuperSorter.class */
public class SuperSorter {
    private static final Logger log = new Logger(SuperSorter.class);
    public static final long UNLIMITED = -1;
    public static final int UNKNOWN_LEVEL = -1;
    public static final long UNKNOWN_TOTAL = -1;
    private final List<ReadableFrameChannel> inputChannels;
    private final FrameReader frameReader;
    private final List<KeyColumn> sortKey;
    private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
    private final FrameProcessorExecutor exec;
    private final FrameProcessorDecorator processorDecorator;
    private final OutputChannelFactory outputChannelFactory;
    private final OutputChannelFactory intermediateOutputChannelFactory;
    private final int maxChannelsPerMerger;
    private final int maxActiveProcessors;
    private final String cancellationId;
    private final boolean removeNullBytes;

    @GuardedBy("runWorkersLock")
    SuperSorterProgressTracker superSorterProgressTracker;

    @GuardedBy("runWorkersLock")
    private long rowLimit;
    private final Object runWorkersLock = new Object();

    @GuardedBy("runWorkersLock")
    private boolean batcherIsRunning = false;

    @GuardedBy("runWorkersLock")
    private IntSet inputChannelsToRead = new IntOpenHashSet();

    @GuardedBy("runWorkersLock")
    private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new Int2ObjectArrayMap();

    @GuardedBy("runWorkersLock")
    private List<OutputChannel> outputChannels = null;

    @GuardedBy("runWorkersLock")
    private final Map<String, PartitionedOutputChannel> levelAndRankToReadableChannelMap = new HashMap();

    @GuardedBy("runWorkersLock")
    private int activeProcessors = 0;

    @GuardedBy("runWorkersLock")
    private long totalInputFrames = -1;

    @GuardedBy("runWorkersLock")
    private int totalMergingLevels = -1;

    @GuardedBy("runWorkersLock")
    private final Queue<Frame> inputBuffer = new ArrayDeque();

    @GuardedBy("runWorkersLock")
    private long inputFramesReadSoFar = 0;

    @GuardedBy("runWorkersLock")
    private long levelZeroMergersRunSoFar = 0;

    @GuardedBy("runWorkersLock")
    private int ultimateMergersRunSoFar = 0;

    @GuardedBy("runWorkersLock")
    private SettableFuture<OutputChannels> allDone = null;

    @GuardedBy("runWorkersLock")
    private Runnable noWorkRunnable = null;

    public SuperSorter(List<ReadableFrameChannel> list, FrameReader frameReader, List<KeyColumn> list2, ListenableFuture<ClusterByPartitions> listenableFuture, FrameProcessorExecutor frameProcessorExecutor, FrameProcessorDecorator frameProcessorDecorator, OutputChannelFactory outputChannelFactory, OutputChannelFactory outputChannelFactory2, int i, int i2, long j, @Nullable String str, SuperSorterProgressTracker superSorterProgressTracker, boolean z) {
        this.inputChannels = list;
        this.frameReader = frameReader;
        this.sortKey = list2;
        this.outputPartitionsFuture = listenableFuture;
        this.exec = frameProcessorExecutor;
        this.processorDecorator = frameProcessorDecorator;
        this.outputChannelFactory = outputChannelFactory;
        this.intermediateOutputChannelFactory = outputChannelFactory2;
        this.maxChannelsPerMerger = i2;
        this.maxActiveProcessors = i;
        this.rowLimit = j;
        this.cancellationId = str;
        this.superSorterProgressTracker = superSorterProgressTracker;
        this.removeNullBytes = z;
        for (int i3 = 0; i3 < list.size(); i3++) {
            this.inputChannelsToRead.add(i3);
        }
        if (i < 1) {
            throw new IAE("maxActiveProcessors[%d] < 1", Integer.valueOf(i));
        }
        if (i2 < 2) {
            throw new IAE("maxChannelsPerMerger[%d] < 2", Integer.valueOf(i2));
        }
        if (j != -1 && j <= 0) {
            throw new IAE("rowLimit[%d] must be positive", Long.valueOf(j));
        }
    }

    public ListenableFuture<OutputChannels> run() {
        ListenableFuture<OutputChannels> futureWithBaggage;
        synchronized (this.runWorkersLock) {
            if (this.allDone != null) {
                throw new ISE("Cannot run() more than once.", new Object[0]);
            }
            this.allDone = SettableFuture.create();
            runWorkersIfPossible();
            this.outputPartitionsFuture.addListener(() -> {
                synchronized (this.runWorkersLock) {
                    if (this.outputPartitionsFuture.isDone()) {
                        this.superSorterProgressTracker.setTotalMergersForUltimateLevel(getOutputPartitions().size());
                    }
                    runWorkersIfPossible();
                    setAllDoneIfPossible();
                }
            }, this.exec.asExecutor(this.cancellationId));
            futureWithBaggage = FutureUtils.futureWithBaggage(this.allDone, () -> {
                synchronized (this.runWorkersLock) {
                    if (this.activeProcessors == 0) {
                        cleanUp();
                    }
                }
            });
        }
        return futureWithBaggage;
    }

    @VisibleForTesting
    void setNoWorkRunnable(Runnable runnable) {
        synchronized (this.runWorkersLock) {
            this.noWorkRunnable = runnable;
        }
    }

    @GuardedBy("runWorkersLock")
    private void workerFinished() {
        this.activeProcessors--;
        if (log.isDebugEnabled()) {
            log.debug(stateString(), new Object[0]);
        }
        runWorkersIfPossible();
        setAllDoneIfPossible();
        if (isAllDone() && this.activeProcessors == 0) {
            cleanUp();
        }
    }

    @GuardedBy("runWorkersLock")
    private void runWorkersIfPossible() {
        if (isAllDone()) {
            return;
        }
        setTotalMergingLevelsIfPossible();
        while (this.activeProcessors < this.maxActiveProcessors && (runNextDirectMerger() || runNextUltimateMerger() || runNextMiddleMerger() || runNextLevelZeroMerger() || runNextBatcher())) {
            try {
                this.activeProcessors++;
                if (log.isDebugEnabled()) {
                    log.debug(stateString(), new Object[0]);
                }
            } catch (Throwable th) {
                this.allDone.setException(th);
                return;
            }
        }
        if (this.activeProcessors == 0 && this.noWorkRunnable != null) {
            log.debug("No active workers and no work left to start.", new Object[0]);
            this.noWorkRunnable.run();
        }
    }

    @GuardedBy("runWorkersLock")
    private void setAllDoneIfPossible() {
        if (isAllDone()) {
            return;
        }
        try {
            if (this.totalInputFrames == 0 && this.outputPartitionsFuture.isDone()) {
                ClusterByPartitions outputPartitions = getOutputPartitions();
                ArrayList arrayList = new ArrayList(outputPartitions.size());
                for (int i = 0; i < outputPartitions.size(); i++) {
                    arrayList.add(this.outputChannelFactory.openNilChannel(i));
                }
                this.allDone.set(OutputChannels.wrap(arrayList));
            } else if (this.rowLimit == 0 && this.activeProcessors == 0) {
                for (int i2 = 0; i2 < this.outputChannels.size(); i2++) {
                    if (this.outputChannels.get(i2) == null) {
                        this.outputChannels.set(i2, this.outputChannelFactory.openNilChannel(i2));
                        this.superSorterProgressTracker.addMergedBatchesForLevel(this.totalMergingLevels - 1, 1L);
                    }
                }
                this.allDone.set(OutputChannels.wrap(this.outputChannels));
            } else if (this.totalMergingLevels != -1 && this.outputsReadyByLevel.containsKey(this.totalMergingLevels - 1) && ((LongSortedSet) this.outputsReadyByLevel.get(this.totalMergingLevels - 1)).size() == getTotalMergersInLevel(this.totalMergingLevels - 1)) {
                this.allDone.set(OutputChannels.wrap(this.outputChannels));
            }
        } catch (Throwable th) {
            this.allDone.setException(th);
        }
    }

    @GuardedBy("runWorkersLock")
    private boolean runNextBatcher() {
        if (this.batcherIsRunning || this.inputChannelsToRead.isEmpty()) {
            return false;
        }
        this.batcherIsRunning = true;
        runWorker(new FrameChannelBatcher(this.inputChannels, this.maxChannelsPerMerger), pair -> {
            List list = (List) pair.lhs;
            IntSet intSet = (IntSet) pair.rhs;
            synchronized (this.runWorkersLock) {
                this.inputBuffer.addAll(list);
                this.inputFramesReadSoFar += list.size();
                this.inputChannelsToRead = intSet;
                if (this.inputChannelsToRead.isEmpty()) {
                    this.inputChannels.forEach((v0) -> {
                        v0.close();
                    });
                    setTotalInputFrames(this.inputFramesReadSoFar);
                    setTotalMergingLevelsIfPossible();
                    runWorkersIfPossible();
                } else if (this.inputBuffer.size() >= this.maxChannelsPerMerger) {
                    runWorkersIfPossible();
                }
                this.batcherIsRunning = false;
            }
        });
        return true;
    }

    @GuardedBy("runWorkersLock")
    private boolean runNextDirectMerger() {
        if (this.totalMergingLevels != 1 || !allInputRead() || this.ultimateMergersRunSoFar >= getTotalMergersInLevel(0)) {
            return false;
        }
        if (isLimited() && (this.rowLimit == 0 || this.activeProcessors > 0)) {
            return false;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<Frame> it = this.inputBuffer.iterator();
        while (it.hasNext()) {
            arrayList.add(singleReadableFrameChannel(new FrameWithPartition(it.next(), -1)));
        }
        int i = this.ultimateMergersRunSoFar;
        this.ultimateMergersRunSoFar = i + 1;
        runMerger(0, i, arrayList, Collections.emptyList());
        return true;
    }

    @GuardedBy("runWorkersLock")
    private boolean runNextLevelZeroMerger() {
        Frame poll;
        if (this.totalMergingLevels == 1 || this.inputBuffer.isEmpty()) {
            return false;
        }
        if (this.totalMergingLevels == -1 && this.inputBuffer.size() < getMaxInputBufferFramesForDirectMerging()) {
            return false;
        }
        ArrayList arrayList = new ArrayList();
        while (arrayList.size() < this.maxChannelsPerMerger && (poll = this.inputBuffer.poll()) != null) {
            arrayList.add(singleReadableFrameChannel(new FrameWithPartition(poll, -1)));
        }
        long j = this.levelZeroMergersRunSoFar;
        this.levelZeroMergersRunSoFar = j + 1;
        runMerger(0, j, arrayList, ImmutableList.of());
        return true;
    }

    @GuardedBy("runWorkersLock")
    private boolean runNextMiddleMerger() {
        int i;
        for (int size = this.outputsReadyByLevel.size() - 1; size >= 0; size--) {
            int i2 = size + 1;
            long totalMergersInLevel = getTotalMergersInLevel(size);
            LongSortedSet longSortedSet = (LongSortedSet) this.outputsReadyByLevel.get(size);
            if ((this.totalMergingLevels == -1 || i2 < this.totalMergingLevels - 1) && (this.totalMergingLevels != -1 || LongMath.divide(longSortedSet.size(), this.maxChannelsPerMerger, RoundingMode.CEILING) > this.maxChannelsPerMerger)) {
                if (this.totalMergingLevels == -1 || i2 != this.totalMergingLevels - 2) {
                    i = this.maxChannelsPerMerger;
                } else if (this.outputPartitionsFuture.isDone()) {
                    i = Ints.checkedCast(LongMath.divide(totalMergersInLevel, getTotalMergersInLevel(i2), RoundingMode.CEILING));
                } else {
                    continue;
                }
                LongBidirectionalIterator it = longSortedSet.iterator();
                long j = -1;
                long j2 = -1;
                while (it.hasNext()) {
                    long nextLong = it.nextLong();
                    if (nextLong % i == 0) {
                        j = nextLong;
                        j2 = -1;
                    }
                    if (j >= 0) {
                        long j3 = nextLong - j;
                        if (j3 == j2 + 1 && (j3 == i - 1 || (totalMergersInLevel != -1 && nextLong == totalMergersInLevel - 1))) {
                            ArrayList arrayList = new ArrayList();
                            ArrayList arrayList2 = new ArrayList();
                            long j4 = j;
                            while (true) {
                                long j5 = j4;
                                if (j5 >= j + i) {
                                    runMerger(i2, j / i, arrayList, arrayList2);
                                    return true;
                                }
                                if (longSortedSet.remove(j5)) {
                                    PartitionedReadableFrameChannel partitionedReadableFrameChannel = this.levelAndRankToReadableChannelMap.remove(mergerOutputFileName(size, j5)).getReadableChannelSupplier().get();
                                    arrayList.add(partitionedReadableFrameChannel.getReadableFrameChannel(0));
                                    arrayList2.add(partitionedReadableFrameChannel);
                                }
                                j4 = j5 + 1;
                            }
                        } else if (nextLong == j + j2 + 1) {
                            j2++;
                        } else {
                            j = -1;
                            j2 = -1;
                        }
                    }
                }
            }
        }
        return false;
    }

    @GuardedBy("runWorkersLock")
    private boolean runNextUltimateMerger() {
        if (this.totalMergingLevels == -1 || this.totalMergingLevels < 2 || !this.outputPartitionsFuture.isDone() || this.ultimateMergersRunSoFar >= getOutputPartitions().size()) {
            return false;
        }
        if (isLimited() && (this.rowLimit == 0 || this.activeProcessors > 0)) {
            return false;
        }
        int i = this.totalMergingLevels - 2;
        int i2 = i + 1;
        LongSortedSet longSortedSet = (LongSortedSet) this.outputsReadyByLevel.get(i);
        if (longSortedSet == null) {
            return false;
        }
        int size = longSortedSet.size();
        if (size != getTotalMergersInLevel(i)) {
            return false;
        }
        ArrayList arrayList = new ArrayList(size);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= size) {
                int i3 = this.ultimateMergersRunSoFar;
                this.ultimateMergersRunSoFar = i3 + 1;
                runMerger(i2, i3, arrayList, ImmutableList.of());
                return true;
            }
            arrayList.add(this.levelAndRankToReadableChannelMap.get(mergerOutputFileName(i, j2)).getReadableChannelSupplier().get().getReadableFrameChannel(this.ultimateMergersRunSoFar));
            j = j2 + 1;
        }
    }

    @GuardedBy("runWorkersLock")
    private void runMerger(int i, long j, List<ReadableFrameChannel> list, List<PartitionedReadableFrameChannel> list2) {
        WritableFrameChannel writableChannel;
        SingleMemoryAllocatorFactory singleMemoryAllocatorFactory;
        ClusterByPartitions outputPartitions;
        try {
            String mergerOutputFileName = mergerOutputFileName(i, j);
            if (this.totalMergingLevels != -1 && i == this.totalMergingLevels - 1) {
                int size = getOutputPartitions().size();
                int checkedCast = Ints.checkedCast(j);
                if (this.outputChannels == null) {
                    this.outputChannels = Arrays.asList(new OutputChannel[size]);
                }
                OutputChannel openChannel = this.outputChannelFactory.openChannel(checkedCast);
                writableChannel = openChannel.getWritableChannel();
                singleMemoryAllocatorFactory = new SingleMemoryAllocatorFactory(openChannel.getFrameMemoryAllocator());
                this.outputChannels.set(checkedCast, openChannel.readOnly());
                outputPartitions = this.totalMergingLevels == 1 ? new ClusterByPartitions(Collections.singletonList(getOutputPartitions().get(checkedCast))) : null;
            } else {
                PartitionedOutputChannel openPartitionedChannel = this.intermediateOutputChannelFactory.openPartitionedChannel(mergerOutputFileName, true);
                writableChannel = openPartitionedChannel.getWritableChannel();
                singleMemoryAllocatorFactory = new SingleMemoryAllocatorFactory(openPartitionedChannel.getFrameMemoryAllocator());
                outputPartitions = i == this.totalMergingLevels - 2 ? getOutputPartitions() : null;
                this.levelAndRankToReadableChannelMap.put(mergerOutputFileName, openPartitionedChannel.readOnly());
            }
            runWorker(new FrameChannelMerger(list, this.frameReader, writableChannel, FrameWriters.makeRowBasedFrameWriterFactory(singleMemoryAllocatorFactory, this.frameReader.signature(), Collections.emptyList(), this.removeNullBytes), this.sortKey, outputPartitions, this.rowLimit), l -> {
                synchronized (this.runWorkersLock) {
                    ((LongSortedSet) this.outputsReadyByLevel.computeIfAbsent(i, i2 -> {
                        return new LongRBTreeSet();
                    })).add(j);
                    this.superSorterProgressTracker.addMergedBatchesForLevel(i, 1L);
                    if (isLimited() && this.totalMergingLevels != -1 && i == this.totalMergingLevels - 1) {
                        this.rowLimit -= l.longValue();
                        if (this.rowLimit < 0) {
                            throw DruidException.defensive("rowLimit[%d] below zero after outputRows[%d]", Long.valueOf(this.rowLimit), l);
                        }
                    }
                    Iterator it = list2.iterator();
                    while (it.hasNext()) {
                        try {
                            ((PartitionedReadableFrameChannel) it.next()).close();
                        } catch (IOException e) {
                            throw new UncheckedIOException(StringUtils.format("Could not close channel for level [%d] and rank [%d]", Integer.valueOf(i), Long.valueOf(j)), e);
                        }
                    }
                }
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private <T> void runWorker(FrameProcessor<T> frameProcessor, final Consumer<T> consumer) {
        Futures.addCallback(this.exec.runFully(this.processorDecorator.decorate(frameProcessor), this.cancellationId), new FutureCallback<T>() { // from class: org.apache.druid.frame.processor.SuperSorter.1
            public void onSuccess(T t) {
                try {
                    consumer.accept(t);
                    synchronized (SuperSorter.this.runWorkersLock) {
                        SuperSorter.this.workerFinished();
                    }
                } catch (Throwable th) {
                    synchronized (SuperSorter.this.runWorkersLock) {
                        SuperSorter.this.allDone.setException(th);
                    }
                }
            }

            public void onFailure(Throwable th) {
                synchronized (SuperSorter.this.runWorkersLock) {
                    SuperSorter.this.allDone.setException(th);
                }
            }
        }, this.exec.asExecutor(this.cancellationId));
    }

    @GuardedBy("runWorkersLock")
    private void setTotalInputFrames(long j) {
        if (this.totalInputFrames != -1) {
            throw DruidException.defensive("Cannot set totalInputFrames twice (first[%s], second[%s])", Long.valueOf(this.totalInputFrames), Long.valueOf(j));
        }
        this.totalInputFrames = j;
        if (j == 0) {
            this.superSorterProgressTracker.markTriviallyComplete();
        }
    }

    @GuardedBy("runWorkersLock")
    private void setTotalMergingLevelsIfPossible() {
        if (this.totalMergingLevels == -1 && this.totalInputFrames != -1 && this.outputPartitionsFuture.isDone()) {
            if (this.levelZeroMergersRunSoFar == 0) {
                this.totalMergingLevels = 1;
                this.superSorterProgressTracker.setTotalMergingLevels(1);
                this.superSorterProgressTracker.setTotalMergersForLevel(0, getOutputPartitions().size());
                return;
            }
            int i = 0;
            long j = this.totalInputFrames;
            while (true) {
                long j2 = j;
                if (j2 <= this.maxChannelsPerMerger) {
                    break;
                }
                long divide = LongMath.divide(j2, this.maxChannelsPerMerger, RoundingMode.CEILING);
                this.superSorterProgressTracker.setTotalMergersForLevel(i, divide);
                i++;
                j = divide;
            }
            if (i > 1) {
                this.totalMergingLevels = i + 1;
            } else if (getOutputPartitions().size() > 1) {
                this.totalMergingLevels = 3;
            } else {
                this.totalMergingLevels = 2;
            }
            for (int i2 = i; i2 < this.totalMergingLevels; i2++) {
                this.superSorterProgressTracker.setTotalMergersForLevel(i2, 1L);
            }
            this.superSorterProgressTracker.setTotalMergingLevels(this.totalMergingLevels);
        }
    }

    private ClusterByPartitions getOutputPartitions() {
        if (this.outputPartitionsFuture.isDone()) {
            return (ClusterByPartitions) FutureUtils.getUnchecked(this.outputPartitionsFuture, true);
        }
        throw new ISE("Output partitions are not ready yet", new Object[0]);
    }

    @GuardedBy("runWorkersLock")
    private long getTotalMergersInLevel(int i) {
        if (this.totalInputFrames == -1 || this.totalMergingLevels == -1) {
            return -1L;
        }
        if (i >= this.totalMergingLevels) {
            throw new ISE("Invalid level %d", Integer.valueOf(i));
        }
        if (i == this.totalMergingLevels - 1) {
            if (!this.outputPartitionsFuture.isDone()) {
                return -1L;
            }
            if (this.totalInputFrames == 0) {
                return 0L;
            }
            return getOutputPartitions().size();
        }
        if (i > 0 && i == this.totalMergingLevels - 2) {
            if (!this.outputPartitionsFuture.isDone()) {
                return -1L;
            }
            long totalMergersInLevel = getTotalMergersInLevel(i - 1);
            return LongMath.divide(totalMergersInLevel, LongMath.divide(totalMergersInLevel, Math.max(LongMath.divide(totalMergersInLevel, this.maxChannelsPerMerger, RoundingMode.CEILING), Math.min(this.maxActiveProcessors, getOutputPartitions().size())), RoundingMode.CEILING), RoundingMode.CEILING);
        }
        long j = this.totalInputFrames;
        for (int i2 = 0; i2 <= i; i2++) {
            j = LongMath.divide(j, this.maxChannelsPerMerger, RoundingMode.CEILING);
        }
        return j;
    }

    @GuardedBy("runWorkersLock")
    private boolean allInputRead() {
        return this.totalInputFrames != -1;
    }

    @GuardedBy("runWorkersLock")
    private boolean isAllDone() {
        return this.allDone.isDone() || this.allDone.isCancelled();
    }

    @GuardedBy("runWorkersLock")
    private void cleanUp() {
        if (!isAllDone() || this.activeProcessors != 0) {
            throw new ISE("Improper cleanup", new Object[0]);
        }
        if (log.isDebugEnabled()) {
            log.debug(stateString(), new Object[0]);
        }
        this.outputsReadyByLevel.clear();
        this.inputBuffer.clear();
        for (Map.Entry<String, PartitionedOutputChannel> entry : this.levelAndRankToReadableChannelMap.entrySet()) {
            try {
                entry.getValue().getReadableChannelSupplier().get().close();
            } catch (IOException e) {
                throw new UncheckedIOException("Unable to close channel for name : " + entry.getKey(), e);
            }
        }
        this.levelAndRankToReadableChannelMap.clear();
        if (!this.inputChannelsToRead.isEmpty()) {
            for (ReadableFrameChannel readableFrameChannel : this.inputChannels) {
                Objects.requireNonNull(readableFrameChannel);
                CloseableUtils.closeAndSuppressExceptions(readableFrameChannel::close, th -> {
                    log.warn(th, "Could not close input channel", new Object[0]);
                });
            }
            this.inputChannels.forEach((v0) -> {
                v0.close();
            });
        }
        this.inputChannelsToRead.clear();
    }

    private String mergerOutputFileName(int i, long j) {
        return StringUtils.format("merged.%d.%d", Integer.valueOf(i), Long.valueOf(j));
    }

    private int getMaxInputBufferFramesForDirectMerging() {
        return this.maxChannelsPerMerger * this.maxActiveProcessors;
    }

    @GuardedBy("runWorkersLock")
    private boolean isLimited() {
        return this.rowLimit != -1;
    }

    public String stateString() {
        String str;
        synchronized (this.runWorkersLock) {
            long j = this.inputFramesReadSoFar;
            long j2 = this.totalInputFrames;
            int size = this.inputBuffer.size();
            int i = this.totalMergingLevels;
            int size2 = this.outputPartitionsFuture.isDone() ? ((ClusterByPartitions) FutureUtils.getUncheckedImmediately(this.outputPartitionsFuture)).size() : -1;
            int i2 = this.activeProcessors;
            int i3 = this.maxActiveProcessors;
            String valueOf = String.valueOf(this.inputChannelsToRead);
            String.valueOf(this.outputsReadyByLevel);
            if (isAllDone()) {
            }
            str = "frames-in=" + j + "/" + j + " frames-buffered=" + j2 + " lvls=" + j + " parts=" + size + " p=" + i + "/" + size2 + " ch-pending=" + i2 + " to-merge=" + i3 + " done=" + valueOf;
        }
        return str;
    }

    private static ReadableFrameChannel singleReadableFrameChannel(FrameWithPartition frameWithPartition) {
        try {
            BlockingQueueFrameChannel minimal = BlockingQueueFrameChannel.minimal();
            minimal.writable().write(frameWithPartition);
            minimal.writable().close();
            return minimal.readable();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
