package org.apache.hudi.utilities.perf;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.com.codahale.metrics.Histogram;
import org.apache.hudi.com.codahale.metrics.Snapshot;
import org.apache.hudi.com.codahale.metrics.UniformReservoir;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:org/apache/hudi/utilities/perf/TimelineServerPerf.class */
public class TimelineServerPerf implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LogManager.getLogger(TimelineServerPerf.class);
    private final Config cfg;
    private transient TimelineService timelineServer;
    private final boolean useExternalTimelineServer;
    private String hostAddr;

    /* loaded from: input_file:org/apache/hudi/utilities/perf/TimelineServerPerf$Config.class */
    /**
     * Command-line options for the timeline-server performance run.
     *
     * <p>Fields are populated by JCommander from the {@link Parameter} annotations; the
     * initializers below are the values used when an option is not supplied.
     */
    public static class Config implements Serializable {

        @Parameter(names = {"--base-path", "-b"}, description = "Base Path", required = true)
        public String basePath = "";

        @Parameter(names = {"--report-dir", "-rd"}, description = "Dir where reports are added", required = true)
        public String reportDir = "";

        @Parameter(names = {"--max-partitions", "-m"}, description = "Mx partitions to be loaded")
        public Integer maxPartitions = 1000;

        @Parameter(names = {"--num-executors", "-e"}, description = "num executors")
        public Integer numExecutors = 10;

        @Parameter(names = {"--num-cores", "-c"}, description = "num cores")
        public Integer numCoresPerExecutor = 10;

        @Parameter(names = {"--num-iterations", "-i"}, description = "Number of iterations for each partitions")
        public Integer numIterations = 10;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
        public String sparkMaster = "local[2]";

        @Parameter(names = {"--server-port", "-p"}, description = " Server Port")
        public Integer serverPort = 26754;

        // A non-null value means the run targets an externally managed timeline service.
        @Parameter(names = {"--server-host", "-sh"}, description = " Server Host (Set it for externally managed timeline service")
        public String serverHost = null;

        @Parameter(names = {"--view-storage", "-st"}, description = "View Storage Type. Defaut - SPILLABLE_DISK")
        public FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;

        @Parameter(names = {"--max-view-mem-per-table", "-mv"}, description = "Maximum view memory per table in MB to be used for storing file-groups. Overflow file-groups will be spilled to disk. Used for SPILLABLE_DISK storage type")
        public Integer maxViewMemPerTableInMB = 2048;

        @Parameter(names = {"--mem-overhead-fraction-pending-compaction", "-cf"}, description = "Memory Fraction of --max-view-mem-per-table to be allocated for managing pending compaction storage. Overflow entries will be spilled to disk. Used for SPILLABLE_DISK storage type")
        public Double memFractionForCompactionPerTable = 0.001d;

        @Parameter(names = {"--base-store-path", "-sp"}, description = "Directory where spilled view entries will be stored. Used for SPILLABLE_DISK storage type")
        public String baseStorePathForFileGroups = FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR;

        @Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root directory for RocksDB")
        public String rocksDBPath = FileSystemViewStorageConfig.DEFAULT_ROCKSDB_BASE_PATH;

        @Parameter(names = {"--wait-for-manual-queries", "-ww"})
        public Boolean waitForManualQueries = false;

        // help = true tells JCommander to skip required-option validation when this flag is
        // given, so "--help" on its own prints usage instead of failing on the missing
        // --base-path / --report-dir options.
        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        /**
         * Builds the configuration for an embedded {@link TimelineService} from these options.
         *
         * @return a new timeline-service config carrying the view-storage settings and server port
         */
        public TimelineService.Config getTimelinServerConfig() {
            TimelineService.Config serviceConfig = new TimelineService.Config();
            serviceConfig.viewStorageType = this.viewStorageType;
            serviceConfig.baseStorePathForFileGroups = this.baseStorePathForFileGroups;
            serviceConfig.maxViewMemPerTableInMB = this.maxViewMemPerTableInMB;
            serviceConfig.memFractionForCompactionPerTable = this.memFractionForCompactionPerTable;
            serviceConfig.rocksDBPath = this.rocksDBPath;
            serviceConfig.serverPort = this.serverPort;
            return serviceConfig;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/utilities/perf/TimelineServerPerf$Dumper.class */
    /**
     * Writes {@link PerfStats} rows as CSV to a file on the given file system and echoes
     * each row to standard output.
     *
     * <p>Usage order is {@link #init()}, any number of {@link #dump(List)} calls, then
     * {@link #close()}.
     */
    public static class Dumper implements Serializable {
        private final Path dumpPath;
        private final FileSystem fileSystem;
        private FSDataOutputStream outputStream;

        /**
         * @param fileSystem file system on which the report file is created
         * @param path       location of the report file
         */
        public Dumper(FileSystem fileSystem, Path path) {
            this.dumpPath = path;
            this.fileSystem = fileSystem;
        }

        /**
         * Creates the report file and writes the CSV header line.
         *
         * @throws IOException if the file cannot be created or the header cannot be written
         */
        public void init() throws IOException {
            // Second argument is passed as true, as in the original call
            // (presumably Hadoop's overwrite flag — confirm against FileSystem.create).
            this.outputStream = this.fileSystem.create(this.dumpPath, true);
            addHeader();
        }

        /** Writes and flushes the column-name line. */
        private void addHeader() throws IOException {
            this.outputStream.write("Partition,Thread,Min,Max,Mean,Median,75th,95th\n".getBytes(StandardCharsets.UTF_8));
            this.outputStream.flush();
        }

        /**
         * Appends one CSV row per entry and prints the same row to standard output.
         *
         * @param list statistics to write, in order
         * @throws RuntimeException wrapping the {@link IOException} if a write fails
         */
        public void dump(List<PerfStats> list) {
            for (PerfStats stats : list) {
                // Locale.ROOT keeps '.' as the decimal separator; a comma-decimal default
                // locale would otherwise inject extra commas and break the CSV columns.
                String row = String.format(Locale.ROOT, "%s,%d,%d,%d,%f,%f,%f,%f\n", stats.partition, stats.id,
                        stats.minTime, stats.maxTime, stats.meanTime, stats.medianTime, stats.p75, stats.p95);
                System.out.println(row);
                try {
                    // Explicit charset so the report bytes do not depend on the platform default.
                    this.outputStream.write(row.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Closes the report stream. Does nothing if {@link #init()} never opened one.
         *
         * @throws IOException if closing the stream fails
         */
        public void close() throws IOException {
            if (this.outputStream != null) {
                this.outputStream.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/utilities/perf/TimelineServerPerf$PerfStats.class */
    /**
     * Immutable latency summary for one lookup task: the partition it queried, the task id,
     * and the min / max / mean / median / 75th / 95th percentile values.
     */
    public static class PerfStats implements Serializable {
        private final String partition;
        private final int id;
        private final long minTime;
        private final long maxTime;
        private final double meanTime;
        private final double medianTime;
        private final double p95;
        private final double p75;

        /**
         * Builds the summary from a metrics snapshot.
         *
         * @param str      partition the measurements belong to
         * @param i        id of the task that produced them
         * @param snapshot snapshot supplying min, max, mean, median and the two percentiles
         */
        public PerfStats(String str, int i, Snapshot snapshot) {
            this(
                    str,
                    i,
                    snapshot.getMin(),
                    snapshot.getMax(),
                    snapshot.getMean(),
                    snapshot.getMedian(),
                    snapshot.get95thPercentile(),
                    snapshot.get75thPercentile());
        }

        /**
         * Builds the summary from explicit values.
         *
         * @param str partition the measurements belong to
         * @param i   id of the task that produced them
         * @param j   minimum
         * @param j2  maximum
         * @param d   mean
         * @param d2  median
         * @param d3  95th percentile (note: comes before the 75th in this signature)
         * @param d4  75th percentile
         */
        public PerfStats(String str, int i, long j, long j2, double d, double d2, double d3, double d4) {
            // Identity of the measurement.
            this.id = i;
            this.partition = str;
            // Range and central tendency.
            this.maxTime = j2;
            this.minTime = j;
            this.medianTime = d2;
            this.meanTime = d;
            // Percentiles.
            this.p75 = d4;
            this.p95 = d3;
        }
    }

    public TimelineServerPerf(Config config) throws IOException {
        this.cfg = config;
        this.useExternalTimelineServer = config.serverHost != null;
        this.timelineServer = new TimelineService(config.getTimelinServerConfig());
    }

    /**
     * Sets {@code hostAddr} to the value of {@code spark.driver.host} from the given Spark
     * configuration. If the key is absent, only a warning is logged and {@code hostAddr}
     * is left unchanged.
     *
     * @param sparkConf Spark configuration to read the driver host from
     */
    private void setHostAddrFromSparkConf(SparkConf sparkConf) {
        final String driverHost = sparkConf.get("spark.driver.host", (String) null);
        if (driverHost != null) {
            LOG.info("Overriding hostIp to (" + driverHost + ") found in spark-conf. It was " + this.hostAddr);
            this.hostAddr = driverHost;
            return;
        }
        LOG.warn("Unable to find driver bind address from spark config");
    }

    /**
     * Runs the benchmark end to end.
     *
     * <ol>
     *   <li>Lists all partition paths under the base path, shuffles them, drops any path
     *       containing {@code "error"} and keeps at most {@code maxPartitions}.</li>
     *   <li>Builds a Spark context and, unless an external server host was configured, starts
     *       the embedded timeline service and takes the host from {@code spark.driver.host}.</li>
     *   <li>Runs a first pass with one iteration and zero lookup tasks, then a second pass
     *       with {@code numIterations} iterations and {@code numCoresPerExecutor} tasks per
     *       partition; each pass is written to its own CSV file in the report directory.</li>
     *   <li>Prints every file slice of the selected partitions.</li>
     *   <li>If the embedded server is in use and {@code waitForManualQueries} is set, blocks
     *       until the thread is interrupted.</li>
     * </ol>
     *
     * <p>The Spark context created here is stopped before returning, and each report stream
     * is closed even when the corresponding lookup job fails.
     *
     * @throws IOException if listing partitions or writing a report fails
     */
    public void run() throws IOException {
        List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.timelineServer.getFs(), this.cfg.basePath, true);
        Collections.shuffle(allPartitionPaths);
        List<String> selectedPartitions = allPartitionPaths.stream()
                .filter(partition -> !partition.contains("error"))
                .limit(this.cfg.maxPartitions)
                .collect(Collectors.toList());

        JavaSparkContext sparkContext = UtilHelpers.buildSparkContext("hudi-view-perf-" + this.cfg.basePath, this.cfg.sparkMaster);
        try {
            if (this.useExternalTimelineServer) {
                this.hostAddr = this.cfg.serverHost;
            } else {
                this.timelineServer.startService();
                setHostAddrFromSparkConf(sparkContext.getConf());
            }

            HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.timelineServer.getConf(), this.cfg.basePath, true);
            RemoteHoodieTableFileSystemView fsView = new RemoteHoodieTableFileSystemView(this.hostAddr, this.cfg.serverPort, metaClient);

            String reportDir = this.cfg.reportDir;
            metaClient.getFs().mkdirs(new Path(reportDir));
            // Both report files of one run share this suffix.
            String runId = UUID.randomUUID().toString();

            System.out.println("First Iteration to load all partitions");
            Dumper firstPass = new Dumper(metaClient.getFs(), new Path(reportDir, String.format("1_%s.csv", runId)));
            firstPass.init();
            try {
                // Zero lookup tasks: this pass only fetches the latest file slices per partition.
                firstPass.dump(runLookups(sparkContext, selectedPartitions, fsView, 1, 0));
            } finally {
                firstPass.close();
            }
            System.out.println("\n\n\n First Iteration is done");

            Dumper secondPass = new Dumper(metaClient.getFs(), new Path(reportDir, String.format("2_%s.csv", runId)));
            secondPass.init();
            try {
                secondPass.dump(runLookups(sparkContext, selectedPartitions, fsView, this.cfg.numIterations, this.cfg.numCoresPerExecutor));
            } finally {
                secondPass.close();
            }

            System.out.println("\n\n\nDumping all File Slices");
            selectedPartitions.forEach(partition ->
                    fsView.getAllFileSlices(partition).forEach(fileSlice ->
                            System.out.println("\tMyFileSlice=" + fileSlice)));

            if (this.useExternalTimelineServer || !this.cfg.waitForManualQueries) {
                return;
            }

            System.out.println("Timeline Server Host Address=" + this.hostAddr + ", port=" + this.timelineServer.getServerPort());
            // Keep the embedded server reachable for manual queries until interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(60000L);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition (and any caller) sees the interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // This method created the context, so it is responsible for releasing it.
            sparkContext.stop();
        }
    }

    /**
     * Runs the lookup benchmark as a Spark job over the given partitions and collects the
     * per-task statistics on the driver.
     *
     * <p>For each partition the job fetches the latest file slices, picks one file id at
     * random ({@code "dummyId"} if the partition has no slices), and schedules {@code i2}
     * tasks on a local thread pool; each task calls
     * {@link #runOneRound(SyncableFileSystemView, String, String, int, int)} with {@code i}
     * iterations. With {@code i2 == 0} no lookups are timed and the result is empty.
     *
     * @param javaSparkContext       Spark context used to run the job
     * @param list                   partitions to query
     * @param syncableFileSystemView view the lookups are issued against
     * @param i                      number of lookups each task performs
     * @param i2                     number of concurrent tasks per partition
     * @return one {@link PerfStats} per task, across all partitions
     */
    public List<PerfStats> runLookups(JavaSparkContext javaSparkContext, List<String> list, SyncableFileSystemView syncableFileSystemView, int i, int i2) {
        javaSparkContext.setJobGroup(getClass().getSimpleName(), "Lookup all performance stats");
        return javaSparkContext.parallelize(list, this.cfg.numExecutors).flatMap(partition -> {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(100);
            try {
                List<FileSlice> slices = syncableFileSystemView.getLatestFileSlices(partition).collect(Collectors.toList());
                String fileId = slices.isEmpty()
                        ? "dummyId"
                        : slices.get(new Random(Double.doubleToLongBits(Math.random())).nextInt(slices.size())).getFileId();

                List<ScheduledFuture<PerfStats>> pending = new ArrayList<>();
                for (int task = 0; task < i2; task++) {
                    final int taskId = task;
                    pending.add(executor.schedule(
                            () -> runOneRound(syncableFileSystemView, partition, fileId, taskId, i), 0L, TimeUnit.NANOSECONDS));
                }

                List<PerfStats> results = new ArrayList<>(pending.size());
                for (ScheduledFuture<PerfStats> future : pending) {
                    try {
                        results.add(future.get());
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for the surrounding task before failing.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }

                System.out.println("SLICES are=");
                slices.forEach(fileSlice -> System.out.println("\t\tFileSlice=" + fileSlice));
                return results.iterator();
            } finally {
                // One pool is created per partition; without this its worker threads would
                // stay alive after the partition has been processed.
                executor.shutdownNow();
            }
        }).collect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Times {@code i2} consecutive {@code getLatestFileSlice} lookups for one file id and
     * summarizes the latencies.
     *
     * <p>Each lookup's result and latency are also printed to standard output.
     *
     * @param syncableFileSystemView view the lookups are issued against
     * @param str                    partition to query
     * @param str2                   file id to look up
     * @param i                      id recorded in the returned statistics
     * @param i2                     number of lookups to perform
     * @return latency statistics, in milliseconds, over all lookups of this round
     */
    public static PerfStats runOneRound(SyncableFileSystemView syncableFileSystemView, String str, String str2, int i, int i2) {
        Histogram latencies = new Histogram(new UniformReservoir(10000));
        for (int round = 0; round < i2; round++) {
            // nanoTime is monotonic and intended for elapsed-time measurement; the wall clock
            // can jump and may have coarse granularity. The value is still reported in ms.
            long startNanos = System.nanoTime();
            Option<FileSlice> latestFileSlice = syncableFileSystemView.getLatestFileSlice(str, str2);
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            System.out.println("Latest File Slice for part=" + str + ", fileId=" + str2 + ", Slice=" + latestFileSlice + ", Time=" + elapsedMillis);
            latencies.update(elapsedMillis);
        }
        return new PerfStats(str, i, latencies.getSnapshot());
    }

    /**
     * Command-line entry point: parses the options into a {@link Config} and runs the
     * benchmark. Prints usage and exits with status 1 when no arguments are given or when
     * the help flag is set.
     *
     * @param strArr command-line arguments
     * @throws Exception if option parsing or the benchmark run fails
     */
    public static void main(String[] strArr) throws Exception {
        Config config = new Config();
        JCommander jCommander = new JCommander(config);
        // Check for the no-argument case before parsing: parsing validates the required
        // options and would otherwise fail before usage could be printed.
        if (strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        jCommander.parse(strArr);
        if (config.help) {
            jCommander.usage();
            System.exit(1);
        }
        new TimelineServerPerf(config).run();
    }

    /*
     * Rebuilds a serialized lambda of this class from its SerializedLambda description.
     *
     * Only the implementation method named "lambda$runLookups$aa4e8fb9$1" is recognized. For
     * it, the method kind, functional interface (Spark's FlatMapFunction.call), implementing
     * class and signatures are checked, and a lambda is returned that is built from the three
     * captured arguments. Any other input ends in IllegalArgumentException.
     *
     * NOTE(review): this method is marked synthetic and its body duplicates the flatMap
     * lambda in runLookups, so it appears to be compiler-generated code as rendered by the
     * decompiler (see the "JADX" / "loaded from" comments in this file). "boolean z = -1"
     * and a switch over a boolean are not valid Java source — confirm how this file is meant
     * to be built before relying on or editing this method.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation method name to a selector (hash, then equals).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -894714564:
                if (implMethodName.equals("lambda$runLookups$aa4e8fb9$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: validate the remaining metadata and rebuild the matching lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/perf/TimelineServerPerf") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/table/view/SyncableFileSystemView;IILjava/lang/String;)Ljava/util/Iterator;")) {
                    // Captured arguments, in order: the view, the number of tasks to schedule
                    // (used as the IntStream bound), and the per-task iteration count.
                    SyncableFileSystemView syncableFileSystemView = (SyncableFileSystemView) serializedLambda.getCapturedArg(0);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(2)).intValue();
                    return str -> {
                        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(100);
                        List arrayList = new ArrayList();
                        List arrayList2 = new ArrayList();
                        List list2 = (List) syncableFileSystemView.getLatestFileSlices(str).collect(Collectors.toList());
                        String fileId = list2.isEmpty() ? "dummyId" : ((FileSlice) list2.get(new Random(Double.doubleToLongBits(Math.random())).nextInt(list2.size()))).getFileId();
                        IntStream.range(0, intValue).forEach(i3 -> {
                            arrayList2.add(scheduledThreadPoolExecutor.schedule(() -> {
                                return runOneRound(syncableFileSystemView, str, fileId, i3, intValue2);
                            }, 0L, TimeUnit.NANOSECONDS));
                        });
                        arrayList2.forEach(scheduledFuture -> {
                            try {
                                arrayList.add(scheduledFuture.get());
                            } catch (InterruptedException | ExecutionException e) {
                                throw new RuntimeException(e);
                            }
                        });
                        System.out.println("SLICES are=");
                        list2.forEach(fileSlice -> {
                            System.out.println("\t\tFileSlice=" + fileSlice);
                        });
                        return arrayList.iterator();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
