package org.apache.wayang.profiler.spark;

import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.profiler.util.ProfilingUtils;
import org.apache.wayang.profiler.util.RrdAccessor;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.compiler.FunctionCompiler;
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.operators.SparkExecutionOperator;
import org.rrd4j.ConsolFun;

/* loaded from: input_file:org/apache/wayang/profiler/spark/SparkOperatorProfiler.class */
public abstract class SparkOperatorProfiler {
    // Creates the operator to be profiled; invoked once per call to prepare(...).
    protected Supplier<SparkExecutionOperator> operatorGenerator;
    // One data quantum generator per operator input, addressed by input index.
    protected final List<Supplier<?>> dataQuantumGenerators;
    // Root directory of the Ganglia RRD files ("wayang.ganglia.rrds").
    private final String gangliaRrdsDir;
    // Ganglia cluster name; used as a sub-directory of gangliaRrdsDir ("wayang.ganglia.cluster").
    private final String gangliaClusterName;
    // CPU clock in MHz, used to convert busy time into CPU cycles ("wayang.spark.cpu.mhz").
    public int cpuMhz;
    // Number of machines in the profiled cluster ("wayang.spark.machines").
    public int numMachines;
    // Number of cores per machine ("wayang.spark.cores-per-machine").
    public int numCoresPerMachine;
    // Target number of partitions for generated input RDDs; -1 leaves the partitioning untouched.
    public int numPartitions;
    // Maximum number of data quanta generated per batch ("wayang.profiler.datagen.batchsize").
    private final int dataQuantumGeneratorBatchSize;
    // Where input data is generated: "worker" or "driver" ("wayang.profiler.datagen.location").
    private final String dataQuantumGeneratorLocation;
    // Read from "wayang.profiler.execute.padding"; not used in this class -- presumably consumed by subclasses.
    protected final long executionPaddingTime;
    // Operator under test; assigned in prepare(...).
    protected SparkExecutionOperator operator;
    // Executor used for the profiling run; created in prepare(...) and released in run().
    protected SparkExecutor sparkExecutor;
    // Input cardinalities passed to the most recent prepare(...) call.
    protected List<Long> inputCardinalities;
    // Logger named after the concrete profiler class.
    protected final Logger logger = LogManager.getLogger(getClass());
    // Not used in this class -- presumably for subclasses that need to compile UDFs.
    protected final FunctionCompiler functionCompiler = new FunctionCompiler();

    /**
     * Immutable holder for the measurements of a single profiling run.
     */
    public static class Result {
        private final List<Long> inputCardinalities;
        private final int numMachines;
        private final int numCoresPerMachine;
        private final long outputCardinality;
        private final long diskBytes;
        private final long networkBytes;
        private final long cpuCycles;
        private final long wallclockMillis;

        /**
         * Creates a new result.
         *
         * @param list the input cardinalities, one entry per operator input (stored as-is, not copied)
         * @param j    the output cardinality
         * @param j2   the wallclock time in milliseconds
         * @param j3   the number of disk bytes
         * @param j4   the number of network bytes
         * @param j5   the number of CPU cycles
         * @param i    the number of machines
         * @param i2   the number of cores per machine
         */
        public Result(List<Long> list, long j, long j2, long j3, long j4, long j5, int i, int i2) {
            this.inputCardinalities = list;
            this.outputCardinality = j;
            this.wallclockMillis = j2;
            this.diskBytes = j3;
            this.networkBytes = j4;
            this.cpuCycles = j5;
            this.numMachines = i;
            this.numCoresPerMachine = i2;
        }

        /** @return the input cardinalities, one entry per operator input */
        public List<Long> getInputCardinalities() {
            return this.inputCardinalities;
        }

        /** @return the output cardinality */
        public long getOutputCardinality() {
            return this.outputCardinality;
        }

        /** @return the number of disk bytes */
        public long getDiskBytes() {
            return this.diskBytes;
        }

        /** @return the number of network bytes */
        public long getNetworkBytes() {
            return this.networkBytes;
        }

        /** @return the number of CPU cycles */
        public long getCpuCycles() {
            return this.cpuCycles;
        }

        /** @return the wallclock time in milliseconds */
        public long getWallclockMillis() {
            return this.wallclockMillis;
        }

        /** @return the number of machines */
        public int getNumMachines() {
            return this.numMachines;
        }

        /** @return the number of cores per machine */
        public int getNumCoresPerMachine() {
            return this.numCoresPerMachine;
        }

        @Override
        public String toString() {
            return "Result{inputCardinalities=" + this.inputCardinalities
                    + ", outputCardinality=" + this.outputCardinality
                    + ", numMachines=" + this.numMachines
                    + ", numCoresPerMachine=" + this.numCoresPerMachine
                    + ", wallclockMillis=" + this.wallclockMillis
                    + ", cpuCycles=" + this.cpuCycles
                    + ", diskBytes=" + this.diskBytes
                    + ", networkBytes=" + this.networkBytes
                    + '}';
        }

        /**
         * Builds the CSV header matching {@link #toCsvString()}: one {@code input_card_<index>} column per
         * input, followed by the fixed measurement columns.
         *
         * @return the comma-separated header line
         */
        public String getCsvHeader() {
            // Explicit two-argument lambda: selects the index-aware overload of WayangCollections.map.
            final List<String> inputColumns =
                    WayangCollections.map(this.inputCardinalities, (index, cardinality) -> "input_card_" + index);
            return String.join(",", inputColumns)
                    + ",output_card,wallclock,disk,network,cpu,machines,cores_per_machine";
        }

        /**
         * Formats this result as one CSV line whose columns follow {@link #getCsvHeader()}.
         *
         * @return the comma-separated values
         */
        public String toCsvString() {
            final List<String> inputValues =
                    WayangCollections.map(this.inputCardinalities, cardinality -> cardinality.toString());
            return String.join(",", inputValues)
                    + "," + this.outputCardinality
                    + "," + this.wallclockMillis
                    + "," + this.diskBytes
                    + "," + this.networkBytes
                    + "," + this.cpuCycles
                    + "," + this.numMachines
                    + "," + this.numCoresPerMachine;
        }
    }

    /**
     * Creates a profiler and reads its settings from the given configuration.
     *
     * @param supplier      creates the operator to be profiled
     * @param configuration source of the {@code wayang.spark.*}, {@code wayang.ganglia.*} and
     *                      {@code wayang.profiler.*} settings; defaults apply for missing keys
     * @param supplierArr   data quantum generators, one per operator input
     */
    public SparkOperatorProfiler(Supplier<SparkExecutionOperator> supplier, Configuration configuration, Supplier<?>... supplierArr) {
        // Generators for the operator and its input data.
        operatorGenerator = supplier;
        dataQuantumGenerators = Arrays.asList(supplierArr);

        // Cluster description.
        cpuMhz = (int) configuration.getLongProperty("wayang.spark.cpu.mhz", 2700L);
        numMachines = (int) configuration.getLongProperty("wayang.spark.machines", 1L);
        numCoresPerMachine = (int) configuration.getLongProperty("wayang.spark.cores-per-machine", 1L);
        numPartitions = (int) configuration.getLongProperty("wayang.spark.partitions", -1L);

        // Location of the Ganglia metrics.
        gangliaRrdsDir = configuration.getStringProperty("wayang.ganglia.rrds", "/var/lib/ganglia/rrds");
        gangliaClusterName = configuration.getStringProperty("wayang.ganglia.cluster", "cluster");

        // Data generation and execution settings.
        dataQuantumGeneratorBatchSize = (int) configuration.getLongProperty("wayang.profiler.datagen.batchsize", 5000000L);
        dataQuantumGeneratorLocation = configuration.getStringProperty("wayang.profiler.datagen.location", "worker");
        executionPaddingTime = configuration.getLongProperty("wayang.profiler.execute.padding", 5000L);
    }

    /**
     * Sets up a profiling run: creates the operator and a Spark executor, then prepares every input.
     *
     * @param jArr the input cardinalities, one per operator input
     */
    public void prepare(long... jArr) {
        operator = operatorGenerator.get();
        inputCardinalities = WayangArrays.asList(jArr);
        sparkExecutor = ProfilingUtils.fakeSparkExecutor(ReflectionUtils.getDeclaringJar(SparkOperatorProfiler.class));

        // Inputs are prepared in index order, after the operator and executor are in place.
        int inputIndex = 0;
        for (final long cardinality : jArr) {
            prepareInput(inputIndex, cardinality);
            inputIndex++;
        }
    }

    /**
     * Prepares a single operator input. Called by {@code prepare(long...)} once per input, in index order.
     *
     * @param i the index of the input to prepare
     * @param j the cardinality requested for that input
     */
    protected abstract void prepareInput(int i, long j);

    /**
     * Generates an input RDD in the configured data generation location ({@code "worker"} or
     * {@code "driver"}). Any other value is logged as an error and treated like {@code "worker"}.
     *
     * <p>NOTE(review): decompiler metadata indicates this method was originally declared
     * {@code protected}.
     *
     * @param j   the number of data quanta to generate
     * @param i   the index of the input, which selects the data quantum generator
     * @param <T> the type of the generated data quanta
     * @return the generated RDD
     */
    public <T> JavaRDD<T> prepareInputRdd(long j, int i) {
        switch (this.dataQuantumGeneratorLocation) {
            case "worker":
                return prepareInputRddInWorker(j, i);
            case "driver":
                return prepareInputRddInDriver(j, i);
            default:
                // Pass the offending value so that the "{}" placeholder is actually filled in.
                this.logger.error("In correct data generation location (is: {}, allowed: worker/driver). Using worker.", this.dataQuantumGeneratorLocation);
                return prepareInputRddInWorker(j, i);
        }
    }

    /**
     * Generates the data quanta in the driver, batch by batch, and unions the parallelized batches into
     * one RDD. The RDD is partitioned via {@link #partition(JavaRDD)}, cached, and traversed once.
     *
     * @param j   the number of data quanta to generate
     * @param i   the index of the input, which selects the data quantum generator
     * @param <T> the type of the generated data quanta
     * @return the cached RDD
     */
    @SuppressWarnings("unchecked") // The generators are untyped (Supplier<?>); the caller chooses T.
    protected <T> JavaRDD<T> prepareInputRddInDriver(long j, int i) {
        final Supplier<?> supplier = this.dataQuantumGenerators.get(i);
        JavaRDD<T> javaRDD = null;
        long remaining = j;
        // do-while: even for a cardinality of 0 one (empty) batch is created, so the result is never null.
        do {
            final int batchSize = (int) Math.min(remaining, this.dataQuantumGeneratorBatchSize);
            final List<T> batch = new ArrayList<>(batchSize);
            while (batch.size() < batchSize) {
                batch.add((T) supplier.get());
            }
            final JavaRDD<T> batchRdd = this.sparkExecutor.sc.parallelize(batch);
            javaRDD = javaRDD == null ? batchRdd : javaRDD.union(batchRdd);
            remaining -= batchSize;
        } while (remaining > 0);

        final JavaRDD<T> cache = partition(javaRDD).cache();
        // No-op action that makes Spark evaluate the cached RDD now.
        cache.foreach(dataQuantum -> {
        });
        return cache;
    }

    /**
     * Generates the data quanta inside a Spark job. The driver only parallelizes a list of batch sizes;
     * a {@code flatMap} then calls the data quantum generator that many times per batch. The RDD is
     * partitioned via {@link #partition(JavaRDD)}, cached, and traversed once.
     *
     * @param j   the number of data quanta to generate
     * @param i   the index of the input, which selects the data quantum generator
     * @param <T> the type of the generated data quanta
     * @return the cached RDD
     */
    @SuppressWarnings("unchecked") // The generators are untyped (Supplier<?>); the caller chooses T.
    protected <T> JavaRDD<T> prepareInputRddInWorker(long j, int i) {
        // Split the requested cardinality into full batches plus one (possibly empty) remainder batch.
        final int fullBatchSize = this.dataQuantumGeneratorBatchSize;
        final int numFullBatches = (int) (j / fullBatchSize);
        final List<Integer> batchSizes = new ArrayList<>();
        for (int batch = 0; batch < numFullBatches; batch++) {
            batchSizes.add(fullBatchSize);
        }
        batchSizes.add((int) (j % fullBatchSize));

        // The lambda must capture only this local, never a field: referencing a field would capture
        // "this", i.e. the whole profiler, in the closure that is shipped with the Spark job.
        final Supplier<?> supplier = this.dataQuantumGenerators.get(i);
        final JavaRDD<T> generated = this.sparkExecutor.sc.parallelize(batchSizes, 1).flatMap(size -> {
            final int count = size;
            final List<T> batch = new ArrayList<>(count);
            for (int k = 0; k < count; k++) {
                batch.add((T) supplier.get());
            }
            return batch.iterator();
        });

        final JavaRDD<T> cache = partition(generated).cache();
        // No-op action that makes Spark evaluate the cached RDD now.
        cache.foreach(dataQuantum -> {
        });
        return cache;
    }

    /**
     * Applies the configured number of partitions to an RDD.
     *
     * @param javaRDD the RDD to partition
     * @param <T>     the element type of the RDD
     * @return the given RDD if {@code numPartitions} is {@code -1}; otherwise the RDD coalesced (with
     *         shuffling enabled) to {@code numPartitions} partitions
     */
    protected <T> JavaRDD<T> partition(JavaRDD<T> javaRDD) {
        if (this.numPartitions == -1) {
            return javaRDD;
        }
        return javaRDD.coalesce(this.numPartitions, true);
    }

    /**
     * Executes the prepared operator and releases the Spark executor afterwards.
     *
     * <p>The executor is disposed even when the execution fails, so that a failed run does not leak it.
     *
     * @return the measurements produced by {@link #executeOperator()}
     */
    public Result run() {
        try {
            return executeOperator();
        } finally {
            // Guard against a missing prepare(...): do not mask the original failure with an NPE.
            if (this.sparkExecutor != null) {
                this.sparkExecutor.dispose();
                this.sparkExecutor = null;
            }
        }
    }

    /**
     * Estimates the CPU cycles spent in the cluster during the given interval. The estimate is elapsed
     * time x {@code cpuMhz * 1000} x cores per machine x machines x (1 - average idle fraction), where the
     * idle fraction comes from the Ganglia {@code cpu_idle} metric ({@code sum / num / 100}).
     *
     * <p>If a metric cannot be read, the intermediate value is {@code NaN} and the result is {@code 0}.
     *
     * <p>NOTE(review): decompiler metadata indicates this method was originally declared
     * {@code protected}.
     *
     * @param j  start of the interval (compared against RRD update times in milliseconds)
     * @param j2 end of the interval, same unit as {@code j}
     * @return the estimated number of CPU cycles, rounded to the nearest {@code long}
     */
    public long provideCpuCycles(long j, long j2) {
        final long elapsed = j2 - j;
        final double idleSum = waitAndQueryMetricAverage("cpu_idle", "sum", j, j2);
        final double idleCount = waitAndQueryMetricAverage("cpu_idle", "num", j, j2);
        final double busyFraction = 1.0d - ((idleSum / idleCount) / 100.0d);
        // Keep the factors in this exact order so that the floating-point result is unchanged.
        final double cycles = elapsed * this.cpuMhz * 1000.0d * this.numCoresPerMachine * this.numMachines * busyFraction;
        return Math.round(cycles);
    }

    /**
     * Estimates the network traffic during the given interval from the Ganglia metrics
     * {@code tx_bytes_eth0} and {@code rx_bytes_eth0}: the mean of the two averages, divided by 1000 and
     * multiplied by the interval length.
     *
     * <p>If a metric cannot be read, the intermediate value is {@code NaN} and the result is {@code 0}.
     *
     * <p>NOTE(review): decompiler metadata indicates this method was originally declared
     * {@code protected}.
     *
     * @param j  start of the interval (compared against RRD update times in milliseconds)
     * @param j2 end of the interval, same unit as {@code j}
     * @return the estimated number of bytes, truncated to a {@code long}
     */
    public long provideNetworkBytes(long j, long j2) {
        final double transmitted = waitAndQueryMetricAverage("tx_bytes_eth0", "sum", j, j2);
        final double received = waitAndQueryMetricAverage("rx_bytes_eth0", "sum", j, j2);
        final double meanRate = (transmitted + received) / 2.0d;
        final double bytes = (meanRate / 1000.0d) * (j2 - j);
        return (long) bytes;
    }

    /**
     * Estimates the disk traffic during the given interval from the Ganglia metrics
     * {@code diskstat_sda_read_bytes_per_sec} and {@code diskstat_sda_write_bytes_per_sec}: the sum of the
     * two averages, divided by 1000 and multiplied by the interval length.
     *
     * <p>If a metric cannot be read, the intermediate value is {@code NaN} and the result is {@code 0}.
     *
     * <p>NOTE(review): decompiler metadata indicates this method was originally declared
     * {@code protected}.
     *
     * @param j  start of the interval (compared against RRD update times in milliseconds)
     * @param j2 end of the interval, same unit as {@code j}
     * @return the estimated number of bytes read and written, truncated to a {@code long}
     */
    public long provideDiskBytes(long j, long j2) {
        final double readRate = waitAndQueryMetricAverage("diskstat_sda_read_bytes_per_sec", "sum", j, j2);
        // Query the write metric here; the read metric used to be queried twice, ignoring writes.
        final double writeRate = waitAndQueryMetricAverage("diskstat_sda_write_bytes_per_sec", "sum", j, j2);
        return (long) (((readRate + writeRate) / 1000.0d) * (j2 - j));
    }

    /**
     * Queries the average of a Ganglia metric over the given interval from the cluster's
     * {@code __SummaryInfo__} RRD file. If the file has not yet been updated up to the end of the interval,
     * this method sleeps for five seconds and retries; it keeps retrying for as long as the queried value
     * is {@code NaN}.
     *
     * @param str  the metric name, i.e. the RRD file name without the {@code .rrd} suffix
     * @param str2 the data source name inside the RRD file (e.g. {@code "sum"} or {@code "num"})
     * @param j    start of the interval
     * @param j2   end of the interval; compared against the RRD's last update time in milliseconds
     * @return the average, or {@code NaN} if the RRD file could not be accessed
     */
    private double waitAndQueryMetricAverage(String str, String str2, long j, long j2) {
        final String rrdPath = this.gangliaRrdsDir + File.separator + this.gangliaClusterName + File.separator + "__SummaryInfo__" + File.separator + str + ".rrd";
        double average = Double.NaN;
        int attempts = 0;
        do {
            // Back off before every retry, but not before the first attempt.
            if (attempts > 0) {
                ProfilingUtils.sleep(5000L);
            }
            attempts++;

            // try-with-resources closes the accessor even if the query fails.
            try (RrdAccessor rrdAccessor = RrdAccessor.open(rrdPath)) {
                final long lastUpdateMillis = rrdAccessor.getLastUpdateMillis();
                if (lastUpdateMillis >= j2) {
                    average = rrdAccessor.query(str2, j, j2, ConsolFun.AVERAGE);
                } else {
                    this.logger.info("Last RRD file update is only from {} ({} attempts so far).", new Date(lastUpdateMillis), Integer.valueOf(attempts));
                }
            } catch (Exception e) {
                this.logger.error(String.format("Could not access RRD %s.", rrdPath), e);
                return Double.NaN;
            }
        } while (Double.isNaN(average));
        return average;
    }

    /**
     * Executes the operator under test and collects its measurements. Invoked by {@code run()}, which
     * disposes the Spark executor afterwards.
     *
     * @return the measurements of the execution
     */
    protected abstract Result executeOperator();

    /**
     * Evaluates the given operator on this profiler's Spark executor, using a fresh
     * {@link DefaultOptimizationContext} in which the operator is registered as a one-time operator.
     *
     * <p>NOTE(review): decompiler metadata indicates this method was originally declared
     * {@code protected}.
     *
     * @param sparkExecutionOperator the operator to evaluate
     * @param channelInstanceArr     first channel-instance array handed to the operator (presumably its inputs)
     * @param channelInstanceArr2    second channel-instance array handed to the operator (presumably its outputs)
     */
    public void evaluate(SparkExecutionOperator sparkExecutionOperator, ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2) {
        final OptimizationContext.OperatorContext operatorContext =
                new DefaultOptimizationContext(this.sparkExecutor.getJob()).addOneTimeOperator(sparkExecutionOperator);
        sparkExecutionOperator.evaluate(channelInstanceArr, channelInstanceArr2, this.sparkExecutor, operatorContext);
    }

    /**
     * Creates a cached RDD channel instance and hands it the given RDD.
     *
     * <p>NOTE(review): decompiler metadata indicates this method was originally declared
     * {@code protected}.
     *
     * @param javaRDD       the RDD to pass to the new channel instance
     * @param sparkExecutor the executor passed along with the RDD
     * @return the channel instance that accepted the RDD
     */
    public static RddChannel.Instance createChannelInstance(JavaRDD<?> javaRDD, SparkExecutor sparkExecutor) {
        final RddChannel.Instance channelInstance = createChannelInstance(sparkExecutor);
        channelInstance.accept(javaRDD, sparkExecutor);
        return channelInstance;
    }

    /**
     * Creates an empty channel instance from {@code RddChannel.CACHED_DESCRIPTOR}. The channel is created
     * without a producer slot, and the instance without an executor or operator context.
     *
     * <p>NOTE(review): decompiler metadata indicates this method was originally declared
     * {@code protected}.
     *
     * @param sparkExecutor provides the configuration used to create the channel
     * @return the new channel instance
     */
    public static RddChannel.Instance createChannelInstance(SparkExecutor sparkExecutor) {
        final Configuration configuration = sparkExecutor.getConfiguration();
        // Explicit downcast: the descriptor API is presumably typed to the generic channel classes, and
        // the decompiled code omitted the cast. It is a no-op if the static type is already specific.
        return (RddChannel.Instance) RddChannel.CACHED_DESCRIPTOR
                .createChannel((OutputSlot) null, configuration)
                .createInstance((Executor) null, (OptimizationContext.OperatorContext) null, -1);
    }

    /**
     * Clean-up hook. This implementation does nothing; presumably subclasses override it to release
     * whatever they allocated.
     */
    public void cleanUp() {
    }

    /**
     * Synthetic lambda-deserialization hook, as emitted by the decompiler. It selects one of the three
     * serializable lambdas of this class by implementation method name (the two no-op {@code foreach}
     * bodies and the {@code flatMap} generator of {@code prepareInputRddInWorker}), checks the serialized
     * signature, and recreates the lambda.
     *
     * <p>NOTE(review): this is decompiler output, not hand-written code. As written it is not valid Java
     * source ({@code boolean z} is assigned {@code -1} and {@code 2}, and {@code case true} appears twice).
     * The Java compiler normally generates this method itself for serializable lambdas, so it should
     * presumably be dropped from the source -- confirm before building.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiled string switch: the first switch maps the method name to a selector,
        // the second switch dispatches on that selector.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 92803455:
                if (implMethodName.equals("lambda$prepareInputRddInWorker$43bbf5b7$1")) {
                    z = true;
                    break;
                }
                break;
            case 963845781:
                if (implMethodName.equals("lambda$prepareInputRddInDriver$43bbf5b7$1")) {
                    z = false;
                    break;
                }
                break;
            case 1479914365:
                if (implMethodName.equals("lambda$prepareInputRddInWorker$7a73c0f4$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // No-op foreach body of prepareInputRddInDriver.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/wayang/profiler/spark/SparkOperatorProfiler") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)V")) {
                    return obj -> {
                    };
                }
                break;
            case true:
                // No-op foreach body of prepareInputRddInWorker.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/wayang/profiler/spark/SparkOperatorProfiler") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)V")) {
                    return obj2 -> {
                    };
                }
                break;
            case true:
                // flatMap generator of prepareInputRddInWorker; its single captured argument is the supplier.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/wayang/profiler/spark/SparkOperatorProfiler") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/function/Supplier;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    Supplier supplier = (Supplier) serializedLambda.getCapturedArg(0);
                    return num -> {
                        ArrayList arrayList = new ArrayList(num.intValue());
                        for (int i4 = 0; i4 < num.intValue(); i4++) {
                            arrayList.add(supplier.get());
                        }
                        return arrayList.iterator();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
