package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
import org.apache.beam.runners.flink.translation.utils.NoopLock;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.class */
public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<InputT, OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageDoFnOperator.class);
    private final RunnerApi.ExecutableStagePayload payload;
    private final JobInfo jobInfo;
    private final FlinkExecutableStageContext.Factory contextFactory;
    private final Map<String, TupleTag<?>> outputMap;
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds;
    private final boolean usesTimers;
    private final Lock stateBackendLock;
    private transient FlinkExecutableStageContext stageContext;
    private transient StateRequestHandler stateRequestHandler;
    private transient BundleProgressHandler progressHandler;
    private transient StageBundleFactory stageBundleFactory;
    private transient ExecutableStage executableStage;
    private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
    private transient FlinkMetricContainer flinkMetricContainer;
    private transient long backupWatermarkHold;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator$BagUserStateFactory.class */
    public static class BagUserStateFactory implements StateRequestHandlers.BagUserStateHandlerFactory {
        private final StateInternals stateInternals;
        private final KeyedStateBackend<ByteBuffer> keyedStateBackend;
        private final Lock stateBackendLock;

        private BagUserStateFactory(StateInternals stateInternals, KeyedStateBackend<ByteBuffer> keyedStateBackend, Lock lock) {
            this.stateInternals = stateInternals;
            this.keyedStateBackend = keyedStateBackend;
            this.stateBackendLock = lock;
        }

        public <K, V, W extends BoundedWindow> StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String str, final String str2, final Coder<K> coder, final Coder<V> coder2, final Coder<W> coder3) {
            return (StateRequestHandlers.BagUserStateHandler<K, V, W>) new StateRequestHandlers.BagUserStateHandler<K, V, W>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.BagUserStateFactory.1
                /* JADX WARN: Incorrect types in method signature: (TK;TW;)Ljava/lang/Iterable<TV;>; */
                /* JADX WARN: Multi-variable type inference failed */
                public Iterable get(Object obj, BoundedWindow boundedWindow) {
                    try {
                        BagUserStateFactory.this.stateBackendLock.lock();
                        prepareStateBackend(obj, coder);
                        Iterable read = BagUserStateFactory.this.stateInternals.state(StateNamespaces.window(coder3, boundedWindow), StateTags.bag(str2, coder2)).read();
                        BagUserStateFactory.this.stateBackendLock.unlock();
                        return read;
                    } catch (Throwable th) {
                        BagUserStateFactory.this.stateBackendLock.unlock();
                        throw th;
                    }
                }

                /* JADX WARN: Incorrect types in method signature: (TK;TW;Ljava/util/Iterator<TV;>;)V */
                /* JADX WARN: Multi-variable type inference failed */
                public void append(Object obj, BoundedWindow boundedWindow, Iterator it) {
                    try {
                        BagUserStateFactory.this.stateBackendLock.lock();
                        prepareStateBackend(obj, coder);
                        BagState state = BagUserStateFactory.this.stateInternals.state(StateNamespaces.window(coder3, boundedWindow), StateTags.bag(str2, coder2));
                        while (it.hasNext()) {
                            state.add(it.next());
                        }
                    } finally {
                        BagUserStateFactory.this.stateBackendLock.unlock();
                    }
                }

                /* JADX WARN: Incorrect types in method signature: (TK;TW;)V */
                /* JADX WARN: Multi-variable type inference failed */
                public void clear(Object obj, BoundedWindow boundedWindow) {
                    try {
                        BagUserStateFactory.this.stateBackendLock.lock();
                        prepareStateBackend(obj, coder);
                        BagUserStateFactory.this.stateInternals.state(StateNamespaces.window(coder3, boundedWindow), StateTags.bag(str2, coder2)).clear();
                        BagUserStateFactory.this.stateBackendLock.unlock();
                    } catch (Throwable th) {
                        BagUserStateFactory.this.stateBackendLock.unlock();
                        throw th;
                    }
                }

                private void prepareStateBackend(K k, Coder<K> coder4) {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    try {
                        coder4.encode(k, byteArrayOutputStream);
                        BagUserStateFactory.this.keyedStateBackend.setCurrentKey(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to encode key for Flink state backend", e);
                    }
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator$NoOpDoFn.class */
    private static class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
        private NoOpDoFn() {
        }

        @DoFn.ProcessElement
        public void doNothing(DoFn<InputT, OutputT>.ProcessContext processContext) {
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.class */
    private static class SdkHarnessDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
        private final String mainInput;
        private final LinkedBlockingQueue<KV<String, OutputT>> outputQueue;
        private final StageBundleFactory stageBundleFactory;
        private final StateRequestHandler stateRequestHandler;
        private final BundleProgressHandler progressHandler;
        private final DoFnOperator.BufferedOutputManager<OutputT> outputManager;
        private final Map<String, TupleTag<?>> outputMap;
        private final Map<String, ProcessBundleDescriptors.TimerSpec> timerOutputIdToSpecMap = new HashMap();
        private final Coder<BoundedWindow> windowCoder;
        private final KeySelector<WindowedValue<InputT>, ?> keySelector;
        private final BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration;
        private RemoteBundle remoteBundle;
        private FnDataReceiver<WindowedValue<?>> mainInputReceiver;
        private Runnable bundleFinishedCallback;
        private Object currentTimerKey;

        public SdkHarnessDoFnRunner(String str, StageBundleFactory stageBundleFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler, DoFnOperator.BufferedOutputManager<OutputT> bufferedOutputManager, Map<String, TupleTag<?>> map, Coder<BoundedWindow> coder, KeySelector<WindowedValue<InputT>, ?> keySelector, BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> biConsumer) {
            this.mainInput = str;
            this.stageBundleFactory = stageBundleFactory;
            this.stateRequestHandler = stateRequestHandler;
            this.progressHandler = bundleProgressHandler;
            this.outputManager = bufferedOutputManager;
            this.outputMap = map;
            this.keySelector = keySelector;
            this.timerRegistration = biConsumer;
            Iterator it = stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values().iterator();
            while (it.hasNext()) {
                for (ProcessBundleDescriptors.TimerSpec timerSpec : ((Map) it.next()).values()) {
                    this.timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
                }
            }
            this.windowCoder = coder;
            this.outputQueue = new LinkedBlockingQueue<>();
        }

        public void startBundle() {
            try {
                this.remoteBundle = this.stageBundleFactory.getBundle(new OutputReceiverFactory() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.SdkHarnessDoFnRunner.1
                    public FnDataReceiver<OutputT> create(String str) {
                        return obj -> {
                            SdkHarnessDoFnRunner.this.outputQueue.put(KV.of(str, obj));
                        };
                    }
                }, this.stateRequestHandler, this.progressHandler);
                this.mainInputReceiver = (FnDataReceiver) Preconditions.checkNotNull((FnDataReceiver) this.remoteBundle.getInputReceivers().get(this.mainInput), "Failed to retrieve main input receiver.");
            } catch (Exception e) {
                throw new RuntimeException("Failed to start remote bundle", e);
            }
        }

        public void processElement(WindowedValue<InputT> windowedValue) {
            try {
                ExecutableStageDoFnOperator.LOG.debug("Sending value: {}", windowedValue);
                this.mainInputReceiver.accept(windowedValue);
                emitResults();
            } catch (Exception e) {
                throw new RuntimeException("Failed to process element with SDK harness.", e);
            }
        }

        public void onTimer(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
            Preconditions.checkNotNull(this.currentTimerKey, "Key for timer needs to be set before calling onTimer");
            ExecutableStageDoFnOperator.LOG.debug("timer callback: {} {} {} {}", new Object[]{str, boundedWindow, instant, timeDomain});
            FnDataReceiver fnDataReceiver = (FnDataReceiver) Preconditions.checkNotNull((FnDataReceiver) this.remoteBundle.getInputReceivers().get(str), "No receiver found for timer %s", str);
            try {
                try {
                    fnDataReceiver.accept(WindowedValue.of(KV.of(this.currentTimerKey, Timer.of(instant, new byte[0])), instant, Collections.singleton(boundedWindow), PaneInfo.NO_FIRING));
                    this.currentTimerKey = null;
                } catch (Exception e) {
                    throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer %s", fnDataReceiver), e);
                }
            } catch (Throwable th) {
                this.currentTimerKey = null;
                throw th;
            }
        }

        public void finishBundle() {
            try {
                try {
                    this.remoteBundle.close();
                    emitResults();
                    this.remoteBundle = null;
                    if (this.bundleFinishedCallback != null) {
                        this.bundleFinishedCallback.run();
                        this.bundleFinishedCallback = null;
                    }
                } catch (Exception e) {
                    throw new RuntimeException("Failed to finish remote bundle", e);
                }
            } catch (Throwable th) {
                this.remoteBundle = null;
                throw th;
            }
        }

        Object getCurrentTimerKey() {
            return this.currentTimerKey;
        }

        void setCurrentTimerKey(Object obj) {
            this.currentTimerKey = obj;
        }

        boolean isBundleInProgress() {
            return this.remoteBundle != null;
        }

        void setBundleFinishedCallback(Runnable runnable) {
            this.bundleFinishedCallback = runnable;
        }

        private void emitResults() {
            while (true) {
                KV<String, OutputT> poll = this.outputQueue.poll();
                if (poll == null) {
                    return;
                }
                String str = (String) Preconditions.checkNotNull((String) poll.getKey());
                TupleTag<?> tupleTag = this.outputMap.get(str);
                WindowedValue windowedValue = (WindowedValue) Preconditions.checkNotNull((WindowedValue) poll.getValue(), "Received a null value from the SDK harness for %s", str);
                if (tupleTag != null) {
                    this.outputManager.output(tupleTag, windowedValue);
                } else {
                    ProcessBundleDescriptors.TimerSpec timerSpec = (ProcessBundleDescriptors.TimerSpec) Preconditions.checkNotNull(this.timerOutputIdToSpecMap.get(str), "Unknown Pcollectionid %s", str);
                    Timer timer = (Timer) Preconditions.checkNotNull((Timer) ((KV) windowedValue.getValue()).getValue(), "Received null Timer from SDK harness: %s", windowedValue);
                    ExecutableStageDoFnOperator.LOG.debug("Timer received: {} {}", str, timer);
                    Iterator it = windowedValue.getWindows().iterator();
                    while (it.hasNext()) {
                        this.timerRegistration.accept(windowedValue, TimerInternals.TimerData.of(timerSpec.inputCollectionId(), StateNamespaces.window(this.windowCoder, (BoundedWindow) it.next()), timer.getTimestamp(), timerSpec.getTimerSpec().getTimeDomain()));
                    }
                }
            }
        }

        public DoFn<InputT, OutputT> getFn() {
            throw new UnsupportedOperationException();
        }
    }

    public ExecutableStageDoFnOperator(String str, Coder<WindowedValue<InputT>> coder, Coder<InputT> coder2, Map<TupleTag<?>, Coder<?>> map, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, DoFnOperator.OutputManagerFactory<OutputT> outputManagerFactory, Map<Integer, PCollectionView<?>> map2, Collection<PCollectionView<?>> collection, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> map3, PipelineOptions pipelineOptions, RunnerApi.ExecutableStagePayload executableStagePayload, JobInfo jobInfo, FlinkExecutableStageContext.Factory factory, Map<String, TupleTag<?>> map4, WindowingStrategy windowingStrategy, Coder coder3, KeySelector<WindowedValue<InputT>, ?> keySelector) {
        super(new NoOpDoFn(), str, coder, coder2, map, tupleTag, list, outputManagerFactory, windowingStrategy, map2, collection, pipelineOptions, coder3, keySelector);
        this.backupWatermarkHold = Long.MIN_VALUE;
        this.payload = executableStagePayload;
        this.jobInfo = jobInfo;
        this.contextFactory = factory;
        this.outputMap = map4;
        this.sideInputIds = map3;
        this.usesTimers = executableStagePayload.getTimersCount() > 0;
        if (this.usesTimers) {
            this.stateBackendLock = new ReentrantLock();
        } else {
            this.stateBackendLock = NoopLock.get();
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void open() throws Exception {
        this.executableStage = ExecutableStage.fromPayload(this.payload);
        this.stageContext = this.contextFactory.get(this.jobInfo);
        this.flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
        this.stageBundleFactory = this.stageContext.getStageBundleFactory(this.executableStage);
        this.stateRequestHandler = getStateRequestHandler(this.executableStage);
        this.progressHandler = new BundleProgressHandler() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.1
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse processBundleProgressResponse) {
                ExecutableStageDoFnOperator.this.flinkMetricContainer.updateMetrics(ExecutableStageDoFnOperator.this.stepName, processBundleProgressResponse.getMonitoringInfosList());
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse processBundleResponse) {
                ExecutableStageDoFnOperator.this.flinkMetricContainer.updateMetrics(ExecutableStageDoFnOperator.this.stepName, processBundleResponse.getMonitoringInfosList());
            }
        };
        super.open();
    }

    private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
        StateRequestHandler unsupported;
        StateRequestHandler unsupported2;
        if (executableStage.getSideInputs().size() > 0) {
            org.apache.flink.util.Preconditions.checkNotNull(this.sideInputHandler);
            try {
                unsupported = StateRequestHandlers.forSideInputHandlerFactory(ProcessBundleDescriptors.getSideInputs(executableStage), (StateRequestHandlers.SideInputHandlerFactory) Preconditions.checkNotNull(FlinkStreamingSideInputHandlerFactory.forStage(executableStage, this.sideInputIds, this.sideInputHandler)));
            } catch (IOException e) {
                throw new RuntimeException("Failed to initialize SideInputHandler", e);
            }
        } else {
            unsupported = StateRequestHandler.unsupported();
        }
        if (executableStage.getUserStates().size() <= 0) {
            unsupported2 = StateRequestHandler.unsupported();
        } else {
            if (this.keyedStateInternals == null) {
                throw new IllegalStateException("Input must be keyed when user state is used");
            }
            unsupported2 = StateRequestHandlers.forBagUserStateHandlerFactory(this.stageBundleFactory.getProcessBundleDescriptor(), new BagUserStateFactory(this.keyedStateInternals, getKeyedStateBackend(), this.stateBackendLock));
        }
        EnumMap enumMap = new EnumMap(BeamFnApi.StateKey.TypeCase.class);
        enumMap.put((EnumMap) BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, (BeamFnApi.StateKey.TypeCase) unsupported);
        enumMap.put((EnumMap) BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, (BeamFnApi.StateKey.TypeCase) unsupported2);
        return StateRequestHandlers.delegateBasedUponType(enumMap);
    }

    public void setKeyContextElement1(StreamRecord streamRecord) throws Exception {
    }

    public void setCurrentKey(Object obj) {
        if (!this.usesTimers) {
            throw new UnsupportedOperationException("Current key for state backend can only be set by state requests from SDK workers or when processing timers.");
        }
    }

    public Object getCurrentKey() {
        return this.sdkHarnessRunner.getCurrentTimerKey();
    }

    private void setTimer(WindowedValue<InputT> windowedValue, TimerInternals.TimerData timerData) {
        try {
            try {
                Object key = this.keySelector.getKey(windowedValue);
                this.sdkHarnessRunner.setCurrentTimerKey(key);
                try {
                    this.stateBackendLock.lock();
                    getKeyedStateBackend().setCurrentKey(key);
                    this.timerInternals.setTimer(timerData);
                    this.stateBackendLock.unlock();
                } catch (Throwable th) {
                    this.stateBackendLock.unlock();
                    throw th;
                }
            } finally {
                this.sdkHarnessRunner.setCurrentTimerKey(null);
            }
        } catch (Exception e) {
            throw new RuntimeException("Couldn't set timer", e);
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void fireTimer(InternalTimer<?, TimerInternals.TimerData> internalTimer) {
        ByteBuffer byteBuffer = (ByteBuffer) internalTimer.getKey();
        byte[] array = byteBuffer.array();
        try {
            this.sdkHarnessRunner.setCurrentTimerKey(CoderUtils.decodeFromByteArray(this.keyCoder, array));
            try {
                this.stateBackendLock.lock();
                getKeyedStateBackend().setCurrentKey(byteBuffer);
                super.fireTimer(internalTimer);
                this.stateBackendLock.unlock();
            } catch (Throwable th) {
                this.stateBackendLock.unlock();
                throw th;
            }
        } catch (CoderException e) {
            throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to decode encoded key: %s", Arrays.toString(array)), e);
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void dispose() throws Exception {
        if (this.stageContext != null) {
            try {
                StageBundleFactory stageBundleFactory = this.stageBundleFactory;
                try {
                    FlinkExecutableStageContext flinkExecutableStageContext = this.stageContext;
                    try {
                        super.dispose();
                        if (flinkExecutableStageContext != null) {
                            $closeResource(null, flinkExecutableStageContext);
                        }
                        if (stageBundleFactory != null) {
                            $closeResource(null, stageBundleFactory);
                        }
                    } catch (Throwable th) {
                        if (flinkExecutableStageContext != null) {
                            $closeResource(null, flinkExecutableStageContext);
                        }
                        throw th;
                    }
                } catch (Throwable th2) {
                    if (stageBundleFactory != null) {
                        $closeResource(null, stageBundleFactory);
                    }
                    throw th2;
                }
            } finally {
                this.stageContext = null;
            }
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
        WindowedValue windowedValue = (WindowedValue) ((RawUnionValue) streamRecord.getValue()).getValue();
        this.sideInputHandler.addSideInputValue(this.sideInputTagMapping.get(Integer.valueOf(((RawUnionValue) streamRecord.getValue()).getUnionTag())), windowedValue.withValue((Iterable) ((KV) windowedValue.getValue()).getValue()));
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner) {
        this.sdkHarnessRunner = new SdkHarnessDoFnRunner<>(this.executableStage.getInputPCollection().getId(), this.stageBundleFactory, this.stateRequestHandler, this.progressHandler, this.outputManager, this.outputMap, this.windowingStrategy.getWindowFn().windowCoder(), this.keySelector, this::setTimer);
        return this.sdkHarnessRunner;
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void processWatermark(Watermark watermark) throws Exception {
        if (this.sdkHarnessRunner.isBundleInProgress()) {
            if (watermark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                invokeFinishBundle();
                setPushedBackWatermark(Long.MAX_VALUE);
            } else {
                this.backupWatermarkHold = Math.max(this.backupWatermarkHold, getPushbackWatermarkHold());
                setPushedBackWatermark(Math.min(this.currentOutputWatermark, this.backupWatermarkHold));
                this.sdkHarnessRunner.setBundleFinishedCallback(() -> {
                    try {
                        LOG.debug("processing pushed back watermark: {}", watermark);
                        setPushedBackWatermark(this.backupWatermarkHold);
                        super.processWatermark(watermark);
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to process pushed back watermark after finished bundle.", e);
                    }
                });
            }
        }
        super.processWatermark(watermark);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
