/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Kafka 0.10.x sink that wraps a {@link FlinkKafkaProducer09} and can optionally attach a
 * timestamp to every {@link ProducerRecord} it emits.
 *
 * <p>The class can be used in two ways:
 * <ul>
 *   <li>as a plain {@link SinkFunction}, created through one of the constructors; elements then
 *       arrive through {@link #invoke(Object)};</li>
 *   <li>as a stream operator, attached through one of the
 *       {@code writeToKafkaWithTimestamps(...)} factories; elements then arrive through
 *       {@link #processElement(StreamRecord)} together with their record timestamp. The returned
 *       {@link FlinkKafkaProducer010Configuration} is the only place in this class where
 *       timestamp writing can be switched on.</li>
 * </ul>
 *
 * <p>All lifecycle and checkpoint calls are forwarded to the wrapped producer.
 *
 * @param <T> type of the elements written to Kafka
 */
public class FlinkKafkaProducer010<T>
extends StreamSink<T>
implements SinkFunction<T>,
RichFunction,
CheckpointedFunction {
    /** Whether the element timestamp is written into the Kafka record; disabled by default. */
    private boolean writeTimestampToKafka = false;

    // ---------------------- operator-style factories ----------------------

    /**
     * Attaches a Kafka 0.10 producer operator to {@code inStream}, using a
     * {@link FlinkFixedPartitioner}.
     *
     * @param inStream stream whose elements are written to Kafka
     * @param topicId default target topic
     * @param serializationSchema key/value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @return handle for configuring the attached producer
     */
    public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
        return FlinkKafkaProducer010.writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
    }

    /**
     * Attaches a Kafka 0.10 producer operator to {@code inStream}, wrapping the value-only
     * schema in a {@link KeyedSerializationSchemaWrapper} and using a
     * {@link FlinkFixedPartitioner}.
     *
     * @param inStream stream whose elements are written to Kafka
     * @param topicId default target topic
     * @param serializationSchema value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @return handle for configuring the attached producer
     */
    public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
        return FlinkKafkaProducer010.writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<T>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
    }

    /**
     * Attaches a Kafka 0.10 producer operator to {@code inStream}.
     *
     * @param inStream stream whose elements are written to Kafka
     * @param topicId default target topic
     * @param serializationSchema key/value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @param customPartitioner partitioner choosing the Kafka partition; when {@code null}, no
     *        partition is set on the produced records
     * @return handle for configuring the attached producer
     */
    public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
        // The operator is registered with a generic Object output type.
        GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<Object>(Object.class);
        FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<T>(topicId, serializationSchema, producerConfig, customPartitioner);
        SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
        return new FlinkKafkaProducer010Configuration<T>(transformation, kafkaProducer);
    }

    // ---------------------- SinkFunction-style constructors ----------------------

    /**
     * Creates a producer from a broker list, a value-only schema and a
     * {@link FlinkFixedPartitioner}.
     *
     * @param brokerList broker list turned into producer properties via
     *        {@link FlinkKafkaProducerBase#getPropertiesFromBrokerList(String)}
     * @param topicId default target topic
     * @param serializationSchema value serializer for the elements
     */
    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
        this(topicId, new KeyedSerializationSchemaWrapper<T>(serializationSchema), FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
    }

    /**
     * Creates a producer from a value-only schema and a {@link FlinkFixedPartitioner}.
     *
     * @param topicId default target topic
     * @param serializationSchema value serializer for the elements
     * @param producerConfig Kafka producer properties
     */
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, new KeyedSerializationSchemaWrapper<T>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
    }

    /**
     * Creates a producer from a value-only schema and a custom partitioner.
     *
     * @param topicId default target topic
     * @param serializationSchema value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @param customPartitioner partitioner choosing the Kafka partition; when {@code null}, no
     *        partition is set on the produced records
     */
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
        this(topicId, new KeyedSerializationSchemaWrapper<T>(serializationSchema), producerConfig, customPartitioner);
    }

    /**
     * Creates a producer from a broker list, a keyed schema and a
     * {@link FlinkFixedPartitioner}.
     *
     * @param brokerList broker list turned into producer properties via
     *        {@link FlinkKafkaProducerBase#getPropertiesFromBrokerList(String)}
     * @param topicId default target topic
     * @param serializationSchema key/value serializer for the elements
     */
    public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
        this(topicId, serializationSchema, FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
    }

    /**
     * Creates a producer from a keyed schema and a {@link FlinkFixedPartitioner}.
     *
     * @param topicId default target topic
     * @param serializationSchema key/value serializer for the elements
     * @param producerConfig Kafka producer properties
     */
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
    }

    /**
     * Creates a producer from a keyed schema and a custom partitioner. Every other constructor
     * ends up here; the actual Kafka work is done by a wrapped {@link FlinkKafkaProducer09}.
     *
     * @param topicId default target topic
     * @param serializationSchema key/value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @param customPartitioner partitioner choosing the Kafka partition; when {@code null}, no
     *        partition is set on the produced records
     */
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
        super(new FlinkKafkaProducer09<T>(topicId, serializationSchema, producerConfig, customPartitioner));
    }

    // ---------------------- deprecated KafkaPartitioner variants ----------------------

    /**
     * Attaches a Kafka 0.10 producer operator to {@code inStream} using a legacy
     * {@link KafkaPartitioner}.
     *
     * @param inStream stream whose elements are written to Kafka
     * @param topicId default target topic
     * @param serializationSchema key/value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @param customPartitioner legacy partitioner, adapted through
     *        {@link FlinkKafkaDelegatePartitioner}; {@code null} is passed through as
     *        "no partitioner"
     * @return handle for configuring the attached producer
     * @deprecated use the overload that takes a {@link FlinkKafkaPartitioner}
     */
    @Deprecated
    public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
        return FlinkKafkaProducer010.writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, wrapLegacyPartitioner(customPartitioner));
    }

    /**
     * Creates a producer from a value-only schema and a legacy {@link KafkaPartitioner}.
     *
     * @param topicId default target topic
     * @param serializationSchema value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @param customPartitioner legacy partitioner; {@code null} is passed through as
     *        "no partitioner"
     * @deprecated use the constructor that takes a {@link FlinkKafkaPartitioner}
     */
    @Deprecated
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
        this(topicId, new KeyedSerializationSchemaWrapper<T>(serializationSchema), producerConfig, customPartitioner);
    }

    /**
     * Creates a producer from a keyed schema and a legacy {@link KafkaPartitioner}.
     *
     * @param topicId default target topic
     * @param serializationSchema key/value serializer for the elements
     * @param producerConfig Kafka producer properties
     * @param customPartitioner legacy partitioner; {@code null} is passed through as
     *        "no partitioner"
     * @deprecated use the constructor that takes a {@link FlinkKafkaPartitioner}
     */
    @Deprecated
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
        this(topicId, serializationSchema, producerConfig, wrapLegacyPartitioner(customPartitioner));
    }

    /**
     * Adapts a legacy {@link KafkaPartitioner} to the {@link FlinkKafkaPartitioner} API.
     *
     * <p>A {@code null} partitioner stays {@code null} instead of being wrapped, so that the
     * "no partitioner configured" check in {@code invokeInternal} still applies to callers of
     * the deprecated API.
     */
    private static <P> FlinkKafkaPartitioner<P> wrapLegacyPartitioner(KafkaPartitioner<P> legacyPartitioner) {
        if (legacyPartitioner == null) {
            return null;
        }
        return new FlinkKafkaDelegatePartitioner<P>(legacyPartitioner);
    }

    // ---------------------- element processing ----------------------

    /** Returns the wrapped producer that performs the actual Kafka work. */
    private FlinkKafkaProducerBase<T> internalProducer() {
        return (FlinkKafkaProducerBase<T>) this.userFunction;
    }

    /**
     * Serializes {@code next} and sends it to Kafka through the wrapped producer.
     *
     * @param next element to write
     * @param elementTimestamp timestamp to put on the record; only used when timestamp writing
     *        is enabled
     * @throws Exception if the wrapped producer reports a failure or serialization/sending fails
     */
    private void invokeInternal(T next, long elementTimestamp) throws Exception {
        final FlinkKafkaProducerBase<T> internalProducer = internalProducer();

        // Runs before anything else; presumably rethrows an earlier send failure
        // (see FlinkKafkaProducerBase#checkErroneous to confirm).
        internalProducer.checkErroneous();

        final byte[] serializedKey = internalProducer.schema.serializeKey(next);
        final byte[] serializedValue = internalProducer.schema.serializeValue(next);

        // The schema may route an element to its own topic; otherwise use the default one.
        String targetTopic = internalProducer.schema.getTargetTopic(next);
        if (targetTopic == null) {
            targetTopic = internalProducer.defaultTopicId;
        }

        // A null timestamp leaves the record without an explicit timestamp.
        final Long timestamp = this.writeTimestampToKafka ? Long.valueOf(elementTimestamp) : null;

        // Partition ids are looked up once per topic and cached on the wrapped producer.
        int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
        if (partitions == null) {
            partitions = FlinkKafkaProducerBase.getPartitionsByTopic(targetTopic, internalProducer.producer);
            internalProducer.topicPartitionsMap.put(targetTopic, partitions);
        }

        // Without a configured partitioner the record is created with a null partition.
        Integer partition = null;
        if (internalProducer.flinkKafkaPartitioner != null) {
            partition = Integer.valueOf(internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions));
        }
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(targetTopic, partition, timestamp, serializedKey, serializedValue);

        // With flush-on-checkpoint enabled, the record is counted as pending before it is sent.
        if (internalProducer.flushOnCheckpoint) {
            synchronized (internalProducer.pendingRecordsLock) {
                ++internalProducer.pendingRecords;
            }
        }
        internalProducer.producer.send(record, internalProducer.callback);
    }

    // ---------------------- configuration (SinkFunction usage) ----------------------

    /**
     * Forwards the log-failures-only setting to the wrapped producer.
     *
     * @param logFailuresOnly value passed to the wrapped producer
     */
    public void setLogFailuresOnly(boolean logFailuresOnly) {
        internalProducer().setLogFailuresOnly(logFailuresOnly);
    }

    /**
     * Forwards the flush-on-checkpoint setting to the wrapped producer.
     *
     * @param flush value passed to the wrapped producer
     */
    public void setFlushOnCheckpoint(boolean flush) {
        internalProducer().setFlushOnCheckpoint(flush);
    }

    // ---------------------- delegation to the wrapped producer ----------------------

    /** Opens the wrapped producer with the given configuration. */
    public void open(Configuration parameters) throws Exception {
        internalProducer().open(parameters);
    }

    /** Returns the iteration runtime context of the wrapped producer. */
    public IterationRuntimeContext getIterationRuntimeContext() {
        return internalProducer().getIterationRuntimeContext();
    }

    /** Hands the runtime context to the wrapped producer. */
    public void setRuntimeContext(RuntimeContext t) {
        internalProducer().setRuntimeContext(t);
    }

    /**
     * Writes {@code value} without a record timestamp being available. {@code Long.MAX_VALUE}
     * is passed as a placeholder; it only reaches Kafka if timestamp writing is enabled.
     */
    public void invoke(T value) throws Exception {
        this.invokeInternal(value, Long.MAX_VALUE);
    }

    /** Writes the element's value, passing along the timestamp reported by the stream record. */
    public void processElement(StreamRecord<T> element) throws Exception {
        this.invokeInternal(element.getValue(), element.getTimestamp());
    }

    /** Forwards state initialization to the wrapped producer. */
    public void initializeState(FunctionInitializationContext context) throws Exception {
        internalProducer().initializeState(context);
    }

    /** Forwards the state snapshot to the wrapped producer. */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        internalProducer().snapshotState(context);
    }

    /**
     * Handle returned by the {@code writeToKafkaWithTimestamps(...)} factories. It is the data
     * stream sink for the attached operator and exposes the producer's configuration switches.
     *
     * @param <T> type of the elements written to Kafka
     */
    public static class FlinkKafkaProducer010Configuration<T>
    extends DataStreamSink<T> {
        /** Producer wrapped inside {@link #producer}; receives the forwarded settings. */
        private final FlinkKafkaProducerBase<T> wrappedProducerBase;
        /** The operator attached to the stream; owns the timestamp switch. */
        private final FlinkKafkaProducer010<T> producer;

        // The stream parameter stays raw: the factories pass in the Object-typed result of
        // DataStream#transform, which is not a DataStream<T>.
        @SuppressWarnings({"unchecked", "rawtypes"})
        private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
            super(stream, producer);
            this.producer = producer;
            this.wrappedProducerBase = (FlinkKafkaProducerBase<T>) producer.userFunction;
        }

        /**
         * Forwards the log-failures-only setting to the wrapped producer.
         *
         * @param logFailuresOnly value passed to the wrapped producer
         */
        public void setLogFailuresOnly(boolean logFailuresOnly) {
            this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
        }

        /**
         * Forwards the flush-on-checkpoint setting to the wrapped producer.
         *
         * @param flush value passed to the wrapped producer
         */
        public void setFlushOnCheckpoint(boolean flush) {
            this.wrappedProducerBase.setFlushOnCheckpoint(flush);
        }

        /**
         * Enables or disables writing the element timestamp into the Kafka record.
         *
         * @param writeTimestampToKafka {@code true} to attach the element timestamp to every
         *        produced record
         */
        public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
            this.producer.writeTimestampToKafka = writeTimestampToKafka;
        }
    }
}

