package org.apache.flink.table.store.connector.source;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.class */
public class FileStoreSourceSplitReaderTest {
    private static ExecutorService service;

    @TempDir
    Path tempDir;

    @BeforeAll
    public static void before() {
        service = Executors.newSingleThreadExecutor();
    }

    @AfterAll
    public static void after() {
        service.shutdownNow();
        service = null;
    }

    @Test
    public void testPrimaryKey() throws Exception {
        innerTestOnce(false, 0);
    }

    @Test
    public void testValueCount() throws Exception {
        innerTestOnce(true, 0);
    }

    @Test
    public void testPrimaryKeySkip() throws Exception {
        innerTestOnce(false, 4);
    }

    @Test
    public void testValueCountSkip() throws Exception {
        innerTestOnce(true, 7);
    }

    private void innerTestOnce(boolean z, int i) throws Exception {
        TestDataReadWrite testDataReadWrite = new TestDataReadWrite(this.tempDir.toString(), service);
        FileStoreSourceSplitReader fileStoreSourceSplitReader = new FileStoreSourceSplitReader(testDataReadWrite.createRead(), z);
        List<Tuple2<Long, Long>> kvs = kvs();
        assignSplit(fileStoreSourceSplitReader, new FileStoreSourceSplit("id1", CompactManagerTest.row(1), 0, testDataReadWrite.writeFiles(CompactManagerTest.row(1), 0, kvs), i));
        RecordsWithSplitIds<RecordAndPosition<RowData>> fetch = fileStoreSourceSplitReader.fetch();
        List asList = z ? Arrays.asList(new Tuple2(RowKind.INSERT, 1L), new Tuple2(RowKind.INSERT, 2L), new Tuple2(RowKind.INSERT, 2L), new Tuple2(RowKind.INSERT, 3L), new Tuple2(RowKind.INSERT, 3L), new Tuple2(RowKind.DELETE, 4L), new Tuple2(RowKind.INSERT, 5L), new Tuple2(RowKind.DELETE, 6L), new Tuple2(RowKind.DELETE, 6L)) : (List) kvs.stream().map(tuple2 -> {
            return new Tuple2(RowKind.INSERT, tuple2.f1);
        }).collect(Collectors.toList());
        Assertions.assertThat(readRecords(fetch, "id1", i)).isEqualTo(asList.subList(i, asList.size()));
        assertRecords(fileStoreSourceSplitReader.fetch(), "id1", "id1", 0L, null);
        fileStoreSourceSplitReader.close();
    }

    @Test
    public void testPrimaryKeyWithDelete() throws Exception {
        TestDataReadWrite testDataReadWrite = new TestDataReadWrite(this.tempDir.toString(), service);
        FileStoreSourceSplitReader fileStoreSourceSplitReader = new FileStoreSourceSplitReader(testDataReadWrite.createRead().withDropDelete(false), false);
        List<Tuple2<Long, Long>> kvs = kvs();
        RecordWriter createMergeTreeWriter = testDataReadWrite.createMergeTreeWriter(CompactManagerTest.row(1), 0);
        for (Tuple2<Long, Long> tuple2 : kvs) {
            createMergeTreeWriter.write(ValueKind.ADD, GenericRowData.of(new Object[]{tuple2.f0}), GenericRowData.of(new Object[]{tuple2.f1}));
        }
        createMergeTreeWriter.write(ValueKind.DELETE, GenericRowData.of(new Object[]{222L}), GenericRowData.of(new Object[]{333L}));
        List newFiles = createMergeTreeWriter.prepareCommit().newFiles();
        createMergeTreeWriter.close();
        assignSplit(fileStoreSourceSplitReader, new FileStoreSourceSplit("id1", CompactManagerTest.row(1), 0, newFiles));
        RecordsWithSplitIds<RecordAndPosition<RowData>> fetch = fileStoreSourceSplitReader.fetch();
        List list = (List) kvs.stream().map(tuple22 -> {
            return new Tuple2(RowKind.INSERT, tuple22.f1);
        }).collect(Collectors.toList());
        list.add(new Tuple2(RowKind.DELETE, 333L));
        Assertions.assertThat(readRecords(fetch, "id1", 0L)).isEqualTo(list);
        assertRecords(fileStoreSourceSplitReader.fetch(), "id1", "id1", 0L, null);
        fileStoreSourceSplitReader.close();
    }

    @Test
    public void testMultipleBatchInSplit() throws Exception {
        TestDataReadWrite testDataReadWrite = new TestDataReadWrite(this.tempDir.toString(), service);
        FileStoreSourceSplitReader fileStoreSourceSplitReader = new FileStoreSourceSplitReader(testDataReadWrite.createRead(), false);
        List<Tuple2<Long, Long>> kvs = kvs();
        List<SstFileMeta> writeFiles = testDataReadWrite.writeFiles(CompactManagerTest.row(1), 0, kvs);
        List<Tuple2<Long, Long>> kvs2 = kvs(6L);
        writeFiles.addAll(testDataReadWrite.writeFiles(CompactManagerTest.row(1), 0, kvs2));
        assignSplit(fileStoreSourceSplitReader, new FileStoreSourceSplit("id1", CompactManagerTest.row(1), 0, writeFiles));
        assertRecords(fileStoreSourceSplitReader.fetch(), null, "id1", 0L, (List) kvs.stream().map(tuple2 -> {
            return (Long) tuple2.f1;
        }).collect(Collectors.toList()));
        assertRecords(fileStoreSourceSplitReader.fetch(), null, "id1", 6L, (List) kvs2.stream().map(tuple22 -> {
            return (Long) tuple22.f1;
        }).collect(Collectors.toList()));
        assertRecords(fileStoreSourceSplitReader.fetch(), "id1", "id1", 0L, null);
        fileStoreSourceSplitReader.close();
    }

    @Test
    public void testRestore() throws Exception {
        TestDataReadWrite testDataReadWrite = new TestDataReadWrite(this.tempDir.toString(), service);
        FileStoreSourceSplitReader fileStoreSourceSplitReader = new FileStoreSourceSplitReader(testDataReadWrite.createRead(), false);
        List<Tuple2<Long, Long>> kvs = kvs();
        assignSplit(fileStoreSourceSplitReader, new FileStoreSourceSplit("id1", CompactManagerTest.row(1), 0, testDataReadWrite.writeFiles(CompactManagerTest.row(1), 0, kvs), 3L));
        assertRecords(fileStoreSourceSplitReader.fetch(), null, "id1", 3L, (List) kvs.subList(3, kvs.size()).stream().map(tuple2 -> {
            return (Long) tuple2.f1;
        }).collect(Collectors.toList()));
        assertRecords(fileStoreSourceSplitReader.fetch(), "id1", "id1", 0L, null);
        fileStoreSourceSplitReader.close();
    }

    @Test
    public void testRestoreMultipleBatchInSplit() throws Exception {
        TestDataReadWrite testDataReadWrite = new TestDataReadWrite(this.tempDir.toString(), service);
        FileStoreSourceSplitReader fileStoreSourceSplitReader = new FileStoreSourceSplitReader(testDataReadWrite.createRead(), false);
        List<Tuple2<Long, Long>> kvs = kvs();
        List<SstFileMeta> writeFiles = testDataReadWrite.writeFiles(CompactManagerTest.row(1), 0, kvs);
        List<Tuple2<Long, Long>> kvs2 = kvs(6L);
        writeFiles.addAll(testDataReadWrite.writeFiles(CompactManagerTest.row(1), 0, kvs2));
        assignSplit(fileStoreSourceSplitReader, new FileStoreSourceSplit("id1", CompactManagerTest.row(1), 0, writeFiles, 7L));
        assertRecords(fileStoreSourceSplitReader.fetch(), null, "id1", 7L, (List) Stream.concat(kvs.stream(), kvs2.stream()).skip(7L).map(tuple2 -> {
            return (Long) tuple2.f1;
        }).collect(Collectors.toList()));
        assertRecords(fileStoreSourceSplitReader.fetch(), "id1", "id1", 0L, null);
        fileStoreSourceSplitReader.close();
    }

    @Test
    public void testMultipleSplits() throws Exception {
        TestDataReadWrite testDataReadWrite = new TestDataReadWrite(this.tempDir.toString(), service);
        FileStoreSourceSplitReader fileStoreSourceSplitReader = new FileStoreSourceSplitReader(testDataReadWrite.createRead(), false);
        List<Tuple2<Long, Long>> kvs = kvs();
        assignSplit(fileStoreSourceSplitReader, new FileStoreSourceSplit("id1", CompactManagerTest.row(1), 0, testDataReadWrite.writeFiles(CompactManagerTest.row(1), 0, kvs)));
        List<Tuple2<Long, Long>> kvs2 = kvs();
        assignSplit(fileStoreSourceSplitReader, new FileStoreSourceSplit("id2", CompactManagerTest.row(2), 1, testDataReadWrite.writeFiles(CompactManagerTest.row(2), 1, kvs2)));
        assertRecords(fileStoreSourceSplitReader.fetch(), null, "id1", 0L, (List) kvs.stream().map(tuple2 -> {
            return (Long) tuple2.f1;
        }).collect(Collectors.toList()));
        assertRecords(fileStoreSourceSplitReader.fetch(), "id1", "id1", 0L, null);
        assertRecords(fileStoreSourceSplitReader.fetch(), null, "id2", 0L, (List) kvs2.stream().map(tuple22 -> {
            return (Long) tuple22.f1;
        }).collect(Collectors.toList()));
        assertRecords(fileStoreSourceSplitReader.fetch(), "id2", "id2", 0L, null);
        fileStoreSourceSplitReader.close();
    }

    @Test
    public void testNoSplit() throws Exception {
        FileStoreSourceSplitReader fileStoreSourceSplitReader = new FileStoreSourceSplitReader(new TestDataReadWrite(this.tempDir.toString(), service).createRead(), false);
        fileStoreSourceSplitReader.getClass();
        Assertions.assertThatThrownBy(fileStoreSourceSplitReader::fetch).hasMessageContaining("no split remaining");
        fileStoreSourceSplitReader.close();
    }

    private void assertRecords(RecordsWithSplitIds<RecordAndPosition<RowData>> recordsWithSplitIds, String str, String str2, long j, List<Long> list) {
        if (str != null) {
            Assertions.assertThat(recordsWithSplitIds.finishedSplits()).isEqualTo(Collections.singleton(str));
        } else {
            Assertions.assertThat((List) readRecords(recordsWithSplitIds, str2, j).stream().map(tuple2 -> {
                return (Long) tuple2.f1;
            }).collect(Collectors.toList())).isEqualTo(list);
        }
    }

    /* JADX WARN: Type inference failed for: r0v17, types: [org.assertj.core.api.AbstractLongAssert] */
    private List<Tuple2<RowKind, Long>> readRecords(RecordsWithSplitIds<RecordAndPosition<RowData>> recordsWithSplitIds, String str, long j) {
        Assertions.assertThat(recordsWithSplitIds.finishedSplits()).isEmpty();
        Assertions.assertThat(recordsWithSplitIds.nextSplit()).isEqualTo(str);
        ArrayList arrayList = new ArrayList();
        while (true) {
            RecordAndPosition recordAndPosition = (RecordAndPosition) recordsWithSplitIds.nextRecordFromSplit();
            if (recordAndPosition == null) {
                recordsWithSplitIds.recycle();
                return arrayList;
            }
            arrayList.add(new Tuple2(((RowData) recordAndPosition.getRecord()).getRowKind(), Long.valueOf(((RowData) recordAndPosition.getRecord()).getLong(0))));
            ?? assertThat = Assertions.assertThat(recordAndPosition.getRecordSkipCount());
            long j2 = j + 1;
            j = assertThat;
            assertThat.isEqualTo(j2);
        }
    }

    private List<Tuple2<Long, Long>> kvs() {
        return kvs(0L);
    }

    private List<Tuple2<Long, Long>> kvs(long j) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple2(Long.valueOf(j + 1), 1L));
        arrayList.add(new Tuple2(Long.valueOf(j + 2), 2L));
        arrayList.add(new Tuple2(Long.valueOf(j + 3), 2L));
        arrayList.add(new Tuple2(Long.valueOf(j + 4), -1L));
        arrayList.add(new Tuple2(Long.valueOf(j + 5), 1L));
        arrayList.add(new Tuple2(Long.valueOf(j + 6), -2L));
        return arrayList;
    }

    private void assignSplit(FileStoreSourceSplitReader fileStoreSourceSplitReader, FileStoreSourceSplit fileStoreSourceSplit) {
        fileStoreSourceSplitReader.handleSplitsChanges(new SplitsAddition(Collections.singletonList(fileStoreSourceSplit)));
    }
}
