/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.io.Serializable;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.FusedPipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Test;

public class ProcessBundleDescriptorsTest
implements Serializable {
    @Test
    public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exception {
        Pipeline p = Pipeline.create();
        VoidCoder keycoder = VoidCoder.of();
        MatcherAssert.assertThat((Object)ModelCoderRegistrar.isKnownCoder((Coder)keycoder), (Matcher)CoreMatchers.is((Object)false));
        ((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], KV<Void, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
            }
        }))).setCoder((Coder)KvCoder.of((Coder)keycoder, (Coder)StringUtf8Coder.of())).apply("userState", (PTransform)ParDo.of((DoFn)new DoFn<KV<Void, String>, KV<Void, String>>(){
            @DoFn.StateId(value="stateId")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag((Coder)StringUtf8Coder.of());
            @DoFn.TimerId(value="timerId")
            private final TimerSpec timerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<Void, String> element, @DoFn.StateId(value="stateId") BagState<String> state, @DoFn.TimerId(value="timerId") Timer timer, DoFn.OutputReceiver<KV<Void, String>> r) {
            }

            @DoFn.OnTimer(value="timerId")
            public void onTimer() {
            }
        }))).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> stage.getUserStates().stream().anyMatch(spec -> spec.localName().equals("stateId")));
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with user state.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        PipelineNode.PCollectionNode inputPCollection = stage2.getInputPCollection();
        Map stageCoderMap = stage2.getComponents().getCodersMap();
        RunnerApi.Coder originalMainInputCoder = (RunnerApi.Coder)stageCoderMap.get(inputPCollection.getPCollection().getCoderId());
        String originalKeyCoderId = ModelCoders.getKvCoderComponents((RunnerApi.Coder)originalMainInputCoder).keyCoderId();
        RunnerApi.Coder originalKeyCoder = (RunnerApi.Coder)stageCoderMap.get(originalKeyCoderId);
        MatcherAssert.assertThat((Object)originalKeyCoder.getSpec().getUrn(), (Matcher)CoreMatchers.is((Object)"beam:coders:javasdk:0.1"));
        BeamFnApi.ProcessBundleDescriptor pbd = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)Endpoints.ApiServiceDescriptor.getDefaultInstance()).getProcessBundleDescriptor();
        Map pbsCoderMap = pbd.getCodersMap();
        RunnerApi.Coder pbsMainInputCoder = (RunnerApi.Coder)pbsCoderMap.get(pbd.getPcollectionsOrThrow(inputPCollection.getId()).getCoderId());
        String keyCoderId = ModelCoders.getKvCoderComponents((RunnerApi.Coder)pbsMainInputCoder).keyCoderId();
        RunnerApi.Coder keyCoder = (RunnerApi.Coder)pbsCoderMap.get(keyCoderId);
        ProcessBundleDescriptorsTest.ensureLengthPrefixed(keyCoder, originalKeyCoder, pbsCoderMap);
        TimerReference timerRef = (TimerReference)Iterables.getOnlyElement((Iterable)stage2.getTimers());
        String timerTransformId = timerRef.transform().getId();
        RunnerApi.ParDoPayload parDoPayload = RunnerApi.ParDoPayload.parseFrom((ByteString)pbd.getTransformsOrThrow(timerTransformId).getSpec().getPayload());
        RunnerApi.TimerFamilySpec timerSpec = parDoPayload.getTimerFamilySpecsOrThrow(timerRef.localName());
        RunnerApi.Coder timerCoder = (RunnerApi.Coder)pbsCoderMap.get(timerSpec.getTimerFamilyCoderId());
        String timerKeyCoderId = timerCoder.getComponentCoderIds(0);
        RunnerApi.Coder timerKeyCoder = (RunnerApi.Coder)pbsCoderMap.get(timerKeyCoderId);
        ProcessBundleDescriptorsTest.ensureLengthPrefixed(timerKeyCoder, originalKeyCoder, pbsCoderMap);
    }

    @Test
    public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception {
        Pipeline p = Pipeline.create();
        VoidCoder voidCoder = VoidCoder.of();
        MatcherAssert.assertThat((Object)ModelCoderRegistrar.isKnownCoder((Coder)voidCoder), (Matcher)CoreMatchers.is((Object)false));
        ((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply((PTransform)ParDo.of((DoFn)new DoFn<byte[], Void>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
            }
        }))).setCoder((Coder)voidCoder).apply((PTransform)ParDo.of((DoFn)new DoFn<Void, Void>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context, RestrictionTracker<Void, Void> tracker) {
            }

            @DoFn.GetInitialRestriction
            public Void getInitialRestriction() {
                return null;
            }

            @DoFn.NewTracker
            public SomeTracker newTracker(@DoFn.Restriction Void restriction) {
                return null;
            }
        }))).setCoder((Coder)voidCoder);
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        RunnerApi.Pipeline pipelineWithSdfExpanded = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)pipelineProto, (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineWithSdfExpanded);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> stage.getTransforms().stream().anyMatch(transform -> transform.getTransform().getSpec().getUrn().equals("beam:transform:sdf_process_sized_element_and_restrictions:v1")));
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        PipelineNode.PCollectionNode inputPCollection = stage2.getInputPCollection();
        Map stageCoderMap = stage2.getComponents().getCodersMap();
        RunnerApi.Coder originalMainInputCoder = (RunnerApi.Coder)stageCoderMap.get(inputPCollection.getPCollection().getCoderId());
        BeamFnApi.ProcessBundleDescriptor pbd = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)Endpoints.ApiServiceDescriptor.getDefaultInstance()).getProcessBundleDescriptor();
        Map pbsCoderMap = pbd.getCodersMap();
        RunnerApi.Coder pbsMainInputCoder = (RunnerApi.Coder)pbsCoderMap.get(pbd.getPcollectionsOrThrow(inputPCollection.getId()).getCoderId());
        RunnerApi.Coder kvCoder = (RunnerApi.Coder)pbsCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)pbsMainInputCoder).keyCoderId());
        RunnerApi.Coder keyCoder = (RunnerApi.Coder)pbsCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)kvCoder).keyCoderId());
        RunnerApi.Coder valueKvCoder = (RunnerApi.Coder)pbsCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)kvCoder).valueCoderId());
        RunnerApi.Coder valueCoder = (RunnerApi.Coder)pbsCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)valueKvCoder).keyCoderId());
        RunnerApi.Coder originalKvCoder = (RunnerApi.Coder)stageCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)originalMainInputCoder).keyCoderId());
        RunnerApi.Coder originalKeyCoder = (RunnerApi.Coder)stageCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)originalKvCoder).keyCoderId());
        RunnerApi.Coder originalvalueKvCoder = (RunnerApi.Coder)stageCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)originalKvCoder).valueCoderId());
        RunnerApi.Coder originalvalueCoder = (RunnerApi.Coder)stageCoderMap.get(ModelCoders.getKvCoderComponents((RunnerApi.Coder)originalvalueKvCoder).keyCoderId());
        ProcessBundleDescriptorsTest.ensureLengthPrefixed(keyCoder, originalKeyCoder, pbsCoderMap);
        ProcessBundleDescriptorsTest.ensureLengthPrefixed(valueCoder, originalvalueCoder, pbsCoderMap);
    }

    private static void ensureLengthPrefixed(RunnerApi.Coder coder, RunnerApi.Coder originalCoder, Map<String, RunnerApi.Coder> pbsCoderMap) {
        MatcherAssert.assertThat((Object)coder.getSpec().getUrn(), (Matcher)CoreMatchers.is((Object)ModelCoders.LENGTH_PREFIX_CODER_URN));
        String lengthPrefixedWrappedCoderId = coder.getComponentCoderIds(0);
        MatcherAssert.assertThat((Object)pbsCoderMap.get(lengthPrefixedWrappedCoderId), (Matcher)CoreMatchers.is((Object)originalCoder));
    }

    private static abstract class SomeTracker
    extends RestrictionTracker<Void, Void> {
        private SomeTracker() {
        }
    }
}

