package org.apache.hudi.functional;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/functional/TestDataSkippingWithMORColstats.class */
/**
 * Functional tests that read a merge-on-read table with the column stats index and
 * data skipping enabled (see {@link #getOptions()}).
 *
 * <p>The tests share one technique: rows that satisfy {@link #MATCH_COND} and rows that
 * satisfy {@link #NON_MATCH_COND} are written in separate writes, then the base files
 * returned by {@link #getFilesToCorrupt()} are replaced with empty files. A subsequent read
 * filtered on {@link #MATCH_COND} is then expected either to succeed or to fail with a
 * {@link SparkException}, depending on the scenario.
 */
public class TestDataSkippingWithMORColstats extends HoodieSparkClientTestBase {
    /** Predicate selecting the rows the tests read back. */
    private static final String MATCH_COND = "trip_type = 'UBERX'";
    /** Predicate selecting the rows that the read predicate should filter out. */
    private static final String NON_MATCH_COND = "trip_type = 'BLACK'";
    /** Columns dropped from read results so they can be compared with the written data frames. */
    private static final String[] META_COLUMNS = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"};
    private static final String HUDI_FORMAT = "hudi";
    private static final String TABLE_NAME = "testTable";
    private static final String INDEXED_COLUMN = "trip_type";
    private static final String KEY_GENERATOR_CLASS = "org.apache.hudi.keygen.NonpartitionedKeyGenerator";

    /** True until the first {@link #doWrite(Dataset)}; that first write uses {@link SaveMode#Overwrite}. */
    private boolean shouldOverwrite;
    /** Options passed to every write and to reads that keep data skipping enabled. */
    Map<String, String> options;

    @TempDir
    public Path basePath;

    /** Starts Spark, resets the per-test state and initializes the table under {@link #basePath}. */
    @BeforeEach
    public void setUp() throws Exception {
        initSparkContexts();
        dataGen = new HoodieTestDataGenerator();
        shouldOverwrite = true;
        options = getOptions();
        Properties tableProperties = new Properties();
        tableProperties.putAll(options);
        try {
            metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(storageConf.newInstance(), basePath.toString(), tableProperties);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Releases the Spark contexts and the data generator and drops the meta client reference. */
    @AfterEach
    public void tearDown() throws IOException {
        cleanupSparkContexts();
        cleanupTestDataGenerator();
        metaClient = null;
    }

    /**
     * Writes a matching and a non-matching batch, corrupts the single file returned by
     * {@link #getFilesToCorrupt()}, and checks that the read with data skipping still succeeds
     * while the read with data skipping disabled fails.
     */
    @Test
    public void testBaseFileOnly() {
        options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        Dataset<Row> inserts = makeInsertDf("000", 100);
        Dataset<Row> matchingBatch = inserts.where(MATCH_COND);
        Dataset<Row> nonMatchingBatch = inserts.where(NON_MATCH_COND);
        doWrite(matchingBatch);
        doWrite(nonMatchingBatch);

        List<Path> filesToCorrupt = getFilesToCorrupt();
        Assertions.assertEquals(1, filesToCorrupt.size());
        filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);

        // With data skipping the read succeeds and returns no row outside the matching batch.
        Assertions.assertEquals(0L, readMatchingRecords().except(matchingBatch).count());
        // Without data skipping the same read fails.
        Assertions.assertThrows(SparkException.class, () -> readMatchingRecords(false).count());
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatches() {
        testBaseFileAndLogFileUpdateMatchesHelper(false, false, false, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesDoCompaction() {
        testBaseFileAndLogFileUpdateMatchesHelper(false, true, false, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesScheduleCompaction() {
        testBaseFileAndLogFileUpdateMatchesHelper(true, false, false, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
        testBaseFileAndLogFileUpdateMatchesHelper(false, false, true, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() {
        options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        testBaseFileAndLogFileUpdateMatchesHelper(false, true, true, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
        options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        testBaseFileAndLogFileUpdateMatchesHelper(false, false, false, true);
    }

    /**
     * Shared scenario: after a matching and a non-matching batch are written, one non-matching
     * record is rewritten so that it satisfies {@link #MATCH_COND}.
     *
     * @param shouldScheduleCompaction rewrite all inserts and schedule a compaction before the update
     * @param shouldInlineCompact      enable inline compaction for the follow-up write
     * @param shouldDelete             delete the updated record again after the update
     * @param shouldRollback           remove the update's timeline file and write the original record again
     */
    private void testBaseFileAndLogFileUpdateMatchesHelper(boolean shouldScheduleCompaction,
                                                           boolean shouldInlineCompact,
                                                           boolean shouldDelete,
                                                           boolean shouldRollback) {
        Dataset<Row> inserts = makeInsertDf("000", 100);
        Dataset<Row> matchingBatch = inserts.where(MATCH_COND);
        Dataset<Row> nonMatchingBatch = inserts.where(NON_MATCH_COND);
        doWrite(matchingBatch);
        doWrite(nonMatchingBatch);
        if (shouldScheduleCompaction) {
            doWrite(inserts);
            scheduleCompaction();
        }

        // Captured before the update, i.e. while the non-matching rows hold no matching record.
        List<Path> filesToCorrupt = getFilesToCorrupt();
        Assertions.assertEquals(1, filesToCorrupt.size());

        Dataset<Row> originalRecord = nonMatchingBatch.limit(1);
        Dataset<Row> updatedRecord = makeRecordMatch(originalRecord);
        doWrite(updatedRecord);

        if (shouldRollback) {
            deleteLatestDeltacommit();
            enableInlineCompaction(shouldInlineCompact);
            doWrite(originalRecord);
            Assertions.assertEquals(0L, readMatchingRecords().except(matchingBatch).count());
        } else if (shouldDelete) {
            enableInlineCompaction(shouldInlineCompact);
            doDelete(updatedRecord);
            Assertions.assertEquals(0L, readMatchingRecords().except(matchingBatch).count());
        } else {
            Assertions.assertEquals(0L, readMatchingRecords().except(matchingBatch.union(updatedRecord)).count());
        }

        if (shouldInlineCompact) {
            // Re-evaluate the candidate files now that the follow-up write may have compacted.
            List<Path> filesAfterCompaction = getFilesToCorrupt();
            filesAfterCompaction.forEach(TestDataSkippingWithMORColstats::corruptFile);
            if (shouldDelete || shouldRollback) {
                Assertions.assertEquals(1, filesAfterCompaction.size());
                Assertions.assertEquals(0L, readMatchingRecords().except(matchingBatch).count());
            } else {
                enableInlineCompaction(true);
                doWrite(updatedRecord);
                Assertions.assertEquals(0, filesAfterCompaction.size());
            }
        } else {
            filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
            Assertions.assertEquals(1, filesToCorrupt.size());
            if (shouldRollback) {
                Assertions.assertDoesNotThrow(() -> readMatchingRecords().count(),
                    "The non match file group got rollback correctly");
            } else {
                Assertions.assertThrows(SparkException.class, () -> readMatchingRecords().count(),
                    "The corrupt parquets are not excluded");
            }
        }
    }

    @Test
    public void testBaseFileAndLogFileUpdateUnmatches() {
        testBaseFileAndLogFileUpdateUnmatchesHelper(false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateUnmatchesScheduleCompaction() {
        testBaseFileAndLogFileUpdateUnmatchesHelper(true);
    }

    /**
     * Shared scenario: the second write contains one record modified to satisfy
     * {@link #MATCH_COND}; a later write restores that record to its original, non-matching
     * form. After corrupting the file returned by {@link #getFilesToCorrupt()} the read is
     * expected to fail.
     *
     * @param shouldScheduleCompaction rewrite both batches and schedule a compaction before the
     *                                 restoring write
     */
    private void testBaseFileAndLogFileUpdateUnmatchesHelper(boolean shouldScheduleCompaction) {
        Dataset<Row> inserts = makeInsertDf("000", 100);
        Dataset<Row> matchingBatch = inserts.where(MATCH_COND);
        doWrite(matchingBatch);

        Dataset<Row> nonMatchingBatch = inserts.where(NON_MATCH_COND);
        Dataset<Row> originalRecord = nonMatchingBatch.limit(1);
        // Non-matching batch with one record swapped for a version that matches the predicate.
        Dataset<Row> secondBatch = removeRecord(nonMatchingBatch, originalRecord).union(makeRecordMatch(originalRecord));
        doWrite(secondBatch);
        if (shouldScheduleCompaction) {
            doWrite(matchingBatch.union(secondBatch));
            scheduleCompaction();
        }

        // Restore the record to its non-matching form.
        doWrite(originalRecord);
        Assertions.assertEquals(0L, readMatchingRecords().except(matchingBatch).count());

        List<Path> filesToCorrupt = getFilesToCorrupt();
        Assertions.assertEquals(1, filesToCorrupt.size());
        filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
        Assertions.assertThrows(SparkException.class, () -> readMatchingRecords().count());
    }

    /**
     * Builds the option set used for writes and data-skipping reads: metadata table and column
     * stats index on {@code trip_type} enabled, data skipping enabled, merge-on-read table type,
     * non-partitioned key generator, inline compaction disabled.
     */
    private Map<String, String> getOptions() {
        Map<String, String> opts = new HashMap<>();
        opts.put(HoodieMetadataConfig.ENABLE.key(), "true");
        opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
        opts.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(), INDEXED_COLUMN);
        opts.put(DataSourceReadOptions.ENABLE_DATA_SKIPPING().key(), "true");
        opts.put(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
        opts.put(HoodieWriteConfig.TBL_NAME.key(), TABLE_NAME);
        opts.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
        opts.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
        opts.put("hoodie.datasource.write.keygenerator.class", KEY_GENERATOR_CLASS);
        opts.put(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0");
        opts.put(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key(), "false");
        opts.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
        return opts;
    }

    /**
     * Schedules (but does not execute) a compaction at a newly created instant time, using a
     * write client that is closed again before this method returns.
     */
    private void scheduleCompaction() {
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(true)
            .withColumnStatsIndexForColumns(INDEXED_COLUMN)
            .build();
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(0L)
            .withInlineCompaction(false)
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build();
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath.toString())
            .withRollbackUsingMarkers(false)
            .withAutoCommit(false)
            .withMetadataConfig(metadataConfig)
            .withCompactionConfig(compactionConfig)
            .forTable(TABLE_NAME)
            .withKeyGenerator(KEY_GENERATOR_CLASS)
            .build();
        try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(writeConfig)) {
            writeClient.scheduleCompactionAtInstant(writeClient.createNewInstantTime(), Option.empty());
        }
    }

    /**
     * Returns {@code dataset} without the row whose {@code _row_key} equals the string at column
     * index 1 of the first row of {@code dataset2}.
     */
    private Dataset<Row> removeRecord(Dataset<Row> dataset, Dataset<Row> dataset2) {
        String keyToRemove = dataset2.first().getString(1);
        return dataset.where("_row_key != '" + keyToRemove + "'");
    }

    /**
     * Finds the base files that a read filtered on {@link #MATCH_COND} should not need.
     *
     * <p>First collects the file ids behind every row that currently matches the predicate
     * (taken from {@code _hoodie_file_name}; names without {@code .parquet} are used as-is).
     * Then lists the parquet files directly under {@link #basePath} whose file id is not in
     * that set, keeping only the file with the greatest commit time per file id.
     *
     * @return one path per file id that holds no matching row
     */
    private List<Path> getFilesToCorrupt() {
        HashSet<String> matchingFileIds = new HashSet<>();
        List<Row> fileNameRows = sparkSession.read().format(HUDI_FORMAT).load(basePath.toString())
            .where(MATCH_COND).select("_hoodie_file_name").distinct().collectAsList();
        for (Row row : fileNameRows) {
            String fileName = row.getString(0);
            matchingFileIds.add(fileName.contains(".parquet") ? FSUtils.getFileId(fileName) : fileName);
        }

        Map<String, Path> latestFileById = new HashMap<>();
        // Files.list holds an open directory handle, so it must be closed even on failure.
        try (Stream<Path> children = Files.list(basePath)) {
            children
                .filter(child -> !Files.isDirectory(child))
                .filter(child -> child.toString().contains(".parquet"))
                .filter(child -> !child.toString().contains(".crc"))
                .filter(child -> !matchingFileIds.contains(FSUtils.getFileId(child.getFileName().toString())))
                .forEach(child -> latestFileById.merge(
                    FSUtils.getFileId(child.getFileName().toString()),
                    child,
                    TestDataSkippingWithMORColstats::newerBaseFile));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new ArrayList<>(latestFileById.values());
    }

    /** Returns {@code candidate} if its commit time is strictly greater than {@code current}'s, else {@code current}. */
    private static Path newerBaseFile(Path current, Path candidate) {
        String currentCommitTime = FSUtils.getCommitTime(current.getFileName().toString());
        String candidateCommitTime = FSUtils.getCommitTime(candidate.getFileName().toString());
        return candidateCommitTime.compareTo(currentCommitTime) > 0 ? candidate : current;
    }

    /** Writes {@code dataset} to the table; the first write of a test overwrites, later ones append. */
    private void doWrite(Dataset<Row> dataset) {
        SaveMode saveMode = shouldOverwrite ? SaveMode.Overwrite : SaveMode.Append;
        shouldOverwrite = false;
        dataset.write().format(HUDI_FORMAT).options(options).mode(saveMode).save(basePath.toString());
    }

    /** Appends {@code dataset} to the table using the delete operation. */
    private void doDelete(Dataset<Row> dataset) {
        dataset.write()
            .format(HUDI_FORMAT)
            .options(options)
            .option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL())
            .mode(SaveMode.Append)
            .save(basePath.toString());
    }

    /** Returns a copy of {@code dataset} whose {@code trip_type} satisfies {@link #MATCH_COND}. */
    private Dataset<Row> makeRecordMatch(Dataset<Row> dataset) {
        return updateTripType(dataset, "UBERX");
    }

    /**
     * Returns the rows of {@code dataset} with {@code trip_type} replaced by {@code str}; the
     * result reuses the schema of {@code dataset}.
     */
    private Dataset<Row> updateTripType(Dataset<Row> dataset, String str) {
        dataset.createOrReplaceTempView("rowToMod");
        String query = "select _hoodie_is_deleted, _row_key, begin_lat, begin_lon, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare, height, nation, partition, partition_path, rider, seconds_since_epoch, timestamp, tip_history, '" + str + "' as trip_type, weight from rowToMod";
        return sparkSession.sqlContext().createDataFrame(sparkSession.sql(query).rdd(), dataset.schema());
    }

    /** Reads the rows satisfying {@link #MATCH_COND} using the test options (data skipping enabled). */
    private Dataset<Row> readMatchingRecords() {
        return readMatchingRecords(true);
    }

    /**
     * Reads the rows satisfying {@link #MATCH_COND}, without the Hudi meta columns.
     *
     * @param shouldSkip when true the read uses {@link #options}; when false the only option
     *                   passed is data skipping set to {@code "false"}
     */
    public Dataset<Row> readMatchingRecords(Boolean shouldSkip) {
        if (shouldSkip) {
            return sparkSession.read().format(HUDI_FORMAT)
                .options(options)
                .load(basePath.toString())
                .where(MATCH_COND)
                .drop(META_COLUMNS);
        }
        return sparkSession.read().format(HUDI_FORMAT)
            .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING().key(), "false")
            .load(basePath.toString())
            .where(MATCH_COND)
            .drop(META_COLUMNS);
    }

    /**
     * Replaces the file at {@code path} with an empty file.
     *
     * @throws RuntimeException if the file cannot be deleted or re-created, so that a test never
     *                          proceeds against a file that was silently left intact
     */
    protected static void corruptFile(Path path) {
        try {
            Files.deleteIfExists(path);
            Files.createFile(path);
        } catch (IOException e) {
            throw new RuntimeException("Failed to replace " + path + " with an empty file", e);
        }
    }

    /**
     * Generates {@code numRecords} insert records for {@code instantTime}, loads them as JSON
     * into a data frame and drops the {@code city_to_state} column.
     */
    protected Dataset<Row> makeInsertDf(String instantTime, Integer numRecords) {
        List<String> jsonRecords = dataGen.generateInserts(instantTime, numRecords).stream()
            .map(record -> RawTripTestPayload.recordToString(record).get())
            .collect(Collectors.toList());
        return sparkSession.read().json(jsc.parallelize(jsonRecords)).drop("city_to_state");
    }

    /**
     * Deletes, on a best-effort basis, the file under {@code .hoodie/} that is named after the
     * last instant of the active timeline.
     */
    public void deleteLatestDeltacommit() {
        HoodieInstant latestInstant = metaClient.getActiveTimeline().lastInstant().get();
        File instantFile = new File(metaClient.getBasePath() + "/.hoodie/" + latestInstant.getFileName());
        instantFile.delete();
    }

    /**
     * When {@code shouldInlineCompact} is true, switches {@link #options} to inline compaction
     * after every delta commit; otherwise leaves the options untouched.
     */
    public void enableInlineCompaction(Boolean shouldInlineCompact) {
        if (shouldInlineCompact) {
            options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
            options.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
        }
    }
}
