package org.apache.beam.runners.direct;

import java.util.Collections;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.class */
public class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;

    /* loaded from: input_file:org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory$ConstantStateInternalsFactory.class */
    private static final class ConstantStateInternalsFactory<K> implements StateInternalsFactory<K> {
        private final StateInternals<K> stateInternals;

        private ConstantStateInternalsFactory(StateInternals<K> stateInternals) {
            this.stateInternals = stateInternals;
        }

        public StateInternals<K> stateInternalsForKey(K k) {
            return this.stateInternals;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.class */
    public static class GroupAlsoByWindowEvaluator<K, V> implements TransformEvaluator<KeyedWorkItem<K, V>> {
        private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;

        public GroupAlsoByWindowEvaluator(EvaluationContext evaluationContext, DirectRunner.CommittedBundle<KeyedWorkItem<K, V>> committedBundle, AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> appliedPTransform) {
            Coder<V> valueCoder = ((DirectGroupByKey.DirectGroupAlsoByWindow) appliedPTransform.getTransform()).getValueCoder(committedBundle.getPCollection().getCoder());
            WindowingStrategy<?, ?> windowingStrategy = ((DirectGroupByKey.DirectGroupAlsoByWindow) appliedPTransform.getTransform()).getWindowingStrategy();
            DirectExecutionContext.DirectStepContext orCreateStepContext = evaluationContext.getExecutionContext(appliedPTransform, committedBundle.getKey()).getOrCreateStepContext(evaluationContext.getStepName(appliedPTransform), ((DirectGroupByKey.DirectGroupAlsoByWindow) appliedPTransform.getTransform()).getName());
            OldDoFn create = GroupAlsoByWindowViaWindowSetDoFn.create(windowingStrategy, new ConstantStateInternalsFactory(orCreateStepContext.m4stateInternals()), SystemReduceFn.buffering(valueCoder));
            TupleTag<KV<K, Iterable<V>>> tupleTag = new TupleTag<KV<K, Iterable<V>>>() { // from class: org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory.GroupAlsoByWindowEvaluator.1
            };
            this.gabwParDoEvaluator = ParDoEvaluator.create(evaluationContext, orCreateStepContext, committedBundle, appliedPTransform, create, Collections.emptyList(), tupleTag, Collections.emptyList(), ImmutableMap.of(tupleTag, appliedPTransform.getOutput()));
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<KeyedWorkItem<K, V>> windowedValue) throws Exception {
            this.gabwParDoEvaluator.processElement(windowedValue);
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult finishBundle() throws Exception {
            return this.gabwParDoEvaluator.finishBundle();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GroupAlsoByWindowEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle) {
        return createEvaluator(appliedPTransform, committedBundle);
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() {
    }

    private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> appliedPTransform, DirectRunner.CommittedBundle<KeyedWorkItem<K, V>> committedBundle) {
        return new GroupAlsoByWindowEvaluator(this.evaluationContext, committedBundle, appliedPTransform);
    }
}
