package org.apache.flink.iteration.broadcast;

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.class */
/**
 * A {@link BroadcastOutput} that emits records through a {@link RecordWriter}.
 *
 * <p>Every record is placed into one reused {@link SerializationDelegate} and passed to
 * {@code RecordWriter.broadcastEmit}. When the record's value is an {@link IterationRecord} whose
 * type is {@link IterationRecord.Type#EPOCH_WATERMARK}, {@code RecordWriter.flushAll} is called
 * right after the emit.
 *
 * <p>NOTE(review): the delegate instance is shared across calls, so this class presumably expects
 * to be driven by a single thread at a time; confirm against the callers.
 *
 * @param <OUT> the type of the value carried by the emitted stream records
 */
public class RecordWriterBroadcastOutput<OUT> implements BroadcastOutput<OUT> {
    /** Writer that receives every emitted record. */
    private final RecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    /** Delegate reused for each record; its instance is replaced on every emit. */
    private final SerializationDelegate<StreamElement> serializationDelegate;

    /**
     * Creates an output that writes to the given record writer.
     *
     * @param recordWriter the writer that records are broadcast to
     * @param typeSerializer the serializer used to build the internal serialization delegate
     */
    public RecordWriterBroadcastOutput(RecordWriter<SerializationDelegate<StreamElement>> recordWriter, TypeSerializer<StreamElement> typeSerializer) {
        this.recordWriter = recordWriter;
        this.serializationDelegate = new SerializationDelegate<>(typeSerializer);
    }

    /**
     * Broadcasts the record through the record writer and flushes the writer when the record is
     * an iteration epoch watermark.
     *
     * @param streamRecord the record to emit
     * @throws IOException if the underlying record writer fails to emit or flush
     */
    @Override
    public void broadcastEmit(StreamRecord<OUT> streamRecord) throws IOException {
        this.serializationDelegate.setInstance(streamRecord);
        this.recordWriter.broadcastEmit(this.serializationDelegate);
        // Epoch watermarks trigger an explicit flush of all channels; presumably so they are not
        // held back in partially filled buffers (confirm against the iteration runtime).
        if (isIterationEpochWatermark(streamRecord)) {
            this.recordWriter.flushAll();
        }
    }

    /**
     * Tells whether the record's value is an {@link IterationRecord} of type
     * {@link IterationRecord.Type#EPOCH_WATERMARK}.
     *
     * @param streamRecord the record to inspect
     * @param <T> the type of the record's value
     * @return {@code true} only for an iteration record marked as an epoch watermark
     */
    private static <T> boolean isIterationEpochWatermark(StreamRecord<T> streamRecord) {
        // Read the value once so the type check and the cast operate on the same object.
        Object value = streamRecord.getValue();
        if (!(value instanceof IterationRecord)) {
            return false;
        }
        // Identity comparison is the idiomatic enum check and stays false (no NPE) for a null type.
        return ((IterationRecord<?>) value).getType() == IterationRecord.Type.EPOCH_WATERMARK;
    }
}
