package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/spark/source/TestRuntimeFiltering.class */
/**
 * Tests for runtime (dynamic pruning) filtering of Iceberg tables joined against a small
 * {@code dim} table in Spark.
 *
 * <p>Most tests follow the same recipe: create and populate an Iceberg fact table, create and
 * populate a parquet {@code dim} table, check how many {@code dynamicpruningexpression}
 * occurrences the {@code EXPLAIN EXTENDED} output of the join contains, delete every data file
 * that does not match the expected filter, and finally compare the join result with an
 * equivalent directly filtered query.
 *
 * <p>NOTE(review): deleting the non-matching files presumably makes the join fail if the runtime
 * filter is not actually applied during the scan - confirm against the scan implementation.
 */
public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {

    /** Drops the fact table and the {@code dim} table after every test. */
    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", tableName);
        sql("DROP TABLE IF EXISTS dim");
    }

    /** Fact table identity-partitioned by {@code date}; the join is on {@code date}. */
    @Test
    public void testIdentityPartitionedTable() throws NoSuchTableException {
        sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (date)", tableName);

        spark.range(1L, 100L)
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("id", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .option("fanout-enabled", "true")
            .append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");

        spark.range(1L, 10L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .select("id", "date")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.date = d.date AND d.id = 1 ORDER BY id", tableName);

        assertQueryContainsRuntimeFilter(query);

        // The date literal is passed as an int, matching how the original test expresses 1970-01-02.
        deleteNotMatchingFiles(Expressions.equal("date", 1), 3);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE date = DATE '1970-01-02' ORDER BY id", tableName),
            sql(query));
    }

    /** Fact table partitioned by {@code bucket(8, id)}; the join is on {@code id}. */
    @Test
    public void testBucketedTable() throws NoSuchTableException {
        sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, id))", tableName);

        spark.range(1L, 100L)
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("id", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .option("fanout-enabled", "true")
            .append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");

        spark.range(1L, 2L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .select("id", "date")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND d.date = DATE '1970-01-02' ORDER BY date", tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("id", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Same as the bucketed case, but the bucket source column is renamed before querying. */
    @Test
    public void testRenamedSourceColumnTable() throws NoSuchTableException {
        sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, id))", tableName);

        spark.range(1L, 100L)
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("id", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .option("fanout-enabled", "true")
            .append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");

        spark.range(1L, 2L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .select("id", "date")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        // Rename after the data is written so the partition spec was created under the old name.
        sql("ALTER TABLE %s RENAME COLUMN id TO row_id", tableName);

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.row_id = d.id AND d.date = DATE '1970-01-02' ORDER BY date", tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("row_id", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE row_id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Fact table partitioned by {@code data} and {@code bucket(8, id)}; expects two runtime filters. */
    @Test
    public void testMultipleRuntimeFilters() throws NoSuchTableException {
        sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (data, bucket(8, id))", tableName);

        spark.range(1L, 100L)
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("id", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .option("fanout-enabled", "true")
            .append();

        sql("CREATE TABLE dim (id BIGINT, date DATE, data STRING) USING parquet");

        spark.range(1L, 2L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .withColumn("data", functions.expr("'1970-01-02'"))
            .select("id", "date", "data")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND f.data = d.data AND d.date = DATE '1970-01-02'", tableName);

        assertQueryContainsRuntimeFilters(query, 2, "Query should have 2 runtime filters");

        deleteNotMatchingFiles(Expressions.equal("id", 1), 31);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 AND data = '1970-01-02'", tableName),
            sql(query));
    }

    /**
     * Fact table that starts unpartitioned, receives one append, then gets a {@code bucket(8, id)}
     * partition field added before a second append.
     */
    @Test
    public void testBucketedTableWithMultipleSpecs() throws NoSuchTableException {
        sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg", tableName);

        // First append happens before the partition field is added.
        spark.range(1L, 100L)
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id % 2 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("id", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .append();

        validationCatalog.loadTable(tableIdent)
            .updateSpec()
            .addField(Expressions.bucket("id", 8))
            .commit();

        sql("REFRESH TABLE %s", tableName);

        // Second append happens after the bucket field was added to the spec.
        spark.range(1L, 100L)
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("id", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .option("fanout-enabled", "true")
            .append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");

        spark.range(1L, 2L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .select("id", "date")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND d.date = DATE '1970-01-02' ORDER BY date", tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("id", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Bucketed fact table whose source column name contains a dot ({@code i.d}). */
    @Test
    public void testSourceColumnWithDots() throws NoSuchTableException {
        sql("CREATE TABLE %s (`i.d` BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, `i.d`))", tableName);

        spark.range(1L, 100L)
            .withColumnRenamed("id", "i.d")
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(`i.d` % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("`i.d`", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .option("fanout-enabled", "true")
            .append();

        // Result intentionally discarded; the statement only has to execute without failing.
        sql("SELECT * FROM %s WHERE `i.d` = 1", tableName);

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");

        spark.range(1L, 2L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .select("id", "date")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.`i.d` = d.id AND d.date = DATE '1970-01-02' ORDER BY date", tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("i.d", 1), 7);

        // Result intentionally discarded; the join is executed once more for the comparison below.
        sql(query);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE `i.d` = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Bucketed fact table whose source column name contains a backtick ({@code i`d}). */
    @Test
    public void testSourceColumnWithBackticks() throws NoSuchTableException {
        sql("CREATE TABLE %s (`i``d` BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, `i``d`))", tableName);

        spark.range(1L, 100L)
            .withColumnRenamed("id", "i`d")
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(`i``d` % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("`i``d`", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .option("fanout-enabled", "true")
            .append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");

        spark.range(1L, 2L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .select("id", "date")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.`i``d` = d.id AND d.date = DATE '1970-01-02' ORDER BY date", tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("i`d", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE `i``d` = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Unpartitioned fact table; the plan is expected to contain no runtime filter. */
    @Test
    public void testUnpartitionedTable() throws NoSuchTableException {
        sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg", tableName);

        spark.range(1L, 100L)
            .withColumn("date", functions.date_add(functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id % 4 AS INT)")))
            .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
            .withColumn("data", functions.expr("CAST(date AS STRING)"))
            .select("id", "data", "date", "ts")
            .coalesce(1)
            .writeTo(tableName)
            .append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");

        spark.range(1L, 2L)
            .withColumn("date", functions.expr("DATE '1970-01-02'"))
            .select("id", "date")
            .coalesce(1)
            .write()
            .mode("append")
            .insertInto("dim");

        String query = String.format("SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND d.date = DATE '1970-01-02' ORDER BY date", tableName);

        assertQueryContainsNoRuntimeFilter(query);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Asserts that the plan of {@code query} contains exactly one runtime filter. */
    private void assertQueryContainsRuntimeFilter(String query) {
        assertQueryContainsRuntimeFilters(query, 1, "Query should have 1 runtime filter");
    }

    /** Asserts that the plan of {@code query} contains no runtime filter. */
    private void assertQueryContainsNoRuntimeFilter(String query) {
        assertQueryContainsRuntimeFilters(query, 0, "Query should have no runtime filters");
    }

    /**
     * Runs {@code EXPLAIN EXTENDED} on the query and asserts how many times the text
     * {@code dynamicpruningexpression} occurs in the first column of the first result row.
     *
     * @param query SQL text to explain
     * @param expectedFilterCount expected number of occurrences
     * @param errorMessage assertion message used on mismatch
     */
    private void assertQueryContainsRuntimeFilters(String query, int expectedFilterCount, String errorMessage) {
        Row planRow = spark.sql("EXPLAIN EXTENDED " + query).collectAsList().get(0);
        String plan = planRow.getString(0);
        int actualFilterCount = StringUtils.countMatches(plan, "dynamicpruningexpression");
        Assert.assertEquals(errorMessage, expectedFilterCount, actualFilterCount);
    }

    /**
     * Deletes, through the table's {@link FileIO}, every data file returned by an unfiltered scan
     * that is not also returned by a scan filtered with {@code filter}, then asserts how many
     * files were deleted.
     *
     * @param filter expression selecting the files that must be kept
     * @param expectedDeletedFileCount number of files expected to be deleted
     * @throws UncheckedIOException if closing either scan's task iterable fails
     */
    private void deleteNotMatchingFiles(Expression filter, int expectedDeletedFileCount) {
        Table table = validationCatalog.loadTable(tableIdent);
        FileIO io = table.io();

        // Pass 1: remember the locations of the files the filter keeps.
        HashSet<String> matchingFileLocations = Sets.newHashSet();
        try (CloseableIterable<FileScanTask> matchingTasks = table.newScan().filter(filter).planFiles()) {
            for (FileScanTask task : matchingTasks) {
                matchingFileLocations.add(task.file().path().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // Pass 2: walk all files and delete those not seen in pass 1.
        HashSet<String> deletedFileLocations = Sets.newHashSet();
        try (CloseableIterable<FileScanTask> allTasks = table.newScan().planFiles()) {
            for (FileScanTask task : allTasks) {
                String fileLocation = task.file().path().toString();
                if (!matchingFileLocations.contains(fileLocation)) {
                    io.deleteFile(fileLocation);
                    deletedFileLocations.add(fileLocation);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Assert.assertEquals("Deleted unexpected number of files", expectedDeletedFileCount, deletedFileLocations.size());
    }
}
