package org.apache.flink.streaming.connectors.fs;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.JsonNode;

/* loaded from: input_file:org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.class */
public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
    private static final long serialVersionUID = 1;
    public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
    public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
    public static final String CONF_COMPRESS = "mapreduce.output.fileoutputformat.compress";
    public static final String CONF_COMPRESS_CODEC = "mapreduce.output.fileoutputformat.compress.codec";
    public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
    public static final String CONF_XZ_LEVEL = "avro.xz.level";
    private transient AvroKeyValueWriter<K, V> keyValueWriter;
    private final Map<String, String> properties;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter$AvroKeyValue.class */
    public static class AvroKeyValue<K, V> {
        public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
        public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce";
        public static final String KEY_FIELD = "key";
        public static final String VALUE_FIELD = "value";
        public final GenericRecord mKeyValueRecord;

        public AvroKeyValue(GenericRecord genericRecord) {
            this.mKeyValueRecord = genericRecord;
        }

        public GenericRecord get() {
            return this.mKeyValueRecord;
        }

        public void setKey(K k) {
            this.mKeyValueRecord.put(KEY_FIELD, k);
        }

        public void setValue(V v) {
            this.mKeyValueRecord.put(VALUE_FIELD, v);
        }

        public K getKey() {
            return (K) this.mKeyValueRecord.get(KEY_FIELD);
        }

        public V getValue() {
            return (V) this.mKeyValueRecord.get(VALUE_FIELD);
        }

        public static Schema getSchema(Schema schema, Schema schema2) {
            Schema createRecord = Schema.createRecord(KEY_VALUE_PAIR_RECORD_NAME, "A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false);
            createRecord.setFields(Arrays.asList(new Schema.Field(KEY_FIELD, schema, "The key", (JsonNode) null), new Schema.Field(VALUE_FIELD, schema2, "The value", (JsonNode) null)));
            return createRecord;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter$AvroKeyValueWriter.class */
    public static final class AvroKeyValueWriter<K, V> {
        private final DataFileWriter<GenericRecord> mAvroFileWriter;
        private final Schema mKeyValuePairSchema;
        private final AvroKeyValue<Object, Object> mOutputRecord;

        AvroKeyValueWriter(Schema schema, Schema schema2, CodecFactory codecFactory, OutputStream outputStream, int i) throws IOException {
            this.mKeyValuePairSchema = AvroKeyValue.getSchema(schema, schema2);
            this.mAvroFileWriter = new DataFileWriter<>(new GenericDatumWriter(this.mKeyValuePairSchema));
            this.mAvroFileWriter.setCodec(codecFactory);
            this.mAvroFileWriter.setSyncInterval(i);
            this.mAvroFileWriter.create(this.mKeyValuePairSchema, outputStream);
            this.mOutputRecord = new AvroKeyValue<>(new GenericData.Record(this.mKeyValuePairSchema));
        }

        AvroKeyValueWriter(Schema schema, Schema schema2, CodecFactory codecFactory, OutputStream outputStream) throws IOException {
            this(schema, schema2, codecFactory, outputStream, 64000);
        }

        void write(K k, V v) throws IOException {
            this.mOutputRecord.setKey(k);
            this.mOutputRecord.setValue(v);
            this.mAvroFileWriter.append(this.mOutputRecord.get());
        }

        void close() throws IOException {
            this.mAvroFileWriter.close();
        }

        long sync() throws IOException {
            return this.mAvroFileWriter.sync();
        }
    }

    public AvroKeyValueSinkWriter(Map<String, String> map) {
        this.properties = map;
        validateProperties();
    }

    protected AvroKeyValueSinkWriter(AvroKeyValueSinkWriter<K, V> avroKeyValueSinkWriter) {
        super(avroKeyValueSinkWriter);
        this.properties = avroKeyValueSinkWriter.properties;
        validateProperties();
    }

    private void validateProperties() {
        String str = this.properties.get(CONF_OUTPUT_KEY_SCHEMA);
        if (str == null) {
            throw new IllegalStateException("No key schema provided, set 'avro.schema.output.key' property");
        }
        Schema.parse(str);
        String str2 = this.properties.get(CONF_OUTPUT_VALUE_SCHEMA);
        if (str2 == null) {
            throw new IllegalStateException("No value schema provided, set 'avro.schema.output.value' property");
        }
        Schema.parse(str2);
    }

    private boolean getBoolean(Map<String, String> map, String str, boolean z) {
        String str2 = map.get(str);
        return str2 == null ? z : Boolean.parseBoolean(str2);
    }

    private int getInt(Map<String, String> map, String str, int i) {
        String str2 = map.get(str);
        return str2 == null ? i : Integer.parseInt(str2);
    }

    private CodecFactory getCompressionCodec(Map<String, String> map) {
        if (!getBoolean(map, CONF_COMPRESS, false)) {
            return CodecFactory.nullCodec();
        }
        int i = getInt(map, CONF_DEFLATE_LEVEL, -1);
        int i2 = getInt(map, CONF_XZ_LEVEL, 6);
        String str = map.get(CONF_COMPRESS_CODEC);
        return "deflate".equals(str) ? CodecFactory.deflateCodec(i) : "xz".equals(str) ? CodecFactory.xzCodec(i2) : CodecFactory.fromString(str);
    }

    @Override // org.apache.flink.streaming.connectors.fs.StreamWriterBase, org.apache.flink.streaming.connectors.fs.Writer
    public void open(FileSystem fileSystem, Path path) throws IOException {
        super.open(fileSystem, path);
        this.keyValueWriter = new AvroKeyValueWriter<>(Schema.parse(this.properties.get(CONF_OUTPUT_KEY_SCHEMA)), Schema.parse(this.properties.get(CONF_OUTPUT_VALUE_SCHEMA)), getCompressionCodec(this.properties), getStream());
    }

    @Override // org.apache.flink.streaming.connectors.fs.StreamWriterBase, org.apache.flink.streaming.connectors.fs.Writer
    public void close() throws IOException {
        super.close();
        if (this.keyValueWriter != null) {
            this.keyValueWriter.close();
        }
    }

    @Override // org.apache.flink.streaming.connectors.fs.StreamWriterBase, org.apache.flink.streaming.connectors.fs.Writer
    public long flush() throws IOException {
        if (this.keyValueWriter != null) {
            this.keyValueWriter.sync();
        }
        return super.flush();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.connectors.fs.Writer
    public void write(Tuple2<K, V> tuple2) throws IOException {
        getStream();
        this.keyValueWriter.write(tuple2.f0, tuple2.f1);
    }

    public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        if (!typeInformation.isTupleType()) {
            throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
        }
        if (((TupleTypeInfoBase) typeInformation).getArity() != 2) {
            throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
        }
    }

    @Override // org.apache.flink.streaming.connectors.fs.Writer
    public Writer<Tuple2<K, V>> duplicate() {
        return new AvroKeyValueSinkWriter(this);
    }

    @Override // org.apache.flink.streaming.connectors.fs.StreamWriterBase
    public int hashCode() {
        return Objects.hash(Integer.valueOf(super.hashCode()), this.properties);
    }

    @Override // org.apache.flink.streaming.connectors.fs.StreamWriterBase
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return obj != null && getClass() == obj.getClass() && Objects.equals(this.properties, ((AvroKeyValueSinkWriter) obj).properties) && super.equals(obj);
    }
}
