package org.apache.beam.it.gcp.dataflow;

import com.google.api.client.util.ArrayMap;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Environment;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.ListJobMessagesResponse;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.logging.LogStrings;
import org.apache.beam.it.common.utils.RetryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.class */
/**
 * Base implementation of {@link PipelineLauncher} backed by the Dataflow API client.
 *
 * <p>Provides the job lookup, state, message, metric, cancel/drain and cleanup operations that
 * only need a {@link Dataflow} client. Subclasses are expected to record the IDs of the jobs they
 * launch in {@link #launchedJobs} so that {@link #cleanupAll()} can cancel them.
 */
public abstract class AbstractPipelineLauncher implements PipelineLauncher {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractPipelineLauncher.class);

    /** Matches metric names that contain "Current"; such metrics are skipped by {@link #getMetrics}. */
    private static final Pattern CURRENT_METRICS = Pattern.compile(".*Current.*");

    public static final String LEGACY_RUNNER = "Dataflow Legacy Runner";
    public static final String RUNNER_V2 = "Dataflow Runner V2";
    public static final String PARAM_RUNNER = "runner";
    public static final String PARAM_JOB_TYPE = "jobType";
    public static final String PARAM_JOB_ID = "jobId";

    /** IDs of the jobs that {@link #cleanupAll()} inspects and cancels if still running. */
    protected final List<String> launchedJobs = new ArrayList<>();

    /** Dataflow API client used for every request issued by this class. */
    protected final Dataflow client;

    /**
     * Creates a launcher that talks to Dataflow through the given client.
     *
     * @param dataflow the Dataflow API client to use
     */
    public AbstractPipelineLauncher(Dataflow dataflow) {
        this.client = dataflow;
    }

    /**
     * Fetches a job using the {@code JOB_VIEW_UNKNOWN} view.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @return the job as returned by the Dataflow API
     * @throws IOException declared for interface compatibility
     */
    public Job getJob(String str, String str2, String str3) throws IOException {
        return getJob(str, str2, str3, "JOB_VIEW_UNKNOWN");
    }

    /**
     * Fetches a job with the requested view, retrying according to the client retry policy.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @param str4 the job view to request
     * @return the job as returned by the Dataflow API
     */
    public Job getJob(String str, String str2, String str3, String str4) {
        return Failsafe.with(RetryUtil.clientRetryPolicy())
                .get(() -> this.client.projects().locations().jobs().get(str, str2, str3).setView(str4).execute());
    }

    /**
     * Returns the current state of a job.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @return the parsed state, or {@code UNKNOWN} when the job reports no state
     * @throws IOException if the job cannot be fetched
     */
    public PipelineLauncher.JobState getJobStatus(String str, String str2, String str3) throws IOException {
        return handleJobState(getJob(str, str2, str3));
    }

    /**
     * Lists the messages of a job at or above the given importance.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @param str4 the minimum importance of the messages to return
     * @return the job messages; an empty list when the response carries none (never {@code null})
     */
    public List<JobMessage> listMessages(String str, String str2, String str3, String str4) {
        LOG.info("Listing messages of {} under {}", str3, str);
        ListJobMessagesResponse response = Failsafe.with(RetryUtil.clientRetryPolicy())
                .get(() -> this.client.projects().locations().jobs().messages().list(str, str2, str3).setMinimumImportance(str4).execute());
        List<JobMessage> messages = response.getJobMessages();
        if (messages == null) {
            // The response model can leave the list unset; normalise so callers can iterate safely.
            messages = new ArrayList<>();
        }
        LOG.info("Received {} messages for {} under {}", messages.size(), str3, str);
        return messages;
    }

    /**
     * Requests cancellation of a job.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @return the job returned by the update call
     */
    public Job cancelJob(String str, String str2, String str3) {
        LOG.info("Cancelling {} under {}", str3, str);
        return requestJobState(str, str2, str3, PipelineLauncher.JobState.CANCELLED);
    }

    /**
     * Requests draining of a job.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @return the job returned by the update call
     */
    public Job drainJob(String str, String str2, String str3) {
        LOG.info("Draining {} under {}", str3, str);
        return requestJobState(str, str2, str3, PipelineLauncher.JobState.DRAINED);
    }

    /** Sends a job update whose only content is the requested state, with client retries. */
    private Job requestJobState(String project, String region, String jobId, PipelineLauncher.JobState target) {
        Job update = new Job().setRequestedState(target.toString());
        LOG.info("Sending job to update {}:\n{}", jobId, LogStrings.formatForLogging(update));
        return Failsafe.with(RetryUtil.clientRetryPolicy())
                .get(() -> this.client.projects().locations().jobs().update(project, region, jobId, update).execute());
    }

    /**
     * Looks up a single scalar metric of a job by its name or by its {@code original_name}
     * context entry.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @param str4 the metric name to look for
     * @return the scalar value, or {@code null} when no metrics were received, the metric was not
     *     found, or the first matching metric is not a scalar
     * @throws IOException if the metrics request fails
     */
    @Nullable
    public Double getMetric(String str, String str2, String str3, String str4) throws IOException {
        LOG.info("Getting '{}' metric for {} under {}", str4, str3, str);
        List<MetricUpdate> metrics = this.client.projects().locations().jobs().getMetrics(str, str2, str3).execute().getMetrics();
        if (metrics == null) {
            LOG.warn("No metrics received for the job {} under {}.", str3, str);
            return null;
        }
        for (MetricUpdate metricUpdate : metrics) {
            String name = metricUpdate.getName().getName();
            // A metric may carry no context at all; treat that as "no original name".
            Map<String, String> context = metricUpdate.getName().getContext();
            String originalName = context == null ? null : context.get("original_name");
            if (Objects.equals(str4, name) || Objects.equals(str4, originalName)) {
                if (metricUpdate.getScalar() != null) {
                    return ((Number) metricUpdate.getScalar()).doubleValue();
                }
                LOG.warn("The given metric '{}' is not a scalar metric. Please use getMetrics instead.", str4);
                return null;
            }
        }
        LOG.warn("Unable to find '{}' metric for {} under {}. Please check the metricName and try again!", str4, str3, str);
        return null;
    }

    /**
     * Collects the job-level metrics of a job as a flat name-to-value map.
     *
     * <p>Metrics whose context contains {@code tentative}, {@code execution_step} or {@code step},
     * metrics named {@code MeanByteCount} or {@code ElementCount}, and metrics whose name contains
     * "Current" are skipped. Scalars are stored under their name; distributions are expanded into
     * {@code _COUNT}, {@code _MIN}, {@code _MAX} and {@code _SUM} entries; gauges are only logged.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @return the collected metrics; empty when the job reported none
     * @throws IOException if the metrics request fails
     */
    public Map<String, Double> getMetrics(String str, String str2, String str3) throws IOException {
        LOG.info("Getting metrics for {} under {}", str3, str);
        List<MetricUpdate> metrics = this.client.projects().locations().jobs().getMetrics(str, str2, str3).execute().getMetrics();
        Map<String, Double> result = new HashMap<>();
        if (metrics == null) {
            // Same guard as getMetric: a job without metrics yields no list rather than an empty one.
            LOG.warn("No metrics received for the job {} under {}.", str3, str);
            return result;
        }
        for (MetricUpdate metricUpdate : metrics) {
            String name = metricUpdate.getName().getName();
            Map<String, String> context = metricUpdate.getName().getContext();
            boolean excludedByContext = context != null
                    && (context.containsKey("tentative") || context.containsKey("execution_step") || context.containsKey("step"));
            if (excludedByContext
                    || name.equals("MeanByteCount")
                    || name.equals("ElementCount")
                    || CURRENT_METRICS.matcher(name).find()) {
                continue;
            }
            if (result.containsKey(name)) {
                LOG.warn("Key {} already exists in metrics. Something might be wrong.", name);
            }
            if (metricUpdate.getScalar() != null) {
                result.put(name, ((Number) metricUpdate.getScalar()).doubleValue());
            } else if (metricUpdate.getDistribution() != null) {
                // Distributions are reported as four separate scalar entries.
                Map<?, ?> distribution = (Map<?, ?>) metricUpdate.getDistribution();
                result.put(name + "_COUNT", ((Number) distribution.get("count")).doubleValue());
                result.put(name + "_MIN", ((Number) distribution.get("min")).doubleValue());
                result.put(name + "_MAX", ((Number) distribution.get("max")).doubleValue());
                result.put(name + "_SUM", ((Number) distribution.get("sum")).doubleValue());
            } else if (metricUpdate.getGauge() != null) {
                LOG.warn("Gauge metric {} cannot be handled.", name);
            }
        }
        return result;
    }

    /**
     * Logs the job response together with a link to the job in the Dataflow console.
     *
     * @param job the job to log
     */
    public void printJobResponse(Job job) {
        LOG.info("Received job response: {}", LogStrings.formatForLogging(job));
        LOG.info("Dataflow Console: https://console.cloud.google.com/dataflow/jobs/{}/{}?project={}", job.getLocation(), job.getId(), job.getProjectId());
    }

    /**
     * Converts the current state reported by a job into a {@link PipelineLauncher.JobState}.
     *
     * @param job the job whose state is read
     * @return {@code UNKNOWN} when the job has no (or an empty) current state, otherwise the parsed state
     */
    public PipelineLauncher.JobState handleJobState(Job job) {
        String currentState = job.getCurrentState();
        if (Strings.isNullOrEmpty(currentState)) {
            return PipelineLauncher.JobState.UNKNOWN;
        }
        return PipelineLauncher.JobState.parse(currentState);
    }

    /**
     * Builds the launch info for a job from the launch configuration and the job itself.
     *
     * <p>The resulting parameters contain the launch parameters, the launch environment entries
     * (stringified), and the runner, job type and job ID under {@link #PARAM_RUNNER},
     * {@link #PARAM_JOB_TYPE} and {@link #PARAM_JOB_ID}. Template labels are copied when the job
     * has labels.
     *
     * @param launchConfig the configuration the job was launched with
     * @param jobState the state to record in the launch info
     * @param job the job as returned by the Dataflow API
     * @return a builder pre-populated with the job details
     */
    public PipelineLauncher.LaunchInfo.Builder getJobInfoBuilder(PipelineLauncher.LaunchConfig launchConfig, PipelineLauncher.JobState jobState, Job job) {
        Map<String, String> labels = job.getLabels();

        // The job runs on Runner V2 only when the corresponding experiment is enabled.
        String runner = LEGACY_RUNNER;
        Environment environment = job.getEnvironment();
        if (environment != null && environment.getExperiments() != null && environment.getExperiments().contains("use_runner_v2")) {
            runner = RUNNER_V2;
        }

        PipelineLauncher.LaunchInfo.Builder builder = PipelineLauncher.LaunchInfo.builder()
                .setProjectId(job.getProjectId())
                .setJobId(job.getId())
                .setRegion(job.getLocation())
                .setCreateTime(job.getCreateTime())
                .setSdk(job.getJobMetadata().getSdkVersion().getVersionDisplayName())
                .setVersion(job.getJobMetadata().getSdkVersion().getVersion())
                .setJobType(job.getType())
                .setRunner(runner)
                .setState(jobState);

        Map<String, String> parameters = new HashMap<>(launchConfig.parameters());
        // Environment values are exported alongside the parameters in string form.
        launchConfig.environment().forEach((key, value) -> parameters.put(key, String.valueOf(value)));
        parameters.put(PARAM_RUNNER, runner);
        parameters.put(PARAM_JOB_TYPE, job.getType());
        parameters.put(PARAM_JOB_ID, job.getId());
        builder.setParameters(ImmutableMap.copyOf(parameters));

        if (labels != null && !labels.isEmpty()) {
            builder.setTemplateType(labels.get("goog-dataflow-provided-template-type"))
                    .setTemplateVersion(labels.get("goog-dataflow-provided-template-version"))
                    .setTemplateName(labels.get("goog-dataflow-provided-template-name"));
        }
        return builder;
    }

    /**
     * Builds the launch info for a job; see {@link #getJobInfoBuilder}.
     *
     * @param launchConfig the configuration the job was launched with
     * @param jobState the state to record in the launch info
     * @param job the job as returned by the Dataflow API
     * @return the built launch info
     */
    public final PipelineLauncher.LaunchInfo getJobInfo(PipelineLauncher.LaunchConfig launchConfig, PipelineLauncher.JobState jobState, Job job) {
        return getJobInfoBuilder(launchConfig, jobState, job).build();
    }

    /**
     * Polls the job every 15 seconds until it is no longer in a pending state.
     *
     * <p>An interrupt cuts the current sleep short and triggers an immediate re-check, but does
     * not abort the wait. The thread's interrupt status is restored before this method returns or
     * throws, so callers can still observe it.
     *
     * @param str the project that owns the job
     * @param str2 the region (location) of the job
     * @param str3 the job ID
     * @return the first non-pending state observed
     * @throws IOException if the job state cannot be fetched
     * @throws RuntimeException if the job ends up in the {@code FAILED} state
     */
    public PipelineLauncher.JobState waitUntilActive(String str, String str2, String str3) throws IOException {
        boolean interrupted = false;
        PipelineLauncher.JobState state;
        try {
            state = getJobStatus(str, str2, str3);
            while (PipelineLauncher.JobState.PENDING_STATES.contains(state)) {
                LOG.info("Job still pending. Will check again in 15 seconds");
                try {
                    TimeUnit.SECONDS.sleep(15L);
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is re-asserted once the wait is over.
                    interrupted = true;
                    LOG.warn("Wait interrupted. Checking now.");
                }
                state = getJobStatus(str, str2, str3);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (state == PipelineLauncher.JobState.FAILED) {
            throw new RuntimeException(String.format("The job failed before launch! For more information please check if the job log for Job ID: %s, under project %s.", str3, str));
        }
        return state;
    }

    /**
     * Cancels every job in {@link #launchedJobs} that is still active or pending, using the
     * project and region from {@link TestProperties}.
     *
     * <p>This is best-effort: a failure for one job is logged and does not stop the cleanup of
     * the remaining jobs.
     *
     * @throws IOException declared for interface compatibility
     */
    public synchronized void cleanupAll() throws IOException {
        LOG.info("Cleaning up Dataflow jobs...");
        for (String jobId : this.launchedJobs) {
            try {
                PipelineLauncher.JobState state = getJobStatus(TestProperties.project(), TestProperties.region(), jobId);
                boolean stillRunning = PipelineLauncher.JobState.ACTIVE_STATES.contains(state)
                        || PipelineLauncher.JobState.PENDING_STATES.contains(state);
                if (stillRunning) {
                    cancelJob(TestProperties.project(), TestProperties.region(), jobId);
                }
            } catch (Exception e) {
                LOG.warn("Unable to cancel {}. Encountered error.", jobId, e);
            }
        }
        LOG.info("Dataflow jobs successfully cleaned up.");
    }
}
