package org.apache.beam.runners.jet;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.map.IMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.jet.metrics.JetMetricsContainer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.transforms.PTransform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/jet/JetRunner.class */
/**
 * {@link PipelineRunner} that translates a Beam {@link Pipeline} into a Hazelcast Jet {@link DAG}
 * and submits it as a Jet job through a client obtained from the configured client supplier.
 *
 * <p>When the {@code jetLocalMode} option is greater than zero, that many Jet members are started
 * in this JVM before the job is submitted and all of them are shut down once the job completes.
 */
public class JetRunner extends PipelineRunner<PipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(JetRunner.class);

    /** Validated runner options. */
    private final JetPipelineOptions options;

    /** Creates the Jet client used to submit the job, given the client configuration. */
    private final Function<ClientConfig, JetInstance> jetClientSupplier;

    /** Maps a transform to its translator; may be extended via {@link #addExtraTranslators}. */
    private Function<PTransform<?, ?>, JetTransformTranslator<?>> translatorProvider = JetTransformTranslators::getTranslator;

    /**
     * Creates a runner that obtains its Jet client via {@link Jet#newJetClient(ClientConfig)}.
     *
     * @param pipelineOptions options convertible to {@link JetPipelineOptions}
     * @return the configured runner
     * @throws IllegalArgumentException if the options fail validation
     */
    public static JetRunner fromOptions(PipelineOptions pipelineOptions) {
        return fromOptions(pipelineOptions, Jet::newJetClient);
    }

    /**
     * Creates a runner that obtains its Jet client from the supplied factory function.
     *
     * @param pipelineOptions options convertible to {@link JetPipelineOptions}
     * @param function factory producing a {@link JetInstance} client from a {@link ClientConfig}
     * @return the configured runner
     * @throws IllegalArgumentException if the options fail validation
     */
    public static JetRunner fromOptions(PipelineOptions pipelineOptions, Function<ClientConfig, JetInstance> function) {
        return new JetRunner(pipelineOptions, function);
    }

    private JetRunner(PipelineOptions pipelineOptions, Function<ClientConfig, JetInstance> function) {
        this.options = validate(pipelineOptions.as(JetPipelineOptions.class));
        this.jetClientSupplier = function;
    }

    /**
     * Normalizes, translates and submits the pipeline.
     *
     * @param pipeline the pipeline to execute
     * @return the result handle of the submitted job, or a {@link FailedRunningPipelineResults}
     *     wrapping the error if an {@link UnsupportedOperationException} was thrown along the way
     */
    @Override
    public PipelineResult run(Pipeline pipeline) {
        try {
            normalize(pipeline);
            return run(translate(pipeline));
        } catch (UnsupportedOperationException e) {
            LOG.error("Failed running pipeline!", e);
            return new FailedRunningPipelineResults(e);
        }
    }

    /**
     * Registers a fallback translator provider that is consulted only when the currently
     * installed provider returns {@code null} for a transform.
     *
     * @param function the additional translator provider
     */
    void addExtraTranslators(Function<PTransform<?, ?>, JetTransformTranslator<?>> function) {
        Function<PTransform<?, ?>, JetTransformTranslator<?>> current = this.translatorProvider;
        this.translatorProvider = transform -> {
            JetTransformTranslator<?> translator = current.apply(transform);
            if (translator != null) {
                return translator;
            }
            return function.apply(transform);
        };
    }

    /** Applies the default transform overrides and ensures all reads are consumed. */
    private void normalize(Pipeline pipeline) {
        pipeline.replaceAll(getDefaultOverrides());
        UnconsumedReads.ensureAllReadsConsumed(pipeline);
    }

    /** Walks the pipeline topologically with a {@link JetGraphVisitor} and returns its DAG. */
    private DAG translate(Pipeline pipeline) {
        JetGraphVisitor visitor = new JetGraphVisitor(this.options, this.translatorProvider);
        pipeline.traverseTopologically(visitor);
        return visitor.getDAG();
    }

    /**
     * Submits the DAG as a Jet job and wires up cleanup to run when the job completes.
     *
     * <p>If anything fails before the completion callback is registered, the Jet client and any
     * locally started cluster members are released before the error is rethrown, so that a failed
     * submission does not leak them.
     */
    private JetPipelineResult run(DAG dag) {
        startClusterIfNeeded(this.options);

        JetInstance jetInstance;
        try {
            jetInstance = getJetInstance(this.options);
        } catch (RuntimeException | Error e) {
            // No client to shut down yet; only the local members need to go away.
            stopClusterIfNeeded(this.options);
            throw e;
        }

        Job job;
        // Left raw: the map's value type is not imported in this file.
        IMap metricsMap;
        JetPipelineResult result;
        try {
            job = jetInstance.newJob(dag, getJobConfig(this.options));
            metricsMap = jetInstance.getMap(JetMetricsContainer.getMetricsMapName(job.getId()));
            result = new JetPipelineResult(job, metricsMap);
        } catch (RuntimeException | Error e) {
            releaseResources(jetInstance, null);
            throw e;
        }

        // From here on the completion callback owns the cleanup.
        result.setCompletionFuture(job.getFuture().whenCompleteAsync((ignored, failure) -> {
            try {
                result.freeze(failure);
            } finally {
                releaseResources(jetInstance, metricsMap);
            }
        }));
        return result;
    }

    /**
     * Destroys the metrics map (when given), shuts down the Jet client and stops local cluster
     * members; each step runs even if an earlier one throws.
     */
    private void releaseResources(JetInstance jetInstance, IMap<?, ?> metricsMap) {
        try {
            if (metricsMap != null) {
                metricsMap.destroy();
            }
        } finally {
            try {
                jetInstance.shutdown();
            } finally {
                stopClusterIfNeeded(this.options);
            }
        }
    }

    /** Starts {@code jetLocalMode} Jet members in this JVM when that option is positive. */
    private void startClusterIfNeeded(JetPipelineOptions jetPipelineOptions) {
        int memberCount = jetPipelineOptions.getJetLocalMode();
        if (memberCount <= 0) {
            return;
        }
        for (int i = 0; i < memberCount; i++) {
            Jet.newJetInstance();
        }
        LOG.info("Started {} Jet cluster members", memberCount);
    }

    /** Shuts down all Jet instances in this JVM when the runner is in local mode. */
    private void stopClusterIfNeeded(JetPipelineOptions jetPipelineOptions) {
        if (jetPipelineOptions.getJetLocalMode() > 0) {
            Jet.shutdownAll();
            LOG.info("Stopped all Jet cluster members");
        }
    }

    /**
     * Builds the job configuration: sets the job name when present and, outside local mode,
     * attaches the code jar when a non-empty path is configured.
     */
    private JobConfig getJobConfig(JetPipelineOptions jetPipelineOptions) {
        JobConfig jobConfig = new JobConfig();

        String jobName = jetPipelineOptions.getJobName();
        if (jobName != null) {
            jobConfig.setName(jobName);
        }

        boolean hasNoLocalMembers = jetPipelineOptions.getJetLocalMode() <= 0;
        if (hasNoLocalMembers) {
            String codeJarPathname = jetPipelineOptions.getCodeJarPathname();
            if (codeJarPathname != null && !codeJarPathname.isEmpty()) {
                jobConfig.addJar(codeJarPathname);
            }
        }
        return jobConfig;
    }

    /**
     * Creates the Jet client for the configured cluster name. Outside local mode the client is
     * pointed at the comma-separated addresses from the {@code jetServers} option.
     */
    private JetInstance getJetInstance(JetPipelineOptions jetPipelineOptions) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setClusterName(jetPipelineOptions.getClusterName());

        boolean hasNoLocalMembers = jetPipelineOptions.getJetLocalMode() <= 0;
        if (hasNoLocalMembers) {
            List<String> addresses = Arrays.asList(jetPipelineOptions.getJetServers().split(","));
            clientConfig.getNetworkConfig().setAddresses(addresses);
        }
        return this.jetClientSupplier.apply(clientConfig);
    }

    /** Returns the splittable-ParDo overrides applied to every pipeline before translation. */
    private static List<PTransformOverride> getDefaultOverrides() {
        return Arrays.asList(
                PTransformOverride.of(
                        PTransformMatchers.splittableParDo(),
                        new SplittableParDo.OverrideFactory()),
                PTransformOverride.of(
                        PTransformMatchers.splittableProcessKeyedBounded(),
                        new SplittableParDoNaiveBounded.OverrideFactory()),
                PTransformOverride.of(
                        PTransformMatchers.splittableProcessKeyedUnbounded(),
                        new SplittableParDoViaKeyedWorkItems.OverrideFactory()));
    }

    /**
     * Checks that a cluster name is set and that the default parallelism is either {@code -1} or
     * at least {@code 1}.
     *
     * @return the same options instance when valid
     * @throws IllegalArgumentException if a required option is missing or out of range
     */
    private static JetPipelineOptions validate(JetPipelineOptions jetPipelineOptions) {
        if (jetPipelineOptions.getClusterName() == null) {
            throw new IllegalArgumentException("Jet cluster name not set in options");
        }

        Integer localParallelism = jetPipelineOptions.getJetDefaultParallelism();
        if (localParallelism == null) {
            throw new IllegalArgumentException("Jet node local parallelism must be specified");
        }
        if (localParallelism != -1 && localParallelism < 1) {
            // The message previously said ">1", which contradicted the accepted range (1 is valid).
            throw new IllegalArgumentException("Jet node local parallelism must be >0 or -1");
        }
        return jetPipelineOptions;
    }
}
