package com.linkedin.parseq;

import com.linkedin.parseq.batching.BatchingSupport;
import com.linkedin.parseq.trace.ShallowTrace;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Base64CompressedHistogramSerializer;
import org.HdrHistogram.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/parseq/AbstractBenchmark.class */
public abstract class AbstractBenchmark {
    public static final String BENCHMARK_TEST_RESULTS_LOG_PREFIX = "Benchmark test results -> ";
    private final BatchingSupport _batchingSupport = new BatchingSupport();
    private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
    private final ConcurrentLinkedQueue<Thread> _parseqThreads = new ConcurrentLinkedQueue<>();
    private final Map<Long, Long> threadCPU = new HashMap();
    private final Map<Long, Long> threadUserCPU = new HashMap();
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBenchmark.class);
    private static final HistogramSerializer _histogramSerializer = new Base64CompressedHistogramSerializer();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/parseq/AbstractBenchmark$BenchmarkConfig.class */
    public static abstract class BenchmarkConfig {
        int CONCURRENCY_LEVEL = (Runtime.getRuntime().availableProcessors() / 2) + 1;
        double sampleRate = 0.001d;

        BenchmarkConfig() {
        }

        public abstract void runTask(Engine engine, Task<?> task) throws InterruptedException;

        public abstract void wrapUp();

        public String toString() {
            return "BenchmarkConfig [CONCURRENCY_LEVEL=" + this.CONCURRENCY_LEVEL + ", sampleRate=" + this.sampleRate + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/parseq/AbstractBenchmark$ConstantThroughputBenchmarkConfig.class */
    public static class ConstantThroughputBenchmarkConfig extends BenchmarkConfig {
        EventsArrival arrivalProcess;
        long warmupRime = 120;
        double events = 1000.0d;
        TimeUnit perUnit = TimeUnit.SECONDS;
        long runtime = 360;
        final Histogram planExecutionAccuracy = AbstractBenchmark.access$100();
        private long lastNano = 0;

        @Override // com.linkedin.parseq.AbstractBenchmark.BenchmarkConfig
        public void runTask(Engine engine, Task<?> task) throws InterruptedException {
            initArrivalProcess();
            if (this.lastNano == 0) {
                this.lastNano = System.nanoTime();
            }
            long nanosToNextEvent = this.lastNano + this.arrivalProcess.nanosToNextEvent();
            this.planExecutionAccuracy.recordValue(Math.abs(AbstractBenchmark.waitUntil(nanosToNextEvent) - nanosToNextEvent));
            engine.run(task);
            this.lastNano = nanosToNextEvent;
        }

        private void initArrivalProcess() {
            if (this.arrivalProcess == null) {
                this.arrivalProcess = new PoissonEventsArrival(this.events, this.perUnit);
            }
        }

        @Override // com.linkedin.parseq.AbstractBenchmark.BenchmarkConfig
        public String toString() {
            initArrivalProcess();
            return "ConstantThroughputBenchmarkConfig [throughput=" + this.events + "/" + this.perUnit + ", warmup=" + this.warmupRime + " " + this.perUnit + ", runtime=" + this.runtime + " " + this.perUnit + ", arrivalProcess=" + this.arrivalProcess + "], " + super.toString();
        }

        @Override // com.linkedin.parseq.AbstractBenchmark.BenchmarkConfig
        public void wrapUp() {
            AbstractBenchmark.LOG.info("----------------------------------------------------------------");
            AbstractBenchmark.LOG.info("Histogram of benchmark execution plan accuracy in µs:");
            this.planExecutionAccuracy.outputPercentileDistribution(System.out, Double.valueOf(1000.0d));
            AbstractBenchmark.LOG.info("Benchmark test results -> Histogram of benchmark execution plan accuracy in µs: " + AbstractBenchmark._histogramSerializer.serialize(this.planExecutionAccuracy));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/parseq/AbstractBenchmark$FullLoadBenchmarkConfig.class */
    public static class FullLoadBenchmarkConfig extends BenchmarkConfig {
        int WARMUP_ROUNDS = 100000;
        int N = 1000000;

        @Override // com.linkedin.parseq.AbstractBenchmark.BenchmarkConfig
        public void runTask(Engine engine, Task<?> task) {
            engine.blockingRun(task);
        }

        @Override // com.linkedin.parseq.AbstractBenchmark.BenchmarkConfig
        public String toString() {
            return "FullLoadBenchmarkConfig [WARMUP_ROUNDS=" + this.WARMUP_ROUNDS + ", ROUNDS=" + this.N + "]";
        }

        @Override // com.linkedin.parseq.AbstractBenchmark.BenchmarkConfig
        public void wrapUp() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/parseq/AbstractBenchmark$Stepper.class */
    public static class Stepper {
        private final double countPerStep;
        private int currentStep = 0;

        public Stepper(double d, int i) {
            this.countPerStep = i * d;
        }

        Optional<Integer> isNewStep(int i) {
            int i2 = (int) (i / this.countPerStep);
            if (this.currentStep == i2) {
                return Optional.empty();
            }
            this.currentStep = i2;
            return Optional.of(Integer.valueOf(i2));
        }
    }

    public void runExample(BenchmarkConfig benchmarkConfig) throws Exception {
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() - 1, new ThreadFactory() { // from class: com.linkedin.parseq.AbstractBenchmark.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                AbstractBenchmark.this._parseqThreads.add(thread);
                return thread;
            }
        });
        EngineBuilder timerScheduler = new EngineBuilder().setTaskExecutor(newScheduledThreadPool).setTimerScheduler(newScheduledThreadPool);
        timerScheduler.setPlanDeactivationListener(this._batchingSupport);
        timerScheduler.setEngineProperty("_MaxConcurrentPlans_", Integer.valueOf(benchmarkConfig.CONCURRENCY_LEVEL));
        Engine build = timerScheduler.build();
        try {
            doRunBenchmark(build, benchmarkConfig);
            build.shutdown();
            newScheduledThreadPool.shutdownNow();
        } catch (Throwable th) {
            build.shutdown();
            newScheduledThreadPool.shutdownNow();
            throw th;
        }
    }

    abstract Task<?> createPlan();

    private int N(BenchmarkConfig benchmarkConfig) {
        if (benchmarkConfig instanceof FullLoadBenchmarkConfig) {
            return ((FullLoadBenchmarkConfig) benchmarkConfig).N;
        }
        if (!(benchmarkConfig instanceof ConstantThroughputBenchmarkConfig)) {
            throw new IllegalArgumentException();
        }
        return (int) (r0.runtime * ((ConstantThroughputBenchmarkConfig) benchmarkConfig).events);
    }

    private int warmUpN(BenchmarkConfig benchmarkConfig) {
        if (benchmarkConfig instanceof FullLoadBenchmarkConfig) {
            return ((FullLoadBenchmarkConfig) benchmarkConfig).WARMUP_ROUNDS;
        }
        if (!(benchmarkConfig instanceof ConstantThroughputBenchmarkConfig)) {
            throw new IllegalArgumentException();
        }
        return (int) (r0.warmupRime * ((ConstantThroughputBenchmarkConfig) benchmarkConfig).events);
    }

    protected void doRunBenchmark(Engine engine, BenchmarkConfig benchmarkConfig) throws Exception {
        int N = N(benchmarkConfig);
        int warmUpN = warmUpN(benchmarkConfig);
        Histogram createHistogram = createHistogram();
        Histogram createHistogram2 = createHistogram();
        LOG.info("Number of cores: " + Runtime.getRuntime().availableProcessors());
        LOG.info("Configuration: " + benchmarkConfig);
        Task<?> createPlan = createPlan();
        engine.run(createPlan);
        createPlan.await();
        LOG.info("Number of tasks per plan: " + createPlan.getTrace().getTraceMap().size());
        Exchanger exchanger = new Exchanger();
        Thread thread = new Thread(() -> {
            try {
                Optional optional = (Optional) exchanger.exchange(Optional.empty());
                while (optional.isPresent()) {
                    Task<?> task = (Task) optional.get();
                    task.await();
                    recordCompletionTimes(createHistogram, createHistogram2, task);
                    optional = (Optional) exchanger.exchange(Optional.empty());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread.start();
        LOG.info("Warming up using " + warmUpN + " plan execution");
        System.out.print("Progress[");
        Stepper stepper = new Stepper(0.1d, warmUpN);
        for (int i = 0; i < warmUpN; i++) {
            benchmarkConfig.runTask(engine, createPlan());
            stepper.isNewStep(i).ifPresent(num -> {
                System.out.print(".");
            });
        }
        System.out.println(".]");
        grabCPUTimesBeforeTest();
        LOG.info("Starting test of " + N + " plan executions");
        System.out.print("Progress[");
        Stepper stepper2 = new Stepper(0.1d, N);
        Stepper stepper3 = new Stepper(1.0d / (N * benchmarkConfig.sampleRate), N);
        long nanoTime = System.nanoTime();
        for (int i2 = 0; i2 < N; i2++) {
            Task<?> createPlan2 = createPlan();
            benchmarkConfig.runTask(engine, createPlan2);
            stepper3.isNewStep(i2).ifPresent(num2 -> {
                try {
                    exchanger.exchange(Optional.of(createPlan2));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            stepper2.isNewStep(i2).ifPresent(num3 -> {
                System.out.print(".");
            });
        }
        long nanoTime2 = System.nanoTime();
        System.out.println(".]");
        grabCPUTimesAfterTest();
        exchanger.exchange(Optional.empty());
        thread.join();
        benchmarkConfig.wrapUp();
        LOG.info("----------------------------------------------------------------");
        LOG.info("Histogram of task execution times on parseq threads in µs:");
        createHistogram2.outputPercentileDistribution(System.out, Double.valueOf(1000.0d));
        LOG.info("Benchmark test results -> Histogram of task execution times on parseq threads in µs: " + _histogramSerializer.serialize(createHistogram2));
        LOG.info("----------------------------------------------------------------");
        LOG.info("Histogram of plan completion times in µs:");
        createHistogram.outputPercentileDistribution(System.out, Double.valueOf(1000.0d));
        LOG.info("Benchmark test results -> Histogram of plan completion times in µs: " + _histogramSerializer.serialize(createHistogram));
        LOG.info("----------------------------------------------------------------");
        LOG.info("Throughput: " + String.format("%.3f", Double.valueOf(N / ((nanoTime2 - nanoTime) / 1.0E9d))) + " plans/s, " + String.format("%.3f", Double.valueOf((N * r0) / ((nanoTime2 - nanoTime) / 1.0E9d))) + " tasks/s");
    }

    private void grabCPUTimesBeforeTest() {
        boolean isThreadCpuTimeSupported = this.threadBean.isThreadCpuTimeSupported();
        LOG.info("Thread CPU time measurment supported: " + isThreadCpuTimeSupported);
        if (isThreadCpuTimeSupported) {
            this.threadBean.setThreadCpuTimeEnabled(true);
        }
        Iterator<Thread> it = this._parseqThreads.iterator();
        while (it.hasNext()) {
            long id = it.next().getId();
            long threadCpuTime = this.threadBean.getThreadCpuTime(id);
            if (threadCpuTime > -1) {
                this.threadCPU.put(Long.valueOf(id), Long.valueOf(threadCpuTime));
            }
            long threadUserTime = this.threadBean.getThreadUserTime(id);
            if (threadUserTime > -1) {
                this.threadUserCPU.put(Long.valueOf(id), Long.valueOf(threadUserTime));
            }
        }
    }

    private long addTime(Map<Long, Long> map, long j, long j2, long j3, String str) {
        long longValue = map.get(Long.valueOf(j3)).longValue();
        if (longValue == -1) {
            if (j > -1) {
                LOG.warn(str + " time could not be captured before test but was captured after the test - bailing out...");
            }
        } else if (j <= -1) {
            LOG.warn(str + " Time could be captured before test but was not captured after the test - bailing out...");
        } else if (j < longValue) {
            LOG.warn(str + " Time captured before test is greater than the one captured after the test - bailing out...");
        } else {
            j2 += j - longValue;
        }
        return j2;
    }

    private void grabCPUTimesAfterTest() {
        long j = 0;
        long j2 = 0;
        Iterator<Thread> it = this._parseqThreads.iterator();
        while (it.hasNext()) {
            long id = it.next().getId();
            long threadCpuTime = this.threadBean.getThreadCpuTime(id);
            long threadUserTime = this.threadBean.getThreadUserTime(id);
            if (this.threadCPU.containsKey(Long.valueOf(id))) {
                j = addTime(this.threadCPU, threadCpuTime, j, id, "CPU");
                j2 = addTime(this.threadUserCPU, threadUserTime, j2, id, "User");
            } else {
                LOG.warn("New ParSeq thread was added during test");
            }
        }
        if (j > 0) {
            LOG.info("Benchmark test results -> Total CPU time in ms: " + (j / 1000000));
            LOG.info("Benchmark test results -> Total CPU User time in ms: " + (j2 / 1000000));
        }
    }

    private static Histogram createHistogram() {
        return new Histogram(1L, 10000000000L, 3);
    }

    private void recordCompletionTimes(Histogram histogram, Histogram histogram2, Task<?> task) {
        ShallowTrace shallowTrace = task.getShallowTrace();
        histogram.recordValue(shallowTrace.getEndNanos().longValue() - shallowTrace.getStartNanos().longValue());
        task.getTrace().getTraceMap().values().forEach(shallowTrace2 -> {
            histogram2.recordValue(shallowTrace2.getPendingNanos().longValue() - shallowTrace2.getStartNanos().longValue());
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static long waitUntil(long j) throws InterruptedException {
        long nanoTime = System.nanoTime();
        return j - nanoTime > 0 ? waitNano(j, nanoTime) : nanoTime;
    }

    private static long waitNano(long j, long j2) throws InterruptedException {
        long j3 = ((j - j2) >> 20) - 1;
        if (j3 < 0) {
            j3 = 0;
        }
        if (j3 <= 0) {
            return busyWaitUntil(j);
        }
        Thread.sleep(j3);
        return waitUntil(j);
    }

    private static long busyWaitUntil(long j) {
        long j2 = 0;
        while (true) {
            j2++;
            if (j2 % 1000 == 0) {
                long nanoTime = System.nanoTime();
                if (nanoTime - j >= 0) {
                    return nanoTime;
                }
            }
        }
    }

    static /* synthetic */ Histogram access$100() {
        return createHistogram();
    }
}
