package org.apache.beam.runners.direct;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombine.class */
public class MultiStepCombine<K, InputT, AccumT, OutputT> extends PTransformTranslation.RawPTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
    private final Coder<KV<K, OutputT>> outputCoder;
    static final String DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN = "urn:beam:directrunner:transforms:merge_accumulators_extract_output:v1";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombine$CombineInputs.class */
    /**
     * A {@link DoFn} that pre-combines input values within a bundle.
     *
     * <p>For every distinct (key, window) pair seen in a bundle it keeps one accumulator and one
     * combined timestamp. When the bundle finishes, a single compacted accumulator is emitted per
     * pair, in the order in which the pairs were first seen.
     */
    public static class CombineInputs<K, InputT, AccumT> extends DoFn<KV<K, InputT>, KV<K, AccumT>> {
        private final Combine.CombineFn<InputT, AccumT, ?> combineFn;
        private final TimestampCombiner timestampCombiner;
        private final Coder<K> keyCoder;
        // Per-bundle state: created in startBundle, dropped again in outputAccumulators.
        private transient Map<WindowedStructuralKey<K>, AccumT> accumulators;
        private transient Map<WindowedStructuralKey<K>, Instant> timestamps;

        private CombineInputs(Combine.CombineFn<InputT, AccumT, ?> combineFn, TimestampCombiner timestampCombiner, Coder<K> coder) {
            this.combineFn = combineFn;
            this.timestampCombiner = timestampCombiner;
            this.keyCoder = coder;
        }

        /** Starts every bundle with empty accumulator and timestamp tables. */
        @DoFn.StartBundle
        public void startBundle() {
            accumulators = new LinkedHashMap<>();
            timestamps = new LinkedHashMap<>();
        }

        /**
         * Folds the element's value into the accumulator of its (key, window) pair and combines
         * the element's assigned timestamp with the timestamp already recorded for that pair.
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, InputT>, KV<K, AccumT>>.ProcessContext processContext, BoundedWindow boundedWindow) {
            KV<K, InputT> element = processContext.element();
            WindowedStructuralKey<K> groupKey = WindowedStructuralKey.create(keyCoder, element.getKey(), boundedWindow);
            AccumT current = accumulators.get(groupKey);
            Instant assigned = timestampCombiner.assign(boundedWindow, processContext.timestamp());
            if (current == null) {
                // First element for this pair: seed both tables so the combine below has an entry.
                current = combineFn.createAccumulator();
                accumulators.put(groupKey, current);
                timestamps.put(groupKey, assigned);
            }
            AccumT updated = combineFn.addInput(current, element.getValue());
            accumulators.put(groupKey, updated);
            Instant combined = timestampCombiner.combine(assigned, timestamps.get(groupKey));
            timestamps.put(groupKey, combined);
        }

        /** Emits one compacted accumulator per (key, window) pair, then releases the bundle state. */
        @DoFn.FinishBundle
        public void outputAccumulators(DoFn<KV<K, InputT>, KV<K, AccumT>>.FinishBundleContext finishBundleContext) {
            for (Map.Entry<WindowedStructuralKey<K>, AccumT> entry : accumulators.entrySet()) {
                WindowedStructuralKey<K> groupKey = entry.getKey();
                KV<K, AccumT> result = KV.of(groupKey.getKey(), combineFn.compact(entry.getValue()));
                finishBundleContext.output(result, timestamps.get(groupKey), groupKey.getWindow());
            }
            accumulators = null;
            timestamps = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombine$Factory.class */
    /**
     * Override factory that replaces a matched {@link Combine.PerKey} application with a
     * {@link MultiStepCombine} built from the same combine function and output coder.
     */
    public static class Factory<K, InputT, AccumT, OutputT> extends SingleInputOutputOverrideFactory<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
        /** Returns a new factory instance. */
        public static PTransformOverrideFactory create() {
            return new Factory<>();
        }

        private Factory() {
        }

        /**
         * Builds the replacement for the given application.
         *
         * @param appliedPTransform the application being replaced; its transform is cast to
         *     {@link Combine.PerKey} in order to read the combine function
         * @return a replacement pairing the application's only input with a new
         *     {@link MultiStepCombine}
         * @throws IllegalStateException if the combine function is not a {@link Combine.CombineFn}
         */
        @Override
        @SuppressWarnings("unchecked")
        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> appliedPTransform) {
            // getFn() is declared on Combine.PerKey, not on PTransform, so the transform must be
            // narrowed before the function can be read.
            CombineFnBase.GlobalCombineFn<?, ?, ?> globalFn = ((Combine.PerKey<?, ?, ?>) appliedPTransform.getTransform()).getFn();
            // Validate the function type BEFORE narrowing it, so an unexpected function yields the
            // descriptive IllegalStateException rather than a ClassCastException.
            Preconditions.checkState(globalFn instanceof Combine.CombineFn, "%s.matcher() should only match %s instances using %s, got %s", MultiStepCombine.class.getSimpleName(), Combine.PerKey.class.getSimpleName(), Combine.CombineFn.class.getSimpleName(), globalFn.getClass().getName());
            Combine.CombineFn<InputT, AccumT, OutputT> fn = (Combine.CombineFn<InputT, AccumT, OutputT>) globalFn;
            PCollection<KV<K, InputT>> input = (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
            PCollection<KV<K, OutputT>> output = (PCollection<KV<K, OutputT>>) Iterables.getOnlyElement(appliedPTransform.getOutputs().values());
            return PTransformOverrideFactory.PTransformReplacement.of(input, new MultiStepCombine<>(fn, output.getCoder()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombine$MergeAccumulatorsAndExtractOutputEvaluator.class */
    /**
     * Evaluator for {@link MergeAndExtractAccumulatorOutput}: merges the grouped accumulators of
     * each key and adds the extracted output to a single output bundle.
     */
    public static class MergeAccumulatorsAndExtractOutputEvaluator<K, AccumT, OutputT> implements TransformEvaluator<KV<K, Iterable<AccumT>>> {
        private final AppliedPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>, MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>> application;
        private final Combine.CombineFn<?, AccumT, OutputT> combineFn;
        private final UncommittedBundle<KV<K, OutputT>> output;

        /**
         * Creates an evaluator that writes into a new bundle for the application's only output.
         *
         * @param evaluationContext used to create the output bundle
         * @param appliedPTransform the application being evaluated; supplies the combine function
         */
        @SuppressWarnings("unchecked")
        public MergeAccumulatorsAndExtractOutputEvaluator(EvaluationContext evaluationContext, AppliedPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>, MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>> appliedPTransform) {
            application = appliedPTransform;
            combineFn = appliedPTransform.getTransform().getCombineFn();
            PCollection<KV<K, OutputT>> outputCollection = (PCollection<KV<K, OutputT>>) Iterables.getOnlyElement(appliedPTransform.getOutputs().values());
            output = evaluationContext.createBundle(outputCollection);
        }

        /**
         * Merges the element's accumulators, bracketed by two freshly created accumulators, and
         * adds the extracted result under the element's key and windowing to the output bundle.
         *
         * @throws IllegalStateException if the element is not in exactly one window
         * @throws Exception any failure while merging or emitting, wrapped via
         *     {@link UserCodeException#wrap}
         */
        @Override
        public void processElement(WindowedValue<KV<K, Iterable<AccumT>>> windowedValue) throws Exception {
            Preconditions.checkState(windowedValue.getWindows().size() == 1, "Expected inputs to %s to be in exactly one window. Got %s", MergeAccumulatorsAndExtractOutputEvaluator.class.getSimpleName(), windowedValue.getWindows().size());
            try {
                K key = windowedValue.getValue().getKey();
                AccumT leading = combineFn.createAccumulator();
                Iterable<AccumT> grouped = windowedValue.getValue().getValue();
                AccumT trailing = combineFn.createAccumulator();
                Iterable<AccumT> toMerge = Iterables.concat(Collections.singleton(leading), grouped, Collections.singleton(trailing));
                AccumT merged = combineFn.mergeAccumulators(toMerge);
                OutputT extracted = combineFn.extractOutput(merged);
                output.add(windowedValue.withValue(KV.of(key, extracted)));
            } catch (Exception e) {
                throw UserCodeException.wrap(e);
            }
        }

        /** Returns a result, built without a hold, that carries the single output bundle. */
        @Override
        public TransformResult<KV<K, Iterable<AccumT>>> finishBundle() throws Exception {
            return StepTransformResult.<KV<K, Iterable<AccumT>>>withoutHold(application).addOutput(output).build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombine$MergeAndExtractAccumulatorOutput.class */
    /**
     * Raw transform from grouped accumulators ({@code KV<K, Iterable<AccumT>>}) to combined
     * outputs ({@code KV<K, OutputT>}). It only declares the output collection and carries the
     * combine function for whoever evaluates it.
     */
    public static class MergeAndExtractAccumulatorOutput<K, AccumT, OutputT> extends PTransformTranslation.RawPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>> {
        private final Combine.CombineFn<?, AccumT, OutputT> combineFn;
        private final Coder<KV<K, OutputT>> outputCoder;

        private MergeAndExtractAccumulatorOutput(Combine.CombineFn<?, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> coder) {
            this.combineFn = combineFn;
            this.outputCoder = coder;
        }

        /** Returns the combine function this transform was constructed with. */
        Combine.CombineFn<?, AccumT, OutputT> getCombineFn() {
            return combineFn;
        }

        /**
         * Creates the primitive output collection, reusing the input's pipeline, windowing
         * strategy and boundedness, with the configured output coder.
         */
        @Override
        public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<AccumT>>> pCollection) {
            return PCollection.createPrimitiveOutputInternal(
                    pCollection.getPipeline(),
                    pCollection.getWindowingStrategy(),
                    pCollection.isBounded(),
                    outputCoder);
        }

        /** Returns {@link MultiStepCombine#DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN}. */
        @Override
        @Nonnull
        public String getUrn() {
            return DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN;
        }

        /** Always returns {@code null}; no function spec is provided. */
        @Override
        @Nullable
        public RunnerApi.FunctionSpec getSpec() {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombine$MergeAndExtractAccumulatorOutputEvaluatorFactory.class */
    /**
     * {@link TransformEvaluatorFactory} that produces a
     * {@link MergeAccumulatorsAndExtractOutputEvaluator} for each application it is asked about.
     */
    public static class MergeAndExtractAccumulatorOutputEvaluatorFactory implements TransformEvaluatorFactory {
        private final EvaluationContext ctxt;

        /**
         * @param evaluationContext context handed to every evaluator this factory creates
         */
        public MergeAndExtractAccumulatorOutputEvaluatorFactory(EvaluationContext evaluationContext) {
            this.ctxt = evaluationContext;
        }

        /**
         * Creates an evaluator for the given application.
         *
         * <p>The interface only exposes wildcard types, so the arguments are passed through raw
         * casts to reach the typed helper. Nothing here verifies that the application actually
         * holds a {@link MergeAndExtractAccumulatorOutput}; callers are assumed to route only such
         * applications to this factory.
         */
        @Override
        @Nullable
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) throws Exception {
            return createEvaluator((AppliedPTransform) appliedPTransform, (CommittedBundle) committedBundle);
        }

        /** Typed helper behind {@link #forApplication}; the input bundle is not used. */
        private <K, AccumT, OutputT> TransformEvaluator<KV<K, Iterable<AccumT>>> createEvaluator(AppliedPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>, MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>> appliedPTransform, CommittedBundle<KV<K, Iterable<AccumT>>> committedBundle) {
            return new MergeAccumulatorsAndExtractOutputEvaluator<>(this.ctxt, appliedPTransform);
        }

        /** No-op: this factory holds no resources to release. */
        @Override
        public void cleanup() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombine$WindowedStructuralKey.class */
    /**
     * Map key pairing a {@link StructuralKey} with a {@link BoundedWindow}. Two instances are
     * equal when both the window and the structural key are equal.
     */
    static class WindowedStructuralKey<K> {
        private final StructuralKey<K> key;
        private final BoundedWindow window;

        /**
         * Wraps {@code k} in a {@link StructuralKey} built with {@code coder} and pairs it with
         * {@code boundedWindow}.
         */
        public static <K> WindowedStructuralKey<K> create(Coder<K> coder, K k, BoundedWindow boundedWindow) {
            StructuralKey<K> structuralKey = StructuralKey.of(k, coder);
            return new WindowedStructuralKey<>(structuralKey, boundedWindow);
        }

        private WindowedStructuralKey(StructuralKey<K> structuralKey, BoundedWindow boundedWindow) {
            key = Preconditions.checkNotNull(structuralKey, "key cannot be null");
            window = Preconditions.checkNotNull(boundedWindow, "Window cannot be null");
        }

        /** Returns the key held by the wrapped {@link StructuralKey}. */
        public K getKey() {
            return key.getKey();
        }

        /** Returns the window half of this pair. */
        public BoundedWindow getWindow() {
            return window;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WindowedStructuralKey) {
                WindowedStructuralKey<?> that = (WindowedStructuralKey<?>) obj;
                return window.equals(that.window) && key.equals(that.key);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(window, key);
        }
    }

    /**
     * Returns a matcher selecting the combine-per-key applications this transform can replace.
     *
     * <p>An application matches when its transform URN is
     * {@code PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN}, its combine function is a
     * {@link Combine.CombineFn}, it has exactly one input, that input's window function is
     * non-merging, its trigger equals {@link DefaultTrigger#of()}, its coder is a {@link KvCoder},
     * and the combine function yields a non-null accumulator coder.
     */
    public static PTransformMatcher matcher() {
        return new PTransformMatcher() { // from class: org.apache.beam.runners.direct.MultiStepCombine.1
            @Override
            public boolean matches(AppliedPTransform<?, ?, ?> appliedPTransform) {
                PTransform<?, ?> transform = appliedPTransform.getTransform();
                if (!PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN.equals(PTransformTranslation.urnForTransformOrNull(transform))) {
                    return false;
                }
                // getFn() is declared on Combine.PerKey rather than PTransform, so narrow first.
                // NOTE(review): assumes every transform carrying this URN is a Combine.PerKey.
                CombineFnBase.GlobalCombineFn<?, ?, ?> fn = ((Combine.PerKey<?, ?, ?>) transform).getFn();
                return isApplicable(appliedPTransform.getInputs(), fn);
            }

            /**
             * Checks the input-side conditions for the replacement.
             *
             * @throws RuntimeException if the combine function throws while providing its
             *     accumulator coder
             */
            @SuppressWarnings("unchecked")
            private <K, InputT> boolean isApplicable(Map<TupleTag<?>, PValue> map, CombineFnBase.GlobalCombineFn<InputT, ?, ?> globalCombineFn) {
                if (!(globalCombineFn instanceof Combine.CombineFn) || map.size() != 1) {
                    return false;
                }
                PCollection<KV<K, InputT>> input = (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(map.values());
                WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
                boolean windowFnApplicable = windowingStrategy.getWindowFn().isNonMerging();
                boolean triggerApplicable = DefaultTrigger.of().equals(windowingStrategy.getTrigger());
                boolean accumulatorCoderAvailable = false;
                try {
                    if (input.getCoder() instanceof KvCoder) {
                        // getValueCoder() exists only on KvCoder, hence the cast after the check.
                        KvCoder<K, InputT> kvCoder = (KvCoder<K, InputT>) input.getCoder();
                        Coder<?> accumulatorCoder = globalCombineFn.getAccumulatorCoder(input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
                        accumulatorCoderAvailable = accumulatorCoder != null;
                    }
                } catch (CannotProvideCoderException e) {
                    throw new RuntimeException(String.format("Could not construct an accumulator %s for %s. Accumulator %s for a %s may be null, but may not throw an exception", Coder.class.getSimpleName(), globalCombineFn, Coder.class.getSimpleName(), Combine.class.getSimpleName()), e);
                }
                return windowFnApplicable && triggerApplicable && accumulatorCoderAvailable;
            }
        };
    }

    /**
     * Static factory for {@link MultiStepCombine}.
     *
     * @param combineFn the combine function the new transform will hold
     * @param coder the coder the new transform will hold as its output coder
     * @return a new {@link MultiStepCombine} wrapping the two arguments
     */
    public static <K, InputT, AccumT, OutputT> MultiStepCombine<K, InputT, AccumT, OutputT> of(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> coder) {
        return new MultiStepCombine<>(combineFn, coder);
    }

    /**
     * Stores the combine function and output coder; no validation is performed here.
     *
     * @param combineFn the combine function to store
     * @param coder the output coder to store
     */
    private MultiStepCombine(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> coder) {
        this.combineFn = combineFn;
        this.outputCoder = coder;
    }

    /**
     * Returns the fixed URN string identifying this transform.
     */
    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.RawPTransform
    @Nonnull
    public String getUrn() {
        return "urn:beam:directrunner:transforms:multistepcombine:v1";
    }

    /**
     * Always returns {@code null}; this transform provides no function spec.
     */
    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.RawPTransform
    @Nullable
    public RunnerApi.FunctionSpec getSpec() {
        return null;
    }

    /**
     * Expands this transform into three steps: a {@link ParDo} over {@link CombineInputs}, a
     * {@link GroupByKey}, and a {@link MergeAndExtractAccumulatorOutput}.
     *
     * <p>The accumulator coder is resolved before anything is applied to the input, so a coder
     * failure surfaces without a partially applied expansion.
     *
     * @param pCollection the keyed input; its coder must be a {@link KvCoder}
     * @return the collection produced by the final merge-and-extract step
     * @throws IllegalArgumentException if the input coder is not a {@link KvCoder}
     * @throws IllegalStateException if the combine function cannot provide an accumulator coder
     */
    @Override
    public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> pCollection) {
        Preconditions.checkArgument(pCollection.getCoder() instanceof KvCoder, "Expected input to have a %s of type %s, got %s", Coder.class.getSimpleName(), KvCoder.class.getSimpleName(), pCollection.getCoder());
        // Safe after the check above; getKeyCoder()/getValueCoder() exist only on KvCoder.
        KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) pCollection.getCoder();
        Coder<AccumT> accumulatorCoder;
        try {
            accumulatorCoder = this.combineFn.getAccumulatorCoder(pCollection.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
        } catch (CannotProvideCoderException e) {
            throw new IllegalStateException(String.format("Could not construct an Accumulator Coder with the provided %s %s", Combine.CombineFn.class.getSimpleName(), this.combineFn), e);
        }
        return pCollection
                .apply(ParDo.of(new CombineInputs<>(this.combineFn, pCollection.getWindowingStrategy().getTimestampCombiner(), inputCoder.getKeyCoder())))
                .setCoder(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder))
                .apply(GroupByKey.create())
                .apply(new MergeAndExtractAccumulatorOutput<>(this.combineFn, this.outputCoder));
    }
}
