package org.apache.hudi.testutils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieHFileUtils;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/testutils/HoodieClientTestUtils.class */
/**
 * Static helpers for Hudi client tests: building a Spark configuration for tests, reading the
 * files written by commits as Spark datasets or Avro records, starting a timeline service and
 * fetching commit metadata from a table's timeline.
 */
public class HoodieClientTestUtils {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestUtils.class);

    /**
     * Builds the {@link SparkConf} used by tests.
     *
     * <p>The configuration runs on {@code local[4]} with Kryo serialization and 4 shuffle
     * partitions. The Hudi session extension and catalog are only registered when their classes can
     * be loaded (the catalog additionally requires {@code HoodieSparkUtils.gteqSpark3_2()}). When the
     * system property {@code SPARK_EVLOG_DIR} is set, event logging and the Spark UI are enabled;
     * otherwise the UI is disabled.
     *
     * @param str the Spark application name
     * @return the configuration after {@link SparkRDDReadClient#addHoodieSupport(SparkConf)}
     */
    public static SparkConf getSparkConfForTest(String str) {
        SparkConf sparkConf = new SparkConf()
                .setAppName(str)
                .setMaster("local[4]")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
                .set("spark.sql.shuffle.partitions", "4")
                .set("spark.default.parallelism", "4");

        if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) {
            sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
        }
        if (canLoadClass("org.apache.spark.sql.hudi.catalog.HoodieCatalog") && HoodieSparkUtils.gteqSpark3_2()) {
            sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
        }

        String eventLogDir = System.getProperty("SPARK_EVLOG_DIR");
        if (eventLogDir != null) {
            sparkConf.set("spark.eventLog.enabled", "true");
            sparkConf.set("spark.eventLog.dir", eventLogDir);
            sparkConf.set("spark.ui.enabled", "true");
        } else {
            sparkConf.set("spark.ui.enabled", "false");
        }
        return SparkRDDReadClient.addHoodieSupport(sparkConf);
    }

    /**
     * Merges the file-id to full-path mappings recorded in the commit metadata of the given instants.
     * Later instants in {@code list} overwrite the entries of earlier ones for the same file id.
     *
     * @param str            base path passed to {@code getFileIdAndFullPaths}
     * @param hoodieTimeline timeline used to load each instant's details
     * @param list           instants whose commit metadata is read
     * @return map of file id to full path
     * @throws IOException if commit metadata cannot be deserialized
     */
    private static HashMap<String, String> getLatestFileIDsToFullPath(String str, HoodieTimeline hoodieTimeline, List<HoodieInstant> list) throws IOException {
        HashMap<String, String> fileIdToFullPath = new HashMap<>();
        for (HoodieInstant instant : list) {
            HoodieCommitMetadata metadata =
                    HoodieCommitMetadata.fromBytes(hoodieTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(new Path(str)));
        }
        return fileIdToFullPath;
    }

    /**
     * Reads the records of a single commit, keeping only rows whose commit-time metadata field equals
     * the instant time. Equivalent to {@code readCommit(str, sQLContext, hoodieTimeline, str2, true)}.
     */
    public static Dataset<Row> readCommit(String str, SQLContext sQLContext, HoodieTimeline hoodieTimeline, String str2) {
        return readCommit(str, sQLContext, hoodieTimeline, str2, true);
    }

    /**
     * Reads the files recorded in the metadata of one completed commit as a dataset.
     *
     * <p>The reader is chosen from the <em>default</em> value of
     * {@code HoodieTableConfig.BASE_FILE_FORMAT} (parquet or ORC); for any other default an empty
     * data frame is returned.
     *
     * @param str            table base path
     * @param sQLContext     SQL context used for reading
     * @param hoodieTimeline timeline that must contain the commit
     * @param str2           instant time of the commit
     * @param z              when {@code true}, only rows whose commit-time field equals {@code str2} are kept
     * @return the rows read from the commit's files
     * @throws HoodieException if the commit is not in the timeline or reading fails
     */
    public static Dataset<Row> readCommit(String str, SQLContext sQLContext, HoodieTimeline hoodieTimeline, String str2, boolean z) {
        HoodieInstant commitInstant = new HoodieInstant(false, "commit", str2);
        if (!hoodieTimeline.containsInstant(commitInstant)) {
            throw new HoodieException("No commit exists at " + str2);
        }
        try {
            HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(str, hoodieTimeline, Arrays.asList(commitInstant));
            LOG.info("Path :{}", fileIdToFullPath.values());
            String[] paths = fileIdToFullPath.values().toArray(new String[0]);

            HoodieFileFormat defaultFormat = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue();
            Dataset<Row> rows;
            if (defaultFormat.equals(HoodieFileFormat.PARQUET)) {
                rows = sQLContext.read().parquet(paths);
            } else if (defaultFormat.equals(HoodieFileFormat.ORC)) {
                rows = sQLContext.read().orc(paths);
            } else {
                return sQLContext.emptyDataFrame();
            }
            if (!z) {
                return rows;
            }
            return rows.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, str2));
        } catch (Exception e) {
            throw new HoodieException("Error reading commit " + str2, e);
        }
    }

    /**
     * Counts the records in the files written by the timeline's instants, optionally restricted to
     * records committed after a given instant time.
     *
     * <p>The file format is decided from the extension of the first file path (parquet, HFile or
     * ORC). When the selected instants recorded no files, {@code 0} is returned.
     *
     * @param javaSparkContext Spark context, used for reading HFiles
     * @param str              table base path
     * @param sQLContext       SQL context used for reading parquet / ORC files
     * @param hoodieTimeline   timeline supplying the instants
     * @param option           when present, only instants and records after this commit time are counted
     * @return number of matching records
     * @throws HoodieException if the format is unsupported or commit metadata cannot be read
     */
    public static long countRecordsOptionallySince(JavaSparkContext javaSparkContext, String str, SQLContext sQLContext, HoodieTimeline hoodieTimeline, Option<String> option) {
        try {
            List<HoodieInstant> instants = option.isPresent()
                    ? hoodieTimeline.findInstantsAfter(option.get(), Integer.MAX_VALUE).getInstants()
                    : hoodieTimeline.getInstants();
            HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(str, hoodieTimeline, instants);
            if (fileIdToFullPath.isEmpty()) {
                // Nothing was written by the selected instants; avoid indexing into an empty array below.
                return 0L;
            }

            String[] paths = fileIdToFullPath.values().toArray(new String[0]);
            String firstPath = paths[0];
            if (firstPath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
                return countRowsAfterCommit(sQLContext.read().parquet(paths), option);
            }
            if (firstPath.endsWith(HoodieFileFormat.HFILE.getFileExtension())) {
                Stream<GenericRecord> records = readHFile(javaSparkContext, paths);
                if (!option.isPresent()) {
                    return records.count();
                }
                return records
                        .filter(record -> HoodieTimeline.compareTimestamps(
                                option.get(),
                                HoodieActiveTimeline.LESSER_THAN,
                                record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()))
                        .count();
            }
            if (firstPath.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
                return countRowsAfterCommit(sQLContext.read().orc(paths), option);
            }
            throw new HoodieException("Unsupported base file format for file :" + firstPath);
        } catch (IOException e) {
            // Do not call option.get() unconditionally here: on an empty Option it would throw and
            // hide the IOException we are trying to report.
            String since = option.isPresent() ? option.get() : "<none>";
            throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + since, e);
        }
    }

    /** Counts all rows, or only the rows whose commit-time field is greater than the given commit time. */
    private static long countRowsAfterCommit(Dataset<Row> rows, Option<String> option) {
        if (!option.isPresent()) {
            return rows.count();
        }
        return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, option.get())).count();
    }

    /**
     * Collects the latest base files under each of the given glob paths, using a file-system view
     * built over the table's completed commits timeline.
     *
     * @param str        table base path
     * @param fileSystem file system used for globbing and for the meta client configuration
     * @param strArr     glob patterns to list
     * @return latest base files of all patterns, in pattern order
     * @throws HoodieException if anything fails while building the view or listing files
     */
    public static List<HoodieBaseFile> getLatestBaseFiles(String str, FileSystem fileSystem, String... strArr) {
        List<HoodieBaseFile> latestFiles = new ArrayList<>();
        try {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(fileSystem.getConf())
                    .setBasePath(str)
                    .setLoadActiveTimelineOnLoad(true)
                    .build();
            for (String globPath : strArr) {
                HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
                        metaClient,
                        metaClient.getCommitsTimeline().filterCompletedInstants(),
                        fileSystem.globStatus(new Path(globPath)));
                latestFiles.addAll(fsView.getLatestBaseFiles().collect(Collectors.toList()));
            }
            return latestFiles;
        } catch (Exception e) {
            throw new HoodieException("Error reading hoodie table as a dataframe", e);
        }
    }

    /**
     * Reads the latest base files under the given glob paths as a dataset.
     *
     * <p>The reader is chosen from the extension of the first file (parquet or ORC). An empty data
     * frame is returned when there are no files or the extension is neither of the two.
     *
     * @param javaSparkContext not used by this method
     * @param str              table base path
     * @param sQLContext       SQL context used for reading
     * @param fileSystem       file system used to list the files
     * @param strArr           glob patterns to read
     * @return rows of the latest base files
     * @throws HoodieException if listing or reading fails
     */
    public static Dataset<Row> read(JavaSparkContext javaSparkContext, String str, SQLContext sQLContext, FileSystem fileSystem, String... strArr) {
        try {
            List<String> filePaths = new ArrayList<>();
            for (HoodieBaseFile baseFile : getLatestBaseFiles(str, fileSystem, strArr)) {
                filePaths.add(baseFile.getPath());
            }
            if (filePaths.isEmpty()) {
                return sQLContext.emptyDataFrame();
            }

            String[] paths = filePaths.toArray(new String[0]);
            if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
                return sQLContext.read().parquet(paths);
            }
            if (paths[0].endsWith(HoodieFileFormat.ORC.getFileExtension())) {
                return sQLContext.read().orc(paths);
            }
            return sQLContext.emptyDataFrame();
        } catch (Exception e) {
            throw new HoodieException("Error reading hoodie table as a dataframe", e);
        }
    }

    /**
     * Reads every cell value of the given HFiles and decodes it as an Avro record.
     *
     * <p>The Avro schema is taken from the {@code "schema"} entry of the first file's HFile info and
     * reused for all remaining files. Files whose scanner cannot seek to a first cell (empty files)
     * contribute no records. Each reader is closed once its file has been consumed.
     *
     * @param javaSparkContext Spark context supplying the Hadoop configuration
     * @param strArr           paths of the HFiles to read; the file system is resolved from the first one
     * @return the decoded records of all files, in file order (empty for a null or empty path array)
     * @throws HoodieException if a file cannot be read
     */
    public static Stream<GenericRecord> readHFile(JavaSparkContext javaSparkContext, String[] strArr) {
        if (strArr == null || strArr.length == 0) {
            return Stream.empty();
        }

        List<GenericRecord> records = new ArrayList<>();
        FileSystem fs = FSUtils.getFs(strArr[0], javaSparkContext.hadoopConfiguration());
        CacheConfig cacheConfig = new CacheConfig(fs.getConf());
        Schema schema = null;
        for (String path : strArr) {
            try (HFile.Reader reader = HoodieHFileUtils.createHFileReader(fs, new Path(path), cacheConfig, fs.getConf())) {
                if (schema == null) {
                    // The key is written as UTF-8 bytes, so decode the value with the same charset
                    // instead of the platform default.
                    byte[] schemaBytes = reader.getHFileInfo().get(StringUtils.getUTF8Bytes("schema"));
                    schema = new Schema.Parser().parse(new String(schemaBytes, java.nio.charset.StandardCharsets.UTF_8));
                }
                HFileScanner scanner = reader.getScanner(false, false);
                if (!scanner.seekTo()) {
                    // No first cell to position on: the file holds no records.
                    continue;
                }
                do {
                    Cell cell = scanner.getCell();
                    int valueStart = cell.getValueOffset();
                    byte[] value = Arrays.copyOfRange(cell.getValueArray(), valueStart, valueStart + cell.getValueLength());
                    records.add(HoodieAvroUtils.bytesToAvro(value, schema));
                } while (scanner.next());
            } catch (IOException e) {
                throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
            }
        }
        return records.stream();
    }

    /**
     * Creates and starts a {@link TimelineService} with marker requests enabled.
     *
     * @param hoodieEngineContext engine context handed to the service and its view manager
     * @param str                 table base path used to build the write config
     * @param i                   remote server port configured for the file-system view
     * @return the started service
     * @throws RuntimeException wrapping any failure during construction or start-up
     */
    public static TimelineService initTimelineService(HoodieEngineContext hoodieEngineContext, String str, int i) {
        try {
            HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
                    .withPath(str)
                    .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(i).build())
                    .build();
            TimelineService.Config serviceConfig = TimelineService.Config.builder()
                    .enableMarkerRequests(true)
                    .serverPort(writeConfig.getViewStorageConfig().getRemoteViewServerPort())
                    .build();
            TimelineService timelineService = new TimelineService(
                    hoodieEngineContext,
                    new Configuration(),
                    serviceConfig,
                    FileSystem.get(new Configuration()),
                    FileSystemViewManager.createViewManager(
                            hoodieEngineContext,
                            writeConfig.getMetadataConfig(),
                            writeConfig.getViewStorageConfig(),
                            writeConfig.getCommonConfig()));
            timelineService.startService();
            LOG.info("Timeline service server port: " + i);
            return timelineService;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the commit metadata of the last completed instant on the active commits timeline.
     *
     * @param hoodieTableMetaClient meta client of the table
     * @return the metadata, or an empty option when the timeline has no completed instant
     */
    public static Option<HoodieCommitMetadata> getCommitMetadataForLatestInstant(HoodieTableMetaClient hoodieTableMetaClient) {
        HoodieTimeline completedCommits = hoodieTableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        Option<HoodieInstant> latest = completedCommits.lastInstant();
        if (!latest.isPresent()) {
            return Option.empty();
        }
        return getCommitMetadataForInstant(hoodieTableMetaClient, latest.get());
    }

    /**
     * Loads and deserializes the commit metadata of the given instant from the table's completed
     * commits timeline.
     *
     * @throws HoodieException if the instant details cannot be read or deserialized
     */
    private static Option<HoodieCommitMetadata> getCommitMetadataForInstant(HoodieTableMetaClient hoodieTableMetaClient, HoodieInstant hoodieInstant) {
        try {
            HoodieTimeline completedCommits = hoodieTableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            byte[] details = completedCommits.getInstantDetails(hoodieInstant).get();
            return Option.of(HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class));
        } catch (Exception e) {
            throw new HoodieException("Failed to read schema from commit metadata", e);
        }
    }

    /** Returns whether {@code ReflectionUtils.getClass} can resolve the named class without throwing. */
    private static boolean canLoadClass(String str) {
        try {
            return ReflectionUtils.getClass(str) != null;
        } catch (Exception ignored) {
            // Deliberate: a class that cannot be loaded simply means the optional feature is absent.
            return false;
        }
    }
}
