package org.apache.nemo.compiler.frontend.beam;

import com.google.common.collect.Iterables;
import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.frontend.beam.source.BeamBoundedSourceVertex;
import org.apache.nemo.compiler.frontend.beam.source.BeamUnboundedSourceVertex;
import org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform;
import org.apache.nemo.compiler.frontend.beam.transform.CombineFnFinalTransform;
import org.apache.nemo.compiler.frontend.beam.transform.CombineFnPartialTransform;
import org.apache.nemo.compiler.frontend.beam.transform.CreateViewTransform;
import org.apache.nemo.compiler.frontend.beam.transform.DoFnTransform;
import org.apache.nemo.compiler.frontend.beam.transform.FinalCombineFn;
import org.apache.nemo.compiler.frontend.beam.transform.FlattenTransform;
import org.apache.nemo.compiler.frontend.beam.transform.GBKTransform;
import org.apache.nemo.compiler.frontend.beam.transform.GroupByKeyTransform;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.nemo.compiler.frontend.beam.transform.PartialCombineFn;
import org.apache.nemo.compiler.frontend.beam.transform.PushBackDoFnTransform;
import org.apache.nemo.compiler.frontend.beam.transform.WindowFnTransform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/compiler/frontend/beam/PipelineTranslator.class */
final class PipelineTranslator {
    /** Shared instance; the private constructor fills the two translator lookup tables. */
    public static final PipelineTranslator INSTANCE = new PipelineTranslator();
    private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
    /** Maps a primitive transform class to the annotated method that translates it. */
    private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
    /** Maps a composite transform class to the annotated method that translates it. */
    private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();

    /**
     * Marks a method of the enclosing class as the translator for the listed composite
     * transform classes. The annotation is retained at runtime because the enclosing
     * class's constructor looks it up reflectively on each declared method.
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    private @interface CompositeTransformTranslator {
        /** Transform classes that the annotated method translates. */
        Class<? extends PTransform>[] value();
    }

    /**
     * Marks a method of the enclosing class as the translator for the listed primitive
     * transform classes. The annotation is retained at runtime because the enclosing
     * class's constructor looks it up reflectively on each declared method.
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    private @interface PrimitiveTransformTranslator {
        /** Transform classes that the annotated method translates. */
        Class<? extends PTransform>[] value();
    }

    /**
     * Scans the methods declared on this class and indexes every method annotated with
     * {@link PrimitiveTransformTranslator} or {@link CompositeTransformTranslator} under
     * each transform class named by the annotation.
     *
     * @throws RuntimeException if a transform class is claimed by more than one method
     */
    private PipelineTranslator() {
        for (Method method : getClass().getDeclaredMethods()) {
            PrimitiveTransformTranslator primitive = method.getAnnotation(PrimitiveTransformTranslator.class);
            if (primitive != null) {
                for (Class<? extends PTransform> transformClass : primitive.value()) {
                    // putIfAbsent leaves an existing registration untouched and hands it back.
                    Method existing = this.primitiveTransformToTranslator.putIfAbsent(transformClass, method);
                    if (existing != null) {
                        throw new RuntimeException(String.format("Translator for primitive transform %s is already registered: %s", transformClass, existing));
                    }
                }
            }
            CompositeTransformTranslator composite = method.getAnnotation(CompositeTransformTranslator.class);
            if (composite != null) {
                for (Class<? extends PTransform> transformClass : composite.value()) {
                    Method existing = this.compositeTransformToTranslator.putIfAbsent(transformClass, method);
                    if (existing != null) {
                        throw new RuntimeException(String.format("Translator for composite transform %s is already registered: %s", transformClass, existing));
                    }
                }
            }
        }
    }

    /**
     * Translates a primitive Beam transform by dispatching to the method registered for the
     * transform's exact runtime class.
     *
     * @param pipelineTranslationContext context handed through to the translator method
     * @param node hierarchy node whose transform is translated
     * @throws UnsupportedOperationException if no translator is registered for the transform class
     * @throws RuntimeException if the translator cannot be accessed or fails while running
     */
    public void translatePrimitive(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node) {
        final PTransform beamTransform = node.getTransform();
        final Class<?> transformClass = beamTransform.getClass();
        final Method translator = this.primitiveTransformToTranslator.get(transformClass);
        if (translator == null) {
            throw new UnsupportedOperationException(String.format("Primitive transform %s is not supported", transformClass.getCanonicalName()));
        }
        try {
            translator.setAccessible(true);
            // Translator methods are static, so there is no receiver object.
            translator.invoke(null, pipelineTranslationContext, node, beamTransform);
        } catch (IllegalAccessException accessFailure) {
            throw new RuntimeException(accessFailure);
        } catch (InvocationTargetException | RuntimeException translationFailure) {
            throw new RuntimeException(String.format("Translator %s have failed to translate %s", translator, beamTransform), translationFailure);
        }
    }

    /**
     * Translates a composite Beam transform if a translator is registered for its exact
     * runtime class.
     *
     * @param pipelineTranslationContext context handed through to the translator method
     * @param node hierarchy node whose transform is translated
     * @return the behavior returned by the translator, or {@code ENTER_TRANSFORM} when the node
     *     has no transform or no translator is registered for it
     * @throws RuntimeException if the translator cannot be accessed or fails while running
     */
    public Pipeline.PipelineVisitor.CompositeBehavior translateComposite(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node) {
        final PTransform beamTransform = node.getTransform();
        if (beamTransform == null) {
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }
        final Method translator = this.compositeTransformToTranslator.get(beamTransform.getClass());
        if (translator == null) {
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }
        try {
            translator.setAccessible(true);
            // Translator methods are static, so there is no receiver object.
            final Object behavior = translator.invoke(null, pipelineTranslationContext, node, beamTransform);
            return (Pipeline.PipelineVisitor.CompositeBehavior) behavior;
        } catch (IllegalAccessException accessFailure) {
            throw new RuntimeException(accessFailure);
        } catch (InvocationTargetException | RuntimeException translationFailure) {
            throw new RuntimeException(String.format("Translator %s have failed to translate %s", translator, beamTransform), translationFailure);
        }
    }

    /**
     * Translates an unbounded read into a {@link BeamUnboundedSourceVertex}, wires every node
     * input into it and registers every node output as a main output of the vertex.
     */
    @PrimitiveTransformTranslator({Read.Unbounded.class})
    private static void unboundedReadTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, Read.Unbounded<?> unbounded) {
        final BeamUnboundedSourceVertex sourceVertex = new BeamUnboundedSourceVertex(unbounded.getSource(), DisplayData.from(unbounded));
        pipelineTranslationContext.addVertex(sourceVertex);
        for (PValue input : node.getInputs().values()) {
            pipelineTranslationContext.addEdgeTo(sourceVertex, input);
        }
        for (PValue output : node.getOutputs().values()) {
            pipelineTranslationContext.registerMainOutputFrom(node, sourceVertex, output);
        }
    }

    /**
     * Translates a bounded read into a {@link BeamBoundedSourceVertex}, wires every node
     * input into it and registers every node output as a main output of the vertex.
     */
    @PrimitiveTransformTranslator({Read.Bounded.class})
    private static void boundedReadTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, Read.Bounded<?> bounded) {
        final BeamBoundedSourceVertex sourceVertex = new BeamBoundedSourceVertex(bounded.getSource(), DisplayData.from(bounded));
        pipelineTranslationContext.addVertex(sourceVertex);
        for (PValue input : node.getInputs().values()) {
            pipelineTranslationContext.addEdgeTo(sourceVertex, input);
        }
        for (PValue output : node.getOutputs().values()) {
            pipelineTranslationContext.registerMainOutputFrom(node, sourceVertex, output);
        }
    }

    /**
     * Translates a single-output ParDo into an {@link OperatorVertex}. Inputs that are not
     * among the transform's additional inputs get ordinary edges, side inputs get side-input
     * edges, and every node output is registered as a main output.
     */
    @PrimitiveTransformTranslator({ParDo.SingleOutput.class})
    private static void parDoSingleOutputTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, ParDo.SingleOutput<?, ?> singleOutput) {
        final Map<Integer, PCollectionView<?>> sideInputs = getSideInputMap(singleOutput.getSideInputs().values());
        final OperatorVertex parDoVertex = new OperatorVertex(createDoFnTransform(pipelineTranslationContext, node, sideInputs));
        pipelineTranslationContext.addVertex(parDoVertex);
        for (PValue input : node.getInputs().values()) {
            // Additional inputs are connected through the side-input edges below.
            if (singleOutput.getAdditionalInputs().values().contains(input)) {
                continue;
            }
            pipelineTranslationContext.addEdgeTo(parDoVertex, input);
        }
        pipelineTranslationContext.addSideInputEdges(parDoVertex, sideInputs);
        for (PValue output : node.getOutputs().values()) {
            pipelineTranslationContext.registerMainOutputFrom(node, parDoVertex, output);
        }
    }

    /**
     * Translates a multi-output ParDo into an {@link OperatorVertex}. Inputs that are not
     * among the transform's additional inputs get ordinary edges and side inputs get
     * side-input edges. The output tagged with the main output tag is registered as the main
     * output; all other outputs are registered afterwards as additional outputs under their tags.
     */
    @PrimitiveTransformTranslator({ParDo.MultiOutput.class})
    private static void parDoMultiOutputTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, ParDo.MultiOutput<?, ?> multiOutput) {
        final Map<Integer, PCollectionView<?>> sideInputs = getSideInputMap(multiOutput.getSideInputs().values());
        final OperatorVertex parDoVertex = new OperatorVertex(createDoFnTransform(pipelineTranslationContext, node, sideInputs));
        pipelineTranslationContext.addVertex(parDoVertex);
        for (PValue input : node.getInputs().values()) {
            // Additional inputs are connected through the side-input edges below.
            if (multiOutput.getAdditionalInputs().values().contains(input)) {
                continue;
            }
            pipelineTranslationContext.addEdgeTo(parDoVertex, input);
        }
        pipelineTranslationContext.addSideInputEdges(parDoVertex, sideInputs);

        final TupleTag<?> mainTag = multiOutput.getMainOutputTag();
        // First pass: main output only.
        node.getOutputs().forEach((tag, output) -> {
            if (tag.equals(mainTag)) {
                pipelineTranslationContext.registerMainOutputFrom(node, parDoVertex, output);
            }
        });
        // Second pass: everything else, keyed by its own tag.
        node.getOutputs().forEach((tag, output) -> {
            if (!tag.equals(mainTag)) {
                pipelineTranslationContext.registerAdditionalOutputFrom(node, parDoVertex, output, tag);
            }
        });
    }

    /**
     * Translates a GroupByKey into an {@link OperatorVertex} wrapping the transform chosen by
     * {@code createGBKTransform}, then wires all node inputs and registers all node outputs.
     */
    @PrimitiveTransformTranslator({GroupByKey.class})
    private static void groupByKeyTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, GroupByKey<?, ?> groupByKey) {
        final OperatorVertex gbkVertex = new OperatorVertex(createGBKTransform(pipelineTranslationContext, node));
        pipelineTranslationContext.addVertex(gbkVertex);
        for (PValue input : node.getInputs().values()) {
            pipelineTranslationContext.addEdgeTo(gbkVertex, input);
        }
        for (PValue output : node.getOutputs().values()) {
            pipelineTranslationContext.registerMainOutputFrom(node, gbkVertex, output);
        }
    }

    /**
     * Translates a {@code Window} or {@code Window.Assign} transform into an
     * {@link OperatorVertex} wrapping a {@link WindowFnTransform} built from the transform's
     * window function.
     *
     * @throws UnsupportedOperationException if the transform is neither of the two supported types
     */
    @PrimitiveTransformTranslator({Window.class, Window.Assign.class})
    private static void windowTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, PTransform<?, ?> pTransform) {
        final WindowFn windowFn;
        if (pTransform instanceof Window) {
            windowFn = ((Window) pTransform).getWindowFn();
        } else if (pTransform instanceof Window.Assign) {
            windowFn = ((Window.Assign) pTransform).getWindowFn();
        } else {
            throw new UnsupportedOperationException(String.format("%s is not supported", pTransform));
        }
        final OperatorVertex windowVertex = new OperatorVertex(new WindowFnTransform(windowFn, DisplayData.from(node.getTransform())));
        pipelineTranslationContext.addVertex(windowVertex);
        for (PValue input : node.getInputs().values()) {
            pipelineTranslationContext.addEdgeTo(windowVertex, input);
        }
        for (PValue output : node.getOutputs().values()) {
            pipelineTranslationContext.registerMainOutputFrom(node, windowVertex, output);
        }
    }

    /**
     * Translates a CreatePCollectionView into an {@link OperatorVertex} wrapping a
     * {@link CreateViewTransform} built from the view's view function. The view itself is
     * registered as a main output first, followed by every node output.
     */
    @PrimitiveTransformTranslator({View.CreatePCollectionView.class})
    private static void createPCollectionViewTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, View.CreatePCollectionView<?, ?> createPCollectionView) {
        final OperatorVertex viewVertex = new OperatorVertex(new CreateViewTransform(createPCollectionView.getView().getViewFn()));
        pipelineTranslationContext.addVertex(viewVertex);
        for (PValue input : node.getInputs().values()) {
            pipelineTranslationContext.addEdgeTo(viewVertex, input);
        }
        pipelineTranslationContext.registerMainOutputFrom(node, viewVertex, createPCollectionView.getView());
        for (PValue output : node.getOutputs().values()) {
            pipelineTranslationContext.registerMainOutputFrom(node, viewVertex, output);
        }
    }

    /**
     * Translates a Flatten into an {@link OperatorVertex} wrapping a {@link FlattenTransform},
     * wires all node inputs into it and registers all node outputs as main outputs.
     */
    @PrimitiveTransformTranslator({Flatten.PCollections.class})
    private static void flattenTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, Flatten.PCollections<?> pCollections) {
        final OperatorVertex flattenVertex = new OperatorVertex(new FlattenTransform());
        pipelineTranslationContext.addVertex(flattenVertex);
        for (PValue input : node.getInputs().values()) {
            pipelineTranslationContext.addEdgeTo(flattenVertex, input);
        }
        for (PValue output : node.getOutputs().values()) {
            pipelineTranslationContext.registerMainOutputFrom(node, flattenVertex, output);
        }
    }

    /**
     * Translates a {@code Combine.PerKey} composite into two operator vertices (a partial
     * combine and a final combine) connected by a shuffle edge.
     *
     * <p>If the combine has side inputs this method does nothing and asks the visitor to enter
     * the composite instead. Otherwise node inputs are wired to the first vertex, node outputs
     * are registered from the second vertex, and the visitor is told not to enter the composite.
     *
     * @param pipelineTranslationContext context that receives the vertices and edges
     * @param node hierarchy node of the composite
     * @param pTransform the transform; cast to {@code Combine.PerKey} unconditionally
     * @return {@code ENTER_TRANSFORM} when side inputs are present, otherwise
     *     {@code DO_NOT_ENTER_TRANSFORM}
     * @throws RuntimeException wrapping a {@code CannotProvideCoderException} raised inside the
     *     try block
     */
    @CompositeTransformTranslator({Combine.PerKey.class})
    private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, PTransform<?, ?> pTransform) {
        OperatorVertex operatorVertex;
        OperatorVertex operatorVertex2;
        Combine.PerKey perKey = (Combine.PerKey) pTransform;
        // Side inputs are not handled here; fall back to translating the composite's children.
        if (!perKey.getSideInputs().isEmpty()) {
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }
        Combine.CombineFn fn = perKey.getFn();
        // The single non-additional input; the next line recomputes the same element to read its coder.
        PCollection pCollection = (PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(pipelineTranslationContext.getPipeline())));
        KvCoder coder = ((PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(pipelineTranslationContext.getPipeline())))).getCoder();
        try {
            Coder accumulatorCoder = fn.getAccumulatorCoder(pipelineTranslationContext.getPipeline().getCoderRegistry(), coder.getValueCoder());
            if (isMainInputBounded(node, pipelineTranslationContext.getPipeline()) && isGlobalWindow(node, pipelineTranslationContext.getPipeline())) {
                // Bounded input in the global window: use the CombineFn partial/final transforms.
                operatorVertex = new OperatorVertex(new CombineFnPartialTransform(fn));
                operatorVertex2 = new OperatorVertex(new CombineFnFinalTransform(fn));
            } else {
                // Otherwise build a pair of GBKTransforms driven by combining SystemReduceFns.
                AppliedPTransform appliedPTransform = node.toAppliedPTransform(pipelineTranslationContext.getPipeline());
                PartialCombineFn partialCombineFn = new PartialCombineFn(fn, accumulatorCoder);
                FinalCombineFn finalCombineFn = new FinalCombineFn(fn, accumulatorCoder);
                SystemReduceFn combining = SystemReduceFn.combining(coder.getKeyCoder(), AppliedCombineFn.withInputCoder(partialCombineFn, pipelineTranslationContext.getPipeline().getCoderRegistry(), coder, (Iterable) null, pCollection.getWindowingStrategy()));
                SystemReduceFn combining2 = SystemReduceFn.combining(coder.getKeyCoder(), AppliedCombineFn.withInputCoder(finalCombineFn, pipelineTranslationContext.getPipeline().getCoderRegistry(), KvCoder.of(coder.getKeyCoder(), accumulatorCoder), (Iterable) null, pCollection.getWindowingStrategy()));
                // Fresh tag for the intermediate (key, accumulator) output of the first stage.
                TupleTag tupleTag = new TupleTag();
                // NOTE(review): the trailing boolean is true for the first stage and false for the
                // second; presumably it marks the partial-combine stage -- confirm against GBKTransform.
                GBKTransform gBKTransform = new GBKTransform(coder, Collections.singletonMap(tupleTag, KvCoder.of(coder.getKeyCoder(), accumulatorCoder)), tupleTag, pCollection.getWindowingStrategy(), pipelineTranslationContext.getPipelineOptions(), combining, DoFnSchemaInformation.create(), DisplayData.from(node.getTransform()), true);
                GBKTransform gBKTransform2 = new GBKTransform(KvCoder.of(coder.getKeyCoder(), accumulatorCoder), getOutputCoders(appliedPTransform), (TupleTag) Iterables.getOnlyElement(node.getOutputs().keySet()), pCollection.getWindowingStrategy(), pipelineTranslationContext.getPipelineOptions(), combining2, DoFnSchemaInformation.create(), DisplayData.from(node.getTransform()), false);
                operatorVertex = new OperatorVertex(gBKTransform);
                operatorVertex2 = new OperatorVertex(gBKTransform2);
            }
            pipelineTranslationContext.addVertex(operatorVertex);
            // Effectively-final copies so the lambdas below can capture the vertices.
            OperatorVertex operatorVertex3 = operatorVertex;
            node.getInputs().values().forEach(pValue -> {
                pipelineTranslationContext.addEdgeTo(operatorVertex3, pValue);
            });
            pipelineTranslationContext.addVertex(operatorVertex2);
            OperatorVertex operatorVertex4 = operatorVertex2;
            node.getOutputs().values().forEach(pValue2 -> {
                pipelineTranslationContext.registerMainOutputFrom(node, operatorVertex4, pValue2);
            });
            // Shuffle edge between the two stages, carrying (key, accumulator) pairs.
            pipelineTranslationContext.addEdge(new IREdge(CommunicationPatternProperty.Value.SHUFFLE, operatorVertex, operatorVertex2), KvCoder.of(coder.getKeyCoder(), accumulatorCoder), pCollection.getWindowingStrategy().getWindowFn().windowCoder());
            return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
        } catch (CannotProvideCoderException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    /**
     * Translator registered for {@link LoopCompositeTransform}. It adds nothing to the
     * context and always tells the visitor to enter the composite.
     *
     * @return always {@code ENTER_TRANSFORM}
     */
    @CompositeTransformTranslator({LoopCompositeTransform.class})
    private static Pipeline.PipelineVisitor.CompositeBehavior loopTranslator(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, LoopCompositeTransform<?, ?> loopCompositeTransform) {
        return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [java.util.PrimitiveIterator$OfInt] */
    private static Map<Integer, PCollectionView<?>> getSideInputMap(Collection<PCollectionView<?>> collection) {
        ?? it = IntStream.range(0, collection.size()).iterator();
        return (Map) collection.stream().collect(Collectors.toMap(pCollectionView -> {
            return it.next();
        }, Function.identity()));
    }

    /**
     * Builds the DoFn transform for a ParDo node.
     *
     * @param pipelineTranslationContext supplies the pipeline and pipeline options
     * @param node the ParDo node being translated
     * @param map side-input views keyed by index
     * @return a {@link DoFnTransform} when {@code map} is empty, otherwise a
     *     {@link PushBackDoFnTransform} that also receives {@code map}
     * @throws RuntimeException wrapping an {@link IOException} raised inside the try block
     */
    private static AbstractDoFnTransform createDoFnTransform(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node, Map<Integer, PCollectionView<?>> map) {
        try {
            final AppliedPTransform applied = node.toAppliedPTransform(pipelineTranslationContext.getPipeline());
            final DoFn userFn = ParDoTranslation.getDoFn(applied);
            final TupleTag mainTag = ParDoTranslation.getMainOutputTag(applied);
            final TupleTagList extraTags = ParDoTranslation.getAdditionalOutputTags(applied);
            final PCollection mainInput = (PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(applied));
            // Display data carries only the node's full name.
            final HasDisplayData nameOnly = builder -> builder.add(DisplayData.item("name", node.getFullName()));
            final DoFnSchemaInformation schemaInfo = ParDoTranslation.getSchemaInformation(node.toAppliedPTransform(pipelineTranslationContext.getPipeline()));
            if (map.isEmpty()) {
                return new DoFnTransform(userFn, mainInput.getCoder(), getOutputCoders(applied), mainTag, extraTags.getAll(), mainInput.getWindowingStrategy(), pipelineTranslationContext.getPipelineOptions(), DisplayData.from(nameOnly), schemaInfo, Collections.emptyMap());
            }
            return new PushBackDoFnTransform(userFn, mainInput.getCoder(), getOutputCoders(applied), mainTag, extraTags.getAll(), mainInput.getWindowingStrategy(), map, pipelineTranslationContext.getPipelineOptions(), DisplayData.from(nameOnly), schemaInfo, Collections.emptyMap());
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Collects the coder of every output of the applied transform that is a {@link PCollection}.
     *
     * @param appliedPTransform the applied transform whose outputs are inspected
     * @return a map from output tag to that output's coder; non-PCollection outputs are skipped
     */
    private static Map<TupleTag<?>, Coder<?>> getOutputCoders(AppliedPTransform<?, ?, ?> appliedPTransform) {
        return appliedPTransform.getOutputs().entrySet().stream()
            .filter(output -> output.getValue() instanceof PCollection)
            .collect(Collectors.toMap(Map.Entry::getKey, output -> ((PCollection) output.getValue()).getCoder()));
    }

    /**
     * Chooses the transform used for a GroupByKey node.
     *
     * @param pipelineTranslationContext supplies the pipeline and pipeline options
     * @param node the GroupByKey node being translated
     * @return a {@link GroupByKeyTransform} when the main input uses global windows, otherwise
     *     a {@link GBKTransform} configured with a buffering {@code SystemReduceFn}
     */
    private static Transform createGBKTransform(PipelineTranslationContext pipelineTranslationContext, TransformHierarchy.Node node) {
        final AppliedPTransform applied = node.toAppliedPTransform(pipelineTranslationContext.getPipeline());
        final PCollection mainInput = (PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(applied));
        if (isGlobalWindow(node, pipelineTranslationContext.getPipeline())) {
            return new GroupByKeyTransform();
        }
        return new GBKTransform(mainInput.getCoder(), getOutputCoders(applied), (TupleTag) Iterables.getOnlyElement(node.getOutputs().keySet()), mainInput.getWindowingStrategy(), pipelineTranslationContext.getPipelineOptions(), SystemReduceFn.buffering(mainInput.getCoder()), DoFnSchemaInformation.create(), DisplayData.from(node.getTransform()), false);
    }

    /**
     * Tells whether the node's single non-additional input is windowed by {@link GlobalWindows}.
     *
     * @param node the node whose main input is inspected
     * @param pipeline the pipeline used to obtain the node's applied transform
     * @return {@code true} if the main input's window function is a {@code GlobalWindows}
     */
    private static boolean isGlobalWindow(TransformHierarchy.Node node, Pipeline pipeline) {
        final PCollection mainInput = (PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(pipeline)));
        final WindowFn windowFn = mainInput.getWindowingStrategy().getWindowFn();
        return windowFn instanceof GlobalWindows;
    }

    /**
     * Tells whether the node's single non-additional input is a bounded collection.
     *
     * @param node the node whose main input is inspected
     * @param pipeline the pipeline used to obtain the node's applied transform
     * @return {@code true} if the main input reports {@code PCollection.IsBounded.BOUNDED}
     */
    private static boolean isMainInputBounded(TransformHierarchy.Node node, Pipeline pipeline) {
        final PCollection mainInput = (PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(pipeline)));
        return mainInput.isBounded() == PCollection.IsBounded.BOUNDED;
    }
}
