package org.apache.beam.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableTable;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.SyntheticComponents;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.class */
public class ProcessBundleDescriptors {

    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors$ExecutableProcessBundleDescriptor.class */
    public static abstract class ExecutableProcessBundleDescriptor {
        public static ExecutableProcessBundleDescriptor of(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, RemoteInputDestination<WindowedValue<?>> remoteInputDestination, Map<BeamFnApi.Target, Coder<WindowedValue<?>>> map, Map<String, Map<String, MultimapSideInputSpec>> map2) {
            ImmutableTable.Builder builder = ImmutableTable.builder();
            for (Map.Entry<String, Map<String, MultimapSideInputSpec>> entry : map2.entrySet()) {
                for (Map.Entry<String, MultimapSideInputSpec> entry2 : entry.getValue().entrySet()) {
                    builder.put(entry.getKey(), entry2.getKey(), entry2.getValue());
                }
            }
            return new AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor(processBundleDescriptor, remoteInputDestination, Collections.unmodifiableMap(map), builder.build().rowMap());
        }

        public abstract BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor();

        public abstract RemoteInputDestination<WindowedValue<?>> getRemoteInputDestination();

        public abstract Map<BeamFnApi.Target, Coder<WindowedValue<?>>> getOutputTargetCoders();

        public abstract Map<String, Map<String, MultimapSideInputSpec>> getMultimapSideInputSpecs();
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors$MultimapSideInputSpec.class */
    public static abstract class MultimapSideInputSpec<K, V, W extends BoundedWindow> {
        static <K, V, W extends BoundedWindow> MultimapSideInputSpec<K, V, W> of(String str, String str2, Coder<K> coder, Coder<V> coder2, Coder<W> coder3) {
            return new AutoValue_ProcessBundleDescriptors_MultimapSideInputSpec(str, str2, coder, coder2, coder3);
        }

        public abstract String transformId();

        public abstract String sideInputId();

        public abstract Coder<K> keyCoder();

        public abstract Coder<V> valueCoder();

        public abstract Coder<W> windowCoder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors$TargetEncoding.class */
    public static abstract class TargetEncoding {
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract BeamFnApi.Target getTarget();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<WindowedValue<?>> getCoder();
    }

    public static ExecutableProcessBundleDescriptor fromExecutableStage(String str, ExecutableStage executableStage, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Endpoints.ApiServiceDescriptor apiServiceDescriptor2) throws IOException {
        Preconditions.checkState(str != null, "id must be specified.");
        Preconditions.checkState(executableStage != null, "stage must be specified.");
        Preconditions.checkState(apiServiceDescriptor != null, "dataEndpoint must be specified.");
        Preconditions.checkState(apiServiceDescriptor2 != null, "stateEndpoint must be specified.");
        return fromExecutableStageInternal(str, executableStage, apiServiceDescriptor, apiServiceDescriptor2);
    }

    public static ExecutableProcessBundleDescriptor fromExecutableStage(String str, ExecutableStage executableStage, Endpoints.ApiServiceDescriptor apiServiceDescriptor) throws IOException {
        Preconditions.checkState(str != null, "id must be specified.");
        Preconditions.checkState(executableStage != null, "stage must be specified.");
        Preconditions.checkState(apiServiceDescriptor != null, "dateEndpoint must be specified.");
        return fromExecutableStageInternal(str, executableStage, apiServiceDescriptor, null);
    }

    private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(String str, ExecutableStage executableStage, Endpoints.ApiServiceDescriptor apiServiceDescriptor, @Nullable Endpoints.ApiServiceDescriptor apiServiceDescriptor2) throws IOException {
        RunnerApi.Components components = executableStage.getComponents();
        BeamFnApi.ProcessBundleDescriptor.Builder putAllTransforms = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(str).putAllCoders(components.getCodersMap()).putAllEnvironments(components.getEnvironmentsMap()).putAllPcollections(components.getPcollectionsMap()).putAllWindowingStrategies(components.getWindowingStrategiesMap()).putAllTransforms((Map) executableStage.getTransforms().stream().collect(Collectors.toMap((v0) -> {
            return v0.getId();
        }, (v0) -> {
            return v0.getTransform();
        })));
        if (apiServiceDescriptor2 != null) {
            putAllTransforms.setStateApiServiceDescriptor(apiServiceDescriptor2);
        }
        RemoteInputDestination<WindowedValue<?>> addStageInput = addStageInput(executableStage.getInputPCollection(), components, apiServiceDescriptor, putAllTransforms);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Iterator it = executableStage.getOutputPCollections().iterator();
        while (it.hasNext()) {
            TargetEncoding addStageOutput = addStageOutput(components, apiServiceDescriptor, putAllTransforms, (PipelineNode.PCollectionNode) it.next());
            linkedHashMap.put(addStageOutput.getTarget(), addStageOutput.getCoder());
        }
        return ExecutableProcessBundleDescriptor.of(putAllTransforms.build(), addStageInput, linkedHashMap, forMultimapSideInputs(executableStage, components, putAllTransforms));
    }

    private static RemoteInputDestination<WindowedValue<?>> addStageInput(PipelineNode.PCollectionNode pCollectionNode, RunnerApi.Components components, Endpoints.ApiServiceDescriptor apiServiceDescriptor, BeamFnApi.ProcessBundleDescriptor.Builder builder) throws IOException {
        String addWireCoder = addWireCoder(pCollectionNode, components, builder);
        Coder instantiateRunnerWireCoder = WireCoders.instantiateRunnerWireCoder(pCollectionNode, components);
        BeamFnApi.RemoteGrpcPort build = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(apiServiceDescriptor).setCoderId(addWireCoder).build();
        String format = String.format("fn/read/%s", pCollectionNode.getId());
        Objects.requireNonNull(builder);
        String uniqueId = SyntheticComponents.uniqueId(format, builder::containsTransforms);
        RunnerApi.PTransform pTransform = RemoteGrpcPortRead.readFromPort(build, pCollectionNode.getId()).toPTransform();
        builder.putTransforms(uniqueId, pTransform);
        return RemoteInputDestination.of(instantiateRunnerWireCoder, BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(uniqueId).setName((String) Iterables.getOnlyElement(pTransform.getOutputsMap().keySet())).build());
    }

    private static TargetEncoding addStageOutput(RunnerApi.Components components, Endpoints.ApiServiceDescriptor apiServiceDescriptor, BeamFnApi.ProcessBundleDescriptor.Builder builder, PipelineNode.PCollectionNode pCollectionNode) throws IOException {
        String addWireCoder = addWireCoder(pCollectionNode, components, builder);
        Coder instantiateRunnerWireCoder = WireCoders.instantiateRunnerWireCoder(pCollectionNode, components);
        RemoteGrpcPortWrite writeToPort = RemoteGrpcPortWrite.writeToPort(pCollectionNode.getId(), BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(apiServiceDescriptor).setCoderId(addWireCoder).build());
        String format = String.format("fn/write/%s", pCollectionNode.getId());
        Objects.requireNonNull(builder);
        String uniqueId = SyntheticComponents.uniqueId(format, builder::containsTransforms);
        RunnerApi.PTransform pTransform = writeToPort.toPTransform();
        builder.putTransforms(uniqueId, pTransform);
        return new AutoValue_ProcessBundleDescriptors_TargetEncoding(BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(uniqueId).setName((String) Iterables.getOnlyElement(pTransform.getInputsMap().keySet())).build(), instantiateRunnerWireCoder);
    }

    private static Map<String, Map<String, MultimapSideInputSpec>> forMultimapSideInputs(ExecutableStage executableStage, RunnerApi.Components components, BeamFnApi.ProcessBundleDescriptor.Builder builder) throws IOException {
        ImmutableTable.Builder builder2 = ImmutableTable.builder();
        for (SideInputReference sideInputReference : executableStage.getSideInputs()) {
            String id = sideInputReference.collection().getId();
            RunnerApi.MessageWithComponents forCoder = LengthPrefixUnknownCoders.forCoder(components.getPcollectionsOrThrow(id).getCoderId(), components, false);
            String format = String.format("fn/side_input/%s", components.getPcollectionsOrThrow(id).getCoderId());
            Set keySet = builder.getCodersMap().keySet();
            Objects.requireNonNull(keySet);
            String uniqueId = SyntheticComponents.uniqueId(format, (v1) -> {
                return r1.contains(v1);
            });
            builder.putCoders(uniqueId, forCoder.getCoder());
            builder.putAllCoders(forCoder.getComponents().getCodersMap());
            builder.putPcollections(id, ((RunnerApi.PCollection) builder.getPcollectionsMap().get(id)).toBuilder().setCoderId(uniqueId).build());
            WindowedValue.FullWindowedValueCoder instantiateRunnerWireCoder = WireCoders.instantiateRunnerWireCoder(sideInputReference.collection(), components);
            builder2.put(sideInputReference.transform().getId(), sideInputReference.localName(), MultimapSideInputSpec.of(sideInputReference.transform().getId(), sideInputReference.localName(), instantiateRunnerWireCoder.getValueCoder().getKeyCoder(), instantiateRunnerWireCoder.getValueCoder().getValueCoder(), instantiateRunnerWireCoder.getWindowCoder()));
        }
        return builder2.build().rowMap();
    }

    private static String addWireCoder(PipelineNode.PCollectionNode pCollectionNode, RunnerApi.Components components, BeamFnApi.ProcessBundleDescriptor.Builder builder) {
        Objects.requireNonNull(builder);
        RunnerApi.MessageWithComponents createSdkWireCoder = WireCoders.createSdkWireCoder(pCollectionNode, components, builder::containsCoders);
        builder.putAllCoders(createSdkWireCoder.getComponents().getCodersMap());
        String format = String.format("fn/wire/%s", pCollectionNode.getId());
        Objects.requireNonNull(builder);
        String uniqueId = SyntheticComponents.uniqueId(format, builder::containsCoders);
        builder.putCoders(uniqueId, createSdkWireCoder.getCoder());
        return uniqueId;
    }
}
