package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.Iterator;
import java.util.NavigableSet;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.slf4j.Marker;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.class */
public final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements TransformEvaluatorFactory {
    private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
    private final EvaluationContext evaluationContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$AppliedPTransformOutputKeyAndWindow.class */
    public static abstract class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> getTransform();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract StructuralKey<K> getKey();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract BoundedWindow getWindow();

        static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> appliedPTransform, StructuralKey<K> structuralKey, BoundedWindow boundedWindow) {
            return new AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow(appliedPTransform, structuralKey, boundedWindow);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.class */
    public static class StatefulParDoEvaluator<K, InputT> implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
        private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;
        private final DirectTimerInternals timerInternals;
        DirectExecutionContext.DirectStepContext stepContext;

        public StatefulParDoEvaluator(DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> doFnLifecycleManagerRemovingTransformEvaluator, DirectExecutionContext.DirectStepContext directStepContext) {
            this.delegateEvaluator = doFnLifecycleManagerRemovingTransformEvaluator;
            this.timerInternals = doFnLifecycleManagerRemovingTransformEvaluator.getParDoEvaluator().getStepContext().timerInternals();
            this.stepContext = directStepContext;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> windowedValue) throws Exception {
            Iterator<WindowedValue<KV<K, InputT>>> it = windowedValue.getValue().elementsIterable().iterator();
            while (it.hasNext()) {
                this.delegateEvaluator.processElement(it.next());
            }
            for (TimerInternals.TimerData timerData : windowedValue.getValue().timersIterable()) {
                NavigableSet<TimerInternals.TimerData> headSet = this.timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData, true);
                while (!headSet.isEmpty()) {
                    TimerInternals.TimerData pollFirst = headSet.pollFirst();
                    if (!timerModified(pollFirst)) {
                        this.timerInternals.deleteTimer(pollFirst);
                        processTimer(pollFirst, windowedValue.getValue().key());
                    }
                }
                if (!timerModified(timerData)) {
                    processTimer(timerData, windowedValue.getValue().key());
                }
            }
        }

        private boolean timerModified(TimerInternals.TimerData timerData) {
            TimerInternals.TimerData timerData2 = this.timerInternals.getModifiedTimerIds().get(timerData.stringKey());
            return (timerData2 == null || timerData2.equals(timerData)) ? false : true;
        }

        private void processTimer(TimerInternals.TimerData timerData, K k) throws Exception {
            this.delegateEvaluator.onTimer(timerData, k, ((StateNamespaces.WindowNamespace) timerData.getNamespace()).getWindow());
            clearWatermarkHold(timerData);
        }

        private void clearWatermarkHold(TimerInternals.TimerData timerData) {
            ((WatermarkHoldState) this.stepContext.stateInternals().state(timerData.getNamespace(), StatefulParDoEvaluatorFactory.setTimerTag(timerData))).clear();
            this.stepContext.stateInternals().commit();
        }

        private void setWatermarkHold(TimerInternals.TimerData timerData) {
            ((WatermarkHoldState) this.stepContext.stateInternals().state(timerData.getNamespace(), StatefulParDoEvaluatorFactory.setTimerTag(timerData))).add(timerData.getOutputTimestamp());
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
            CopyOnAccessInMemoryStateInternals state;
            Instant watermarkHold;
            TransformResult<KV<K, InputT>> finishBundle = this.delegateEvaluator.finishBundle();
            boolean z = false;
            Iterator<? extends TimerInternals.TimerData> it = finishBundle.getTimerUpdate().getSetTimers().iterator();
            while (it.hasNext()) {
                setWatermarkHold(it.next());
                z = true;
            }
            Iterator<? extends TimerInternals.TimerData> it2 = finishBundle.getTimerUpdate().getDeletedTimers().iterator();
            while (it2.hasNext()) {
                clearWatermarkHold(it2.next());
            }
            if (z && finishBundle.getState() != null) {
                state = finishBundle.getState();
                watermarkHold = this.stepContext.commitState().getEarliestWatermarkHold();
            } else if (z) {
                state = this.stepContext.commitState();
                watermarkHold = state.getEarliestWatermarkHold();
            } else {
                state = finishBundle.getState();
                watermarkHold = finishBundle.getWatermarkHold();
            }
            StepTransformResult.Builder<InputT> withBundleFinalizations = StepTransformResult.withHold(finishBundle.getTransform(), watermarkHold).withTimerUpdate(finishBundle.getTimerUpdate()).withState(state).withMetricUpdates(finishBundle.getLogicalMetricUpdates()).addOutput(Lists.newArrayList(finishBundle.getOutputBundles())).withBundleFinalizations(finishBundle.getBundleFinalizations());
            for (WindowedValue<KV<K, InputT>> windowedValue : finishBundle.getUnprocessedElements()) {
                withBundleFinalizations.addUnprocessedElements(windowedValue.withValue(KeyedWorkItems.elementsWorkItem(windowedValue.getValue().getKey(), Collections.singleton(windowedValue))));
            }
            return withBundleFinalizations.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext, final PipelineOptions pipelineOptions) {
        this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext, ParDoEvaluator.defaultRunnerFactory(), new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() { // from class: org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory.1
            @Override // org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader
            public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> appliedPTransform) throws Exception {
                return DoFnLifecycleManager.of(((ParDoMultiOverrideFactory.StatefulParDo) appliedPTransform.getTransform()).getDoFn(), pipelineOptions);
            }
        }, pipelineOptions);
        this.evaluationContext = evaluationContext;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) throws Exception {
        return createEvaluator(appliedPTransform, committedBundle);
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() throws Exception {
        this.delegateFactory.cleanup();
    }

    private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> appliedPTransform, CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> committedBundle) throws Exception {
        DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> createEvaluator = this.delegateFactory.createEvaluator(appliedPTransform, committedBundle.getPCollection(), committedBundle.getKey(), appliedPTransform.getTransform().getSideInputs(), appliedPTransform.getTransform().getMainOutputTag(), appliedPTransform.getTransform().getAdditionalOutputTags().getAll(), appliedPTransform.getTransform().getSchemaInformation(), appliedPTransform.getTransform().getSideInputMapping());
        DirectExecutionContext.DirectStepContext stepContext = this.evaluationContext.getExecutionContext(appliedPTransform, committedBundle.getKey()).getStepContext(this.evaluationContext.getStepName(appliedPTransform));
        stepContext.stateInternals().commit();
        return new StatefulParDoEvaluator(createEvaluator, stepContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static StateTag<WatermarkHoldState> setTimerTag(TimerInternals.TimerData timerData) {
        return StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("timer-" + timerData.getTimerId() + Marker.ANY_NON_NULL_MARKER + timerData.getTimerFamilyId(), TimestampCombiner.EARLIEST));
    }
}
