package org.apache.nemo.compiler.frontend.beam.transform;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.compiler.frontend.beam.SideInputElement;

/* loaded from: input_file:org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.class */
/**
 * DoFn transform that keeps the main-input elements handed back by the push-back runner
 * and retries them every time a side-input element arrives.
 *
 * <p>Elements returned by {@code getPushBackRunner().processElementInReadyWindows(...)} are
 * buffered in {@link #curPushedBacks} (presumably these are elements whose side inputs are not
 * ready yet -- confirm against the Beam push-back runner contract). The output watermark is
 * held back to the minimum of the input watermark and the earliest buffered element timestamp,
 * and is only emitted when it strictly advances.
 *
 * @param <InputT> main-input element type
 * @param <OutputT> main-output element type
 */
public final class PushBackDoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT, InputT, OutputT> {
    /** Main-input elements returned by the push-back runner that still have to be retried. */
    private List<WindowedValue<InputT>> curPushedBacks;
    /** Smallest timestamp (millis) among {@link #curPushedBacks}; {@code Long.MAX_VALUE} when it is empty. */
    private long curPushedBackWatermark;
    /** Timestamp of the most recent watermark passed to {@link #onWatermark(Watermark)}. */
    private long curInputWatermark;
    /** Timestamp of the most recent watermark emitted downstream; {@code Long.MIN_VALUE} until one is emitted. */
    private long curOutputWatermark;

    /**
     * Creates the transform. Every argument is forwarded unchanged to the
     * {@code AbstractDoFnTransform} constructor; the descriptions below are inferred from the
     * parameter types only -- TODO confirm against the superclass.
     *
     * @param doFn the user function to run
     * @param coder coder for the main input
     * @param map coders keyed by output tag (presumably)
     * @param tupleTag tag of the main output (presumably)
     * @param list tags of the additional outputs (presumably)
     * @param windowingStrategy windowing strategy
     * @param map2 side-input views keyed by side-input index (presumably)
     * @param pipelineOptions Beam pipeline options
     * @param displayData display data of the transform
     * @param doFnSchemaInformation schema information of the DoFn
     * @param map3 side-input views keyed by name (presumably)
     */
    public PushBackDoFnTransform(DoFn<InputT, OutputT> doFn, Coder<InputT> coder, Map<TupleTag<?>, Coder<?>> map, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map2, PipelineOptions pipelineOptions, DisplayData displayData, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map3) {
        super(doFn, coder, map, tupleTag, list, windowingStrategy, map2, pipelineOptions, displayData, doFnSchemaInformation, map3);
        this.curPushedBacks = new ArrayList<>();
        // Nothing is buffered yet, so the pushed-back watermark must not hold anything back.
        this.curPushedBackWatermark = Long.MAX_VALUE;
        this.curInputWatermark = Long.MIN_VALUE;
        this.curOutputWatermark = Long.MIN_VALUE;
    }

    /**
     * Returns the given DoFn as-is; this transform does not wrap it.
     *
     * @param doFn the DoFn supplied by the superclass
     * @return the same {@code doFn} instance
     */
    @Override // org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform
    protected DoFn wrapDoFn(DoFn doFn) {
        return doFn;
    }

    /**
     * Handles one incoming element, which is either a side-input element or a main-input element.
     *
     * <p>A side-input element (its value is a {@link SideInputElement}) is registered with the
     * side-input reader, after which all buffered elements are retried and the current input
     * watermark is re-submitted so the output watermark can advance. A main-input element is
     * processed inside a bundle, and whatever the push-back runner returns is buffered.
     *
     * @param windowedValue the incoming element
     */
    public void onData(WindowedValue windowedValue) {
        if (windowedValue.getValue() instanceof SideInputElement) {
            final int sideInputIndex = ((SideInputElement) windowedValue.getValue()).getSideInputIndex();
            getSideInputReader().addSideInputElement(getSideInputs().get(sideInputIndex), windowedValue);
            handlePushBacks();
            // Retrying may have emptied the buffer; re-submit the input watermark to see whether
            // the output watermark can move forward now.
            onWatermark(new Watermark(this.curInputWatermark));
            return;
        }

        // Anything that is not a SideInputElement is treated as main input of type InputT.
        @SuppressWarnings("unchecked")
        final WindowedValue<InputT> mainInput = (WindowedValue<InputT>) windowedValue;

        checkAndInvokeBundle();
        final Iterable<WindowedValue<InputT>> pushedBackElements =
            getPushBackRunner().processElementInReadyWindows(mainInput);
        for (final WindowedValue<InputT> pushedBack : pushedBackElements) {
            this.curPushedBackWatermark = Math.min(this.curPushedBackWatermark, pushedBack.getTimestamp().getMillis());
            this.curPushedBacks.add(pushedBack);
        }
        checkAndFinishBundle();
    }

    /**
     * Retries every buffered element and replaces the buffer (and its minimum timestamp) with
     * whatever the push-back runner returns again.
     */
    private void handlePushBacks() {
        // The current bundle is force-finished before retrying; presumably this lets the runner
        // re-evaluate side-input readiness in a fresh bundle -- confirm in AbstractDoFnTransform.
        forceFinishBundle();

        final List<WindowedValue<InputT>> pushedBackAgain = new ArrayList<>();
        long pushedBackAgainWatermark = Long.MAX_VALUE;
        for (final WindowedValue<InputT> buffered : this.curPushedBacks) {
            checkAndInvokeBundle();
            final Iterable<WindowedValue<InputT>> returned =
                getPushBackRunner().processElementInReadyWindows(buffered);
            checkAndFinishBundle();
            for (final WindowedValue<InputT> element : returned) {
                pushedBackAgainWatermark = Math.min(pushedBackAgainWatermark, element.getTimestamp().getMillis());
                pushedBackAgain.add(element);
            }
        }
        this.curPushedBacks = pushedBackAgain;
        this.curPushedBackWatermark = pushedBackAgainWatermark;
    }

    /**
     * Records the new input watermark, forwards it to the side-input reader, and emits an output
     * watermark when the minimum of the input watermark and the earliest buffered element
     * timestamp is strictly greater than the last emitted one.
     *
     * @param watermark the incoming watermark
     */
    public void onWatermark(Watermark watermark) {
        checkAndInvokeBundle();
        this.curInputWatermark = watermark.getTimestamp();
        getSideInputReader().setCurrentWatermarkOfAllMainAndSideInputs(this.curInputWatermark);

        // Buffered elements may still produce output, so they hold the output watermark back.
        final long outputWatermarkCandidate = Math.min(this.curInputWatermark, this.curPushedBackWatermark);
        if (outputWatermarkCandidate > this.curOutputWatermark) {
            getOutputCollector().emitWatermark(new Watermark(outputWatermarkCandidate));
            this.curOutputWatermark = outputWatermarkCandidate;
        }
        checkAndFinishBundle();
    }

    /**
     * Advances the input watermark to {@code BoundedWindow.TIMESTAMP_MAX_VALUE} and then retries
     * the buffered elements one last time.
     *
     * <p>NOTE(review): no watermark is emitted after this final retry, so if elements were still
     * buffered when the max watermark was submitted, the last emitted output watermark stays
     * below the max timestamp -- confirm this is acceptable at close time.
     */
    @Override // org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform
    protected void beforeClose() {
        onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
        handlePushBacks();
    }

    /**
     * Returns the given output collector as-is; this transform does not wrap it.
     *
     * @param outputCollector the collector supplied by the superclass
     * @return the same {@code outputCollector} instance
     */
    @Override // org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform
    OutputCollector wrapOutputCollector(OutputCollector outputCollector) {
        return outputCollector;
    }
}
