package org.apache.druid.frame.channel;

import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.concurrent.ExecutorService;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/druid/frame/channel/ReadableInputStreamFrameChannelTest.class */
/**
 * Tests for {@link ReadableInputStreamFrameChannel}: reading well-formed, empty, zero-byte, truncated and
 * malformed frame files through an {@link InputStream}, and reading after the underlying stream was closed.
 */
public class ReadableInputStreamFrameChannelTest extends InitializedNullHandlingTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /** Source of the rows written to the frame files and of the expected rows / row signature. */
    final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());

    /** Executor handed to every channel under test; one is created per test instance. */
    ExecutorService executorService = Execs.singleThreaded("input-stream-fetcher-test");

    /**
     * Stops the per-test executor so that its thread does not outlive the test. JUnit 4 builds a fresh test
     * instance (and therefore a fresh executor) for every test method.
     */
    @After
    public void tearDown() {
        executorService.shutdownNow();
    }

    /** Rows read back through the channel must equal the rows read directly from the adapter. */
    @Test
    public void testSimpleFrameFile() {
        ReadableInputStreamFrameChannel channel =
            ReadableInputStreamFrameChannel.open(getInputStream(), "readSimpleFrameFile", executorService, false);

        FrameTestUtil.assertRowsEqual(
            FrameTestUtil.readRowsFromAdapter(adapter, null, false),
            FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature()))
        );
        Assert.assertTrue(channel.isFinished());
        channel.close();
    }

    /** A frame file written from an empty sequence must yield zero rows and a finished channel. */
    @Test
    public void testEmptyFrameFile() throws IOException {
        File emptyFrameFile = FrameTestUtil.writeFrameFile(Sequences.empty(), temporaryFolder.newFile());
        ReadableInputStreamFrameChannel channel = ReadableInputStreamFrameChannel.open(
            Files.newInputStream(emptyFrameFile.toPath()),
            "readEmptyFrameFile",
            executorService,
            false
        );

        Assert.assertEquals(
            0,
            FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature())).toList().size()
        );
        Assert.assertTrue(channel.isFinished());
        channel.close();
    }

    /** A file with no bytes at all must fail with an "Incomplete or missing frame" error. */
    @Test
    public void testZeroBytesFrameFile() throws IOException {
        File zeroBytesFile = temporaryFolder.newFile();
        try (FileOutputStream out = new FileOutputStream(zeroBytesFile)) {
            out.write(new byte[0]);
        }

        ReadableInputStreamFrameChannel channel = ReadableInputStreamFrameChannel.open(
            Files.newInputStream(zeroBytesFile.toPath()),
            "testZeroBytesFrameFile",
            executorService,
            false
        );

        IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature())).toList()
        );
        MatcherAssert.assertThat(
            thrown,
            ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Incomplete or missing frame at end of stream"))
        );
    }

    /**
     * Only the first 30000 bytes of a written frame file are fed to the channel; reading must fail with an
     * "Incomplete or missing frame" error.
     */
    @Test
    public void testTruncatedFrameFile() throws IOException {
        IncrementalIndexStorageAdapter truncatedAdapter =
            new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());

        File frameFile = FrameTestUtil.writeFrameFile(
            FrameSequenceBuilder.fromAdapter(truncatedAdapter)
                                .allocator(ArenaMemoryAllocator.create(ByteBuffer.allocate(64000)))
                                .frameType(FrameType.ROW_BASED)
                                .frames(),
            temporaryFolder.newFile()
        );

        // NOTE(review): presumably 30000 bytes ends in the middle of a frame - confirm against the writer.
        byte[] truncatedBytes = new byte[30000];
        try (FileInputStream in = new FileInputStream(frameFile)) {
            ByteStreams.readFully(in, truncatedBytes);
        }

        ReadableInputStreamFrameChannel channel = ReadableInputStreamFrameChannel.open(
            new ByteArrayInputStream(truncatedBytes),
            "readTruncatedFrameFile",
            executorService,
            false
        );

        expectedException.expect(ISE.class);
        expectedException.expectMessage("Incomplete or missing frame at end of stream");

        // The read below is expected to throw; the statements after it only run if it unexpectedly succeeds,
        // in which case the ExpectedException rule fails the test.
        Assert.assertEquals(
            0,
            FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(truncatedAdapter.getRowSignature()))
                         .toList()
                         .size()
        );
        Assert.assertTrue(channel.isFinished());
        channel.close();
    }

    /** A file holding a single byte (value 10) must fail with an "Incomplete or missing frame" error. */
    @Test
    public void testIncorrectFrameFile() throws IOException {
        File bogusFile = temporaryFolder.newFile();
        try (FileOutputStream out = new FileOutputStream(bogusFile)) {
            out.write(10);
        }

        ReadableInputStreamFrameChannel channel = ReadableInputStreamFrameChannel.open(
            Files.newInputStream(bogusFile.toPath()),
            "readIncorrectFrameFile",
            executorService,
            false
        );

        expectedException.expect(ISE.class);
        expectedException.expectMessage("Incomplete or missing frame at end of stream");

        // Expected to throw; the remaining statements only run if the read unexpectedly succeeds.
        Assert.assertEquals(
            0,
            FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature())).toList().size()
        );
        Assert.assertTrue(channel.isFinished());
        channel.close();
    }

    /** Closing the input stream right after opening the channel must surface as an ISE while reading rows. */
    @Test
    public void closeInputStreamWhileReading() throws IOException {
        InputStream inputStream = getInputStream();
        ReadableInputStreamFrameChannel channel =
            ReadableInputStreamFrameChannel.open(inputStream, "closeInputStreamWhileReading", executorService, false);
        inputStream.close();

        expectedException.expect(ISE.class);
        expectedException.expectMessage("Found error while reading input stream");

        // Expected to throw; the remaining statements only run if the read unexpectedly succeeds.
        FrameTestUtil.assertRowsEqual(
            FrameTestUtil.readRowsFromAdapter(adapter, null, false),
            FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature()))
        );
        Assert.assertTrue(channel.isFinished());
        channel.close();
    }

    /**
     * Same setup as {@link #closeInputStreamWhileReading()}, but polls {@code canRead()} and calls
     * {@code read()} directly instead of going through {@link FrameTestUtil}.
     */
    @Test
    public void closeInputStreamWhileReadingCheckError() throws IOException, InterruptedException {
        InputStream inputStream = getInputStream();
        ReadableInputStreamFrameChannel channel = ReadableInputStreamFrameChannel.open(
            inputStream,
            "closeInputStreamWhileReadingCheckError",
            executorService,
            false
        );
        inputStream.close();

        expectedException.expect(ISE.class);
        expectedException.expectMessage("Found error while reading input stream");

        // Wait until the channel reports something to read, then read it; the ISE is expected from here on.
        while (!channel.canRead()) {
            Thread.sleep(10L);
        }
        channel.read();
        Assert.assertTrue(channel.isFinished());
        channel.close();
    }

    /**
     * Writes the adapter's rows to a new temporary frame file (row-based frames, at most 10 rows per frame)
     * and opens it for reading.
     *
     * @return an input stream over the freshly written frame file
     * @throws ISE if the temporary file cannot be created or opened
     */
    private InputStream getInputStream() {
        try {
            File frameFile = FrameTestUtil.writeFrameFile(
                FrameSequenceBuilder.fromAdapter(adapter).maxRowsPerFrame(10).frameType(FrameType.ROW_BASED).frames(),
                temporaryFolder.newFile()
            );
            return Files.newInputStream(frameFile.toPath());
        } catch (IOException e) {
            throw new ISE(e, "Unable to create file input stream");
        }
    }
}
