package org.apache.druid.frame.channel;

import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.druid.collections.bitmap.BitmapOperationTestBase;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/* loaded from: input_file:org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.class */
public class ReadableByteChunksFrameChannelTest {

    /* loaded from: input_file:org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest$NonParameterizedTests.class */
    public static class NonParameterizedTests extends InitializedNullHandlingTest {

        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();

        @Rule
        public ExpectedException expectedException = ExpectedException.none();

        @Test
        public void testZeroBytes() {
            ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
            create.doneWriting();
            Assert.assertTrue(create.canRead());
            Assert.assertFalse(create.isFinished());
            Assert.assertTrue(create.isErrorOrFinished());
            this.expectedException.expect(IllegalStateException.class);
            this.expectedException.expectMessage("Incomplete or missing frame at end of stream (id = test, position = 0)");
            create.read();
        }

        @Test
        public void testZeroBytesWithSpecialError() {
            ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
            create.setError(new IllegalArgumentException("test error"));
            create.doneWriting();
            Assert.assertTrue(create.canRead());
            Assert.assertFalse(create.isFinished());
            Assert.assertTrue(create.isErrorOrFinished());
            this.expectedException.expect(IllegalArgumentException.class);
            this.expectedException.expectMessage("test error");
            create.read();
        }

        @Test
        public void testEmptyFrameFile() throws IOException {
            File writeFrameFile = FrameTestUtil.writeFrameFile((Sequence<Frame>) Sequences.empty(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
            create.addChunk(Files.toByteArray(writeFrameFile));
            create.doneWriting();
            Assert.assertEquals(writeFrameFile.length(), create.getBytesAdded());
            while (create.canRead()) {
                Assert.assertFalse(create.isFinished());
                Assert.assertFalse(create.isErrorOrFinished());
                create.read();
            }
            Assert.assertTrue(create.isFinished());
            create.close();
        }

        @Test
        public void testAddChunkAfterDoneWriting() {
            ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
            Throwable th = null;
            try {
                create.doneWriting();
                Assert.assertThrows(ChannelClosedForWritesException.class, () -> {
                    create.addChunk(new byte[0]);
                });
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        }

        @Test
        public void testTruncatedFrameFile() throws IOException {
            byte[] bArr = new byte[30000];
            FileInputStream fileInputStream = new FileInputStream(FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory(new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex())).allocator(ArenaMemoryAllocator.create(ByteBuffer.allocate(64000))).frameType(FrameType.COLUMNAR).frames(), this.temporaryFolder.newFile()));
            Throwable th = null;
            try {
                ByteStreams.readFully(fileInputStream, bArr);
                if (fileInputStream != null) {
                    if (0 != 0) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileInputStream.close();
                    }
                }
                ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
                create.addChunk(bArr);
                create.doneWriting();
                Assert.assertEquals(bArr.length, create.getBytesAdded());
                Assert.assertTrue(create.canRead());
                Assert.assertFalse(create.isFinished());
                Assert.assertFalse(create.isErrorOrFinished());
                create.read();
                Assert.assertTrue(create.canRead());
                Assert.assertFalse(create.isFinished());
                Assert.assertFalse(create.isErrorOrFinished());
                create.read();
                Assert.assertTrue(create.canRead());
                Assert.assertFalse(create.isFinished());
                Assert.assertTrue(create.isErrorOrFinished());
                this.expectedException.expect(IllegalStateException.class);
                this.expectedException.expectMessage(CoreMatchers.startsWith("Incomplete or missing frame at end of stream"));
                create.read();
            } catch (Throwable th3) {
                if (fileInputStream != null) {
                    if (0 != 0) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        fileInputStream.close();
                    }
                }
                throw th3;
            }
        }

        @Test
        public void testSetError() throws IOException {
            File writeFrameFile = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory(new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex())).allocator(ArenaMemoryAllocator.create(ByteBuffer.allocate(64000))).frameType(FrameType.COLUMNAR).frames(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
            byte[] byteArray = Files.toByteArray(writeFrameFile);
            byte[] bArr = new byte[30000];
            System.arraycopy(byteArray, 0, bArr, 0, bArr.length);
            create.addChunk(bArr);
            Assert.assertEquals(bArr.length, create.getBytesAdded());
            create.setError(new ISE("Test error!", new Object[0]));
            create.doneWriting();
            Assert.assertEquals(bArr.length, create.getBytesAdded());
            this.expectedException.expect(IllegalStateException.class);
            this.expectedException.expectMessage("Test error!");
            create.read();
        }
    }

    @RunWith(Parameterized.class)
    /* loaded from: input_file:org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest$ParameterizedWithTestIndexTests.class */
    public static class ParameterizedWithTestIndexTests extends InitializedNullHandlingTest {

        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
        private final FrameType frameType;
        private final int maxRowsPerFrame;
        private final int chunkSize;

        /* loaded from: input_file:org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest$ParameterizedWithTestIndexTests$Chunker.class */
        private static class Chunker implements Closeable {
            private final FileInputStream in;
            private final int chunkSize;
            private final byte[] buf;
            private boolean eof = false;

            public Chunker(FileInputStream fileInputStream, int i) {
                this.in = fileInputStream;
                this.chunkSize = i;
                this.buf = new byte[i];
            }

            @Nullable
            public byte[] nextChunk() throws IOException {
                int i;
                if (this.eof) {
                    return null;
                }
                int i2 = 0;
                while (true) {
                    i = i2;
                    if (i >= this.chunkSize) {
                        break;
                    }
                    int read = this.in.read(this.buf, i, this.chunkSize - i);
                    if (read < 0) {
                        this.eof = true;
                        break;
                    }
                    i2 = i + read;
                }
                if (i > 0) {
                    return Arrays.copyOf(this.buf, i);
                }
                return null;
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                this.in.close();
            }
        }

        public ParameterizedWithTestIndexTests(FrameType frameType, int i, int i2) {
            this.frameType = frameType;
            this.maxRowsPerFrame = i;
            this.chunkSize = i2;
        }

        @Parameterized.Parameters(name = "frameType = {0}, maxRowsPerFrame = {1}, chunkSize = {2}")
        public static Iterable<Object[]> constructorFeeder() {
            ArrayList arrayList = new ArrayList();
            for (FrameType frameType : FrameType.values()) {
                for (int i : new int[]{1, 50, Integer.MAX_VALUE}) {
                    for (int i2 : new int[]{1, 10, BitmapOperationTestBase.NUM_BITMAPS, 5000, 50000, 1000000}) {
                        arrayList.add(new Object[]{frameType, Integer.valueOf(i), Integer.valueOf(i2)});
                    }
                }
            }
            return arrayList;
        }

        @Test
        public void testWriteFullyThenRead() throws IOException {
            IncrementalIndexCursorFactory incrementalIndexCursorFactory = new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex());
            File writeFrameFile = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory(incrementalIndexCursorFactory).maxRowsPerFrame(this.maxRowsPerFrame).frameType(this.frameType).frames(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
            ListenableFuture listenableFuture = null;
            long j = 0;
            Assert.assertEquals(0L, create.getBytesBuffered());
            Chunker chunker = new Chunker(new FileInputStream(writeFrameFile), this.chunkSize);
            Throwable th = null;
            while (true) {
                try {
                    byte[] nextChunk = chunker.nextChunk();
                    if (nextChunk == null) {
                        break;
                    }
                    j += nextChunk.length;
                    ListenableFuture addChunk = create.addChunk(nextChunk);
                    Assert.assertEquals(create.getBytesAdded(), j);
                    Assert.assertEquals(Boolean.valueOf(create.canRead()), Boolean.valueOf(addChunk != null));
                    if (addChunk != null) {
                        if (listenableFuture == null) {
                            listenableFuture = addChunk;
                        } else {
                            Assert.assertSame(listenableFuture, addChunk);
                        }
                    }
                } catch (Throwable th2) {
                    if (chunker != null) {
                        if (0 != 0) {
                            try {
                                chunker.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            chunker.close();
                        }
                    }
                    throw th2;
                }
            }
            Assert.assertNotNull(listenableFuture);
            Assert.assertFalse(listenableFuture.isDone());
            create.doneWriting();
            if (chunker != null) {
                if (0 != 0) {
                    try {
                        chunker.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    chunker.close();
                }
            }
            FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromCursorFactory(incrementalIndexCursorFactory), FrameTestUtil.readRowsFromFrameChannel(create, FrameReader.create(incrementalIndexCursorFactory.getRowSignature())));
        }

        @Test
        public void testWriteReadInterleaved() throws IOException {
            IncrementalIndexCursorFactory incrementalIndexCursorFactory = new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex());
            File writeFrameFile = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory(incrementalIndexCursorFactory).maxRowsPerFrame(this.maxRowsPerFrame).frameType(this.frameType).frames(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel create = ReadableByteChunksFrameChannel.create("test", false);
            BlockingQueueFrameChannel blockingQueueFrameChannel = new BlockingQueueFrameChannel(10000);
            ListenableFuture listenableFuture = null;
            int i = 0;
            long j = 0;
            Chunker chunker = new Chunker(new FileInputStream(writeFrameFile), this.chunkSize);
            Throwable th = null;
            while (true) {
                try {
                    try {
                        byte[] nextChunk = chunker.nextChunk();
                        if (nextChunk == null) {
                            break;
                        }
                        if (i % 3 == 0) {
                            while (create.canRead()) {
                                blockingQueueFrameChannel.writable().write(create.read());
                            }
                            Assert.assertTrue(listenableFuture == null || listenableFuture.isDone());
                        } else if (i % 11 == 0 && create.canRead()) {
                            blockingQueueFrameChannel.writable().write(create.read());
                        }
                        if (listenableFuture != null && listenableFuture.isDone()) {
                            listenableFuture = null;
                        }
                        i++;
                        j += nextChunk.length;
                        ListenableFuture addChunk = create.addChunk(nextChunk);
                        Assert.assertEquals(j, create.getBytesAdded());
                        Assert.assertEquals(Boolean.valueOf(create.canRead()), Boolean.valueOf(addChunk != null));
                        if (addChunk != null) {
                            if (listenableFuture == null) {
                                listenableFuture = addChunk;
                            } else {
                                Assert.assertSame(listenableFuture, addChunk);
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (chunker != null) {
                        if (th != null) {
                            try {
                                chunker.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            chunker.close();
                        }
                    }
                    throw th2;
                }
            }
            create.doneWriting();
            while (create.canRead()) {
                blockingQueueFrameChannel.writable().write(create.read());
            }
            blockingQueueFrameChannel.writable().close();
            if (chunker != null) {
                if (0 != 0) {
                    try {
                        chunker.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    chunker.close();
                }
            }
            FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromCursorFactory(incrementalIndexCursorFactory), FrameTestUtil.readRowsFromFrameChannel(blockingQueueFrameChannel.readable(), FrameReader.create(incrementalIndexCursorFactory.getRowSignature())));
        }
    }
}
