package org.apache.iceberg.spark.source;

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
import org.apache.iceberg.spark.TestBaseWithCatalog;
import org.apache.iceberg.spark.functions.BucketFunction;
import org.apache.iceberg.spark.functions.DaysFunction;
import org.apache.iceberg.spark.functions.HoursFunction;
import org.apache.iceberg.spark.functions.MonthsFunction;
import org.apache.iceberg.spark.functions.TruncateFunction;
import org.apache.iceberg.spark.functions.YearsFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.LiteralValue;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.UserDefinedScalarFunc;
import org.apache.spark.sql.connector.expressions.filter.And;
import org.apache.spark.sql.connector.expressions.filter.Not;
import org.apache.spark.sql.connector.expressions.filter.Or;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link SparkScan}: row-count and column-statistics reporting, and partition pruning
 * when Iceberg system-function predicates are pushed through {@link SupportsPushDownV2Filters}.
 */
@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkScan extends TestBaseWithCatalog {

  /** Blob type that the tests below expect NOT to be surfaced as a column distinct count. */
  private static final String DUMMY_BLOB_TYPE = "sum-data-size-bytes-v1";

  /** Blob type whose {@code ndv} property the tests below expect to be reported as a distinct count. */
  private static final String THETA_SKETCH_BLOB_TYPE = "apache-datasketches-theta-v1";

  /** Session property used below to switch column-statistics reporting off. */
  private static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";

  @Parameter(index = 3)
  private String format;

  /**
   * Parameter matrix: the Hadoop catalog combined with each supported default write format.
   *
   * @return rows of {catalogName, implementation, config, format}
   */
  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, format = {3}")
  public static Object[][] parameters() {
    // The decompiled original returned an Object[] here, which does not compile against the
    // declared Object[][] return type.
    return new Object[][] {
      hadoopCatalogWithFormat("parquet"),
      hadoopCatalogWithFormat("avro"),
      hadoopCatalogWithFormat("orc")
    };
  }

  /** Builds one parameter row for the Hadoop catalog with the given default file format. */
  private static Object[] hadoopCatalogWithFormat(String fileFormat) {
    SparkCatalogConfig config = SparkCatalogConfig.HADOOP;
    return new Object[] {
      config.catalogName(), config.implementation(), config.properties(), fileFormat
    };
  }

  @BeforeEach
  public void useCatalog() {
    sql("USE %s", catalogName);
  }

  @AfterEach
  public void removeTables() {
    sql("DROP TABLE IF EXISTS %s", tableName);
  }

  @TestTemplate
  public void testEstimatedRowCount() throws NoSuchTableException {
    sql(
        "CREATE TABLE %s (id BIGINT, date DATE) USING iceberg TBLPROPERTIES('%s' = '%s')",
        tableName, "write.format.default", format);

    spark
        .range(10000L)
        .withColumn(
            "date",
            functions.date_add(
                functions.expr("DATE '1970-01-01'"), functions.expr("CAST(id AS INT)")))
        .select("id", "date")
        .coalesce(1)
        .writeTo(tableName)
        .append();

    // build() is declared to return Scan; estimateStatistics() lives on SparkScan.
    SparkScan scan = buildScan(validationCatalog.loadTable(tableIdent));
    Assertions.assertThat(scan.estimateStatistics().numRows().getAsLong()).isEqualTo(10000L);
  }

  @TestTemplate
  public void testTableWithoutColStats() throws NoSuchTableException {
    createSimpleTable();
    SparkScan scan = buildScan(validationCatalog.loadTable(tableIdent));

    checkColumnStatsReporting(scan, 4L, ImmutableMap.of());
  }

  @TestTemplate
  public void testTableWithoutApacheDatasketchColStat() throws NoSuchTableException {
    createSimpleTable();
    Table table = validationCatalog.loadTable(tableIdent);
    long snapshotId = table.currentSnapshot().snapshotId();
    SparkScan scan = buildScan(table);

    commitStatistics(
        table,
        snapshotId,
        new GenericBlobMetadata(
            DUMMY_BLOB_TYPE, snapshotId, 1L, ImmutableList.of(1), ImmutableMap.of("data_size", "4")));

    // Only a non-NDV blob exists, so no distinct counts are expected even with CBO enabled.
    checkColumnStatsReporting(scan, 4L, ImmutableMap.of());
  }

  @TestTemplate
  public void testTableWithOneColStats() throws NoSuchTableException {
    createSimpleTable();
    Table table = validationCatalog.loadTable(tableIdent);
    long snapshotId = table.currentSnapshot().snapshotId();
    // The scan is built before the statistics commit; the checks below expect the committed
    // statistics to be visible to it anyway.
    SparkScan scan = buildScan(table);

    commitStatistics(
        table,
        snapshotId,
        new GenericBlobMetadata(
            THETA_SKETCH_BLOB_TYPE, snapshotId, 1L, ImmutableList.of(1), ImmutableMap.of("ndv", "4")));

    checkColumnStatsReporting(scan, 4L, ImmutableMap.of("id", 4L));
  }

  @TestTemplate
  public void testTableWithOneApacheDatasketchColStatAndOneDifferentColStat()
      throws NoSuchTableException {
    createSimpleTable();
    Table table = validationCatalog.loadTable(tableIdent);
    long snapshotId = table.currentSnapshot().snapshotId();
    SparkScan scan = buildScan(table);

    commitStatistics(
        table,
        snapshotId,
        new GenericBlobMetadata(
            THETA_SKETCH_BLOB_TYPE, snapshotId, 1L, ImmutableList.of(1), ImmutableMap.of("ndv", "4")),
        new GenericBlobMetadata(
            DUMMY_BLOB_TYPE, snapshotId, 1L, ImmutableList.of(1), ImmutableMap.of("data_size", "2")));

    checkColumnStatsReporting(scan, 4L, ImmutableMap.of("id", 4L));
  }

  @TestTemplate
  public void testTableWithTwoColStats() throws NoSuchTableException {
    createSimpleTable();
    Table table = validationCatalog.loadTable(tableIdent);
    long snapshotId = table.currentSnapshot().snapshotId();
    SparkScan scan = buildScan(table);

    commitStatistics(
        table,
        snapshotId,
        new GenericBlobMetadata(
            THETA_SKETCH_BLOB_TYPE, snapshotId, 1L, ImmutableList.of(1), ImmutableMap.of("ndv", "4")),
        new GenericBlobMetadata(
            THETA_SKETCH_BLOB_TYPE, snapshotId, 1L, ImmutableList.of(2), ImmutableMap.of("ndv", "2")));

    checkColumnStatsReporting(scan, 4L, ImmutableMap.of("id", 4L, "data", 2L));
  }

  // Push-down tests: each checks the planned input-partition count for a predicate and for its
  // negation. Unpartitioned tables are expected to plan all 10 input partitions in every case.

  @TestTemplate
  public void testUnpartitionedYears() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate =
        yearsEquals(
            SystemFunctionPushDownHelper.timestampStrToYearOrdinal(
                "2017-11-22T00:00:00.000000+00:00"));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedYears() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "years(ts)");
    Predicate predicate =
        yearsEquals(
            SystemFunctionPushDownHelper.timestampStrToYearOrdinal(
                "2017-11-22T00:00:00.000000+00:00"));
    checkPartitionCounts(predicate, 5, 5);
  }

  @TestTemplate
  public void testUnpartitionedMonths() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate =
        new Predicate(
            ">",
            expressions(
                toUDF(new MonthsFunction.TimestampToMonthsFunction(), expressions(fieldRef("ts"))),
                intLit(
                    SystemFunctionPushDownHelper.timestampStrToMonthOrdinal(
                        "2017-11-22T00:00:00.000000+00:00"))));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedMonths() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "months(ts)");
    Predicate predicate =
        new Predicate(
            ">",
            expressions(
                toUDF(new MonthsFunction.TimestampToMonthsFunction(), expressions(fieldRef("ts"))),
                intLit(
                    SystemFunctionPushDownHelper.timestampStrToMonthOrdinal(
                        "2017-11-22T00:00:00.000000+00:00"))));
    checkPartitionCounts(predicate, 5, 5);
  }

  @TestTemplate
  public void testUnpartitionedDays() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate =
        new Predicate(
            "<",
            expressions(
                toUDF(new DaysFunction.TimestampToDaysFunction(), expressions(fieldRef("ts"))),
                dateLit(
                    SystemFunctionPushDownHelper.timestampStrToDayOrdinal(
                        "2018-11-20T00:00:00.000000+00:00"))));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedDays() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "days(ts)");
    Predicate predicate =
        new Predicate(
            "<",
            expressions(
                toUDF(new DaysFunction.TimestampToDaysFunction(), expressions(fieldRef("ts"))),
                dateLit(
                    SystemFunctionPushDownHelper.timestampStrToDayOrdinal(
                        "2018-11-20T00:00:00.000000+00:00"))));
    checkPartitionCounts(predicate, 5, 5);
  }

  @TestTemplate
  public void testUnpartitionedHours() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate =
        new Predicate(
            ">=",
            expressions(
                toUDF(new HoursFunction.TimestampToHoursFunction(), expressions(fieldRef("ts"))),
                intLit(
                    SystemFunctionPushDownHelper.timestampStrToHourOrdinal(
                        "2017-11-22T06:02:09.243857+00:00"))));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedHours() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "hours(ts)");
    Predicate predicate =
        new Predicate(
            ">=",
            expressions(
                toUDF(new HoursFunction.TimestampToHoursFunction(), expressions(fieldRef("ts"))),
                intLit(
                    SystemFunctionPushDownHelper.timestampStrToHourOrdinal(
                        "2017-11-22T06:02:09.243857+00:00"))));
    checkPartitionCounts(predicate, 8, 2);
  }

  @TestTemplate
  public void testUnpartitionedBucketLong() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    checkPartitionCounts(idBucketAtLeast(2), 10, 10);
  }

  @TestTemplate
  public void testPartitionedBucketLong() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "bucket(5, id)");
    checkPartitionCounts(idBucketAtLeast(2), 6, 4);
  }

  @TestTemplate
  public void testUnpartitionedBucketString() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate =
        new Predicate(
            "<=",
            expressions(
                toUDF(
                    new BucketFunction.BucketString(),
                    expressions(intLit(5), fieldRef("data"))),
                intLit(2)));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedBucketString() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "bucket(5, data)");
    Predicate predicate =
        new Predicate(
            "<=",
            expressions(
                toUDF(
                    new BucketFunction.BucketString(),
                    expressions(intLit(5), fieldRef("data"))),
                intLit(2)));
    checkPartitionCounts(predicate, 6, 4);
  }

  @TestTemplate
  public void testUnpartitionedTruncateString() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate =
        new Predicate("<>", expressions(truncateData(), stringLit("data")));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedTruncateString() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "truncate(4, data)");
    Predicate predicate =
        new Predicate("<>", expressions(truncateData(), stringLit("data")));
    checkPartitionCounts(predicate, 5, 5);
  }

  @TestTemplate
  public void testUnpartitionedIsNull() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate = new Predicate("IS_NULL", expressions(truncateData()));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedIsNull() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "truncate(4, data)");
    Predicate predicate = new Predicate("IS_NULL", expressions(truncateData()));
    checkPartitionCounts(predicate, 0, 10);
  }

  @TestTemplate
  public void testUnpartitionedIsNotNull() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Predicate predicate = new Predicate("IS_NOT_NULL", expressions(truncateData()));
    checkPartitionCounts(predicate, 10, 10);
  }

  @TestTemplate
  public void testPartitionedIsNotNull() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "truncate(4, data)");
    Predicate predicate = new Predicate("IS_NOT_NULL", expressions(truncateData()));
    checkPartitionCounts(predicate, 10, 0);
  }

  @TestTemplate
  public void testUnpartitionedAnd() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    And and = new And(yearsEquals(47), idBucketAtLeast(2));
    checkPartitionCounts(and, 10, 10);
  }

  @TestTemplate
  public void testPartitionedAnd() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(
        spark, tableName, "years(ts), bucket(5, id)");
    And and = new And(yearsEquals(47), idBucketAtLeast(2));
    checkPartitionCounts(and, 1, 9);
  }

  @TestTemplate
  public void testUnpartitionedOr() throws Exception {
    SystemFunctionPushDownHelper.createUnpartitionedTable(spark, tableName);
    Or or = new Or(yearsEquals(47), idBucketAtLeast(2));
    checkPartitionCounts(or, 10, 10);
  }

  @TestTemplate
  public void testPartitionedOr() throws Exception {
    SystemFunctionPushDownHelper.createPartitionedTable(
        spark, tableName, "years(ts), bucket(5, id)");
    Or or = new Or(yearsEquals(48), idBucketAtLeast(2));
    checkPartitionCounts(or, 6, 4);
  }

  /** Creates {@code (id int, data string)} and appends four {@link SimpleRecord}s as one file. */
  private void createSimpleTable() throws NoSuchTableException {
    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
    spark
        .createDataset(
            Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "b"),
                new SimpleRecord(3, "a"),
                new SimpleRecord(4, "b")),
            Encoders.bean(SimpleRecord.class))
        .coalesce(1)
        .writeTo(tableName)
        .append();
  }

  /** Builds a {@link SparkScan} over {@code table} with no read options. */
  private SparkScan buildScan(Table table) {
    return (SparkScan) new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty()).build();
  }

  /** Commits a synthetic statistics file containing {@code blobs} for {@code snapshotId}. */
  private static void commitStatistics(
      Table table, long snapshotId, GenericBlobMetadata... blobs) {
    GenericStatisticsFile statisticsFile =
        new GenericStatisticsFile(
            snapshotId, "/test/statistics/file.puffin", 100L, 42L, ImmutableList.copyOf(blobs));
    table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
  }

  /**
   * Checks column-statistics reporting under three session configurations, in this order:
   *
   * <ol>
   *   <li>the current session defaults (column stats are expected to be absent),
   *   <li>CBO enabled with column-stats reporting explicitly disabled (expected absent),
   *   <li>CBO enabled on its own (expected NDVs must be reported).
   * </ol>
   *
   * @param scan scan under test
   * @param expectedRowCount expected {@code numRows()}
   * @param expectedNdvs column name to expected distinct count; empty means no NDV at all
   */
  private void checkColumnStatsReporting(
      SparkScan scan, long expectedRowCount, Map<String, Long> expectedNdvs) {
    Map<String, String> reportingDisabled =
        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true", REPORT_COLUMN_STATS, "false");
    Map<String, String> reportingEnabled = ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");

    checkColStatisticsNotReported(scan, expectedRowCount);
    withSQLConf(reportingDisabled, () -> checkColStatisticsNotReported(scan, expectedRowCount));
    withSQLConf(
        reportingEnabled, () -> checkColStatisticsReported(scan, expectedRowCount, expectedNdvs));
  }

  /**
   * Pushes {@code predicate} and then its negation into fresh scan builders and checks the number
   * of planned input partitions for each.
   */
  private void checkPartitionCounts(Predicate predicate, int expected, int expectedNegated)
      throws Exception {
    Assertions.assertThat(planInputPartitionCount(predicate)).isEqualTo(expected);
    Assertions.assertThat(planInputPartitionCount(new Not(predicate))).isEqualTo(expectedNegated);
  }

  /** Pushes {@code predicate} into a fresh scan builder and returns the planned partition count. */
  private int planInputPartitionCount(Predicate predicate) throws Exception {
    SparkScanBuilder builder = scanBuilder();
    pushFilters(builder, predicate);
    return builder.build().toBatch().planInputPartitions().length;
  }

  /** Builds {@code years(ts) = yearOrdinal}. */
  private Predicate yearsEquals(int yearOrdinal) {
    return new Predicate(
        "=",
        expressions(
            toUDF(new YearsFunction.TimestampToYearsFunction(), expressions(fieldRef("ts"))),
            intLit(yearOrdinal)));
  }

  /** Builds {@code bucket(5, id) >= bucket} using the long bucket function. */
  private Predicate idBucketAtLeast(int bucket) {
    return new Predicate(
        ">=",
        expressions(
            toUDF(
                new BucketFunction.BucketLong(DataTypes.LongType),
                expressions(intLit(5), fieldRef("id"))),
            intLit(bucket)));
  }

  /** Builds the expression {@code truncate(4, data)}. */
  private UserDefinedScalarFunc truncateData() {
    return toUDF(new TruncateFunction.TruncateString(), expressions(intLit(4), fieldRef("data")));
  }

  /** Creates a scan builder over the current test table, loaded through {@link Spark3Util}. */
  private SparkScanBuilder scanBuilder() throws Exception {
    Table table = Spark3Util.loadIcebergTable(spark, tableName);
    CaseInsensitiveStringMap options =
        new CaseInsensitiveStringMap(ImmutableMap.of("path", tableName));
    return new SparkScanBuilder(spark, table, options);
  }

  /** Pushes V2 predicates into {@code scanBuilder}, asserting it supports V2 filter push-down. */
  private void pushFilters(ScanBuilder scanBuilder, Predicate... predicates) {
    Assertions.assertThat(scanBuilder).isInstanceOf(SupportsPushDownV2Filters.class);
    ((SupportsPushDownV2Filters) scanBuilder).pushPredicates(predicates);
  }

  /** Varargs convenience for building a connector {@link Expression} array. */
  private Expression[] expressions(Expression... expressions) {
    return expressions;
  }

  /** Asserts the row count and that no column statistics are reported at all. */
  private void checkColStatisticsNotReported(SparkScan scan, long expectedRowCount) {
    Statistics stats = scan.estimateStatistics();
    Assertions.assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);

    // The decompiled original wrapped a boolean in assertThat(...) without calling an assertion
    // method, so it never checked anything.
    Assertions.assertThat(stats.columnStats()).isEmpty();
  }

  /**
   * Asserts the row count and the reported distinct counts.
   *
   * @param expectedNdvs column name to expected distinct count; when empty, every reported column
   *     must lack a distinct count
   */
  private void checkColStatisticsReported(
      SparkScan scan, long expectedRowCount, Map<String, Long> expectedNdvs) {
    Statistics stats = scan.estimateStatistics();
    Assertions.assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);

    Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
    if (expectedNdvs.isEmpty()) {
      Assertions.assertThat(
              columnStats.values().stream()
                  .allMatch(colStats -> colStats.distinctCount().isEmpty()))
          .isTrue();
      return;
    }

    for (Map.Entry<String, Long> expected : expectedNdvs.entrySet()) {
      ColumnStatistics colStats = columnStats.get(FieldReference.column(expected.getKey()));
      // Fail with a readable message instead of an NPE when the column is missing.
      Assertions.assertThat(colStats).as("column stats for %s", expected.getKey()).isNotNull();
      Assertions.assertThat(colStats.distinctCount().getAsLong()).isEqualTo(expected.getValue());
    }
  }

  private static LiteralValue<Integer> intLit(int value) {
    return LiteralValue.apply(value, DataTypes.IntegerType);
  }

  private static LiteralValue<Integer> dateLit(int value) {
    return LiteralValue.apply(value, DataTypes.DateType);
  }

  private static LiteralValue<String> stringLit(String value) {
    return LiteralValue.apply(value, DataTypes.StringType);
  }

  private static NamedReference fieldRef(String name) {
    return FieldReference.apply(name);
  }

  /** Wraps a bound Iceberg function and its arguments as a Spark user-defined scalar function. */
  private static UserDefinedScalarFunc toUDF(BoundFunction function, Expression[] arguments) {
    return new UserDefinedScalarFunc(function.name(), function.canonicalName(), arguments);
  }
}
