package org.apache.beam.it.gcp.dataflow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.googleapis.util.Utils;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.common.collect.UnmodifiableIterator;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.logging.LogStrings;
import org.apache.beam.it.common.utils.PipelineUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.class */
public class DefaultPipelineLauncher extends AbstractPipelineLauncher {
    /** System property consulted for a pipeline-name override when the job name ends with "read". */
    private static final String READ_PIPELINE_NAME_OVERWRITE = "readPipelineNameOverride";

    /** System property consulted for a pipeline-name override when the job name ends with "write". */
    private static final String WRITE_PIPELINE_NAME_OVERWRITE = "writePipelineNameOverride";

    /** Sentinel returned by the Beam metric lookups when a value cannot be determined. */
    private static final long UNKNOWN_METRIC_VALUE = -1;

    private static final Logger LOG = LoggerFactory.getLogger(DefaultPipelineLauncher.class);

    /** Extracts the job id from the console output of a launched Python/Go process. */
    private static final Pattern JOB_ID_PATTERN = Pattern.compile("Submitted job: (\\S+)");

    /**
     * Results of Java pipelines that were run on a runner other than Dataflow, keyed by job name.
     * Status, cancel and metric calls for these jobs are answered from the stored result.
     */
    private static final Map<String, PipelineResult> MANAGED_JOBS = new HashMap<>();

    /** Results of Java pipelines submitted to Dataflow, keyed by the Dataflow job id. */
    private static final Map<String, PipelineResult> UNMANAGED_JOBS = new HashMap<>();

    /** JSON mapper used to parse the {@code beamTestPipelineOptions} system property. */
    private static final ObjectMapper MAPPER =
            new ObjectMapper().registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

    /**
     * Translation from Beam pipeline states to launcher job states. The explicit type witness on
     * {@code builder()} is required: without it the chained calls infer {@code <Object, Object>}
     * and the result is not assignable to the declared map type.
     */
    private static final Map<PipelineResult.State, PipelineLauncher.JobState> PIPELINE_STATE_TRANSLATE =
            ImmutableMap.<PipelineResult.State, PipelineLauncher.JobState>builder()
                    .put(PipelineResult.State.CANCELLED, PipelineLauncher.JobState.CANCELLED)
                    .put(PipelineResult.State.RUNNING, PipelineLauncher.JobState.RUNNING)
                    .put(PipelineResult.State.DONE, PipelineLauncher.JobState.DONE)
                    .put(PipelineResult.State.FAILED, PipelineLauncher.JobState.FAILED)
                    .put(PipelineResult.State.STOPPED, PipelineLauncher.JobState.STOPPED)
                    .put(PipelineResult.State.UNKNOWN, PipelineLauncher.JobState.UNKNOWN)
                    .put(PipelineResult.State.UPDATED, PipelineLauncher.JobState.UPDATED)
                    .put(PipelineResult.State.UNRECOGNIZED, PipelineLauncher.JobState.UNKNOWN)
                    .build();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.it.gcp.dataflow.DefaultPipelineLauncher$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher$1.class */
    /**
     * Compiler-generated lookup tables that map enum ordinals to dense {@code switch} case labels.
     *
     * <p>Each table is indexed by {@code ordinal()} and holds the 1-based case number for that
     * constant; constants without an entry keep the array default of 0 and therefore fall through
     * to the {@code default} branch of the corresponding switch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        /** Case labels for {@link PipelineLauncher.Sdk}: JAVA=1, PYTHON=2, GO=3. */
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$it$common$PipelineLauncher$Sdk = new int[PipelineLauncher.Sdk.values().length];

        /**
         * Case labels for {@link IOLoadTestBase.PipelineMetricsType}: COUNTER=1, STARTTIME=2,
         * ENDTIME=3, RUNTIME=4. This table was populated but never declared, which did not compile.
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$it$gcp$IOLoadTestBase$PipelineMetricsType = new int[IOLoadTestBase.PipelineMetricsType.values().length];

        static {
            // Every lookup is guarded on its own: a NoSuchFieldError for one constant must not
            // prevent the remaining entries from being registered.
            try {
                $SwitchMap$org$apache$beam$it$common$PipelineLauncher$Sdk[PipelineLauncher.Sdk.JAVA.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime; its slot stays 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$beam$it$common$PipelineLauncher$Sdk[PipelineLauncher.Sdk.PYTHON.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime; its slot stays 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$beam$it$common$PipelineLauncher$Sdk[PipelineLauncher.Sdk.GO.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime; its slot stays 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$beam$it$gcp$IOLoadTestBase$PipelineMetricsType[IOLoadTestBase.PipelineMetricsType.COUNTER.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime; its slot stays 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$beam$it$gcp$IOLoadTestBase$PipelineMetricsType[IOLoadTestBase.PipelineMetricsType.STARTTIME.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime; its slot stays 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$beam$it$gcp$IOLoadTestBase$PipelineMetricsType[IOLoadTestBase.PipelineMetricsType.ENDTIME.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime; its slot stays 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$beam$it$gcp$IOLoadTestBase$PipelineMetricsType[IOLoadTestBase.PipelineMetricsType.RUNTIME.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime; its slot stays 0 (default branch).
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher$Builder.class */
    /**
     * Builder for {@link DefaultPipelineLauncher}. It only carries the credentials that the
     * launcher's Dataflow client is created with; the credentials may be {@code null}.
     */
    public static final class Builder {
        private final Credentials credentials;

        /** Synthetic accessor that lets the enclosing class reach the private constructor. */
        /* synthetic */ Builder(Credentials credentials, AnonymousClass1 anonymousClass1) {
            this(credentials);
        }

        private Builder(Credentials credentials) {
            this.credentials = credentials;
        }

        /** Creates the launcher configured by this builder. */
        public DefaultPipelineLauncher build() {
            // The two-argument constructor only delegates to this private one.
            return new DefaultPipelineLauncher(this);
        }

        /** Returns the credentials supplied to {@link DefaultPipelineLauncher#builder}. */
        public Credentials getCredentials() {
            return credentials;
        }
    }

    /**
     * Creates a launcher whose Dataflow client uses the default transport and JSON factory.
     *
     * @param builder source of the credentials; when it holds none, the client is created without
     *     a request initializer
     */
    private DefaultPipelineLauncher(Builder builder) {
        super(
                new Dataflow(
                        Utils.getDefaultTransport(),
                        Utils.getDefaultJsonFactory(),
                        builder.getCredentials() != null
                                ? new HttpCredentialsAdapter(builder.getCredentials())
                                : null));
    }

    /**
     * Starts building a {@link DefaultPipelineLauncher}.
     *
     * @param credentials credentials for the Dataflow client; may be {@code null}
     * @return a new builder holding {@code credentials}
     */
    public static Builder builder(Credentials credentials) {
        // The synthetic two-argument constructor merely forwards to this private one.
        Builder launcherBuilder = new Builder(credentials);
        return launcherBuilder;
    }

    /**
     * Returns the state of a job.
     *
     * <p>For a job tracked in {@code MANAGED_JOBS} the state is read from the stored pipeline
     * result and translated; otherwise the job is fetched and handed to the superclass.
     *
     * @param str forwarded to the job lookup (presumably the project id - confirm against superclass)
     * @param str2 forwarded to the job lookup (presumably the region - confirm against superclass)
     * @param str3 job identifier; also the key used for managed jobs
     * @throws IOException if fetching an unmanaged job fails
     */
    @Override
    public PipelineLauncher.JobState getJobStatus(String str, String str2, String str3) throws IOException {
        if (!MANAGED_JOBS.containsKey(str3)) {
            return super.handleJobState(getJob(str, str2, str3));
        }
        PipelineResult.State currentState = MANAGED_JOBS.get(str3).getState();
        return PIPELINE_STATE_TRANSLATE.get(currentState);
    }

    /**
     * Cancels a job.
     *
     * <p>A job tracked in {@code MANAGED_JOBS} is cancelled through its pipeline result and a
     * locally built {@link Job} with requested state CANCELLED is returned; any other job is
     * delegated to the superclass.
     *
     * @param str forwarded to the superclass (presumably the project id - confirm)
     * @param str2 forwarded to the superclass (presumably the region - confirm)
     * @param str3 job identifier; also the key used for managed jobs
     * @throws RuntimeException wrapping the {@link IOException} raised while cancelling a managed job
     */
    @Override
    public Job cancelJob(String str, String str2, String str3) {
        if (MANAGED_JOBS.containsKey(str3)) {
            try {
                MANAGED_JOBS.get(str3).cancel();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            Job cancelled = new Job().setId(str3);
            return cancelled.setRequestedState(PipelineLauncher.JobState.CANCELLED.toString());
        }
        return super.cancelJob(str, str2, str3);
    }

    /**
     * Returns a {@link Job} describing the given job.
     *
     * <p>For a job tracked in {@code MANAGED_JOBS} the result is built locally: its id is
     * {@code str3} and its requested state is the translated state of the stored pipeline result.
     * Other jobs are looked up by the superclass.
     *
     * @param str forwarded to the superclass (presumably the project id - confirm)
     * @param str2 forwarded to the superclass (presumably the region - confirm)
     * @param str3 job identifier; also the key used for managed jobs
     * @throws IOException if the superclass lookup fails
     */
    @Override
    public Job getJob(String str, String str2, String str3) throws IOException {
        if (!MANAGED_JOBS.containsKey(str3)) {
            return super.getJob(str, str2, str3);
        }
        PipelineLauncher.JobState translated = PIPELINE_STATE_TRANSLATE.get(MANAGED_JOBS.get(str3).getState());
        Job localView = new Job().setId(str3);
        return localView.setRequestedState(translated.toString());
    }

    /**
     * Drains a job.
     *
     * <p>A job tracked in {@code MANAGED_JOBS} is cancelled via {@link #cancelJob}, and a locally
     * built {@link Job} with requested state DRAINED is returned. Other jobs are delegated to the
     * superclass.
     *
     * @param str forwarded unchanged (presumably the project id - confirm)
     * @param str2 forwarded unchanged (presumably the region - confirm)
     * @param str3 job identifier; also the key used for managed jobs
     */
    @Override
    public Job drainJob(String str, String str2, String str3) {
        if (MANAGED_JOBS.containsKey(str3)) {
            String drainedState = PipelineLauncher.JobState.DRAINED.toString();
            Job drained = new Job().setId(str3).setRequestedState(drainedState);
            // The managed job is stopped by cancelling it; only the reported state says DRAINED.
            cancelJob(str, str2, str3);
            return drained;
        }
        return super.drainJob(str, str2, str3);
    }

    /**
     * Verifies that at most one metric result was returned for a metric name.
     *
     * @param str metric name, used only in the failure message
     * @param iterable metric results to count
     * @throws IllegalStateException if {@code iterable} holds more than one element
     */
    private static <T> void checkIfMetricResultIsUnique(String str, Iterable<MetricResult<T>> iterable) throws IllegalStateException {
        final int resultCount = Iterables.size(iterable);
        final boolean atMostOne = resultCount <= 1;
        Preconditions.checkState(
                atMostOne,
                "More than one metric result matches name: %s in namespace %s. Metric results count: %s",
                str,
                IOLoadTestBase.BEAM_METRICS_NAMESPACE,
                Integer.valueOf(resultCount));
    }

    /**
     * Queries the distribution metrics with the given name in the Beam metrics namespace.
     *
     * @param pipelineResult result whose metrics are queried
     * @param str metric name to filter on
     * @return the matching distribution results
     */
    private static Iterable<MetricResult<DistributionResult>> getDistributions(PipelineResult pipelineResult, String str) {
        MetricNameFilter nameFilter = MetricNameFilter.named(IOLoadTestBase.BEAM_METRICS_NAMESPACE, str);
        MetricsFilter filter = MetricsFilter.builder().addNameFilter(nameFilter).build();
        MetricQueryResults matches = pipelineResult.metrics().queryMetrics(filter);
        return matches.getDistributions();
    }

    /**
     * Reads a pipeline-defined Beam metric from the stored result of a launched Java pipeline.
     *
     * <ul>
     *   <li>COUNTER: the attempted value of the single matching counter.
     *   <li>STARTTIME: the lowest {@code min} over all matching distributions.
     *   <li>ENDTIME: the highest {@code max} over all matching distributions.
     *   <li>RUNTIME: ENDTIME minus STARTTIME.
     * </ul>
     *
     * @param str job key: looked up first in {@code MANAGED_JOBS}, then in {@code UNMANAGED_JOBS}
     * @param pipelineMetricsType kind of metric to read
     * @param str2 metric name within the Beam metrics namespace
     * @return the metric value, or {@code UNKNOWN_METRIC_VALUE} (-1) when no pipeline result is
     *     stored for the job or no matching metric exists
     * @throws IllegalStateException if more than one counter matches {@code str2}
     * @throws IllegalArgumentException if {@code pipelineMetricsType} is not one of the four above
     */
    public Long getBeamMetric(String str, IOLoadTestBase.PipelineMetricsType pipelineMetricsType, String str2) {
        PipelineResult pipelineResult = MANAGED_JOBS.getOrDefault(str, UNMANAGED_JOBS.get(str));
        if (pipelineResult == null) {
            LOG.warn("Query pipeline defined metrics this SDK or runner is currently unsupported.");
            return UNKNOWN_METRIC_VALUE;
        }
        switch (pipelineMetricsType) {
            case COUNTER:
                // Query only where the counters are needed; the time-based types run their own
                // distribution query, so querying up front would hit the metrics backend twice.
                MetricsFilter counterFilter = MetricsFilter.builder()
                        .addNameFilter(MetricNameFilter.named(IOLoadTestBase.BEAM_METRICS_NAMESPACE, str2))
                        .build();
                return readCounter(str2, pipelineResult.metrics().queryMetrics(counterFilter).getCounters());
            case STARTTIME:
            case ENDTIME:
            case RUNTIME:
                return readTimeMetric(pipelineMetricsType, getDistributions(pipelineResult, str2));
            default:
                throw new IllegalArgumentException(String.format("Unexpected metric type %s.", pipelineMetricsType));
        }
    }

    /** Returns the attempted value of the only counter in {@code counters}, or -1 if there is none. */
    private static Long readCounter(String metricName, Iterable<MetricResult<Long>> counters) {
        checkIfMetricResultIsUnique(metricName, counters);
        // An explicit emptiness check replaces catching NoSuchElementException from next().
        MetricResult<Long> counter = Iterables.getFirst(counters, null);
        if (counter == null) {
            LOG.error("Failed to get metric {}, from namespace {}", metricName, IOLoadTestBase.BEAM_METRICS_NAMESPACE);
            return UNKNOWN_METRIC_VALUE;
        }
        return counter.getAttempted();
    }

    /** Derives the start time, end time or runtime from the min/max of the given distributions. */
    private static Long readTimeMetric(
            IOLoadTestBase.PipelineMetricsType type, Iterable<MetricResult<DistributionResult>> distributions) {
        boolean found = false;
        long lowestMin = Long.MAX_VALUE;
        long highestMax = Long.MIN_VALUE;
        // One sequential pass yields both extremes; the values are identical to running two
        // separate (parallel) streams over the same results.
        for (MetricResult<DistributionResult> result : distributions) {
            DistributionResult attempted = Objects.requireNonNull(result.getAttempted());
            lowestMin = Math.min(lowestMin, attempted.getMin());
            highestMax = Math.max(highestMax, attempted.getMax());
            found = true;
        }
        long start = found ? lowestMin : UNKNOWN_METRIC_VALUE;
        long end = found ? highestMax : UNKNOWN_METRIC_VALUE;
        if (type == IOLoadTestBase.PipelineMetricsType.STARTTIME) {
            return start;
        }
        if (type == IOLoadTestBase.PipelineMetricsType.ENDTIME) {
            return end;
        }
        // RUNTIME: a bound equal to the sentinel is treated as "unknown", as before.
        if (start == UNKNOWN_METRIC_VALUE || end == UNKNOWN_METRIC_VALUE) {
            return UNKNOWN_METRIC_VALUE;
        }
        return end - start;
    }

    /**
     * Returns a single metric value for a job.
     *
     * <p>Metric names that start with the Beam metrics namespace must have the form
     * {@code namespace:metric_type:metric_name}; they are resolved through {@link #getBeamMetric}.
     * All other names are delegated to the superclass.
     *
     * @param str forwarded to the superclass (presumably the project id - confirm)
     * @param str2 forwarded to the superclass (presumably the region - confirm)
     * @param str3 job identifier
     * @param str4 metric name
     * @throws IllegalStateException if a Beam metric name does not have three colon-separated parts
     * @throws IOException if the superclass lookup fails
     */
    @Override
    public Double getMetric(String str, String str2, String str3, String str4) throws IOException {
        if (str4.startsWith(IOLoadTestBase.BEAM_METRICS_NAMESPACE)) {
            String[] nameParts = str4.split(":", 3);
            String malformedMessage = String.format("Invalid Beam metrics name: %s, expected: '%s:metric_type:metric_name'", str4, IOLoadTestBase.BEAM_METRICS_NAMESPACE);
            Preconditions.checkState(nameParts.length == 3, malformedMessage);
            IOLoadTestBase.PipelineMetricsType metricType = IOLoadTestBase.PipelineMetricsType.valueOf(nameParts[1]);
            Long beamValue = getBeamMetric(str3, metricType, nameParts[2]);
            return Double.valueOf(beamValue.doubleValue());
        }
        return super.getMetric(str, str2, str3, str4);
    }

    /**
     * Returns all metrics of a job.
     *
     * <p>Jobs tracked in {@code MANAGED_JOBS} yield a new empty map; other jobs are delegated to
     * the superclass.
     *
     * @param str forwarded to the superclass (presumably the project id - confirm)
     * @param str2 forwarded to the superclass (presumably the region - confirm)
     * @param str3 job identifier; also the key used for managed jobs
     * @throws IOException if the superclass lookup fails
     */
    @Override
    public Map<String, Double> getMetrics(String str, String str2, String str3) throws IOException {
        if (!MANAGED_JOBS.containsKey(str3)) {
            return super.getMetrics(str, str2, str3);
        }
        Map<String, Double> noMetrics = new HashMap<>();
        return noMetrics;
    }

    /**
     * Launches the job described by {@code launchConfig}.
     *
     * <ul>
     *   <li>JAVA with a runner other than "DataflowRunner": the pipeline is run in-process, its
     *       result is stored in {@code MANAGED_JOBS} under the job name, and a RUNNING launch info
     *       is returned immediately.
     *   <li>JAVA with "DataflowRunner": the pipeline is run with options assembled by
     *       {@code extractOptions}; the result is stored in {@code UNMANAGED_JOBS} under the
     *       Dataflow job id.
     *   <li>PYTHON / GO: the executable is started as {@code python3 <exe>} / {@code go run <exe>}
     *       and the job id is parsed from its output.
     * </ul>
     *
     * <p>For every path except the first, the method then waits until the job is active and
     * builds the launch info from the fetched job.
     *
     * @param str project id (passed as {@code --project} and reported in the launch info)
     * @param str2 region (passed as {@code --region} and reported in the launch info)
     * @param launchConfig job name, SDK, parameters and pipeline or executable to launch
     * @throws IllegalStateException if the SDK, or the pipeline/executable required for it, is missing
     * @throws RuntimeException if the SDK is not one of JAVA, PYTHON or GO
     * @throws IOException if starting the external process or talking to Dataflow fails
     */
    public PipelineLauncher.LaunchInfo launch(String str, String str2, PipelineLauncher.LaunchConfig launchConfig) throws IOException {
        Preconditions.checkState(launchConfig.sdk() != null, "Cannot launch a dataflow job without sdk specified. Please specify sdk and try again!");
        LOG.info("Getting ready to launch {} in {} under {}", launchConfig.jobName(), str2, str);
        LOG.info("Using parameters:\n{}", LogStrings.formatForLogging(launchConfig.parameters()));
        String jobId;
        // Switch on the enum itself rather than on the synthetic ordinal lookup table.
        switch (launchConfig.sdk()) {
            case JAVA:
                Preconditions.checkState(launchConfig.pipeline() != null, "Cannot launch a dataflow job without pipeline specified. Please specify pipeline and try again!");
                String runner = launchConfig.getParameter(AbstractPipelineLauncher.PARAM_RUNNER);
                if (!"DataflowRunner".equalsIgnoreCase(runner)) {
                    return launchOnNonDataflowRunner(str, str2, launchConfig, runner);
                }
                jobId = submitJavaPipelineToDataflow(str, str2, launchConfig);
                break;
            case PYTHON:
                jobId = launchExecutable(str, str2, launchConfig, "python3");
                break;
            case GO:
                jobId = launchExecutable(str, str2, launchConfig, "go", "run");
                break;
            default:
                throw new RuntimeException(String.format("Invalid sdk %s specified. sdk can be one of java, python, or go.", launchConfig.sdk()));
        }
        PipelineLauncher.JobState activeState = waitUntilActive(str, str2, jobId);
        Job job = getJob(str, str2, jobId, "JOB_VIEW_DESCRIPTION");
        LOG.info("Received Dataflow job {}: {}", job.getId(), LogStrings.formatForLogging(job));
        return getJobInfo(launchConfig, activeState, job);
    }

    /** Runs a Java pipeline on a non-Dataflow runner and registers its result as a managed job. */
    private PipelineLauncher.LaunchInfo launchOnNonDataflowRunner(
            String project, String region, PipelineLauncher.LaunchConfig launchConfig, String runner) {
        PipelineOptions options = launchConfig.pipeline().getOptions();
        options.setRunner(PipelineUtils.getRunnerClass(runner));
        options.setJobName(launchConfig.jobName());
        // The creation time is captured before the run starts.
        String createTime = Timestamps.toString(Timestamps.fromMillis(System.currentTimeMillis()));
        String jobType = options.as(StreamingOptions.class).isStreaming() ? "JOB_TYPE_STREAMING" : "JOB_TYPE_BATCH";
        PipelineResult result = launchConfig.pipeline().run();
        String jobName = launchConfig.jobName();
        MANAGED_JOBS.put(jobName, result);
        return PipelineLauncher.LaunchInfo.builder()
                .setJobId(jobName)
                .setProjectId(project)
                .setRegion(region)
                .setCreateTime(createTime)
                .setSdk("DirectBeam")
                .setVersion("0.0.1")
                .setJobType(jobType)
                .setRunner(runner)
                .setParameters(launchConfig.parameters())
                .setState(PipelineLauncher.JobState.RUNNING)
                .build();
    }

    /** Runs a Java pipeline on Dataflow, records the result and returns the Dataflow job id. */
    private String submitJavaPipelineToDataflow(String project, String region, PipelineLauncher.LaunchConfig launchConfig) {
        PipelineOptions pipelineOptions = launchConfig.pipeline().getOptions();
        List<String> args = extractOptions(project, region, launchConfig);
        if (pipelineOptions.as(StreamingOptions.class).isStreaming()) {
            args.add("--streaming");
        }
        if (!Strings.isNullOrEmpty(pipelineOptions.getTempLocation())) {
            args.add(String.format("--tempLocation=%s", pipelineOptions.getTempLocation()));
        }
        PipelineOptions runOptions = PipelineOptionsFactory.fromArgs(args.toArray(new String[0])).create();
        runOptions.setJobName(launchConfig.jobName());
        PipelineResult result = launchConfig.pipeline().run(runOptions);
        String jobId = ((DataflowPipelineJob) result).getJobId();
        UNMANAGED_JOBS.put(jobId, result);
        this.launchedJobs.add(jobId);
        return jobId;
    }

    /** Starts {@code <commandPrefix...> <executable> <options...>} and returns the parsed job id. */
    private String launchExecutable(
            String project, String region, PipelineLauncher.LaunchConfig launchConfig, String... commandPrefix) throws IOException {
        Preconditions.checkState(launchConfig.executable() != null, "Cannot launch a dataflow job without executable specified. Please specify executable and try again!");
        LOG.info("Using the executable at {}", launchConfig.executable());
        List<String> command = new ArrayList<>();
        for (String token : commandPrefix) {
            command.add(token);
        }
        command.add(launchConfig.executable());
        command.addAll(extractOptions(project, region, launchConfig));
        return executeCommandAndParseResponse(command);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Extends the superclass launch-info builder with a pipeline name.
     *
     * <p>The name defaults to the one extracted from the configured job name. If that name ends
     * with "write" or "read", a non-empty value of the system property
     * {@code writePipelineNameOverride} or {@code readPipelineNameOverride} respectively replaces it.
     *
     * @param launchConfig configuration the job was launched with
     * @param jobState state forwarded to the superclass
     * @param job job forwarded to the superclass
     * @return the superclass builder with the pipeline name set
     */
    @Override
    public PipelineLauncher.LaunchInfo.Builder getJobInfoBuilder(PipelineLauncher.LaunchConfig launchConfig, PipelineLauncher.JobState jobState, Job job) {
        PipelineLauncher.LaunchInfo.Builder infoBuilder = super.getJobInfoBuilder(launchConfig, jobState, job);
        String derivedName = PipelineUtils.extractJobName(launchConfig.jobName());

        String override;
        if (derivedName.endsWith("write")) {
            override = System.getProperty(WRITE_PIPELINE_NAME_OVERWRITE);
        } else if (derivedName.endsWith("read")) {
            override = System.getProperty(READ_PIPELINE_NAME_OVERWRITE);
        } else {
            override = null;
        }

        String pipelineName = Strings.isNullOrEmpty(override) ? derivedName : override;
        infoBuilder.setPipelineName(pipelineName);
        return infoBuilder;
    }

    /**
     * Builds the command-line options for a launch.
     *
     * <p>The result contains, in order: the entries of the JSON array held in the
     * {@code beamTestPipelineOptions} system property (if set), one {@code --key=value} option per
     * launch parameter, then {@code --project} and {@code --region}.
     *
     * @param str project id, emitted as {@code --project}
     * @param str2 region, emitted as {@code --region}
     * @param launchConfig source of the launch parameters
     * @return a new mutable list of options
     * @throws RuntimeException if the system property cannot be parsed as a JSON list
     */
    private List<String> extractOptions(String str, String str2, PipelineLauncher.LaunchConfig launchConfig) {
        List<String> options = new ArrayList<>();
        String testOptionsJson = System.getProperty("beamTestPipelineOptions");
        if (!Strings.isNullOrEmpty(testOptionsJson)) {
            try {
                List<?> parsed = MAPPER.readValue(testOptionsJson, List.class);
                // A JSON "null" document yields null here; treat it as "no extra options".
                if (parsed != null) {
                    for (Object option : parsed) {
                        // Stringify explicitly so non-string JSON values (numbers, booleans)
                        // cannot sneak into a List<String> through an unchecked cast.
                        options.add(String.valueOf(option));
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException("Unable to instantiate test options from system property beamTestPipelineOptions:" + testOptionsJson, e);
            }
        }
        for (Map.Entry<String, String> parameter : launchConfig.parameters().entrySet()) {
            options.add(String.format("--%s=%s", parameter.getKey(), parameter.getValue()));
        }
        options.add(String.format("--project=%s", str));
        options.add(String.format("--region=%s", str2));
        return options;
    }

    /**
     * Runs an external command and extracts the Dataflow job id from its output.
     *
     * <p>Standard error is merged into standard output and the combined output is read to the end
     * as UTF-8. The process exit code is not inspected.
     *
     * @param list the command and its arguments
     * @return the job id captured by {@code JOB_ID_PATTERN} ("Submitted job: &lt;id&gt;")
     * @throws IOException if the process cannot be started or its output cannot be read
     * @throws RuntimeException if the output contains no "Submitted job: ..." line
     */
    private String executeCommandAndParseResponse(List<String> list) throws IOException {
        Process process = new ProcessBuilder(list).redirectErrorStream(true).start();
        String output;
        // Close the pipe once drained so the file descriptor is not leaked.
        try (java.io.InputStream processOutput = process.getInputStream()) {
            output = new String(ByteStreams.toByteArray(processOutput), StandardCharsets.UTF_8);
        }
        Matcher matcher = JOB_ID_PATTERN.matcher(output);
        if (!matcher.find()) {
            throw new RuntimeException(String.format("Dataflow output in unexpected format. Failed to parse Dataflow Job ID. Result from process: %s", output));
        }
        String jobId = matcher.group(1);
        LOG.info("Submitted job: {}", jobId);
        return jobId;
    }

    /**
     * Synthetic accessor constructor: delegates to the private single-argument constructor.
     *
     * @param builder builder forwarded to the private constructor
     * @param anonymousClass1 unused marker parameter that only distinguishes this overload;
     *     {@code Builder.build()} passes {@code null}
     */
    /* synthetic */ DefaultPipelineLauncher(Builder builder, AnonymousClass1 anonymousClass1) {
        this(builder);
    }
}
