package org.apache.beam.runners.samza.translation;

import java.lang.invoke.SerializedLambda;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.GroupByKeyOp;
import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.transforms.GroupWithoutRepartition;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils;
import org.apache.beam.runners.samza.util.WindowUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.serializers.KVSerde;

/* loaded from: input_file:org/apache/beam/runners/samza/translation/GroupByKeyTranslator.class */
class GroupByKeyTranslator<K, InputT, OutputT> implements TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>, TransformConfigGenerator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
    @Override // org.apache.beam.runners.samza.translation.TransformTranslator
    public void translate(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, TransformHierarchy.Node node, TranslationContext translationContext) {
        doTranslate(pTransform, node, translationContext);
    }

    private static <K, InputT, OutputT> void doTranslate(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, TransformHierarchy.Node node, TranslationContext translationContext) {
        PCollection input = translationContext.getInput(pTransform);
        PCollection output = translationContext.getOutput(pTransform);
        TupleTag outputTag = translationContext.getOutputTag(pTransform);
        WindowingStrategy windowingStrategy = input.getWindowingStrategy();
        MessageStream messageStream = translationContext.getMessageStream(input);
        KvCoder coder = input.getCoder();
        Coder of = SamzaCoders.of(input);
        translationContext.registerMessageStream(output, doTranslateGBK(messageStream, needRepartition(node, translationContext), getSystemReduceFn(pTransform, input.getPipeline(), coder), windowingStrategy, coder, of, translationContext.getTransformFullName(), translationContext.getTransformId(), outputTag, input.isBounded()));
    }

    @Override // org.apache.beam.runners.samza.translation.TransformTranslator
    public void translatePortable(PipelineNode.PTransformNode pTransformNode, QueryablePipeline queryablePipeline, PortableTranslationContext portableTranslationContext) {
        String inputId = portableTranslationContext.getInputId(pTransformNode);
        portableTranslationContext.registerMessageStream(portableTranslationContext.getOutputId(pTransformNode), doTranslatePortable(queryablePipeline.getComponents().getPcollectionsOrThrow(inputId), portableTranslationContext.getMessageStreamById(inputId), WindowUtils.getWindowStrategy(inputId, queryablePipeline.getComponents()), WindowUtils.instantiateWindowedCoder(inputId, queryablePipeline.getComponents()), new TupleTag((String) Iterables.getOnlyElement(pTransformNode.getTransform().getOutputsMap().keySet())), portableTranslationContext));
    }

    @Override // org.apache.beam.runners.samza.translation.TransformConfigGenerator
    public Map<String, String> createConfig(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, TransformHierarchy.Node node, ConfigContext configContext) {
        return ConfigBuilder.createRocksDBStoreConfig(configContext.getPipelineOptions());
    }

    @Override // org.apache.beam.runners.samza.translation.TransformConfigGenerator
    public Map<String, String> createPortableConfig(PipelineNode.PTransformNode pTransformNode, SamzaPipelineOptions samzaPipelineOptions) {
        return ConfigBuilder.createRocksDBStoreConfig(samzaPipelineOptions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, InputT, OutputT> MessageStream<OpMessage<KV<K, OutputT>>> doTranslatePortable(RunnerApi.PCollection pCollection, MessageStream<OpMessage<KV<K, InputT>>> messageStream, WindowingStrategy<?, BoundedWindow> windowingStrategy, WindowedValue.WindowedValueCoder<KV<K, InputT>> windowedValueCoder, TupleTag<KV<K, OutputT>> tupleTag, PortableTranslationContext portableTranslationContext) {
        boolean z = portableTranslationContext.getSamzaPipelineOptions().getMaxSourceParallelism() > 1;
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        KvCoder valueCoder = windowedValueCoder.getValueCoder();
        return doTranslateGBK(messageStream, z, SystemReduceFn.buffering(valueCoder.getValueCoder()), windowingStrategy, valueCoder, WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder), portableTranslationContext.getTransformFullName(), portableTranslationContext.getTransformId(), tupleTag, SamzaPipelineTranslatorUtils.isBounded(pCollection));
    }

    private static <K, InputT, OutputT> MessageStream<OpMessage<KV<K, OutputT>>> doTranslateGBK(MessageStream<OpMessage<KV<K, InputT>>> messageStream, boolean z, SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn, WindowingStrategy<?, BoundedWindow> windowingStrategy, KvCoder<K, InputT> kvCoder, Coder<WindowedValue<KV<K, InputT>>> coder, String str, String str2, TupleTag<KV<K, OutputT>> tupleTag, PCollection.IsBounded isBounded) {
        MessageStream filter = messageStream.filter(opMessage -> {
            return opMessage.getType() == OpMessage.Type.ELEMENT;
        });
        return (!z ? filter : filter.partitionBy(opMessage2 -> {
            return ((KV) opMessage2.getElement().getValue()).getKey();
        }, opMessage3 -> {
            return opMessage3.getElement();
        }, KVSerde.of(SamzaCoders.toSerde(kvCoder.getKeyCoder()), SamzaCoders.toSerde(coder)), "gbk-" + SamzaPipelineTranslatorUtils.escape(str2)).map(kv -> {
            return OpMessage.ofElement((WindowedValue) kv.getValue());
        })).flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp())).flatMapAsync(OpAdapter.adapt(new GroupByKeyOp(tupleTag, KeyedWorkItemCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder(), windowingStrategy.getWindowFn().windowCoder()), systemReduceFn, windowingStrategy, new DoFnOp.SingleOutputManagerFactory(), str, str2, isBounded)));
    }

    private static <K, InputT, OutputT> SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, Pipeline pipeline, KvCoder<K, InputT> kvCoder) {
        if (pTransform instanceof GroupByKey) {
            return SystemReduceFn.buffering(kvCoder.getValueCoder());
        }
        if (!(pTransform instanceof Combine.PerKey)) {
            throw new RuntimeException("Transform " + pTransform + " cannot be translated as GroupByKey.");
        }
        return SystemReduceFn.combining(kvCoder.getKeyCoder(), AppliedCombineFn.withInputCoder(((Combine.PerKey) pTransform).getFn(), pipeline.getCoderRegistry(), kvCoder));
    }

    private static boolean needRepartition(TransformHierarchy.Node node, TranslationContext translationContext) {
        if (translationContext.getPipelineOptions().getMaxSourceParallelism() == 1) {
            return false;
        }
        if (node == null) {
            return true;
        }
        if (node.getTransform() instanceof GroupWithoutRepartition) {
            return false;
        }
        return needRepartition(node.getEnclosingNode(), translationContext);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1312353984:
                if (implMethodName.equals("lambda$doTranslateGBK$d9246316$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1200091077:
                if (implMethodName.equals("lambda$doTranslateGBK$35776db0$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1200091078:
                if (implMethodName.equals("lambda$doTranslateGBK$35776db0$2")) {
                    z = true;
                    break;
                }
                break;
            case 1200091079:
                if (implMethodName.equals("lambda$doTranslateGBK$35776db0$3")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/samza/translation/GroupByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/samza/operators/KV;)Lorg/apache/beam/runners/samza/runtime/OpMessage;")) {
                    return kv -> {
                        return OpMessage.ofElement((WindowedValue) kv.getValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/samza/translation/GroupByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/runners/samza/runtime/OpMessage;)Lorg/apache/beam/sdk/util/WindowedValue;")) {
                    return opMessage3 -> {
                        return opMessage3.getElement();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/samza/translation/GroupByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/runners/samza/runtime/OpMessage;)Ljava/lang/Object;")) {
                    return opMessage2 -> {
                        return ((KV) opMessage2.getElement().getValue()).getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/beam/runners/samza/translation/GroupByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/runners/samza/runtime/OpMessage;)Z")) {
                    return opMessage -> {
                        return opMessage.getType() == OpMessage.Type.ELEMENT;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
