package org.apache.hudi.table.functional;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.class */
public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientFunctionalTestHarness {
    private JobConf roSnapshotJobConf;
    private JobConf roJobConf;
    private JobConf rtJobConf;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.table.functional.TestHoodieSparkMergeOnReadTableIncrementalRead$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$HoodieFileFormat = new int[HoodieFileFormat.values().length];

        static {
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieFileFormat[HoodieFileFormat.PARQUET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieFileFormat[HoodieFileFormat.HFILE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    @BeforeEach
    void setUp() {
        this.roSnapshotJobConf = new JobConf(hadoopConf());
        this.roJobConf = new JobConf(hadoopConf());
        this.rtJobConf = new JobConf(hadoopConf());
    }

    @Test
    public void testIncrementalReadsWithCompaction() throws Exception {
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator(new String[]{"2020/02/20"});
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
        HoodieTableMetaClient hoodieMetaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        HoodieWriteConfig build = getConfigBuilder(true).build();
        SparkRDDWriteClient hoodieWriteClient = m42getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                hoodieWriteClient.startCommitWithTime("001");
                Assertions.assertTrue(insertRecordsToMORTable(hoodieMetaClient, hoodieTestDataGenerator.generateInserts("001", 200), hoodieWriteClient, build, "001").findAny().isPresent(), "should list the base files we wrote in the delta commit");
                validateFiles("2020/02/20", 1, getROSnapshotFiles("2020/02/20"), false, this.roSnapshotJobConf, 200, "001");
                FileStatus[] rOIncrementalFiles = getROIncrementalFiles("2020/02/20", true);
                validateFiles("2020/02/20", 1, rOIncrementalFiles, false, this.roJobConf, 200, "001");
                Path path = rOIncrementalFiles[0].getPath();
                FileStatus[] rTIncrementalFiles = getRTIncrementalFiles("2020/02/20");
                validateFiles("2020/02/20", 1, rTIncrementalFiles, true, this.rtJobConf, 200, "001");
                Assertions.assertEquals(path, rTIncrementalFiles[0].getPath());
                hoodieWriteClient.startCommitWithTime("004");
                updateRecordsInMORTable(hoodieMetaClient, hoodieTestDataGenerator.generateUpdates("004", 100), hoodieWriteClient, build, "004", false);
                FileStatus[] rOIncrementalFiles2 = getROIncrementalFiles("2020/02/20", false);
                validateFiles("2020/02/20", 1, rOIncrementalFiles2, false, this.roJobConf, 200, "001");
                Assertions.assertEquals(path, rOIncrementalFiles2[0].getPath());
                validateFiles("2020/02/20", 1, getRTIncrementalFiles("2020/02/20"), true, this.rtJobConf, 200, "001", "004");
                hoodieWriteClient.scheduleCompactionAtInstant("005", Option.empty());
                validateFiles("2020/02/20", 1, getROIncrementalFiles("2020/02/20", true), false, this.roJobConf, 200, "001");
                validateFiles("2020/02/20", 1, getRTIncrementalFiles("2020/02/20"), true, this.rtJobConf, 200, "001", "004");
                List<HoodieRecord> generateInserts = hoodieTestDataGenerator.generateInserts("006", 200);
                hoodieWriteClient.startCommitWithTime("006");
                Assertions.assertTrue(insertRecordsToMORTable(hoodieMetaClient, generateInserts, hoodieWriteClient, build, "006").findAny().isPresent(), "should list the base files we wrote in the delta commit");
                validateFiles("2020/02/20", 2, getROSnapshotFiles("2020/02/20"), false, this.roSnapshotJobConf, 400, "001", "006");
                FileStatus[] rOIncrementalFiles3 = getROIncrementalFiles("2020/02/20", true);
                Assertions.assertEquals(path, rOIncrementalFiles3[0].getPath());
                validateFiles("2020/02/20", 1, rOIncrementalFiles3, false, this.roJobConf, 200, "001");
                validateFiles("2020/02/20", 2, getROIncrementalFiles("2020/02/20", false), false, this.roJobConf, 400, "001", "006");
                validateFiles("2020/02/20", 2, getRTIncrementalFiles("2020/02/20"), true, this.rtJobConf, 400, "001", "004", "006");
                hoodieWriteClient.compact("005");
                validateFiles("2020/02/20", 2, getROSnapshotFiles("2020/02/20"), false, this.roSnapshotJobConf, 400, "001", "005", "006");
                FileStatus[] rOIncrementalFiles4 = getROIncrementalFiles("2020/02/20", "002", -1, true);
                Assertions.assertTrue(rOIncrementalFiles4.length == 2);
                validateFiles("2020/02/20", 2, rOIncrementalFiles4, false, this.roJobConf, 400, "001", "005", "006");
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    private FileStatus[] getROSnapshotFiles(String str) throws Exception {
        FileInputFormat.setInputPaths(this.roSnapshotJobConf, Paths.get(basePath(), str).toString());
        return listStatus((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), this.roSnapshotJobConf, false);
    }

    private FileStatus[] getROIncrementalFiles(String str, boolean z) throws Exception {
        return getROIncrementalFiles(str, "000", -1, z);
    }

    private FileStatus[] getROIncrementalFiles(String str, String str2, int i, boolean z) throws Exception {
        setupIncremental(this.roJobConf, str2, i, z);
        FileInputFormat.setInputPaths(this.roJobConf, Paths.get(basePath(), str).toString());
        return listStatus((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), this.roJobConf, false);
    }

    private FileStatus[] getRTIncrementalFiles(String str) throws Exception {
        return getRTIncrementalFiles(str, "000", -1);
    }

    private FileStatus[] getRTIncrementalFiles(String str, String str2, int i) throws Exception {
        setupIncremental(this.rtJobConf, str2, i, false);
        FileInputFormat.setInputPaths(this.rtJobConf, Paths.get(basePath(), str).toString());
        return listStatus((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), this.rtJobConf, true);
    }

    private void setupIncremental(JobConf jobConf, String str, int i, boolean z) {
        jobConf.set(String.format("hoodie.%s.consume.mode", "raw_trips"), "INCREMENTAL");
        jobConf.set(String.format("hoodie.%s.consume.start.timestamp", "raw_trips"), str);
        jobConf.setInt(String.format("hoodie.%s.consume.max.commits", "raw_trips"), i);
        jobConf.setBoolean(String.format("hoodie.%s.ro.stop.at.compaction", "raw_trips"), z);
    }

    private void validateFiles(String str, int i, FileStatus[] fileStatusArr, boolean z, JobConf jobConf, int i2, String... strArr) {
        Assertions.assertEquals(i, fileStatusArr.length);
        Set set = (Set) Arrays.stream(strArr).collect(Collectors.toSet());
        List<GenericRecord> recordsUsingInputFormat = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), Collections.singletonList(Paths.get(basePath(), str).toString()), basePath(), jobConf, z);
        Assertions.assertEquals(i2, recordsUsingInputFormat.size());
        Assertions.assertEquals(set, (Set) recordsUsingInputFormat.stream().map(genericRecord -> {
            return genericRecord.get("_hoodie_commit_time").toString();
        }).collect(Collectors.toSet()));
    }

    private FileStatus[] listStatus(HoodieFileFormat hoodieFileFormat, JobConf jobConf, boolean z) throws IOException {
        HoodieParquetRealtimeInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(hoodieFileFormat, z, jobConf);
        switch (AnonymousClass1.$SwitchMap$org$apache$hudi$common$model$HoodieFileFormat[hoodieFileFormat.ordinal()]) {
            case 1:
                return z ? inputFormat.listStatus(jobConf) : ((HoodieParquetInputFormat) inputFormat).listStatus(jobConf);
            case 2:
                return z ? ((HoodieHFileRealtimeInputFormat) inputFormat).listStatus(jobConf) : ((HoodieHFileInputFormat) inputFormat).listStatus(jobConf);
            default:
                throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + hoodieFileFormat);
        }
    }
}
