package org.apache.iceberg.spark.extensions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* Tests for the rewrite_data_files procedure. Decompiled from: org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.class */
public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
    private static final String QUOTED_SPECIAL_CHARS_TABLE_NAME = "`table:with.special:chars`";

    /**
     * Sets the adaptive-execution flag on the static {@code spark} session to {@code "false"}
     * once, before any test of this class runs.
     */
    @BeforeAll
    public static void setupSpark() {
        String adaptiveExecutionKey = SQLConf.ADAPTIVE_EXECUTION_ENABLED().key();
        spark.conf().set(adaptiveExecutionKey, "false");
    }

    /**
     * Drops both tables a test may have created: the default test table and the table whose name
     * contains special characters. {@code IF EXISTS} keeps the cleanup safe when a test created
     * neither of them.
     */
    @AfterEach
    public void removeTable() {
        String dropStatement = "DROP TABLE IF EXISTS %s";
        sql(dropStatement, tableName);
        sql(dropStatement, tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
    }

    /**
     * Parses the sort order {@code "c1, zorder(c2, c3)"} and expects two order fields: a term
     * named {@code c1} followed by a {@link Zorder} term.
     */
    @TestTemplate
    public void testZOrderSortExpression() {
        List<ExtendedParser.RawOrderField> orderFields =
                ExtendedParser.parseSortOrder(spark, "c1, zorder(c2, c3)");

        Assertions.assertThat(orderFields).as("Should parse 2 order fields").hasSize(2);
        Assertions.assertThat(orderFields.get(0).term().name())
                .as("First field should be a ref")
                .isEqualTo("c1");
        Assertions.assertThat(orderFields.get(1).term())
                .as("Second field should be zorder")
                .isInstanceOf(Zorder.class);
    }

    /**
     * Calls the procedure (positional table argument) on a freshly created table with no data and
     * expects a single all-zero result row.
     */
    @TestTemplate
    public void testRewriteDataFilesInEmptyTable() {
        createTable();

        List<Object[]> output =
                sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);

        assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 0)), output);
    }

    /**
     * Compacts the table partitioned by {@code c2} after {@code insertData(10)}. Expects the
     * result row to start with (10 rewritten, 2 added), to have four columns, and to carry in its
     * third column a {@code Long} equal to the snapshot summary's {@code removed-files-size}.
     * The table contents must be unchanged afterwards.
     */
    @TestTemplate
    public void testRewriteDataFilesOnPartitionTable() {
        createPartitionTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 2 data files (one per partition) ",
                row(10, 2),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Compacts the unpartitioned table after {@code insertData(10)}. Expects the result row to
     * start with (10 rewritten, 1 added), to have four columns, and to carry in its third column
     * a {@code Long} equal to the snapshot summary's {@code removed-files-size}. The table
     * contents must be unchanged afterwards.
     */
    @TestTemplate
    public void testRewriteDataFilesOnNonPartitionTable() {
        createTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data files",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Passes {@code min-input-files = 12} through the {@code options} map after
     * {@code insertData(10)} and expects an all-zero result row and unchanged table contents.
     */
    @TestTemplate
    public void testRewriteDataFilesWithOptions() {
        createTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', options => map('min-input-files','12'))",
                        catalogName,
                        tableIdent);

        assertEquals(
                "Action should rewrite 0 data files and add 0 data files",
                ImmutableList.of(row(0, 0, 0L, 0)),
                output);
        assertEquals("Data should not change", expectedRecords, currentData());
    }

    /**
     * Runs the procedure with {@code strategy => 'sort'} and the explicit sort order
     * {@code c1 DESC NULLS LAST}. Expects (10 rewritten, 1 added), a four-column result row whose
     * third value matches the snapshot summary's {@code removed-files-size}, and unchanged table
     * contents.
     */
    @TestTemplate
    public void testRewriteDataFilesWithSortStrategy() {
        createTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'c1 DESC NULLS LAST')",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data files",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Sorts by {@code c1} with {@code shuffle-partitions-per-file = 2}. Expects (10 rewritten,
     * 1 added) and an unordered {@code SELECT *} that returns the five {@code (1, "foo", null)}
     * rows before the five {@code (2, "bar", null)} rows.
     */
    @TestTemplate
    public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
        createTable();
        insertData(10);

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files( table => '%s',  strategy => 'sort',  sort_order => 'c1',  options => map('shuffle-partitions-per-file', '2'))",
                        catalogName,
                        tableIdent);
        assertEquals(
                "Action should rewrite 10 data files and add 1 data files",
                row(10, 1),
                Arrays.copyOf(output.get(0), 2));

        List<Object[]> expectedRows =
                ImmutableList.of(
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null));
        assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
    }

    /**
     * Rewrites with {@code sort_order => 'zorder(c1,c2)'}. Expects (10 rewritten, 1 added), a
     * four-column result row whose third value matches the snapshot summary's
     * {@code removed-files-size}, and an unordered {@code SELECT *} that returns the
     * {@code "bar"} rows before the {@code "foo"} rows.
     */
    @TestTemplate
    public void testRewriteDataFilesWithZOrder() {
        createTable();
        insertData(10);

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'zorder(c1,c2)')",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data files",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        List<Object[]> expectedRows =
                ImmutableList.of(
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null));
        assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
    }

    /**
     * Z-orders by {@code (c2, c3)} on a table whose {@code c3} column is {@code binary} and holds
     * only nulls. Five two-row inserts are issued first. Expects (10 rewritten, 1 added), a
     * four-column result row whose third value matches the snapshot summary's
     * {@code removed-files-size}, and an unordered {@code SELECT *} that returns the
     * {@code "bar"} rows before the {@code "foo"} rows.
     */
    @TestTemplate
    public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
        sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", tableName);
        for (int insert = 0; insert < 5; insert++) {
            sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", tableName);
        }

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'zorder(c2,c3)')",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data files",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(snapshotSummary())
                .containsEntry("removed-files-size", String.valueOf(resultRow[2]));

        // Each expected row is passed as its own vararg element. Wrapping them in a single
        // Object[] would make the assertion look for a one-element list whose only element is
        // that wrapper array, instead of comparing the ten rows one by one.
        Assertions.assertThat(sql("SELECT * FROM %s", tableName))
                .containsExactly(
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null));
    }

    /**
     * Z-orders by {@code (c1, c2)} with {@code shuffle-partitions-per-file = 2}. Expects
     * (10 rewritten, 1 added) and an unordered {@code SELECT *} that returns the {@code "bar"}
     * rows before the {@code "foo"} rows.
     */
    @TestTemplate
    public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
        createTable();
        insertData(10);

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files( table => '%s', strategy => 'sort',  sort_order => 'zorder(c1, c2)',  options => map('shuffle-partitions-per-file', '2'))",
                        catalogName,
                        tableIdent);
        assertEquals(
                "Action should rewrite 10 data files and add 1 data files",
                row(10, 1),
                Arrays.copyOf(output.get(0), 2));

        List<Object[]> expectedRows =
                ImmutableList.of(
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(2, "bar", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null),
                        row(1, "foo", null));
        assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
    }

    /**
     * Restricts the rewrite with {@code where => 'c1 = 1 and c2 is not null'}. Expects
     * (5 rewritten, 1 added), a four-column result row whose third value matches the snapshot
     * summary's {@code removed-files-size}, and unchanged table contents.
     */
    @TestTemplate
    public void testRewriteDataFilesWithFilter() {
        createTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 = 1 and c2 is not null')",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 5 data files (containing c1 = 1) and add 1 data files",
                row(5, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Uses the always-true predicate {@code 1=1} as the {@code where} clause. Expects
     * (10 rewritten, 1 added), a four-column result row whose third value matches the snapshot
     * summary's {@code removed-files-size}, and unchanged table contents.
     */
    @TestTemplate
    public void testRewriteDataFilesWithDeterministicTrueFilter() {
        createTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data files",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Uses the always-false predicate {@code 0=1} as the {@code where} clause. Expects the result
     * row to start with (0 rewritten, 0 added) and the table contents to be unchanged.
     */
    @TestTemplate
    public void testRewriteDataFilesWithDeterministicFalseFilter() {
        createTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
                        catalogName,
                        tableIdent);

        assertEquals(
                "Action should rewrite 0 data files and add 0 data files",
                row(0, 0),
                Arrays.copyOf(output.get(0), 2));
        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Filters the table partitioned by {@code c2} with {@code c2 = "bar"}. Expects
     * (5 rewritten, 1 added), a four-column result row whose third value matches the snapshot
     * summary's {@code removed-files-size}, and unchanged table contents.
     */
    @TestTemplate
    public void testRewriteDataFilesWithFilterOnPartitionTable() {
        createPartitionTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 = \"bar\"')",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 5 data files from single matching partition(containing c2 = bar) and add 1 data files",
                row(5, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Filters the bucket-partitioned table with the catalog's {@code system.bucket(2, c2) = 0}
     * function. The test is skipped when the catalog name equals
     * {@code SparkCatalogConfig.SPARK.catalogName()}. Expects (5 rewritten, 1 added), a
     * four-column result row whose third value matches the snapshot summary's
     * {@code removed-files-size}, and unchanged table contents.
     */
    @TestTemplate
    public void testRewriteDataFilesWithFilterOnOnBucketExpression() {
        Assumptions.assumeThat(catalogName).isNotEqualTo(SparkCatalogConfig.SPARK.catalogName());
        createBucketPartitionTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.bucket(2, c2) = 0')",
                        catalogName,
                        tableIdent,
                        catalogName);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 5 data files from single matching partition(containing bucket(c2) = 0) and add 1 data files",
                row(5, 1),
                row(resultRow[0], resultRow[1]));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Filters the table partitioned by {@code c2} with the {@code IN} predicate
     * {@code c2 in ("bar")}. Expects (5 rewritten, 1 added), a four-column result row whose third
     * value matches the snapshot summary's {@code removed-files-size}, and unchanged table
     * contents.
     */
    @TestTemplate
    public void testRewriteDataFilesWithInFilterOnPartitionTable() {
        createPartitionTable();
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 in (\"bar\")')",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 5 data files from single matching partition(containing c2 = bar) and add 1 data files",
                row(5, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Smoke test: calls the procedure once per supported {@code where} predicate shape on the
     * partitioned table. Nothing is asserted on the results; the test passes as long as no call
     * throws.
     */
    @TestTemplate
    public void testRewriteDataFilesWithAllPossibleFilters() {
        createPartitionTable();
        insertData(10);

        // Comparison operators
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 = 3')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 > 3')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 >= 3')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 < 0')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 <= 0')", catalogName, tableIdent);

        // Set membership and null checks
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 in (3,4,5)')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 is null')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c3 is not null')", catalogName, tableIdent);

        // Boolean combinations and negated membership
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 = 3 and c2 = \"bar\"')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 = 3 or c1 = 5')", catalogName, tableIdent);
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c1 not in (1,2)')", catalogName, tableIdent);

        // The LIKE pattern is passed as a format argument rather than embedded in the format string.
        sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 like \"%s\"')", catalogName, tableIdent, "car%");
    }

    /**
     * Smoke test: calls the procedure with {@code where} clauses built on the catalog's
     * {@code system.bucket}, {@code truncate}, {@code years}, {@code months}, {@code days} and
     * {@code hours} functions. The test is skipped when the catalog name equals
     * {@code SparkCatalogConfig.SPARK.catalogName()}. Nothing is asserted on the results.
     */
    @TestTemplate
    public void testRewriteDataFilesWithPossibleV2Filters() {
        Assumptions.assumeThat(catalogName).isNotEqualTo(SparkCatalogConfig.SPARK.catalogName());
        SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "id");

        sql(
                "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.bucket(2, data) >= 0')",
                catalogName, tableIdent, catalogName);
        sql(
                "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.truncate(4, id) >= 1')",
                catalogName, tableIdent, catalogName);
        sql(
                "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.years(ts) >= 1')",
                catalogName, tableIdent, catalogName);
        sql(
                "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.months(ts) >= 1')",
                catalogName, tableIdent, catalogName);
        sql(
                "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.days(ts) >= date(\"2023-01-01\")')",
                catalogName, tableIdent, catalogName);
        sql(
                "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.hours(ts) >= 1')",
                catalogName, tableIdent, catalogName);
    }

    /**
     * Feeds the procedure invalid strategy, sort-order and filter arguments and checks the
     * exception type and message raised for each one.
     */
    @TestTemplate
    public void testRewriteDataFilesWithInvalidInputs() {
        createTable();
        insertData(2);

        // Unknown strategy name
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', options => map('min-input-files','2'), strategy => 'temp')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("unsupported strategy: temp. Only binpack or sort is supported");

        // sort_order combined with the binpack strategy
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'binpack', sort_order => 'c1 ASC NULLS FIRST')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Must use only one rewriter type (bin-pack, sort, zorder)");

        // sort strategy without any sort order
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageStartingWith("Cannot sort data without a valid sort order");

        // Unparseable null ordering
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'c1 ASC none')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Unable to parse sortOrder: c1 ASC none");

        // Unparseable sort direction
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'c1 none NULLS FIRST')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Unable to parse sortOrder: c1 none NULLS FIRST");

        // Sort column that does not exist
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'col1 DESC NULLS FIRST')", catalogName, tableIdent))
                .isInstanceOf(ValidationException.class)
                .hasMessageStartingWith("Cannot find field 'col1' in struct:");

        // Filter on a column that does not exist
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'col1 = 3')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot parse predicates in where option: col1 = 3");

        // Z-order on a column that does not exist
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'zorder(col1)')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot find column 'col1' in table schema (case sensitive = false): struct<1: c1: optional int, 2: c2: optional string, 3: c3: optional string>");

        // Identity sort column mixed with a z-order expression
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', sort_order => 'c1,zorder(c2,c3)')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot mix identity sort columns and a Zorder sort expression: c1,zorder(c2,c3)");
    }

    /**
     * Checks the errors raised for malformed procedure calls: mixed named and positional
     * arguments, an unknown procedure namespace, a missing required parameter, a duplicated
     * argument, and an empty table identifier.
     */
    @TestTemplate
    public void testInvalidCasesForRewriteDataFiles() {
        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files('n', table => 't')", catalogName))
                .isInstanceOf(AnalysisException.class)
                .hasMessage("Named and positional arguments cannot be mixed");

        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.custom.rewrite_data_files('n', 't')", catalogName))
                .isInstanceOf(NoSuchProcedureException.class)
                .hasMessage("Procedure custom.rewrite_data_files not found");

        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files()", catalogName))
                .isInstanceOf(AnalysisException.class)
                .hasMessage("Missing required parameters: [table]");

        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => 't', table => 't')", catalogName))
                .isInstanceOf(AnalysisException.class)
                .hasMessageEndingWith("Duplicate procedure argument: table");

        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files('')", catalogName))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot handle an empty identifier for parameter 'table'");
    }

    /**
     * Rewrites (default strategy, filter {@code c2 is not null}) a table whose name contains
     * special characters. Runs only when the catalog name equals
     * {@code SparkCatalogConfig.HADOOP.catalogName()}. Expects (10 rewritten, 1 added), a
     * four-column result row whose third value is a {@code Long} equal to the snapshot summary's
     * {@code removed-files-size}, unchanged table contents, and an empty {@link SparkTableCache}
     * afterwards.
     */
    @TestTemplate
    public void testBinPackTableWithSpecialChars() {
        Assumptions.assumeThat(catalogName).isEqualTo(SparkCatalogConfig.HADOOP.catalogName());

        // The validation catalog is addressed with the unquoted name, so the backticks are stripped.
        TableIdentifier identifier =
                TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));

        sql(
                "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
                tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
        insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
        List<Object[]> expectedRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', where => 'c2 is not null')",
                        catalogName,
                        tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data file",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        // Same type check as the sort and z-order variants of this test make on the size column.
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary(identifier).get("removed-files-size")));

        assertEquals(
                "Data after compaction should not change",
                expectedRecords,
                currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)));
        Assertions.assertThat(SparkTableCache.get().size())
                .as("Table cache must be empty")
                .isEqualTo(0);
    }

    /**
     * Sort-rewrites (order {@code c1}, filter {@code c2 is not null}) a table whose name contains
     * special characters. Runs only when the catalog name equals
     * {@code SparkCatalogConfig.HADOOP.catalogName()}. Expects (10 rewritten, 1 added), a
     * four-column result row whose third value matches the snapshot summary's
     * {@code removed-files-size}, unchanged table contents, and an empty {@link SparkTableCache}
     * afterwards.
     */
    @TestTemplate
    public void testSortTableWithSpecialChars() {
        Assumptions.assumeThat(catalogName).isEqualTo(SparkCatalogConfig.HADOOP.catalogName());

        // The validation catalog is addressed with the unquoted name, so the backticks are stripped.
        TableIdentifier identifier =
                TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));

        sql(
                "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
                tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
        insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
        List<Object[]> expectedRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(  table => '%s',  strategy => 'sort',  sort_order => 'c1',  where => 'c2 is not null')",
                        catalogName,
                        tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data file",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary(identifier).get("removed-files-size")));

        assertEquals(
                "Data after compaction should not change",
                expectedRecords,
                currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)));
        Assertions.assertThat(SparkTableCache.get().size())
                .as("Table cache must be empty")
                .isEqualTo(0);
    }

    /**
     * Z-order-rewrites (order {@code zorder(c1, c2)}, filter {@code c2 is not null}) a table
     * whose name contains special characters. Runs only when the catalog name equals
     * {@code SparkCatalogConfig.HADOOP.catalogName()}. Expects (10 rewritten, 1 added), a
     * four-column result row whose third value matches the snapshot summary's
     * {@code removed-files-size}, unchanged table contents, and an empty {@link SparkTableCache}
     * afterwards.
     */
    @TestTemplate
    public void testZOrderTableWithSpecialChars() {
        Assumptions.assumeThat(catalogName).isEqualTo(SparkCatalogConfig.HADOOP.catalogName());

        // The validation catalog is addressed with the unquoted name, so the backticks are stripped.
        TableIdentifier identifier =
                TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));

        sql(
                "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
                tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
        insertData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME), 10);
        List<Object[]> expectedRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(  table => '%s',  strategy => 'sort',  sort_order => 'zorder(c1, c2)',  where => 'c2 is not null')",
                        catalogName,
                        tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 10 data files and add 1 data file",
                row(10, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary(identifier).get("removed-files-size")));

        assertEquals(
                "Data after compaction should not change",
                expectedRecords,
                currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)));
        Assertions.assertThat(SparkTableCache.get().size())
                .as("Table cache must be empty")
                .isEqualTo(0);
    }

    /**
     * Declares {@code WRITE ORDERED BY c2} on the table, then runs the sort strategy without an
     * explicit {@code sort_order} and with {@code min-input-files = 2}. Expects (2 rewritten,
     * 1 added), a four-column result row whose third value matches the snapshot summary's
     * {@code removed-files-size}, and unchanged table contents.
     */
    @TestTemplate
    public void testDefaultSortOrder() {
        createTable();
        sql("ALTER TABLE %s WRITE ORDERED BY c2", tableName);
        insertData(10);
        List<Object[]> expectedRecords = currentData();

        List<Object[]> output =
                sql(
                        "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', options => map('min-input-files','2'))",
                        catalogName,
                        tableIdent);
        Object[] resultRow = output.get(0);

        assertEquals(
                "Action should rewrite 2 data files and add 1 data files",
                row(2, 1),
                Arrays.copyOf(resultRow, 2));
        Assertions.assertThat(resultRow).hasSize(4);
        Assertions.assertThat(resultRow[2])
                .isInstanceOf(Long.class)
                .isEqualTo(Long.valueOf(snapshotSummary().get("removed-files-size")));

        assertEquals("Data after compaction should not change", expectedRecords, currentData());
    }

    /**
     * Checks the two failure messages for unsupported {@code where} expressions: one containing
     * "Cannot translate Spark expression" and one containing "Cannot convert Spark filter".
     */
    @TestTemplate
    public void testRewriteWithUntranslatedOrUnconvertedFilter() {
        createTable();

        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'substr(encode(c2, \"utf-8\"), 2) = \"fo\"')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Cannot translate Spark expression");

        Assertions.assertThatThrownBy(
                        () -> sql("CALL %s.system.rewrite_data_files(table => '%s', where => 'substr(c2, 2) = \"fo\"')", catalogName, tableIdent))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Cannot convert Spark filter");
    }

    /**
     * After a rewrite, expects the current snapshot summary to contain an {@code app-id} key, an
     * {@code engine-name} of {@code spark}, and an {@code engine-version} starting with
     * {@code 3.5}.
     */
    @TestTemplate
    public void testRewriteDataFilesSummary() {
        createTable();
        insertData(10);
        sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);

        Map<String, String> summary = snapshotSummary();
        Assertions.assertThat(summary)
                .containsKey("app-id")
                .containsEntry("engine-name", "spark")
                .hasEntrySatisfying(
                        "engine-version",
                        version -> Assertions.assertThat(version).startsWith("3.5"));
    }

    /** Creates the unpartitioned Iceberg test table with columns {@code c1 int, c2 string, c3 string}. */
    private void createTable() {
        String ddl = "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg";
        sql(ddl, tableName);
    }

    /**
     * Creates the Iceberg test table partitioned by {@code c2}, with the table property
     * {@code write.distribution-mode} set to {@code none}.
     */
    private void createPartitionTable() {
        String ddl =
                "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg PARTITIONED BY (c2) TBLPROPERTIES ('%s' '%s')";
        sql(ddl, tableName, "write.distribution-mode", "none");
    }

    /**
     * Creates the Iceberg test table partitioned by {@code bucket(2, c2)}, with the table
     * property {@code write.distribution-mode} set to {@code none}.
     */
    private void createBucketPartitionTable() {
        String ddl =
                "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg PARTITIONED BY (bucket(2, c2)) TBLPROPERTIES ('%s' '%s')";
        sql(ddl, tableName, "write.distribution-mode", "none");
    }

    /**
     * Appends test records to the default test table; see {@link #insertData(String, int)}.
     *
     * @param i record-count hint, also used as the repartition count of the write
     */
    private void insertData(int i) {
        String target = this.tableName;
        insertData(target, i);
    }

    /**
     * Appends {@code i / 2} copies each of {@code (1, "foo", null)} and {@code (2, "bar", null)}
     * to the given table, writing the data frame repartitioned into {@code i} partitions.
     *
     * @param str name of the table to append to
     * @param i record-count hint (integer-halved per record kind) and repartition count
     * @throws RuntimeException wrapping {@link NoSuchTableException} when the table is missing
     */
    private void insertData(String str, int i) {
        ThreeColumnRecord fooRecord = new ThreeColumnRecord(1, "foo", (String) null);
        ThreeColumnRecord barRecord = new ThreeColumnRecord(2, "bar", (String) null);

        List<ThreeColumnRecord> records = Lists.newArrayList();
        int pairs = i / 2;
        for (int pair = 0; pair < pairs; pair++) {
            records.add(fooRecord);
            records.add(barRecord);
        }

        try {
            spark.createDataFrame(records, ThreeColumnRecord.class)
                    .repartition(i)
                    .writeTo(str)
                    .append();
        } catch (NoSuchTableException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the current snapshot's summary for the default test table. */
    private Map<String, String> snapshotSummary() {
        TableIdentifier identifier = this.tableIdent;
        return snapshotSummary(identifier);
    }

    /**
     * Loads the table through the validation catalog and returns its current snapshot's summary.
     *
     * @param tableIdentifier identifier of the table to load
     * @return the summary map of the table's current snapshot
     */
    private Map<String, String> snapshotSummary(TableIdentifier tableIdentifier) {
        return validationCatalog
                .loadTable(tableIdentifier)
                .currentSnapshot()
                .summary();
    }

    /** Returns all rows of the default test table, ordered by {@code c1, c2, c3}. */
    private List<Object[]> currentData() {
        String source = this.tableName;
        return currentData(source);
    }

    /**
     * Selects every row of the given table ordered by {@code c1, c2, c3}, giving tests a stable
     * before/after snapshot to compare.
     *
     * @param str name of the table to read
     * @return the collected rows after conversion by {@code rowsToJava}
     */
    private List<Object[]> currentData(String str) {
        String query = "SELECT * FROM " + str + " order by c1, c2, c3";
        return rowsToJava(spark.sql(query).collectAsList());
    }
}
