package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.class */
public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow> implements FlatMapGroupsFunction<K, WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, Iterable<InputT>>>> {
    private final WindowingStrategy<?, W> windowingStrategy;
    private final StateInternalsFactory<K> stateInternalsFactory;
    private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
    private final SerializablePipelineOptions options;

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn$GABWOutputWindowedValue.class */
    private static class GABWOutputWindowedValue<K, V> implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final List<WindowedValue<KV<K, Iterable<V>>>> outputs;

        private GABWOutputWindowedValue() {
            this.outputs = new ArrayList();
        }

        public void outputWindowedValue(KV<K, Iterable<V>> kv, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.outputs.add(WindowedValue.of(kv, instant, collection, paneInfo));
        }

        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
        }

        Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
            return this.outputs;
        }

        public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection, PaneInfo paneInfo) {
            outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection, paneInfo);
        }
    }

    public GroupAlsoByWindowViaOutputBufferFn(WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> systemReduceFn, SerializablePipelineOptions serializablePipelineOptions) {
        this.windowingStrategy = windowingStrategy;
        this.stateInternalsFactory = stateInternalsFactory;
        this.reduceFn = systemReduceFn;
        this.options = serializablePipelineOptions;
    }

    public Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call(K k, Iterator<WindowedValue<KV<K, InputT>>> it) throws Exception {
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            WindowedValue<KV<K, InputT>> next = it.next();
            arrayList.add(next.withValue(((KV) next.getValue()).getValue()));
        }
        InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();
        inMemoryTimerInternals.advanceProcessingTime(Instant.now());
        inMemoryTimerInternals.advanceSynchronizedProcessingTime(Instant.now());
        StateInternals stateInternalsForKey = this.stateInternalsFactory.stateInternalsForKey(k);
        GABWOutputWindowedValue gABWOutputWindowedValue = new GABWOutputWindowedValue();
        ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner = new ReduceFnRunner<>(k, this.windowingStrategy, ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(this.windowingStrategy.getTrigger()))), stateInternalsForKey, inMemoryTimerInternals, gABWOutputWindowedValue, new UnsupportedSideInputReader("GroupAlsoByWindow"), this.reduceFn, this.options.get());
        reduceFnRunner.processElements(arrayList);
        inMemoryTimerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        inMemoryTimerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        inMemoryTimerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        fireEligibleTimers(inMemoryTimerInternals, reduceFnRunner);
        reduceFnRunner.persist();
        return gABWOutputWindowedValue.getOutputs().iterator();
    }

    private void fireEligibleTimers(InMemoryTimerInternals inMemoryTimerInternals, ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) throws Exception {
        ArrayList arrayList = new ArrayList();
        while (true) {
            TimerInternals.TimerData removeNextEventTimer = inMemoryTimerInternals.removeNextEventTimer();
            if (removeNextEventTimer != null) {
                arrayList.add(removeNextEventTimer);
            } else {
                while (true) {
                    TimerInternals.TimerData removeNextProcessingTimer = inMemoryTimerInternals.removeNextProcessingTimer();
                    if (removeNextProcessingTimer == null) {
                        break;
                    } else {
                        arrayList.add(removeNextProcessingTimer);
                    }
                }
                while (true) {
                    TimerInternals.TimerData removeNextSynchronizedProcessingTimer = inMemoryTimerInternals.removeNextSynchronizedProcessingTimer();
                    if (removeNextSynchronizedProcessingTimer == null) {
                        break;
                    } else {
                        arrayList.add(removeNextSynchronizedProcessingTimer);
                    }
                }
                if (arrayList.isEmpty()) {
                    return;
                }
                reduceFnRunner.onTimers(arrayList);
                arrayList.clear();
            }
        }
    }
}
