package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.class */
public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<InputT, OutputT> {
    /** Logger shared by this operator and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExecutableStageDoFnOperator.class);
    /** Stage payload supplied at construction; {@code open()} converts it into {@link #executableStage}. */
    private final RunnerApi.ExecutableStagePayload payload;
    /** Job information handed to {@link #contextFactory} in {@code open()} to obtain the stage context. */
    private final JobInfo jobInfo;
    /** Factory used in {@code open()} to look up the {@link FlinkExecutableStageContext} for {@link #jobInfo}. */
    private final FlinkExecutableStageContext.Factory contextFactory;
    /** Maps the output id reported with each SDK harness result to the {@link TupleTag} it is emitted under. */
    private final Map<String, TupleTag<?>> outputMap;
    /** Side input ids and their views; passed to the side input handler factory when the stage has side inputs. */
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds;
    /** Lock taken in this file around every change of the keyed state backend's current key and the state/timer access that follows. */
    private final ReentrantLock stateBackendLock;
    /** Assigned in {@code open()}; closed and reset to {@code null} in {@code dispose()}. */
    private transient FlinkExecutableStageContext stageContext;
    /** Assigned in {@code open()} from {@code getStateRequestHandler}; handed to the SDK harness runner. */
    private transient StateRequestHandler stateRequestHandler;
    /** Assigned in {@code open()}; forwards monitoring infos of bundle progress/completion to {@link #flinkMetricContainer}. */
    private transient BundleProgressHandler progressHandler;
    /** Assigned in {@code open()} from {@link #stageContext}; closed in {@code dispose()}. */
    private transient StageBundleFactory stageBundleFactory;
    /** Assigned in {@code open()} by parsing {@link #payload}. */
    private transient ExecutableStage executableStage;
    /** Assigned in {@code createWrappingDoFnRunner}. */
    private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
    /** Assigned in {@code open()} from the operator's runtime context. */
    private transient FlinkMetricContainer flinkMetricContainer;
    /** Initialised to {@code Long.MIN_VALUE} by the constructor; NOTE(review): its readers are outside this part of the file. */
    private transient long backupWatermarkHold;

    /**
     * Creates {@link StateRequestHandlers.BagUserStateHandler}s that serve bag user state requests
     * from the given {@link StateInternals}.
     *
     * <p>Every handler operation takes {@code stateBackendLock}, switches the keyed state backend to
     * the key carried by the request, and only then touches the state. The lock is acquired
     * <em>before</em> entering the {@code try} block so that {@code unlock()} in {@code finally} can
     * never run for a lock that was not obtained.
     *
     * @param <K> key type; its bytes ({@code toByteArray()}) become the state backend's current key
     * @param <V> type of the elements stored in the bag
     * @param <W> window type used to build the state namespace
     */
    public static class BagUserStateFactory<K extends ByteString, V, W extends BoundedWindow> implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
        private final StateInternals stateInternals;
        private final KeyedStateBackend<ByteBuffer> keyedStateBackend;
        private final Lock stateBackendLock;

        private BagUserStateFactory(StateInternals stateInternals, KeyedStateBackend<ByteBuffer> keyedStateBackend, Lock lock) {
            this.stateInternals = stateInternals;
            this.keyedStateBackend = keyedStateBackend;
            this.stateBackendLock = lock;
        }

        /**
         * Builds a handler for one bag user state.
         *
         * @param pTransformId only used for debug logging here (name follows the Beam interface
         *     parameter — NOTE(review): confirm against {@code BagUserStateHandlerFactory})
         * @param userStateId id under which the bag is addressed via {@code StateTags.bag}
         * @param keyCoder not used by this implementation
         * @param valueCoder coder of the bag elements
         * @param windowCoder coder used to derive the window state namespace
         * @return a handler whose operations are serialised through the state backend lock
         */
        @Override
        public StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(final String pTransformId, final String userStateId, Coder<K> keyCoder, final Coder<V> valueCoder, final Coder<W> windowCoder) {
            return new StateRequestHandlers.BagUserStateHandler<K, V, W>() {
                @Override
                public Iterable<V> get(K key, W window) {
                    stateBackendLock.lock();
                    try {
                        prepareStateBackend(key);
                        StateNamespace namespace = StateNamespaces.window(windowCoder, window);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("State get for {} {} {} {}", pTransformId, userStateId, Arrays.toString(keyedStateBackend.getCurrentKey().array()), window);
                        }
                        BagState<V> bagState = stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
                        return bagState.read();
                    } finally {
                        stateBackendLock.unlock();
                    }
                }

                @Override
                public void append(K key, W window, Iterator<V> values) {
                    stateBackendLock.lock();
                    try {
                        prepareStateBackend(key);
                        StateNamespace namespace = StateNamespaces.window(windowCoder, window);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("State append for {} {} {} {}", pTransformId, userStateId, Arrays.toString(keyedStateBackend.getCurrentKey().array()), window);
                        }
                        BagState<V> bagState = stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
                        while (values.hasNext()) {
                            bagState.add(values.next());
                        }
                    } finally {
                        stateBackendLock.unlock();
                    }
                }

                @Override
                public void clear(K key, W window) {
                    stateBackendLock.lock();
                    try {
                        prepareStateBackend(key);
                        StateNamespace namespace = StateNamespaces.window(windowCoder, window);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("State clear for {} {} {} {}", pTransformId, userStateId, Arrays.toString(keyedStateBackend.getCurrentKey().array()), window);
                        }
                        BagState<V> bagState = stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
                        bagState.clear();
                    } finally {
                        stateBackendLock.unlock();
                    }
                }

                /** Points the keyed state backend at the key of the current request. */
                private void prepareStateBackend(K key) {
                    keyedStateBackend.setCurrentKey(ByteBuffer.wrap(key.toByteArray()));
                }
            };
        }
    }

    /**
     * {@link StatefulDoFnRunner.CleanupTimer} that registers an event-time timer per key and window
     * under the id {@code __user-state-cleanup__}.
     *
     * <p>The timer fires at the window's garbage collection time (as computed by
     * {@link LateDataUtils#garbageCollectionTime}) plus 1. Setting the timer changes the keyed state
     * backend's current key, so it is done while holding {@code stateBackendLock}; the lock is
     * acquired before the {@code try} block so {@code unlock()} only ever runs for a held lock.
     *
     * @param <InputT> element type; {@code setForWindow} casts elements to {@link KV} to read the key
     */
    public static class CleanupTimer<InputT> implements StatefulDoFnRunner.CleanupTimer<InputT> {
        private static final String GC_TIMER_ID = "__user-state-cleanup__";
        private final TimerInternals timerInternals;
        private final Lock stateBackendLock;
        private final WindowingStrategy windowingStrategy;
        private final Coder keyCoder;
        private final Coder windowCoder;
        private final KeyedStateBackend<ByteBuffer> keyedStateBackend;

        CleanupTimer(TimerInternals timerInternals, Lock lock, WindowingStrategy windowingStrategy, Coder coder, Coder coder2, KeyedStateBackend<ByteBuffer> keyedStateBackend) {
            this.timerInternals = timerInternals;
            this.stateBackendLock = lock;
            this.windowingStrategy = windowingStrategy;
            this.keyCoder = coder;
            this.windowCoder = coder2;
            this.keyedStateBackend = keyedStateBackend;
        }

        /** Returns the current input watermark as reported by the wrapped {@link TimerInternals}. */
        @Override
        public Instant currentInputWatermarkTime() {
            return this.timerInternals.currentInputWatermarkTime();
        }

        /**
         * Registers the cleanup timer for the key of {@code inputt} in {@code boundedWindow}.
         *
         * @param inputt the element whose key the timer is set for; must be a non-null {@link KV}
         * @param boundedWindow the window whose state should eventually be cleaned up
         * @throws NullPointerException if {@code inputt} is {@code null}
         */
        @Override
        public void setForWindow(InputT inputt, BoundedWindow boundedWindow) {
            Preconditions.checkNotNull(inputt, "Null input passed to CleanupTimer");
            Instant cleanupTime = cleanupTimeFor(boundedWindow);
            ByteBuffer encodedKey = FlinkKeyUtils.encodeKey(((KV) inputt).getKey(), this.keyCoder);
            this.stateBackendLock.lock();
            try {
                // The timer is registered for whatever key is current on the backend, so select it first.
                this.keyedStateBackend.setCurrentKey(encodedKey);
                this.timerInternals.setTimer(StateNamespaces.window(this.windowCoder, boundedWindow), GC_TIMER_ID, cleanupTime, TimeDomain.EVENT_TIME);
            } finally {
                this.stateBackendLock.unlock();
            }
        }

        /**
         * Tells whether a fired timer is the cleanup timer of {@code boundedWindow}: it must be an
         * event-time timer with the cleanup timer id and the exact cleanup timestamp of the window.
         */
        @Override
        public boolean isForWindow(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
            return timeDomain.equals(TimeDomain.EVENT_TIME) && GC_TIMER_ID.equals(str) && cleanupTimeFor(boundedWindow).equals(instant);
        }

        /** Computes the timestamp of the cleanup timer: the window's garbage collection time plus 1. */
        private Instant cleanupTimeFor(BoundedWindow boundedWindow) {
            return LateDataUtils.garbageCollectionTime(boundedWindow, this.windowingStrategy).plus(1L);
        }
    }

    /**
     * Placeholder {@link DoFn} whose process method has an empty body. The enclosing operator's
     * constructor passes an instance of it to the superclass constructor.
     */
    private static class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {

        private NoOpDoFn() {
            // Nothing to initialise.
        }

        /** Deliberately does nothing with the element. */
        @DoFn.ProcessElement
        public void doNothing(DoFn<InputT, OutputT>.ProcessContext context) {
            // Intentionally empty.
        }
    }

    /**
     * {@link DoFnRunner} that forwards elements and timers to a {@link RemoteBundle} obtained from
     * the {@link StageBundleFactory} and emits whatever the bundle sends back.
     *
     * <p>Results arrive through receivers created in {@link #startBundle()} and are buffered in
     * {@code outputQueue}; {@code emitResults()} drains that queue after each element and when the
     * bundle is finished. A result whose output id is present in {@code outputMap} is emitted through
     * the output manager, any other id must belong to a timer spec and is turned into a timer
     * registration.
     */
    private static class SdkHarnessDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
        private final String mainInput;
        private final LinkedBlockingQueue<KV<String, OutputT>> outputQueue;
        private final StageBundleFactory stageBundleFactory;
        private final StateRequestHandler stateRequestHandler;
        private final BundleProgressHandler progressHandler;
        private final DoFnOperator.BufferedOutputManager<OutputT> outputManager;
        private final Map<String, TupleTag<?>> outputMap;
        /** Timer specs of the stage, indexed by the id of the collection their timers are output to. */
        private final Map<String, ProcessBundleDescriptors.TimerSpec> timerOutputIdToSpecMap = new HashMap<>();
        private final Coder<BoundedWindow> windowCoder;
        private final BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration;
        private final Supplier<Object> keyForTimer;
        /** The bundle currently in progress, or {@code null} outside of a bundle. */
        private RemoteBundle remoteBundle;
        private FnDataReceiver<WindowedValue<?>> mainInputReceiver;

        /**
         * @param mainInput id used to look up the main input receiver of each bundle
         * @param stageBundleFactory source of bundles and of the stage's timer specs
         * @param stateRequestHandler passed to every bundle
         * @param progressHandler passed to every bundle
         * @param outputManager receives results that map to a {@link TupleTag}
         * @param outputMap output id to {@link TupleTag}
         * @param windowCoder used to build the state namespace of timers
         * @param timerRegistration called once per window for every timer received from the bundle
         * @param keyForTimer supplies the key to attach to a firing timer
         */
        public SdkHarnessDoFnRunner(String mainInput, StageBundleFactory stageBundleFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, DoFnOperator.BufferedOutputManager<OutputT> outputManager, Map<String, TupleTag<?>> outputMap, Coder<BoundedWindow> windowCoder, BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration, Supplier<Object> keyForTimer) {
            this.mainInput = mainInput;
            this.stageBundleFactory = stageBundleFactory;
            this.stateRequestHandler = stateRequestHandler;
            this.progressHandler = progressHandler;
            this.outputManager = outputManager;
            this.outputMap = outputMap;
            this.timerRegistration = timerRegistration;
            this.keyForTimer = keyForTimer;
            for (Map<String, ProcessBundleDescriptors.TimerSpec> timerSpecs : stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
                for (ProcessBundleDescriptors.TimerSpec timerSpec : timerSpecs.values()) {
                    this.timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
                }
            }
            this.windowCoder = windowCoder;
            this.outputQueue = new LinkedBlockingQueue<>();
        }

        /**
         * Opens a new remote bundle and resolves its main input receiver.
         *
         * @throws RuntimeException wrapping any failure, including a missing main input receiver
         */
        @Override
        public void startBundle() {
            try {
                this.remoteBundle = this.stageBundleFactory.getBundle(new OutputReceiverFactory() {
                    @Override
                    public FnDataReceiver<OutputT> create(String pCollectionId) {
                        // Results are only queued here; emitResults() hands them on later.
                        return receivedElement -> {
                            SdkHarnessDoFnRunner.this.outputQueue.put(KV.of(pCollectionId, receivedElement));
                        };
                    }
                }, this.stateRequestHandler, this.progressHandler);
                this.mainInputReceiver = (FnDataReceiver) Preconditions.checkNotNull(this.remoteBundle.getInputReceivers().get(this.mainInput), "Failed to retrieve main input receiver.");
            } catch (Exception e) {
                throw new RuntimeException("Failed to start remote bundle", e);
            }
        }

        /**
         * Sends one element to the main input receiver and emits the results queued so far.
         *
         * @throws RuntimeException wrapping any failure
         */
        @Override
        public void processElement(WindowedValue<InputT> windowedValue) {
            try {
                LOG.debug("Sending value: {}", windowedValue);
                this.mainInputReceiver.accept(windowedValue);
                emitResults();
            } catch (Exception e) {
                throw new RuntimeException("Failed to process element with SDK harness.", e);
            }
        }

        /**
         * Sends a firing timer to the bundle's input receiver registered under the timer id.
         *
         * @throws NullPointerException if no key is available, no bundle is in progress, or the bundle
         *     has no receiver for {@code timerId}
         * @throws RuntimeException wrapping any failure of the receiver
         */
        @Override
        public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
            Object timerKey = this.keyForTimer.get();
            Preconditions.checkNotNull(timerKey, "Key for timer needs to be set before calling onTimer");
            Preconditions.checkNotNull(this.remoteBundle, "Call to onTimer outside of a bundle");
            LOG.debug("timer callback: {} {} {} {}", timerId, window, timestamp, timeDomain);
            FnDataReceiver timerReceiver = (FnDataReceiver) Preconditions.checkNotNull(this.remoteBundle.getInputReceivers().get(timerId), "No receiver found for timer %s", timerId);
            WindowedValue<KV<Object, Timer>> timerValue = WindowedValue.of(KV.of(timerKey, Timer.of(timestamp, new byte[0])), timestamp, Collections.singleton(window), PaneInfo.NO_FIRING);
            try {
                timerReceiver.accept(timerValue);
            } catch (Exception e) {
                // Report the timer id: the message talks about the timer, not about the receiver object.
                throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer %s", timerId), e);
            }
        }

        /**
         * Closes the remote bundle and emits the remaining results. The bundle reference is cleared
         * whether or not closing succeeds.
         *
         * @throws RuntimeException wrapping any failure
         */
        @Override
        public void finishBundle() {
            try {
                this.remoteBundle.close();
                emitResults();
            } catch (Exception e) {
                throw new RuntimeException("Failed to finish remote bundle", e);
            } finally {
                this.remoteBundle = null;
            }
        }

        /** Returns whether a bundle has been started and not yet finished. */
        boolean isBundleInProgress() {
            return this.remoteBundle != null;
        }

        /** Drains the output queue, emitting regular outputs and registering received timers. */
        private void emitResults() {
            KV<String, OutputT> result;
            while ((result = this.outputQueue.poll()) != null) {
                String outputId = Preconditions.checkNotNull(result.getKey());
                TupleTag<?> tag = this.outputMap.get(outputId);
                WindowedValue windowedValue = Preconditions.checkNotNull((WindowedValue) result.getValue(), "Received a null value from the SDK harness for %s", outputId);
                if (tag != null) {
                    this.outputManager.output(tag, windowedValue);
                    continue;
                }
                // Not a regular output, so the id has to be the output collection of a timer spec.
                ProcessBundleDescriptors.TimerSpec timerSpec = Preconditions.checkNotNull(this.timerOutputIdToSpecMap.get(outputId), "Unknown Pcollectionid %s", outputId);
                Timer timer = Preconditions.checkNotNull((Timer) ((KV) windowedValue.getValue()).getValue(), "Received null Timer from SDK harness: %s", windowedValue);
                LOG.debug("Timer received: {} {}", outputId, timer);
                for (Object window : windowedValue.getWindows()) {
                    this.timerRegistration.accept(windowedValue, TimerInternals.TimerData.of(timerSpec.inputCollectionId(), StateNamespaces.window(this.windowCoder, (BoundedWindow) window), timer.getTimestamp(), timerSpec.getTimerSpec().getTimeDomain()));
                }
            }
        }

        /** Not supported: this runner does not hold a {@link DoFn}. */
        @Override
        public DoFn<InputT, OutputT> getFn() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * {@link StatefulDoFnRunner.StateCleaner} that defers the actual cleanup: {@code clearForWindow}
     * only records the (current key, window) pair, and {@code cleanupState} later clears every
     * configured user state for all recorded pairs.
     */
    public static class StateCleaner implements StatefulDoFnRunner.StateCleaner<BoundedWindow> {
        private final List<String> userStateNames;
        private final Coder windowCoder;
        private final ArrayDeque<KV<ByteBuffer, BoundedWindow>> cleanupQueue = new ArrayDeque<>();
        /** Despite its name this supplies a {@link ByteBuffer} key, which is recorded with each window. */
        private final Supplier<ByteBuffer> keyedStateBackend;

        StateCleaner(List<String> list, Coder coder, Supplier<ByteBuffer> supplier) {
            this.userStateNames = list;
            this.windowCoder = coder;
            this.keyedStateBackend = supplier;
        }

        /** Records the supplied key together with {@code boundedWindow} for a later cleanup. */
        @Override
        public void clearForWindow(BoundedWindow boundedWindow) {
            KV<ByteBuffer, BoundedWindow> pending = KV.of(this.keyedStateBackend.get(), boundedWindow);
            this.cleanupQueue.add(pending);
        }

        /**
         * Processes all recorded cleanups in insertion order.
         *
         * @param stateInternals state to clear the user state bags from
         * @param consumer receives the key of each entry before that entry's state is cleared
         */
        void cleanupState(StateInternals stateInternals, Consumer<ByteBuffer> consumer) {
            KV<ByteBuffer, BoundedWindow> pending;
            // The deque never holds null elements, so a null from poll() means it is empty.
            while ((pending = this.cleanupQueue.poll()) != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("State cleanup for {} {}", Arrays.toString(pending.getKey().array()), pending.getValue());
                }
                consumer.accept(pending.getKey());
                for (String userStateName : this.userStateNames) {
                    StateNamespace namespace = StateNamespaces.window(this.windowCoder, pending.getValue());
                    ((BagState) stateInternals.state(namespace, StateTags.bag(userStateName, VoidCoder.of()))).clear();
                }
            }
        }
    }

    /**
     * Creates the operator. Most arguments are forwarded unchanged to the {@link DoFnOperator}
     * constructor together with a {@code NoOpDoFn} and {@code DoFnSchemaInformation.create()}; the
     * remaining ones are kept in fields for use once the operator is opened.
     *
     * @param sideInputIds side input ids and their views, stored for the side input handler factory
     * @param payload the executable stage payload, parsed in {@code open()}
     * @param jobInfo job information used to look up the stage context
     * @param contextFactory factory for the stage context
     * @param outputMap output id to {@link TupleTag}, used when emitting SDK harness results
     */
    public ExecutableStageDoFnOperator(
            String str,
            Coder<WindowedValue<InputT>> coder,
            Coder<InputT> coder2,
            Map<TupleTag<?>, Coder<?>> map,
            TupleTag<OutputT> tupleTag,
            List<TupleTag<?>> list,
            DoFnOperator.OutputManagerFactory<OutputT> outputManagerFactory,
            Map<Integer, PCollectionView<?>> map2,
            Collection<PCollectionView<?>> collection,
            Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds,
            PipelineOptions pipelineOptions,
            RunnerApi.ExecutableStagePayload payload,
            JobInfo jobInfo,
            FlinkExecutableStageContext.Factory contextFactory,
            Map<String, TupleTag<?>> outputMap,
            WindowingStrategy windowingStrategy,
            Coder coder3,
            KeySelector<WindowedValue<InputT>, ?> keySelector) {
        super(
                new NoOpDoFn(),
                str,
                coder,
                coder2,
                map,
                tupleTag,
                list,
                outputManagerFactory,
                windowingStrategy,
                map2,
                collection,
                pipelineOptions,
                coder3,
                keySelector,
                DoFnSchemaInformation.create());
        this.payload = payload;
        this.jobInfo = jobInfo;
        this.contextFactory = contextFactory;
        this.sideInputIds = sideInputIds;
        this.outputMap = outputMap;
        this.stateBackendLock = new ReentrantLock();
        this.backupWatermarkHold = Long.MIN_VALUE;
    }

    /**
     * Exposes this operator's state backend lock, the same lock that the code in this class takes
     * around every switch of the keyed state backend's current key.
     *
     * @return the state backend lock
     */
    @Override
    protected Lock getLockToAcquireForStateAccessDuringBundles() {
        return stateBackendLock;
    }

    /**
     * Prepares everything the SDK harness runner depends on and then opens the superclass.
     *
     * <p>The steps build on each other (stage, then context, then bundle factory, then state request
     * handler), so their order matters. {@code super.open()} runs last — presumably because it ends
     * up calling {@code createWrappingDoFnRunner}, which reads the fields assigned here; confirm in
     * {@code DoFnOperator}.
     *
     * @throws Exception if any initialisation step or {@code super.open()} fails
     */
    @Override
    public void open() throws Exception {
        executableStage = ExecutableStage.fromPayload(payload);
        initializeUserState(executableStage, getKeyedStateBackend());
        stageContext = contextFactory.get(jobInfo);
        flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
        stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
        stateRequestHandler = getStateRequestHandler(executableStage);
        progressHandler = new BundleProgressHandler() {
            /** Publishes the monitoring infos reported while the bundle is still running. */
            @Override
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                flinkMetricContainer.updateMetrics(stepName, progress.getMonitoringInfosList());
            }

            /** Publishes the monitoring infos reported when the bundle completes. */
            @Override
            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                flinkMetricContainer.updateMetrics(stepName, response.getMonitoringInfosList());
            }
        };
        super.open();
    }

    /**
     * Builds the {@link StateRequestHandler} for the stage by combining one handler for side inputs
     * and one for bag user state. Whichever of the two the stage does not use is replaced by
     * {@code StateRequestHandler.unsupported()}.
     *
     * @param executableStage the stage whose side inputs and user states decide which handlers exist
     * @return a handler that delegates by state key type
     * @throws NullPointerException if the stage has side inputs but no side input handler is set
     * @throws IllegalStateException if the stage uses user state but there is no keyed state
     * @throws RuntimeException if the side input handler cannot be initialised
     */
    private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
        final StateRequestHandler sideInputRequestHandler;
        if (executableStage.getSideInputs().isEmpty()) {
            sideInputRequestHandler = StateRequestHandler.unsupported();
        } else {
            org.apache.flink.util.Preconditions.checkNotNull(this.sideInputHandler);
            try {
                sideInputRequestHandler = StateRequestHandlers.forSideInputHandlerFactory(ProcessBundleDescriptors.getSideInputs(executableStage), Preconditions.checkNotNull(FlinkStreamingSideInputHandlerFactory.forStage(executableStage, this.sideInputIds, this.sideInputHandler)));
            } catch (IOException e) {
                throw new RuntimeException("Failed to initialize SideInputHandler", e);
            }
        }

        final StateRequestHandler userStateRequestHandler;
        if (executableStage.getUserStates().isEmpty()) {
            userStateRequestHandler = StateRequestHandler.unsupported();
        } else {
            if (this.keyedStateInternals == null) {
                throw new IllegalStateException("Input must be keyed when user state is used");
            }
            userStateRequestHandler = StateRequestHandlers.forBagUserStateHandlerFactory(this.stageBundleFactory.getProcessBundleDescriptor(), new BagUserStateFactory<>(this.keyedStateInternals, getKeyedStateBackend(), this.stateBackendLock));
        }

        // Typed map: keys are state key type cases, values the handler responsible for that case.
        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlersByType = new EnumMap<>(BeamFnApi.StateKey.TypeCase.class);
        handlersByType.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputRequestHandler);
        handlersByType.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateRequestHandler);
        return StateRequestHandlers.delegateBasedUponType(handlersByType);
    }

    /**
     * Intentionally empty override: the incoming record is ignored. Elsewhere in this class the
     * keyed state backend's current key is set explicitly while holding {@code stateBackendLock}.
     * NOTE(review): presumably this keeps Flink from switching the key per record while state
     * requests are served under the lock — confirm against the superclass.
     *
     * @param streamRecord ignored
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void setKeyContextElement1(StreamRecord streamRecord) {
    }

    /**
     * Intentionally empty override: the given key is ignored. Code in this class that needs a
     * particular key calls {@code getKeyedStateBackend().setCurrentKey(...)} directly while holding
     * {@code stateBackendLock}.
     *
     * @param obj ignored
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.KeyContext
    public void setCurrentKey(Object obj) {
    }

    /**
     * Returns the key currently selected on the keyed state backend.
     *
     * @return the current key as a {@link ByteBuffer}
     * @throws IllegalStateException if the state backend lock is not held at the time of the call
     *     (checked with {@code ReentrantLock.isLocked()}, i.e. held by any thread)
     */
    @Override
    public ByteBuffer getCurrentKey() {
        boolean lockHeld = this.stateBackendLock.isLocked();
        Preconditions.checkState(lockHeld, "State backend must be locked when retrieving the current key.");
        Object currentKey = getKeyedStateBackend().getCurrentKey();
        return (ByteBuffer) currentKey;
    }

    /**
     * Registers (or deletes) a timer for the key of the given element.
     *
     * <p>The key is derived with the operator's key selector and made current on the keyed state
     * backend while holding {@code stateBackendLock}. A timer whose timestamp lies after
     * {@code BoundedWindow.TIMESTAMP_MAX_VALUE} is treated as a request to delete the timer.
     *
     * @param windowedValue the element whose key the timer belongs to
     * @param timerData the timer to set
     * @throws RuntimeException wrapping any failure
     */
    private void setTimer(WindowedValue<InputT> windowedValue, TimerInternals.TimerData timerData) {
        try {
            LOG.debug("Setting timer: {} {}", windowedValue, timerData);
            ByteBuffer encodedKey = (ByteBuffer) this.keySelector.getKey(windowedValue);
            // Acquire before the try block so that unlock() only ever runs for a lock we hold.
            this.stateBackendLock.lock();
            try {
                getKeyedStateBackend().setCurrentKey(encodedKey);
                if (timerData.getTimestamp().isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                    this.timerInternals.deleteTimer(timerData.getNamespace(), timerData.getTimerId(), timerData.getDomain());
                } else {
                    this.timerInternals.setTimer(timerData);
                }
            } finally {
                this.stateBackendLock.unlock();
            }
        } catch (Exception e) {
            throw new RuntimeException("Couldn't set timer", e);
        }
    }

    /**
     * Fires a timer with the timer's key selected on the keyed state backend. The key switch and the
     * superclass's timer handling both happen while holding {@code stateBackendLock}.
     *
     * @param internalTimer the Flink timer to fire; its key becomes the backend's current key
     */
    @Override
    public void fireTimer(InternalTimer<ByteBuffer, TimerInternals.TimerData> internalTimer) {
        ByteBuffer timerKey = internalTimer.getKey();
        // Acquire before the try block so that unlock() only ever runs for a lock we hold.
        this.stateBackendLock.lock();
        try {
            getKeyedStateBackend().setCurrentKey(timerKey);
            super.fireTimer(internalTimer);
        } finally {
            this.stateBackendLock.unlock();
        }
    }

    /**
     * Disposes the superclass and then closes the stage context and the stage bundle factory.
     *
     * <p>Nothing happens when {@code stageContext} is already {@code null}, so repeated calls are
     * harmless. Resources are closed in reverse declaration order (stage context first, then the
     * bundle factory); a {@code null} resource is skipped, and an exception thrown while closing is
     * attached as suppressed to an exception already in flight instead of replacing it.
     * {@code stageContext} is reset to {@code null} in all cases.
     *
     * @throws Exception if {@code super.dispose()} or closing one of the resources fails
     */
    @Override
    public void dispose() throws Exception {
        if (this.stageContext == null) {
            return;
        }
        try (AutoCloseable bundleFactoryCloser = this.stageBundleFactory;
                AutoCloseable stageContextCloser = this.stageContext) {
            super.dispose();
        } finally {
            this.stageContext = null;
        }
    }

    /**
     * Forwards a side input element to the side input handler.
     *
     * <p>The record's union value is expected to carry a {@code WindowedValue} whose payload is a
     * {@code KV}; only the value part of that {@code KV} (cast to an {@code Iterable}) is passed
     * on, re-wrapped in the original windowed value. The target side input is looked up in
     * {@code sideInputTagMapping} by the record's union tag.
     *
     * @param streamRecord the incoming record holding the tagged side input value
     */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
        RawUnionValue unionValue = streamRecord.getValue();
        WindowedValue windowedKv = (WindowedValue) unionValue.getValue();
        // Strip the key: the handler only receives the iterable of side input elements.
        Iterable sideInputElements = (Iterable) ((KV) windowedKv.getValue()).getValue();
        this.sideInputHandler.addSideInputValue(
                this.sideInputTagMapping.get(Integer.valueOf(unionValue.getUnionTag())),
                windowedKv.withValue(sideInputElements));
    }

    /**
     * Creates the runner that is actually used by this operator.
     *
     * <p>A new {@code SdkHarnessDoFnRunner} is built from this operator's stage, bundle factory,
     * handlers and output configuration, stored in {@code sdkHarnessRunner}, and then passed
     * through {@link #ensureStateCleanup}. The {@code doFnRunner} argument is not used.
     *
     * @param doFnRunner the runner supplied by the superclass; ignored here
     * @return the SDK harness runner, possibly wrapped for state cleanup
     */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner) {
        SdkHarnessDoFnRunner<InputT, OutputT> harnessRunner = new SdkHarnessDoFnRunner<>(
                this.executableStage.getInputPCollection().getId(),
                this.stageBundleFactory,
                this.stateRequestHandler,
                this.progressHandler,
                this.outputManager,
                this.outputMap,
                this.windowingStrategy.getWindowFn().windowCoder(),
                this::setTimer,
                // Decodes the operator's current key on demand using the key coder.
                () -> FlinkKeyUtils.decodeKey(getCurrentKey(), this.keyCoder));
        this.sdkHarnessRunner = harnessRunner;
        return ensureStateCleanup(harnessRunner);
    }

    /**
     * Processes an incoming watermark, accounting for a bundle that may still be in progress.
     *
     * <p>If the SDK harness runner reports a bundle in progress:
     * <ul>
     *   <li>for a watermark below {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, the backup hold is
     *       raised to at least the current pushback hold, the pushed-back watermark is set to the
     *       smaller of the current output watermark and that hold, and a bundle-finished callback
     *       is registered that restores the hold and processes the watermark again;</li>
     *   <li>otherwise the bundle is finished immediately and the pushed-back watermark is set to
     *       {@code Long.MAX_VALUE}.</li>
     * </ul>
     * In every case the watermark is then handed to the superclass.
     *
     * @param watermark the watermark to process
     * @throws Exception if the superclass fails to process the watermark
     */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
        if (this.sdkHarnessRunner.isBundleInProgress()) {
            if (watermark.getTimestamp() < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                // Regular watermark while a bundle is open: install a hold and defer the
                // watermark until the bundle-finished callback runs.
                this.backupWatermarkHold = Math.max(this.backupWatermarkHold, getPushbackWatermarkHold());
                setPushedBackWatermark(Math.min(this.currentOutputWatermark, this.backupWatermarkHold));
                super.setBundleFinishedCallback(() -> {
                    try {
                        LOG.debug("processing pushed back watermark: {}", watermark);
                        // Restore the previously recorded hold before re-processing.
                        setPushedBackWatermark(this.backupWatermarkHold);
                        super.processWatermark(watermark);
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to process pushed back watermark after finished bundle.", e);
                    }
                });
            } else {
                // Watermark at or beyond the maximum timestamp: finish the bundle right away.
                invokeFinishBundle();
                setPushedBackWatermark(Long.MAX_VALUE);
            }
        }
        super.processWatermark(watermark);
    }

    /**
     * Wraps the SDK harness runner in a {@code StatefulDoFnRunner} that cleans up user state.
     *
     * <p>When no key coder is configured the runner is returned unchanged. Otherwise a
     * {@code CleanupTimer} and a {@code StateCleaner} (built from the local names of the stage's
     * user states) are created, and the returned runner's {@code finishBundle()} first finishes
     * the wrapped bundle and then, if the cleaner's queue is non-empty, runs the state cleanup
     * while holding the state backend lock.
     *
     * @param sdkHarnessDoFnRunner the runner to wrap
     * @return {@code sdkHarnessDoFnRunner} itself if {@code keyCoder} is {@code null}, otherwise
     *     the wrapping stateful runner
     */
    private DoFnRunner<InputT, OutputT> ensureStateCleanup(SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessDoFnRunner) {
        if (this.keyCoder == null) {
            return sdkHarnessDoFnRunner;
        }
        Coder<?> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
        CleanupTimer cleanupTimer = new CleanupTimer(this.timerInternals, this.stateBackendLock, this.windowingStrategy, this.keyCoder, windowCoder, getKeyedStateBackend());
        List userStateNames = (List) this.executableStage.getUserStates().stream().map((v0) -> {
            return v0.localName();
        }).collect(Collectors.toList());
        final KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
        final StateCleaner stateCleaner = new StateCleaner(userStateNames, windowCoder, () -> {
            return (ByteBuffer) keyedStateBackend.getCurrentKey();
        });
        return new StatefulDoFnRunner<InputT, OutputT, BoundedWindow>(sdkHarnessDoFnRunner, this.windowingStrategy, cleanupTimer, stateCleaner) { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.2
            @Override // org.apache.beam.runners.core.StatefulDoFnRunner, org.apache.beam.runners.core.DoFnRunner
            public void finishBundle() {
                // Finish the wrapped bundle first; cleanup only runs afterwards.
                super.finishBundle();
                if (stateCleaner.cleanupQueue.isEmpty()) {
                    return;
                }
                // Acquire the lock BEFORE the try block so that the finally clause never calls
                // unlock() on a lock this thread failed to acquire.
                ExecutableStageDoFnOperator.this.stateBackendLock.lock();
                try {
                    FlinkStateInternals<?> flinkStateInternals = ExecutableStageDoFnOperator.this.keyedStateInternals;
                    // Raw alias so the key consumer can pass the ByteBuffer key straight through.
                    KeyedStateBackend rawKeyedStateBackend = keyedStateBackend;
                    stateCleaner.cleanupState(flinkStateInternals, byteBuffer -> {
                        rawKeyedStateBackend.setCurrentKey(byteBuffer);
                    });
                } finally {
                    ExecutableStageDoFnOperator.this.stateBackendLock.unlock();
                }
            }
        };
    }

    /**
     * Registers a list state with the keyed state backend for every user state of the stage.
     *
     * <p>For each user state reference, a {@code ListStateDescriptor} named after the reference's
     * local name is created with a {@code ByteString}-coder based serializer and passed to
     * {@code getOrCreateKeyedState}.
     *
     * @param executableStage the stage whose user states are initialized
     * @param keyedStateBackend the backend to register the states with; it is dereferenced for
     *     every user state, so a {@code null} backend only works for stages without user states
     * @throws RuntimeException if creating any of the states fails
     */
    private static void initializeUserState(ExecutableStage executableStage, @Nullable KeyedStateBackend keyedStateBackend) {
        executableStage.getUserStates().forEach(stateRef -> {
            try {
                CoderTypeSerializer byteStringSerializer = new CoderTypeSerializer(ByteStringCoder.of());
                ListStateDescriptor stateDescriptor = new ListStateDescriptor(stateRef.localName(), byteStringSerializer);
                keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, stateDescriptor);
            } catch (Exception cause) {
                throw new RuntimeException("Couldn't initialize user states.", cause);
            }
        });
    }

    /**
     * Closes a resource on behalf of a try-with-resources style block.
     *
     * <p>With no primary exception, any failure from {@code close()} propagates to the caller.
     * With a primary exception, a failure from {@code close()} is attached to it as a suppressed
     * exception and nothing is thrown from here.
     *
     * @param th the primary exception already in flight, or {@code null} if there is none
     * @param autoCloseable the resource to close; must not be {@code null}
     * @throws Exception if {@code th} is {@code null} and closing the resource fails
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th == null) {
            // close() declares a checked Exception, hence the throws clause on this method.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
