/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Checks that joins between an Iceberg fact table and a small Parquet {@code dim} table are
 * planned with the expected number of runtime filters, and that the join result matches an
 * equivalent statically filtered query.
 *
 * <p>Every test runs once per {@link PlanningMode} returned by {@link #parameters()}. Most tests
 * follow the same recipe:
 *
 * <ol>
 *   <li>create the fact table and append ids 1..99 (from {@code spark.range(1, 100)}), with
 *       {@code date}, {@code ts} and {@code data} columns derived from the id;
 *   <li>create and populate the {@code dim} table;
 *   <li>assert how many times {@code dynamicpruningexpression} appears in the query plan;
 *   <li>physically delete every data file that does not match the join key (see {@link
 *       #deleteNotMatchingFiles(Expression, int)});
 *   <li>compare the join output with a plain {@code WHERE} query over the fact table.
 * </ol>
 *
 * <p>NOTE(review): step 4 presumably makes the join fail if the scan still touched the pruned
 * files; that depends on reader behaviour that is not visible in this file.
 *
 * <p>{@code spark}, {@code sql}, {@code assertEquals}, {@code configurePlanningMode},
 * {@code tableName}, {@code tableIdent} and {@code validationCatalog} are inherited from the
 * base class hierarchy.
 */
@RunWith(Parameterized.class)
public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
    private final PlanningMode planningMode;

    /** Supplies the planning modes every test is executed with. */
    @Parameterized.Parameters(name = "planningMode = {0}")
    public static Object[] parameters() {
        return new Object[] {PlanningMode.LOCAL, PlanningMode.DISTRIBUTED};
    }

    /**
     * @param planningMode planning mode handed to {@code configurePlanningMode} by each test
     */
    public TestRuntimeFiltering(PlanningMode planningMode) {
        this.planningMode = planningMode;
    }

    /** Drops the fact table and the {@code dim} table after every test. */
    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", tableName);
        sql("DROP TABLE IF EXISTS dim");
    }

    /** Fact table partitioned by {@code date}; the join key is the partition column itself. */
    @Test
    public void testIdentityPartitionedTable() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (date)",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 10L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .select("id", "date");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.date = d.date AND d.id = 1 ORDER BY id",
                tableName);

        assertQueryContainsRuntimeFilter(query);

        // NOTE(review): the literal 1 presumably denotes one day after the epoch (1970-01-02),
        // matching the date used in the comparison query below -- confirm against Iceberg docs.
        deleteNotMatchingFiles(Expressions.equal("date", 1), 3);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE date = DATE '1970-01-02' ORDER BY id", tableName),
            sql(query));
    }

    /** Fact table partitioned by {@code bucket(8, id)}; the join is on {@code id}. */
    @Test
    public void testBucketedTable() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, id))",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .select("id", "date");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND d.date = DATE '1970-01-02' ORDER BY date",
                tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("id", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /**
     * Same as the bucketed case, but the partition source column {@code id} is renamed to
     * {@code row_id} after the data has been written.
     */
    @Test
    public void testRenamedSourceColumnTable() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, id))",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .select("id", "date");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        sql("ALTER TABLE %s RENAME COLUMN id TO row_id", tableName);

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.row_id = d.id AND d.date = DATE '1970-01-02' ORDER BY date",
                tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("row_id", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE row_id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /**
     * Fact table partitioned by {@code data} and {@code bucket(8, id)}; the join uses both
     * columns and is expected to produce two runtime filters.
     */
    @Test
    public void testMultipleRuntimeFilters() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (data, bucket(8, id))",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        sql("CREATE TABLE dim (id BIGINT, date DATE, data STRING) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .withColumn("data", functions.expr("'1970-01-02'"))
                .select("id", "date", "data");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND f.data = d.data AND d.date = DATE '1970-01-02'",
                tableName);

        assertQueryContainsRuntimeFilters(query, 2, "Query should have 2 runtime filters");

        deleteNotMatchingFiles(Expressions.equal("id", 1), 31);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 AND data = '1970-01-02'", tableName),
            sql(query));
    }

    /**
     * Same setup as {@link #testMultipleRuntimeFilters()}, but the join query spells keywords
     * and column names in mixed case.
     */
    @Test
    public void testCaseSensitivityOfRuntimeFilters() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (data, bucket(8, id))",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        sql("CREATE TABLE dim (id BIGINT, date DATE, data STRING) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .withColumn("data", functions.expr("'1970-01-02'"))
                .select("id", "date", "data");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String caseInsensitiveQuery =
            String.format(
                "select f.* from %s F join dim d ON f.Id = d.iD and f.DaTa = d.dAtA and d.dAtE = date '1970-01-02'",
                tableName);

        assertQueryContainsRuntimeFilters(
            caseInsensitiveQuery, 2, "Query should have 2 runtime filters");

        deleteNotMatchingFiles(Expressions.equal("id", 1), 31);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 AND data = '1970-01-02'", tableName),
            sql(caseInsensitiveQuery));
    }

    /**
     * The table starts without a partition clause, receives one append, then gets a
     * {@code bucket(8, id)} field added to its spec before a second append.
     */
    @Test
    public void testBucketedTableWithMultipleSpecs() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg",
            tableName);
        configurePlanningMode(planningMode);

        // First append happens before the bucket field is added to the spec.
        Dataset<Row> df1 =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 2 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df1.coalesce(1).writeTo(tableName).append();

        Table table = validationCatalog.loadTable(tableIdent);
        table.updateSpec().addField(Expressions.bucket("id", 8)).commit();

        sql("REFRESH TABLE %s", tableName);

        // Second append happens after the spec change.
        Dataset<Row> df2 =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df2.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .select("id", "date");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND d.date = DATE '1970-01-02' ORDER BY date",
                tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("id", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Bucketed table whose partition source column name contains a dot ({@code i.d}). */
    @Test
    public void testSourceColumnWithDots() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (`i.d` BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, `i.d`))",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumnRenamed("id", "i.d")
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(`i.d` % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("`i.d`", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        // Result intentionally unused: the query only has to execute without error.
        sql("SELECT * FROM %s WHERE `i.d` = 1", tableName);

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .select("id", "date");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.`i.d` = d.id AND d.date = DATE '1970-01-02' ORDER BY date",
                tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("i.d", 1), 7);

        // Result intentionally unused: the join is run once on its own before the comparison.
        sql(query);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE `i.d` = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Bucketed table whose partition source column name contains a backtick ({@code i`d}). */
    @Test
    public void testSourceColumnWithBackticks() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (`i``d` BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg PARTITIONED BY (bucket(8, `i``d`))",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumnRenamed("id", "i`d")
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(`i``d` % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("`i``d`", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).option("fanout-enabled", "true").append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .select("id", "date");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.`i``d` = d.id AND d.date = DATE '1970-01-02' ORDER BY date",
                tableName);

        assertQueryContainsRuntimeFilter(query);

        deleteNotMatchingFiles(Expressions.equal("i`d", 1), 7);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE `i``d` = 1 ORDER BY date", tableName),
            sql(query));
    }

    /**
     * Table without a partition clause: the plan is expected to contain no runtime filter. No
     * files are deleted in this test.
     */
    @Test
    public void testUnpartitionedTable() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg",
            tableName);
        configurePlanningMode(planningMode);

        Dataset<Row> df =
            spark
                .range(1L, 100L)
                .withColumn(
                    "date",
                    functions.date_add(
                        functions.expr("DATE '1970-01-01'"),
                        functions.expr("CAST(id % 4 AS INT)")))
                .withColumn("ts", functions.expr("TO_TIMESTAMP(date)"))
                .withColumn("data", functions.expr("CAST(date AS STRING)"))
                .select("id", "data", "date", "ts");
        df.coalesce(1).writeTo(tableName).append();

        sql("CREATE TABLE dim (id BIGINT, date DATE) USING parquet");
        Dataset<Row> dimDF =
            spark
                .range(1L, 2L)
                .withColumn("date", functions.expr("DATE '1970-01-02'"))
                .select("id", "date");
        dimDF.coalesce(1).write().mode("append").insertInto("dim");

        String query =
            String.format(
                "SELECT f.* FROM %s f JOIN dim d ON f.id = d.id AND d.date = DATE '1970-01-02' ORDER BY date",
                tableName);

        assertQueryContainsNoRuntimeFilter(query);

        assertEquals(
            "Should have expected rows",
            sql("SELECT * FROM %s WHERE id = 1 ORDER BY date", tableName),
            sql(query));
    }

    /** Asserts that the plan of {@code query} contains exactly one runtime filter. */
    private void assertQueryContainsRuntimeFilter(String query) {
        assertQueryContainsRuntimeFilters(query, 1, "Query should have 1 runtime filter");
    }

    /** Asserts that the plan of {@code query} contains no runtime filter. */
    private void assertQueryContainsNoRuntimeFilter(String query) {
        assertQueryContainsRuntimeFilters(query, 0, "Query should have no runtime filters");
    }

    /**
     * Runs {@code EXPLAIN EXTENDED} on the query and counts occurrences of the substring
     * {@code dynamicpruningexpression} in the first column of the first result row.
     *
     * @param query SQL text to explain
     * @param expectedFilterCount number of occurrences the plan must contain
     * @param errorMessage assertion message used when the counts differ
     */
    private void assertQueryContainsRuntimeFilters(
            String query, int expectedFilterCount, String errorMessage) {
        List<Row> output = spark.sql("EXPLAIN EXTENDED " + query).collectAsList();
        String plan = output.get(0).getString(0);
        int actualFilterCount = StringUtils.countMatches(plan, "dynamicpruningexpression");
        Assert.assertEquals(errorMessage, expectedFilterCount, actualFilterCount);
    }

    /**
     * Deletes, through the table's {@link FileIO}, every data file planned by an unfiltered scan
     * that is not also planned by a scan restricted with {@code filter}, then asserts how many
     * distinct file paths were deleted.
     *
     * @param filter expression selecting the files to keep
     * @param expectedDeletedFileCount number of distinct file paths expected to be deleted
     * @throws UncheckedIOException if closing either scan's file iterable fails
     */
    private void deleteNotMatchingFiles(Expression filter, int expectedDeletedFileCount) {
        Table table = validationCatalog.loadTable(tableIdent);
        FileIO io = table.io();

        // Pass 1: remember the paths of files the filtered scan still needs.
        HashSet<String> matchingFileLocations = Sets.newHashSet();
        try (CloseableIterable<FileScanTask> files = table.newScan().filter(filter).planFiles()) {
            for (FileScanTask file : files) {
                matchingFileLocations.add(file.file().path().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // Pass 2: delete everything else. A set is used so repeated paths are counted once.
        HashSet<String> deletedFileLocations = Sets.newHashSet();
        try (CloseableIterable<FileScanTask> files = table.newScan().planFiles()) {
            for (FileScanTask file : files) {
                String path = file.file().path().toString();
                if (!matchingFileLocations.contains(path)) {
                    io.deleteFile(path);
                    deletedFileLocations.add(path);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Assert.assertEquals(
            "Deleted unexpected number of files",
            expectedDeletedFileCount,
            deletedFileLocations.size());
    }
}

