package org.apache.flink.streaming.io;

import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.api.ChannelSelector;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

/* loaded from: input_file:org/apache/flink/streaming/io/StreamRecordWriter.class */
public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    private final BufferProvider bufferPool;
    private final ChannelSelector<T> channelSelector;
    private int numChannels;
    private long timeout;
    private RecordSerializer<T>[] serializers;

    /* loaded from: input_file:org/apache/flink/streaming/io/StreamRecordWriter$OutputFlusher.class */
    private class OutputFlusher extends Thread {
        private OutputFlusher() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!StreamRecordWriter.this.outputGate.isClosed()) {
                try {
                    Thread.sleep(StreamRecordWriter.this.timeout);
                    StreamRecordWriter.this.flush();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public StreamRecordWriter(AbstractInvokable abstractInvokable) {
        this(abstractInvokable, new RoundRobinChannelSelector(), 1000L);
    }

    public StreamRecordWriter(AbstractInvokable abstractInvokable, ChannelSelector<T> channelSelector) {
        this(abstractInvokable, channelSelector, 1000L);
    }

    public StreamRecordWriter(AbstractInvokable abstractInvokable, ChannelSelector<T> channelSelector, long j) {
        super(abstractInvokable);
        this.timeout = j;
        this.bufferPool = abstractInvokable.getEnvironment().getOutputBufferProvider();
        this.channelSelector = channelSelector;
    }

    public void initializeSerializers() {
        this.numChannels = this.outputGate.getNumChannels();
        this.serializers = new RecordSerializer[this.numChannels];
        for (int i = 0; i < this.numChannels; i++) {
            this.serializers[i] = new SpanningRecordSerializer();
        }
        new OutputFlusher().start();
    }

    public void emit(T t) throws IOException, InterruptedException {
        for (int i : this.channelSelector.selectChannels(t, this.numChannels)) {
            RecordSerializer<T> recordSerializer = this.serializers[i];
            synchronized (recordSerializer) {
                RecordSerializer.SerializationResult addRecord = recordSerializer.addRecord(t);
                while (addRecord.isFullBuffer()) {
                    Buffer currentBuffer = recordSerializer.getCurrentBuffer();
                    if (currentBuffer != null) {
                        sendBuffer(currentBuffer, i);
                    }
                    addRecord = recordSerializer.setNextBuffer(this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize()));
                }
            }
        }
    }

    public void flush() throws IOException, InterruptedException {
        for (int i = 0; i < this.numChannels; i++) {
            RecordSerializer<T> recordSerializer = this.serializers[i];
            synchronized (recordSerializer) {
                Buffer currentBuffer = recordSerializer.getCurrentBuffer();
                if (currentBuffer != null) {
                    sendBuffer(currentBuffer, i);
                }
                recordSerializer.clear();
            }
        }
    }
}
