package org.apache.nemo.runtime.executor.datatransfer;

import java.util.ArrayList;
import java.util.List;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.class */
/**
 * Tracks the latest watermark received on each of several input edges and forwards the
 * minimum of them to an {@link OutputCollector} whenever that minimum advances.
 *
 * <p>Every edge starts at {@code Long.MIN_VALUE}, so nothing is emitted until the edge that
 * currently holds the minimum has reported a larger timestamp.
 *
 * <p>This class performs no synchronization of its own.
 */
public final class MultiInputWatermarkManager implements InputWatermarkManager {
    private static final Logger LOG = LoggerFactory.getLogger(MultiInputWatermarkManager.class.getName());

    /** Most recent watermark per input edge, indexed by edge index. */
    private final List<Watermark> watermarks;

    /** Receives the minimum watermark each time it advances. */
    private final OutputCollector<?> watermarkCollector;

    /** Index of the edge whose watermark is currently the minimum (lowest index on ties). */
    private int minWatermarkIndex = 0;

    /**
     * Creates a manager tracking {@code numEdges} input edges.
     *
     * @param numEdges number of input edges to track
     * @param outputCollector collector the minimum watermark is emitted to
     * @throws IllegalArgumentException if {@code numEdges} is negative
     */
    public MultiInputWatermarkManager(int numEdges, OutputCollector<?> outputCollector) {
        this.watermarks = new ArrayList<>(numEdges);
        this.watermarkCollector = outputCollector;
        for (int edge = 0; edge < numEdges; edge++) {
            this.watermarks.add(new Watermark(Long.MIN_VALUE));
        }
    }

    /**
     * Finds the edge holding the smallest watermark timestamp.
     *
     * @return index of the smallest timestamp; the lowest index wins on ties. Falls back to
     *     index 0 when no timestamp is below {@code Long.MAX_VALUE}, so the result is always a
     *     valid index for a non-empty list.
     */
    private int findNextMinWatermarkIndex() {
        // Start from 0 rather than -1: if every edge has reached Long.MAX_VALUE the strict
        // comparison below never matches, and -1 would make the caller index out of bounds.
        int minIndex = 0;
        long minTimestamp = Long.MAX_VALUE;
        for (int edge = 0; edge < this.watermarks.size(); edge++) {
            final long timestamp = this.watermarks.get(edge).getTimestamp();
            if (timestamp < minTimestamp) {
                minIndex = edge;
                minTimestamp = timestamp;
            }
        }
        return minIndex;
    }

    /**
     * Records {@code watermark} as the latest watermark of edge {@code edgeIndex} and emits the
     * new minimum across all edges if it is strictly greater than the previous minimum.
     *
     * @param edgeIndex index of the input edge the watermark arrived on
     * @param watermark the newly received watermark
     * @throws IllegalStateException if the watermark's timestamp is lower than the one
     *     previously recorded for the same edge; the tracked state is left unchanged
     * @throws IndexOutOfBoundsException if {@code edgeIndex} is not a tracked edge
     */
    @Override // org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager
    public void trackAndEmitWatermarks(int edgeIndex, Watermark watermark) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Track watermark {} emitted from edge {}:, {}",
                watermark.getTimestamp(), edgeIndex, this.watermarks);
        }

        if (edgeIndex != this.minWatermarkIndex) {
            // A non-minimum edge cannot move the minimum; just validate and record it.
            if (this.watermarks.get(edgeIndex).getTimestamp() > watermark.getTimestamp()) {
                throw new IllegalStateException("The recent watermark timestamp cannot be less than the previous one because watermark is monotonically increasing.");
            }
            this.watermarks.set(edgeIndex, watermark);
            return;
        }

        final Watermark prevMinWatermark = this.watermarks.get(this.minWatermarkIndex);

        // Validate before mutating. Every other edge is >= the previous minimum, so the new
        // minimum can only regress if the incoming watermark itself is below it. Rejecting it
        // here keeps the list and minWatermarkIndex consistent after the exception.
        if (watermark.getTimestamp() < prevMinWatermark.getTimestamp()) {
            throw new IllegalStateException("The current min watermark is ahead of prev min: " + watermark + ", " + prevMinWatermark);
        }

        this.watermarks.set(this.minWatermarkIndex, watermark);
        this.minWatermarkIndex = findNextMinWatermarkIndex();
        final Watermark minWatermark = this.watermarks.get(this.minWatermarkIndex);

        // Emit only on real progress; an unchanged minimum is not re-emitted.
        if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Emit watermark {}, {}", minWatermark, this.watermarks);
            }
            this.watermarkCollector.emitWatermark(minWatermark);
        }
    }
}
