package org.apache.beam.runners.core;

import java.util.ArrayList;
import org.apache.beam.runners.core.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

@SystemDoFnInternal
/* loaded from: input_file:org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.class */
public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow> extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
    private final WindowingStrategy<?, W> strategy;
    private final StateInternalsFactory<K> stateInternalsFactory;
    private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;

    public GroupAlsoByWindowsViaOutputBufferDoFn(WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, ?, OutputT, W> systemReduceFn) {
        this.strategy = windowingStrategy;
        this.reduceFn = systemReduceFn;
        this.stateInternalsFactory = stateInternalsFactory;
    }

    @Override // org.apache.beam.runners.core.OldDoFn
    public void processElement(OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext processContext) throws Exception {
        Object key = processContext.element().getKey();
        InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();
        inMemoryTimerInternals.advanceProcessingTime(Instant.now());
        inMemoryTimerInternals.advanceSynchronizedProcessingTime(Instant.now());
        ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = new ReduceFnRunner<>(key, this.strategy, ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(this.strategy.getTrigger())), this.stateInternalsFactory.stateInternalsForKey(key), inMemoryTimerInternals, WindowingInternalsAdapters.outputWindowedValue(processContext.windowingInternals()), WindowingInternalsAdapters.sideInputReader(processContext.windowingInternals()), this.droppedDueToClosedWindow, this.reduceFn, processContext.getPipelineOptions());
        for (Iterable<WindowedValue<InputT>> iterable : Iterables.partition((Iterable) processContext.element().getValue(), 1000)) {
            reduceFnRunner.processElements(iterable);
            inMemoryTimerInternals.advanceInputWatermark(iterable.iterator().next().getTimestamp());
            inMemoryTimerInternals.advanceProcessingTime(Instant.now());
            inMemoryTimerInternals.advanceSynchronizedProcessingTime(Instant.now());
            fireEligibleTimers(inMemoryTimerInternals, reduceFnRunner);
        }
        inMemoryTimerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        inMemoryTimerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        inMemoryTimerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        fireEligibleTimers(inMemoryTimerInternals, reduceFnRunner);
        reduceFnRunner.persist();
    }

    private void fireEligibleTimers(InMemoryTimerInternals inMemoryTimerInternals, ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner) throws Exception {
        ArrayList arrayList = new ArrayList();
        while (true) {
            TimerInternals.TimerData removeNextEventTimer = inMemoryTimerInternals.removeNextEventTimer();
            if (removeNextEventTimer != null) {
                arrayList.add(removeNextEventTimer);
            } else {
                while (true) {
                    TimerInternals.TimerData removeNextProcessingTimer = inMemoryTimerInternals.removeNextProcessingTimer();
                    if (removeNextProcessingTimer == null) {
                        break;
                    } else {
                        arrayList.add(removeNextProcessingTimer);
                    }
                }
                while (true) {
                    TimerInternals.TimerData removeNextSynchronizedProcessingTimer = inMemoryTimerInternals.removeNextSynchronizedProcessingTimer();
                    if (removeNextSynchronizedProcessingTimer == null) {
                        break;
                    } else {
                        arrayList.add(removeNextSynchronizedProcessingTimer);
                    }
                }
                if (arrayList.isEmpty()) {
                    return;
                }
                reduceFnRunner.onTimers(arrayList);
                arrayList.clear();
            }
        }
    }
}
