package org.apache.flink.iteration.proxy;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

/* loaded from: input_file:org/apache/flink/iteration/proxy/ProxyOutput.class */
public class ProxyOutput<T> implements Output<StreamRecord<T>> {
    private final Output<StreamRecord<IterationRecord<T>>> output;
    private Integer contextRound;
    private final Map<String, SideOutputCache> sideOutputCaches = new HashMap();
    private final StreamRecord<IterationRecord<T>> reuseRecord = new StreamRecord<>(IterationRecord.newRecord(null, 0));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/iteration/proxy/ProxyOutput$SideOutputCache.class */
    public static class SideOutputCache {
        final OutputTag<IterationRecord<?>> tag;
        final StreamRecord<IterationRecord<?>> cachedRecord;

        public SideOutputCache(OutputTag<IterationRecord<?>> outputTag, StreamRecord<IterationRecord<?>> streamRecord) {
            this.tag = outputTag;
            this.cachedRecord = streamRecord;
        }
    }

    public ProxyOutput(Output<StreamRecord<IterationRecord<T>>> output) {
        this.output = (Output) Objects.requireNonNull(output);
    }

    public void setContextRound(Integer num) {
        this.contextRound = num;
    }

    public void emitWatermark(Watermark watermark) {
    }

    public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        this.output.emitWatermarkStatus(watermarkStatus);
    }

    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
        SideOutputCache computeIfAbsent = this.sideOutputCaches.computeIfAbsent(outputTag.getId(), str -> {
            return new SideOutputCache(new OutputTag(outputTag.getId(), new IterationRecordTypeInfo(outputTag.getTypeInfo())), new StreamRecord(IterationRecord.newRecord(null, 0)));
        });
        computeIfAbsent.cachedRecord.replace(IterationRecord.newRecord(streamRecord.getValue(), this.contextRound.intValue()), streamRecord.getTimestamp());
        this.output.collect(computeIfAbsent.tag, computeIfAbsent.cachedRecord);
    }

    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        this.output.emitLatencyMarker(latencyMarker);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void collect(StreamRecord<T> streamRecord) {
        ((IterationRecord) this.reuseRecord.getValue()).setValue(streamRecord.getValue());
        ((IterationRecord) this.reuseRecord.getValue()).setEpoch(this.contextRound.intValue());
        this.reuseRecord.setTimestamp(streamRecord.getTimestamp());
        this.output.collect(this.reuseRecord);
    }

    public void close() {
        this.output.close();
    }
}
