package org.apache.beam.runners.jet.processors;

import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.jet.processors.AbstractParDoP;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/jet/processors/StatefulParDoP.class */
public class StatefulParDoP<OutputT> extends AbstractParDoP<KV<?, ?>, OutputT> {
    private KeyedStepContext keyedStepContext;
    private InMemoryTimerInternals timerInternals;

    /* loaded from: input_file:org/apache/beam/runners/jet/processors/StatefulParDoP$KeyedStepContext.class */
    private static class KeyedStepContext implements StepContext {
        private final Map<Object, InMemoryStateInternals> stateInternalsOfKeys = new HashMap();
        private final InMemoryTimerInternals timerInternals;
        private InMemoryStateInternals currentStateInternals;

        KeyedStepContext(InMemoryTimerInternals inMemoryTimerInternals) {
            this.timerInternals = inMemoryTimerInternals;
        }

        void setKey(Object obj) {
            this.currentStateInternals = this.stateInternalsOfKeys.computeIfAbsent(obj, InMemoryStateInternals::forKey);
        }

        public StateInternals stateInternals() {
            return this.currentStateInternals;
        }

        public TimerInternals timerInternals() {
            return this.timerInternals;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/jet/processors/StatefulParDoP$Supplier.class */
    public static class Supplier<OutputT> extends AbstractParDoP.AbstractSupplier<KV<?, ?>, OutputT> {
        public Supplier(String str, String str2, DoFn<KV<?, ?>, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, SerializablePipelineOptions serializablePipelineOptions, TupleTag<OutputT> tupleTag, Set<TupleTag<OutputT>> set, Coder<KV<?, ?>> coder, Map<PCollectionView<?>, Coder<?>> map, Map<TupleTag<?>, Coder<?>> map2, Coder<KV<?, ?>> coder2, Map<TupleTag<?>, Coder<?>> map3, List<PCollectionView<?>> list) {
            super(str, str2, doFn, windowingStrategy, doFnSchemaInformation, serializablePipelineOptions, tupleTag, set, coder, map, map2, coder2, map3, list);
        }

        @Override // org.apache.beam.runners.jet.processors.AbstractParDoP.AbstractSupplier
        Processor getEx(DoFn<KV<?, ?>, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map<TupleTag<?>, int[]> map, SerializablePipelineOptions serializablePipelineOptions, TupleTag<OutputT> tupleTag, Coder<KV<?, ?>> coder, Map<PCollectionView<?>, Coder<?>> map2, Map<TupleTag<?>, Coder<?>> map3, Coder<KV<?, ?>> coder2, Map<TupleTag<?>, Coder<?>> map4, Map<Integer, PCollectionView<?>> map5, String str, String str2) {
            return new StatefulParDoP(doFn, windowingStrategy, doFnSchemaInformation, map, serializablePipelineOptions, tupleTag, coder, map2, map3, coder2, map4, map5, str, str2);
        }

        @Override // org.apache.beam.runners.jet.processors.AbstractParDoP.AbstractSupplier, org.apache.beam.runners.jet.DAGBuilder.WiringListener
        public /* bridge */ /* synthetic */ void isInboundEdgeOfVertex(Edge edge, String str, String str2, String str3) {
            super.isInboundEdgeOfVertex(edge, str, str2, str3);
        }

        @Override // org.apache.beam.runners.jet.processors.AbstractParDoP.AbstractSupplier, org.apache.beam.runners.jet.DAGBuilder.WiringListener
        public /* bridge */ /* synthetic */ void isOutboundEdgeOfVertex(Edge edge, String str, String str2, String str3) {
            super.isOutboundEdgeOfVertex(edge, str, str2, str3);
        }

        @Override // org.apache.beam.runners.jet.processors.AbstractParDoP.AbstractSupplier
        /* renamed from: getEx */
        public /* bridge */ /* synthetic */ Processor m4getEx() {
            return super.m4getEx();
        }
    }

    private StatefulParDoP(DoFn<KV<?, ?>, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map<TupleTag<?>, int[]> map, SerializablePipelineOptions serializablePipelineOptions, TupleTag<OutputT> tupleTag, Coder<KV<?, ?>> coder, Map<PCollectionView<?>, Coder<?>> map2, Map<TupleTag<?>, Coder<?>> map3, Coder<KV<?, ?>> coder2, Map<TupleTag<?>, Coder<?>> map4, Map<Integer, PCollectionView<?>> map5, String str, String str2) {
        super(doFn, windowingStrategy, doFnSchemaInformation, map, serializablePipelineOptions, tupleTag, coder, map2, map3, coder2, map4, map5, str, str2);
    }

    private static void fireTimer(TimerInternals.TimerData timerData, DoFnRunner<KV<?, ?>, ?> doFnRunner) {
        doFnRunner.onTimer(timerData.getTimerId(), timerData.getNamespace().getWindow(), timerData.getTimestamp(), timerData.getDomain());
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    protected DoFnRunner<KV<?, ?>, OutputT> getDoFnRunner(PipelineOptions pipelineOptions, DoFn<KV<?, ?>, OutputT> doFn, SideInputReader sideInputReader, AbstractParDoP.JetOutputManager jetOutputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, Coder<KV<?, ?>> coder, Map<TupleTag<?>, Coder<?>> map, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map2) {
        this.timerInternals = new InMemoryTimerInternals();
        this.keyedStepContext = new KeyedStepContext(this.timerInternals);
        return DoFnRunners.simpleRunner(pipelineOptions, doFn, sideInputReader, jetOutputManager, tupleTag, list, this.keyedStepContext, coder, map, windowingStrategy, doFnSchemaInformation, map2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public void startRunnerBundle(DoFnRunner<KV<?, ?>, OutputT> doFnRunner) {
        try {
            Instant now = Instant.now();
            this.timerInternals.advanceProcessingTime(now);
            this.timerInternals.advanceSynchronizedProcessingTime(now);
            super.startRunnerBundle(doFnRunner);
        } catch (Exception e) {
            throw new RuntimeException("Failed advancing time!");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public void processElementWithRunner(DoFnRunner<KV<?, ?>, OutputT> doFnRunner, WindowedValue<KV<?, ?>> windowedValue) {
        this.keyedStepContext.setKey(((KV) windowedValue.getValue()).getKey());
        super.processElementWithRunner(doFnRunner, windowedValue);
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return flushTimers(watermark.timestamp()) && super.tryProcessWatermark(watermark);
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public boolean complete() {
        return flushTimers(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) && super.complete();
    }

    private boolean flushTimers(long j) {
        if (this.timerInternals.currentInputWatermarkTime().isBefore(j)) {
            try {
                Instant instant = new Instant(j);
                this.timerInternals.advanceInputWatermark(instant);
                if (instant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                    this.timerInternals.advanceProcessingTime(instant);
                    this.timerInternals.advanceSynchronizedProcessingTime(instant);
                }
                fireEligibleTimers(this.timerInternals);
            } catch (Exception e) {
                throw new RuntimeException("Failed advancing processing time", e);
            }
        }
        return this.outputManager.tryFlush();
    }

    private void fireEligibleTimers(InMemoryTimerInternals inMemoryTimerInternals) {
        boolean z;
        do {
            z = false;
            while (true) {
                TimerInternals.TimerData removeNextEventTimer = inMemoryTimerInternals.removeNextEventTimer();
                if (removeNextEventTimer == null) {
                    break;
                }
                z = true;
                fireTimer(removeNextEventTimer, this.doFnRunner);
            }
            while (true) {
                TimerInternals.TimerData removeNextProcessingTimer = inMemoryTimerInternals.removeNextProcessingTimer();
                if (removeNextProcessingTimer == null) {
                    break;
                }
                z = true;
                fireTimer(removeNextProcessingTimer, this.doFnRunner);
            }
            while (true) {
                TimerInternals.TimerData removeNextSynchronizedProcessingTimer = inMemoryTimerInternals.removeNextSynchronizedProcessingTimer();
                if (removeNextSynchronizedProcessingTimer == null) {
                    break;
                }
                z = true;
                fireTimer(removeNextSynchronizedProcessingTimer, this.doFnRunner);
            }
        } while (z);
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public /* bridge */ /* synthetic */ boolean completeEdge(int i) {
        return super.completeEdge(i);
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public /* bridge */ /* synthetic */ boolean tryProcess() {
        return super.tryProcess();
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public /* bridge */ /* synthetic */ void process(int i, @Nonnull Inbox inbox) {
        super.process(i, inbox);
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public /* bridge */ /* synthetic */ boolean isCooperative() {
        return super.isCooperative();
    }

    @Override // org.apache.beam.runners.jet.processors.AbstractParDoP
    public /* bridge */ /* synthetic */ void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        super.init(outbox, context);
    }
}
