package org.apache.beam.repackaged.direct_java.runners.fnexecution.splittabledofn;

import java.io.IOException;
import java.util.List;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.class */
public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
    private final Coder<BoundedWindow> windowCoder;
    private final Coder<WindowedValue<KV<InputT, RestrictionT>>> elementRestrictionWireCoder;
    private final StateInternals stateInternals;
    private final TimerInternals timerInternals;
    private StateNamespace stateNamespace;
    private final StateTag<ValueState<WindowedValue<KV<InputT, RestrictionT>>>> seedTag;
    private ValueState<WindowedValue<KV<InputT, RestrictionT>>> seedState;
    private final StateTag<ValueState<RestrictionT>> restrictionTag;
    private ValueState<RestrictionT> restrictionState;
    private StateTag<WatermarkHoldState> watermarkHoldTag = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", TimestampCombiner.LATEST));
    private WatermarkHoldState holdState;
    private Instant inputTimestamp;
    private List<BeamFnApi.BundleApplication> primaryRoots;
    private List<BeamFnApi.DelayedBundleApplication> residualRoots;

    public SDFFeederViaStateAndTimers(StateInternals stateInternals, TimerInternals timerInternals, Coder<InputT> coder, Coder<RestrictionT> coder2, Coder<BoundedWindow> coder3) {
        this.stateInternals = stateInternals;
        this.timerInternals = timerInternals;
        this.windowCoder = coder3;
        this.elementRestrictionWireCoder = WindowedValue.FullWindowedValueCoder.of(KvCoder.of(coder, coder2), coder3);
        this.seedTag = StateTags.value("seed", this.elementRestrictionWireCoder);
        this.restrictionTag = StateTags.value("restriction", coder2);
    }

    public void seed(WindowedValue<KV<InputT, RestrictionT>> windowedValue) {
        initState(StateNamespaces.window(this.windowCoder, (BoundedWindow) Iterables.getOnlyElement(windowedValue.getWindows())));
        this.seedState.write(windowedValue);
        this.inputTimestamp = windowedValue.getTimestamp();
    }

    public WindowedValue<KV<InputT, RestrictionT>> resume(TimerInternals.TimerData timerData) {
        initState(timerData.getNamespace());
        WindowedValue windowedValue = (WindowedValue) this.seedState.read();
        this.inputTimestamp = windowedValue.getTimestamp();
        return windowedValue.withValue(KV.of(((KV) windowedValue.getValue()).getKey(), this.restrictionState.read()));
    }

    public void commit() throws IOException {
        if (this.primaryRoots == null) {
            this.seedState.clear();
            this.restrictionState.clear();
            this.holdState.clear();
            return;
        }
        Preconditions.checkArgument(this.residualRoots.size() == 1, "More than 1 residual is unsupported for now");
        BeamFnApi.DelayedBundleApplication delayedBundleApplication = this.residualRoots.get(0);
        this.restrictionState.write(((KV) ((WindowedValue) this.elementRestrictionWireCoder.decode(delayedBundleApplication.getApplication().getElement().newInput())).getValue()).getValue());
        Instant instant = delayedBundleApplication.getApplication().getOutputWatermarksMap().isEmpty() ? this.inputTimestamp : new Instant(Iterables.getOnlyElement(delayedBundleApplication.getApplication().getOutputWatermarksMap().values()));
        Preconditions.checkArgument(!instant.isBefore(this.inputTimestamp), "Watermark hold %s can not be before input timestamp %s", instant, this.inputTimestamp);
        this.holdState.add(instant);
        Instant instant2 = new Instant(System.currentTimeMillis() + Durations.toMillis(delayedBundleApplication.getRequestedTimeDelay()));
        Instant currentProcessingTime = this.timerInternals.currentProcessingTime().isBefore(instant2) ? instant2 : this.timerInternals.currentProcessingTime();
        this.timerInternals.setTimer(this.stateNamespace, "sdfContinuation", "sdfContinuation", currentProcessingTime, currentProcessingTime, TimeDomain.PROCESSING_TIME);
    }

    public void split(List<BeamFnApi.BundleApplication> list, List<BeamFnApi.DelayedBundleApplication> list2) {
        Preconditions.checkState(this.primaryRoots == null, "At most 1 split supported, however got new split (%s, %s) in addition to existing (%s, %s)", list, list2, this.primaryRoots, this.residualRoots);
        this.primaryRoots = list;
        this.residualRoots = list2;
    }

    private void initState(StateNamespace stateNamespace) {
        this.stateNamespace = stateNamespace;
        this.seedState = this.stateInternals.state(stateNamespace, this.seedTag);
        this.restrictionState = this.stateInternals.state(stateNamespace, this.restrictionTag);
        this.holdState = this.stateInternals.state(stateNamespace, this.watermarkHoldTag);
    }
}
