package org.apache.iceberg.spark.sql;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.class */
/**
 * Tests storage-partitioned joins (SPJ) and aggregates over Iceberg tables.
 *
 * <p>Every query is planned and executed twice: once with {@code ENABLED_SPJ_SQL_CONF} (V2
 * bucketing enabled) and once with {@code DISABLED_SPJ_SQL_CONF} (V2 bucketing disabled). Both
 * configurations disable adaptive execution and broadcast joins. For each run, the number of
 * {@code Exchange} occurrences in the executed plan is compared with an expected value. The rows
 * returned by the two runs must also be equal.
 */
public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {

    private static final String OTHER_TABLE_NAME = "other_table";

    // 16 MiB split target size and open-file cost, applied to every table these tests create.
    // NOTE(review): presumably chosen so each small data file becomes its own split; confirm.
    private static final Map<String, String> TABLE_PROPERTIES =
            ImmutableMap.of(
                    "read.split.target-size", "16777216",
                    "read.split.open-file-cost", "16777216");

    private static final Map<String, String> ENABLED_SPJ_SQL_CONF =
            ImmutableMap.of(
                    SQLConf.V2_BUCKETING_ENABLED().key(), "true",
                    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), "false",
                    SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false",
                    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1",
                    "spark.sql.iceberg.planning.preserve-data-grouping", "true");

    // Identical to ENABLED_SPJ_SQL_CONF except that V2 bucketing is turned off.
    private static final Map<String, String> DISABLED_SPJ_SQL_CONF =
            ImmutableMap.of(
                    SQLConf.V2_BUCKETING_ENABLED().key(), "false",
                    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), "false",
                    SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false",
                    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1",
                    "spark.sql.iceberg.planning.preserve-data-grouping", "true");

    // Self-join on every column of an (id, int_col, dep) table, shared by several tests below.
    private static final String JOIN_ON_ALL_COLUMNS_QUERY =
            "SELECT * FROM %s t1 INNER JOIN %s t2 "
                    + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep = t2.dep "
                    + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, t2.dep";

    @BeforeClass
    public static void setupSparkConf() {
        // Use a small, fixed number of shuffle partitions for all tests in this class.
        spark.conf().set("spark.sql.shuffle.partitions", "4");
    }

    /** Drops both tables used by a test so each test starts from a clean catalog. */
    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", tableName);
        sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testJoinsWithBucketingOnByteColumn() throws NoSuchTableException {
        checkJoin("byte_col", "TINYINT", "bucket(4, byte_col)");
    }

    @Test
    public void testJoinsWithBucketingOnShortColumn() throws NoSuchTableException {
        checkJoin("short_col", "SMALLINT", "bucket(4, short_col)");
    }

    @Test
    public void testJoinsWithBucketingOnIntColumn() throws NoSuchTableException {
        checkJoin("int_col", "INT", "bucket(16, int_col)");
    }

    @Test
    public void testJoinsWithBucketingOnLongColumn() throws NoSuchTableException {
        checkJoin("long_col", "BIGINT", "bucket(16, long_col)");
    }

    @Test
    public void testJoinsWithBucketingOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "bucket(16, timestamp_col)");
    }

    @Test
    public void testJoinsWithBucketingOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "bucket(8, date_col)");
    }

    @Test
    public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException {
        checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
    }

    @Test
    public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
        checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
    }

    @Test
    public void testJoinsWithYearsOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "years(timestamp_col)");
    }

    @Test
    public void testJoinsWithYearsOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "years(date_col)");
    }

    @Test
    public void testJoinsWithMonthsOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "months(timestamp_col)");
    }

    @Test
    public void testJoinsWithMonthsOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "months(date_col)");
    }

    @Test
    public void testJoinsWithDaysOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "days(timestamp_col)");
    }

    @Test
    public void testJoinsWithDaysOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "days(date_col)");
    }

    @Test
    public void testJoinsWithHoursOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "hours(timestamp_col)");
    }

    @Test
    public void testJoinsWithMultipleTransformTypes() throws NoSuchTableException {
        // Both tables share one spec that mixes years/months/days/hours/bucket/identity transforms.
        String createTableSql =
                "CREATE TABLE %s ("
                        + "  id BIGINT, int_col INT, date_col1 DATE, date_col2 DATE, date_col3 DATE,"
                        + "  timestamp_col TIMESTAMP, string_col STRING, dep STRING)"
                        + "USING iceberg "
                        + "PARTITIONED BY ("
                        + "  years(date_col1), months(date_col2), days(date_col3), hours(timestamp_col), "
                        + "  bucket(8, int_col), dep)"
                        + "TBLPROPERTIES (%s)";
        sql(createTableSql, tableName, tablePropsAsString(TABLE_PROPERTIES));
        sql(createTableSql, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

        Dataset<Row> dataDF = randomDataDF(validationCatalog.loadTable(tableIdent).schema(), 16);
        append(tableName, dataDF);
        // The same rows are appended twice to the other table.
        append(tableName(OTHER_TABLE_NAME), dataDF);
        append(tableName(OTHER_TABLE_NAME), dataDF);

        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT t1.id FROM %s t1 INNER JOIN %s t2 "
                        + "ON t1.id = t2.id AND t1.dep = t2.dep "
                        + "ORDER BY t1.id",
                tableName,
                tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT t1.id, t1.int_col, t1.date_col1 FROM %s t1 INNER JOIN %s t2 "
                        + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.date_col1 = t2.date_col1 "
                        + "ORDER BY t1.id, t1.int_col, t1.date_col1",
                tableName,
                tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT t1.id, t1.timestamp_col, t1.string_col FROM %s t1 INNER JOIN %s t2 "
                        + "ON t1.id = t2.id AND t1.timestamp_col = t2.timestamp_col "
                        + "AND t1.string_col = t2.string_col "
                        + "ORDER BY t1.id, t1.timestamp_col, t1.string_col",
                tableName,
                tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT t1.id, t1.date_col1, t1.date_col2, t1.date_col3 FROM %s t1 INNER JOIN %s t2 "
                        + "ON t1.id = t2.id AND t1.date_col1 = t2.date_col1 AND t1.date_col2 = t2.date_col2 "
                        + "AND t1.date_col3 = t2.date_col3 "
                        + "ORDER BY t1.id, t1.date_col1, t1.date_col2, t1.date_col3",
                tableName,
                tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT t1.id, t1.int_col, t1.timestamp_col, t1.dep FROM %s t1 INNER JOIN %s t2 "
                        + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.timestamp_col = t2.timestamp_col "
                        + "AND t1.dep = t2.dep "
                        + "ORDER BY t1.id, t1.int_col, t1.timestamp_col, t1.dep",
                tableName,
                tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testJoinsWithCompatibleSpecEvolution() {
        // The first table is created without a partition spec.
        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)USING iceberg TBLPROPERTIES (%s)",
                tableName,
                tablePropsAsString(TABLE_PROPERTIES));

        Table table = validationCatalog.loadTable(tableIdent);

        // Spec 1: partition by dep, then write one row under it.
        table.updateSpec().addField("dep").commit();
        sql("REFRESH TABLE %s", tableName);
        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);

        // Spec 2: additionally partition by bucket(8, int_col), then write one row under it.
        table.updateSpec().addField(Expressions.bucket("int_col", 8)).commit();
        sql("REFRESH TABLE %s", tableName);
        sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName);

        // The other table is partitioned only by other_dep.
        sql(
                "CREATE TABLE %s (other_id BIGINT, other_int_col INT, other_dep STRING)"
                        + "USING iceberg PARTITIONED BY (other_dep)TBLPROPERTIES (%s)",
                tableName(OTHER_TABLE_NAME),
                tablePropsAsString(TABLE_PROPERTIES));
        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT * FROM %s INNER JOIN %s "
                        + "ON id = other_id AND int_col = other_int_col AND dep = other_dep "
                        + "ORDER BY id, int_col, dep",
                tableName,
                tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testJoinsWithIncompatibleSpecs() {
        // The tables are partitioned by different columns; the same shuffle count is expected
        // with and without SPJ.
        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                        + "USING iceberg PARTITIONED BY (dep)TBLPROPERTIES (%s)",
                tableName,
                tablePropsAsString(TABLE_PROPERTIES));
        insertSoftwareRows(tableName);

        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                        + "USING iceberg PARTITIONED BY (bucket(8, int_col))TBLPROPERTIES (%s)",
                tableName(OTHER_TABLE_NAME),
                tablePropsAsString(TABLE_PROPERTIES));
        insertSoftwareRows(tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(3, 3, JOIN_ON_ALL_COLUMNS_QUERY, tableName, tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testJoinsWithUnpartitionedTables() {
        // Same table properties as every other test, but no PARTITIONED BY clause.
        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)USING iceberg TBLPROPERTIES (%s)",
                tableName,
                tablePropsAsString(TABLE_PROPERTIES));
        insertSoftwareRows(tableName);

        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)USING iceberg TBLPROPERTIES (%s)",
                tableName(OTHER_TABLE_NAME),
                tablePropsAsString(TABLE_PROPERTIES));
        insertSoftwareRows(tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(3, 3, JOIN_ON_ALL_COLUMNS_QUERY, tableName, tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testJoinsWithEmptyTable() {
        // The first table stays empty; only the other table receives rows.
        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                        + "USING iceberg PARTITIONED BY (dep)TBLPROPERTIES (%s)",
                tableName,
                tablePropsAsString(TABLE_PROPERTIES));

        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                        + "USING iceberg PARTITIONED BY (dep)TBLPROPERTIES (%s)",
                tableName(OTHER_TABLE_NAME),
                tablePropsAsString(TABLE_PROPERTIES));
        insertSoftwareRows(tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(3, 3, JOIN_ON_ALL_COLUMNS_QUERY, tableName, tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testJoinsWithOneSplitTables() {
        // Each table holds a single row written by a single INSERT.
        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                        + "USING iceberg PARTITIONED BY (dep)TBLPROPERTIES (%s)",
                tableName,
                tablePropsAsString(TABLE_PROPERTIES));
        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);

        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                        + "USING iceberg PARTITIONED BY (dep)TBLPROPERTIES (%s)",
                tableName(OTHER_TABLE_NAME),
                tablePropsAsString(TABLE_PROPERTIES));
        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(0, 0, JOIN_ON_ALL_COLUMNS_QUERY, tableName, tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testAggregates() throws NoSuchTableException {
        sql(
                "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                        + "USING iceberg PARTITIONED BY (dep, bucket(8, int_col))TBLPROPERTIES (%s)",
                tableName,
                tablePropsAsString(TABLE_PROPERTIES));
        append(tableName, randomDataDF(validationCatalog.loadTable(tableIdent).schema(), 100));

        // Both queries reference only one table, so only one format argument is passed.
        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT COUNT (DISTINCT id) AS count FROM %s GROUP BY dep, int_col ORDER BY count",
                tableName);
        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT COUNT (DISTINCT id) AS count FROM %s GROUP BY dep ORDER BY count",
                tableName);
    }

    /**
     * Creates two tables with columns {@code (id BIGINT, salary INT, <sourceColumnName>
     * <sourceColumnType>)} partitioned by {@code transform}. Both tables receive the same 200 random
     * rows. The join on {@code id} and the source column is then checked.
     *
     * @param sourceColumnName name of the column the partition transform is applied to
     * @param sourceColumnType SQL type of that column
     * @param transform partition transform expression, e.g. {@code bucket(16, int_col)}
     * @throws NoSuchTableException if a freshly created table cannot be found when appending
     */
    private void checkJoin(String sourceColumnName, String sourceColumnType, String transform)
            throws NoSuchTableException {
        String createTableSql =
                "CREATE TABLE %s (id BIGINT, salary INT, %s %s)"
                        + "USING iceberg PARTITIONED BY (%s)TBLPROPERTIES (%s)";
        sql(
                createTableSql,
                tableName,
                sourceColumnName,
                sourceColumnType,
                transform,
                tablePropsAsString(TABLE_PROPERTIES));
        sql(
                createTableSql,
                tableName(OTHER_TABLE_NAME),
                sourceColumnName,
                sourceColumnType,
                transform,
                tablePropsAsString(TABLE_PROPERTIES));

        Dataset<Row> dataDF = randomDataDF(validationCatalog.loadTable(tableIdent).schema(), 200);
        append(tableName, dataDF);
        append(tableName(OTHER_TABLE_NAME), dataDF);

        assertPartitioningAwarePlan(
                1,
                3,
                "SELECT t1.id, t1.salary, t1.%s FROM %s t1 INNER JOIN %s t2 "
                        + "ON t1.id = t2.id AND t1.%s = t2.%s "
                        + "ORDER BY t1.id, t1.%s",
                sourceColumnName,
                tableName,
                tableName(OTHER_TABLE_NAME),
                sourceColumnName,
                sourceColumnName,
                sourceColumnName);
    }

    /**
     * Inserts the rows (1, 100, 'software'), (2, 200, 'software') and (3, 300, 'software') into
     * {@code table}. Each row is written by its own INSERT statement.
     */
    private void insertSoftwareRows(String table) {
        sql("INSERT INTO %s VALUES (1L, 100, 'software')", table);
        sql("INSERT INTO %s VALUES (2L, 200, 'software')", table);
        sql("INSERT INTO %s VALUES (3L, 300, 'software')", table);
    }

    /**
     * Runs {@code query} with SPJ enabled and with SPJ disabled. For each run, it asserts the
     * number of shuffles in the executed plan. It then asserts that both runs return the same rows.
     *
     * @param expectedNumShufflesWithSPJ expected shuffle count under {@code ENABLED_SPJ_SQL_CONF}
     * @param expectedNumShufflesWithoutSPJ expected shuffle count under {@code
     *     DISABLED_SPJ_SQL_CONF}
     * @param query query format string
     * @param args arguments substituted into {@code query}
     */
    private void assertPartitioningAwarePlan(
            int expectedNumShufflesWithSPJ,
            int expectedNumShufflesWithoutSPJ,
            String query,
            Object... args) {
        AtomicReference<List<Object[]>> rowsWithSPJ = new AtomicReference<>();
        AtomicReference<List<Object[]>> rowsWithoutSPJ = new AtomicReference<>();

        withSQLConf(
                ENABLED_SPJ_SQL_CONF,
                () -> {
                    Assert.assertEquals(
                            "Number of shuffles with enabled SPJ must match",
                            expectedNumShufflesWithSPJ,
                            countShuffles(query, args));
                    rowsWithSPJ.set(sql(query, args));
                });

        withSQLConf(
                DISABLED_SPJ_SQL_CONF,
                () -> {
                    Assert.assertEquals(
                            "Number of shuffles with disabled SPJ must match",
                            expectedNumShufflesWithoutSPJ,
                            countShuffles(query, args));
                    rowsWithoutSPJ.set(sql(query, args));
                });

        assertEquals("SPJ should not change query output", rowsWithoutSPJ.get(), rowsWithSPJ.get());
    }

    /**
     * Executes {@code query} and returns how many times {@code "Exchange"} appears in the string
     * form of its executed plan.
     */
    private int countShuffles(String query, Object... args) {
        return StringUtils.countMatches(executeAndKeepPlan(query, args).toString(), "Exchange");
    }

    /**
     * Builds a DataFrame of {@code numRows} random rows for {@code schema}, generated with seed 0.
     */
    private Dataset<Row> randomDataDF(Schema schema, int numRows) {
        Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0L);
        JavaRDD<InternalRow> rowRDD = sparkContext.parallelize(Lists.newArrayList(rows));
        return spark.internalCreateDataFrame(
                JavaRDD.toRDD(rowRDD), SparkSchemaUtil.convert(schema), false);
    }

    /**
     * Appends {@code df} to {@code table} from a single task ({@code coalesce(1)}) with the Iceberg
     * {@code fanout-enabled} write option set.
     */
    private void append(String table, Dataset<Row> df) throws NoSuchTableException {
        // Fanout writers do not require the incoming rows to be clustered by partition.
        df.coalesce(1).writeTo(table).option("fanout-enabled", "true").append();
    }
}
