package org.apache.beam.runners.flink.translation.functions;

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.class */
public class FlinkStatefulDoFnFunction<K, V, OutputT> extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> {
    private final DoFn<KV<K, V>, OutputT> dofn;
    private String stepName;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
    private final SerializablePipelineOptions serializedOptions;
    private final Map<TupleTag<?>, Integer> outputMap;
    private final TupleTag<OutputT> mainOutputTag;
    private final Coder<KV<K, V>> inputCoder;
    private final Map<TupleTag<?>, Coder<?>> outputCoderMap;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<String, PCollectionView<?>> sideInputMapping;
    private transient DoFnInvoker doFnInvoker;

    public FlinkStatefulDoFnFunction(DoFn<KV<K, V>, OutputT> doFn, String str, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> map, PipelineOptions pipelineOptions, Map<TupleTag<?>, Integer> map2, TupleTag<OutputT> tupleTag, Coder<KV<K, V>> coder, Map<TupleTag<?>, Coder<?>> map3, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map4) {
        this.dofn = doFn;
        this.stepName = str;
        this.windowingStrategy = windowingStrategy;
        this.sideInputs = map;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.outputMap = map2;
        this.mainOutputTag = tupleTag;
        this.inputCoder = coder;
        this.outputCoderMap = map3;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideInputMapping = map4;
    }

    public void reduce(Iterable<WindowedValue<KV<K, V>>> iterable, Collector<WindowedValue<OutputT>> collector) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        DoFnRunners.OutputManager doFnOutputManager = this.outputMap.size() == 1 ? new FlinkDoFnFunction.DoFnOutputManager(collector) : new FlinkDoFnFunction.MultiDoFnOutputManager(collector, this.outputMap);
        Iterator<WindowedValue<KV<K, V>>> it = iterable.iterator();
        WindowedValue<KV<K, V>> next = it.next();
        final InMemoryStateInternals forKey = InMemoryStateInternals.forKey(((KV) next.getValue()).getKey());
        final InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();
        inMemoryTimerInternals.advanceProcessingTime(Instant.now());
        inMemoryTimerInternals.advanceSynchronizedProcessingTime(Instant.now());
        DoFnRunner<KV<K, V>, OutputT> simpleRunner = DoFnRunners.simpleRunner(this.serializedOptions.get(), this.dofn, new FlinkSideInputReader(this.sideInputs, runtimeContext), doFnOutputManager, this.mainOutputTag, Lists.newArrayList(this.outputMap.keySet()), new FlinkNoOpStepContext() { // from class: org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction.1
            @Override // org.apache.beam.runners.flink.translation.functions.FlinkNoOpStepContext
            public StateInternals stateInternals() {
                return forKey;
            }

            @Override // org.apache.beam.runners.flink.translation.functions.FlinkNoOpStepContext
            public TimerInternals timerInternals() {
                return inMemoryTimerInternals;
            }
        }, this.inputCoder, this.outputCoderMap, this.windowingStrategy, this.doFnSchemaInformation, this.sideInputMapping);
        if (((FlinkPipelineOptions) this.serializedOptions.get().as(FlinkPipelineOptions.class)).getEnableMetrics().booleanValue()) {
            simpleRunner = new DoFnRunnerWithMetricsUpdate(this.stepName, simpleRunner, new FlinkMetricContainer(getRuntimeContext()));
        }
        simpleRunner.startBundle();
        simpleRunner.processElement(next);
        while (it.hasNext()) {
            simpleRunner.processElement(it.next());
        }
        inMemoryTimerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        inMemoryTimerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        inMemoryTimerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        fireEligibleTimers(inMemoryTimerInternals, simpleRunner);
        simpleRunner.finishBundle();
    }

    private void fireEligibleTimers(InMemoryTimerInternals inMemoryTimerInternals, DoFnRunner<KV<K, V>, OutputT> doFnRunner) throws Exception {
        boolean z;
        do {
            z = false;
            while (true) {
                TimerInternals.TimerData removeNextEventTimer = inMemoryTimerInternals.removeNextEventTimer();
                if (removeNextEventTimer == null) {
                    break;
                }
                z = true;
                fireTimer(removeNextEventTimer, doFnRunner);
            }
            while (true) {
                TimerInternals.TimerData removeNextProcessingTimer = inMemoryTimerInternals.removeNextProcessingTimer();
                if (removeNextProcessingTimer == null) {
                    break;
                }
                z = true;
                fireTimer(removeNextProcessingTimer, doFnRunner);
            }
            while (true) {
                TimerInternals.TimerData removeNextSynchronizedProcessingTimer = inMemoryTimerInternals.removeNextSynchronizedProcessingTimer();
                if (removeNextSynchronizedProcessingTimer == null) {
                    break;
                }
                z = true;
                fireTimer(removeNextSynchronizedProcessingTimer, doFnRunner);
            }
        } while (z);
    }

    private void fireTimer(TimerInternals.TimerData timerData, DoFnRunner<KV<K, V>, OutputT> doFnRunner) {
        StateNamespaces.WindowNamespace namespace = timerData.getNamespace();
        Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
        doFnRunner.onTimer(timerData.getTimerId(), namespace.getWindow(), timerData.getTimestamp(), timerData.getDomain());
    }

    public void open(Configuration configuration) {
        FileSystems.setDefaultPipelineOptions(this.serializedOptions.get());
        this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(this.dofn);
    }

    public void close() throws Exception {
        try {
            Optional.ofNullable(this.doFnInvoker).ifPresent((v0) -> {
                v0.invokeTeardown();
            });
        } finally {
            FlinkClassloading.deleteStaticCaches();
        }
    }
}
