package co.cask.cdap.data2.metadata.publisher;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.codec.NamespacedIdCodec;
import co.cask.cdap.proto.metadata.MetadataChangeRecord;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Inject;
import java.nio.ByteBuffer;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.twill.internal.kafka.client.ByteBufferEncoder;
import org.apache.twill.internal.kafka.client.IntegerEncoder;
import org.apache.twill.internal.kafka.client.IntegerPartitioner;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/metadata/publisher/KafkaMetadataChangePublisher.class */
/**
 * A {@link MetadataChangePublisher} that serializes each {@link MetadataChangeRecord} to JSON and
 * sends it to a Kafka topic.
 *
 * <p>The topic and the broker list are read from {@link CConfiguration} under the keys
 * {@code metadata.updates.kafka.topic} and {@code metadata.updates.kafka.broker.list}. The Kafka
 * producer is not created in the constructor; it is built the first time {@link #publish} needs it
 * and then reused through a memoizing supplier.
 */
public class KafkaMetadataChangePublisher implements MetadataChangePublisher {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMetadataChangePublisher.class);
    // Gson instance that knows how to (de)serialize Id.NamespacedId through NamespacedIdCodec.
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Id.NamespacedId.class, new NamespacedIdCodec()).create();
    // Acknowledgement level handed to the producer as "request.required.acks".
    private final KafkaPublisher.Ack ack = KafkaPublisher.Ack.FIRE_AND_FORGET;
    // Lazily created producer: the delegate below only runs when get() is first called from publish(),
    // by which time the constructor has already assigned brokerList.
    private final Supplier<Producer<Integer, ByteBuffer>> producer = Suppliers.memoize(new Supplier<Producer<Integer, ByteBuffer>>() {
        /**
         * Builds the Kafka producer from the configured broker list, the encoders/partitioner
         * referenced below, the acknowledgement level and Snappy compression.
         */
        @Override
        public Producer<Integer, ByteBuffer> get() {
            Properties properties = new Properties();
            properties.put("metadata.broker.list", KafkaMetadataChangePublisher.this.brokerList);
            properties.put("serializer.class", ByteBufferEncoder.class.getName());
            properties.put("key.serializer.class", IntegerEncoder.class.getName());
            properties.put("partitioner.class", IntegerPartitioner.class.getName());
            properties.put("request.required.acks", Integer.toString(KafkaMetadataChangePublisher.this.ack.getAck()));
            properties.put("compression.codec", Compression.SNAPPY.getCodec());
            return new Producer<>(new ProducerConfig(properties));
        }
    });
    private final String topic;
    private final String brokerList;

    /**
     * Creates the publisher, reading the target topic and broker list from the configuration.
     *
     * @param cConfiguration configuration holding {@code metadata.updates.kafka.topic} and
     *                       {@code metadata.updates.kafka.broker.list}
     */
    @Inject
    KafkaMetadataChangePublisher(CConfiguration cConfiguration) {
        this.topic = cConfiguration.get("metadata.updates.kafka.topic");
        this.brokerList = cConfiguration.get("metadata.updates.kafka.broker.list");
    }

    /**
     * Serializes the record to JSON and sends it to the configured topic, keyed by a non-negative
     * value derived from the hash code of the previous record's entity id.
     *
     * <p>Any exception raised while obtaining the producer, computing the key or sending is logged
     * and not rethrown.
     *
     * @param metadataChangeRecord the change record to publish
     */
    @Override // co.cask.cdap.data2.metadata.publisher.MetadataChangePublisher
    public void publish(MetadataChangeRecord metadataChangeRecord) {
        byte[] payload = Bytes.toBytes(GSON.toJson(metadataChangeRecord));
        try {
            // Key computation stays inside the try so that a failure here is logged like a send failure.
            Integer key = Integer.valueOf(partitionKey(metadataChangeRecord.getPrevious().getEntityId()));
            KeyedMessage<Integer, ByteBuffer> message = new KeyedMessage<>(this.topic, key, ByteBuffer.wrap(payload));
            this.producer.get().send(message);
        } catch (Exception e) {
            // The trailing Throwable in the argument array is logged as the exception by SLF4J.
            LOG.error("Failed to send message to topic {} with broker list {}", new Object[]{this.topic, this.brokerList, e});
        }
    }

    /**
     * Returns a non-negative partition key for the given entity id.
     *
     * <p>{@code Math.abs(Integer.MIN_VALUE)} is still {@code Integer.MIN_VALUE}, so that single hash
     * value is mapped to {@code 0}; every other hash yields {@code Math.abs(hash)}.
     */
    private static int partitionKey(Object entityId) {
        int hash = entityId.hashCode();
        return hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
    }
}
