package org.apache.beam.it.gcp;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.common.collect.UnmodifiableIterator;
import com.google.monitoring.v3.TimeInterval;
import com.google.protobuf.util.Timestamps;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.logging.LogStrings;
import org.apache.beam.it.gcp.AutoValue_LoadTestBase_MetricsConfiguration;
import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager;
import org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher;
import org.apache.beam.it.gcp.monitoring.MonitoringClient;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/it/gcp/LoadTestBase.class */
public abstract class LoadTestBase {
    // Rate constants used by computeDataflowMetrics to derive the "EstimatedCost" metric.
    // NOTE(review): the currency and the pricing region/date these figures correspond to are not
    // stated anywhere in this file — confirm against current Dataflow pricing before relying on them.

    // Multiplied by (TotalVcpuTime / 3600) for non-streaming and streaming jobs respectively.
    private static final double VCPU_PER_HR_BATCH = 0.056d;
    private static final double VCPU_PER_HR_STREAMING = 0.069d;
    // Multiplied by (TotalMemoryUsage / 1000 / 3600) for non-streaming and streaming jobs respectively.
    private static final double MEM_PER_GB_HR_BATCH = 0.003557d;
    private static final double MEM_PER_GB_HR_STREAMING = 0.0035557d;
    // Multiplied by (TotalPdUsage / 3600) and (TotalSsdUsage / 3600) regardless of job type.
    private static final double PD_PER_GB_HR = 5.4E-5d;
    private static final double PD_SSD_PER_GB_HR = 2.98E-4d;
    // Multiplied by TotalShuffleDataProcessed (non-streaming) or TotalStreamingDataProcessed (streaming).
    private static final double SHUFFLE_PER_GB_BATCH = 0.011d;
    private static final double SHUFFLE_PER_GB_STREAMING = 0.018d;

    /** Project used for all launcher and monitoring calls; assigned once per class in {@link #setUpClass()}. */
    @SuppressFBWarnings({"MS_PKGPROTECT"})
    protected static String project;

    /** Region used for launcher calls; assigned once per class in {@link #setUpClass()}. */
    @SuppressFBWarnings({"MS_PKGPROTECT"})
    protected static String region;
    /** Monitoring client built before each test in {@link #setUp()} and cleaned up after it. */
    protected MonitoringClient monitoringClient;
    /** Launcher obtained from {@link #launcher()} before each test and cleaned up after it. */
    protected PipelineLauncher pipelineLauncher;
    /** Operator wrapping {@link #pipelineLauncher}; created in {@link #setUp()}. */
    protected PipelineOperator pipelineOperator;
    /** Name of the currently running test method; set by the {@code watcher} rule when a test starts. */
    protected String testName;

    /**
     * JUnit rule that logs the start of every test and records the running test method's name in
     * {@link #testName}, which is later used when exporting metrics.
     */
    @Rule
    public TestRule watcher = new TestWatcher() {
        @Override
        protected void starting(Description description) {
            LOG.info("Starting load test {}.{}", description.getClassName(), description.getMethodName());
            LoadTestBase.this.testName = description.getMethodName();
        }
    };
    private static final Logger LOG = LoggerFactory.getLogger(LoadTestBase.class);
    // Job-message texts that getWorkerTimeInterval searches for to mark the start and the end of
    // the worker time interval.
    private static final Pattern WORKER_START_PATTERN = Pattern.compile("^All workers have finished the startup processes and began to receive work requests.*$");
    private static final Pattern WORKER_STOP_PATTERN = Pattern.compile("^Stopping worker pool.*$");
    // Credentials obtained from TestProperties; CREDENTIALS must be initialized before
    // CREDENTIALS_PROVIDER because the provider wraps it.
    protected static final Credentials CREDENTIALS = TestProperties.googleCredentials();
    protected static final CredentialsProvider CREDENTIALS_PROVIDER = FixedCredentialsProvider.create(CREDENTIALS);

    /**
     * Names of the PCollections whose monitoring metrics should be collected for a job.
     *
     * <p>Every property is optional. The {@code V2} variants are the ones read by
     * {@code getThroughputMetrics} when the launch info's runner equals
     * {@code AbstractPipelineLauncher.RUNNER_V2}; otherwise the plain variants are read.
     */
    @AutoValue
    public static abstract class MetricsConfiguration {

        /** Builder for {@link MetricsConfiguration}; all setters accept {@code null}. */
        @AutoValue.Builder
        public static abstract class Builder {
            /** Sets the input PCollection name used when the runner is not Runner V2. */
            public abstract Builder setInputPCollection(@Nullable String str);

            /** Sets the input PCollection name used when the runner is Runner V2. */
            public abstract Builder setInputPCollectionV2(@Nullable String str);

            /** Sets the output PCollection name used when the runner is not Runner V2. */
            public abstract Builder setOutputPCollection(@Nullable String str);

            /** Sets the output PCollection name used when the runner is Runner V2. */
            public abstract Builder setOutputPCollectionV2(@Nullable String str);

            /** Builds the immutable configuration. */
            public abstract MetricsConfiguration build();
        }

        /** Input PCollection name; also passed to the monitoring client to compute data processed. */
        @Nullable
        public abstract String inputPCollection();

        /** Input PCollection name for Runner V2 jobs. */
        @Nullable
        public abstract String inputPCollectionV2();

        /** Output PCollection name. */
        @Nullable
        public abstract String outputPCollection();

        /** Output PCollection name for Runner V2 jobs. */
        @Nullable
        public abstract String outputPCollectionV2();

        /** Returns a new builder with no PCollection names set. */
        public static Builder builder() {
            return new AutoValue_LoadTestBase_MetricsConfiguration.Builder();
        }
    }

    /** Reads the project and region from {@link TestProperties} once, before any test in the class. */
    @BeforeClass
    public static void setUpClass() {
        LoadTestBase.project = TestProperties.project();
        LoadTestBase.region = TestProperties.region();
    }

    /**
     * Creates the per-test collaborators: the monitoring client, the launcher supplied by
     * {@link #launcher()}, and an operator wrapping that launcher.
     *
     * @throws IOException declared for the collaborators' construction
     */
    @Before
    public void setUp() throws IOException {
        monitoringClient = MonitoringClient.builder(CREDENTIALS_PROVIDER).build();
        PipelineLauncher createdLauncher = launcher();
        pipelineLauncher = createdLauncher;
        pipelineOperator = new PipelineOperator(createdLauncher);
    }

    /**
     * Releases the resources created in {@link #setUp()}.
     *
     * <p>The monitoring client is cleaned up even when cleaning up the launcher throws, and each
     * collaborator is skipped if {@code setUp} failed before it was created (JUnit still runs
     * {@code @After} methods in that case).
     *
     * @throws IOException if a cleanup call fails
     */
    @After
    public void tearDownLoadTestBase() throws IOException {
        try {
            if (this.pipelineLauncher != null) {
                this.pipelineLauncher.cleanupAll();
            }
        } finally {
            // Must run regardless of the launcher cleanup outcome, otherwise the client leaks.
            if (this.monitoringClient != null) {
                this.monitoringClient.cleanupAll();
            }
        }
    }

    /**
     * Supplies the launcher used to run and inspect pipelines. Called from {@link #setUp()} before
     * every test; the returned instance is cleaned up in {@link #tearDownLoadTestBase()}.
     */
    public abstract PipelineLauncher launcher();

    /**
     * Writes one row describing the launched job and its metrics to the BigQuery export table.
     *
     * <p>The row goes to the dataset and table configured in {@link TestProperties}, in the export
     * project when one is configured and otherwise in the test project. An
     * {@link IllegalStateException} raised while exporting is logged and swallowed so that a failed
     * export does not fail the test.
     *
     * @param launchInfo launch details of the job (timestamps, sdk, template info, parameters)
     * @param map metric name to value pairs to export
     */
    protected void exportMetricsToBigQuery(PipelineLauncher.LaunchInfo launchInfo, Map<String, Double> map) {
        LOG.info("Exporting metrics:\n{}", LogStrings.formatForLogging(map));
        try {
            // Prefer the explicitly configured export project; fall back to the test project.
            String exportProject = MoreObjects.firstNonNull(TestProperties.exportProject(), project);
            BigQueryResourceManager bigQuery = BigQueryResourceManager.builder(this.testName, exportProject, CREDENTIALS)
                    .setDatasetId(TestProperties.exportDataset())
                    .build();

            Map<String, Object> rowContent = new HashMap<>();
            rowContent.put("timestamp", launchInfo.createTime());
            rowContent.put("sdk", launchInfo.sdk());
            rowContent.put("version", launchInfo.version());
            rowContent.put("job_type", launchInfo.jobType());
            putOptional(rowContent, "template_name", launchInfo.templateName());
            putOptional(rowContent, "template_version", launchInfo.templateVersion());
            putOptional(rowContent, "template_type", launchInfo.templateType());
            putOptional(rowContent, "pipeline_name", launchInfo.pipelineName());
            rowContent.put("test_name", this.testName);

            // Parameters and metrics are exported as lists of {name, value} rows.
            List<TableRow> parameterRows = new ArrayList<>();
            for (Map.Entry<String, String> parameter : launchInfo.parameters().entrySet()) {
                parameterRows.add(new TableRow().set("name", parameter.getKey()).set("value", parameter.getValue()));
            }
            rowContent.put("parameters", parameterRows);

            List<TableRow> metricRows = new ArrayList<>();
            for (Map.Entry<String, Double> metric : map.entrySet()) {
                metricRows.add(new TableRow().set("name", metric.getKey()).set("value", metric.getValue()));
            }
            rowContent.put("metrics", metricRows);

            bigQuery.write(TestProperties.exportTable(), InsertAllRequest.RowToInsert.of("rowId", rowContent));
        } catch (IllegalStateException e) {
            LOG.error("Unable to export results to datastore. ", e);
        }
    }

    /**
     * Checks whether a PCollection of a job has seen at least the expected number of elements.
     *
     * <p>The metric queried is the PCollection name with every {@code .} replaced by {@code -} and
     * {@code -ElementCount} appended. Any exception raised while checking is logged as a warning
     * and reported as "not reached".
     *
     * @param str id of the job to query
     * @param str2 name of the PCollection
     * @param l expected number of messages
     * @return {@code true} if the metric exists and is at least {@code l}; {@code false} otherwise
     */
    protected boolean waitForNumMessages(String str, String str2, Long l) {
        try {
            String elementCountMetric = str2.replace(".", "-") + "-ElementCount";
            Double observed = this.pipelineLauncher.getMetric(project, region, str, elementCountMetric);
            boolean reached = observed != null && observed >= l;
            if (!reached) {
                LOG.info("Expected {} messages in input PCollection, but found {}.", l, observed);
            }
            return reached;
        } catch (Exception e) {
            LOG.warn("Encountered error when trying to measure input elements. ", e);
            return false;
        }
    }

    /**
     * Adds derived metrics for a Dataflow job to {@code map}: run time, estimated cost, elapsed
     * time, estimated data processed, CPU utilization and throughput; streaming jobs additionally
     * get data-freshness and system-latency metrics.
     *
     * <p>The map must already contain {@code TotalVcpuTime}, {@code TotalMemoryUsage},
     * {@code TotalPdUsage}, {@code TotalSsdUsage} and the job-type specific shuffle metric;
     * a missing entry results in a {@link NullPointerException}.
     *
     * @param map metrics of the job; read and extended in place
     * @param launchInfo launch details of the job
     * @param metricsConfiguration PCollection names to collect metrics for
     * @throws ParseException if a job message timestamp cannot be parsed
     */
    private void computeDataflowMetrics(Map<String, Double> map, PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration metricsConfiguration) throws ParseException {
        LOG.info("Calculating approximate cost for {} under {}", launchInfo.jobId(), project);

        TimeInterval workerInterval = getWorkerTimeInterval(launchInfo);
        long runSeconds = Timestamps.between(workerInterval.getStartTime(), workerInterval.getEndTime()).getSeconds();
        map.put("RunTime", Double.valueOf(runSeconds));

        boolean streaming = launchInfo.jobType().equals("JOB_TYPE_STREAMING");
        double vcpuRate = streaming ? VCPU_PER_HR_STREAMING : VCPU_PER_HR_BATCH;
        double memoryRate = streaming ? MEM_PER_GB_HR_STREAMING : MEM_PER_GB_HR_BATCH;
        double shuffleRate = streaming ? SHUFFLE_PER_GB_STREAMING : SHUFFLE_PER_GB_BATCH;
        String shuffleMetric = streaming ? "TotalStreamingDataProcessed" : "TotalShuffleDataProcessed";

        // Terms are accumulated one at a time, in a fixed order, so the floating-point result is stable.
        double cost = 0.0d;
        cost += (map.get("TotalVcpuTime") / 3600.0d) * vcpuRate;
        cost += ((map.get("TotalMemoryUsage") / 1000.0d) / 3600.0d) * memoryRate;
        cost += map.get(shuffleMetric) * shuffleRate;

        if (streaming) {
            map.putAll(getDataFreshnessMetrics(launchInfo.jobId(), workerInterval));
            map.putAll(getSystemLatencyMetrics(launchInfo.jobId(), workerInterval));
        }

        cost += (map.get("TotalPdUsage") / 3600.0d) * PD_PER_GB_HR;
        cost += (map.get("TotalSsdUsage") / 3600.0d) * PD_SSD_PER_GB_HR;
        map.put("EstimatedCost", Double.valueOf(cost));

        map.put("ElapsedTime", this.monitoringClient.getElapsedTime(project, launchInfo));

        Double bytesProcessed = this.monitoringClient.getDataProcessed(project, launchInfo, metricsConfiguration.inputPCollection());
        if (bytesProcessed != null) {
            map.put("EstimatedDataProcessedGB", Double.valueOf(bytesProcessed / 1.0E9d));
        }

        map.putAll(getCpuUtilizationMetrics(launchInfo.jobId(), workerInterval));
        map.putAll(getThroughputMetrics(launchInfo, metricsConfiguration, workerInterval));
    }

    /**
     * Adds the {@code ElapsedTime} metric for a direct-runner job: the seconds between the job's
     * create time and now.
     *
     * @param map metrics of the job; extended in place
     * @param launchInfo launch details providing the create time
     * @throws ParseException if the create time cannot be parsed
     */
    private void computeDirectMetrics(Map<String, Double> map, PipelineLauncher.LaunchInfo launchInfo) throws ParseException {
        long nowMillis = System.currentTimeMillis();
        long createdMillis = Timestamps.toMillis(Timestamps.parse(launchInfo.createTime()));
        double elapsedSeconds = 0.001d * (nowMillis - createdMillis);
        map.put("ElapsedTime", Double.valueOf(elapsedSeconds));
    }

    /**
     * Collects metrics for a job given optional input and output PCollection names.
     *
     * @param launchInfo launch details of the job
     * @param str input PCollection name, or {@code null}
     * @param str2 output PCollection name, or {@code null}
     * @return metric name to value pairs
     */
    protected Map<String, Double> getMetrics(PipelineLauncher.LaunchInfo launchInfo, @Nullable String str, @Nullable String str2) throws InterruptedException, IOException, ParseException {
        MetricsConfiguration.Builder configBuilder = MetricsConfiguration.builder();
        configBuilder.setInputPCollection(str);
        configBuilder.setOutputPCollection(str2);
        MetricsConfiguration config = configBuilder.build();
        return getMetrics(launchInfo, config);
    }

    /**
     * Collects metrics for a job given only an input PCollection name.
     *
     * @param launchInfo launch details of the job
     * @param str input PCollection name
     * @return metric name to value pairs
     */
    protected Map<String, Double> getMetrics(PipelineLauncher.LaunchInfo launchInfo, String str) throws ParseException, InterruptedException, IOException {
        final String noOutputPCollection = null;
        return getMetrics(launchInfo, str, noOutputPCollection);
    }

    /**
     * Collects metrics for a job without naming any PCollection.
     *
     * @param launchInfo launch details of the job
     * @return metric name to value pairs
     */
    protected Map<String, Double> getMetrics(PipelineLauncher.LaunchInfo launchInfo) throws ParseException, InterruptedException, IOException {
        final String noInputPCollection = null;
        final String noOutputPCollection = null;
        return getMetrics(launchInfo, noInputPCollection, noOutputPCollection);
    }

    /**
     * Collects the launcher-reported metrics for a job and augments them with runner-specific
     * derived metrics.
     *
     * <p>For runners whose name contains {@code Dataflow} this blocks for four minutes before
     * querying the additional metrics; for the direct runner only the elapsed time is added.
     *
     * @param launchInfo launch details of the job
     * @param metricsConfiguration PCollection names to collect metrics for
     * @return metric name to value pairs
     * @throws InterruptedException if the wait before querying Dataflow metrics is interrupted
     */
    protected Map<String, Double> getMetrics(PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration metricsConfiguration) throws IOException, InterruptedException, ParseException {
        Map<String, Double> collected = this.pipelineLauncher.getMetrics(project, region, launchInfo.jobId());
        String runner = launchInfo.runner();

        if (runner.contains("Dataflow")) {
            LOG.info("Sleeping for 4 minutes to query Dataflow runner metrics.");
            long waitMillis = Duration.ofMinutes(4L).toMillis();
            Thread.sleep(waitMillis);
            computeDataflowMetrics(collected, launchInfo, metricsConfiguration);
            return collected;
        }

        if ("DirectRunner".equalsIgnoreCase(runner)) {
            computeDirectMetrics(collected, launchInfo);
        }
        return collected;
    }

    /**
     * Returns the average and maximum CPU utilization of a job over a time interval.
     *
     * @param str id of the job
     * @param timeInterval interval to query
     * @return a map with {@code AvgCpuUtilization} and {@code MaxCpuUtilization}, or an empty map
     *     when the monitoring client returns no samples ({@code null} or an empty list)
     */
    protected Map<String, Double> getCpuUtilizationMetrics(String str, TimeInterval timeInterval) {
        List<Double> cpuUtilization = this.monitoringClient.getCpuUtilization(project, str, timeInterval);
        Map<String, Double> cpuMetrics = new HashMap<>();
        // Collections.max throws on an empty collection, so treat "no samples" like "no data".
        if (cpuUtilization == null || cpuUtilization.isEmpty()) {
            return cpuMetrics;
        }
        cpuMetrics.put("AvgCpuUtilization", calculateAverage(cpuUtilization));
        cpuMetrics.put("MaxCpuUtilization", Collections.max(cpuUtilization));
        return cpuMetrics;
    }

    /**
     * Queries input and output throughput (bytes and elements per second) for a job and
     * summarizes each series as an average and a maximum.
     *
     * <p>The {@code V2} PCollection names from the configuration are used when the launch info's
     * runner equals {@code AbstractPipelineLauncher.RUNNER_V2}; otherwise the plain names are used.
     *
     * @param launchInfo launch details of the job
     * @param metricsConfiguration PCollection names to query
     * @param timeInterval interval to query
     * @return throughput metric name to value pairs
     */
    protected Map<String, Double> getThroughputMetrics(PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration metricsConfiguration, TimeInterval timeInterval) {
        String jobId = launchInfo.jobId();
        boolean runnerV2 = AbstractPipelineLauncher.RUNNER_V2.equals(launchInfo.runner());

        String inputName;
        String outputName;
        if (runnerV2) {
            inputName = metricsConfiguration.inputPCollectionV2();
            outputName = metricsConfiguration.outputPCollectionV2();
        } else {
            inputName = metricsConfiguration.inputPCollection();
            outputName = metricsConfiguration.outputPCollection();
        }

        List<Double> inputBytes = this.monitoringClient.getThroughputBytesPerSecond(project, jobId, inputName, timeInterval);
        List<Double> inputElements = this.monitoringClient.getThroughputElementsPerSecond(project, jobId, inputName, timeInterval);
        List<Double> outputBytes = this.monitoringClient.getThroughputBytesPerSecond(project, jobId, outputName, timeInterval);
        List<Double> outputElements = this.monitoringClient.getThroughputElementsPerSecond(project, jobId, outputName, timeInterval);
        return getThroughputMetrics(inputBytes, inputElements, outputBytes, outputElements);
    }

    /**
     * Returns the average and maximum data freshness of a job over a time interval.
     *
     * @param str id of the job
     * @param timeInterval interval to query
     * @return a map with {@code AvgDataFreshness} and {@code MaxDataFreshness}, or an empty map
     *     when the monitoring client returns no samples ({@code null} or an empty list)
     */
    protected Map<String, Double> getDataFreshnessMetrics(String str, TimeInterval timeInterval) {
        List<Double> dataFreshness = this.monitoringClient.getDataFreshness(project, str, timeInterval);
        Map<String, Double> freshnessMetrics = new HashMap<>();
        // Collections.max throws on an empty collection, so treat "no samples" like "no data".
        if (dataFreshness == null || dataFreshness.isEmpty()) {
            return freshnessMetrics;
        }
        freshnessMetrics.put("AvgDataFreshness", calculateAverage(dataFreshness));
        freshnessMetrics.put("MaxDataFreshness", Collections.max(dataFreshness));
        return freshnessMetrics;
    }

    /**
     * Returns the average and maximum system latency of a job over a time interval.
     *
     * @param str id of the job
     * @param timeInterval interval to query
     * @return a map with {@code AvgSystemLatency} and {@code MaxSystemLatency}, or an empty map
     *     when the monitoring client returns no samples ({@code null} or an empty list)
     */
    protected Map<String, Double> getSystemLatencyMetrics(String str, TimeInterval timeInterval) {
        List<Double> systemLatency = this.monitoringClient.getSystemLatency(project, str, timeInterval);
        Map<String, Double> latencyMetrics = new HashMap<>();
        // Collections.max throws on an empty collection, so treat "no samples" like "no data".
        if (systemLatency == null || systemLatency.isEmpty()) {
            return latencyMetrics;
        }
        latencyMetrics.put("AvgSystemLatency", calculateAverage(systemLatency));
        latencyMetrics.put("MaxSystemLatency", Collections.max(systemLatency));
        return latencyMetrics;
    }

    /**
     * Derives the interval during which workers were running by scanning the job's detailed
     * messages for the worker-start and worker-stop texts.
     *
     * <p>Every matching message overwrites the corresponding bound, so the last match in
     * iteration order determines it; a bound with no matching message is left unset.
     *
     * @param launchInfo launch details of the job
     * @return the interval built from the matched message timestamps
     * @throws ParseException if a matched message's timestamp cannot be parsed
     */
    protected TimeInterval getWorkerTimeInterval(PipelineLauncher.LaunchInfo launchInfo) throws ParseException {
        TimeInterval.Builder interval = TimeInterval.newBuilder();
        for (JobMessage message : this.pipelineLauncher.listMessages(project, region, launchInfo.jobId(), "JOB_MESSAGE_DETAILED")) {
            String text = message.getMessageText();
            if (text == null || text.isEmpty()) {
                continue;
            }
            if (WORKER_START_PATTERN.matcher(text).find()) {
                LOG.info("Found worker start message in job messages.");
                interval.setStartTime(Timestamps.parse(message.getTime()));
            }
            if (WORKER_STOP_PATTERN.matcher(text).find()) {
                LOG.info("Found worker stop message in job messages.");
                interval.setEndTime(Timestamps.parse(message.getTime()));
            }
        }
        return interval.build();
    }

    /**
     * Summarizes four throughput series as average/maximum pairs.
     *
     * @param list input throughput in bytes per second, or {@code null}
     * @param list2 input throughput in elements per second, or {@code null}
     * @param list3 output throughput in bytes per second, or {@code null}
     * @param list4 output throughput in elements per second, or {@code null}
     * @return a map containing an {@code Avg...} and {@code Max...} entry for every series that
     *     has at least one sample; series that are {@code null} or empty contribute nothing
     */
    private Map<String, Double> getThroughputMetrics(List<Double> list, List<Double> list2, List<Double> list3, List<Double> list4) {
        Map<String, Double> throughput = new HashMap<>();
        putAverageAndMax(throughput, "AvgInputThroughputBytesPerSec", "MaxInputThroughputBytesPerSec", list);
        putAverageAndMax(throughput, "AvgInputThroughputElementsPerSec", "MaxInputThroughputElementsPerSec", list2);
        putAverageAndMax(throughput, "AvgOutputThroughputBytesPerSec", "MaxOutputThroughputBytesPerSec", list3);
        putAverageAndMax(throughput, "AvgOutputThroughputElementsPerSec", "MaxOutputThroughputElementsPerSec", list4);
        return throughput;
    }

    /** Stores the average and maximum of {@code samples} under the given keys, unless there are no samples. */
    private static void putAverageAndMax(Map<String, Double> target, String averageKey, String maxKey, List<Double> samples) {
        // Collections.max throws on an empty collection, so skip empty series like null ones.
        if (samples == null || samples.isEmpty()) {
            return;
        }
        target.put(averageKey, calculateAverage(samples));
        target.put(maxKey, Collections.max(samples));
    }

    /**
     * Computes the arithmetic mean of the given values.
     *
     * @param list values to average; must not be {@code null} or contain {@code null}
     * @return the mean, or {@code 0.0} when the list is empty
     */
    public static Double calculateAverage(List<Double> list) {
        double mean = list.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0d);
        return Double.valueOf(mean);
    }

    /**
     * Adds an entry to the map only when the value is present.
     *
     * @param map map to add to
     * @param str key of the entry
     * @param obj value of the entry; nothing is added when {@code null}
     */
    private static void putOptional(Map<String, Object> map, String str, @Nullable Object obj) {
        if (obj == null) {
            return;
        }
        map.put(str, obj);
    }

    /**
     * Builds an operator configuration for a launched job in the current project and region.
     *
     * @param launchInfo launch details providing the job id
     * @param duration timeout to apply to the operation
     * @return the operator configuration
     */
    public static PipelineOperator.Config createConfig(PipelineLauncher.LaunchInfo launchInfo, Duration duration) {
        return PipelineOperator.Config.builder()
                .setJobId(launchInfo.jobId())
                .setProject(project)
                .setRegion(region)
                .setTimeoutAfter(duration)
                .build();
    }
}
