package org.apache.gobblin.kafka.writer;

import com.google.common.base.Throwables;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.configuration.ConfigurationException;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.gobblin.writer.WriteResponseFuture;
import org.apache.gobblin.writer.WriteResponseMapper;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaDataWriter} implementation that sends records to a single Kafka topic through a
 * {@link Producer}.
 *
 * <p>The destination topic is read from the {@code writer.kafka.topic} configuration key. Writes
 * are handed to {@link Producer#send} together with a callback, and the returned future is adapted
 * to a {@code Future<WriteResponse>}.
 *
 * @param <K> type of the record key sent to Kafka
 * @param <V> type of the record value sent to Kafka
 */
public class Kafka08DataWriter<K, V> implements KafkaDataWriter<K, V> {
    private static final Logger log = LoggerFactory.getLogger(Kafka08DataWriter.class);

    /**
     * Adapts a Kafka {@link RecordMetadata} into a {@link WriteResponse}. The wrapper exposes the
     * metadata itself as the raw response and its {@code toString()} as the string response.
     */
    public static final WriteResponseMapper<RecordMetadata> WRITE_RESPONSE_WRAPPER = new WriteResponseMapper<RecordMetadata>() {
        @Override
        public WriteResponse wrap(final RecordMetadata recordMetadata) {
            return new WriteResponse<RecordMetadata>() {
                @Override
                public RecordMetadata getRawResponse() {
                    return recordMetadata;
                }

                @Override
                public String getStringResponse() {
                    return recordMetadata.toString();
                }

                @Override
                public long bytesWritten() {
                    // This wrapper does not compute a byte count; -1 is returned unconditionally.
                    return -1L;
                }
            };
        }
    };

    private final Producer<K, V> producer;
    private final String topic;
    private final KafkaWriterCommonConfig commonConfig;

    /**
     * Obtains a producer object from {@code KafkaWriterHelper.getKafkaProducer(properties)} and
     * casts it to {@link Producer}.
     *
     * @param properties properties forwarded unchanged to {@code KafkaWriterHelper}
     * @return the object returned by the helper, viewed as a {@link Producer}
     * @throws ClassCastException if the helper returned an object that is not a {@link Producer};
     *     the failure is logged before being rethrown
     */
    public static Producer getKafkaProducer(Properties properties) {
        Object kafkaProducer = KafkaWriterHelper.getKafkaProducer(properties);
        try {
            return (Producer) kafkaProducer;
        } catch (ClassCastException e) {
            // SLF4J treats a trailing Throwable argument as the exception to log with its stack trace.
            log.error("Failed to instantiate Kafka producer {} as instance of Producer.class",
                    kafkaProducer.getClass().getName(), e);
            throw e;
        }
    }

    /**
     * Creates a writer whose producer and configuration are both derived from {@code properties}.
     *
     * @param properties used to obtain the producer via {@link #getKafkaProducer(Properties)} and
     *     parsed into a {@link Config} via {@code ConfigFactory.parseProperties}
     * @throws ConfigurationException declared by the delegated constructor
     */
    public Kafka08DataWriter(Properties properties) throws ConfigurationException {
        this(getKafkaProducer(properties), ConfigFactory.parseProperties(properties));
    }

    /**
     * Creates a writer around an existing producer.
     *
     * @param producer producer used for all sends and closed by {@link #close()}
     * @param config configuration; must contain the {@code writer.kafka.topic} key, and is also
     *     passed to {@code KafkaWriterCommonConfig}
     * @throws ConfigurationException declared for configuration problems; NOTE(review): presumably
     *     raised by {@code KafkaWriterCommonConfig} - confirm against that class
     */
    @SuppressWarnings("unchecked") // The raw Producer parameter is kept for caller compatibility.
    public Kafka08DataWriter(Producer producer, Config config) throws ConfigurationException {
        this.topic = config.getString("writer.kafka.topic");
        this.producer = producer;
        this.commonConfig = new KafkaWriterCommonConfig(config);
    }

    /**
     * Closes the underlying producer.
     *
     * @throws IOException declared by the signature; nothing in this method throws it directly
     */
    public void close() throws IOException {
        log.debug("Close called");
        this.producer.close();
    }

    /**
     * Derives a key/value pair from {@code v} via {@code KafkaWriterHelper.getKeyValuePair} and
     * sends it with {@link #write(Pair, WriteCallback)}.
     *
     * @param v the record to write
     * @param writeCallback notified when the send completes or fails
     * @return a future for the write response
     * @throws RuntimeException wrapping any exception raised while building or submitting the
     *     request, including the {@code RuntimeException} thrown by {@link #write(Pair, WriteCallback)}
     */
    public Future<WriteResponse> write(V v, WriteCallback writeCallback) {
        try {
            @SuppressWarnings("unchecked") // The helper's declared return type is not visible here.
            Pair<K, V> keyValuePair = (Pair<K, V>) KafkaWriterHelper.getKeyValuePair(v, this.commonConfig);
            return write(keyValuePair, writeCallback);
        } catch (Exception e) {
            throw new RuntimeException("Failed to generate write request", e);
        }
    }

    /**
     * Sends the given key/value pair to the configured topic.
     *
     * @param pair key and value of the record to send
     * @param writeCallback receives {@code onFailure} when the producer reports an exception, and
     *     otherwise {@code onSuccess} with the wrapped {@link RecordMetadata}
     * @return the producer's future adapted through {@link #WRITE_RESPONSE_WRAPPER}
     * @throws RuntimeException wrapping any exception raised while creating or submitting the send
     */
    public Future<WriteResponse> write(Pair<K, V> pair, final WriteCallback writeCallback) {
        try {
            ProducerRecord<K, V> record = new ProducerRecord<K, V>(this.topic, pair.getKey(), pair.getValue());
            Callback completionHandler = new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null) {
                        writeCallback.onFailure(exc);
                    } else {
                        writeCallback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(recordMetadata));
                    }
                }
            };
            Future<RecordMetadata> kafkaFuture = this.producer.send(record, completionHandler);
            return new WriteResponseFuture<RecordMetadata>(kafkaFuture, WRITE_RESPONSE_WRAPPER);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create a Kafka write request", e);
        }
    }

    /**
     * Does nothing.
     *
     * <p>NOTE(review): presumably a no-op because the producer API targeted by this writer offers
     * no flush operation - confirm before relying on this for delivery guarantees.
     *
     * @throws IOException declared by the signature; never thrown by this implementation
     */
    public void flush() throws IOException {
        // Intentionally empty.
    }
}
