package org.apache.flink.formats.avro;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AbstractAvroBulkFormat;
import org.apache.flink.formats.avro.AvroBulkFormatTestUtils;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/formats/avro/AvroBulkFormatTest.class */
class AvroBulkFormatTest {
    private static final List<RowData> TEST_DATA = Arrays.asList(GenericRowData.of(new Object[]{StringData.fromString("AvroBulk"), StringData.fromString("FormatTest")}), GenericRowData.of(new Object[]{StringData.fromString("Apache"), StringData.fromString("Flink")}), GenericRowData.of(new Object[]{StringData.fromString("永和九年，岁在癸丑，暮春之初，会于会稽山阴之兰亭，修禊事也。群贤毕至，少长咸集。此地有崇山峻岭，茂林修竹，又有清流激湍，映带左右。引以为流觞曲水，列坐其次。虽无丝竹管弦之盛，一觞一咏，亦足以畅叙幽情。"), StringData.fromString("")}), GenericRowData.of(new Object[]{StringData.fromString("File"), StringData.fromString("Format")}), GenericRowData.of(new Object[]{null, StringData.fromString("This is a string with English, 中文 and even ��������������")}), GenericRowData.of(new Object[]{StringData.fromString("block with"), StringData.fromString("only one record")}));
    private static final List<Integer> BLOCK_STARTS = Arrays.asList(186, 547, 659);
    private File tmpFile;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/formats/avro/AvroBulkFormatTest$BatchInfo.class */
    public static class BatchInfo {
        private final int start;
        private final int end;

        private BatchInfo(int i, int i2) {
            this.start = i;
            this.end = i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/formats/avro/AvroBulkFormatTest$SplitInfo.class */
    public static class SplitInfo {
        private final long start;
        private final long end;
        private final List<BatchInfo> batches;

        private SplitInfo(long j, long j2, List<BatchInfo> list) {
            this.start = j;
            this.end = j2;
            this.batches = list;
        }
    }

    AvroBulkFormatTest() {
    }

    @BeforeEach
    public void before() throws IOException {
        this.tmpFile = Files.createTempFile("avro-bulk-format-test", ".avro", new FileAttribute[0]).toFile();
        this.tmpFile.createNewFile();
        FileOutputStream fileOutputStream = new FileOutputStream(this.tmpFile);
        Schema convertToSchema = AvroSchemaConverter.convertToSchema(AvroBulkFormatTestUtils.ROW_TYPE);
        RowDataToAvroConverters.RowDataToAvroConverter createConverter = RowDataToAvroConverters.createConverter(AvroBulkFormatTestUtils.ROW_TYPE);
        DataFileWriter dataFileWriter = new DataFileWriter(new GenericDatumWriter(convertToSchema));
        dataFileWriter.create(convertToSchema, fileOutputStream);
        dataFileWriter.setSyncInterval(64);
        Iterator<RowData> it = TEST_DATA.iterator();
        while (it.hasNext()) {
            dataFileWriter.append((GenericRecord) createConverter.convert(convertToSchema, it.next()));
        }
        dataFileWriter.close();
    }

    @AfterEach
    public void after() throws IOException {
        FileUtils.deleteFileOrDirectory(this.tmpFile);
    }

    @Test
    void testReadWholeFileWithOneSplit() throws IOException {
        assertSplit(new AvroBulkFormatTestUtils.TestingAvroBulkFormat(), Collections.singletonList(new SplitInfo(0L, this.tmpFile.length(), Arrays.asList(new BatchInfo(0, 3), new BatchInfo(3, 5), new BatchInfo(5, 6)))));
    }

    @Test
    void testReadWholeFileWithMultipleSplits() throws IOException {
        AvroBulkFormatTestUtils.TestingAvroBulkFormat testingAvroBulkFormat = new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
        long length = this.tmpFile.length() / 3;
        assertSplit(testingAvroBulkFormat, Arrays.asList(new SplitInfo(0L, length, Collections.singletonList(new BatchInfo(0, 3))), new SplitInfo(length, length * 2, Collections.emptyList()), new SplitInfo(length * 2, this.tmpFile.length(), Arrays.asList(new BatchInfo(3, 5), new BatchInfo(5, 6)))));
    }

    @Test
    void testSplitsAtCriticalLocations() throws IOException {
        assertSplit(new AvroBulkFormatTestUtils.TestingAvroBulkFormat(), Arrays.asList(new SplitInfo(BLOCK_STARTS.get(0).intValue() - 16, BLOCK_STARTS.get(1).intValue() - 16, Collections.singletonList(new BatchInfo(0, 3))), new SplitInfo(BLOCK_STARTS.get(1).intValue() - 16, (BLOCK_STARTS.get(2).intValue() - 16) + 1, Arrays.asList(new BatchInfo(3, 5), new BatchInfo(5, 6)))));
    }

    @Test
    void testRestoreReader() throws IOException {
        AvroBulkFormatTestUtils.TestingAvroBulkFormat testingAvroBulkFormat = new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
        long length = this.tmpFile.length() / 3;
        String uuid = UUID.randomUUID().toString();
        AbstractAvroBulkFormat.AvroReader createReader = testingAvroBulkFormat.createReader(new Configuration(), new FileSourceSplit(uuid, new Path(this.tmpFile.toString()), length * 2, this.tmpFile.length()));
        long assertBatch = assertBatch(createReader, new BatchInfo(3, 5));
        assertBatch(createReader, new BatchInfo(5, 6));
        Assertions.assertThat(createReader.readBatch()).isNull();
        createReader.close();
        AbstractAvroBulkFormat.AvroReader restoreReader = testingAvroBulkFormat.restoreReader(new Configuration(), new FileSourceSplit(uuid, new Path(this.tmpFile.toString()), length * 2, this.tmpFile.length(), StringUtils.EMPTY_STRING_ARRAY, new CheckpointedPosition(assertBatch, 1L)));
        long assertBatch2 = assertBatch(restoreReader, new BatchInfo(3, 5), 1);
        assertBatch(restoreReader, new BatchInfo(5, 6));
        Assertions.assertThat(restoreReader.readBatch()).isNull();
        restoreReader.close();
        Assertions.assertThat(assertBatch2).isEqualTo(assertBatch);
    }

    private void assertSplit(AvroBulkFormatTestUtils.TestingAvroBulkFormat testingAvroBulkFormat, List<SplitInfo> list) throws IOException {
        for (SplitInfo splitInfo : list) {
            AbstractAvroBulkFormat.AvroReader createReader = testingAvroBulkFormat.createReader(new Configuration(), new FileSourceSplit(UUID.randomUUID().toString(), new Path(this.tmpFile.toString()), splitInfo.start, splitInfo.end - splitInfo.start));
            ArrayList arrayList = new ArrayList();
            Iterator it = splitInfo.batches.iterator();
            while (it.hasNext()) {
                arrayList.add(Long.valueOf(assertBatch(createReader, (BatchInfo) it.next())));
            }
            Assertions.assertThat(createReader.readBatch()).isNull();
            for (int i = 1; i < arrayList.size(); i++) {
                Assertions.assertThat(((Long) arrayList.get(i - 1)).longValue() < ((Long) arrayList.get(i)).longValue()).isTrue();
            }
            createReader.close();
        }
    }

    private long assertBatch(BulkFormat.Reader<RowData> reader, BatchInfo batchInfo) throws IOException {
        return assertBatch(reader, batchInfo, 0);
    }

    private long assertBatch(BulkFormat.Reader<RowData> reader, BatchInfo batchInfo, int i) throws IOException {
        long j = -1;
        int i2 = i;
        BulkFormat.RecordIterator readBatch = reader.readBatch();
        RecordAndPosition next = readBatch.next();
        while (true) {
            RecordAndPosition recordAndPosition = next;
            if (recordAndPosition == null) {
                Assertions.assertThat(i2).isEqualTo(batchInfo.end - batchInfo.start);
                readBatch.releaseBatch();
                return j;
            }
            if (j == -1) {
                j = recordAndPosition.getOffset();
            }
            Assertions.assertThat(recordAndPosition.getRecord()).isEqualTo(TEST_DATA.get(batchInfo.start + i2));
            Assertions.assertThat(recordAndPosition.getOffset()).isEqualTo(j);
            i2++;
            Assertions.assertThat(recordAndPosition.getRecordSkipCount()).isEqualTo(i2);
            next = readBatch.next();
        }
    }
}
