package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.Proto2Coder;
import com.google.cloud.dataflow.sdk.coders.SetCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.runners.worker.StateFetcher;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.DoFnInfo;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;
import com.google.cloud.dataflow.sdk.util.state.ValueState;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.class */
/**
 * A {@link DoFnRunner} that defers elements whose window still has side inputs that could not be
 * fetched.
 *
 * <p>When an element arrives and at least one side input for its window is reported as
 * unavailable, the element is appended to a per-window bag, its timestamp is added to a
 * per-window watermark hold, and the outstanding requests are registered with the step context.
 * At the start of each bundle, side input notifications are matched against the outstanding
 * requests; windows with nothing left outstanding have their buffered elements replayed through
 * the wrapped {@code DoFn}. The window-to-outstanding-requests map is read from global state in
 * the constructor and written back in {@link #finishBundle()}.
 *
 * @param <InputT> element type consumed by the wrapped {@code DoFn}
 * @param <OutputT> main output type of the wrapped {@code DoFn}
 * @param <W> window type of the main input
 */
public class StreamingSideInputDoFnRunner<InputT, OutputT, W extends BoundedWindow> extends DoFnRunner<InputT, OutputT> {
    private final StreamingModeExecutionContext.StepContext stepContext;
    private final StreamingModeExecutionContext execContext;
    /** Side input views of the wrapped DoFn, keyed by the id of their internal tag. */
    private final Map<String, PCollectionView<?>> sideInputViews;
    private final StateTag<BagState<WindowedValue<InputT>>> elementsAddr;
    private final StateTag<WatermarkStateInternal> watermarkHoldingAddr;
    private final StateTag<ValueState<Map<W, Set<Windmill.GlobalDataRequest>>>> blockedMapAddr;
    /** For each blocked window, the side input requests that are still outstanding. */
    private final Map<W, Set<Windmill.GlobalDataRequest>> blockedMap;
    private final WindowFn<?, W> windowFn;

    /**
     * Creates the runner and loads the previously persisted blocked-window map, if any.
     *
     * @param stepContext must be a {@link StreamingModeExecutionContext.StepContext} whose
     *     execution context is a {@link StreamingModeExecutionContext}; both are downcast here
     * @throws Exception declared for callers; nothing is thrown explicitly in this constructor
     */
    public StreamingSideInputDoFnRunner(PipelineOptions pipelineOptions, DoFnInfo<InputT, OutputT> doFnInfo, SideInputReader sideInputReader, DoFnRunner.OutputManager outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        super(pipelineOptions, doFnInfo.getDoFn(), sideInputReader, outputManager, tupleTag, list, stepContext, addCounterMutator, doFnInfo.getWindowingStrategy());
        this.stepContext = (StreamingModeExecutionContext.StepContext) stepContext;

        // The DoFnInfo does not carry the window type statically, so W is asserted here.
        @SuppressWarnings("unchecked")
        WindowFn<?, W> typedWindowFn = (WindowFn<?, W>) doFnInfo.getWindowingStrategy().getWindowFn();
        this.windowFn = typedWindowFn;

        this.sideInputViews = new HashMap<String, PCollectionView<?>>();
        for (PCollectionView<?> view : doFnInfo.getSideInputViews()) {
            this.sideInputViews.put(view.getTagInternal().getId(), view);
        }
        this.execContext = (StreamingModeExecutionContext) stepContext.getExecutionContext();
        this.blockedMapAddr = blockedMapAddr(this.windowFn);
        this.elementsAddr = StateTags.makeSystemTagInternal(StateTags.bag("elem", WindowedValue.getFullCoder(doFnInfo.getInputCoder(), this.windowFn.windowCoder())));
        this.watermarkHoldingAddr = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", doFnInfo.getWindowingStrategy().getOutputTimeFn()));

        // A null read means nothing has been persisted yet; start from an empty map.
        Map<W, Set<Windmill.GlobalDataRequest>> persisted = stepContext.stateInternals().state(StateNamespaces.global(), this.blockedMapAddr).get().read();
        this.blockedMap = persisted != null ? persisted : new HashMap<W, Set<Windmill.GlobalDataRequest>>();
    }

    /**
     * Returns the state tag under which the blocked-window map is stored.
     *
     * @param windowFn supplies the coder used for the map's window keys
     */
    @VisibleForTesting
    static <W extends BoundedWindow> StateTag<ValueState<Map<W, Set<Windmill.GlobalDataRequest>>>> blockedMapAddr(WindowFn<?, W> windowFn) {
        return StateTags.value("blockedMap", MapCoder.of(windowFn.windowCoder(), SetCoder.of(Proto2Coder.of(Windmill.GlobalDataRequest.class))));
    }

    /**
     * Applies the side input notifications of the current execution context to {@link #blockedMap}
     * and returns the windows that have become processable.
     *
     * <p>For each notification whose tag belongs to this runner, matching requests are removed
     * from every window's outstanding set. When a set becomes empty, a fetch is issued for every
     * side input view of that window; any view whose fetch returns {@code false} is re-added as
     * outstanding and registered with the step context, and the window is only reported as ready
     * if no view was re-added.
     *
     * @return the windows for which every side input fetch succeeded
     */
    private Set<W> getReadyWindows() {
        Set<W> readyWindows = new HashSet<W>();
        for (Windmill.GlobalDataId notifiedId : this.execContext.getSideInputNotifications()) {
            if (this.sideInputViews.get(notifiedId.getTag()) == null) {
                // The notification's tag is not one of this runner's side inputs.
                continue;
            }
            for (Map.Entry<W, Set<Windmill.GlobalDataRequest>> entry : this.blockedMap.entrySet()) {
                Set<Windmill.GlobalDataRequest> outstanding = entry.getValue();

                // Collect first, then remove, so the set is not modified while iterating it.
                Set<Windmill.GlobalDataRequest> satisfied = new HashSet<Windmill.GlobalDataRequest>();
                for (Windmill.GlobalDataRequest request : outstanding) {
                    if (notifiedId.equals(request.getDataId())) {
                        satisfied.add(request);
                    }
                }
                outstanding.removeAll(satisfied);
                if (!outstanding.isEmpty()) {
                    continue;
                }

                try {
                    W window = entry.getKey();
                    boolean allFetched = true;
                    for (PCollectionView<?> view : this.sideInputViews.values()) {
                        if (!this.stepContext.issueSideInputFetch(view, window, StateFetcher.SideInputState.KNOWN_READY)) {
                            Windmill.GlobalDataRequest request = buildGlobalDataRequest(view, window);
                            this.stepContext.addBlockingSideInput(request);
                            outstanding.add(request);
                            allFetched = false;
                        }
                    }
                    if (allFetched) {
                        readyWindows.add(window);
                    }
                } catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }
        }
        return readyWindows;
    }

    /**
     * Starts the bundle, then replays the buffered elements of every window that has become ready.
     *
     * @throws UserCodeException wrapping any throwable raised while reading or processing the
     *     buffered elements (an existing {@code UserCodeException} is rethrown as-is)
     */
    @Override // com.google.cloud.dataflow.sdk.util.DoFnRunner
    public void startBundle() {
        super.startBundle();
        Set<W> readyWindows = getReadyWindows();

        // First pass: touch every ready window's state and clear its watermark hold. The results
        // of get() are discarded - presumably these calls only start the reads early (TODO confirm).
        for (W window : readyWindows) {
            elementBag(window).get();
            WatermarkStateInternal hold = watermarkHold(window);
            hold.get();
            hold.clear();
        }

        // Second pass: unblock each window and feed its buffered elements to the DoFn.
        for (W window : readyWindows) {
            this.blockedMap.remove(window);
            BagState<WindowedValue<InputT>> bufferedElements = elementBag(window);
            try {
                Iterable<WindowedValue<InputT>> elements = bufferedElements.get().read();
                for (WindowedValue<InputT> element : elements) {
                    this.fn.processElement(createProcessContext(element));
                }
                // Only reached when every element was processed without throwing.
                bufferedElements.clear();
            } catch (Throwable t) {
                Throwables.propagateIfInstanceOf(t, UserCodeException.class);
                throw new UserCodeException(t);
            }
        }
    }

    /**
     * Returns the outstanding side input requests for {@code w}, computing them if the window is
     * not yet tracked.
     *
     * <p>If the window already has an entry in {@link #blockedMap} that entry is returned without
     * issuing any fetch. Otherwise a fetch is issued per side input view, and an entry is created
     * only if at least one fetch returns {@code false}.
     *
     * @return the outstanding requests, or {@code null} when the window is not blocked
     * @throws IOException if building a request for a blocked view fails
     */
    private Set<Windmill.GlobalDataRequest> computeBlockedSideInputs(W w) throws IOException {
        Set<Windmill.GlobalDataRequest> blocked = this.blockedMap.get(w);
        if (blocked != null) {
            return blocked;
        }
        for (PCollectionView<?> view : this.sideInputViews.values()) {
            if (this.stepContext.issueSideInputFetch(view, w, StateFetcher.SideInputState.UNKNOWN)) {
                continue;
            }
            if (blocked == null) {
                // Created lazily so that unblocked windows never get a map entry.
                blocked = new HashSet<Windmill.GlobalDataRequest>();
                this.blockedMap.put(w, blocked);
            }
            blocked.add(buildGlobalDataRequest(view, w));
        }
        return blocked;
    }

    /** Returns the bag state holding the elements buffered for window {@code w}. */
    @VisibleForTesting
    BagState<WindowedValue<InputT>> elementBag(W w) {
        return this.stepContext.stateInternals().state(StateNamespaces.window(this.windowFn.windowCoder(), w), this.elementsAddr);
    }

    /** Returns the watermark hold state associated with window {@code w}. */
    @VisibleForTesting
    WatermarkStateInternal watermarkHold(W w) {
        return this.stepContext.stateInternals().state(StateNamespaces.window(this.windowFn.windowCoder(), w), this.watermarkHoldingAddr);
    }

    /**
     * Processes the element immediately if its window is not blocked; otherwise buffers it.
     *
     * <p>The element must be in exactly one window ({@code Iterables.getOnlyElement} is used).
     * A buffered element is added to the window's element bag, its timestamp is added to the
     * window's watermark hold, and the outstanding requests are registered with the step context.
     *
     * @throws UserCodeException wrapping any throwable raised along the way (an existing
     *     {@code UserCodeException} is rethrown as-is)
     */
    @Override // com.google.cloud.dataflow.sdk.util.DoFnRunner
    public void invokeProcessElement(WindowedValue<InputT> windowedValue) {
        // The element's window type is not tracked statically; W is asserted here.
        @SuppressWarnings("unchecked")
        W window = (W) Iterables.getOnlyElement(windowedValue.getWindows());
        try {
            Set<Windmill.GlobalDataRequest> blocked = computeBlockedSideInputs(window);
            if (blocked == null) {
                this.fn.processElement(createProcessContext(windowedValue));
            } else {
                elementBag(window).add(windowedValue);
                watermarkHold(window).add(windowedValue.getTimestamp());
                this.stepContext.addBlockingSideInputs(blocked);
            }
        } catch (Throwable t) {
            Throwables.propagateIfInstanceOf(t, UserCodeException.class);
            throw new UserCodeException(t);
        }
    }

    /** Finishes the bundle, then writes the current blocked-window map back to global state. */
    @Override // com.google.cloud.dataflow.sdk.util.DoFnRunner
    public void finishBundle() {
        super.finishBundle();
        this.stepContext.stateInternals().state(StateNamespaces.global(), this.blockedMapAddr).set(this.blockedMap);
    }

    /**
     * Builds the request identifying the side input of {@code pCollectionView} that corresponds to
     * the main-input window {@code boundedWindow}.
     *
     * <p>The data id is the view's tag id plus the side input window encoded with the view's
     * window coder in the outer context. The existence watermark deadline is the value returned by
     * the view trigger's {@code getWatermarkThatGuaranteesFiring} for that side input window,
     * converted from milliseconds to microseconds.
     *
     * @param <SideWindowT> window type of the side input's windowing strategy
     * @throws IOException if encoding the side input window fails
     */
    private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest buildGlobalDataRequest(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow) throws IOException {
        // The view exposes its strategy with wildcard types; bind the window type to SideWindowT so
        // that the coder, the side input window and the trigger all agree on one type.
        @SuppressWarnings("unchecked")
        WindowingStrategy<?, SideWindowT> sideStrategy = (WindowingStrategy<?, SideWindowT>) pCollectionView.getWindowingStrategyInternal();
        WindowFn<?, SideWindowT> sideWindowFn = sideStrategy.getWindowFn();
        Coder<SideWindowT> sideWindowCoder = sideWindowFn.windowCoder();
        SideWindowT sideInputWindow = sideWindowFn.getSideInputWindow(boundedWindow);

        ByteString.Output encodedWindow = ByteString.newOutput();
        sideWindowCoder.encode(sideInputWindow, encodedWindow, Coder.Context.OUTER);

        Windmill.GlobalDataId dataId = Windmill.GlobalDataId.newBuilder()
                .setTag(pCollectionView.getTagInternal().getId())
                .setVersion(encodedWindow.toByteString())
                .build();
        long deadlineMicros = TimeUnit.MILLISECONDS.toMicros(sideStrategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring(sideInputWindow).getMillis());
        return Windmill.GlobalDataRequest.newBuilder()
                .setDataId(dataId)
                .setExistenceWatermarkDeadline(deadlineMicros)
                .build();
    }
}
