package org.apache.iceberg.flink.sink;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests for the delta {@link TaskWriter} produced by {@link RowDataTaskWriterFactory}.
 *
 * <p>Each test runs once per file format returned by {@link #parameters()} against a format
 * version 2 table. The tests write Flink change-log rows (insert, update-before, update-after,
 * delete), commit the resulting {@link WriteResult} as a row delta and compare the table contents
 * with the expected rows.
 */
@RunWith(Parameterized.class)
public class TestDeltaTaskWriter extends TableTestBase {
    /** Table format version passed to the {@link TableTestBase} constructor. */
    private static final int FORMAT_V2 = 2;

    /** Value written to the {@code write.parquet.row-group-size-bytes} table property. */
    private static final int PARQUET_ROW_GROUP_SIZE_BYTES = 8 * 1024;

    /** Target file size handed to the task writer factory (128 MiB). */
    private static final long TARGET_FILE_SIZE_BYTES = 128L * 1024 * 1024;

    private final FileFormat format;

    /**
     * Supplies the file format names the test class is run with.
     *
     * @return one single-element row per file format name
     */
    @Parameterized.Parameters(name = "FileFormat = {0}")
    public static Object[][] parameters() {
        return new Object[][] {
            {"avro"},
            {"orc"},
            {"parquet"}
        };
    }

    /**
     * Creates the test instance for one file format.
     *
     * @param fileFormat case-insensitive name of a {@link FileFormat} constant
     */
    public TestDeltaTaskWriter(String fileFormat) {
        super(FORMAT_V2);
        this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
    }

    /**
     * Reserves a fresh table location before every test.
     *
     * <p>The temporary folder is created only to obtain a unique path and is deleted right away;
     * the table itself is created later by {@link #initTable(boolean)}.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Before
    public void setupTable() throws IOException {
        this.tableDir = this.temp.newFolder();
        Assert.assertTrue(this.tableDir.delete());
        this.metadataDir = new File(this.tableDir, "metadata");
    }

    /**
     * Creates the test table and configures its write properties.
     *
     * @param partitioned {@code true} to partition by identity of {@code data}, {@code false} for
     *     an unpartitioned table
     */
    private void initTable(boolean partitioned) {
        PartitionSpec spec = partitioned
                ? PartitionSpec.builderFor(SCHEMA).identity("data").build()
                : PartitionSpec.unpartitioned();
        this.table = create(SCHEMA, spec);

        this.table.updateProperties()
                .set("write.parquet.row-group-size-bytes", String.valueOf(PARQUET_ROW_GROUP_SIZE_BYTES))
                .defaultFormat(this.format)
                .commit();
    }

    /** Returns the field id of the {@code id} column in the current table schema. */
    private int idFieldId() {
        return this.table.schema().findField("id").fieldId();
    }

    /** Returns the field id of the {@code data} column in the current table schema. */
    private int dataFieldId() {
        return this.table.schema().findField("data").fieldId();
    }

    /**
     * Writes two batches of change-log events keyed by {@code id} and verifies the file counts of
     * each {@link WriteResult} as well as the table contents after each commit.
     *
     * @param partitioned whether the table under test is partitioned; selects the expected data
     *     and delete file counts
     * @throws IOException if writing or reading the table fails
     */
    private void testCdcEvents(boolean partitioned) throws IOException {
        TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(Lists.newArrayList(idFieldId()));
        taskWriterFactory.initialize(1, 1);

        // First batch: inserts, two updates and one delete.
        TaskWriter<RowData> writer = taskWriterFactory.create();
        writer.write(SimpleDataUtil.createInsert(1, "aaa"));
        writer.write(SimpleDataUtil.createInsert(2, "bbb"));
        writer.write(SimpleDataUtil.createInsert(3, "ccc"));

        // Update <2, 'bbb'> to <2, 'ddd'>.
        writer.write(SimpleDataUtil.createUpdateBefore(2, "bbb"));
        writer.write(SimpleDataUtil.createUpdateAfter(2, "ddd"));

        // Update <1, 'aaa'> to <1, 'eee'>.
        writer.write(SimpleDataUtil.createUpdateBefore(1, "aaa"));
        writer.write(SimpleDataUtil.createUpdateAfter(1, "eee"));

        writer.write(SimpleDataUtil.createInsert(4, "fff"));
        writer.write(SimpleDataUtil.createInsert(5, "ggg"));
        writer.write(SimpleDataUtil.createDelete(3, "ccc"));

        WriteResult result = writer.complete();
        Assert.assertEquals(partitioned ? 7L : 1L, result.dataFiles().length);
        Assert.assertEquals(partitioned ? 3L : 1L, result.deleteFiles().length);
        commitTransaction(result);

        Assert.assertEquals(
                "Should have expected records.",
                expectedRowSet(
                        SimpleDataUtil.createRecord(1, "eee"),
                        SimpleDataUtil.createRecord(2, "ddd"),
                        SimpleDataUtil.createRecord(4, "fff"),
                        SimpleDataUtil.createRecord(5, "ggg")),
                actualRowSet("*"));

        // Second batch: changes rows that were committed by the first batch.
        writer = taskWriterFactory.create();

        // Update <2, 'ddd'> to <6, 'hhh'>.
        writer.write(SimpleDataUtil.createUpdateBefore(2, "ddd"));
        writer.write(SimpleDataUtil.createUpdateAfter(6, "hhh"));

        // Update <5, 'ggg'> to <5, 'iii'>.
        writer.write(SimpleDataUtil.createUpdateBefore(5, "ggg"));
        writer.write(SimpleDataUtil.createUpdateAfter(5, "iii"));

        writer.write(SimpleDataUtil.createDelete(4, "fff"));

        result = writer.complete();
        Assert.assertEquals(partitioned ? 2L : 1L, result.dataFiles().length);
        Assert.assertEquals(partitioned ? 3L : 1L, result.deleteFiles().length);
        commitTransaction(result);

        Assert.assertEquals(
                "Should have expected records",
                expectedRowSet(
                        SimpleDataUtil.createRecord(1, "eee"),
                        SimpleDataUtil.createRecord(5, "iii"),
                        SimpleDataUtil.createRecord(6, "hhh")),
                actualRowSet("*"));
    }

    @Test
    public void testUnpartitioned() throws IOException {
        initTable(false);
        testCdcEvents(false);
    }

    @Test
    public void testPartitioned() throws IOException {
        initTable(true);
        testCdcEvents(true);
    }

    /**
     * Writes only delete events keyed by {@code id} and verifies that no data file is produced and
     * that the table is empty after the commit.
     *
     * @param partitioned whether to run against a partitioned table; selects the expected delete
     *     file count
     * @throws IOException if writing or reading the table fails
     */
    private void testWritePureEqDeletes(boolean partitioned) throws IOException {
        initTable(partitioned);
        TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(Lists.newArrayList(idFieldId()));
        taskWriterFactory.initialize(1, 1);

        TaskWriter<RowData> writer = taskWriterFactory.create();
        writer.write(SimpleDataUtil.createDelete(1, "aaa"));
        writer.write(SimpleDataUtil.createDelete(2, "bbb"));
        writer.write(SimpleDataUtil.createDelete(3, "ccc"));

        WriteResult result = writer.complete();
        Assert.assertEquals(0L, result.dataFiles().length);
        Assert.assertEquals(partitioned ? 3L : 1L, result.deleteFiles().length);
        commitTransaction(result);

        Assert.assertEquals("Should have no record", expectedRowSet(), actualRowSet("*"));
    }

    @Test
    public void testUnpartitionedPureEqDeletes() throws IOException {
        testWritePureEqDeletes(false);
    }

    @Test
    public void testPartitionedPureEqDeletes() throws IOException {
        testWritePureEqDeletes(true);
    }

    /**
     * Writes update-before/update-after pairs for ids 1 through 8000 without completing the
     * writer, checks the number of files present under the table's {@code data} directory, then
     * aborts the writer and verifies that every one of those files is gone.
     *
     * @param partitioned whether to run against a partitioned table; selects the expected file
     *     count before the abort
     * @throws IOException if writing fails or the data directory cannot be listed
     */
    private void testAbort(boolean partitioned) throws IOException {
        initTable(partitioned);
        TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(Lists.newArrayList(idFieldId()));
        taskWriterFactory.initialize(1, 1);

        TaskWriter<RowData> writer = taskWriterFactory.create();
        // Each iteration covers two consecutive ids: the odd one with 'aaa', the even one with 'bbb'.
        for (int i = 0; i < 8000; i += 2) {
            writer.write(SimpleDataUtil.createUpdateBefore(i + 1, "aaa"));
            writer.write(SimpleDataUtil.createUpdateAfter(i + 1, "aaa"));

            writer.write(SimpleDataUtil.createUpdateBefore(i + 2, "bbb"));
            writer.write(SimpleDataUtil.createUpdateAfter(i + 2, "bbb"));
        }

        // Snapshot the files written so far, before the writer is aborted.
        List<Path> files = listWrittenFiles();
        Assert.assertEquals(
                "Should have expected file count, but files are: " + files,
                partitioned ? 4L : 2L,
                files.size());

        writer.abort();
        for (Path file : files) {
            Assert.assertFalse("File should not exist after abort: " + file, Files.exists(file));
        }
    }

    /**
     * Lists the regular files under the table's {@code data} directory, skipping {@code .crc}
     * files.
     *
     * @return the matching file paths
     * @throws IOException if the directory tree cannot be walked
     */
    private List<Path> listWrittenFiles() throws IOException {
        // Files.walk holds open directory handles until the stream is closed.
        try (Stream<Path> paths = Files.walk(Paths.get(this.tableDir.getPath(), "data"))) {
            return paths
                    .filter(path -> path.toFile().isFile())
                    .filter(path -> !path.toString().endsWith(".crc"))
                    .collect(Collectors.toList());
        }
    }

    @Test
    public void testUnpartitionedAbort() throws IOException {
        testAbort(false);
    }

    @Test
    public void testPartitionedAbort() throws IOException {
        testAbort(true);
    }

    /**
     * Uses the partition column {@code data} as the only equality field on a partitioned table.
     *
     * <p>The assertions expect a later insert with the same {@code data} value in the same batch
     * to replace the earlier one, while inserts in a following batch do not remove rows that were
     * already committed.
     */
    @Test
    public void testPartitionedTableWithDataAsKey() throws IOException {
        initTable(true);
        TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(Lists.newArrayList(dataFieldId()));
        taskWriterFactory.initialize(1, 1);

        // First batch: two inserts share the key 'aaa'.
        TaskWriter<RowData> writer = taskWriterFactory.create();
        writer.write(SimpleDataUtil.createInsert(1, "aaa"));
        writer.write(SimpleDataUtil.createInsert(2, "aaa"));
        writer.write(SimpleDataUtil.createInsert(3, "bbb"));
        writer.write(SimpleDataUtil.createInsert(4, "ccc"));

        WriteResult result = writer.complete();
        Assert.assertEquals(3L, result.dataFiles().length);
        Assert.assertEquals(1L, result.deleteFiles().length);
        commitTransaction(result);

        Assert.assertEquals(
                "Should have expected records",
                expectedRowSet(
                        SimpleDataUtil.createRecord(2, "aaa"),
                        SimpleDataUtil.createRecord(3, "bbb"),
                        SimpleDataUtil.createRecord(4, "ccc")),
                actualRowSet("*"));

        // Second batch: inserts for existing keys plus a delete for key 'ccc'.
        writer = taskWriterFactory.create();
        writer.write(SimpleDataUtil.createInsert(5, "aaa"));
        writer.write(SimpleDataUtil.createInsert(6, "bbb"));
        writer.write(SimpleDataUtil.createDelete(7, "ccc"));

        result = writer.complete();
        Assert.assertEquals(2L, result.dataFiles().length);
        Assert.assertEquals(1L, result.deleteFiles().length);
        commitTransaction(result);

        Assert.assertEquals(
                "Should have expected records",
                expectedRowSet(
                        SimpleDataUtil.createRecord(2, "aaa"),
                        SimpleDataUtil.createRecord(5, "aaa"),
                        SimpleDataUtil.createRecord(3, "bbb"),
                        SimpleDataUtil.createRecord(6, "bbb")),
                actualRowSet("*"));
    }

    /**
     * Uses both {@code data} and {@code id} as equality fields on a partitioned table and verifies
     * that deleting a row inserted in the same batch yields a single position-delete file.
     */
    @Test
    public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
        initTable(true);
        TaskWriterFactory<RowData> taskWriterFactory =
                createTaskWriterFactory(Lists.newArrayList(dataFieldId(), idFieldId()));
        taskWriterFactory.initialize(1, 1);

        TaskWriter<RowData> writer = taskWriterFactory.create();
        writer.write(SimpleDataUtil.createInsert(1, "aaa"));
        writer.write(SimpleDataUtil.createInsert(2, "aaa"));
        writer.write(SimpleDataUtil.createDelete(2, "aaa"));

        WriteResult result = writer.complete();
        Assert.assertEquals(1L, result.dataFiles().length);
        Assert.assertEquals(1L, result.deleteFiles().length);
        Assert.assertEquals(
                Sets.newHashSet(FileContent.POSITION_DELETES),
                Sets.newHashSet(result.deleteFiles()[0].content()));
        commitTransaction(result);

        Assert.assertEquals(
                "Should have expected records",
                expectedRowSet(SimpleDataUtil.createRecord(1, "aaa")),
                actualRowSet("*"));
    }

    /**
     * Commits the data and delete files of a write result to the table as one row delta,
     * validating deleted files and the existence of the referenced data files.
     *
     * @param result the completed output of a task writer
     */
    private void commitTransaction(WriteResult result) {
        RowDelta rowDelta = this.table.newRowDelta();
        Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
        Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
        rowDelta.validateDeletedFiles()
                .validateDataFilesExist(Lists.newArrayList(result.referencedDataFiles()))
                .commit();
    }

    /** Builds the expected row set for the current table from the given records. */
    private StructLikeSet expectedRowSet(Record... records) {
        return SimpleDataUtil.expectedRowSet(this.table, records);
    }

    /** Reads the current table contents, projecting the given columns. */
    private StructLikeSet actualRowSet(String... columns) throws IOException {
        return SimpleDataUtil.actualRowSet(this.table, columns);
    }

    /**
     * Creates a task writer factory for the current table and file format.
     *
     * @param equalityFieldIds field ids of the columns used as the equality (key) fields
     * @return a factory that still has to be initialized by the caller
     */
    private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
        return new RowDataTaskWriterFactory(
                SerializableTable.copyOf(this.table),
                FlinkSchemaUtil.convert(this.table.schema()),
                TARGET_FILE_SIZE_BYTES,
                this.format,
                equalityFieldIds,
                false);
    }
}
