package org.apache.beam.runners.flink.translation.wrappers.streaming;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryBundleFinalizer;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.ProcessFnRunner;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.utils.CheckpointStats;
import org.apache.beam.runners.flink.translation.utils.Workarounds;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.NoopLock;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings({"SE_TRANSIENT_FIELD_NOT_RESTORED"})
/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.class */
public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperatorCompat<WindowedValue<OutputT>> implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>, TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>, Triggerable<ByteBuffer, TimerInternals.TimerData> {
    private static final Logger LOG = LoggerFactory.getLogger(DoFnOperator.class);
    protected DoFn<InputT, OutputT> doFn;
    protected final SerializablePipelineOptions serializedOptions;
    protected final TupleTag<OutputT> mainOutputTag;
    protected final List<TupleTag<?>> additionalOutputTags;
    protected final Collection<PCollectionView<?>> sideInputs;
    protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
    protected final WindowingStrategy<?, ?> windowingStrategy;
    protected final OutputManagerFactory<OutputT> outputManagerFactory;
    protected transient DoFnRunner<InputT, OutputT> doFnRunner;
    protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
    protected transient BufferingDoFnRunner<InputT, OutputT> bufferingDoFnRunner;
    protected transient SideInputHandler sideInputHandler;
    protected transient SideInputReader sideInputReader;
    protected transient BufferedOutputManager<OutputT> outputManager;
    private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
    protected transient FlinkStateInternals<?> keyedStateInternals;
    protected transient DoFnOperator<InputT, OutputT>.FlinkTimerInternals timerInternals;
    protected final String stepName;
    private final Coder<WindowedValue<InputT>> windowedInputCoder;
    private final Map<TupleTag<?>, Coder<?>> outputCoders;
    protected final Coder<?> keyCoder;
    final KeySelector<WindowedValue<InputT>, ?> keySelector;
    private final TimerInternals.TimerDataCoderV2 timerCoder;
    private final long maxBundleSize;
    private final long maxBundleTimeMills;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<String, PCollectionView<?>> sideInputMapping;
    private final boolean requiresStableInput;
    private final boolean finishBundleBeforeCheckpointing;
    private transient InMemoryBundleFinalizer bundleFinalizer;
    private transient LinkedHashMap<Long, List<InMemoryBundleFinalizer.Finalization>> pendingFinalizations;
    private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32;
    protected transient InternalTimerService<TimerInternals.TimerData> timerService;
    private transient InternalTimeServiceManager<?> timeServiceManagerCompat;
    private transient PushedBackElementsHandler<WindowedValue<InputT>> pushedBackElementsHandler;
    transient FlinkMetricContainer flinkMetricContainer;
    private transient CheckpointStats checkpointStats;
    private transient ScheduledFuture<?> checkFinishBundleTimer;
    private volatile transient boolean bundleStarted;
    private volatile transient long elementCount;
    private volatile transient long lastFinishBundleTime;
    private volatile transient Runnable preBundleCallback;
    private volatile transient Runnable bundleFinishedCallback;
    private volatile transient long currentInputWatermark;
    private volatile transient long currentSideInputWatermark;
    private volatile transient long currentOutputWatermark;
    private volatile transient long pushedBackWatermark;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$state$TimeDomain = new int[TimeDomain.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.EVENT_TIME.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.PROCESSING_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.SYNCHRONIZED_PROCESSING_TIME.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$BufferedOutputManager.class */
    public static class BufferedOutputManager<OutputT> implements DoFnRunners.OutputManager {
        private final TupleTag<OutputT> mainTag;
        private final Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
        private final Map<TupleTag<?>, Integer> tagsToIds;
        private final Lock bufferLock;

        @VisibleForTesting
        final PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler;
        protected final Output<StreamRecord<WindowedValue<OutputT>>> output;
        private boolean openBuffer = false;
        private boolean bufferIsEmpty = false;
        private Map<Integer, TupleTag<?>> idsToTags = new HashMap();

        BufferedOutputManager(Output<StreamRecord<WindowedValue<OutputT>>> output, TupleTag<OutputT> tupleTag, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> map, Map<TupleTag<?>, Integer> map2, Lock lock, PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler) {
            this.output = output;
            this.mainTag = tupleTag;
            this.tagsToOutputTags = map;
            this.tagsToIds = map2;
            this.bufferLock = lock;
            for (Map.Entry<TupleTag<?>, Integer> entry : map2.entrySet()) {
                this.idsToTags.put(entry.getValue(), entry.getKey());
            }
            this.pushedBackElementsHandler = pushedBackElementsHandler;
        }

        void openBuffer() {
            this.openBuffer = true;
        }

        void closeBuffer() {
            this.openBuffer = false;
        }

        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            if (this.openBuffer) {
                buffer(KV.of(this.tagsToIds.get(tupleTag), windowedValue));
            } else {
                emit(tupleTag, windowedValue);
            }
        }

        private void buffer(KV<Integer, WindowedValue<?>> kv) {
            try {
                try {
                    this.bufferLock.lock();
                    this.pushedBackElementsHandler.pushBack(kv);
                    this.bufferLock.unlock();
                    this.bufferIsEmpty = false;
                } catch (Exception e) {
                    throw new RuntimeException("Couldn't pushback element.", e);
                }
            } catch (Throwable th) {
                this.bufferLock.unlock();
                this.bufferIsEmpty = false;
                throw th;
            }
        }

        void flushBuffer() {
            if (this.openBuffer || this.bufferIsEmpty) {
                return;
            }
            try {
                this.pushedBackElementsHandler.getElements().forEach(kv -> {
                    emit(this.idsToTags.get(kv.getKey()), (WindowedValue) kv.getValue());
                });
                this.pushedBackElementsHandler.clear();
                this.bufferIsEmpty = true;
            } catch (Exception e) {
                throw new RuntimeException("Couldn't flush pushed back elements.", e);
            }
        }

        private <T> void emit(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            if (tupleTag.equals(this.mainTag)) {
                this.output.collect(new StreamRecord(windowedValue));
            } else {
                this.output.collect(this.tagsToOutputTags.get(tupleTag), new StreamRecord(windowedValue));
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$FlinkStepContext.class */
    protected class FlinkStepContext implements StepContext {
        protected FlinkStepContext() {
        }

        public StateInternals stateInternals() {
            return DoFnOperator.this.keyedStateInternals;
        }

        public TimerInternals timerInternals() {
            return DoFnOperator.this.timerInternals;
        }

        public DoFn.BundleFinalizer bundleFinalizer() {
            return DoFnOperator.this.bundleFinalizer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$FlinkTimerInternals.class */
    public class FlinkTimerInternals implements TimerInternals {
        private static final String PENDING_TIMERS_STATE_NAME = "pending-timers";

        @VisibleForTesting
        final MapState<String, TimerInternals.TimerData> pendingTimersById;

        private FlinkTimerInternals() {
            this.pendingTimersById = DoFnOperator.this.getKeyedStateStore().getMapState(new MapStateDescriptor(PENDING_TIMERS_STATE_NAME, new StringSerializer(), new CoderTypeSerializer(DoFnOperator.this.timerCoder, DoFnOperator.this.serializedOptions)));
            populateOutputTimestampQueue();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void processPendingProcessingTimeTimers() {
            KeyedStateBackend keyedStateBackend = DoFnOperator.this.getKeyedStateBackend();
            InternalPriorityQueue<InternalTimer<Object, TimerInternals.TimerData>> retrieveInternalProcessingTimerQueue = Workarounds.retrieveInternalProcessingTimerQueue(DoFnOperator.this.timerService);
            while (true) {
                InternalTimer internalTimer = (InternalTimer) retrieveInternalProcessingTimerQueue.poll();
                if (internalTimer == null) {
                    return;
                }
                keyedStateBackend.setCurrentKey(internalTimer.getKey());
                TimerInternals.TimerData timerData = (TimerInternals.TimerData) internalTimer.getNamespace();
                DoFnOperator.this.checkInvokeStartBundle();
                DoFnOperator.this.fireTimerInternal((ByteBuffer) internalTimer.getKey(), timerData);
            }
        }

        private void onNewEventTimer(TimerInternals.TimerData timerData) {
            Preconditions.checkState(timerData.getDomain() == TimeDomain.EVENT_TIME, "Timer with id %s is not an event time timer!", timerData.getTimerId());
            if (timerUsesOutputTimestamp(timerData)) {
                DoFnOperator.this.keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp());
            }
        }

        private void onNewSdfTimer(TimerInternals.TimerData timerData) {
            Preconditions.checkState(BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerData.getTimerId()));
            Preconditions.checkState(timerUsesOutputTimestamp(timerData));
            DoFnOperator.this.keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp());
        }

        private void populateOutputTimestampQueue() {
            KeyedStateBackend keyedStateBackend = DoFnOperator.this.getKeyedStateBackend();
            Object currentKey = keyedStateBackend.getCurrentKey();
            try {
                Stream keys = keyedStateBackend.getKeys(PENDING_TIMERS_STATE_NAME, VoidNamespace.INSTANCE);
                Throwable th = null;
                try {
                    try {
                        keys.forEach(obj -> {
                            keyedStateBackend.setCurrentKey(obj);
                            try {
                                for (TimerInternals.TimerData timerData : this.pendingTimersById.values()) {
                                    if ((timerData.getDomain() == TimeDomain.EVENT_TIME || BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerData.getTimerId())) && timerUsesOutputTimestamp(timerData)) {
                                        DoFnOperator.this.keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp());
                                    }
                                }
                            } catch (Exception e) {
                                throw new RuntimeException("Exception while reading set of timers for key: " + obj, e);
                            }
                        });
                        if (keys != null) {
                            if (0 != 0) {
                                try {
                                    keys.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                keys.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
                if (currentKey != null) {
                    keyedStateBackend.setCurrentKey(currentKey);
                }
            }
        }

        private boolean timerUsesOutputTimestamp(TimerInternals.TimerData timerData) {
            return timerData.getOutputTimestamp().isBefore(timerData.getTimestamp());
        }

        private String constructTimerId(String str, String str2) {
            return str + "+" + str2;
        }

        public void setTimer(StateNamespace stateNamespace, String str, String str2, Instant instant, Instant instant2, TimeDomain timeDomain) {
            setTimer(TimerInternals.TimerData.of(str, str2, stateNamespace, instant, instant2, timeDomain));
        }

        @Deprecated
        public void setTimer(TimerInternals.TimerData timerData) {
            try {
                DoFnOperator.LOG.debug("Setting timer: {} at {} with output time {}", new Object[]{timerData.getTimerId(), Long.valueOf(timerData.getTimestamp().getMillis()), Long.valueOf(timerData.getOutputTimestamp().getMillis())});
                String contextTimerId = getContextTimerId(constructTimerId(timerData.getTimerFamilyId(), timerData.getTimerId()), timerData.getNamespace());
                TimerInternals.TimerData timerData2 = (TimerInternals.TimerData) this.pendingTimersById.get(contextTimerId);
                if (!timerData.equals(timerData2)) {
                    cancelPendingTimer(timerData2);
                    registerTimer(timerData, contextTimerId);
                }
            } catch (Exception e) {
                throw new RuntimeException("Failed to set timer", e);
            }
        }

        private void registerTimer(TimerInternals.TimerData timerData, String str) throws Exception {
            this.pendingTimersById.put(str, timerData);
            long millis = timerData.getTimestamp().getMillis();
            switch (AnonymousClass2.$SwitchMap$org$apache$beam$sdk$state$TimeDomain[timerData.getDomain().ordinal()]) {
                case 1:
                    DoFnOperator.this.timerService.registerEventTimeTimer(timerData, DoFnOperator.adjustTimestampForFlink(millis));
                    onNewEventTimer(timerData);
                    return;
                case 2:
                case 3:
                    DoFnOperator.this.timerService.registerProcessingTimeTimer(timerData, DoFnOperator.adjustTimestampForFlink(millis));
                    if (BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerData.getTimerId())) {
                        onNewSdfTimer(timerData);
                        return;
                    }
                    return;
                default:
                    throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
            }
        }

        private void cancelPendingTimerById(String str) throws Exception {
            cancelPendingTimer((TimerInternals.TimerData) this.pendingTimersById.get(str));
        }

        private void cancelPendingTimer(TimerInternals.TimerData timerData) {
            if (timerData != null) {
                deleteTimerInternal(timerData);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void onFiredOrDeletedTimer(TimerInternals.TimerData timerData) {
            try {
                this.pendingTimersById.remove(getContextTimerId(constructTimerId(timerData.getTimerFamilyId(), timerData.getTimerId()), timerData.getNamespace()));
                if ((timerData.getDomain() == TimeDomain.EVENT_TIME || BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerData.getTimerId())) && timerUsesOutputTimestamp(timerData)) {
                    DoFnOperator.this.keyedStateInternals.removeWatermarkHoldUsage(timerData.getOutputTimestamp());
                }
            } catch (Exception e) {
                throw new RuntimeException("Failed to cleanup pending timers state.", e);
            }
        }

        @Deprecated
        public void deleteTimer(StateNamespace stateNamespace, String str, String str2) {
            throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
        }

        public void deleteTimer(StateNamespace stateNamespace, String str, String str2, TimeDomain timeDomain) {
            try {
                cancelPendingTimerById(getContextTimerId(str, stateNamespace));
            } catch (Exception e) {
                throw new RuntimeException("Failed to cancel timer", e);
            }
        }

        @Deprecated
        public void deleteTimer(TimerInternals.TimerData timerData) {
            deleteTimer(timerData.getNamespace(), constructTimerId(timerData.getTimerFamilyId(), timerData.getTimerId()), timerData.getTimerFamilyId(), timerData.getDomain());
        }

        void deleteTimerInternal(TimerInternals.TimerData timerData) {
            long millis = timerData.getTimestamp().getMillis();
            switch (AnonymousClass2.$SwitchMap$org$apache$beam$sdk$state$TimeDomain[timerData.getDomain().ordinal()]) {
                case 1:
                    DoFnOperator.this.timerService.deleteEventTimeTimer(timerData, DoFnOperator.adjustTimestampForFlink(millis));
                    break;
                case 2:
                case 3:
                    DoFnOperator.this.timerService.deleteProcessingTimeTimer(timerData, DoFnOperator.adjustTimestampForFlink(millis));
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
            }
            onFiredOrDeletedTimer(timerData);
        }

        public Instant currentProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        public Instant currentSynchronizedProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        public Instant currentInputWatermarkTime() {
            return new Instant(DoFnOperator.this.getEffectiveInputWatermark());
        }

        public Instant currentOutputWatermarkTime() {
            return new Instant(DoFnOperator.this.currentOutputWatermark);
        }

        public boolean hasPendingEventTimeTimers(long j) throws Exception {
            for (TimerInternals.TimerData timerData : this.pendingTimersById.values()) {
                if (timerData.getDomain() == TimeDomain.EVENT_TIME && timerData.getTimestamp().getMillis() <= j) {
                    return true;
                }
            }
            return false;
        }

        private String getContextTimerId(String str, StateNamespace stateNamespace) {
            return str + stateNamespace.stringKey();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$MultiOutputOutputManagerFactory.class */
    public static class MultiOutputOutputManagerFactory<OutputT> implements OutputManagerFactory<OutputT> {
        private final TupleTag<OutputT> mainTag;
        private final Map<TupleTag<?>, Integer> tagsToIds;
        private final Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
        private final Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;
        private final SerializablePipelineOptions pipelineOptions;

        public MultiOutputOutputManagerFactory(TupleTag<OutputT> tupleTag, Coder<WindowedValue<OutputT>> coder, SerializablePipelineOptions serializablePipelineOptions) {
            this(tupleTag, new HashMap(), ImmutableMap.builder().put(tupleTag, coder).build(), ImmutableMap.builder().put(tupleTag, 0).build(), serializablePipelineOptions);
        }

        public MultiOutputOutputManagerFactory(TupleTag<OutputT> tupleTag, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> map, Map<TupleTag<?>, Coder<WindowedValue<?>>> map2, Map<TupleTag<?>, Integer> map3, SerializablePipelineOptions serializablePipelineOptions) {
            this.mainTag = tupleTag;
            this.tagsToOutputTags = map;
            this.tagsToCoders = map2;
            this.tagsToIds = map3;
            this.pipelineOptions = serializablePipelineOptions;
        }

        @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory
        public BufferedOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output, Lock lock, OperatorStateBackend operatorStateBackend) throws Exception {
            Preconditions.checkNotNull(output);
            Preconditions.checkNotNull(lock);
            Preconditions.checkNotNull(operatorStateBackend);
            return new BufferedOutputManager<>(output, this.mainTag, this.tagsToOutputTags, this.tagsToIds, lock, NonKeyedPushedBackElementsHandler.create(operatorStateBackend.getListState(new ListStateDescriptor("bundle-buffer-tag", new CoderTypeSerializer(buildTaggedKvCoder(), this.pipelineOptions)))));
        }

        private TaggedKvCoder buildTaggedKvCoder() {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (Map.Entry<TupleTag<?>, Integer> entry : this.tagsToIds.entrySet()) {
                builder.put(entry.getValue(), this.tagsToCoders.get(entry.getKey()));
            }
            return new TaggedKvCoder(builder.build());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$OutputManagerFactory.class */
    public interface OutputManagerFactory<OutputT> extends Serializable {
        BufferedOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output, Lock lock, OperatorStateBackend operatorStateBackend) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$TaggedKvCoder.class */
    public static class TaggedKvCoder extends StructuredCoder<KV<Integer, WindowedValue<?>>> {
        private final Map<Integer, Coder<WindowedValue<?>>> idsToCoders;

        TaggedKvCoder(Map<Integer, Coder<WindowedValue<?>>> map) {
            this.idsToCoders = map;
        }

        public void encode(KV<Integer, WindowedValue<?>> kv, OutputStream outputStream) throws IOException {
            Coder<WindowedValue<?>> coder = this.idsToCoders.get(kv.getKey());
            VarIntCoder.of().encode((Integer) kv.getKey(), outputStream);
            coder.encode((WindowedValue) kv.getValue(), outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public KV<Integer, WindowedValue<?>> m42decode(InputStream inputStream) throws IOException {
            Integer decode = VarIntCoder.of().decode(inputStream);
            return KV.of(decode, (WindowedValue) this.idsToCoders.get(decode).decode(inputStream));
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return new ArrayList(this.idsToCoders.values());
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
            Iterator<Coder<WindowedValue<?>>> it = this.idsToCoders.values().iterator();
            while (it.hasNext()) {
                verifyDeterministic(this, "Coder must be deterministic", new Coder[]{it.next()});
            }
        }
    }

    public DoFnOperator(DoFn<InputT, OutputT> doFn, String str, Coder<WindowedValue<InputT>> coder, Map<TupleTag<?>, Coder<?>> map, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map2, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions, Coder<?> coder2, KeySelector<WindowedValue<InputT>, ?> keySelector, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map3) {
        this.doFn = doFn;
        this.stepName = str;
        this.windowedInputCoder = coder;
        this.outputCoders = map;
        this.mainOutputTag = tupleTag;
        this.additionalOutputTags = list;
        this.sideInputTagMapping = map2;
        this.sideInputs = collection;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;
        setChainingStrategy(ChainingStrategy.ALWAYS);
        this.keyCoder = coder2;
        this.keySelector = keySelector;
        this.timerCoder = TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
        FlinkPipelineOptions flinkPipelineOptions = (FlinkPipelineOptions) pipelineOptions.as(FlinkPipelineOptions.class);
        this.maxBundleSize = flinkPipelineOptions.getMaxBundleSize().longValue();
        Preconditions.checkArgument(this.maxBundleSize > 0, "Bundle size must be at least 1");
        this.maxBundleTimeMills = flinkPipelineOptions.getMaxBundleTimeMills().longValue();
        Preconditions.checkArgument(this.maxBundleTimeMills > 0, "Bundle time must be at least 1");
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideInputMapping = map3;
        this.requiresStableInput = doFn != null && DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput();
        if (this.requiresStableInput) {
            Preconditions.checkState(CheckpointingMode.valueOf(flinkPipelineOptions.getCheckpointingMode()) == CheckpointingMode.EXACTLY_ONCE, "Checkpointing mode is not set to exactly once but @RequiresStableInput is used.");
            Preconditions.checkState(flinkPipelineOptions.getCheckpointingInterval().longValue() > 0, "No checkpointing configured but pipeline uses @RequiresStableInput");
            LOG.warn("Enabling stable input for transform {}. Will only process elements at most every {} milliseconds.", str, Long.valueOf(flinkPipelineOptions.getCheckpointingInterval().longValue() + Math.max(0L, flinkPipelineOptions.getMinPauseBetweenCheckpoints().longValue())));
        }
        this.finishBundleBeforeCheckpointing = flinkPipelineOptions.getFinishBundleBeforeCheckpointing();
    }

    protected DoFn<InputT, OutputT> getDoFn() {
        return this.doFn;
    }

    protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner, StepContext stepContext) {
        if (this.keyCoder == null) {
            return this.doFnRunner;
        }
        return DoFnRunners.defaultStatefulDoFnRunner(this.doFn, getInputCoder(), doFnRunner, stepContext, this.windowingStrategy, new StatefulDoFnRunner.TimeInternalsCleanupTimer<InputT>(this.timerInternals, this.windowingStrategy) { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.1
            public void setForWindow(InputT inputt, BoundedWindow boundedWindow) {
                if (boundedWindow.equals(GlobalWindow.INSTANCE)) {
                    return;
                }
                super.setForWindow(inputt, boundedWindow);
            }
        }, new StatefulDoFnRunner.StateInternalsStateCleaner(this.doFn, this.keyedStateInternals, this.windowingStrategy.getWindowFn().windowCoder()), true);
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<WindowedValue<OutputT>>> output) {
        FileSystems.setDefaultPipelineOptions(this.serializedOptions.get());
        super.setup(streamTask, streamConfig, output);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("pushed-back-elements", new CoderTypeSerializer(this.windowedInputCoder, this.serializedOptions));
        if (this.keySelector != null) {
            this.pushedBackElementsHandler = KeyedPushedBackElementsHandler.create(this.keySelector, getKeyedStateBackend(), listStateDescriptor);
        } else {
            this.pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(getOperatorStateBackend().getListState(listStateDescriptor));
        }
        this.currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
        this.currentSideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
        this.currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
        this.sideInputReader = NullSideInputReader.of(this.sideInputs);
        if (this.sideInputs.isEmpty()) {
            this.pushedBackWatermark = Long.MAX_VALUE;
        } else {
            this.sideInputHandler = new SideInputHandler(this.sideInputs, new FlinkBroadcastStateInternals(getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend(), this.serializedOptions));
            this.sideInputReader = this.sideInputHandler;
            this.pushedBackWatermark = ((Long) this.pushedBackElementsHandler.getElements().map(windowedValue -> {
                return Long.valueOf(windowedValue.getTimestamp().getMillis());
            }).reduce(Long.MAX_VALUE, (v0, v1) -> {
                return Math.min(v0, v1);
            })).longValue();
        }
        if (this.keyCoder != null) {
            this.keyedStateInternals = new FlinkStateInternals<>(getKeyedStateBackend(), this.keyCoder, this.serializedOptions);
            if (this.timerService == null) {
                this.timerService = getInternalTimerService("beam-timer", new CoderTypeSerializer(this.timerCoder, this.serializedOptions), this);
            }
            this.timerInternals = new FlinkTimerInternals();
            this.timeServiceManagerCompat = getTimeServiceManagerCompat();
        }
        this.outputManager = this.outputManagerFactory.create(this.output, getLockToAcquireForStateAccessDuringBundles(), getOperatorStateBackend());
    }

    protected Lock getLockToAcquireForStateAccessDuringBundles() {
        return NoopLock.get();
    }

    public void open() throws Exception {
        this.doFn = getDoFn();
        FlinkPipelineOptions flinkPipelineOptions = (FlinkPipelineOptions) this.serializedOptions.get().as(FlinkPipelineOptions.class);
        this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(this.doFn, flinkPipelineOptions);
        FlinkStepContext flinkStepContext = new FlinkStepContext();
        this.doFnRunner = DoFnRunners.simpleRunner(flinkPipelineOptions, this.doFn, this.sideInputReader, this.outputManager, this.mainOutputTag, this.additionalOutputTags, flinkStepContext, getInputCoder(), this.outputCoders, this.windowingStrategy, this.doFnSchemaInformation, this.sideInputMapping);
        if (this.requiresStableInput) {
            BufferingDoFnRunner<InputT, OutputT> create = BufferingDoFnRunner.create(this.doFnRunner, "stable-input-buffer", this.windowedInputCoder, this.windowingStrategy.getWindowFn().windowCoder(), getOperatorStateBackend(), getKeyedStateBackend(), flinkPipelineOptions.getNumConcurrentCheckpoints(), this.serializedOptions);
            this.bufferingDoFnRunner = create;
            this.doFnRunner = create;
        }
        this.doFnRunner = createWrappingDoFnRunner(this.doFnRunner, flinkStepContext);
        earlyBindStateIfNeeded();
        if (!flinkPipelineOptions.getDisableMetrics().booleanValue()) {
            this.flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
            this.doFnRunner = new DoFnRunnerWithMetricsUpdate(this.stepName, this.doFnRunner, this.flinkMetricContainer);
            String reportCheckpointDuration = flinkPipelineOptions.getReportCheckpointDuration();
            if (reportCheckpointDuration != null) {
                MetricName named = MetricName.named(reportCheckpointDuration, "checkpoint_duration");
                this.checkpointStats = new CheckpointStats(() -> {
                    return this.flinkMetricContainer.getMetricsContainer(this.stepName).getDistribution(named);
                });
            }
        }
        this.elementCount = 0L;
        this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
        long max = Math.max(this.maxBundleTimeMills / 2, 1L);
        this.checkFinishBundleTimer = getProcessingTimeService().scheduleAtFixedRate(j -> {
            checkInvokeFinishBundleByTime();
        }, max, max);
        if (this.doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
            this.pushbackDoFnRunner = new ProcessFnRunner(this.doFnRunner, this.sideInputs, this.sideInputHandler);
        } else {
            this.pushbackDoFnRunner = SimplePushbackSideInputDoFnRunner.create(this.doFnRunner, this.sideInputs, this.sideInputHandler);
        }
        this.bundleFinalizer = new InMemoryBundleFinalizer();
        this.pendingFinalizations = new LinkedHashMap<>();
    }

    private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAccessException {
        if (this.keyCoder == null || this.doFn == null) {
            return;
        }
        DoFnSignature signature = DoFnSignatures.getSignature(this.doFn.getClass());
        FlinkStateInternals.EarlyBinder earlyBinder = new FlinkStateInternals.EarlyBinder(getKeyedStateBackend(), this.serializedOptions);
        for (DoFnSignature.StateDeclaration stateDeclaration : signature.stateDeclarations().values()) {
            ((StateSpec) ((DoFnSignature.StateDeclaration) signature.stateDeclarations().get(stateDeclaration.id())).field().get(this.doFn)).bind(stateDeclaration.id(), earlyBinder);
        }
        if (this.doFnRunner instanceof StatefulDoFnRunner) {
            this.doFnRunner.getSystemStateTags().forEach(stateTag -> {
                stateTag.getSpec().bind(stateTag.getId(), earlyBinder);
            });
        }
    }

    public void dispose() throws Exception {
        try {
            Optional.ofNullable(this.flinkMetricContainer).ifPresent((v0) -> {
                v0.registerMetricsForPipelineResult();
            });
            Optional.ofNullable(this.checkFinishBundleTimer).ifPresent(scheduledFuture -> {
                scheduledFuture.cancel(true);
            });
            Workarounds.deleteStaticCaches();
            Optional.ofNullable(this.doFnInvoker).ifPresent((v0) -> {
                v0.invokeTeardown();
            });
        } finally {
            super.dispose();
        }
    }

    public void close() throws Exception {
        try {
            if (numProcessingTimeTimers() > 0) {
                this.timerInternals.processPendingProcessingTimeTimers();
            }
            if (numProcessingTimeTimers() > 0) {
                throw new RuntimeException("There are still " + numProcessingTimeTimers() + " processing-time timers left, this indicates a bug");
            }
            processWatermark(new Watermark(Long.MAX_VALUE));
            while (this.bundleStarted) {
                invokeFinishBundle();
            }
            if (this.currentOutputWatermark < Long.MAX_VALUE) {
                throw new RuntimeException("There are still watermark holds. Watermark held at " + this.currentOutputWatermark);
            }
            if (this.sideInputs.isEmpty()) {
                return;
            }
            List list = (List) this.pushedBackElementsHandler.getElements().collect(Collectors.toList());
            if (list.size() > 0) {
                throw new RuntimeException("Leftover pushed-back data: " + Joiner.on(",").join(list) + ". This indicates a bug.");
            }
        } finally {
            super.close();
        }
    }

    public long getEffectiveInputWatermark() {
        return Math.min(this.pushedBackWatermark, this.currentInputWatermark);
    }

    public long getCurrentOutputWatermark() {
        return this.currentOutputWatermark;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void setPreBundleCallback(Runnable runnable) {
        this.preBundleCallback = runnable;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void setBundleFinishedCallback(Runnable runnable) {
        this.bundleFinishedCallback = runnable;
    }

    public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
        checkInvokeStartBundle();
        long longValue = this.keyCoder != null ? this.keyedStateInternals.minWatermarkHoldMs().longValue() : -1L;
        this.doFnRunner.processElement((WindowedValue) streamRecord.getValue());
        checkInvokeFinishBundleByCount();
        emitWatermarkIfHoldChanged(longValue);
    }

    public final void processElement1(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        checkInvokeStartBundle();
        Iterable<WindowedValue<InputT>> processElementInReadyWindows = this.pushbackDoFnRunner.processElementInReadyWindows((WindowedValue) streamRecord.getValue());
        long j = this.pushedBackWatermark;
        for (WindowedValue<InputT> windowedValue : processElementInReadyWindows) {
            j = Math.min(j, windowedValue.getTimestamp().getMillis());
            this.pushedBackElementsHandler.pushBack(windowedValue);
        }
        this.pushedBackWatermark = j;
        checkInvokeFinishBundleByCount();
    }

    protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
        WindowedValue windowedValue = (WindowedValue) ((RawUnionValue) streamRecord.getValue()).getValue();
        this.sideInputHandler.addSideInputValue(this.sideInputTagMapping.get(Integer.valueOf(((RawUnionValue) streamRecord.getValue()).getUnionTag())), windowedValue);
    }

    public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
        invokeFinishBundle();
        checkInvokeStartBundle();
        addSideInputValue(streamRecord);
        ArrayList<WindowedValue<InputT>> arrayList = new ArrayList();
        for (WindowedValue<InputT> windowedValue : this.pushedBackElementsHandler.getElements()) {
            setKeyContextElement1(new StreamRecord(windowedValue));
            Iterables.addAll(arrayList, this.pushbackDoFnRunner.processElementInReadyWindows(windowedValue));
        }
        this.pushedBackElementsHandler.clear();
        long j = Long.MAX_VALUE;
        for (WindowedValue<InputT> windowedValue2 : arrayList) {
            j = Math.min(j, windowedValue2.getTimestamp().getMillis());
            this.pushedBackElementsHandler.pushBack(windowedValue2);
        }
        this.pushedBackWatermark = j;
        checkInvokeFinishBundleByCount();
        processWatermark1(new Watermark(this.currentInputWatermark));
    }

    public final void processWatermark(Watermark watermark) throws Exception {
        processWatermark1(watermark);
    }

    public final void processWatermark1(Watermark watermark) throws Exception {
        this.outputManager.flushBuffer();
        if (this.currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
            emitAllPushedBackData();
        }
        this.currentInputWatermark = watermark.getTimestamp();
        processInputWatermark(true);
    }

    private void processInputWatermark(boolean z) throws Exception {
        long applyInputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark());
        if (this.keyCoder != null && z) {
            this.timeServiceManagerCompat.advanceWatermark(new Watermark(applyInputWatermarkHold));
        }
        maybeEmitWatermark(applyOutputWatermarkHold(this.currentOutputWatermark, computeOutputWatermark(applyInputWatermarkHold)));
    }

    public long applyInputWatermarkHold(long j) {
        return j;
    }

    public long applyOutputWatermarkHold(long j, long j2) {
        return j2;
    }

    private long computeOutputWatermark(long j) {
        return this.keyCoder == null ? j : Math.min(this.keyedStateInternals.minWatermarkHoldMs().longValue(), j);
    }

    private void maybeEmitWatermark(long j) {
        if (j > this.currentOutputWatermark) {
            if (j >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                invokeFinishBundle();
            }
            LOG.debug("Emitting watermark {}", Long.valueOf(j));
            this.currentOutputWatermark = j;
            this.output.emitWatermark(new Watermark(j));
            if (this.keyedStateInternals == null || this.currentOutputWatermark <= adjustTimestampForFlink(GlobalWindow.INSTANCE.maxTimestamp().getMillis())) {
                return;
            }
            this.keyedStateInternals.clearGlobalState();
        }
    }

    public final void processWatermark2(Watermark watermark) throws Exception {
        this.currentSideInputWatermark = watermark.getTimestamp();
        if (watermark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
            emitAllPushedBackData();
            processWatermark1(new Watermark(this.currentInputWatermark));
        }
    }

    private void emitAllPushedBackData() throws Exception {
        for (WindowedValue<InputT> windowedValue : this.pushedBackElementsHandler.getElements()) {
            checkInvokeStartBundle();
            setKeyContextElement1(new StreamRecord(windowedValue));
            this.doFnRunner.processElement(windowedValue);
        }
        this.pushedBackElementsHandler.clear();
        this.pushedBackWatermark = Long.MAX_VALUE;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkInvokeStartBundle() {
        if (this.bundleStarted) {
            return;
        }
        this.outputManager.flushBuffer();
        LOG.debug("Starting bundle.");
        if (this.preBundleCallback != null) {
            this.preBundleCallback.run();
        }
        this.pushbackDoFnRunner.startBundle();
        this.bundleStarted = true;
    }

    @SuppressFBWarnings({"VO_VOLATILE_INCREMENT"})
    private void checkInvokeFinishBundleByCount() {
        this.elementCount++;
        if (this.elementCount >= this.maxBundleSize) {
            invokeFinishBundle();
        }
    }

    private void checkInvokeFinishBundleByTime() {
        if (getProcessingTimeService().getCurrentProcessingTime() - this.lastFinishBundleTime >= this.maxBundleTimeMills) {
            invokeFinishBundle();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void invokeFinishBundle() {
        if (this.bundleStarted) {
            LOG.debug("Finishing bundle.");
            this.pushbackDoFnRunner.finishBundle();
            LOG.debug("Finished bundle. Element count: {}", Long.valueOf(this.elementCount));
            this.elementCount = 0L;
            this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
            this.bundleStarted = false;
            if (this.bundleFinishedCallback != null) {
                LOG.debug("Invoking bundle finish callback.");
                this.bundleFinishedCallback.run();
            }
        }
    }

    public void prepareSnapshotPreBarrier(long j) {
        if (this.finishBundleBeforeCheckpointing) {
            while (this.bundleStarted) {
                invokeFinishBundle();
            }
        }
    }

    public final void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        if (this.checkpointStats != null) {
            this.checkpointStats.snapshotStart(stateSnapshotContext.getCheckpointId());
        }
        if (this.requiresStableInput) {
            this.bufferingDoFnRunner.checkpoint(stateSnapshotContext.getCheckpointId());
        }
        int size = this.pendingFinalizations.size() - MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS;
        if (size >= 0) {
            Iterator<Long> it = this.pendingFinalizations.keySet().iterator();
            while (size >= 0) {
                it.next();
                it.remove();
                size--;
            }
        }
        this.pendingFinalizations.put(Long.valueOf(stateSnapshotContext.getCheckpointId()), this.bundleFinalizer.getAndClearFinalizations());
        try {
            this.outputManager.openBuffer();
            while (this.bundleStarted) {
                invokeFinishBundle();
            }
            this.outputManager.closeBuffer();
            super.snapshotState(stateSnapshotContext);
        } catch (Exception e) {
            throw new Error("Checkpointing failed because bundle failed to finalize.", e);
        }
    }

    public DoFn.BundleFinalizer getBundleFinalizer() {
        return this.bundleFinalizer;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.checkpointStats != null) {
            this.checkpointStats.reportCheckpointDuration(j);
        }
        if (this.requiresStableInput) {
            this.bufferingDoFnRunner.checkpointCompleted(j);
        }
        List<InMemoryBundleFinalizer.Finalization> remove = this.pendingFinalizations.remove(Long.valueOf(j));
        if (remove != null) {
            Iterator<InMemoryBundleFinalizer.Finalization> it = remove.iterator();
            while (it.hasNext()) {
                it.next().getCallback().onBundleSuccess();
            }
        }
        super.notifyCheckpointComplete(j);
    }

    public void onEventTime(InternalTimer<ByteBuffer, TimerInternals.TimerData> internalTimer) {
        checkInvokeStartBundle();
        fireTimerInternal((ByteBuffer) internalTimer.getKey(), (TimerInternals.TimerData) internalTimer.getNamespace());
    }

    public void onProcessingTime(InternalTimer<ByteBuffer, TimerInternals.TimerData> internalTimer) {
        checkInvokeStartBundle();
        fireTimerInternal((ByteBuffer) internalTimer.getKey(), (TimerInternals.TimerData) internalTimer.getNamespace());
    }

    protected void fireTimerInternal(ByteBuffer byteBuffer, TimerInternals.TimerData timerData) {
        long longValue = this.keyCoder != null ? this.keyedStateInternals.minWatermarkHoldMs().longValue() : -1L;
        fireTimer(timerData);
        emitWatermarkIfHoldChanged(longValue);
    }

    void emitWatermarkIfHoldChanged(long j) {
        if (this.keyCoder == null || this.keyedStateInternals.minWatermarkHoldMs().longValue() <= j) {
            return;
        }
        try {
            processInputWatermark(false);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void fireTimer(TimerInternals.TimerData timerData) {
        LOG.debug("Firing timer: {} at {} with output time {}", new Object[]{timerData.getTimerId(), Long.valueOf(timerData.getTimestamp().getMillis()), Long.valueOf(timerData.getOutputTimestamp().getMillis())});
        StateNamespaces.WindowNamespace namespace = timerData.getNamespace();
        org.apache.flink.util.Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
        BoundedWindow window = namespace.getWindow();
        this.timerInternals.onFiredOrDeletedTimer(timerData);
        this.pushbackDoFnRunner.onTimer(timerData.getTimerId(), timerData.getTimerFamilyId(), this.keyedStateInternals.getKey(), window, timerData.getTimestamp(), timerData.getOutputTimestamp(), timerData.getDomain());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Coder<InputT> getInputCoder() {
        return (Coder) Iterables.getOnlyElement(this.windowedInputCoder.getCoderArguments());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static long adjustTimestampForFlink(long j) {
        if (j == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        return j + 1;
    }
}
