package org.apache.nemo.compiler.frontend.beam.transform;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.class */
/**
 * Transform that groups keyed, windowed elements by delegating to Beam's
 * {@link GroupAlsoByWindowViaWindowSetNewDoFn} (see {@link #wrapDoFn(DoFn)}).
 *
 * <p>Per-key timer and state internals are obtained from
 * {@code InMemoryTimerInternalsFactory} / {@code InMemoryStateInternalsFactory}. The transform
 * tracks the latest input watermark, a per-key output watermark hold, and the last output
 * watermark it emitted; {@link #emitOutputWatermark()} only ever emits a strictly larger
 * watermark than the previous one.
 *
 * @param <K> key type
 * @param <InputT> type of the values in the incoming {@code KV}s
 * @param <OutputT> type of the values in the emitted {@code KV}s
 */
public final class GBKTransform<K, InputT, OutputT> extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
    private static final Logger LOG = LoggerFactory.getLogger(GBKTransform.class.getName());
    /** Reduce function handed to the group-also-by-window DoFn. */
    private final SystemReduceFn reduceFn;
    /** Created lazily in {@link #wrapDoFn(DoFn)}; transient, so it is not serialized with the transform. */
    private transient InMemoryTimerInternalsFactory<K> inMemoryTimerInternalsFactory;
    /** Created lazily in {@link #wrapDoFn(DoFn)}; transient, so it is not serialized with the transform. */
    private transient InMemoryStateInternalsFactory<K> inMemoryStateInternalsFactory;
    /** Per-key output watermark holds, recorded when an ON_TIME pane is emitted for the key. */
    private final Map<K, Watermark> keyOutputWatermarkMap;
    /** Last watermark emitted downstream by {@link #emitOutputWatermark()}. */
    private Watermark prevOutputWatermark;
    /** Most recent input watermark accepted by {@link #onWatermark(Watermark)}. */
    private Watermark inputWatermark;
    /** Set to {@code true} by the first call to {@link #onData(WindowedValue)}. */
    private boolean dataReceived;
    /** The unwrapped collector passed to {@link #wrapOutputCollector(OutputCollector)}. */
    private transient OutputCollector originOc;
    private final boolean isPartialCombining;

    /**
     * Output collector decorator that records a per-key output watermark hold whenever an
     * ON_TIME pane is emitted, then forwards everything to the wrapped collector.
     */
    public class GBKOutputCollector implements OutputCollector<WindowedValue<KV<K, OutputT>>> {
        private final OutputCollector<WindowedValue<KV<K, OutputT>>> oc;

        /**
         * @param outputCollector the collector that all calls are forwarded to
         */
        public GBKOutputCollector(OutputCollector outputCollector) {
            this.oc = outputCollector;
        }

        /**
         * Forwards the element downstream. For ON_TIME panes, first sets the key's output
         * watermark hold (and the key's timer-internals output watermark) to the element
         * timestamp plus one millisecond.
         *
         * @param windowedValue the grouped output element
         */
        public final void emit(WindowedValue<KV<K, OutputT>> windowedValue) {
            if (windowedValue.getPane().getTiming() == PaneInfo.Timing.ON_TIME) {
                // Typed as K (not Object) so the factory lookup and the map update type-check.
                final K key = windowedValue.getValue().getKey();
                final long holdMillis = windowedValue.getTimestamp().getMillis() + 1;
                final InMemoryTimerInternals timerInternals =
                    GBKTransform.this.inMemoryTimerInternalsFactory.timerInternalsForKey(key);
                GBKTransform.this.keyOutputWatermarkMap.put(key, new Watermark(holdMillis));
                timerInternals.advanceOutputWatermark(new Instant(holdMillis));
            }
            this.oc.emit(windowedValue);
        }

        /**
         * Forwards the watermark to the wrapped collector unchanged.
         *
         * @param watermark the watermark to emit
         */
        public final void emitWatermark(Watermark watermark) {
            this.oc.emitWatermark(watermark);
        }

        /**
         * Forwards a tagged output to the wrapped collector unchanged.
         *
         * @param str the destination tag
         * @param t the value to emit
         * @param <T> the value type
         */
        public final <T> void emit(String str, T t) {
            this.oc.emit(str, t);
        }
    }

    /**
     * Creates the transform. All arguments except {@code systemReduceFn} and
     * {@code isPartialCombining} are passed straight to the superclass constructor; the DoFn,
     * additional outputs and side inputs passed to the superclass are {@code null} / empty.
     *
     * @param inputCoder coder for the input elements
     * @param outputCoders coders keyed by output tag
     * @param mainOutputTag tag of the main output
     * @param windowingStrategy windowing strategy used for grouping
     * @param pipelineOptions pipeline options
     * @param systemReduceFn reduce function given to the group-also-by-window DoFn
     * @param doFnSchemaInformation schema information forwarded to the superclass
     * @param displayData display data forwarded to the superclass
     * @param isPartialCombining value later returned by {@link #getIsPartialCombining()}
     */
    public GBKTransform(Coder<KV<K, InputT>> inputCoder, Map<TupleTag<?>, Coder<?>> outputCoders, TupleTag<KV<K, OutputT>> mainOutputTag, WindowingStrategy<?, ?> windowingStrategy, PipelineOptions pipelineOptions, SystemReduceFn systemReduceFn, DoFnSchemaInformation doFnSchemaInformation, DisplayData displayData, boolean isPartialCombining) {
        super(null, inputCoder, outputCoders, mainOutputTag, Collections.emptyList(), windowingStrategy, Collections.emptyMap(), pipelineOptions, displayData, doFnSchemaInformation, Collections.emptyMap());
        this.keyOutputWatermarkMap = new HashMap<>();
        this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
        this.inputWatermark = new Watermark(Long.MIN_VALUE);
        this.dataReceived = false;
        this.reduceFn = systemReduceFn;
        this.isPartialCombining = isPartialCombining;
    }

    /**
     * Builds the group-also-by-window DoFn, creating the in-memory state and timer factories
     * if they do not exist yet. The {@code doFn} argument is ignored.
     *
     * @param doFn unused
     * @return the DoFn created by {@link GroupAlsoByWindowViaWindowSetNewDoFn#create}
     */
    @Override // org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform
    protected DoFn wrapDoFn(DoFn doFn) {
        if (this.inMemoryStateInternalsFactory == null) {
            this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory<>();
        } else {
            LOG.info("InMemoryStateInternalFactory is already set");
        }
        if (this.inMemoryTimerInternalsFactory == null) {
            this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory<>();
        } else {
            LOG.info("InMemoryTimerInternalsFactory is already set");
        }
        // No side-input reader is supplied (null).
        return GroupAlsoByWindowViaWindowSetNewDoFn.create(getWindowingStrategy(), this.inMemoryStateInternalsFactory, this.inMemoryTimerInternalsFactory, (SideInputReader) null, this.reduceFn, getOutputManager(), getMainOutputTag());
    }

    /**
     * Remembers the original collector and returns a {@link GBKOutputCollector} around it.
     *
     * @param outputCollector the collector to wrap
     * @return the wrapping collector
     */
    @Override // org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform
    OutputCollector wrapOutputCollector(OutputCollector outputCollector) {
        this.originOc = outputCollector;
        return new GBKOutputCollector(outputCollector);
    }

    /**
     * Wraps the element in a single-element {@link KeyedWorkItem} for its key and feeds it to
     * the DoFn runner, surrounded by the bundle start/finish checks.
     *
     * @param windowedValue the incoming keyed element
     * @throws RuntimeException if processing fails; the original exception is attached as cause
     */
    public void onData(WindowedValue<KV<K, InputT>> windowedValue) {
        this.dataReceived = true;
        try {
            checkAndInvokeBundle();
            final KV<K, InputT> kv = windowedValue.getValue();
            final KeyedWorkItem<K, InputT> workItem = KeyedWorkItems.elementsWorkItem(
                kv.getKey(), Collections.singletonList(windowedValue.withValue(kv.getValue())));
            getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(workItem));
            checkAndFinishBundle();
        } catch (Exception e) {
            LOG.error("Exception triggered element {}", windowedValue, e);
            // Keep the cause so callers can see what actually failed.
            throw new RuntimeException("Exception triggered element " + windowedValue, e);
        }
    }

    /**
     * Accepts a new input watermark, fires the timers that became eligible, and emits an
     * output watermark if it can advance.
     *
     * @param watermark the new input watermark; must be strictly greater than the previous one
     * @throws RuntimeException if the watermark does not advance, or if timer processing fails
     */
    public void onWatermark(Watermark watermark) throws RuntimeException {
        if (watermark.getTimestamp() <= this.inputWatermark.getTimestamp()) {
            throw new RuntimeException("Received watermark " + watermark.getTimestamp() + " is before the previous inputWatermark " + this.inputWatermark.getTimestamp() + " in GBKTransform.");
        }
        checkAndInvokeBundle();
        this.inputWatermark = watermark;
        try {
            // The same wall-clock instant is used for both processing-time domains.
            final Instant now = Instant.now();
            triggerTimers(now, now, this.inputWatermark);
            emitOutputWatermark();
            checkAndFinishBundle();
        } catch (Exception e) {
            LOG.error("Failed to process watermark {}", watermark.getTimestamp(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Advances the input watermark and both processing times to
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, fires the resulting timers, and emits the
     * output watermark.
     */
    @Override // org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform
    protected void beforeClose() {
        this.inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
        triggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE, this.inputWatermark);
        emitOutputWatermark();
    }

    /**
     * For every key with timer internals: advances its input watermark and processing times,
     * fires eligible timers in every {@link TimeDomain}, and drops the key's timer and state
     * internals once the timer internals report empty.
     *
     * @param instant new processing time
     * @param instant2 new synchronized processing time
     * @param watermark new input watermark
     * @throws RuntimeException if firing timers fails for a key; the cause is attached
     */
    private void triggerTimers(Instant instant, Instant instant2, Watermark watermark) {
        final Iterator<Map.Entry<K, InMemoryTimerInternals>> entries =
            this.inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<K, InMemoryTimerInternals> entry = entries.next();
            // Capture both before any removal: Map.Entry behavior is undefined once its
            // mapping has been removed from the backing map.
            final K key = entry.getKey();
            final InMemoryTimerInternals timerInternals = entry.getValue();
            try {
                timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
                timerInternals.advanceProcessingTime(instant);
                timerInternals.advanceSynchronizedProcessingTime(instant2);
                for (TimeDomain timeDomain : TimeDomain.values()) {
                    processTrigger(key, timerInternals, timeDomain);
                }
                if (this.inMemoryTimerInternalsFactory.isEmpty(timerInternals)) {
                    entries.remove();
                    this.inMemoryStateInternalsFactory.getStateInternalMap().remove(key);
                }
            } catch (Exception e) {
                LOG.error("Failed to trigger timers for key {}", key, e);
                throw new RuntimeException("Failed to trigger timers for key " + key, e);
            }
        }
    }

    /**
     * Polls timers of the given domain from the key's timer internals until none is returned,
     * feeding each one to the DoFn runner as a single-timer {@link KeyedWorkItem}.
     *
     * @param k the key whose timers are fired
     * @param inMemoryTimerInternals the key's timer internals
     * @param timeDomain the time domain to drain
     */
    private void processTrigger(K k, InMemoryTimerInternals inMemoryTimerInternals, TimeDomain timeDomain) {
        TimerInternals.TimerData timer = this.inMemoryTimerInternalsFactory.pollTimer(inMemoryTimerInternals, timeDomain);
        while (timer != null) {
            final KeyedWorkItem<K, InputT> timerWorkItem =
                KeyedWorkItems.timersWorkItem(k, Collections.singletonList(timer));
            getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
            timer = this.inMemoryTimerInternalsFactory.pollTimer(inMemoryTimerInternals, timeDomain);
        }
    }

    /**
     * Emits output watermarks for as long as the candidate exceeds the last emitted one.
     *
     * <p>The candidate is bounded by the smallest per-key hold and the input watermark. When
     * there are no holds, the initial hold is {@code Long.MIN_VALUE} if any data has been
     * received (so nothing is emitted) and {@code Long.MAX_VALUE} otherwise (so the input
     * watermark passes through). After each emission, holds equal to the emitted timestamp are
     * released and the candidate is recomputed.
     */
    private void emitOutputWatermark() {
        Watermark minWatermarkHold;
        if (this.keyOutputWatermarkMap.isEmpty()) {
            minWatermarkHold = new Watermark(this.dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE);
        } else {
            minWatermarkHold = Collections.min(this.keyOutputWatermarkMap.values());
        }
        Watermark candidate = outputWatermarkCandidate(minWatermarkHold);
        while (candidate.getTimestamp() > this.prevOutputWatermark.getTimestamp()) {
            this.prevOutputWatermark = candidate;
            getOutputCollector().emitWatermark(candidate);
            if (minWatermarkHold.getTimestamp() == candidate.getTimestamp()) {
                // The watermark reached the smallest hold: release every hold at that timestamp.
                final long releasedTimestamp = minWatermarkHold.getTimestamp();
                this.keyOutputWatermarkMap.values().removeIf(hold -> hold.getTimestamp() == releasedTimestamp);
            }
            minWatermarkHold = this.keyOutputWatermarkMap.isEmpty()
                ? new Watermark(Long.MAX_VALUE)
                : Collections.min(this.keyOutputWatermarkMap.values());
            candidate = outputWatermarkCandidate(minWatermarkHold);
        }
    }

    /**
     * Computes {@code max(prevOutputWatermark, min(minWatermarkHold, inputWatermark))}.
     *
     * @param minWatermarkHold the smallest hold currently in effect
     * @return the next output watermark candidate; never smaller than the last emitted one
     */
    private Watermark outputWatermarkCandidate(Watermark minWatermarkHold) {
        final long bounded = Math.min(minWatermarkHold.getTimestamp(), this.inputWatermark.getTimestamp());
        return new Watermark(Math.max(this.prevOutputWatermark.getTimestamp(), bounded));
    }

    /**
     * @return the {@code isPartialCombining} flag given at construction
     */
    public boolean getIsPartialCombining() {
        return this.isPartialCombining;
    }
}
