package org.apache.beam.runners.fnexecution.control;

import java.io.Serializable;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.class */
public class ProcessBundleDescriptorsTest implements Serializable {
    @Test
    public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exception {
        Pipeline create = Pipeline.create();
        VoidCoder of = VoidCoder.of();
        MatcherAssert.assertThat(Boolean.valueOf(ModelCoderRegistrar.isKnownCoder(of)), CoreMatchers.is(false));
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<Void, String>>() { // from class: org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptorsTest.1
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<Void, String>>.ProcessContext processContext) {
            }
        })).setCoder(KvCoder.of(of, StringUtf8Coder.of())).apply("userState", ParDo.of(new DoFn<KV<Void, String>, KV<Void, String>>() { // from class: org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptorsTest.2

            @DoFn.StateId("stateId")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag(StringUtf8Coder.of());

            @DoFn.TimerId("timerId")
            private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<Void, String> kv, @DoFn.StateId("stateId") BagState<String> bagState, @DoFn.TimerId("timerId") Timer timer, DoFn.OutputReceiver<KV<Void, String>> outputReceiver) {
            }

            @DoFn.OnTimer("timerId")
            public void onTimer() {
            }
        })).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return executableStage.getUserStates().stream().anyMatch(userStateReference -> {
                return userStateReference.localName().equals("stateId");
            });
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with user state.");
        ExecutableStage executableStage2 = (ExecutableStage) tryFind.get();
        PipelineNode.PCollectionNode inputPCollection = executableStage2.getInputPCollection();
        Map codersMap = executableStage2.getComponents().getCodersMap();
        RunnerApi.Coder coder = (RunnerApi.Coder) codersMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder) codersMap.get(inputPCollection.getPCollection().getCoderId())).keyCoderId());
        MatcherAssert.assertThat(coder.getSpec().getUrn(), CoreMatchers.is("beam:coders:javasdk:0.1"));
        BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = ProcessBundleDescriptors.fromExecutableStage("test_stage", executableStage2, Endpoints.ApiServiceDescriptor.getDefaultInstance()).getProcessBundleDescriptor();
        Map codersMap2 = processBundleDescriptor.getCodersMap();
        ensureLengthPrefixed((RunnerApi.Coder) codersMap2.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder) codersMap2.get(processBundleDescriptor.getPcollectionsOrThrow(inputPCollection.getId()).getCoderId())).keyCoderId()), coder, codersMap2);
        TimerReference timerReference = (TimerReference) Iterables.getOnlyElement(executableStage2.getTimers());
        ensureLengthPrefixed((RunnerApi.Coder) codersMap2.get(((RunnerApi.Coder) codersMap2.get(RunnerApi.ParDoPayload.parseFrom(processBundleDescriptor.getTransformsOrThrow(timerReference.transform().getId()).getSpec().getPayload()).getTimerFamilySpecsOrThrow(timerReference.localName()).getTimerFamilyCoderId())).getComponentCoderIds(0)), coder, codersMap2);
    }

    private static void ensureLengthPrefixed(RunnerApi.Coder coder, RunnerApi.Coder coder2, Map<String, RunnerApi.Coder> map) {
        MatcherAssert.assertThat(coder.getSpec().getUrn(), CoreMatchers.is(ModelCoders.LENGTH_PREFIX_CODER_URN));
        MatcherAssert.assertThat(map.get(coder.getComponentCoderIds(0)), CoreMatchers.is(coder2));
    }
}
