package org.apache.druid.frame.processor;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.frame.channel.ReadableFileFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameFileChannel;
import org.apache.druid.frame.file.FrameFile;
import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.frame.processor.test.ChompingFrameProcessor;
import org.apache.druid.frame.processor.test.FailingFrameProcessor;
import org.apache.druid.frame.processor.test.InfiniteFrameProcessor;
import org.apache.druid.frame.processor.test.SleepyFrameProcessor;
import org.apache.druid.frame.processor.test.SuperBlasterFrameProcessor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CloseableUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/druid/frame/processor/FrameProcessorExecutorTest.class */
public class FrameProcessorExecutorTest {

    /* loaded from: input_file:org/apache/druid/frame/processor/FrameProcessorExecutorTest$BaseFrameProcessorExecutorTestSuite.class */
    /**
     * Shared fixture for the nested test suites: creates a {@link FrameProcessorExecutor} backed by a
     * {@code numThreads}-sized pool before each test, shuts it down afterwards, and provides a helper for
     * spreading an adapter's frames over several temporary frame files.
     */
    public static abstract class BaseFrameProcessorExecutorTestSuite extends InitializedNullHandlingTest {

        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
        public final int numThreads;
        FrameProcessorExecutor exec;

        /**
         * @param numThreads number of threads in the executor created by {@link #setUp()}
         */
        public BaseFrameProcessorExecutorTestSuite(int numThreads) {
            this.numThreads = numThreads;
        }

        /**
         * Creates a fresh executor for the test; threads are named after the concrete test class.
         */
        @Before
        public void setUp() throws Exception {
            final String threadNameFormat = StringUtils.encodeForFormat(getClass().getName()) + "-%s";
            this.exec = new FrameProcessorExecutor(
                MoreExecutors.listeningDecorator(Execs.multiThreaded(this.numThreads, threadNameFormat))
            );
        }

        /**
         * Shuts the executor down and fails the test if it has not terminated within one minute.
         */
        @After
        public void tearDown() throws Exception {
            this.exec.getExecutorService().shutdownNow();
            if (!this.exec.getExecutorService().awaitTermination(1L, TimeUnit.MINUTES)) {
                throw new ISE("Executor service did not terminate within 1 minute");
            }
        }

        /**
         * Writes the frames of {@code adapter} (row-based, at most 3 rows per frame) round-robin into
         * {@code numFiles} new temporary frame files.
         *
         * <p>All writers are closed before returning. If opening a file or writing a frame fails, the writers
         * opened so far are closed as well and the original failure is rethrown; any failure to close is
         * attached to it as a suppressed exception.
         *
         * @param adapter  source of the frames to write
         * @param numFiles number of files to create
         * @return the files written, in creation order
         * @throws IOException if a file cannot be created, opened or closed
         */
        List<File> writeToNFiles(StorageAdapter adapter, int numFiles) throws IOException {
            final List<File> files = new ArrayList<>();
            final List<FrameFileWriter> writers = new ArrayList<>();

            try {
                // Set up the output files.
                for (int fileNumber = 0; fileNumber < numFiles; fileNumber++) {
                    final File file = this.temporaryFolder.newFile();
                    files.add(file);
                    writers.add(
                        FrameFileWriter.open(
                            Channels.newChannel(Files.newOutputStream(file.toPath())),
                            (ByteBuffer) null,
                            ByteTracker.unboundedTracker()
                        )
                    );
                }

                // Distribute the frames over the writers.
                FrameSequenceBuilder.fromAdapter(adapter)
                    .frameType(FrameType.ROW_BASED)
                    .allocator(ArenaMemoryAllocator.createOnHeap(1000000))
                    .maxRowsPerFrame(3)
                    .frames()
                    .forEach(new Consumer<Frame>() {
                        // Number of frames written so far; selects the next writer in round-robin order.
                        private int framesWritten = 0;

                        @Override
                        public void accept(Frame frame) {
                            try {
                                // NOTE(review): -1 is presumably the "no partition" marker — confirm against FrameFileWriter.
                                writers.get(this.framesWritten % writers.size()).writeFrame(frame, -1);
                                this.framesWritten++;
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
            } catch (Throwable t) {
                // Release whatever was opened, without letting a close failure hide the original problem.
                try {
                    CloseableUtils.closeAll(writers);
                } catch (Throwable closeFailure) {
                    t.addSuppressed(closeFailure);
                }
                throw t;
            }

            CloseableUtils.closeAll(writers);
            return files;
        }
    }

    /**
     * Error-propagation and cancellation tests, run once for each thread count supplied by
     * {@link #constructorFeeder()}.
     */
    @RunWith(Parameterized.class)
    public static class MiscTests extends BaseFrameProcessorExecutorTestSuite {
        public MiscTests(int numThreads) {
            super(numThreads);
        }

        /**
         * Supplies the constructor arguments: one run each with 1, 3 and 12 threads.
         */
        @Parameterized.Parameters(name = "numThreads = {0}")
        public static Collection<Object[]> constructorFeeder() {
            final List<Object[]> constructors = new ArrayList<>();
            for (int numThreads : new int[]{1, 3, 12}) {
                constructors.add(new Object[]{numThreads});
            }
            return constructors;
        }

        /**
         * A failing processor must fail its future with a "failure!" cause, and the same error must be
         * visible to a reader of the processor's output channel.
         */
        @Test
        public void test_runFully_errors() throws Exception {
            final List<File> inFiles =
                writeToNFiles(new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex()), 1);
            final ReadableFrameChannel inChannel =
                FrameProcessorExecutorTest.openFileChannel(Iterables.getOnlyElement(inFiles));
            final BlockingQueueFrameChannel outChannel = BlockingQueueFrameChannel.minimal();

            final ListenableFuture<?> future = this.exec.runFully(
                new FailingFrameProcessor(inChannel, outChannel.writable(), 0),
                (String) null
            );

            // The failure is reported through the future, two causes deep.
            final ExecutionException futureError = Assert.assertThrows(ExecutionException.class, future::get);
            MatcherAssert.assertThat(
                futureError.getCause().getCause(),
                ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
            );

            // The failure is also written to the output channel, and reading it finishes the channel.
            final ReadableFrameChannel outReadable = outChannel.readable();
            Assert.assertTrue(outReadable.canRead());
            final RuntimeException readError = Assert.assertThrows(RuntimeException.class, outReadable::read);
            MatcherAssert.assertThat(
                readError.getCause(),
                ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
            );
            Assert.assertTrue(outReadable.isFinished());
        }

        /**
         * A future registered under a cancellation id is returned as-is and is cancelled when that id is
         * cancelled.
         */
        @Test
        public void test_registerCancelableFuture() throws InterruptedException {
            final SettableFuture<Object> future = SettableFuture.create();
            Assert.assertSame(future, this.exec.registerCancelableFuture(future, false, "xyzzy"));
            this.exec.cancel("xyzzy");
            Assert.assertTrue(future.isDone());
            Assert.assertTrue(future.isCancelled());
        }

        /**
         * Cancelling by id while a processor is running: right after {@code cancel} returns, the future is
         * done and cancelled and the processor reports both an interrupt and cleanup.
         */
        @Test
        public void test_cancel_sleepy() throws Exception {
            final SleepyFrameProcessor processor = new SleepyFrameProcessor();
            final ListenableFuture<?> future = this.exec.runFully(processor, "xyzzy");
            processor.awaitRun();
            this.exec.cancel("xyzzy");
            Assert.assertTrue(future.isDone());
            Assert.assertTrue(future.isCancelled());
            Assert.assertTrue(processor.didGetInterrupt());
            Assert.assertTrue(processor.didCleanup());
        }

        /**
         * Cancelling through the future itself: the future is cancelled at once, and after waiting for
         * cleanup the processor reports both an interrupt and cleanup.
         */
        @Test(timeout = 30000)
        public void test_futureCancel_sleepy() throws Exception {
            final SleepyFrameProcessor processor = new SleepyFrameProcessor();
            final ListenableFuture<?> future = this.exec.runFully(processor, "xyzzy");
            processor.awaitRun();
            Assert.assertTrue(future.cancel(true));
            Assert.assertTrue(future.isDone());
            Assert.assertTrue(future.isCancelled());
            processor.awaitCleanup();
            Assert.assertTrue(processor.didGetInterrupt());
            Assert.assertTrue(processor.didCleanup());
        }

        /**
         * Runs many independent "systems", each made of several generators feeding one chomper under a
         * shared cancellation id. Half of the systems are stopped by asking the generators to stop, the
         * other half are cancelled by id; the resulting futures and cleanup flags are then verified.
         */
        @Test
        public void test_cancel_concurrency() throws Exception {
            final int numSystems = 1000;
            final int numGeneratorsPerSystem = 8;

            final Frame frame = Iterables.getOnlyElement(
                FrameSequenceBuilder.fromAdapter(new QueryableIndexStorageAdapter(TestIndex.getMMappedTestIndex()))
                    .frameType(FrameType.ROW_BASED)
                    .frames()
                    .toList()
            );

            final Collection<String> systemIds = new HashSet<>();
            final Map<String, List<InfiniteFrameProcessor>> systemGenerators = new HashMap<>();
            final Map<String, ChompingFrameProcessor> systemChompers = new HashMap<>();
            // Keyed by processor identity, not equals().
            final Map<FrameProcessor<Long>, ListenableFuture<Long>> processorFutures = new IdentityHashMap<>();
            final Map<String, Boolean> systemCleanStop = new HashMap<>();

            // Build the systems, alternating between "clean stop" and "cancel".
            boolean cleanStop = false;
            for (int systemNumber = 0; systemNumber < numSystems; systemNumber++) {
                final String systemId = UUID.randomUUID().toString();
                final List<InfiniteFrameProcessor> generators = new ArrayList<>(numGeneratorsPerSystem);
                final List<ReadableFrameChannel> channels = new ArrayList<>(numGeneratorsPerSystem);

                for (int generatorNumber = 0; generatorNumber < numGeneratorsPerSystem; generatorNumber++) {
                    final BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
                    generators.add(new InfiniteFrameProcessor(frame, channel.writable()));
                    channels.add(channel.readable());
                }

                systemIds.add(systemId);
                systemGenerators.put(systemId, generators);
                systemChompers.put(systemId, new ChompingFrameProcessor(channels));
                systemCleanStop.put(systemId, cleanStop);
                cleanStop = !cleanStop;
            }

            // Start every processor, using the system id as cancellation id.
            for (final Map.Entry<String, List<InfiniteFrameProcessor>> entry : systemGenerators.entrySet()) {
                final String systemId = entry.getKey();
                for (final InfiniteFrameProcessor generator : entry.getValue()) {
                    processorFutures.put(generator, this.exec.runFully(generator, systemId));
                }
                final ChompingFrameProcessor chomper = systemChompers.get(systemId);
                processorFutures.put(chomper, this.exec.runFully(chomper, systemId));
            }

            // Once a chomper has read something, stop or cancel its system.
            for (final Map.Entry<String, ChompingFrameProcessor> entry : systemChompers.entrySet()) {
                final String systemId = entry.getKey();
                entry.getValue().awaitRead();
                if (systemCleanStop.get(systemId)) {
                    systemGenerators.get(systemId).forEach(InfiniteFrameProcessor::stop);
                } else {
                    this.exec.cancel(systemId);
                }
            }

            // Verify the outcome of every system.
            for (final String systemId : systemIds) {
                final List<InfiniteFrameProcessor> generators = systemGenerators.get(systemId);
                final ChompingFrameProcessor chomper = systemChompers.get(systemId);

                if (systemCleanStop.get(systemId)) {
                    // Clean stop: each generator returns its frame count; the chomper returns their sum.
                    long totalFrames = 0;
                    for (final InfiniteFrameProcessor generator : generators) {
                        final Long generated = processorFutures.get(generator).get();
                        Assert.assertNotNull(generated);
                        Assert.assertEquals(generator.getNumFrames(), generated.longValue());
                        totalFrames += generated.longValue();
                    }
                    final Long chomped = processorFutures.get(chomper).get();
                    Assert.assertNotNull(chomped);
                    Assert.assertEquals(totalFrames, chomped.longValue());
                } else {
                    // Cancelled: every future of the system is cancelled.
                    final List<FrameProcessor<Long>> processors = new ArrayList<>(generators);
                    processors.add(chomper);
                    for (final FrameProcessor<Long> processor : processors) {
                        final ListenableFuture<Long> future = processorFutures.get(processor);
                        Assert.assertTrue(future.isDone());
                        Assert.assertTrue(future.isCancelled());
                        final Exception getError = Assert.assertThrows(Exception.class, future::get);
                        MatcherAssert.assertThat(getError, CoreMatchers.instanceOf(CancellationException.class));
                    }
                }

                // Either way, every processor must have cleaned up.
                for (final InfiniteFrameProcessor generator : generators) {
                    Assert.assertTrue(generator.didCleanup());
                }
                Assert.assertTrue(chomper.didCleanup());
            }
        }

        /**
         * Cancelling an id that was never registered must not throw.
         */
        @Test
        public void test_cancel_nonexistentCancellationId() throws InterruptedException {
            this.exec.cancel("nonexistent");
        }
    }

    /**
     * End-to-end {@code runFully} test, run for every combination of thread count and
     * {@link SuperBlasterFrameProcessor.AwaitStyle} supplied by {@link #constructorFeeder()}.
     */
    @RunWith(Parameterized.class)
    public static class SuperBlasterTests extends BaseFrameProcessorExecutorTestSuite {
        private final SuperBlasterFrameProcessor.AwaitStyle awaitStyle;

        public SuperBlasterTests(int numThreads, SuperBlasterFrameProcessor.AwaitStyle awaitStyle) {
            super(numThreads);
            this.awaitStyle = awaitStyle;
        }

        /**
         * Supplies the constructor arguments: thread counts 1, 3 and 12 crossed with every await style.
         */
        @Parameterized.Parameters(name = "numThreads = {0}, awaitStyle = {1}")
        public static Collection<Object[]> constructorFeeder() {
            final List<Object[]> constructors = new ArrayList<>();
            for (int numThreads : new int[]{1, 3, 12}) {
                for (SuperBlasterFrameProcessor.AwaitStyle awaitStyle : SuperBlasterFrameProcessor.AwaitStyle.values()) {
                    constructors.add(new Object[]{numThreads, awaitStyle});
                }
            }
            return constructors;
        }

        /**
         * Pipes three input files through a {@link SuperBlasterFrameProcessor} into two in-memory channels,
         * mixes those into one output file, and checks the row counts reported by both processors and
         * found in the output file against the adapter's row count.
         */
        @Test
        public void test_runFully() throws Exception {
            // Kept in a local: the expected row counts and the row signature are derived from it below.
            final IncrementalIndexStorageAdapter adapter =
                new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());
            final List<File> inFiles = writeToNFiles(adapter, 3);
            final File outFile = this.temporaryFolder.newFile();

            final List<ReadableFrameChannel> inChannels = inFiles
                .stream()
                .map(FrameProcessorExecutorTest::openFileChannel)
                .collect(Collectors.toList());

            final BlockingQueueFrameChannel memoryChannel1 = BlockingQueueFrameChannel.minimal();
            final BlockingQueueFrameChannel memoryChannel2 = BlockingQueueFrameChannel.minimal();

            final SuperBlasterFrameProcessor blaster = new SuperBlasterFrameProcessor(
                inChannels,
                ImmutableList.of(memoryChannel1.writable(), memoryChannel2.writable()),
                this.awaitStyle
            );

            final FrameChannelMixer mixer = new FrameChannelMixer(
                ImmutableList.of(memoryChannel1.readable(), memoryChannel2.readable()),
                new WritableFrameFileChannel(
                    FrameFileWriter.open(
                        Channels.newChannel(Files.newOutputStream(outFile.toPath())),
                        (ByteBuffer) null,
                        ByteTracker.unboundedTracker()
                    )
                )
            );

            final ListenableFuture<Long> blasterFuture = this.exec.runFully(blaster, (String) null);
            final ListenableFuture<Long> mixerFuture = this.exec.runFully(mixer, (String) null);

            // The mixer and the output file are expected to hold twice the adapter's rows.
            Assert.assertEquals(adapter.getNumRows(), blasterFuture.get().longValue());
            Assert.assertEquals(adapter.getNumRows() * 2, mixerFuture.get().longValue());
            Assert.assertEquals(
                adapter.getNumRows() * 2,
                FrameTestUtil.readRowsFromFrameChannel(
                    new ReadableFileFrameChannel(FrameFile.open(outFile, (ByteTracker) null, new FrameFile.Flag[0])),
                    FrameReader.create(adapter.getRowSignature())
                ).toList().size()
            );
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens {@code file} as a frame file and wraps it in a readable channel.
     *
     * @param file frame file to open
     * @return a channel reading from the opened frame file
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be opened
     */
    public static ReadableFrameChannel openFileChannel(File file) {
        final FrameFile frameFile;
        try {
            frameFile = FrameFile.open(file, (ByteTracker) null, new FrameFile.Flag[0]);
        } catch (IOException openFailure) {
            // Rethrown unchecked so the method can be used where checked exceptions are not allowed.
            throw new RuntimeException(openFailure);
        }
        return new ReadableFileFrameChannel(frameFile);
    }
}
