package org.apache.nemo.compiler.frontend.beam.transform;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
import org.apache.nemo.common.punctuation.Watermark;

/* loaded from: input_file:org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.class */
public final class CreateViewTransform<I, O> extends LatencymarkEmitTransform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
    private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
    private final Map<BoundedWindow, List<I>> windowListMap = new HashMap();
    private long currentOutputWatermark = Long.MIN_VALUE;

    /* loaded from: input_file:org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform$MultiView.class */
    public static final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable {
        private final Iterable<T> iterable;

        public MultiView(Iterable<T> iterable) {
            this.iterable = iterable;
        }

        public Iterable<Void> get() {
            return null;
        }

        public Iterable<T> get(@Nullable Void r3) {
            return this.iterable;
        }
    }

    public CreateViewTransform(ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn) {
        this.viewFn = viewFn;
    }

    public void onData(WindowedValue<KV<?, I>> windowedValue) {
        for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
            this.windowListMap.putIfAbsent(boundedWindow, new ArrayList());
            this.windowListMap.get(boundedWindow).add(((KV) windowedValue.getValue()).getValue());
        }
    }

    public void onWatermark(Watermark watermark) {
        if (this.windowListMap.size() == 0 && this.currentOutputWatermark < watermark.getTimestamp()) {
            this.currentOutputWatermark = watermark.getTimestamp();
            getOutputCollector().emitWatermark(watermark);
            return;
        }
        Iterator<Map.Entry<BoundedWindow, List<I>>> it = this.windowListMap.entrySet().iterator();
        long j = Long.MAX_VALUE;
        while (it.hasNext()) {
            Map.Entry<BoundedWindow, List<I>> next = it.next();
            if (next.getKey().maxTimestamp().getMillis() <= watermark.getTimestamp()) {
                getOutputCollector().emit(WindowedValue.of(this.viewFn.apply(new MultiView(next.getValue())), next.getKey().maxTimestamp(), next.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
                it.remove();
                j = Math.min(j, next.getKey().maxTimestamp().getMillis());
            }
        }
        if (j == Long.MAX_VALUE || this.currentOutputWatermark >= j) {
            return;
        }
        this.currentOutputWatermark = j;
        getOutputCollector().emitWatermark(new Watermark(this.currentOutputWatermark));
    }

    public void close() {
        onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("CreateViewTransform  " + this.viewFn.getClass().getName());
        return sb.toString();
    }
}
