package org.apache.flink.streaming.connectors.fs;

import java.io.IOException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/fs/SequenceFileWriter.class */
public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>> implements InputTypeConfigurable {
    private static final long serialVersionUID = 1;
    private final String compressionCodecName;
    private SequenceFile.CompressionType compressionType;
    private transient SequenceFile.Writer writer;
    private Class<K> keyClass;
    private Class<V> valueClass;

    public SequenceFileWriter() {
        this("None", SequenceFile.CompressionType.NONE);
    }

    public SequenceFileWriter(String str, SequenceFile.CompressionType compressionType) {
        this.compressionCodecName = str;
        this.compressionType = compressionType;
    }

    @Override // org.apache.flink.streaming.connectors.fs.StreamWriterBase, org.apache.flink.streaming.connectors.fs.Writer
    public void open(FileSystem fileSystem, Path path) throws IOException {
        super.open(fileSystem, path);
        if (this.keyClass == null) {
            throw new IllegalStateException("Key Class has not been initialized.");
        }
        if (this.valueClass == null) {
            throw new IllegalStateException("Value Class has not been initialized.");
        }
        CompressionCodec compressionCodec = null;
        Configuration hadoopConfiguration = HadoopFileSystem.getHadoopConfiguration();
        if (!this.compressionCodecName.equals("None")) {
            compressionCodec = new CompressionCodecFactory(hadoopConfiguration).getCodecByName(this.compressionCodecName);
            if (compressionCodec == null) {
                throw new RuntimeException("Codec " + this.compressionCodecName + " not found.");
            }
        }
        this.writer = SequenceFile.createWriter(hadoopConfiguration, getStream(), this.keyClass, this.valueClass, this.compressionType, compressionCodec);
    }

    @Override // org.apache.flink.streaming.connectors.fs.StreamWriterBase, org.apache.flink.streaming.connectors.fs.Writer
    public void close() throws IOException {
        if (this.writer != null) {
            this.writer.close();
        }
        super.close();
    }

    @Override // org.apache.flink.streaming.connectors.fs.Writer
    public void write(Tuple2<K, V> tuple2) throws IOException {
        getStream();
        this.writer.append((Writable) tuple2.f0, (Writable) tuple2.f1);
    }

    public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        if (!typeInformation.isTupleType()) {
            throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
        }
        TupleTypeInfoBase tupleTypeInfoBase = (TupleTypeInfoBase) typeInformation;
        if (tupleTypeInfoBase.getArity() != 2) {
            throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
        }
        TypeInformation typeAt = tupleTypeInfoBase.getTypeAt(0);
        TypeInformation typeAt2 = tupleTypeInfoBase.getTypeAt(1);
        this.keyClass = typeAt.getTypeClass();
        this.valueClass = typeAt2.getTypeClass();
    }

    @Override // org.apache.flink.streaming.connectors.fs.Writer
    public Writer<Tuple2<K, V>> duplicate() {
        SequenceFileWriter sequenceFileWriter = new SequenceFileWriter(this.compressionCodecName, this.compressionType);
        sequenceFileWriter.keyClass = this.keyClass;
        sequenceFileWriter.valueClass = this.valueClass;
        return sequenceFileWriter;
    }
}
