package org.apache.beam.runners.core;

import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;

@SystemDoFnInternal
/* loaded from: input_file:org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.class */
public class GroupAlsoByWindowViaWindowSetDoFn<K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>> extends OldDoFn<RinT, KV<K, OutputT>> {
    protected final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, (Combine.CombineFn) Sum.ofLongs());
    protected final Aggregator<Long, Long> droppedDueToLateness = createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, (Combine.CombineFn) Sum.ofLongs());
    private final WindowingStrategy<Object, W> windowingStrategy;
    private final StateInternalsFactory<K> stateInternalsFactory;
    private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;

    public static <K, InputT, OutputT, W extends BoundedWindow> OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, ?, OutputT, W> systemReduceFn) {
        return new GroupAlsoByWindowViaWindowSetDoFn(windowingStrategy, stateInternalsFactory, systemReduceFn);
    }

    private GroupAlsoByWindowViaWindowSetDoFn(WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, ?, OutputT, W> systemReduceFn) {
        this.windowingStrategy = windowingStrategy;
        this.reduceFn = systemReduceFn;
        this.stateInternalsFactory = stateInternalsFactory;
    }

    @Override // org.apache.beam.runners.core.OldDoFn
    public void processElement(OldDoFn<RinT, KV<K, OutputT>>.ProcessContext processContext) throws Exception {
        KeyedWorkItem keyedWorkItem = (KeyedWorkItem) processContext.element();
        Object key = keyedWorkItem.key();
        TimerInternals timerInternals = processContext.windowingInternals().timerInternals();
        ReduceFnRunner reduceFnRunner = new ReduceFnRunner(key, this.windowingStrategy, ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(this.windowingStrategy.getTrigger())), this.stateInternalsFactory.stateInternalsForKey(key), timerInternals, WindowingInternalsAdapters.outputWindowedValue(processContext.windowingInternals()), WindowingInternalsAdapters.sideInputReader(processContext.windowingInternals()), this.droppedDueToClosedWindow, this.reduceFn, processContext.getPipelineOptions());
        reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
        reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
        reduceFnRunner.persist();
    }

    public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
        return this;
    }

    public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
        return this.droppedDueToLateness;
    }
}
