package org.apache.druid.frame.processor;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableNilFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessorExecutorTest;
import org.apache.druid.frame.processor.manager.ProcessorAndCallback;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.frame.processor.test.ChompingFrameProcessor;
import org.apache.druid.frame.processor.test.FailingFrameProcessor;
import org.apache.druid.frame.processor.test.SleepyFrameProcessor;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.TestIndex;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/frame/processor/RunAllFullyWidgetTest.class */
public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite {
    private final int bouncerPoolSize;
    private final int maxOutstandingProcessors;
    private final boolean delayed;
    private final AtomicLong closed;
    private Bouncer bouncer;

    @GuardedBy("this")
    private int concurrentHighWatermark;

    @GuardedBy("this")
    private int concurrentNow;

    /* loaded from: input_file:org/apache/druid/frame/processor/RunAllFullyWidgetTest$ConcurrencyTrackingFrameProcessor.class */
    private class ConcurrencyTrackingFrameProcessor<T> implements FrameProcessor<T> {
        private final AtomicBoolean didRun = new AtomicBoolean(false);
        private final AtomicBoolean didCleanup = new AtomicBoolean(false);
        private final FrameProcessor<T> delegate;

        public ConcurrencyTrackingFrameProcessor(FrameProcessor<T> frameProcessor) {
            this.delegate = frameProcessor;
        }

        public List<ReadableFrameChannel> inputChannels() {
            return this.delegate.inputChannels();
        }

        public List<WritableFrameChannel> outputChannels() {
            return this.delegate.outputChannels();
        }

        public ReturnOrAwait<T> runIncrementally(IntSet intSet) throws InterruptedException, IOException {
            if (this.didRun.compareAndSet(false, true)) {
                synchronized (RunAllFullyWidgetTest.this) {
                    RunAllFullyWidgetTest.access$008(RunAllFullyWidgetTest.this);
                    if (RunAllFullyWidgetTest.this.concurrentHighWatermark < RunAllFullyWidgetTest.this.concurrentNow) {
                        RunAllFullyWidgetTest.this.concurrentHighWatermark = RunAllFullyWidgetTest.this.concurrentNow;
                    }
                }
            }
            return this.delegate.runIncrementally(intSet);
        }

        public void cleanup() throws IOException {
            try {
                this.delegate.cleanup();
                synchronized (RunAllFullyWidgetTest.this) {
                    if (this.didRun.get() && this.didCleanup.compareAndSet(false, true)) {
                        RunAllFullyWidgetTest.access$010(RunAllFullyWidgetTest.this);
                    }
                }
            } catch (Throwable th) {
                synchronized (RunAllFullyWidgetTest.this) {
                    if (this.didRun.get() && this.didCleanup.compareAndSet(false, true)) {
                        RunAllFullyWidgetTest.access$010(RunAllFullyWidgetTest.this);
                    }
                    throw th;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/frame/processor/RunAllFullyWidgetTest$DelayedProcessorManager.class */
    public class DelayedProcessorManager<T, R> implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;

        public DelayedProcessorManager(ProcessorManager<T, R> processorManager) {
            this.delegate = processorManager;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            ListenableFuture next = this.delegate.next();
            final SettableFuture create = SettableFuture.create();
            Futures.addCallback(next, new FutureCallback<Optional<ProcessorAndCallback<T>>>() { // from class: org.apache.druid.frame.processor.RunAllFullyWidgetTest.DelayedProcessorManager.1
                public void onSuccess(Optional<ProcessorAndCallback<T>> optional) {
                    create.set(optional);
                }

                public void onFailure(Throwable th) {
                    create.setException(th);
                }
            }, RunAllFullyWidgetTest.this.exec.getExecutorService());
            return create;
        }

        public R result() {
            return (R) this.delegate.result();
        }

        public void close() {
            this.delegate.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/frame/processor/RunAllFullyWidgetTest$EnsureCloseProcessorManager.class */
    public class EnsureCloseProcessorManager<T, R> implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;

        public EnsureCloseProcessorManager(ProcessorManager<T, R> processorManager) {
            this.delegate = processorManager;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            return this.delegate.next();
        }

        public R result() {
            return (R) this.delegate.result();
        }

        public void close() {
            RunAllFullyWidgetTest.this.closed.getAndIncrement();
            this.delegate.close();
        }
    }

    /* loaded from: input_file:org/apache/druid/frame/processor/RunAllFullyWidgetTest$ThrowOnCloseProcessorManager.class */
    private static class ThrowOnCloseProcessorManager<T, R> implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;

        public ThrowOnCloseProcessorManager(ProcessorManager<T, R> processorManager) {
            this.delegate = processorManager;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            return this.delegate.next();
        }

        public R result() {
            return (R) this.delegate.result();
        }

        public void close() {
            this.delegate.close();
            throw new ISE("error!", new Object[0]);
        }
    }

    /* loaded from: input_file:org/apache/druid/frame/processor/RunAllFullyWidgetTest$ThrowOnNextProcessorManager.class */
    private static class ThrowOnNextProcessorManager<T, R> implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;
        private int i;

        public ThrowOnNextProcessorManager(ProcessorManager<T, R> processorManager, int i) {
            this.delegate = processorManager;
            this.i = i;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            if (this.i == 0) {
                throw new ISE("error!", new Object[0]);
            }
            this.i--;
            return this.delegate.next();
        }

        public R result() {
            return (R) this.delegate.result();
        }

        public void close() {
            this.delegate.close();
        }
    }

    public RunAllFullyWidgetTest(int i, int i2, int i3, boolean z) {
        super(i);
        this.closed = new AtomicLong();
        this.concurrentHighWatermark = 0;
        this.concurrentNow = 0;
        this.bouncerPoolSize = i2;
        this.maxOutstandingProcessors = i3;
        this.delayed = z;
    }

    @Parameterized.Parameters(name = "numThreads = {0}, bouncerPoolSize = {1}, maxOutstandingProcessors = {2}, delayed = {3}")
    public static Collection<Object[]> constructorFeeder() {
        ArrayList arrayList = new ArrayList();
        for (int i : new int[]{1, 3, 12}) {
            for (int i2 : new int[]{1, 3, 12, Integer.MAX_VALUE}) {
                for (int i3 : new int[]{1, 3, 12}) {
                    for (boolean z : new boolean[]{false, true}) {
                        arrayList.add(new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), Boolean.valueOf(z)});
                    }
                }
            }
        }
        return arrayList;
    }

    @Override // org.apache.druid.frame.processor.FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.bouncer = this.bouncerPoolSize == Integer.MAX_VALUE ? Bouncer.unlimited() : new Bouncer(this.bouncerPoolSize);
        synchronized (this) {
            this.concurrentNow = 0;
            this.concurrentHighWatermark = 0;
        }
    }

    @Override // org.apache.druid.frame.processor.FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite
    @After
    public void tearDown() throws Exception {
        super.tearDown();
        synchronized (this) {
            Assert.assertEquals(0L, this.concurrentNow);
            MatcherAssert.assertThat(Integer.valueOf(this.concurrentHighWatermark), Matchers.lessThanOrEqualTo(Integer.valueOf(this.bouncerPoolSize)));
            MatcherAssert.assertThat(Integer.valueOf(this.concurrentHighWatermark), Matchers.lessThanOrEqualTo(Integer.valueOf(this.maxOutstandingProcessors)));
        }
        Assert.assertEquals("Bouncer current running count", 0L, this.bouncer.getCurrentCount());
        Assert.assertEquals("Bouncer max pool size", this.bouncerPoolSize, this.bouncer.getMaxCount());
        Assert.assertEquals("Encountered single close (from ensureClose)", 1L, this.closed.get());
    }

    @Test
    public void test_runAllFully_emptyChannel() throws Exception {
        Assert.assertEquals("xyzzy", this.exec.runAllFully(possiblyDelay(ensureClose(ProcessorManagers.none().withAccumulation("xyzzy", (str, obj) -> {
            return str + obj;
        }))), this.maxOutstandingProcessors, this.bouncer, (String) null).get());
    }

    @Test
    public void test_runAllFully_fiftyThousandProcessors() throws Exception {
        Frame frame = (Frame) Iterables.getOnlyElement(FrameSequenceBuilder.fromCursorFactory(new QueryableIndexCursorFactory(TestIndex.getMMappedTestIndex())).frameType(FrameType.ROW_BASED).frames().toList());
        IntStream range = IntStream.range(0, 100);
        Objects.requireNonNull(range);
        Assert.assertEquals(100L, ((Long) this.exec.runAllFully(possiblyDelay(ensureClose(ProcessorManagers.of(Iterables.transform(range::iterator, num -> {
            BlockingQueueFrameChannel minimal = BlockingQueueFrameChannel.minimal();
            try {
                minimal.writable().write(frame);
                minimal.writable().close();
                return new ConcurrencyTrackingFrameProcessor(new ChompingFrameProcessor(Collections.singletonList(minimal.readable())));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        }))), this.maxOutstandingProcessors, this.bouncer, (String) null).get()).longValue());
    }

    @Test
    public void test_runAllFully_failing() {
        FrameProcessorExecutor frameProcessorExecutor = this.exec;
        IntStream generate = IntStream.generate(() -> {
            return 0;
        });
        Objects.requireNonNull(generate);
        ListenableFuture runAllFully = frameProcessorExecutor.runAllFully(possiblyDelay(ensureClose(ProcessorManagers.of(Iterables.transform(generate::iterator, num -> {
            return new ConcurrencyTrackingFrameProcessor(new FailingFrameProcessor(ReadableNilFrameChannel.INSTANCE, BlockingQueueFrameChannel.minimal().writable(), 0));
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        }))), this.maxOutstandingProcessors, this.bouncer, (String) null);
        Objects.requireNonNull(runAllFully);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, runAllFully::get);
        MatcherAssert.assertThat(executionException.getCause(), CoreMatchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat(executionException.getCause().getCause(), CoreMatchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat(executionException.getCause().getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!")));
    }

    @Test
    public void test_runAllFully_errorAccumulateFn() {
        FrameProcessorExecutor frameProcessorExecutor = this.exec;
        IntStream range = IntStream.range(0, 100);
        Objects.requireNonNull(range);
        ListenableFuture runAllFully = frameProcessorExecutor.runAllFully(possiblyDelay(ensureClose(ProcessorManagers.of(Iterables.transform(range::iterator, num -> {
            return new ChompingFrameProcessor(Collections.emptyList());
        })).withAccumulation(0L, (l, l2) -> {
            throw new ISE("error!", new Object[0]);
        }))), this.maxOutstandingProcessors, this.bouncer, (String) null);
        Objects.requireNonNull(runAllFully);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, runAllFully::get);
        MatcherAssert.assertThat(executionException.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(executionException.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    @Test
    public void test_runAllFully_errorChannelFirstElement() {
        FrameProcessorExecutor frameProcessorExecutor = this.exec;
        IntStream generate = IntStream.generate(() -> {
            return 0;
        });
        Objects.requireNonNull(generate);
        ListenableFuture runAllFully = frameProcessorExecutor.runAllFully(possiblyDelay(new ThrowOnNextProcessorManager(ensureClose(ProcessorManagers.of(Iterables.transform(generate::iterator, num -> {
            return new ChompingFrameProcessor(Collections.emptyList());
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })), 0)), this.maxOutstandingProcessors, this.bouncer, (String) null);
        Objects.requireNonNull(runAllFully);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, runAllFully::get);
        MatcherAssert.assertThat(executionException.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(executionException.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    @Test
    public void test_runAllFully_errorChannelSecondElement() {
        FrameProcessorExecutor frameProcessorExecutor = this.exec;
        IntStream generate = IntStream.generate(() -> {
            return 0;
        });
        Objects.requireNonNull(generate);
        ListenableFuture runAllFully = frameProcessorExecutor.runAllFully(possiblyDelay(new ThrowOnNextProcessorManager(ensureClose(ProcessorManagers.of(Iterables.transform(generate::iterator, num -> {
            return new ChompingFrameProcessor(Collections.emptyList());
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })), 1)), this.maxOutstandingProcessors, this.bouncer, (String) null);
        Objects.requireNonNull(runAllFully);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, runAllFully::get);
        MatcherAssert.assertThat(executionException.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(executionException.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    @Test
    public void test_runAllFully_errorChannelHundredthElement() {
        FrameProcessorExecutor frameProcessorExecutor = this.exec;
        IntStream generate = IntStream.generate(() -> {
            return 0;
        });
        Objects.requireNonNull(generate);
        ListenableFuture runAllFully = frameProcessorExecutor.runAllFully(possiblyDelay(new ThrowOnNextProcessorManager(ensureClose(ProcessorManagers.of(Iterables.transform(generate::iterator, num -> {
            return new ChompingFrameProcessor(Collections.emptyList());
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })), 100)), this.maxOutstandingProcessors, this.bouncer, (String) null);
        Objects.requireNonNull(runAllFully);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, runAllFully::get);
        MatcherAssert.assertThat(executionException.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(executionException.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    @Test
    public void test_runAllFully_errorChannelClose() {
        FrameProcessorExecutor frameProcessorExecutor = this.exec;
        IntStream range = IntStream.range(0, 101);
        Objects.requireNonNull(range);
        ListenableFuture runAllFully = frameProcessorExecutor.runAllFully(possiblyDelay(new ThrowOnCloseProcessorManager(ensureClose(ProcessorManagers.of(Iterables.transform(range::iterator, num -> {
            return new ChompingFrameProcessor(Collections.emptyList());
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })))), this.maxOutstandingProcessors, this.bouncer, (String) null);
        Objects.requireNonNull(runAllFully);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, runAllFully::get);
        MatcherAssert.assertThat(executionException.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(executionException.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    @Test
    public void test_runAllFully_errorChannelSecondElementAndClose() {
        FrameProcessorExecutor frameProcessorExecutor = this.exec;
        IntStream range = IntStream.range(0, 101);
        Objects.requireNonNull(range);
        ListenableFuture runAllFully = frameProcessorExecutor.runAllFully(possiblyDelay(new ThrowOnCloseProcessorManager(new ThrowOnNextProcessorManager(ensureClose(ProcessorManagers.of(Iterables.transform(range::iterator, num -> {
            return new ChompingFrameProcessor(Collections.emptyList());
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })), 1))), this.maxOutstandingProcessors, this.bouncer, (String) null);
        Objects.requireNonNull(runAllFully);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, runAllFully::get);
        MatcherAssert.assertThat(executionException.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(executionException.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    @Test(timeout = 30000)
    public void test_runAllFully_futureCancel() throws InterruptedException {
        int min = Math.min(Math.min(this.bouncerPoolSize, this.maxOutstandingProcessors), this.numThreads);
        List list = (List) IntStream.range(0, 10 * min).mapToObj(i -> {
            return new SleepyFrameProcessor();
        }).collect(Collectors.toList());
        this.exec.registerCancellationId("xyzzy");
        ListenableFuture runAllFully = this.exec.runAllFully(possiblyDelay(ensureClose(ProcessorManagers.of(Sequences.simple(list).map(frameProcessor -> {
            return new ConcurrencyTrackingFrameProcessor(frameProcessor);
        })).withAccumulation(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        }))), this.maxOutstandingProcessors, this.bouncer, "xyzzy");
        for (int i2 = 0; i2 < min; i2++) {
            ((SleepyFrameProcessor) list.get(i2)).awaitRun();
        }
        Assert.assertTrue(runAllFully.cancel(true));
        Assert.assertTrue(runAllFully.isCancelled());
        while (this.exec.cancelableProcessorCount() > 0) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(0L, this.exec.cancelableProcessorCount());
    }

    private <T, R> ProcessorManager<T, R> possiblyDelay(ProcessorManager<T, R> processorManager) {
        return this.delayed ? new DelayedProcessorManager(processorManager) : processorManager;
    }

    private <T, R> ProcessorManager<T, R> ensureClose(ProcessorManager<T, R> processorManager) {
        return new EnsureCloseProcessorManager(processorManager);
    }

    static /* synthetic */ int access$008(RunAllFullyWidgetTest runAllFullyWidgetTest) {
        int i = runAllFullyWidgetTest.concurrentNow;
        runAllFullyWidgetTest.concurrentNow = i + 1;
        return i;
    }

    static /* synthetic */ int access$010(RunAllFullyWidgetTest runAllFullyWidgetTest) {
        int i = runAllFullyWidgetTest.concurrentNow;
        runAllFullyWidgetTest.concurrentNow = i - 1;
        return i;
    }
}
