package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.ChangelogOperation;
import org.apache.iceberg.ChangelogScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link ChangelogRowReader}: each test commits a sequence of snapshots to a bucketed
 * table, plans an incremental changelog scan, reads every planned task group through the reader,
 * and compares the rows produced with the expected changelog rows.
 *
 * <p>Rows are compared as {@code (id, data, change type, change ordinal, commit snapshot id)}.
 */
public class TestChangelogReader extends SparkTestBase {
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.required(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "data", Types.StringType.get()));
    private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();

    /** Orders rows by the {@code id} column (position 0). */
    private static final Comparator<InternalRow> BY_ID = Comparator.comparingInt(row -> row.getInt(0));

    /** Orders rows by the change ordinal (position 3), breaking ties by {@code id} (position 0). */
    private static final Comparator<InternalRow> BY_ORDINAL_THEN_ID =
            Comparator.<InternalRow>comparingInt(row -> row.getInt(3)).thenComparingInt(row -> row.getInt(0));

    private Table table;
    private DataFile dataFile1;
    private DataFile dataFile2;
    private final List<Record> records1 = Lists.newArrayList();
    private final List<Record> records2 = Lists.newArrayList();

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /** Creates the test table and writes two data files holding {@code records1} and {@code records2}. */
    @Before
    public void before() throws IOException {
        this.table = catalog.createTable(TableIdentifier.of("default", "test"), SCHEMA, SPEC);

        GenericRecord template = GenericRecord.create(this.table.schema());
        this.records1.add(template.copy("id", 29, "data", "a"));
        this.records1.add(template.copy("id", 43, "data", "b"));
        this.records1.add(template.copy("id", 61, "data", "c"));
        this.records1.add(template.copy("id", 89, "data", "d"));

        this.records2.add(template.copy("id", 100, "data", "e"));
        this.records2.add(template.copy("id", 121, "data", "f"));
        this.records2.add(template.copy("id", 122, "data", "g"));

        this.dataFile1 = writeDataFile(this.records1);
        this.dataFile2 = writeDataFile(this.records2);
    }

    /** Drops the test table so each test starts from a fresh one. */
    @After
    public void after() {
        catalog.dropTable(TableIdentifier.of("default", "test"));
    }

    /** Two appends should surface as INSERT rows with change ordinals 0 and 1. */
    @Test
    public void testInsert() throws IOException {
        this.table.newAppend().appendFile(this.dataFile1).commit();
        long snapshotId1 = this.table.currentSnapshot().snapshotId();

        this.table.newAppend().appendFile(this.dataFile2).commit();
        long snapshotId2 = this.table.currentSnapshot().snapshotId();

        List<InternalRow> rows = readRows(newScan());
        rows.sort(BY_ID);

        List<Object[]> expectedRows = Lists.newArrayList();
        addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, this.records1);
        addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, this.records2);

        assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
    }

    /** Deleting a data file should surface its records as DELETE rows of the deleting snapshot. */
    @Test
    public void testDelete() throws IOException {
        this.table.newAppend().appendFile(this.dataFile1).commit();
        long snapshotId1 = this.table.currentSnapshot().snapshotId();

        this.table.newDelete().deleteFile(this.dataFile1).commit();
        long snapshotId2 = this.table.currentSnapshot().snapshotId();

        // Start after the append so only the delete snapshot is scanned.
        List<InternalRow> rows = readRows(newScan().fromSnapshotExclusive(snapshotId1));
        rows.sort(BY_ID);

        List<Object[]> expectedRows = Lists.newArrayList();
        addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 0, this.records1);

        assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
    }

    /** A rewrite commit is expected to produce no changelog rows. */
    @Test
    public void testDataFileRewrite() throws IOException {
        this.table.newAppend().appendFile(this.dataFile1).commit();
        this.table.newAppend().appendFile(this.dataFile2).commit();
        long snapshotId2 = this.table.currentSnapshot().snapshotId();

        this.table
                .newRewrite()
                .rewriteFiles(ImmutableSet.of(this.dataFile1), ImmutableSet.of(this.dataFile2))
                .commit();

        // Start after the second append so only the rewrite snapshot is scanned.
        List<InternalRow> rows = readRows(newScan().fromSnapshotExclusive(snapshotId2));

        Assert.assertEquals("Should have no rows", 0, rows.size());
    }

    /** An append, a delete and another append should surface with change ordinals 0, 1 and 2. */
    @Test
    public void testMixDeleteAndInsert() throws IOException {
        this.table.newAppend().appendFile(this.dataFile1).commit();
        long snapshotId1 = this.table.currentSnapshot().snapshotId();

        this.table.newDelete().deleteFile(this.dataFile1).commit();
        long snapshotId2 = this.table.currentSnapshot().snapshotId();

        this.table.newAppend().appendFile(this.dataFile2).commit();
        long snapshotId3 = this.table.currentSnapshot().snapshotId();

        List<InternalRow> rows = readRows(newScan());
        // The same ids appear as both INSERT and DELETE, so order by change ordinal first.
        rows.sort(BY_ORDINAL_THEN_ID);

        List<Object[]> expectedRows = Lists.newArrayList();
        addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, this.records1);
        addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, this.records1);
        addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, this.records2);

        assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
    }

    private IncrementalChangelogScan newScan() {
        return this.table.newIncrementalChangelogScan();
    }

    /**
     * Plans the given scan and reads every row of every planned task group through a
     * {@link ChangelogRowReader}.
     *
     * <p>The planned task groups and each reader are closed even when reading fails.
     *
     * @param scan the configured changelog scan to plan
     * @return copies of all rows produced, in the order they were read
     * @throws IOException if closing the task groups or a reader fails
     */
    private List<InternalRow> readRows(IncrementalChangelogScan scan) throws IOException {
        List<InternalRow> rows = Lists.newArrayList();
        try (CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.planTasks()) {
            for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
                try (ChangelogRowReader reader =
                        new ChangelogRowReader(this.table, taskGroup, this.table.schema(), this.table.schema(), false)) {
                    while (reader.next()) {
                        // Copy each row before advancing the reader, then keep the copy.
                        rows.add(reader.get().copy());
                    }
                }
            }
        }
        return rows;
    }

    /**
     * Appends one expected changelog row per record to {@code expectedRows}.
     *
     * @param expectedRows the list to append to
     * @param operation the change type whose name is expected in the row
     * @param snapshotId the expected commit snapshot id
     * @param changeOrdinal the expected change ordinal
     * @param records the records whose first two fields form the data columns
     * @return {@code expectedRows}, for chaining
     */
    private List<Object[]> addExpectedRows(
            List<Object[]> expectedRows,
            ChangelogOperation operation,
            long snapshotId,
            int changeOrdinal,
            List<Record> records) {
        for (Record record : records) {
            expectedRows.add(row(record.get(0), record.get(1), operation.name(), changeOrdinal, snapshotId));
        }
        return expectedRows;
    }

    /** Converts each Spark row to an {@code Object[]} via {@link #toJava(InternalRow)}. */
    protected List<Object[]> internalRowsToJava(List<InternalRow> list) {
        return list.stream().map(this::toJava).collect(Collectors.toList());
    }

    /**
     * Converts a changelog row to Java values: int id, string data, string change type, int change
     * ordinal and long snapshot id.
     */
    private Object[] toJava(InternalRow internalRow) {
        Object[] values = new Object[internalRow.numFields()];
        values[0] = internalRow.getInt(0);
        // "data" is declared optional in SCHEMA, so guard against a null value.
        values[1] = internalRow.isNullAt(1) ? null : internalRow.getString(1);
        values[2] = internalRow.getString(2);
        values[3] = internalRow.getInt(3);
        values[4] = internalRow.getLong(4);
        return values;
    }

    /** Writes the records to a new temporary file and returns the resulting data file. */
    private DataFile writeDataFile(List<Record> list) throws IOException {
        return FileHelpers.writeDataFile(
                this.table, Files.localOutput(this.temp.newFile()), TestHelpers.Row.of(0), list);
    }
}
