package org.apache.flink.connector.file.src;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.class */
class FileSourceHeavyThroughputTest {
    // Test file system created and registered by the test; stays null until a test assigns it,
    // and is unregistered again in unregisterTestFs().
    private TestingFileSystem testFs;

    /* loaded from: input_file:org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest$ArrayReader.class */
    private static final class ArrayReader implements StreamFormat.Reader<byte[]> {
        private static final int ARRAY_SIZE = 1048576;
        private final FSDataInputStream in;

        ArrayReader(FSDataInputStream fSDataInputStream) {
            this.in = fSDataInputStream;
        }

        @Nullable
        /* renamed from: read, reason: merged with bridge method [inline-methods] */
        public byte[] m11read() throws IOException {
            byte[] bArr = new byte[ARRAY_SIZE];
            int read = this.in.read(bArr);
            if (read == bArr.length) {
                return bArr;
            }
            if (read == -1) {
                return null;
            }
            return Arrays.copyOf(bArr, read);
        }

        public void close() throws IOException {
            this.in.close();
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest$ArrayReaderFormat.class */
    private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
        private static final long serialVersionUID = 1;

        private ArrayReaderFormat() {
        }

        public StreamFormat.Reader<byte[]> createReader(Configuration configuration, FSDataInputStream fSDataInputStream) throws IOException {
            return new ArrayReader(fSDataInputStream);
        }

        public TypeInformation<byte[]> getProducedType() {
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest$GeneratingInputStream.class */
    private static final class GeneratingInputStream extends FSDataInputStream {
        private final long length;
        private long pos;

        GeneratingInputStream(long j) {
            this.length = j;
        }

        public void seek(long j) throws IOException {
            Preconditions.checkArgument(j >= 0 && j <= this.length);
            this.pos = j;
        }

        public long getPos() throws IOException {
            return this.pos;
        }

        public int read() throws IOException {
            if (this.pos >= this.length) {
                return -1;
            }
            this.pos++;
            return 0;
        }

        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (this.pos >= this.length) {
                return -1;
            }
            int min = (int) Math.min(i2, this.length - this.pos);
            this.pos += min;
            return min;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest$NoOpReaderContext.class */
    private static final class NoOpReaderContext implements SourceReaderContext {
        private NoOpReaderContext() {
        }

        public SourceReaderMetricGroup metricGroup() {
            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
        }

        public Configuration getConfiguration() {
            return new Configuration();
        }

        public String getLocalHostName() {
            return "localhost";
        }

        public int getIndexOfSubtask() {
            return 0;
        }

        public void sendSplitRequest() {
        }

        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
        }

        public UserCodeClassLoader getUserCodeClassLoader() {
            return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest$NoOpReaderOutput.class */
    private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
        private NoOpReaderOutput() {
        }

        public void collect(E e) {
        }

        public void collect(E e, long j) {
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markIdle() {
        }

        public void markActive() {
        }

        public SourceOutput<E> createOutputForSplit(String str) {
            return this;
        }

        public void releaseOutputForSplit(String str) {
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    FileSourceHeavyThroughputTest() {
    }

    /** Unregisters the test file system after each test, if one was created. */
    @AfterEach
    void unregisterTestFs() throws Exception {
        final TestingFileSystem registered = this.testFs;
        if (registered == null) {
            return;
        }
        registered.unregister();
    }

    /**
     * Pushes a single 20 GiB split through a file source reader and polls until the reader
     * reports end of input. The data comes from {@link GeneratingInputStream}, so no real file is
     * read, and all records are discarded by {@link NoOpReaderOutput}.
     *
     * @throws Exception if registering the file system, reading, or closing the reader fails
     */
    @Test
    void testHeavyThroughput() throws Exception {
        final Path path = new Path("testfs:///testpath");
        final long fileSize = 20L << 30; // 20 GiB == 21474836480 bytes
        final FileSourceSplit split =
                new FileSourceSplit("testsplitId", path, 0L, fileSize, 0L, fileSize);

        this.testFs =
                TestingFileSystem.createForFileStatus(
                        path.toUri().getScheme(),
                        TestingFileSystem.TestFileStatus.forFileWithStream(
                                path, fileSize, new GeneratingInputStream(fileSize)));
        this.testFs.register();

        // SourceReader is AutoCloseable; close it even when polling fails so that the reader's
        // resources are released before the next test runs.
        try (SourceReader<byte[], FileSourceSplit> reader =
                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), new Path[] {path})
                        .build()
                        .createReader(new NoOpReaderContext())) {
            reader.addSplits(Collections.singletonList(split));
            reader.notifyNoMoreSplits();

            final ReaderOutput<byte[]> out = new NoOpReaderOutput<>();

            InputStatus status;
            while ((status = reader.pollNext(out)) != InputStatus.END_OF_INPUT) {
                // When nothing is available right now, block until the reader signals more data.
                if (status == InputStatus.NOTHING_AVAILABLE) {
                    reader.isAvailable().get();
                }
            }
        }
    }
}
