package org.apache.beam.runners.flink.translation.functions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.class */
/**
 * Flink rich function that pushes the elements of a partition (or of a single group) through a
 * {@link RemoteBundle} obtained from a {@link StageBundleFactory} and emits the results as
 * {@link RawUnionValue}s.
 *
 * <p>It implements both {@link MapPartitionFunction} and {@link GroupReduceFunction}; the
 * {@code reduce} path additionally resets bag user state and collects/fires timers in memory.
 */
public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction implements MapPartitionFunction<WindowedValue<InputT>, RawUnionValue>, GroupReduceFunction<WindowedValue<InputT>, RawUnionValue> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutableStageFunction.class);
    // Payload from which the ExecutableStage is rebuilt in open().
    private final RunnerApi.ExecutableStagePayload stagePayload;
    // Passed to the context factory in open() to obtain the stage context.
    private final JobInfo jobInfo;
    // Maps an output id to the union tag placed in the emitted RawUnionValue.
    private final Map<String, Integer> outputMap;
    private final FlinkExecutableStageContext.Factory contextFactory;
    // Coder used to build window namespaces for timers.
    private final Coder windowCoder;
    // Taken from the payload's input id; passed as the first argument to metric updates.
    private final String stageName;
    // The transient fields below are (re)created in open().
    private transient RuntimeContext runtimeContext;
    private transient FlinkMetricContainer container;
    private transient StateRequestHandler stateRequestHandler;
    private transient FlinkExecutableStageContext stageContext;
    private transient StageBundleFactory stageBundleFactory;
    private transient BundleProgressHandler progressHandler;
    // Only assigned when the executable stage reports at least one user state; otherwise null.
    private transient InMemoryBagUserStateFactory bagUserStateHandlerFactory;
    private transient ExecutableStage executableStage;
    // Key of the most recently received timer element; reused as the key when timers are fired.
    private transient Object currentTimerKey;
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction$InMemoryBagUserStateFactory.class */
    /**
     * Factory for bag user state handlers that keep their contents in memory.
     *
     * <p>Every handler handed out is remembered so that {@link #resetForNewKey()} can drop the
     * state of all of them at once.
     */
    public static class InMemoryBagUserStateFactory implements StateRequestHandlers.BagUserStateHandlerFactory {
        /** All handlers created so far, in creation order. */
        private final List<InMemorySingleKeyBagState<?, ?, ?>> handlers = new ArrayList<>();

        private InMemoryBagUserStateFactory() {
        }

        /**
         * Creates a new in-memory handler and registers it for later resets.
         *
         * @param unusedId identifier that this implementation does not consult
         * @param userStateId id used to build the bag state tag
         * @param keyCoder key coder; not used by this implementation
         * @param valueCoder coder for the values stored in the bag
         * @param windowCoder coder used to build the window namespace
         * @return the newly created handler
         */
        public <K, V, W extends BoundedWindow> StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String unusedId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
            InMemorySingleKeyBagState<K, V, W> handler = new InMemorySingleKeyBagState<>(userStateId, valueCoder, windowCoder);
            this.handlers.add(handler);
            return handler;
        }

        /** Resets every handler created by this factory so the next access starts from fresh state. */
        void resetForNewKey() {
            for (InMemorySingleKeyBagState<?, ?, ?> handler : this.handlers) {
                handler.reset();
            }
        }

        /**
         * Bag state handler backed by a lazily created {@link InMemoryStateInternals}.
         *
         * <p>The state internals are created for the key of the first access after construction or
         * after {@link #reset()}; later accesses reuse them regardless of the key passed in, so
         * callers are expected to reset the handler when the key changes.
         */
        public static class InMemorySingleKeyBagState<K, V, W extends BoundedWindow> implements StateRequestHandlers.BagUserStateHandler<K, V, W> {
            private final StateTag<BagState<V>> stateTag;
            private final Coder<W> windowCoder;
            /** Created on first access; set back to {@code null} by {@link #reset()}. */
            private volatile StateInternals stateInternals;

            InMemorySingleKeyBagState(String userStateId, Coder<V> valueCoder, Coder<W> windowCoder) {
                this.windowCoder = windowCoder;
                this.stateTag = StateTags.bag(userStateId, valueCoder);
            }

            /** Returns the current contents of the bag for the given window. */
            public Iterable<V> get(K k, W w) {
                return bagState(k, w).read();
            }

            /** Adds every remaining element of the iterator to the bag for the given window. */
            public void append(K k, W w, Iterator<V> it) {
                BagState<V> bag = bagState(k, w);
                while (it.hasNext()) {
                    bag.add(it.next());
                }
            }

            /** Clears the bag for the given window. */
            public void clear(K k, W w) {
                bagState(k, w).clear();
            }

            /** Ensures the state internals exist and returns the bag state for the window. */
            private BagState<V> bagState(K k, W w) {
                initStateInternals(k);
                return this.stateInternals.state(StateNamespaces.window(this.windowCoder, w), this.stateTag);
            }

            private void initStateInternals(K k) {
                if (this.stateInternals == null) {
                    this.stateInternals = InMemoryStateInternals.forKey(k);
                }
            }

            /** Drops the state internals; they are recreated on the next access. */
            void reset() {
                this.stateInternals = null;
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction$ReceiverFactory.class */
    /**
     * Creates receivers that wrap each received element in a {@link RawUnionValue} tagged with the
     * index registered for the output id, and hand it to the Flink {@link Collector}.
     *
     * <p>Ids that are not present in the output map are delegated to the optional
     * {@link TimerReceiverFactory}; without one, such ids are rejected.
     */
    private static class ReceiverFactory implements OutputReceiverFactory {
        /** Serializes all writes to {@link #collector}. */
        private final Object collectorLock = new Object();

        @GuardedBy("collectorLock")
        private final Collector<RawUnionValue> collector;
        private final Map<String, Integer> outputMap;

        @Nullable
        private final TimerReceiverFactory timerReceiverFactory;

        /** Creates a factory without timer support. */
        ReceiverFactory(Collector<RawUnionValue> collector, Map<String, Integer> outputMap) {
            this(collector, outputMap, null);
        }

        /** Creates a factory that falls back to {@code timerReceiverFactory} for unmapped ids. */
        ReceiverFactory(Collector<RawUnionValue> collector, Map<String, Integer> outputMap, @Nullable TimerReceiverFactory timerReceiverFactory) {
            this.timerReceiverFactory = timerReceiverFactory;
            this.outputMap = outputMap;
            this.collector = collector;
        }

        /**
         * Returns the receiver for the given output id.
         *
         * @throws IllegalStateException if the id is not in the output map and no timer receiver
         *     factory was supplied
         */
        public <OutputT> FnDataReceiver<OutputT> create(String pCollectionId) {
            Integer unionTag = this.outputMap.get(pCollectionId);
            if (unionTag == null) {
                if (this.timerReceiverFactory == null) {
                    throw new IllegalStateException(String.format(Locale.ENGLISH, "Unknown PCollectionId %s", pCollectionId));
                }
                return this.timerReceiverFactory.create(pCollectionId);
            }
            // Unbox once so the lambda captures a primitive tag.
            final int tag = unionTag.intValue();
            return element -> {
                synchronized (this.collectorLock) {
                    this.collector.collect(new RawUnionValue(tag, element));
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction$TimerReceiverFactory.class */
    /**
     * Creates receivers for timer outputs. Each received timer element is turned into one
     * {@link TimerInternals.TimerData} per window and passed to the supplied consumer.
     */
    private static class TimerReceiverFactory implements OutputReceiverFactory {
        // Stored but not read by this class.
        private final StageBundleFactory stageBundleFactory;
        /** Timer specs indexed by their output collection id. */
        private final HashMap<String, ProcessBundleDescriptors.TimerSpec> timerOutputIdToSpecMap = new HashMap<>();
        // Stored but not read by this class.
        private final Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> timerSpecMap;
        private final BiConsumer<WindowedValue, TimerInternals.TimerData> timerDataConsumer;
        private final Coder windowCoder;

        /**
         * @param stageBundleFactory source of the timer specs that are indexed by output id
         * @param collection timer references; not used by this class
         * @param map timer spec map; stored only
         * @param biConsumer called with the received element and each derived timer
         * @param coder window coder used to build the timer's window namespace
         */
        TimerReceiverFactory(StageBundleFactory stageBundleFactory, Collection<TimerReference> collection, Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> map, BiConsumer<WindowedValue, TimerInternals.TimerData> biConsumer, Coder coder) {
            this.stageBundleFactory = stageBundleFactory;
            Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> specsByOwner = stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs();
            // Typed iteration: a raw Map here would yield Object elements and not compile.
            for (Map<String, ProcessBundleDescriptors.TimerSpec> specs : specsByOwner.values()) {
                for (ProcessBundleDescriptors.TimerSpec timerSpec : specs.values()) {
                    this.timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
                }
            }
            this.timerSpecMap = map;
            this.timerDataConsumer = biConsumer;
            this.windowCoder = coder;
        }

        /**
         * Returns a receiver for the timer output with the given id. The spec lookup happens here;
         * a missing spec is only reported once an element actually arrives.
         */
        public <OutputT> FnDataReceiver<OutputT> create(String str) {
            ProcessBundleDescriptors.TimerSpec timerSpec = this.timerOutputIdToSpecMap.get(str);
            return obj -> {
                WindowedValue windowedValue = (WindowedValue) obj;
                Timer timer = (Timer) Preconditions.checkNotNull((Timer) ((KV) windowedValue.getValue()).getValue(), "Received null Timer from SDK harness: %s", obj);
                FlinkExecutableStageFunction.LOG.debug("Timer received: {} {}", str, timer);
                // Fail with a descriptive message instead of a bare NullPointerException below.
                Preconditions.checkNotNull(timerSpec, "No TimerSpec found for output PCollectionId %s", str);
                for (Object window : windowedValue.getWindows()) {
                    this.timerDataConsumer.accept(windowedValue, TimerInternals.TimerData.of(timerSpec.inputCollectionId(), StateNamespaces.window(this.windowCoder, (BoundedWindow) window), timer.getTimestamp(), timerSpec.getTimerSpec().getTimeDomain()));
                }
            };
        }
    }

    /**
     * Stores the configuration needed to rebuild and run the executable stage later in
     * {@code open}.
     *
     * @param stagePayload payload describing the stage; its input id also becomes the stage name
     * @param jobInfo job information handed to the context factory
     * @param outputMap mapping from output id to union tag
     * @param contextFactory factory for the stage context
     * @param windowCoder coder used for window namespaces
     */
    public FlinkExecutableStageFunction(RunnerApi.ExecutableStagePayload stagePayload, JobInfo jobInfo, Map<String, Integer> outputMap, FlinkExecutableStageContext.Factory contextFactory, Coder windowCoder) {
        this.stagePayload = stagePayload;
        this.stageName = stagePayload.getInput();
        this.jobInfo = jobInfo;
        this.contextFactory = contextFactory;
        this.outputMap = outputMap;
        this.windowCoder = windowCoder;
    }

    /**
     * Prepares the function for execution: installs default pipeline options for the file
     * systems, rebuilds the executable stage from its payload, and creates the metric container,
     * stage context, bundle factory, state request handler and progress handler.
     */
    public void open(Configuration configuration) throws Exception {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
        this.executableStage = ExecutableStage.fromPayload(this.stagePayload);

        this.runtimeContext = getRuntimeContext();
        this.container = new FlinkMetricContainer(this.runtimeContext);

        this.stageContext = this.contextFactory.get(this.jobInfo);
        this.stageBundleFactory = this.stageContext.getStageBundleFactory(this.executableStage);
        this.stateRequestHandler = getStateRequestHandler(
                this.executableStage,
                this.stageBundleFactory.getProcessBundleDescriptor(),
                this.runtimeContext);
        this.progressHandler = createProgressHandler();
    }

    /** Builds the handler that forwards progress and completion monitoring infos to the metric container. */
    private BundleProgressHandler createProgressHandler() {
        return new BundleProgressHandler() {
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                container.updateMetrics(stageName, progress.getMonitoringInfosList());
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                container.updateMetrics(stageName, response.getMonitoringInfosList());
            }
        };
    }

    /**
     * Builds the state request handler for the stage by combining a side input handler with a
     * bag user state handler and dispatching between them on the state key type.
     *
     * <p>When the stage declares user state, a fresh {@link InMemoryBagUserStateFactory} is
     * created and stored in {@code bagUserStateHandlerFactory}; otherwise bag user state requests
     * go to {@link StateRequestHandler#unsupported()}.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while building the handlers
     */
    private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, RuntimeContext runtimeContext) {
        try {
            StateRequestHandler sideInputHandler = StateRequestHandlers.forSideInputHandlerFactory(ProcessBundleDescriptors.getSideInputs(executableStage), FlinkBatchSideInputHandlerFactory.forStage(executableStage, runtimeContext));
            StateRequestHandler userStateHandler;
            if (executableStage.getUserStates().size() > 0) {
                this.bagUserStateHandlerFactory = new InMemoryBagUserStateFactory();
                userStateHandler = StateRequestHandlers.forBagUserStateHandlerFactory(executableProcessBundleDescriptor, this.bagUserStateHandlerFactory);
            } else {
                userStateHandler = StateRequestHandler.unsupported();
            }
            // Typed map: keys are state key types, values are the handlers; no casts are needed.
            EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlersByType = new EnumMap<>(BeamFnApi.StateKey.TypeCase.class);
            handlersByType.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
            handlersByType.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateHandler);
            return StateRequestHandlers.delegateBasedUponType(handlersByType);
        } catch (IOException e) {
            throw new RuntimeException("Failed to setup state handler", e);
        }
    }

    /**
     * Sends every element of the partition through a single remote bundle and emits the outputs
     * to {@code collector}. This path has no timer support.
     *
     * <p>The bundle is closed exactly once, whether or not processing fails; a failure while
     * closing after a processing failure is attached as a suppressed exception.
     */
    public void mapPartition(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) throws Exception {
        ReceiverFactory receiverFactory = new ReceiverFactory(collector, this.outputMap);
        try (RemoteBundle bundle = this.stageBundleFactory.getBundle(receiverFactory, this.stateRequestHandler, this.progressHandler)) {
            processElements(iterable, bundle);
        }
    }

    /**
     * Processes all elements of one group and then fires the timers that were set while doing so.
     *
     * <p>Steps: reset any bag user state, process the elements in a first bundle while collecting
     * timers in an {@link InMemoryTimerInternals}, advance the input watermark and both
     * processing-time clocks to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, then fire all eligible
     * timers in a second bundle.
     *
     * <p>Each bundle is closed even when processing fails.
     */
    public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) throws Exception {
        // State from the previously processed group must not be visible to this one.
        if (this.bagUserStateHandlerFactory != null) {
            this.bagUserStateHandlerFactory.resetForNewKey();
        }

        final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
        timerInternals.advanceProcessingTime(Instant.now());
        timerInternals.advanceSynchronizedProcessingTime(Instant.now());

        ReceiverFactory receiverFactory = new ReceiverFactory(collector, this.outputMap, new TimerReceiverFactory(this.stageBundleFactory, this.executableStage.getTimers(), this.stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(), (timerElement, timerData) -> {
            // Remember the key so fired timers can be addressed to it later.
            this.currentTimerKey = ((KV) timerElement.getValue()).getKey();
            timerInternals.setTimer(timerData);
        }, this.windowCoder));

        // First bundle: the regular elements.
        try (RemoteBundle elementBundle = this.stageBundleFactory.getBundle(receiverFactory, this.stateRequestHandler, this.progressHandler)) {
            processElements(iterable, elementBundle);
        }

        // Move all clocks to the maximum timestamp so every pending timer becomes eligible.
        timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);

        // Second bundle: fire timers, including timers set by other timers.
        try (RemoteBundle timerBundle = this.stageBundleFactory.getBundle(receiverFactory, this.stateRequestHandler, this.progressHandler)) {
            fireEligibleTimers(timerInternals, (timerId, timerValue) -> {
                FnDataReceiver timerReceiver = (FnDataReceiver) timerBundle.getInputReceivers().get(timerId);
                Preconditions.checkNotNull(timerReceiver, "No FnDataReceiver found for %s", timerId);
                try {
                    timerReceiver.accept(timerValue);
                } catch (Exception e) {
                    // Keep the cause so the underlying failure is not lost.
                    throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue), e);
                }
            });
        }
    }

    /**
     * Feeds every element of {@code iterable} to the bundle's receiver for the stage's input
     * PCollection.
     *
     * @throws IllegalArgumentException if {@code remoteBundle} is null
     * @throws NullPointerException if the bundle has no receiver for the input PCollection id
     */
    private void processElements(Iterable<WindowedValue<InputT>> iterable, RemoteBundle remoteBundle) throws Exception {
        Preconditions.checkArgument(remoteBundle != null, "RemoteBundle must not be null");
        String inputId = this.executableStage.getInputPCollection().getId();
        FnDataReceiver mainReceiver = (FnDataReceiver) remoteBundle.getInputReceivers().get(inputId);
        Preconditions.checkNotNull(mainReceiver, "Main input receiver for %s could not be initialized", inputId);
        for (WindowedValue<InputT> element : iterable) {
            mainReceiver.accept(element);
        }
    }

    /**
     * Drains and fires timers until none are left. Each pass drains event-time timers, then
     * processing-time timers, then synchronized-processing-time timers; passes repeat as long as
     * the previous pass fired at least one timer, since firing a timer may set new ones.
     */
    private void fireEligibleTimers(InMemoryTimerInternals inMemoryTimerInternals, BiConsumer<String, WindowedValue> biConsumer) {
        boolean firedAny;
        do {
            firedAny = false;
            TimerInternals.TimerData next;

            while ((next = inMemoryTimerInternals.removeNextEventTimer()) != null) {
                firedAny = true;
                fireTimer(next, biConsumer);
            }
            while ((next = inMemoryTimerInternals.removeNextProcessingTimer()) != null) {
                firedAny = true;
                fireTimer(next, biConsumer);
            }
            while ((next = inMemoryTimerInternals.removeNextSynchronizedProcessingTimer()) != null) {
                firedAny = true;
                fireTimer(next, biConsumer);
            }
        } while (firedAny);
    }

    /**
     * Converts a timer into a timer element and passes it to {@code biConsumer} under the
     * timer's id.
     *
     * <p>The element is keyed by {@code currentTimerKey}, carries a {@link Timer} with the
     * timer's timestamp and an empty payload, and is placed in the single window taken from the
     * timer's namespace.
     *
     * @throws IllegalArgumentException if the timer's namespace is not a window namespace
     */
    private void fireTimer(TimerInternals.TimerData timerData, BiConsumer<String, WindowedValue> biConsumer) {
        // Hold the namespace by its general type, verify it, and only then cast.
        org.apache.beam.runners.core.StateNamespace namespace = timerData.getNamespace();
        Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
        BoundedWindow window = ((StateNamespaces.WindowNamespace<?>) namespace).getWindow();
        Instant timestamp = timerData.getTimestamp();
        biConsumer.accept(timerData.getTimerId(), WindowedValue.of(KV.of(this.currentTimerKey, Timer.of(timestamp, new byte[0])), timestamp, Collections.singleton(window), PaneInfo.NO_FIRING));
    }

    /* JADX WARN: Finally extract failed */
    /**
     * Closes the stage context and then the stage bundle factory.
     *
     * <p>Does nothing when the stage context is already null. The context reference is cleared
     * only after a successful close, so a failed close can be attempted again. If both closes
     * fail, the second failure is attached to the first as a suppressed exception.
     *
     * @throws Exception the failure raised while closing, after it has been logged
     */
    public void close() throws Exception {
        if (this.stageContext != null) {
            // Resources are closed in reverse declaration order: the context first, then the factory.
            try (AutoCloseable bundleFactoryCloser = this.stageBundleFactory;
                    AutoCloseable contextCloser = this.stageContext) {
                // Nothing to do; the block exists only to close both resources.
            } catch (Exception e) {
                LOG.error("Error in close: ", e);
                throw e;
            }
        }
        this.stageContext = null;
    }

    /**
     * Closes {@code autoCloseable}, mirroring what a try-with-resources statement does on exit.
     *
     * @param th the failure already in flight, or {@code null} if there is none
     * @param autoCloseable the resource to close; must not be null
     * @throws Exception if {@code th} is null and closing fails; when {@code th} is non-null a
     *     close failure is added to it as a suppressed exception instead of being thrown
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable closeFailure) {
            th.addSuppressed(closeFailure);
        }
    }
}
