package org.apache.beam.runners.jet;

import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.config.MapConfig;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.JetTestInstanceFactory;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

/* loaded from: input_file:org/apache/beam/runners/jet/TestJetRunner.class */
public class TestJetRunner extends PipelineRunner<PipelineResult> {
    private static final Map<String, JetTransformTranslator> TRANSLATORS = new HashMap();
    private final JetTestInstanceFactory factory = new JetTestInstanceFactory();
    private final JetRunner delegate;

    /* loaded from: input_file:org/apache/beam/runners/jet/TestJetRunner$TestStreamTranslator.class */
    private static class TestStreamTranslator<T> implements JetTransformTranslator<PTransform<PBegin, PCollection<T>>> {
        private TestStreamTranslator() {
        }

        public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedPTransform, TransformHierarchy.Node node, JetTranslationContext jetTranslationContext) {
            String fullName = appliedPTransform.getFullName();
            DAGBuilder dagBuilder = jetTranslationContext.getDagBuilder();
            String newVertexId = dagBuilder.newVertexId(fullName);
            TestStream transform = appliedPTransform.getTransform();
            Map.Entry output = Utils.getOutput(appliedPTransform);
            Coder coder = Utils.getCoder((PCollection) output.getValue());
            TestStream.TestStreamCoder of = TestStream.TestStreamCoder.of(transform.getValueCoder());
            Vertex addVertex = dagBuilder.addVertex(newVertexId, TestStreamP.supplier(getEncodedPayload(transform, of), of, coder));
            String tupleTagId = Utils.getTupleTagId((PValue) output.getValue());
            dagBuilder.registerCollectionOfEdge(tupleTagId, ((TupleTag) output.getKey()).getId());
            dagBuilder.registerEdgeStartPoint(tupleTagId, addVertex, coder);
            return addVertex;
        }

        private static <T> byte[] getEncodedPayload(TestStream<T> testStream, TestStream.TestStreamCoder<T> testStreamCoder) {
            try {
                return CoderUtils.encodeToByteArray(testStreamCoder, testStream);
            } catch (CoderException e) {
                throw ExceptionUtil.rethrow(e);
            }
        }
    }

    private TestJetRunner(PipelineOptions pipelineOptions) {
        JetTestInstanceFactory jetTestInstanceFactory = this.factory;
        Objects.requireNonNull(jetTestInstanceFactory);
        this.delegate = JetRunner.fromOptions(pipelineOptions, jetTestInstanceFactory::newClient);
        this.delegate.addExtraTranslators(TestJetRunner::getTranslator);
    }

    public static TestJetRunner fromOptions(PipelineOptions pipelineOptions) {
        return new TestJetRunner(pipelineOptions);
    }

    public PipelineResult run(Pipeline pipeline) {
        Collection<JetInstance> initMemberInstances = initMemberInstances(this.factory);
        try {
            FailedRunningPipelineResults run = this.delegate.run(pipeline);
            if (run instanceof FailedRunningPipelineResults) {
                throw run.getCause();
            }
            run.waitUntilFinish();
            killMemberInstances(initMemberInstances, this.factory);
            return run;
        } catch (Throwable th) {
            killMemberInstances(initMemberInstances, this.factory);
            throw th;
        }
    }

    private static Collection<JetInstance> initMemberInstances(JetTestInstanceFactory jetTestInstanceFactory) {
        JetConfig configureHazelcast = new JetConfig().configureHazelcast(config -> {
            config.addMapConfig(new MapConfig("map").setEventJournalConfig(new EventJournalConfig().setEnabled(true)));
        });
        return Arrays.asList(jetTestInstanceFactory.newMember(configureHazelcast), jetTestInstanceFactory.newMember(configureHazelcast));
    }

    private static void killMemberInstances(Collection<JetInstance> collection, JetTestInstanceFactory jetTestInstanceFactory) {
        if (collection.isEmpty()) {
            return;
        }
        jetTestInstanceFactory.shutdownAll();
    }

    private static JetTransformTranslator<?> getTranslator(PTransform<?, ?> pTransform) {
        String urnForTransformOrNull = PTransformTranslation.urnForTransformOrNull(pTransform);
        if (urnForTransformOrNull == null) {
            return null;
        }
        return TRANSLATORS.get(urnForTransformOrNull);
    }

    static {
        TRANSLATORS.put("beam:transform:teststream:v1", new TestStreamTranslator());
    }
}
