package org.apache.beam.runners.flink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkJobInvocation.class */
/**
 * A single invocation of a portable pipeline on Flink.
 *
 * <p>{@link #start()} submits the pipeline translation and execution to the supplied executor
 * service. Job state changes and job messages are pushed to the listeners registered through
 * {@link #addStateListener(Consumer)} and {@link #addMessageListener(Consumer)}.
 *
 * <p>State transitions driven by this class: {@code STOPPED} (initial) &rarr; {@code STARTING}
 * &rarr; {@code RUNNING} &rarr; one of {@code DONE}, {@code FAILED}, {@code CANCELLED}, or
 * {@code UNSPECIFIED} (when the execution yields a {@code null} result).
 *
 * <p>All state-mutating methods are {@code synchronized} on this instance; the current state is
 * additionally held in a {@code volatile} field so that the unsynchronized {@link #getState()}
 * always observes the most recent value.
 */
public class FlinkJobInvocation implements JobInvocation {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);

    private final String id;
    private final String retrievalToken;
    private final ListeningExecutorService executorService;
    private final RunnerApi.Pipeline pipeline;
    private final FlinkPipelineOptions pipelineOptions;
    @Nullable
    private final String confDir;
    private final List<String> filesToStage;

    /** Future of the running pipeline; {@code null} until {@link #start()} has been called. */
    @Nullable
    private ListenableFuture<PipelineResult> invocationFuture = null;

    // Written under the instance monitor (setState) but read without it (getState), hence volatile.
    private volatile JobApi.JobState.Enum jobState = JobApi.JobState.Enum.STOPPED;
    private final List<Consumer<JobApi.JobState.Enum>> stateObservers = new ArrayList<>();
    private final List<Consumer<JobApi.JobMessage>> messageObservers = new ArrayList<>();

    /**
     * Creates a new, not yet started invocation.
     *
     * @param id identifier of this invocation, returned by {@link #getId()}
     * @param retrievalToken token passed on to the {@link JobInfo} of the job
     * @param executorService executor on which the pipeline and its completion callbacks run
     * @param pipeline portable pipeline proto to execute
     * @param pipelineOptions Flink pipeline options for the job
     * @param confDir optional configuration directory handed to the translation context
     * @param filesToStage files handed to the translation context for staging
     * @return the new invocation
     */
    public static FlinkJobInvocation create(String id, String retrievalToken, ListeningExecutorService executorService, RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) {
        return new FlinkJobInvocation(id, retrievalToken, executorService, pipeline, pipelineOptions, confDir, filesToStage);
    }

    private FlinkJobInvocation(String id, String retrievalToken, ListeningExecutorService executorService, RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) {
        this.id = id;
        this.retrievalToken = retrievalToken;
        this.executorService = executorService;
        this.pipeline = pipeline;
        this.pipelineOptions = pipelineOptions;
        this.confDir = confDir;
        this.filesToStage = filesToStage;
    }

    /**
     * Picks the streaming translator when streaming is requested or the pipeline contains an
     * unbounded PCollection, the batch translator otherwise, and runs the pipeline with it.
     */
    private PipelineResult runPipeline() throws Exception {
        MetricsEnvironment.setMetricsSupported(false);

        // A wildcard-typed local lets the generic helper capture the concrete context type of
        // whichever translator is chosen; a conditional expression over both types cannot.
        FlinkPortablePipelineTranslator<?> translator;
        if (this.pipelineOptions.isStreaming() || hasUnboundedPCollections(this.pipeline)) {
            translator = new FlinkStreamingPortablePipelineTranslator();
        } else {
            translator = FlinkBatchPortablePipelineTranslator.createTranslator();
        }
        return runPipelineWithTranslator(translator);
    }

    /**
     * Trims, fuses (if necessary), translates and executes the pipeline with the given translator.
     *
     * @param translator translator used to turn the pipeline into a Flink program
     * @return the result of executing the translated pipeline
     */
    private <T extends FlinkPortablePipelineTranslator.TranslationContext> PipelineResult runPipelineWithTranslator(FlinkPortablePipelineTranslator<T> translator) throws Exception {
        LOG.info("Translating pipeline to Flink program.");

        // Turn transforms the translator knows natively (except window assignment) into
        // primitives by dropping their sub-transforms.
        RunnerApi.Pipeline trimmedPipeline = makeKnownUrnsPrimitives(
                this.pipeline,
                Sets.difference(translator.knownUrns(), ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));

        JobInfo jobInfo = JobInfo.create(
                this.id,
                this.pipelineOptions.getJobName(),
                this.retrievalToken,
                PipelineOptionsTranslation.toProto(this.pipelineOptions));

        return FlinkRunner.createPipelineResult(
                translator
                        .translate(
                                translator.createTranslationContext(jobInfo, this.pipelineOptions, this.confDir, this.filesToStage),
                                fuseUnlessAlreadyFused(trimmedPipeline))
                        .execute(this.pipelineOptions.getJobName()),
                this.pipelineOptions);
    }

    /**
     * Returns the pipeline unchanged if it already contains an executable stage transform,
     * otherwise the result of running the greedy fuser over it.
     */
    private static RunnerApi.Pipeline fuseUnlessAlreadyFused(RunnerApi.Pipeline candidate) {
        boolean alreadyFused = candidate.getComponents().getTransformsMap().values().stream()
                .anyMatch(transform -> ExecutableStage.URN.equals(transform.getSpec().getUrn()));
        return alreadyFused ? candidate : GreedyPipelineFuser.fuse(candidate).toPipeline();
    }

    /**
     * Returns a copy of the pipeline in which every transform whose URN is contained in
     * {@code knownUrns} has had all of its descendants removed.
     */
    private RunnerApi.Pipeline makeKnownUrnsPrimitives(RunnerApi.Pipeline original, Set<String> knownUrns) {
        RunnerApi.Pipeline.Builder trimmed = original.toBuilder();
        // Iterate over the immutable original while mutating the builder.
        for (String transformId : original.getComponents().getTransformsMap().keySet()) {
            String urn = original.getComponents().getTransformsOrThrow(transformId).getSpec().getUrn();
            if (knownUrns.contains(urn)) {
                LOG.debug("Removing descendants of known PTransform {}", transformId);
                removeDescendants(trimmed, transformId);
            }
        }
        return trimmed.build();
    }

    /**
     * Recursively removes all sub-transforms of {@code parentId} from the builder and clears the
     * parent's sub-transform list. Does nothing if the parent is no longer present.
     */
    private void removeDescendants(RunnerApi.Pipeline.Builder builder, String parentId) {
        RunnerApi.PTransform parent = builder.getComponents().getTransformsOrDefault(parentId, null);
        if (parent == null) {
            return;
        }
        for (String childId : parent.getSubtransformsList()) {
            removeDescendants(builder, childId);
            builder.getComponentsBuilder().removeTransforms(childId);
        }
        builder.getComponentsBuilder().putTransforms(parentId, parent.toBuilder().clearSubtransforms().build());
    }

    /**
     * Submits the pipeline for execution and registers a callback that publishes the terminal
     * state.
     *
     * @throws IllegalStateException if the invocation is not in state {@code STOPPED}
     */
    @Override // org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
    public synchronized void start() {
        LOG.info("Starting job invocation {}", getId());
        if (getState() != JobApi.JobState.Enum.STOPPED) {
            throw new IllegalStateException(String.format("Job %s already running.", getId()));
        }
        setState(JobApi.JobState.Enum.STARTING);
        this.invocationFuture = this.executorService.submit(this::runPipeline);
        setState(JobApi.JobState.Enum.RUNNING);
        Futures.addCallback(this.invocationFuture, new FutureCallback<PipelineResult>() {
            @Override // org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(@Nullable PipelineResult pipelineResult) {
                if (pipelineResult == null) {
                    setState(JobApi.JobState.Enum.UNSPECIFIED);
                    return;
                }
                Preconditions.checkArgument(
                        pipelineResult.getState() == PipelineResult.State.DONE,
                        "Success on non-Done state: %s",
                        pipelineResult.getState());
                setState(JobApi.JobState.Enum.DONE);
            }

            @Override // org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                if (th instanceof CancellationException) {
                    // The future was cancelled (see cancel()); this is not an error, so only
                    // publish the state instead of reporting a failure.
                    setState(JobApi.JobState.Enum.CANCELLED);
                    return;
                }
                LOG.error(String.format("Error during job invocation %s.", getId()), th);
                // Full stack trace at debug importance, concise root cause at error importance.
                sendMessage(JobApi.JobMessage.newBuilder()
                        .setMessageText(Throwables.getStackTraceAsString(th))
                        .setImportance(JobApi.JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
                        .build());
                sendMessage(JobApi.JobMessage.newBuilder()
                        .setMessageText(Throwables.getRootCause(th).toString())
                        .setImportance(JobApi.JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
                        .build());
                setState(JobApi.JobState.Enum.FAILED);
            }
        }, this.executorService);
    }

    /** Returns the identifier this invocation was created with. */
    @Override // org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
    public String getId() {
        return this.id;
    }

    /**
     * Cancels the invocation if it has been started.
     *
     * <p>The pending future is cancelled with interruption. If the future has already produced a
     * result that is not {@code DONE}, that result is cancelled as well and the state is set to
     * {@code CANCELLED}. Calling this before {@link #start()} has no effect.
     */
    @Override // org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
    public synchronized void cancel() {
        LOG.info("Canceling job invocation {}", getId());
        if (this.invocationFuture == null) {
            return;
        }
        this.invocationFuture.cancel(true);
        Futures.addCallback(this.invocationFuture, new FutureCallback<PipelineResult>() {
            @Override // org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(@Nullable PipelineResult pipelineResult) {
                // Nothing to cancel when there is no result or the job already finished.
                if (pipelineResult == null || pipelineResult.getState() == PipelineResult.State.DONE) {
                    return;
                }
                try {
                    pipelineResult.cancel();
                    setState(JobApi.JobState.Enum.CANCELLED);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override // org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                // Intentionally empty: failures and cancellations of the future are reported by
                // the callback registered in start().
            }
        }, this.executorService);
    }

    /** Returns the most recently published job state. */
    @Override // org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
    public JobApi.JobState.Enum getState() {
        return this.jobState;
    }

    /**
     * Registers a state listener. The listener is immediately invoked with the current state and
     * afterwards with every subsequent state change.
     */
    @Override // org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
    public synchronized void addStateListener(Consumer<JobApi.JobState.Enum> consumer) {
        consumer.accept(getState());
        this.stateObservers.add(consumer);
    }

    /** Registers a listener that receives every job message sent after registration. */
    @Override // org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
    public synchronized void addMessageListener(Consumer<JobApi.JobMessage> consumer) {
        this.messageObservers.add(consumer);
    }

    /**
     * Records the new state and notifies all registered state listeners. Used internally by the
     * completion callbacks.
     */
    public synchronized void setState(JobApi.JobState.Enum newState) {
        this.jobState = newState;
        for (Consumer<JobApi.JobState.Enum> observer : this.stateObservers) {
            observer.accept(newState);
        }
    }

    /**
     * Delivers the message to all registered message listeners. Used internally by the
     * completion callbacks.
     */
    public synchronized void sendMessage(JobApi.JobMessage jobMessage) {
        for (Consumer<JobApi.JobMessage> observer : this.messageObservers) {
            observer.accept(jobMessage);
        }
    }

    /** Returns {@code true} if at least one PCollection of the pipeline is unbounded. */
    private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
        Preconditions.checkNotNull(pipeline);
        return pipeline.getComponents().getPcollectionsMap().values().stream()
                .anyMatch(pCollection -> pCollection.getIsBounded() == RunnerApi.IsBounded.Enum.UNBOUNDED);
    }
}
