/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.io.File;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestHoodieTableSource {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieTableSource.class);
    private Configuration conf;
    @TempDir
    File tempFile;

    void beforeEach() throws Exception {
        String path = this.tempFile.getAbsolutePath();
        this.conf = TestConfigurations.getDefaultConf(path);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
    }

    @Test
    void testGetReadPaths() throws Exception {
        this.beforeEach();
        HoodieTableSource tableSource = this.getEmptyStreamingSource();
        List fileList = tableSource.getReadFiles();
        Assertions.assertNotNull((Object)fileList);
        MatcherAssert.assertThat((Object)fileList.size(), (Matcher)Is.is((Object)4));
        FieldReferenceExpression partRef = new FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4);
        ValueLiteralExpression partLiteral = new ValueLiteralExpression((Object)"par1", (DataType)DataTypes.STRING().notNull());
        CallExpression partFilter = new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.EQUALS, Arrays.asList(partRef, partLiteral), DataTypes.BOOLEAN());
        HoodieTableSource tableSource2 = this.getEmptyStreamingSource();
        tableSource2.applyFilters(Arrays.asList(partFilter));
        List fileList2 = tableSource2.getReadFiles();
        Assertions.assertNotNull((Object)fileList2);
        MatcherAssert.assertThat((Object)fileList2.size(), (Matcher)Is.is((Object)1));
    }

    @Test
    void testGetInputFormat() throws Exception {
        this.beforeEach();
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        HoodieTableSource tableSource = new HoodieTableSource(SerializableSchema.create((ResolvedSchema)TestConfigurations.TABLE_SCHEMA), new StoragePath(this.tempFile.getPath()), Arrays.asList(this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")), "default-par", this.conf);
        InputFormat inputFormat = tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)Is.is((Matcher)CoreMatchers.instanceOf(FileInputFormat.class)));
        this.conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        inputFormat = tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)Is.is((Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class)));
        this.conf.setString(FlinkOptions.QUERY_TYPE.key(), "incremental");
        Assertions.assertDoesNotThrow(() -> ((HoodieTableSource)tableSource).getInputFormat(), (String)"Query type: 'incremental' should be supported");
    }

    @Test
    void testGetTableAvroSchema() {
        HoodieTableSource tableSource = this.getEmptyStreamingSource();
        Assertions.assertNull((Object)tableSource.getMetaClient(), (String)"Streaming source with empty table path is allowed");
        String schemaFields = tableSource.getTableAvroSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.joining(","));
        String expected = "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,uuid,name,age,ts,partition";
        MatcherAssert.assertThat((Object)schemaFields, (Matcher)Is.is((Object)"_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,uuid,name,age,ts,partition"));
    }

    @Test
    void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
        HoodieTableSource tableSource = this.getEmptyStreamingSource();
        FieldReferenceExpression ref = new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0);
        ValueLiteralExpression literal = new ValueLiteralExpression((Object)"1", (DataType)DataTypes.STRING().notNull());
        CallExpression filterExpr = new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.IN, Arrays.asList(ref, literal), DataTypes.BOOLEAN());
        List<CallExpression> expectedFilters = Collections.singletonList(filterExpr);
        tableSource.applyFilters(expectedFilters);
        HoodieTableSource copiedSource = (HoodieTableSource)tableSource.copy();
        ColumnStatsProbe columnStatsProbe = copiedSource.getColumnStatsProbe();
        Assertions.assertNotNull((Object)columnStatsProbe);
    }

    @ParameterizedTest
    @MethodSource(value={"filtersAndResults"})
    void testDataSkippingWithPartitionStatsPruning(List<ResolvedExpression> filters, List<String> expectedPartitions) throws Exception {
        String path = this.tempFile.getAbsolutePath();
        this.conf = TestConfigurations.getDefaultConf(path);
        this.conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), true);
        this.conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), true);
        this.conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, (Object)true);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        HoodieTableSource hoodieTableSource = this.createHoodieTableSource(this.conf);
        hoodieTableSource.applyFilters(filters);
        Assertions.assertEquals(expectedPartitions, (Object)hoodieTableSource.getReadPartitions());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testBucketPruning(boolean hiveStylePartitioning) throws Exception {
        String tablePath1 = new Path(this.tempFile.getAbsolutePath(), "tbl1").toString();
        Configuration conf1 = TestConfigurations.getDefaultConf(tablePath1);
        conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
        conf1.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf1);
        HoodieTableSource tableSource1 = this.createHoodieTableSource(conf1);
        tableSource1.applyFilters(Collections.singletonList(this.createLitEquivalenceExpr("uuid", 0, (DataType)DataTypes.STRING().notNull(), "id1")));
        MatcherAssert.assertThat((Object)tableSource1.getDataBucket(), (Matcher)Is.is((Object)1));
        List fileList = tableSource1.getReadFiles();
        MatcherAssert.assertThat((String)"Files should be pruned by bucket id 1", (Object)fileList.size(), (Matcher)CoreMatchers.is((Object)2));
        Configuration conf2 = conf1.clone();
        String tablePath2 = new Path(this.tempFile.getAbsolutePath(), "tbl2").toString();
        conf2.setString(FlinkOptions.PATH, tablePath2);
        conf2.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name");
        conf2.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf2);
        HoodieTableSource tableSource2 = this.createHoodieTableSource(conf2);
        tableSource2.applyFilters(Arrays.asList(this.createLitEquivalenceExpr("uuid", 0, (DataType)DataTypes.STRING().notNull(), "id1"), this.createLitEquivalenceExpr("name", 1, (DataType)DataTypes.STRING().notNull(), "Danny")));
        MatcherAssert.assertThat((Object)tableSource2.getDataBucket(), (Matcher)Is.is((Object)3));
        List fileList2 = tableSource2.getReadFiles();
        MatcherAssert.assertThat((String)"Files should be pruned by bucket id 3", (Object)fileList2.size(), (Matcher)CoreMatchers.is((Object)3));
        tableSource2.reset();
        tableSource2.applyFilters(Arrays.asList(this.createLitEquivalenceExpr("name", 1, (DataType)DataTypes.STRING().notNull(), "Danny"), this.createLitEquivalenceExpr("uuid", 0, (DataType)DataTypes.STRING().notNull(), "id1")));
        MatcherAssert.assertThat((Object)tableSource2.getDataBucket(), (Matcher)Is.is((Object)3));
        MatcherAssert.assertThat((String)"Files should be pruned by bucket id 3", (Object)tableSource2.getReadFiles().size(), (Matcher)CoreMatchers.is((Object)3));
        Configuration conf3 = conf1.clone();
        String tablePath3 = new Path(this.tempFile.getAbsolutePath(), "tbl3").toString();
        conf3.setString(FlinkOptions.PATH, tablePath3);
        conf3.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name");
        conf3.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf3);
        HoodieTableSource tableSource3 = this.createHoodieTableSource(conf3);
        tableSource3.applyFilters(Collections.singletonList(this.createLitEquivalenceExpr("uuid", 0, (DataType)DataTypes.STRING().notNull(), "id1")));
        MatcherAssert.assertThat((Object)tableSource3.getDataBucket(), (Matcher)Is.is((Object)-1));
        List fileList3 = tableSource3.getReadFiles();
        MatcherAssert.assertThat((String)"Partial pk filtering does not prune any files", (Object)fileList3.size(), (Matcher)CoreMatchers.is((Object)7));
        Configuration conf4 = conf1.clone();
        String tablePath4 = new Path(this.tempFile.getAbsolutePath(), "tbl4").toString();
        conf4.setString(FlinkOptions.PATH, tablePath4);
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf4);
        HoodieTableSource tableSource4 = this.createHoodieTableSource(conf4);
        tableSource4.applyFilters(Arrays.asList(this.createLitEquivalenceExpr("uuid", 0, (DataType)DataTypes.STRING().notNull(), "id1"), this.createLitEquivalenceExpr("name", 1, (DataType)DataTypes.STRING().notNull(), "Danny")));
        MatcherAssert.assertThat((Object)tableSource4.getDataBucket(), (Matcher)Is.is((Object)1));
        List fileList4 = tableSource4.getReadFiles();
        MatcherAssert.assertThat((String)"Files should be pruned by bucket id 1", (Object)fileList4.size(), (Matcher)CoreMatchers.is((Object)2));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testBucketPruningSpecialKeyDataType(boolean logicalTimestamp) throws Exception {
        String tablePath1 = new Path(this.tempFile.getAbsolutePath(), "tbl1").toString();
        Configuration conf1 = TestConfigurations.getDefaultConf(tablePath1, TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE);
        String f1 = "f_timestamp";
        conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
        conf1.setString(FlinkOptions.RECORD_KEY_FIELD, "f_timestamp");
        conf1.setString(FlinkOptions.PRECOMBINE_FIELD, "f_timestamp");
        conf1.removeConfig(FlinkOptions.PARTITION_PATH_FIELD);
        conf1.setBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), logicalTimestamp);
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf1);
        HoodieTableSource tableSource1 = this.createHoodieTableSource(conf1);
        tableSource1.applyFilters(Collections.singletonList(this.createLitEquivalenceExpr("f_timestamp", 0, (DataType)DataTypes.TIMESTAMP((int)3).notNull(), LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of("UTC")))));
        MatcherAssert.assertThat((Object)tableSource1.getDataBucket(), (Matcher)Is.is((Object)(logicalTimestamp ? 1 : 0)));
        List fileList = tableSource1.getReadFiles();
        MatcherAssert.assertThat((String)"Files should be pruned", (Object)fileList.size(), (Matcher)CoreMatchers.is((Object)1));
        Configuration conf2 = conf1.clone();
        String f2 = "f_date";
        String tablePath2 = new Path(this.tempFile.getAbsolutePath(), "tbl2").toString();
        conf2.setString(FlinkOptions.PATH, tablePath2);
        conf2.setString(FlinkOptions.RECORD_KEY_FIELD, "f_date");
        conf2.setString(FlinkOptions.PRECOMBINE_FIELD, "f_date");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf2);
        HoodieTableSource tableSource2 = this.createHoodieTableSource(conf2);
        tableSource2.applyFilters(Collections.singletonList(this.createLitEquivalenceExpr("f_date", 1, (DataType)DataTypes.DATE().notNull(), LocalDate.ofEpochDay(1L))));
        MatcherAssert.assertThat((Object)tableSource2.getDataBucket(), (Matcher)Is.is((Object)1));
        List fileList2 = tableSource2.getReadFiles();
        MatcherAssert.assertThat((String)"Files should be pruned", (Object)fileList2.size(), (Matcher)CoreMatchers.is((Object)1));
        Configuration conf3 = conf1.clone();
        String f3 = "f_decimal";
        String tablePath3 = new Path(this.tempFile.getAbsolutePath(), "tbl3").toString();
        conf3.setString(FlinkOptions.PATH, tablePath3);
        conf3.setString(FlinkOptions.RECORD_KEY_FIELD, "f_decimal");
        conf3.setString(FlinkOptions.PRECOMBINE_FIELD, "f_decimal");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf3);
        HoodieTableSource tableSource3 = this.createHoodieTableSource(conf3);
        tableSource3.applyFilters(Collections.singletonList(this.createLitEquivalenceExpr("f_decimal", 1, (DataType)DataTypes.DECIMAL((int)3, (int)2).notNull(), new BigDecimal("1.11"))));
        MatcherAssert.assertThat((Object)tableSource3.getDataBucket(), (Matcher)Is.is((Object)0));
        List fileList3 = tableSource3.getReadFiles();
        MatcherAssert.assertThat((String)"Files should be pruned", (Object)fileList3.size(), (Matcher)CoreMatchers.is((Object)1));
    }

    @Test
    void testHoodieSourceCachedMetaClient() {
        HoodieTableSource tableSource = this.getEmptyStreamingSource();
        HoodieTableMetaClient metaClient = tableSource.getMetaClient();
        HoodieTableSource tableSourceCopy = (HoodieTableSource)tableSource.copy();
        MatcherAssert.assertThat((Object)metaClient, (Matcher)Is.is((Object)tableSourceCopy.getMetaClient()));
    }

    @Test
    void testFilterPushDownWithParquetPredicates() {
        HoodieTableSource tableSource = this.getEmptyStreamingSource();
        ArrayList<Object> expressions = new ArrayList<Object>();
        expressions.add(new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0));
        expressions.add(new ValueLiteralExpression((Object)10));
        CallExpression equalsExpression = new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.EQUALS, expressions, DataTypes.BOOLEAN());
        CallExpression greaterThanExpression = new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.GREATER_THAN, expressions, DataTypes.BOOLEAN());
        CallExpression orExpression = new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.OR, Arrays.asList(equalsExpression, greaterThanExpression), DataTypes.BOOLEAN());
        List<ResolvedExpression> expectedFilters = Arrays.asList(equalsExpression, greaterThanExpression, orExpression);
        tableSource.applyFilters(expectedFilters);
        String actualPredicates = tableSource.getPredicates().toString();
        Assertions.assertEquals((Object)ExpressionPredicates.fromExpression(expectedFilters).toString(), (Object)actualPredicates);
    }

    private static Stream<Arguments> filtersAndResults() {
        CallExpression filter1 = new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.GREATER_THAN, Arrays.asList(new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0), new ValueLiteralExpression((Object)"id5", (DataType)DataTypes.STRING().notNull())), DataTypes.BOOLEAN());
        CallExpression filter2 = new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.LESS_THAN, Arrays.asList(new FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4), new ValueLiteralExpression((Object)"par4", (DataType)DataTypes.STRING().notNull())), DataTypes.BOOLEAN());
        Object[][] data = new Object[][]{{Arrays.asList(filter1), Arrays.asList("par3", "par4")}, {Arrays.asList(filter2), Arrays.asList("par1", "par2", "par3")}, {Arrays.asList(filter1, filter2), Arrays.asList("par3")}};
        return Stream.of(data).map(Arguments::of);
    }

    private HoodieTableSource getEmptyStreamingSource() {
        String path = this.tempFile.getAbsolutePath();
        this.conf = TestConfigurations.getDefaultConf(path);
        this.conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
        this.conf.setBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
        return this.createHoodieTableSource(this.conf);
    }

    private HoodieTableSource createHoodieTableSource(Configuration conf) {
        return new HoodieTableSource(SerializableSchema.create((ResolvedSchema)TestConfigurations.TABLE_SCHEMA), new StoragePath(conf.getString(FlinkOptions.PATH)), Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")), "default-par", conf);
    }

    private ResolvedExpression createLitEquivalenceExpr(String fieldName, int fieldIdx, DataType dataType, Object val) {
        FieldReferenceExpression ref = new FieldReferenceExpression(fieldName, dataType, fieldIdx, fieldIdx);
        ValueLiteralExpression literal = new ValueLiteralExpression(val, dataType);
        return new CallExpression((FunctionDefinition)BuiltInFunctionDefinitions.EQUALS, Arrays.asList(ref, literal), DataTypes.BOOLEAN());
    }
}

