/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkNoOpStepContext;
import org.apache.beam.runners.flink.translation.functions.FlinkSideInputReader;
import org.apache.beam.runners.flink.translation.utils.Workarounds;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

public class FlinkStatefulDoFnFunction<@UnknownKeyFor K, @UnknownKeyFor V, @UnknownKeyFor OutputT>
extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<RawUnionValue>> {
    private final @UnknownKeyFor @NonNull @Initialized DoFn<@UnknownKeyFor @NonNull @Initialized KV<K, V>, OutputT> dofn;
    private @UnknownKeyFor @NonNull @Initialized String stepName;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> windowingStrategy;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?>> sideInputs;
    private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions serializedOptions;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Integer> outputMap;
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<OutputT> mainOutputTag;
    private final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized KV<K, V>> inputCoder;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoderMap;
    private final @UnknownKeyFor @NonNull @Initialized DoFnSchemaInformation doFnSchemaInformation;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputMapping;
    private transient @UnknownKeyFor @NonNull @Initialized DoFnInvoker doFnInvoker;
    private transient @UnknownKeyFor @NonNull @Initialized FlinkMetricContainer metricContainer;

    public FlinkStatefulDoFnFunction(@UnknownKeyFor @NonNull @Initialized DoFn<@UnknownKeyFor @NonNull @Initialized KV<K, V>, OutputT> dofn, @UnknownKeyFor @NonNull @Initialized String stepName, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> windowingStrategy, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?>> sideInputs, @UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Integer> outputMap, @UnknownKeyFor @NonNull @Initialized TupleTag<OutputT> mainOutputTag, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized KV<K, V>> inputCoder, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoderMap, @UnknownKeyFor @NonNull @Initialized DoFnSchemaInformation doFnSchemaInformation, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputMapping) {
        this.dofn = dofn;
        this.stepName = stepName;
        this.windowingStrategy = windowingStrategy;
        this.sideInputs = sideInputs;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.outputMap = outputMap;
        this.mainOutputTag = mainOutputTag;
        this.inputCoder = inputCoder;
        this.outputCoderMap = outputCoderMap;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideInputMapping = sideInputMapping;
    }

    public void reduce(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, V>>> values, @UnknownKeyFor @NonNull @Initialized Collector<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized RawUnionValue>> out) throws @UnknownKeyFor @NonNull @Initialized Exception {
        RuntimeContext runtimeContext = this.getRuntimeContext();
        DoFnRunners.OutputManager outputManager = this.outputMap.size() == 1 ? new FlinkDoFnFunction.DoFnOutputManager(out) : new FlinkDoFnFunction.MultiDoFnOutputManager(out, this.outputMap);
        Iterator<WindowedValue<KV<K, V>>> iterator = values.iterator();
        WindowedValue<KV<K, V>> currentValue = iterator.next();
        Object key = ((KV)currentValue.getValue()).getKey();
        final InMemoryStateInternals stateInternals = InMemoryStateInternals.forKey((Object)key);
        final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
        timerInternals.advanceProcessingTime(Instant.now());
        timerInternals.advanceSynchronizedProcessingTime(Instant.now());
        ArrayList additionalOutputTags = Lists.newArrayList(this.outputMap.keySet());
        DoFnRunnerWithMetricsUpdate doFnRunner = DoFnRunners.simpleRunner((PipelineOptions)this.serializedOptions.get(), this.dofn, (SideInputReader)new FlinkSideInputReader(this.sideInputs, runtimeContext), (DoFnRunners.OutputManager)outputManager, this.mainOutputTag, (List)additionalOutputTags, (StepContext)new FlinkNoOpStepContext(){

            @Override
            public @UnknownKeyFor @NonNull @Initialized StateInternals stateInternals() {
                return stateInternals;
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized TimerInternals timerInternals() {
                return timerInternals;
            }
        }, this.inputCoder, this.outputCoderMap, this.windowingStrategy, (DoFnSchemaInformation)this.doFnSchemaInformation, this.sideInputMapping);
        FlinkPipelineOptions pipelineOptions = (FlinkPipelineOptions)this.serializedOptions.get().as(FlinkPipelineOptions.class);
        if (!pipelineOptions.getDisableMetrics().booleanValue()) {
            doFnRunner = new DoFnRunnerWithMetricsUpdate(this.stepName, doFnRunner, this.metricContainer);
        }
        doFnRunner.startBundle();
        doFnRunner.processElement(currentValue);
        while (iterator.hasNext()) {
            currentValue = iterator.next();
            doFnRunner.processElement(currentValue);
        }
        timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.fireEligibleTimers(key, timerInternals, doFnRunner);
        doFnRunner.finishBundle();
    }

    private void fireEligibleTimers(K key, @UnknownKeyFor @NonNull @Initialized InMemoryTimerInternals timerInternals, @UnknownKeyFor @NonNull @Initialized DoFnRunner<@UnknownKeyFor @NonNull @Initialized KV<K, V>, OutputT> runner) throws @UnknownKeyFor @NonNull @Initialized Exception {
        boolean hasFired;
        do {
            TimerInternals.TimerData timer;
            hasFired = false;
            while ((timer = timerInternals.removeNextEventTimer()) != null) {
                hasFired = true;
                this.fireTimer(key, timer, runner);
            }
            while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
                hasFired = true;
                this.fireTimer(key, timer, runner);
            }
            while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
                hasFired = true;
                this.fireTimer(key, timer, runner);
            }
        } while (hasFired);
    }

    private void fireTimer(K key, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized TimerInternals.TimerData timer, @UnknownKeyFor @NonNull @Initialized DoFnRunner<@UnknownKeyFor @NonNull @Initialized KV<K, V>, OutputT> doFnRunner) {
        StateNamespace namespace = timer.getNamespace();
        Preconditions.checkArgument((boolean)(namespace instanceof StateNamespaces.WindowNamespace));
        BoundedWindow window = ((StateNamespaces.WindowNamespace)namespace).getWindow();
        doFnRunner.onTimer(timer.getTimerId(), timer.getTimerFamilyId(), key, window, timer.getTimestamp(), timer.getOutputTimestamp(), timer.getDomain());
    }

    public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) {
        FileSystems.setDefaultPipelineOptions((PipelineOptions)this.serializedOptions.get());
        this.metricContainer = new FlinkMetricContainer(this.getRuntimeContext());
        this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(this.dofn);
    }

    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
        try {
            this.metricContainer.registerMetricsForPipelineResult();
            Optional.ofNullable(this.doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown);
        }
        finally {
            Workarounds.deleteStaticCaches();
        }
    }
}

