package org.apache.druid.java.util.common.guava;

import com.google.common.collect.Ordering;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.JvmUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.class */
public class ParallelMergeCombiningSequenceTest {
    private static final int TEST_POOL_SIZE = 4;
    private static final Logger LOG = new Logger(ParallelMergeCombiningSequenceTest.class);
    public static final Ordering<IntPair> INT_PAIR_ORDERING = Ordering.natural().onResultOf(intPair -> {
        return (Comparable) intPair.lhs;
    });
    public static final BinaryOperator<IntPair> INT_PAIR_MERGE_FN = (intPair, intPair2) -> {
        return intPair == null ? intPair2 : intPair2 == null ? intPair : new IntPair((Integer) intPair.lhs, Integer.valueOf(((Integer) intPair.rhs).intValue() + ((Integer) intPair2.rhs).intValue()));
    };
    private ForkJoinPool pool;

    /* loaded from: input_file:org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest$IntPair.class */
    public static class IntPair extends Pair<Integer, Integer> {
        private IntPair(Integer num, Integer num2) {
            super(num, num2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest$TestingReporter.class */
    public static class TestingReporter implements Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> {
        ParallelMergeCombiningSequence.CancellationFuture future;
        Yielder<IntPair> yielder;
        volatile ParallelMergeCombiningSequence.MergeCombineMetrics metrics;
        volatile boolean done = false;

        TestingReporter() {
        }

        @Override // java.util.function.Consumer
        public void accept(ParallelMergeCombiningSequence.MergeCombineMetrics mergeCombineMetrics) {
            this.metrics = mergeCombineMetrics;
            this.done = true;
        }
    }

    @Before
    public void setup() {
        this.pool = new ForkJoinPool(4, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (thread, th) -> {
            LOG.error(th, "Unhandled exception in thread [%s]", new Object[]{thread});
        }, true);
    }

    @After
    public void teardown() {
        this.pool.shutdown();
    }

    @Test
    public void testOrderedResultBatchFromSequence() throws IOException {
        Sequence<IntPair> nonBlockingSequence = nonBlockingSequence(5000);
        ParallelMergeCombiningSequence.YielderBatchedResultsCursor yielderBatchedResultsCursor = new ParallelMergeCombiningSequence.YielderBatchedResultsCursor(new ParallelMergeCombiningSequence.SequenceBatcher(nonBlockingSequence, 128), INT_PAIR_ORDERING);
        yielderBatchedResultsCursor.initialize();
        Yielder each = Yielders.each(nonBlockingSequence);
        IntPair intPair = null;
        while (!each.isDone() && !yielderBatchedResultsCursor.isDone()) {
            Assert.assertEquals(each.get(), yielderBatchedResultsCursor.get());
            Assert.assertNotEquals(yielderBatchedResultsCursor.get(), intPair);
            intPair = (IntPair) yielderBatchedResultsCursor.get();
            each = each.next((IntPair) each.get());
            yielderBatchedResultsCursor.advance();
        }
        yielderBatchedResultsCursor.close();
        each.close();
    }

    @Test
    public void testOrderedResultBatchFromSequenceBacktoYielderOnSequence() throws IOException {
        Yielder yielder;
        Sequence<IntPair> nonBlockingSequence = nonBlockingSequence(5000);
        ParallelMergeCombiningSequence.YielderBatchedResultsCursor yielderBatchedResultsCursor = new ParallelMergeCombiningSequence.YielderBatchedResultsCursor(new ParallelMergeCombiningSequence.SequenceBatcher(nonBlockingSequence, 128), INT_PAIR_ORDERING);
        yielderBatchedResultsCursor.initialize();
        Yielder each = Yielders.each(nonBlockingSequence);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue((int) Math.ceil(41.0625d));
        IntPair intPair = null;
        ParallelMergeCombiningSequence.ResultBatch resultBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
        int i = 0;
        while (!each.isDone() && !yielderBatchedResultsCursor.isDone()) {
            Assert.assertEquals(each.get(), yielderBatchedResultsCursor.get());
            Assert.assertNotEquals(yielderBatchedResultsCursor.get(), intPair);
            intPair = (IntPair) yielderBatchedResultsCursor.get();
            resultBatch.add(intPair);
            i++;
            if (i >= 128) {
                arrayBlockingQueue.offer(resultBatch);
                resultBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
                i = 0;
            }
            each = each.next((IntPair) each.get());
            yielderBatchedResultsCursor.advance();
        }
        if (!resultBatch.isDrained()) {
            arrayBlockingQueue.offer(resultBatch);
        }
        arrayBlockingQueue.offer(ParallelMergeCombiningSequence.ResultBatch.terminal());
        each.close();
        yielderBatchedResultsCursor.close();
        Yielder each2 = Yielders.each(nonBlockingSequence);
        Yielder each3 = Yielders.each(ParallelMergeCombiningSequence.makeOutputSequenceForQueue(arrayBlockingQueue, true, System.nanoTime() + TimeUnit.NANOSECONDS.convert(10000L, TimeUnit.MILLISECONDS), new ParallelMergeCombiningSequence.CancellationGizmo()));
        while (true) {
            yielder = each3;
            if (each2.isDone() || yielder.isDone()) {
                break;
            }
            Assert.assertEquals(each2.get(), yielder.get());
            Assert.assertNotEquals(yielder.get(), intPair);
            intPair = (IntPair) yielder.get();
            each2 = each2.next((IntPair) each2.get());
            each3 = yielder.next((IntPair) yielder.get());
        }
        each2.close();
        yielder.close();
    }

    @Test
    public void testOrderedResultBatchFromSequenceToBlockingQueueCursor() throws IOException {
        Sequence<IntPair> nonBlockingSequence = nonBlockingSequence(5000);
        ParallelMergeCombiningSequence.YielderBatchedResultsCursor yielderBatchedResultsCursor = new ParallelMergeCombiningSequence.YielderBatchedResultsCursor(new ParallelMergeCombiningSequence.SequenceBatcher(nonBlockingSequence, 128), INT_PAIR_ORDERING);
        yielderBatchedResultsCursor.initialize();
        Yielder each = Yielders.each(nonBlockingSequence);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue((int) Math.ceil(41.0625d));
        IntPair intPair = null;
        ParallelMergeCombiningSequence.ResultBatch resultBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
        int i = 0;
        while (!each.isDone() && !yielderBatchedResultsCursor.isDone()) {
            Assert.assertEquals(each.get(), yielderBatchedResultsCursor.get());
            Assert.assertNotEquals(yielderBatchedResultsCursor.get(), intPair);
            intPair = (IntPair) yielderBatchedResultsCursor.get();
            resultBatch.add(intPair);
            i++;
            if (i >= 128) {
                arrayBlockingQueue.offer(resultBatch);
                resultBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
                i = 0;
            }
            each = each.next((IntPair) each.get());
            yielderBatchedResultsCursor.advance();
        }
        if (!resultBatch.isDrained()) {
            arrayBlockingQueue.offer(resultBatch);
        }
        arrayBlockingQueue.offer(ParallelMergeCombiningSequence.ResultBatch.terminal());
        each.close();
        yielderBatchedResultsCursor.close();
        Yielder each2 = Yielders.each(nonBlockingSequence);
        ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor blockingQueueuBatchedResultsCursor = new ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor(arrayBlockingQueue, new ParallelMergeCombiningSequence.CancellationGizmo(), INT_PAIR_ORDERING, false, -1L);
        blockingQueueuBatchedResultsCursor.initialize();
        IntPair intPair2 = null;
        while (!each2.isDone() && !blockingQueueuBatchedResultsCursor.isDone()) {
            Assert.assertEquals(each2.get(), blockingQueueuBatchedResultsCursor.get());
            Assert.assertNotEquals(blockingQueueuBatchedResultsCursor.get(), intPair2);
            intPair2 = (IntPair) blockingQueueuBatchedResultsCursor.get();
            each2 = each2.next((IntPair) each2.get());
            blockingQueueuBatchedResultsCursor.advance();
        }
        each2.close();
        blockingQueueuBatchedResultsCursor.close();
    }

    @Test
    public void testNone() throws IOException {
        assertResult(new ArrayList());
    }

    @Test
    public void testEmpties() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(Sequences.empty());
        arrayList.add(Sequences.empty());
        assertResult(arrayList);
        arrayList.add(Sequences.empty());
        arrayList.add(Sequences.empty());
        arrayList.add(Sequences.empty());
        assertResult(arrayList);
    }

    @Test
    public void testEmptiesAndNonEmpty() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(Sequences.empty());
        arrayList.add(nonBlockingSequence(5));
        assertResult(arrayList);
        arrayList.clear();
        arrayList.add(Sequences.empty());
        arrayList.add(Sequences.empty());
        arrayList.add(Sequences.empty());
        arrayList.add(Sequences.empty());
        arrayList.add(Sequences.empty());
        arrayList.add(nonBlockingSequence(5));
        assertResult(arrayList);
    }

    @Test
    public void testMergeCombineMetricsAccumulatorNPEOnBadExecutorPool() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(6));
        ForkJoinPool forkJoinPool = new ForkJoinPool(1, forkJoinPool2 -> {
            return null;
        }, (thread, th) -> {
            LOG.error(th, "Unhandled exception in thread [%s]", new Object[]{thread});
        }, true);
        Assert.assertEquals("Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query.", Assert.assertThrows(QueryTimeoutException.class, () -> {
            assertResultWithCustomPool(arrayList, 10, 20, mergeCombineMetrics -> {
            }, forkJoinPool);
        }).getMessage());
        forkJoinPool.shutdown();
    }

    @Test
    public void testAllInSingleBatch() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(6));
        assertResult(arrayList, 10, 20, mergeCombineMetrics -> {
            Assert.assertEquals(1L, mergeCombineMetrics.getParallelism());
            Assert.assertEquals(2L, mergeCombineMetrics.getInputSequences());
            Assert.assertEquals(11L, mergeCombineMetrics.getInputRows());
            Assert.assertEquals(6.0f, (float) mergeCombineMetrics.getOutputRows(), 5.0f);
            Assert.assertEquals(4L, mergeCombineMetrics.getTaskCount());
        });
        arrayList.clear();
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(6));
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(8));
        arrayList.add(nonBlockingSequence(4));
        arrayList.add(nonBlockingSequence(6));
        assertResult(arrayList, 10, 20, mergeCombineMetrics2 -> {
            Assert.assertEquals(2L, mergeCombineMetrics2.getParallelism());
            Assert.assertEquals(6L, mergeCombineMetrics2.getInputSequences());
            Assert.assertEquals(34L, mergeCombineMetrics2.getInputRows());
            Assert.assertEquals(16.0f, (float) mergeCombineMetrics2.getOutputRows(), 15.0f);
            Assert.assertEquals(10.0f, (float) mergeCombineMetrics2.getTaskCount(), 2.0f);
        });
    }

    @Test
    public void testAllInSingleYield() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(6));
        assertResult(arrayList, 4, 20, mergeCombineMetrics -> {
            Assert.assertEquals(1L, mergeCombineMetrics.getParallelism());
            Assert.assertEquals(2L, mergeCombineMetrics.getInputSequences());
            Assert.assertEquals(11L, mergeCombineMetrics.getInputRows());
            Assert.assertEquals(6.0f, (float) mergeCombineMetrics.getOutputRows(), 5.0f);
            Assert.assertEquals(4L, mergeCombineMetrics.getTaskCount());
        });
        arrayList.clear();
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(6));
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(8));
        arrayList.add(nonBlockingSequence(4));
        arrayList.add(nonBlockingSequence(6));
        assertResult(arrayList, 4, 20, mergeCombineMetrics2 -> {
            Assert.assertEquals(2L, mergeCombineMetrics2.getParallelism());
            Assert.assertEquals(6L, mergeCombineMetrics2.getInputSequences());
            Assert.assertEquals(34L, mergeCombineMetrics2.getInputRows());
            Assert.assertEquals(16.0f, (float) mergeCombineMetrics2.getOutputRows(), 15.0f);
            Assert.assertEquals(10.0f, (float) mergeCombineMetrics2.getTaskCount(), 2.0f);
        });
    }

    @Test
    public void testMultiBatchMultiYield() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(15));
        arrayList.add(nonBlockingSequence(26));
        assertResult(arrayList, 5, 10, mergeCombineMetrics -> {
            Assert.assertEquals(1L, mergeCombineMetrics.getParallelism());
            Assert.assertEquals(2L, mergeCombineMetrics.getInputSequences());
            Assert.assertEquals(41L, mergeCombineMetrics.getInputRows());
            Assert.assertEquals(21.0f, (float) mergeCombineMetrics.getOutputRows(), 20.0f);
            Assert.assertEquals(4.0f, (float) mergeCombineMetrics.getTaskCount(), 2.0f);
        });
        arrayList.add(nonBlockingSequence(15));
        arrayList.add(nonBlockingSequence(33));
        arrayList.add(nonBlockingSequence(17));
        arrayList.add(nonBlockingSequence(14));
        assertResult(arrayList, 5, 10, mergeCombineMetrics2 -> {
            Assert.assertEquals(2L, mergeCombineMetrics2.getParallelism());
            Assert.assertEquals(6L, mergeCombineMetrics2.getInputSequences());
            Assert.assertEquals(120L, mergeCombineMetrics2.getInputRows());
            Assert.assertEquals(60.0f, (float) mergeCombineMetrics2.getOutputRows(), 59.0f);
            Assert.assertEquals(10.0f, (float) mergeCombineMetrics2.getTaskCount(), 5.0f);
        });
    }

    @Test
    public void testMixedSingleAndMultiYield() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(60));
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(8));
        assertResult(arrayList, 5, 10);
        arrayList.add(nonBlockingSequence(1));
        arrayList.add(nonBlockingSequence(8));
        arrayList.add(nonBlockingSequence(32));
        assertResult(arrayList, 5, 10);
    }

    @Test
    public void testLongerSequencesJustForFun() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(10000));
        arrayList.add(nonBlockingSequence(9001));
        assertResult(arrayList, 128, 1024, mergeCombineMetrics -> {
            Assert.assertEquals(1L, mergeCombineMetrics.getParallelism());
            Assert.assertEquals(2L, mergeCombineMetrics.getInputSequences());
            Assert.assertEquals(19001L, mergeCombineMetrics.getInputRows());
        });
        arrayList.add(nonBlockingSequence(7777));
        arrayList.add(nonBlockingSequence(8500));
        arrayList.add(nonBlockingSequence(5000));
        arrayList.add(nonBlockingSequence(8888));
        assertResult(arrayList, 128, 1024, mergeCombineMetrics2 -> {
            Assert.assertEquals(2L, mergeCombineMetrics2.getParallelism());
            Assert.assertEquals(6L, mergeCombineMetrics2.getInputSequences());
            Assert.assertEquals(49166L, mergeCombineMetrics2.getInputRows());
        });
    }

    @Test
    public void testExceptionOnInputSequenceRead() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(explodingSequence(15));
        arrayList.add(nonBlockingSequence(25));
        Assert.assertEquals("exploded", Assert.assertThrows(RuntimeException.class, () -> {
            assertException(arrayList);
        }).getMessage());
        Assert.assertTrue(this.pool.awaitQuiescence(1L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
    }

    @Test
    public void testExceptionOnInputSequenceRead2() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(5));
        arrayList.add(nonBlockingSequence(25));
        arrayList.add(explodingSequence(11));
        arrayList.add(nonBlockingSequence(12));
        Assert.assertEquals("exploded", Assert.assertThrows(RuntimeException.class, () -> {
            assertException(arrayList);
        }).getMessage());
        Assert.assertTrue(this.pool.awaitQuiescence(1L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
    }

    @Test
    public void testExceptionFirstResultFromSequence() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(explodingSequence(0));
        arrayList.add(nonBlockingSequence(2));
        arrayList.add(nonBlockingSequence(2));
        arrayList.add(nonBlockingSequence(2));
        Assert.assertEquals("exploded", Assert.assertThrows(RuntimeException.class, () -> {
            assertException(arrayList);
        }).getMessage());
        Assert.assertTrue(this.pool.awaitQuiescence(1L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
    }

    @Test
    public void testExceptionFirstResultFromMultipleSequence() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(explodingSequence(0));
        arrayList.add(explodingSequence(0));
        arrayList.add(explodingSequence(0));
        arrayList.add(nonBlockingSequence(2));
        arrayList.add(nonBlockingSequence(2));
        arrayList.add(nonBlockingSequence(2));
        Assert.assertEquals("exploded", Assert.assertThrows(RuntimeException.class, () -> {
            assertException(arrayList);
        }).getMessage());
        Assert.assertTrue(this.pool.awaitQuiescence(1L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
    }

    @Test
    public void testTimeoutExceptionDueToStalledInput() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(2048));
        arrayList.add(nonBlockingSequence(2048));
        arrayList.add(nonBlockingSequence(2048));
        arrayList.add(blockingSequence(2048, 400, 500, 1, 500, true));
        Assert.assertEquals("Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query.", Assert.assertThrows(QueryTimeoutException.class, () -> {
            assertException(arrayList, 4096, 16384, 1000L, 0);
        }).getMessage());
        if (JvmUtils.majorVersion() >= 20 || JvmUtils.majorVersion() < 9) {
            Assert.assertTrue(this.pool.awaitQuiescence(3L, TimeUnit.SECONDS));
            Assert.assertTrue(this.pool.isQuiescent());
        }
    }

    @Test
    public void testTimeoutExceptionDueToSlowReader() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(50000, true));
        arrayList.add(nonBlockingSequence(50000, true));
        arrayList.add(nonBlockingSequence(50000, true));
        arrayList.add(nonBlockingSequence(50000, true));
        Assert.assertEquals("Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query.", Assert.assertThrows(QueryTimeoutException.class, () -> {
            assertException(arrayList, 8, 64, 1000L, 1500);
        }).getMessage());
        Assert.assertTrue(this.pool.awaitQuiescence(1L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
    }

    @Test
    public void testTimeoutExceptionDueToStoppedReader() throws InterruptedException {
        ArrayList<TestingReporter> arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(nonBlockingSequence(150000, true));
            arrayList2.add(nonBlockingSequence(150000, true));
            arrayList2.add(nonBlockingSequence(150000, true));
            arrayList2.add(nonBlockingSequence(150000, true));
            TestingReporter testingReporter = new TestingReporter();
            ParallelMergeCombiningSequence parallelMergeCombiningSequence = new ParallelMergeCombiningSequence(this.pool, arrayList2, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, 5000L, 0, 4, 512, 128, 100, testingReporter);
            Yielder<IntPair> each = Yielders.each(parallelMergeCombiningSequence);
            testingReporter.future = parallelMergeCombiningSequence.getCancellationFuture();
            testingReporter.yielder = each;
            testingReporter.yielder = each.next((Object) null);
            Assert.assertFalse(each.isDone());
            arrayList.add(testingReporter);
        }
        Thread.sleep(5000L);
        Assert.assertTrue(this.pool.awaitQuiescence(10L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
        Assert.assertFalse(this.pool.hasQueuedSubmissions());
        for (TestingReporter testingReporter2 : arrayList) {
            Assert.assertThrows(QueryTimeoutException.class, () -> {
                testingReporter2.yielder.next((Object) null);
            });
            Assert.assertTrue(testingReporter2.future.isCancelled());
            Assert.assertTrue(testingReporter2.future.getCancellationGizmo().isCanceled());
        }
        Assert.assertTrue(this.pool.awaitQuiescence(10L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
    }

    @Test
    public void testManyBigSequencesAllAtOnce() throws IOException {
        Yielder<IntPair> yielder;
        ArrayList<TestingReporter> arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(nonBlockingSequence(50000, true));
            arrayList2.add(nonBlockingSequence(50000, true));
            arrayList2.add(nonBlockingSequence(50000, true));
            arrayList2.add(nonBlockingSequence(50000, true));
            TestingReporter testingReporter = new TestingReporter();
            ParallelMergeCombiningSequence parallelMergeCombiningSequence = new ParallelMergeCombiningSequence(this.pool, arrayList2, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, 30000L, 0, 4, 512, 128, 100, testingReporter);
            Yielder<IntPair> each = Yielders.each(parallelMergeCombiningSequence);
            testingReporter.future = parallelMergeCombiningSequence.getCancellationFuture();
            testingReporter.yielder = each;
            each.next((Object) null);
            Assert.assertFalse(each.isDone());
            arrayList.add(testingReporter);
        }
        for (TestingReporter testingReporter2 : arrayList) {
            Yielder<IntPair> yielder2 = testingReporter2.yielder;
            while (true) {
                yielder = yielder2;
                if (!yielder.isDone()) {
                    yielder2 = yielder.next((IntPair) yielder.get());
                }
            }
            Assert.assertTrue(yielder.isDone());
            yielder.close();
            Assert.assertTrue(testingReporter2.future.isDone());
        }
        Assert.assertTrue(this.pool.awaitQuiescence(10L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
        Assert.assertEquals(0L, this.pool.getRunningThreadCount());
        Assert.assertFalse(this.pool.hasQueuedSubmissions());
        Assert.assertEquals(0L, this.pool.getActiveThreadCount());
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((TestingReporter) it.next()).done);
        }
    }

    @Test
    public void testGracefulCloseOfYielderCancelsPool() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(nonBlockingSequence(10000));
        arrayList.add(nonBlockingSequence(9001));
        arrayList.add(nonBlockingSequence(7777));
        arrayList.add(nonBlockingSequence(8500));
        arrayList.add(nonBlockingSequence(5000));
        arrayList.add(nonBlockingSequence(8888));
        assertResultWithEarlyClose(arrayList, 128, 1024, 256, mergeCombineMetrics -> {
            Assert.assertEquals(2L, mergeCombineMetrics.getParallelism());
            Assert.assertEquals(6L, mergeCombineMetrics.getInputSequences());
            Assert.assertTrue(49166 > mergeCombineMetrics.getInputRows());
            Assert.assertTrue(0 < mergeCombineMetrics.getInputRows());
        });
    }

    private void assertResult(List<Sequence<IntPair>> list) throws IOException {
        assertResult(list, 4096, 16384, null);
    }

    private void assertResult(List<Sequence<IntPair>> list, int i, int i2) throws IOException {
        assertResult(list, i, i2, null);
    }

    private void assertResultWithCustomPool(List<Sequence<IntPair>> list, int i, int i2, Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> consumer, ForkJoinPool forkJoinPool) throws InterruptedException, IOException {
        CombiningSequence create = CombiningSequence.create(new MergeSequence(INT_PAIR_ORDERING, Sequences.simple(list)), INT_PAIR_ORDERING, INT_PAIR_MERGE_FN);
        ParallelMergeCombiningSequence parallelMergeCombiningSequence = new ParallelMergeCombiningSequence(forkJoinPool, list, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, 5000L, 0, 4, i2, i, 100, consumer);
        Yielder each = Yielders.each(create);
        Yielder each2 = Yielders.each(parallelMergeCombiningSequence);
        IntPair intPair = null;
        while (!each.isDone() && !each2.isDone()) {
            Assert.assertEquals(each.get(), each2.get());
            Assert.assertNotEquals(each2.get(), intPair);
            intPair = (IntPair) each2.get();
            each = each.next((IntPair) each.get());
            each2 = each2.next((IntPair) each2.get());
        }
        Assert.assertTrue(each.isDone());
        Assert.assertTrue(each2.isDone());
        while (this.pool.getRunningThreadCount() > 0) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(0L, this.pool.getRunningThreadCount());
        each.close();
        each2.close();
        Assert.assertFalse(parallelMergeCombiningSequence.getCancellationFuture().isCancelled());
        Assert.assertTrue(parallelMergeCombiningSequence.getCancellationFuture().isDone());
        Assert.assertFalse(parallelMergeCombiningSequence.getCancellationFuture().getCancellationGizmo().isCanceled());
    }

    private void assertResult(List<Sequence<IntPair>> list, int i, int i2, Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> consumer) throws IOException {
        CombiningSequence create = CombiningSequence.create(new MergeSequence(INT_PAIR_ORDERING, Sequences.simple(list)), INT_PAIR_ORDERING, INT_PAIR_MERGE_FN);
        ParallelMergeCombiningSequence parallelMergeCombiningSequence = new ParallelMergeCombiningSequence(this.pool, list, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, 5000L, 0, 4, i2, i, 100, consumer);
        Yielder each = Yielders.each(create);
        Yielder each2 = Yielders.each(parallelMergeCombiningSequence);
        IntPair intPair = null;
        while (!each.isDone() && !each2.isDone()) {
            Assert.assertEquals(each.get(), each2.get());
            Assert.assertNotEquals(each2.get(), intPair);
            intPair = (IntPair) each2.get();
            each = each.next((IntPair) each.get());
            each2 = each2.next((IntPair) each2.get());
        }
        Assert.assertTrue(each.isDone());
        Assert.assertTrue(each2.isDone());
        Assert.assertTrue(this.pool.awaitQuiescence(5L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
        each.close();
        each2.close();
        Assert.assertFalse(parallelMergeCombiningSequence.getCancellationFuture().isCancelled());
        Assert.assertFalse(parallelMergeCombiningSequence.getCancellationFuture().getCancellationGizmo().isCanceled());
        Assert.assertTrue(parallelMergeCombiningSequence.getCancellationFuture().isDone());
    }

    private void assertResultWithEarlyClose(List<Sequence<IntPair>> list, int i, int i2, int i3, Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> consumer) throws IOException {
        CombiningSequence create = CombiningSequence.create(new MergeSequence(INT_PAIR_ORDERING, Sequences.simple(list)), INT_PAIR_ORDERING, INT_PAIR_MERGE_FN);
        ParallelMergeCombiningSequence parallelMergeCombiningSequence = new ParallelMergeCombiningSequence(this.pool, list, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, 5000L, 0, 4, i2, i, 100, consumer);
        Yielder each = Yielders.each(create);
        Yielder each2 = Yielders.each(parallelMergeCombiningSequence);
        IntPair intPair = null;
        int i4 = 0;
        while (true) {
            if (each.isDone() || each2.isDone()) {
                break;
            }
            if (i4 >= i3) {
                each2.close();
                each.close();
                break;
            }
            i4++;
            Assert.assertEquals(each.get(), each2.get());
            Assert.assertNotEquals(each2.get(), intPair);
            intPair = (IntPair) each2.get();
            each = each.next((IntPair) each.get());
            each2 = each2.next((IntPair) each2.get());
        }
        Assert.assertEquals(each.get(), each2.get());
        Yielder yielder = each2;
        Assert.assertEquals("Sequence canceled", Assert.assertThrows(RuntimeException.class, () -> {
            yielder.next((IntPair) yielder.get());
        }).getMessage());
        Assert.assertTrue(parallelMergeCombiningSequence.getCancellationFuture().getCancellationGizmo().isCanceled());
        Assert.assertEquals("Sequence canceled", parallelMergeCombiningSequence.getCancellationFuture().getCancellationGizmo().getRuntimeException().getMessage());
        Assert.assertTrue(parallelMergeCombiningSequence.getCancellationFuture().isCancelled());
        Assert.assertTrue(this.pool.awaitQuiescence(10L, TimeUnit.SECONDS));
        Assert.assertTrue(this.pool.isQuiescent());
        Assert.assertFalse(each.isDone());
        Assert.assertFalse(each2.isDone());
    }

    private void assertException(List<Sequence<IntPair>> list) throws Throwable {
        assertException(list, 4096, 16384, 5000L, 0);
    }

    private void assertException(List<Sequence<IntPair>> list, int i, int i2, long j, int i3) throws Throwable {
        Throwable assertThrows = Assert.assertThrows(Exception.class, () -> {
            Yielder each = Yielders.each(new ParallelMergeCombiningSequence(this.pool, list, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, j, 0, 4, i2, i, 100, (Consumer) null));
            IntPair intPair = null;
            while (!each.isDone()) {
                Assert.assertNotEquals(each.get(), intPair);
                intPair = (IntPair) each.get();
                if (i3 > 0 && ThreadLocalRandom.current().nextBoolean()) {
                    Thread.sleep(i3);
                }
                each = each.next((IntPair) each.get());
            }
            each.close();
        });
        list.forEach(sequence -> {
            if (sequence instanceof ExplodingSequence) {
                Assert.assertEquals(1L, ((ExplodingSequence) sequence).getCloseCount());
            }
        });
        LOG.warn(assertThrows, "exception:", new Object[0]);
        throw assertThrows;
    }

    public static Sequence<IntPair> nonBlockingSequence(final int i, final boolean z) {
        final List<IntPair> generateOrderedPairs = z ? null : generateOrderedPairs(i);
        return new BaseSequence(new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>() { // from class: org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest.1
            public Iterator<IntPair> make() {
                return new Iterator<IntPair>() { // from class: org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest.1.1
                    int mergeKey = 0;
                    int rowCounter = 0;

                    @Override // java.util.Iterator
                    public boolean hasNext() {
                        return this.rowCounter < i;
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.Iterator
                    public IntPair next() {
                        if (z) {
                            this.rowCounter++;
                            this.mergeKey += ParallelMergeCombiningSequenceTest.incrementMergeKeyAmount();
                            return ParallelMergeCombiningSequenceTest.makeIntPair(this.mergeKey);
                        }
                        List list = generateOrderedPairs;
                        int i2 = this.rowCounter;
                        this.rowCounter = i2 + 1;
                        return (IntPair) list.get(i2);
                    }
                };
            }

            public void cleanup(Iterator<IntPair> it) {
            }
        });
    }

    public static Sequence<IntPair> blockingSequence(final int i, int i2, int i3, final int i4, final int i5, final boolean z) {
        final List<IntPair> generateOrderedPairs = z ? null : generateOrderedPairs(i);
        final long nanoTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(ThreadLocalRandom.current().nextLong(i2, i3), TimeUnit.MILLISECONDS);
        return new BaseSequence(new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>() { // from class: org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest.2
            public Iterator<IntPair> make() {
                return new Iterator<IntPair>() { // from class: org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest.2.1
                    int mergeKey = 0;
                    int rowCounter = 0;

                    @Override // java.util.Iterator
                    public boolean hasNext() {
                        return this.rowCounter < i;
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.Iterator
                    public IntPair next() {
                        try {
                            long nanoTime2 = System.nanoTime();
                            if (this.rowCounter == 0 && nanoTime2 < nanoTime) {
                                Thread.sleep(Math.max(TimeUnit.MILLISECONDS.convert(nanoTime - nanoTime2, TimeUnit.NANOSECONDS), 1L));
                            } else if (i5 > 0 && this.rowCounter % i4 == 0 && ThreadLocalRandom.current().nextBoolean()) {
                                Thread.sleep(Math.max(ThreadLocalRandom.current().nextInt(i5), 1));
                            }
                            if (z) {
                                this.rowCounter++;
                                this.mergeKey += ParallelMergeCombiningSequenceTest.incrementMergeKeyAmount();
                                return ParallelMergeCombiningSequenceTest.makeIntPair(this.mergeKey);
                            }
                            List list = generateOrderedPairs;
                            int i6 = this.rowCounter;
                            this.rowCounter = i6 + 1;
                            return (IntPair) list.get(i6);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
            }

            public void cleanup(Iterator<IntPair> it) {
            }
        });
    }

    private static Sequence<IntPair> nonBlockingSequence(int i) {
        return nonBlockingSequence(i, false);
    }

    private static Sequence<IntPair> explodingSequence(final int i) {
        final int i2 = i + 1;
        final AtomicInteger atomicInteger = new AtomicInteger(1);
        return new ExplodingSequence(new BaseSequence(new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>() { // from class: org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest.4
            public Iterator<IntPair> make() {
                atomicInteger.decrementAndGet();
                return new Iterator<IntPair>() { // from class: org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest.4.1
                    int mergeKey = 0;
                    int rowCounter = 0;

                    @Override // java.util.Iterator
                    public boolean hasNext() {
                        return this.rowCounter < i2;
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.Iterator
                    public IntPair next() {
                        if (this.rowCounter == i) {
                            throw new RuntimeException("exploded");
                        }
                        this.mergeKey += ParallelMergeCombiningSequenceTest.incrementMergeKeyAmount();
                        this.rowCounter++;
                        return ParallelMergeCombiningSequenceTest.makeIntPair(this.mergeKey);
                    }
                };
            }

            public void cleanup(Iterator<IntPair> it) {
                atomicInteger.incrementAndGet();
            }
        }), false, false) { // from class: org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest.3
            public long getCloseCount() {
                return atomicInteger.get();
            }
        };
    }

    private static List<IntPair> generateOrderedPairs(int i) {
        int i2 = 0;
        ArrayList arrayList = new ArrayList(i);
        for (int i3 = 0; i3 < i; i3++) {
            i2 += incrementMergeKeyAmount();
            arrayList.add(makeIntPair(i2));
        }
        return arrayList;
    }

    private static int incrementMergeKeyAmount() {
        return ThreadLocalRandom.current().nextInt(1, 3);
    }

    private static IntPair makeIntPair(int i) {
        return new IntPair(Integer.valueOf(i), Integer.valueOf(ThreadLocalRandom.current().nextInt(1, 100)));
    }
}
