package org.apache.iceberg.flink.actions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkCatalogTestBase;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.class */
public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
    private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned";
    private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned";
    private static final String TABLE_NAME_WITH_PK = "test_table_with_pk";
    private final FileFormat format;
    private Table icebergTableUnPartitioned;
    private Table icebergTablePartitioned;
    private Table icebergTableWithPk;

    @Rule
    public TemporaryFolder temp;

    /**
     * Creates one parameterized instance of this test.
     *
     * @param str catalog name, forwarded to the base test class
     * @param namespace base namespace, forwarded to the base test class
     * @param fileFormat file format used as {@code write.format.default} for the tables created
     *     in {@link #before()}
     */
    public TestRewriteDataFilesAction(String str, Namespace namespace, FileFormat fileFormat) {
        super(str, namespace);
        this.format = fileFormat;
        // The JUnit rule field is initialised here rather than at its declaration.
        this.temp = new TemporaryFolder();
    }

    /**
     * Returns the base class's table environment after setting its default parallelism to 1.
     *
     * @return the table environment supplied by the superclass
     */
    @Override
    public TableEnvironment getTableEnv() {
        TableEnvironment env = super.getTableEnv();
        env.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1);
        // Ask the superclass again for the return value, exactly as before, instead of
        // assuming it hands back the same instance on every call.
        return super.getTableEnv();
    }

    /**
     * Builds the parameter matrix: every catalog/namespace pair provided by
     * {@link FlinkCatalogTestBase#parameters()} combined with each of AVRO, ORC and PARQUET.
     *
     * @return one {@code {catalogName, baseNamespace, format}} triple per combination
     */
    @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
    public static Iterable<Object[]> parameters() {
        List<Object[]> combinations = Lists.newArrayList();
        FileFormat[] formats = {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET};
        for (FileFormat fileFormat : formats) {
            for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
                String catalog = (String) catalogParams[0];
                Namespace baseNamespace = (Namespace) catalogParams[1];
                combinations.add(new Object[]{catalog, baseNamespace, fileFormat});
            }
        }
        return combinations;
    }

    /**
     * Creates the test database and the three tables used by the tests (unpartitioned,
     * partitioned by {@code data, spec}, and a format-version 2 table with a primary key),
     * then loads each of them through the validation catalog.
     */
    @Override
    @Before
    public void before() {
        super.before();
        sql("CREATE DATABASE %s", this.flinkDatabase);
        sql("USE CATALOG %s", this.catalogName);
        sql("USE %s", "db");

        String formatName = this.format.name();

        sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME_UNPARTITIONED, formatName);
        TableIdentifier unpartitionedId = TableIdentifier.of(this.icebergNamespace, TABLE_NAME_UNPARTITIONED);
        this.icebergTableUnPartitioned = this.validationCatalog.loadTable(unpartitionedId);

        sql("CREATE TABLE %s (id int, data varchar,spec varchar)  PARTITIONED BY (data,spec) with ('write.format.default'='%s')", TABLE_NAME_PARTITIONED, formatName);
        TableIdentifier partitionedId = TableIdentifier.of(this.icebergNamespace, TABLE_NAME_PARTITIONED);
        this.icebergTablePartitioned = this.validationCatalog.loadTable(partitionedId);

        sql("CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) with ('write.format.default'='%s', 'format-version'='2')", TABLE_NAME_WITH_PK, formatName);
        TableIdentifier withPkId = TableIdentifier.of(this.icebergNamespace, TABLE_NAME_WITH_PK);
        this.icebergTableWithPk = this.validationCatalog.loadTable(withPkId);
    }

    /**
     * Drops the three test tables and then the test database before delegating to the
     * superclass cleanup.
     */
    @Override
    @After
    public void clean() {
        String[] tableNames = {TABLE_NAME_UNPARTITIONED, TABLE_NAME_PARTITIONED, TABLE_NAME_WITH_PK};
        for (String tableName : tableNames) {
            sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, tableName);
        }
        sql("DROP DATABASE IF EXISTS %s", this.flinkDatabase);
        super.clean();
    }

    /** Rewriting a table that has no snapshot must leave it without a snapshot. */
    @Test
    public void testRewriteDataFilesEmptyTable() throws Exception {
        Table emptyTable = this.icebergTableUnPartitioned;
        Assert.assertNull("Table must be empty", emptyTable.currentSnapshot());

        Actions.forTable(emptyTable).rewriteDataFiles().execute();

        Assert.assertNull("Table must stay empty", emptyTable.currentSnapshot());
    }

    /**
     * Two single-row inserts into the unpartitioned table are expected to be rewritten into
     * one data file, with the table contents unchanged.
     */
    @Test
    public void testRewriteDataFilesUnpartitionedTable() throws Exception {
        Table table = this.icebergTableUnPartitioned;
        sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
        sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
        table.refresh();

        List<?> filesBefore = Lists.newArrayList(
                CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file));
        Assert.assertEquals("Should have 2 data files before rewrite", 2L, filesBefore.size());

        RewriteDataFilesActionResult result = Actions.forTable(table).rewriteDataFiles().execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2L, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1L, result.addedDataFiles().size());

        table.refresh();
        List<?> filesAfter = Lists.newArrayList(
                CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file));
        Assert.assertEquals("Should have 1 data files after rewrite", 1L, filesAfter.size());

        List<Record> expected = Lists.newArrayList();
        expected.add(SimpleDataUtil.createRecord(1, "hello"));
        expected.add(SimpleDataUtil.createRecord(2, "world"));
        SimpleDataUtil.assertTableRecords(table, expected);
    }

    /**
     * Four single-row inserts spread over two partitions are expected to be rewritten into
     * two data files, with the table contents unchanged.
     */
    @Test
    public void testRewriteDataFilesPartitionedTable() throws Exception {
        Table table = this.icebergTablePartitioned;
        sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        sql("INSERT INTO %s SELECT 3, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        table.refresh();

        List<?> filesBefore = Lists.newArrayList(
                CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file));
        Assert.assertEquals("Should have 4 data files before rewrite", 4L, filesBefore.size());

        RewriteDataFilesActionResult result = Actions.forTable(table).rewriteDataFiles().execute();
        Assert.assertEquals("Action should rewrite 4 data files", 4L, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 2 data file", 2L, result.addedDataFiles().size());

        table.refresh();
        List<?> filesAfter = Lists.newArrayList(
                CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file));
        Assert.assertEquals("Should have 2 data files after rewrite", 2L, filesAfter.size());

        Schema schema = new Schema(
                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get()),
                Types.NestedField.optional(3, "spec", Types.StringType.get()));
        GenericRecord template = GenericRecord.create(schema);

        List<Record> expected = Lists.newArrayList();
        expected.add(template.copy("id", 1, "data", "hello", "spec", "a"));
        expected.add(template.copy("id", 2, "data", "hello", "spec", "a"));
        expected.add(template.copy("id", 3, "data", "world", "spec", "b"));
        expected.add(template.copy("id", 4, "data", "world", "spec", "b"));
        SimpleDataUtil.assertTableRecords(table, expected);
    }

    /**
     * With the filters {@code spec = 'a'} and {@code data startsWith 'he'}, only the two
     * matching files out of five are expected to be rewritten into one; table contents stay
     * unchanged.
     */
    @Test
    public void testRewriteDataFilesWithFilter() throws Exception {
        Table table = this.icebergTablePartitioned;
        sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        sql("INSERT INTO %s SELECT 3, 'world' ,'a'", TABLE_NAME_PARTITIONED);
        sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        sql("INSERT INTO %s SELECT 5, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        table.refresh();

        List<?> filesBefore = Lists.newArrayList(
                CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file));
        Assert.assertEquals("Should have 5 data files before rewrite", 5L, filesBefore.size());

        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                .filter(Expressions.equal("spec", "a"))
                .filter(Expressions.startsWith("data", "he"))
                .execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2L, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1L, result.addedDataFiles().size());

        table.refresh();
        List<?> filesAfter = Lists.newArrayList(
                CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file));
        Assert.assertEquals("Should have 4 data files after rewrite", 4L, filesAfter.size());

        Schema schema = new Schema(
                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get()),
                Types.NestedField.optional(3, "spec", Types.StringType.get()));
        GenericRecord template = GenericRecord.create(schema);

        List<Record> expected = Lists.newArrayList();
        expected.add(template.copy("id", 1, "data", "hello", "spec", "a"));
        expected.add(template.copy("id", 2, "data", "hello", "spec", "a"));
        expected.add(template.copy("id", 3, "data", "world", "spec", "a"));
        expected.add(template.copy("id", 4, "data", "world", "spec", "b"));
        expected.add(template.copy("id", 5, "data", "world", "spec", "b"));
        SimpleDataUtil.assertTableRecords(table, expected);
    }

    /**
     * Inserts 100 rows in two batches (even ids, odd ids), checks that a scan planned with
     * {@code ignoreResiduals()} and the filter {@code data = '0'} reports an always-true
     * residual for every task, and then verifies that rewriting with the same filter merges
     * the two data files into one without changing the table contents.
     *
     * @throws IOException if closing the planned scan fails
     */
    @Test
    public void testRewriteLargeTableHasResiduals() throws IOException {
        List<String> evenRows = Lists.newArrayList();
        List<String> oddRows = Lists.newArrayList();
        List<Record> expected = Lists.newArrayList();
        for (int id = 0; id < 100; id++) {
            String data = String.valueOf(id % 3);
            String sqlRow = "(" + id + ",'" + data + "')";
            if (id % 2 == 0) {
                evenRows.add(sqlRow);
            } else {
                oddRows.add(sqlRow);
            }
            Record record = SimpleDataUtil.RECORD.copy();
            record.setField("id", id);
            record.setField("data", data);
            expected.add(record);
        }
        // One INSERT per batch so that the table ends up with two data files.
        sql("INSERT INTO %s values " + StringUtils.join(evenRows, ","), TABLE_NAME_UNPARTITIONED);
        sql("INSERT INTO %s values " + StringUtils.join(oddRows, ","), TABLE_NAME_UNPARTITIONED);
        this.icebergTableUnPartitioned.refresh();

        // Keep the planned scan in a named variable: it is iterated once for the residual
        // check and once more for the file count, and is closed when the block exits.
        try (CloseableIterable<FileScanTask> tasks = this.icebergTableUnPartitioned
                .newScan()
                .ignoreResiduals()
                .filter(Expressions.equal("data", "0"))
                .planFiles()) {
            for (FileScanTask task : tasks) {
                Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
            }
            List<?> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
            Assert.assertEquals("Should have 2 data files before rewrite", 2L, dataFiles.size());
        }

        RewriteDataFilesActionResult result = Actions.forTable(this.icebergTableUnPartitioned)
                .rewriteDataFiles()
                .filter(Expressions.equal("data", "0"))
                .execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2L, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1L, result.addedDataFiles().size());
        SimpleDataUtil.assertTableRecords(this.icebergTableUnPartitioned, expected);
    }

    /**
     * Verifies that a data file which is already close to the target size is not rewritten
     * again: one large file is written and registered by hand, two small files are added via
     * SQL, and the rewrite (target size = large file length + 10, open-file cost 1) is
     * expected to merge only the two small files and leave the large file in place.
     *
     * @throws IOException if writing the temporary data file fails
     */
    @Test
    public void testRewriteAvoidRepeateCompress() throws IOException {
        Table table = this.icebergTableUnPartitioned;
        List<Record> expected = Lists.newArrayList();
        GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());
        File file = this.temp.newFile();
        int count = 0;

        // Close the appender BEFORE the file is measured and registered below, so that
        // file.length() is taken from the finished file rather than one still being written.
        try (FileAppender<Record> appender = appenderFactory.newAppender(Files.localOutput(file), this.format)) {
            while (appender.length() < 20000) {
                Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
                appender.add(record);
                expected.add(record);
                count++;
            }
        }

        table.newAppend()
                .appendFile(DataFiles.builder(table.spec())
                        .withPath(file.getAbsolutePath())
                        .withFileSizeInBytes(file.length())
                        .withFormat(this.format)
                        .withRecordCount(count)
                        .build())
                .commit();
        sql("INSERT INTO %s SELECT 1,'a' ", TABLE_NAME_UNPARTITIONED);
        sql("INSERT INTO %s SELECT 2,'b' ", TABLE_NAME_UNPARTITIONED);
        table.refresh();

        List<?> filesBefore = Lists.newArrayList(
                CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file));
        Assert.assertEquals("Should have 3 data files before rewrite", 3L, filesBefore.size());

        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                .targetSizeInBytes(file.length() + 10)
                .splitOpenFileCost(1L)
                .execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2L, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1L, result.addedDataFiles().size());

        table.refresh();
        List<CharSequence> pathsAfter = Lists.newArrayList(
                        CloseableIterable.transform(table.newScan().planFiles(), FileScanTask::file))
                .stream()
                .map(dataFile -> dataFile.path())
                .collect(Collectors.toList());
        Assert.assertEquals("Should have 2 data files after rewrite", 2L, pathsAfter.size());
        // The hand-written large file must still be part of the table, i.e. it was not rewritten.
        Assert.assertTrue(pathsAfter.contains(file.getAbsolutePath()));

        expected.add(SimpleDataUtil.createRecord(1, "a"));
        expected.add(SimpleDataUtil.createRecord(2, "b"));
        SimpleDataUtil.assertTableRecords(table, expected);
    }

    /**
     * Verifies how a rewrite started from a stale table handle interacts with a later upsert
     * that produced an equality-delete file: with {@code useStartingSequenceNumber(false)}
     * the rewrite is expected to fail with a {@link ValidationException}; with
     * {@code useStartingSequenceNumber(true)} it is expected to succeed and the table must
     * still contain the upserted data.
     *
     * @throws IOException if closing the planned scan fails
     */
    @Test
    public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
        sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
        sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);

        // Two independent handles loaded BEFORE the upsert; both stay at the older snapshot.
        TableIdentifier tableId = TableIdentifier.of(this.icebergNamespace, TABLE_NAME_WITH_PK);
        Table staleTable1 = this.validationCatalog.loadTable(tableId);
        Table staleTable2 = this.validationCatalog.loadTable(tableId);

        sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
        this.icebergTableWithPk.refresh();
        Assert.assertEquals(
                "The latest sequence number should be greater than that of the stale snapshot",
                staleTable1.currentSnapshot().sequenceNumber() + 1,
                this.icebergTableWithPk.currentSnapshot().sequenceNumber());

        // The planned scan is read twice (data files, then delete files) and closed afterwards.
        int dataFileCount;
        Set<DeleteFile> deleteFiles;
        try (CloseableIterable<FileScanTask> tasks = this.icebergTableWithPk.newScan().planFiles()) {
            dataFileCount = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)).size();
            deleteFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes))
                    .stream()
                    .flatMap(List::stream)
                    .collect(Collectors.toSet());
        }
        Assert.assertEquals("Should have 3 data files before rewrite", 3L, dataFileCount);
        Assert.assertEquals("Should have 1 delete file before rewrite", 1L, deleteFiles.size());
        // JUnit order is (message, expected, actual).
        Assert.assertSame(
                "The 1 delete file should be an equality-delete file",
                FileContent.EQUALITY_DELETES,
                Iterables.getOnlyElement(deleteFiles).content());
        shouldHaveDataAndFileSequenceNumbers(
                TABLE_NAME_WITH_PK,
                ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));

        Assertions.assertThatThrownBy(
                        () -> Actions.forTable(staleTable1).rewriteDataFiles().useStartingSequenceNumber(false).execute(),
                        "Rewrite using new sequence number should fail")
                .isInstanceOf(ValidationException.class);

        RewriteDataFilesActionResult result =
                Actions.forTable(staleTable2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2L, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1L, result.addedDataFiles().size());
        shouldHaveDataAndFileSequenceNumbers(
                TABLE_NAME_WITH_PK,
                ImmutableList.of(Pair.of(3L, 3L), Pair.of(3L, 3L), Pair.of(2L, 4L)));

        List<Record> expected = Lists.newArrayList();
        expected.add(SimpleDataUtil.createRecord(1, "hi"));
        expected.add(SimpleDataUtil.createRecord(2, "world"));
        SimpleDataUtil.assertTableRecords(this.icebergTableWithPk, expected);
    }

    /**
     * Asserts that the rows of the table's {@code $entries} metadata table with
     * {@code status < 2} carry exactly the given (sequence_number, file_sequence_number)
     * pairs, ignoring order.
     *
     * @param str name of the table whose entries are queried
     * @param list expected (sequence_number, file_sequence_number) pairs
     */
    private void shouldHaveDataAndFileSequenceNumbers(String str, List<Pair<Long, Long>> list) {
        List<Pair<Long, Long>> actual = sql("SELECT * FROM %s$entries WHERE status < 2", str).stream()
                .map(row -> {
                    Long dataSequenceNumber = row.getFieldAs("sequence_number");
                    Long fileSequenceNumber = row.getFieldAs("file_sequence_number");
                    return Pair.of(dataSequenceNumber, fileSequenceNumber);
                })
                .collect(Collectors.toList());
        Assertions.assertThat(actual).hasSameElementsAs(list);
    }
}
