package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamRecordWriter.class */
public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    private long timeout;
    private boolean flushAlways;
    private StreamRecordWriter<T>.OutputFlusher outputFlusher;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamRecordWriter$OutputFlusher.class */
    private class OutputFlusher extends Thread {
        private volatile boolean running;

        private OutputFlusher() {
            this.running = true;
        }

        public void terminate() {
            this.running = false;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    StreamRecordWriter.this.flush();
                    Thread.sleep(StreamRecordWriter.this.timeout);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e2) {
                }
            }
        }
    }

    public StreamRecordWriter(ResultPartitionWriter resultPartitionWriter) {
        this(resultPartitionWriter, new RoundRobinChannelSelector(), 1000L);
    }

    public StreamRecordWriter(ResultPartitionWriter resultPartitionWriter, ChannelSelector<T> channelSelector) {
        this(resultPartitionWriter, channelSelector, 1000L);
    }

    public StreamRecordWriter(ResultPartitionWriter resultPartitionWriter, ChannelSelector<T> channelSelector, long j) {
        super(resultPartitionWriter, channelSelector);
        this.flushAlways = false;
        this.timeout = j;
        if (j == 0) {
            this.flushAlways = true;
        } else {
            this.outputFlusher = new OutputFlusher();
            this.outputFlusher.start();
        }
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.RecordWriter
    public void emit(T t) throws IOException, InterruptedException {
        super.emit(t);
        if (this.flushAlways) {
            flush();
        }
    }

    public void close() {
        try {
            if (this.outputFlusher != null) {
                this.outputFlusher.terminate();
                this.outputFlusher.join();
            }
            flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e2) {
        }
    }
}
