package org.apache.flink.connector.file.src.impl;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.class */
public class StreamFormatAdapterTest extends AdapterTestBase<StreamFormat<Integer>> {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest$CheckpointedIntFormat.class */
    public static final class CheckpointedIntFormat implements StreamFormat<Integer> {
        private CheckpointedIntFormat() {
        }

        public StreamFormat.Reader<Integer> createReader(Configuration configuration, FSDataInputStream fSDataInputStream, long j, long j2) throws IOException {
            Assert.assertEquals("invalid file length", 0L, j % 4);
            long pos = fSDataInputStream.getPos();
            long j3 = pos == 0 ? 0L : (pos + 4) - (pos % 4);
            long j4 = j2 == j ? j : (j2 + 4) - (j2 % 4);
            fSDataInputStream.seek(j3);
            return new TestIntReader(fSDataInputStream, j4, true);
        }

        public StreamFormat.Reader<Integer> restoreReader(Configuration configuration, FSDataInputStream fSDataInputStream, long j, long j2, long j3) throws IOException {
            Assert.assertEquals("invalid file length", 0L, j2 % 4);
            long j4 = j3 == j2 ? j2 : (j3 + 4) - (j3 % 4);
            fSDataInputStream.seek(j);
            return new TestIntReader(fSDataInputStream, j4, true);
        }

        public boolean isSplittable() {
            return true;
        }

        public TypeInformation<Integer> getProducedType() {
            return Types.INT;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest$FailingInstantiationFormat.class */
    public static final class FailingInstantiationFormat implements StreamFormat<Integer> {
        private FailingInstantiationFormat() {
        }

        public StreamFormat.Reader<Integer> createReader(Configuration configuration, FSDataInputStream fSDataInputStream, long j, long j2) throws IOException {
            throw new IOException("test exception");
        }

        public StreamFormat.Reader<Integer> restoreReader(Configuration configuration, FSDataInputStream fSDataInputStream, long j, long j2, long j3) throws IOException {
            throw new IOException("test exception");
        }

        public boolean isSplittable() {
            return false;
        }

        public TypeInformation<Integer> getProducedType() {
            return Types.INT;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest$NonCheckpointedIntFormat.class */
    public static final class NonCheckpointedIntFormat extends SimpleStreamFormat<Integer> {
        private NonCheckpointedIntFormat() {
        }

        public StreamFormat.Reader<Integer> createReader(Configuration configuration, FSDataInputStream fSDataInputStream) throws IOException {
            return new TestIntReader(fSDataInputStream, Long.MAX_VALUE, false);
        }

        public TypeInformation<Integer> getProducedType() {
            return Types.INT;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.connector.file.src.impl.AdapterTestBase
    public StreamFormat<Integer> createCheckpointedFormat() {
        return new CheckpointedIntFormat();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.connector.file.src.impl.AdapterTestBase
    public StreamFormat<Integer> createNonCheckpointedFormat() {
        return new NonCheckpointedIntFormat();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.connector.file.src.impl.AdapterTestBase
    public StreamFormat<Integer> createFormatFailingInInstantiation() {
        return new FailingInstantiationFormat();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.connector.file.src.impl.AdapterTestBase
    public BulkFormat<Integer, FileSourceSplit> wrapWithAdapter(StreamFormat<Integer> streamFormat) {
        return new StreamFormatAdapter(streamFormat);
    }

    @Test
    public void testReadSmallBatchSize() throws IOException {
        simpleReadTest(1);
    }

    @Test
    public void testBatchSizeMatchesOneRecord() throws IOException {
        simpleReadTest(4);
    }

    @Test
    public void testBatchSizeIsRecordMultiple() throws IOException {
        simpleReadTest(20);
    }

    private void simpleReadTest(int i) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(i));
        BulkFormat.Reader createReader = new StreamFormatAdapter(new CheckpointedIntFormat()).createReader(configuration, new FileSourceSplit("test-id", testPath, 0L, 400L, 0L, 400L));
        ArrayList arrayList = new ArrayList();
        readNumbers(createReader, arrayList, 100);
        verifyIntListResult(arrayList);
    }
}
