package org.apache.flink.connector.file.src.impl;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

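/**
 * Base class for tests of adapters that expose a format of type {@code FormatT} as a
 * {@link BulkFormat} producing {@code Integer} records from {@link FileSourceSplit}s.
 *
 * <p>A shared test file containing the integers {@code 0} to {@code NUM_NUMBERS - 1} is written
 * once per class; the tests read it back through the adapter, including reads that are continued
 * with a reader restored from a checkpointed split.
 *
 * @param <FormatT> the type of the format wrapped by the adapter under test
 */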
public abstract class AdapterTestBase<FormatT> {

    /** Temporary folder shared by all tests of the class; the test file is created inside it. */
    @ClassRule
    public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
    /** Number of consecutive integers (starting at 0) that are written to the test file. */
    protected static final int NUM_NUMBERS = 100;
    /** Length of the test file: {@code NUM_NUMBERS} values written with {@code writeInt} (4 bytes each). */
    protected static final long FILE_LEN = 400;
    /** Path of the test file; assigned by {@code writeTestFile()}. */
    protected static Path testPath;

    /* loaded from: input_file:org/apache/flink/connector/file/src/impl/AdapterTestBase$CloseTestingInputStream.class */
    private static class CloseTestingInputStream extends FSDataInputStream {
        boolean closed;

        private CloseTestingInputStream() {
        }

        public void seek(long j) throws IOException {
        }

        public long getPos() throws IOException {
            return 0L;
        }

        public int read() throws IOException {
            throw new UnsupportedOperationException();
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }

    /**
     * Creates the shared test file and stores its location in {@code testPath}.
     *
     * <p>The file contains the integers {@code 0} to {@code NUM_NUMBERS - 1}, each written with
     * {@link DataOutputStream#writeInt(int)} (four bytes, high byte first).
     *
     * @throws IOException if the file cannot be created or written
     */
    @BeforeClass
    public static void writeTestFile() throws IOException {
        final File file = new File(TMP_DIR.getRoot(), "testFile");
        testPath = Path.fromLocalFile(file);

        // try-with-resources closes the stream on every path; if both the write and the close
        // fail, the close failure is attached as a suppressed exception rather than masking
        // the original error.
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
            for (int number = 0; number < NUM_NUMBERS; number++) {
                out.writeInt(number);
            }
        }
    }

    /**
     * Creates the format used by the "checkpointed format" recovery tests.
     *
     * <p>NOTE(review): presumably a format whose readers can resume directly from a checkpointed
     * offset; confirm against the concrete subclasses.
     *
     * @return the format to be wrapped by the adapter under test
     */
    protected abstract FormatT createCheckpointedFormat();

    /**
     * Creates the format used by the "non-checkpointed format" recovery test.
     *
     * <p>NOTE(review): presumably a format whose readers cannot resume from an offset and must
     * skip records instead; confirm against the concrete subclasses.
     *
     * @return the format to be wrapped by the adapter under test
     */
    protected abstract FormatT createNonCheckpointedFormat();

    /**
     * Creates the format used by the tests that check the input stream is closed when creating
     * or restoring a reader fails. Those tests tolerate an {@link IOException} from the adapter.
     *
     * @return a format for which reader instantiation is expected to fail
     */
    protected abstract FormatT createFormatFailingInInstantiation();

    /**
     * Wraps the given format with the adapter under test.
     *
     * @param formatt the format to wrap
     * @return a {@link BulkFormat} that produces {@code Integer} records from {@link FileSourceSplit}s
     */
    protected abstract BulkFormat<Integer, FileSourceSplit> wrapWithAdapter(FormatT formatt);

    /**
     * Reads the test file as a single split with the checkpointed format, continuing with a
     * restored reader after 5 and after 44 records in total.
     */
    @Test
    public void testRecoverCheckpointedFormatOneSplit() throws IOException {
        final FormatT format = createCheckpointedFormat();
        final int[] recoverAfterRecords = {5, 44};
        testReading(format, 1, recoverAfterRecords);
    }

    /**
     * Reads the test file as three splits with the checkpointed format, continuing with a
     * restored reader after 11, 33, and 56 records in total.
     */
    @Test
    public void testRecoverCheckpointedFormatMultipleSplits() throws IOException {
        final FormatT format = createCheckpointedFormat();
        final int[] recoverAfterRecords = {11, 33, 56};
        testReading(format, 3, recoverAfterRecords);
    }

    /**
     * Reads the test file as a single split with the non-checkpointed format, continuing with a
     * restored reader after 5 and after 44 records in total.
     */
    @Test
    public void testRecoverNonCheckpointedFormatOneSplit() throws IOException {
        final FormatT format = createNonCheckpointedFormat();
        final int[] recoverAfterRecords = {5, 44};
        testReading(format, 1, recoverAfterRecords);
    }

    /**
     * Reads the whole test file through the adapter and verifies the result. Reading is
     * interrupted at the given record counts and continued each time with a reader restored
     * from the split carrying the checkpointed position.
     *
     * @param format the format to wrap with the adapter under test
     * @param numSplits the number of splits the test file is divided into
     * @param recoverAfterRecords the total numbers of records after which a reader is restored;
     *     {@code NUM_NUMBERS} is appended as the final boundary so that everything is read
     * @throws IOException if creating, restoring, reading, or closing a reader fails
     */
    private void testReading(FormatT format, int numSplits, int... recoverAfterRecords)
            throws IOException {
        // Append the end of the data as the last boundary.
        final int[] boundaries = Arrays.copyOf(recoverAfterRecords, recoverAfterRecords.length + 1);
        boundaries[boundaries.length - 1] = NUM_NUMBERS;

        // Deliberately tiny fetch size; presumably chosen so that the file is read in many
        // small batches (confirm against StreamFormat.FETCH_IO_SIZE semantics).
        final Configuration config = new Configuration();
        config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(10L));

        final BulkFormat<Integer, FileSourceSplit> adapter = wrapWithAdapter(format);
        final Queue<FileSourceSplit> splits = buildSplits(numSplits);
        final List<Integer> result = new ArrayList<>();

        FileSourceSplit currentSplit = null;
        BulkFormat.Reader<Integer> currentReader = null;

        for (int boundary : boundaries) {
            // Read up to the boundary, then continue from the returned checkpointed split.
            final FileSourceSplit toRecoverFrom =
                    readNumbers(
                            currentReader,
                            currentSplit,
                            adapter,
                            splits,
                            config,
                            result,
                            boundary - result.size());
            currentSplit = toRecoverFrom;
            currentReader =
                    toRecoverFrom == null ? null : adapter.restoreReader(config, toRecoverFrom);
        }

        // The reader restored after the final boundary is never read from; close it so that it
        // is not leaked.
        if (currentReader != null) {
            currentReader.close();
        }

        verifyIntListResult(result);
    }

    /**
     * Verifies that the file's input stream has been closed after the adapter attempted to
     * create a reader from a format that fails in instantiation.
     *
     * @throws Exception if the test setup or the adapter fails in an unexpected way
     */
    @Test
    public void testClosesStreamIfReaderCreationFails() throws Exception {
        final Path path = new Path("testFs:///testpath-1");
        final CloseTestingInputStream in = new CloseTestingInputStream();
        final TestingFileSystem testFs =
                TestingFileSystem.createForFileStatus(
                        "testFs",
                        TestingFileSystem.TestFileStatus.forFileWithStream(path, 1024L, in));
        testFs.register();
        try {
            try {
                wrapWithAdapter(createFormatFailingInInstantiation())
                        .createReader(
                                new Configuration(),
                                new FileSourceSplit("id", path, 0L, 1024L, 0L, 1024L));
            } catch (IOException ignored) {
                // A failure is the anticipated outcome here; only the state of the stream
                // afterwards is relevant to this test.
            }
            Assert.assertTrue("input stream was not closed", in.closed);
        } finally {
            // Unregister even if the assertion fails, so the registration does not outlive
            // this test.
            testFs.unregister();
        }
    }

    /**
     * Verifies that the file's input stream has been closed after the adapter attempted to
     * restore a reader (from a split with a checkpointed position) using a format that fails in
     * instantiation.
     *
     * @throws Exception if the test setup or the adapter fails in an unexpected way
     */
    @Test
    public void testClosesStreamIfReaderRestoreFails() throws Exception {
        final Path path = new Path("testFs:///testpath-1");
        final CloseTestingInputStream in = new CloseTestingInputStream();
        final TestingFileSystem testFs =
                TestingFileSystem.createForFileStatus(
                        "testFs",
                        TestingFileSystem.TestFileStatus.forFileWithStream(path, 1024L, in));
        testFs.register();
        try {
            try {
                wrapWithAdapter(createFormatFailingInInstantiation())
                        .restoreReader(
                                new Configuration(),
                                new FileSourceSplit(
                                        "id",
                                        path,
                                        0L,
                                        1024L,
                                        0L,
                                        1024L,
                                        new String[0],
                                        new CheckpointedPosition(0L, 5L)));
            } catch (IOException ignored) {
                // A failure is the anticipated outcome here; only the state of the stream
                // afterwards is relevant to this test.
            }
            Assert.assertTrue("input stream was not closed", in.closed);
        } finally {
            // Unregister even if the assertion fails, so the registration does not outlive
            // this test.
            testFs.unregister();
        }
    }

    /**
     * Asserts that the given list contains exactly the numbers {@code 0} to
     * {@code NUM_NUMBERS - 1} in ascending order.
     *
     * @param list the numbers that were read
     */
    public static void verifyIntListResult(List<Integer> list) {
        // Tie the expected size to the constant that also drives the file generation.
        Assert.assertEquals("wrong result size", NUM_NUMBERS, list.size());

        int expected = 0;
        for (Integer actual : list) {
            if (actual.intValue() != expected) {
                Assert.fail("Wrong result: " + list);
            }
            expected++;
        }
    }

    /**
     * Reads {@code i} numbers from the given reader and appends them to {@code list}.
     *
     * <p>Delegates to the seven-argument overload without a split, format, split queue, or
     * configuration. Consequently the reader must be able to supply all requested numbers: the
     * overload would otherwise try to poll the (null) split queue. When {@code i} is positive,
     * the overload closes the reader before returning.
     *
     * @param reader the reader to read from
     * @param list the list the numbers are appended to
     * @param i the number of records to read
     * @throws IOException if reading from or closing the reader fails
     */
    public static void readNumbers(BulkFormat.Reader<Integer> reader, List<Integer> list, int i) throws IOException {
        readNumbers(reader, null, null, null, null, list, i);
    }

    /**
     * Reads {@code num} records and appends them to {@code result}, starting with the given
     * reader (if any) and moving on to readers created for further splits when a reader is
     * exhausted.
     *
     * <p>Whenever {@code num} is positive, every reader used by this call is closed before the
     * method returns, including the one passed in.
     *
     * @param currentReader the reader to start with, or {@code null} to start with the next split
     * @param currentSplit the split that {@code currentReader} reads, may be {@code null}
     * @param format the format used to create readers for further splits
     * @param moreSplits the splits that have not been read yet
     * @param config the configuration passed to newly created readers
     * @param result the list the records are appended to
     * @param num the number of records to read
     * @return the split that was read last, updated with the position of the last record read
     *     (the position is built from {@code Long.MIN_VALUE} placeholders if this call read no
     *     record), or {@code null} if there is no current split
     * @throws IOException if creating, reading from, or closing a reader fails
     */
    protected static FileSourceSplit readNumbers(
            BulkFormat.Reader<Integer> currentReader,
            FileSourceSplit currentSplit,
            BulkFormat<Integer, FileSourceSplit> format,
            Queue<FileSourceSplit> moreSplits,
            Configuration config,
            List<Integer> result,
            int num)
            throws IOException {
        long offset = Long.MIN_VALUE;
        long skip = Long.MIN_VALUE;

        // Loop across splits.
        while (num > 0) {
            if (currentReader == null) {
                currentSplit = moreSplits.poll();
                Assert.assertNotNull(currentSplit);
                currentReader = format.createReader(config, currentSplit);
            }

            // Loop across the batches of the current reader.
            BulkFormat.RecordIterator<Integer> batch;
            while (num > 0 && (batch = currentReader.readBatch()) != null) {

                // Loop across the records of the current batch.
                RecordAndPosition<Integer> next;
                while (num > 0 && (next = batch.next()) != null) {
                    num--;
                    result.add(next.getRecord());
                    offset = next.getOffset();
                    skip = next.getRecordSkipCount();
                }
            }

            // Either enough records were read or the reader is exhausted; in both cases this
            // reader is done.
            currentReader.close();
            currentReader = null;
        }

        if (currentSplit == null) {
            return null;
        }
        return currentSplit.updateWithCheckpointedPosition(new CheckpointedPosition(offset, skip));
    }

    /**
     * Divides the test file into the given number of consecutive splits.
     *
     * <p>All splits but the last get a range of {@code FILE_LEN / numSplits}; the last split
     * gets whatever remains, so the ranges always add up to {@code FILE_LEN}. The splits are
     * named {@code "ID-0"}, {@code "ID-1"}, and so on, and are queued in file order.
     *
     * @param numSplits the number of splits to create; must be positive
     * @return the splits in file order
     * @throws IllegalArgumentException if {@code numSplits} is not positive
     */
    static Queue<FileSourceSplit> buildSplits(int numSplits) {
        if (numSplits <= 0) {
            throw new IllegalArgumentException(
                    "numSplits must be positive, but was " + numSplits);
        }

        final Queue<FileSourceSplit> splits = new ArrayDeque<>();
        final long rangeForSplit = FILE_LEN / numSplits;

        for (int index = 0; index < numSplits - 1; index++) {
            splits.add(
                    new FileSourceSplit(
                            "ID-" + index,
                            testPath,
                            index * rangeForSplit,
                            rangeForSplit,
                            0L,
                            FILE_LEN));
        }

        // The last split absorbs the remainder of the integer division.
        final long startOfLast = (numSplits - 1) * rangeForSplit;
        splits.add(
                new FileSourceSplit(
                        "ID-" + (numSplits - 1),
                        testPath,
                        startOfLast,
                        FILE_LEN - startOfLast,
                        0L,
                        FILE_LEN));
        return splits;
    }
}
