package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.class */
/**
 * A {@link TransformEvaluatorFactory} that produces evaluators for
 * {@link DirectGroupByKey.DirectGroupByKeyOnly} applications.
 *
 * <p>Each evaluator buffers the elements it is given by key and, when the bundle is finished,
 * emits one keyed output bundle per distinct key containing a single {@link KeyedWorkItem}.
 */
public class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;

    /**
     * Creates a factory whose evaluators allocate their output bundles from the given context.
     *
     * @param evaluationContext the context handed to every evaluator this factory creates
     */
    public GroupByKeyOnlyEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Returns a new evaluator for the given application. The input bundle is not consulted.
     *
     * @param appliedPTransform the application to evaluate; it is treated as a
     *     {@code DirectGroupByKeyOnly} application without a runtime check
     * @param committedBundle unused
     * @return a fresh {@link GroupByKeyOnlyEvaluator}
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) {
        // The wildcard-typed application cannot be passed to the generic helper directly, and the
        // helper's result type is unrelated to InputT as far as the compiler can tell, so the
        // conversion goes through a raw type with the warnings suppressed on this one statement.
        @SuppressWarnings({"cast", "unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = createEvaluator((AppliedPTransform) appliedPTransform);
        return evaluator;
    }

    /** Does nothing; the method body is intentionally empty. */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() {
    }

    /** Builds a typed evaluator bound to this factory's evaluation context. */
    private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(AppliedPTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKey.DirectGroupByKeyOnly<K, V>> appliedPTransform) {
        return new GroupByKeyOnlyEvaluator<>(this.evaluationContext, appliedPTransform);
    }

    /**
     * Groups the elements of a single bundle by key.
     *
     * <p>Keys are compared through {@link StructuralKey}s built with the key coder of the input
     * {@link PCollection}, so the grouping map is keyed by {@code StructuralKey<K>} rather than
     * by {@code K} itself.
     *
     * @param <K> the key type of the input elements
     * @param <V> the value type of the input elements
     */
    public static class GroupByKeyOnlyEvaluator<K, V> implements TransformEvaluator<KV<K, V>> {
        private final EvaluationContext evaluationContext;
        private final AppliedPTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKey.DirectGroupByKeyOnly<K, V>> application;
        private final Coder<K> keyCoder;
        /** Values seen so far in this bundle, bucketed by structural key. */
        private final Map<StructuralKey<K>, List<WindowedValue<V>>> groupingMap = new HashMap<>();

        /**
         * Creates an evaluator for the given application.
         *
         * @param evaluationContext context used to create the keyed output bundles
         * @param appliedPTransform the application being evaluated; it must have exactly one
         *     input, whose coder must be a {@link KvCoder}
         * @throws IllegalStateException if the input coder is not a {@link KvCoder}
         */
        public GroupByKeyOnlyEvaluator(EvaluationContext evaluationContext, AppliedPTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKey.DirectGroupByKeyOnly<K, V>> appliedPTransform) {
            this.evaluationContext = evaluationContext;
            this.application = appliedPTransform;
            // The application's type parameters say its single input is a PCollection<KV<K, V>>,
            // but getInputs() exposes it under a less specific type, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
            this.keyCoder = getKeyCoder(input.getCoder());
        }

        /**
         * Extracts the key coder from the input coder.
         *
         * @param coder the coder of the input collection
         * @return the key component of {@code coder}
         * @throws IllegalStateException if {@code coder} is not a {@link KvCoder}
         */
        private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) {
            Preconditions.checkState(coder instanceof KvCoder, "%s requires a coder of class %s. This is an internal error; this is checked during pipeline construction but became corrupted.", getClass().getSimpleName(), KvCoder.class.getSimpleName());
            return ((KvCoder<K, V>) coder).getKeyCoder();
        }

        /**
         * Appends the element's value, carrying over the element's windowing metadata, to the
         * bucket for the element's key.
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<KV<K, V>> windowedValue) {
            KV<K, V> kv = windowedValue.getValue();
            StructuralKey<K> groupingKey = StructuralKey.of(kv.getKey(), this.keyCoder);
            this.groupingMap.computeIfAbsent(groupingKey, unused -> new ArrayList<>()).add(windowedValue.withValue(kv.getValue()));
        }

        /**
         * Emits one keyed bundle per buffered key. Each bundle holds a single
         * {@link KeyedWorkItem}, placed in the global window, that wraps all values buffered for
         * that key.
         *
         * @return a result without a watermark hold that lists every bundle produced
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<KV<K, V>> finishBundle() {
            StepTransformResult.Builder<KV<K, V>> resultBuilder = StepTransformResult.withoutHold(this.application);
            for (Map.Entry<StructuralKey<K>, List<WindowedValue<V>>> grouped : this.groupingMap.entrySet()) {
                K key = grouped.getKey().getKey();
                KeyedWorkItem<K, V> workItem = KeyedWorkItems.elementsWorkItem(key, grouped.getValue());
                // Looked up per entry, as before, so an empty bundle never touches the outputs.
                @SuppressWarnings("unchecked")
                PCollection<KeyedWorkItem<K, V>> output = (PCollection<KeyedWorkItem<K, V>>) Iterables.getOnlyElement(this.application.getOutputs().values());
                UncommittedBundle<KeyedWorkItem<K, V>> bundle = this.evaluationContext.createKeyedBundle(StructuralKey.of(key, this.keyCoder), output);
                bundle.add(WindowedValue.valueInGlobalWindow(workItem));
                resultBuilder.addOutput(bundle);
            }
            return resultBuilder.build();
        }
    }
}
