package org.apache.iceberg.spark.extensions;

import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.Parameterized;

/* loaded from: input_file:org/apache/iceberg/spark/extensions/TestStoragePartitionedJoinsInRowLevelOperations.class */
public class TestStoragePartitionedJoinsInRowLevelOperations extends SparkExtensionsTestBase {
    private static final String OTHER_TABLE_NAME = "other_table";
    private static final Map<String, String> COMMON_TABLE_PROPERTIES = ImmutableMap.of("format-version", "2", "read.split.target-size", "16777216", "read.split.open-file-cost", "16777216");
    private static final Map<String, String> ENABLED_SPJ_SQL_CONF = ImmutableMap.of(SQLConf.V2_BUCKETING_ENABLED().key(), "true", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(), "true", SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), "false", SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1", "spark.sql.iceberg.planning.preserve-data-grouping", "true");

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
    public static Object[][] parameters() {
        return new Object[]{new Object[]{SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), SparkCatalogConfig.HIVE.properties()}};
    }

    public TestStoragePartitionedJoinsInRowLevelOperations(String str, String str2, Map<String, String> map) {
        super(str, str2, map);
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
        sql("DROP TABLE IF EXISTS %s", new Object[]{tableName(OTHER_TABLE_NAME)});
    }

    @Test
    public void testCopyOnWriteDeleteWithoutShuffles() {
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadDeleteWithoutShuffles() {
        checkDelete(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkDelete(RowLevelOperationMode rowLevelOperationMode) {
        sql("CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)", new Object[]{this.tableName, tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        append(this.tableName, new String[]{"{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 3, \"salary\": 300, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 4, \"salary\": 400, \"dep\": \"hardware\" }"});
        sql("CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)", new Object[]{tableName(OTHER_TABLE_NAME), tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 1, \"salary\": 110, \"dep\": \"hr\" }"});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 5, \"salary\": 500, \"dep\": \"hr\" }"});
        sql("ALTER TABLE %s SET TBLPROPERTIES(%s)", new Object[]{this.tableName, tablePropsAsString(ImmutableMap.of("write.delete.mode", rowLevelOperationMode.modeName(), "write.delete.distribution-mode", "none"))});
        withSQLConf(ENABLED_SPJ_SQL_CONF, () -> {
            String sparkPlan = executeAndKeepPlan("DELETE FROM %s t WHERE EXISTS (SELECT 1 FROM %s s WHERE t.id = s.id AND t.dep = s.dep)", new Object[]{this.tableName, tableName(OTHER_TABLE_NAME)}).toString();
            if (rowLevelOperationMode != RowLevelOperationMode.COPY_ON_WRITE) {
                Assertions.assertThat(sparkPlan).doesNotContain(new CharSequence[]{"Exchange"});
            } else {
                Assert.assertEquals("Should be 1 shuffle with SPJ", 1L, StringUtils.countMatches(sparkPlan, "Exchange"));
                Assertions.assertThat(sparkPlan).contains(new CharSequence[]{"Exchange hashpartitioning(_file"});
            }
        });
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{2, 200, "hr"}), row(new Object[]{3, 300, "hr"}), row(new Object[]{4, 400, "hardware"})), sql("SELECT * FROM %s ORDER BY id, salary", new Object[]{this.tableName}));
    }

    @Test
    public void testCopyOnWriteUpdateWithoutShuffles() {
        checkUpdate(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadUpdateWithoutShuffles() {
        checkUpdate(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkUpdate(RowLevelOperationMode rowLevelOperationMode) {
        sql("CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)", new Object[]{this.tableName, tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        append(this.tableName, new String[]{"{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 3, \"salary\": 300, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 4, \"salary\": 400, \"dep\": \"hardware\" }"});
        sql("CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)", new Object[]{tableName(OTHER_TABLE_NAME), tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 1, \"salary\": 110, \"dep\": \"hr\" }"});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 5, \"salary\": 500, \"dep\": \"hr\" }"});
        sql("ALTER TABLE %s SET TBLPROPERTIES(%s)", new Object[]{this.tableName, tablePropsAsString(ImmutableMap.of("write.update.mode", rowLevelOperationMode.modeName(), "write.update.distribution-mode", "none"))});
        withSQLConf(ENABLED_SPJ_SQL_CONF, () -> {
            String sparkPlan = executeAndKeepPlan("UPDATE %s t SET salary = -1 WHERE EXISTS (SELECT 1 FROM %s s WHERE t.id = s.id AND t.dep = s.dep)", new Object[]{this.tableName, tableName(OTHER_TABLE_NAME)}).toString();
            if (rowLevelOperationMode != RowLevelOperationMode.COPY_ON_WRITE) {
                Assertions.assertThat(sparkPlan).doesNotContain(new CharSequence[]{"Exchange"});
            } else {
                Assert.assertEquals("Should be 1 shuffle with SPJ", 1L, StringUtils.countMatches(sparkPlan, "Exchange"));
                Assertions.assertThat(sparkPlan).contains(new CharSequence[]{"Exchange hashpartitioning(_file"});
            }
        });
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, -1, "hr"}), row(new Object[]{2, 200, "hr"}), row(new Object[]{3, 300, "hr"}), row(new Object[]{4, 400, "hardware"})), sql("SELECT * FROM %s ORDER BY id, salary", new Object[]{this.tableName}));
    }

    @Test
    public void testCopyOnWriteMergeWithoutShuffles() {
        checkMerge(RowLevelOperationMode.COPY_ON_WRITE, false);
    }

    @Test
    public void testCopyOnWriteMergeWithoutShufflesWithPredicate() {
        checkMerge(RowLevelOperationMode.COPY_ON_WRITE, true);
    }

    @Test
    public void testMergeOnReadMergeWithoutShuffles() {
        checkMerge(RowLevelOperationMode.MERGE_ON_READ, false);
    }

    @Test
    public void testMergeOnReadMergeWithoutShufflesWithPredicate() {
        checkMerge(RowLevelOperationMode.MERGE_ON_READ, true);
    }

    private void checkMerge(RowLevelOperationMode rowLevelOperationMode, boolean z) {
        sql("CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)", new Object[]{this.tableName, tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        append(this.tableName, new String[]{"{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 3, \"salary\": 300, \"dep\": \"hr\" }"});
        append(this.tableName, new String[]{"{ \"id\": 4, \"salary\": 400, \"dep\": \"hardware\" }"});
        append(this.tableName, new String[]{"{ \"id\": 6, \"salary\": 600, \"dep\": \"software\" }"});
        sql("CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)", new Object[]{tableName(OTHER_TABLE_NAME), tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 1, \"salary\": 110, \"dep\": \"hr\" }"});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 5, \"salary\": 500, \"dep\": \"hr\" }"});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 6, \"salary\": 300, \"dep\": \"software\" }"});
        append(tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 10, \"salary\": 1000, \"dep\": \"ops\" }"});
        sql("ALTER TABLE %s SET TBLPROPERTIES(%s)", new Object[]{this.tableName, tablePropsAsString(ImmutableMap.of("write.merge.mode", rowLevelOperationMode.modeName(), "write.merge.distribution-mode", "none"))});
        withSQLConf(ENABLED_SPJ_SQL_CONF, () -> {
            String sparkPlan = executeAndKeepPlan("MERGE INTO %s AS t USING %s AS s ON t.id = s.id AND t.dep = s.dep %s WHEN MATCHED THEN   UPDATE SET t.salary = s.salary WHEN NOT MATCHED THEN   INSERT *", new Object[]{this.tableName, tableName(OTHER_TABLE_NAME), z ? "AND t.dep IN ('hr', 'ops', 'software')" : ""}).toString();
            if (rowLevelOperationMode != RowLevelOperationMode.COPY_ON_WRITE) {
                Assertions.assertThat(sparkPlan).doesNotContain(new CharSequence[]{"Exchange"});
            } else {
                Assert.assertEquals("Should be 1 shuffle with SPJ", 1L, StringUtils.countMatches(sparkPlan, "Exchange"));
                Assertions.assertThat(sparkPlan).contains(new CharSequence[]{"Exchange hashpartitioning(_file"});
            }
        });
        assertEquals("Should have expected rows", ImmutableList.of(row(new Object[]{1, 110, "hr"}), row(new Object[]{2, 200, "hr"}), row(new Object[]{3, 300, "hr"}), row(new Object[]{4, 400, "hardware"}), row(new Object[]{5, 500, "hr"}), row(new Object[]{6, 300, "software"}), row(new Object[]{10, 1000, "ops"})), sql("SELECT * FROM %s ORDER BY id, salary", new Object[]{this.tableName}));
    }
}
