package org.apache.twill.internal.kafka.client;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.class */
/**
 * A {@link KafkaPublisher} that publishes through a Kafka {@link Producer} whose broker list
 * is obtained from a {@link BrokerService}.
 *
 * <p>The current producer lives in an {@link AtomicReference}. It is (re)created by a
 * {@link BrokerListChangeListener} whenever the broker list reported by the
 * {@link BrokerService} differs from the one last used, and it is closed by the
 * {@link Cancellable} returned from {@link #start()}.
 */
public final class SimpleKafkaPublisher implements KafkaPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaPublisher.class);

    private final BrokerService brokerService;
    private final KafkaPublisher.Ack ack;
    private final Compression compression;
    /** Holds the producer currently in use; {@code null} until a non-empty broker list is seen. */
    private final AtomicReference<Producer<Integer, ByteBuffer>> producer = new AtomicReference<>();
    /** Set to {@code true} once the {@link Cancellable} returned by {@link #start()} is cancelled. */
    private final AtomicBoolean listenerCancelled = new AtomicBoolean(false);

    /**
     * Creates a publisher. No Kafka producer is created until {@link #start()} is called.
     *
     * @param brokerService source of the Kafka broker list
     * @param ack           acknowledgement setting, passed to the producer as {@code request.required.acks}
     * @param compression   compression setting, passed to the producer as {@code compression.codec}
     */
    public SimpleKafkaPublisher(BrokerService brokerService, KafkaPublisher.Ack ack, Compression compression) {
        this.brokerService = brokerService;
        this.ack = ack;
        this.compression = compression;
    }

    /**
     * Starts the publisher: registers a broker-list change listener with the
     * {@link BrokerService} and runs that listener once on a dedicated single-threaded
     * daemon executor, waiting for that first run to complete before returning.
     *
     * @return a {@link Cancellable} that, on its first {@code cancel()}, removes the listener,
     *         closes the current producer (if any) and shuts the executor down
     */
    public Cancellable start() {
        ExecutorService listenerExecutor =
            Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("kafka-publisher"));
        final BrokerListChangeListener listener =
            new BrokerListChangeListener(this.listenerCancelled, this.producer, this.ack, this.compression);
        Cancellable cancelChangeListener = this.brokerService.addChangeListener(listener, listenerExecutor);

        // Trigger the listener once explicitly so that a producer exists (when brokers are
        // known) by the time this method returns. It runs on the same executor that is
        // handed to the broker service for listener callbacks.
        Futures.getUnchecked(listenerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                listener.changed(SimpleKafkaPublisher.this.brokerService);
            }
        }));
        return new ProducerCancellable(listenerExecutor, this.listenerCancelled, cancelChangeListener, this.producer);
    }

    /**
     * Creates a {@link KafkaPublisher.Preparer} that batches messages for the given topic.
     *
     * @param str the Kafka topic the prepared messages are sent to
     * @return a new, empty preparer
     */
    @Override
    public KafkaPublisher.Preparer prepare(String str) {
        return new SimplePreparer(str);
    }

    /**
     * Listener that replaces the shared producer whenever the broker list changes.
     */
    private static final class BrokerListChangeListener extends BrokerService.BrokerChangeListener {
        private final AtomicBoolean listenerCancelled;
        private final AtomicReference<Producer<Integer, ByteBuffer>> producer;
        private final KafkaPublisher.Ack ack;
        private final Compression compression;
        /** Broker list the current producer was built with; {@code null} before the first producer. */
        private String brokerList;

        private BrokerListChangeListener(AtomicBoolean listenerCancelled,
                                         AtomicReference<Producer<Integer, ByteBuffer>> producer,
                                         KafkaPublisher.Ack ack, Compression compression) {
            this.listenerCancelled = listenerCancelled;
            this.producer = producer;
            this.ack = ack;
            this.compression = compression;
        }

        /**
         * Rebuilds the producer if the broker list differs from the one last used.
         * Does nothing when the publisher has been cancelled, when the new broker list is
         * empty, or when the list is unchanged.
         *
         * @param brokerService the service to read the current broker list from
         */
        @Override
        public void changed(BrokerService brokerService) {
            if (this.listenerCancelled.get()) {
                return;
            }
            String newBrokerList = brokerService.getBrokerList();
            if (newBrokerList.isEmpty()) {
                LOG.warn("Broker list is empty. No Kafka producer is created.");
                return;
            }
            if (Objects.equal(this.brokerList, newBrokerList)) {
                return;
            }

            Properties props = new Properties();
            props.put("metadata.broker.list", newBrokerList);
            props.put("serializer.class", ByteBufferEncoder.class.getName());
            props.put("key.serializer.class", IntegerEncoder.class.getName());
            props.put("partitioner.class", IntegerPartitioner.class.getName());
            props.put("request.required.acks", Integer.toString(this.ack.getAck()));
            props.put("compression.codec", this.compression.getCodec());

            // Publish the new producer first, then close the one it replaced.
            Producer<Integer, ByteBuffer> previous =
                this.producer.getAndSet(new Producer<Integer, ByteBuffer>(new ProducerConfig(props)));
            if (previous != null) {
                previous.close();
            }
            LOG.info("Update Kafka producer broker list: {}", newBrokerList);
            this.brokerList = newBrokerList;
        }
    }

    /**
     * The {@link Cancellable} handed out by {@link SimpleKafkaPublisher#start()}. Cancelling
     * schedules {@link #run()} on the listener executor; only the first cancel has any effect.
     */
    private static final class ProducerCancellable implements Cancellable, Runnable {
        private final ExecutorService executor;
        private final AtomicBoolean listenerCancelled;
        private final Cancellable cancelChangeListener;
        private final AtomicReference<Producer<Integer, ByteBuffer>> producer;

        private ProducerCancellable(ExecutorService executor, AtomicBoolean listenerCancelled,
                                    Cancellable cancelChangeListener,
                                    AtomicReference<Producer<Integer, ByteBuffer>> producer) {
            this.executor = executor;
            this.listenerCancelled = listenerCancelled;
            this.cancelChangeListener = cancelChangeListener;
            this.producer = producer;
        }

        /**
         * Marks the publisher as cancelled and schedules the cleanup in {@link #run()} on the
         * executor. Calls after the first one are no-ops.
         */
        public void cancel() {
            if (this.listenerCancelled.compareAndSet(false, true)) {
                this.executor.execute(this);
            }
        }

        /**
         * Cleanup task submitted by {@link #cancel()}: removes the broker change listener,
         * closes the current producer if one was ever created, and shuts the executor down.
         */
        @Override
        public void run() {
            try {
                this.cancelChangeListener.cancel();
                // No producer exists if the broker list was empty every time the listener
                // ran, so the reference may legitimately still be null here.
                Producer<Integer, ByteBuffer> kafkaProducer = this.producer.get();
                if (kafkaProducer != null) {
                    kafkaProducer.close();
                }
            } finally {
                // Always release the executor thread, even if the cleanup above fails.
                this.executor.shutdownNow();
            }
        }
    }

    /**
     * {@link KafkaPublisher.Preparer} that collects messages for one topic and sends them as a
     * batch through whichever producer is current at the time of {@link #send()}.
     */
    private final class SimplePreparer implements KafkaPublisher.Preparer {
        private final String topic;
        private final List<KeyedMessage<Integer, ByteBuffer>> messages;

        private SimplePreparer(String topic) {
            this.topic = topic;
            this.messages = Lists.newLinkedList();
        }

        /**
         * Queues a message for the next {@link #send()}.
         *
         * @param message      the message payload
         * @param partitionKey object whose {@code hashCode()} is turned into the integer message key
         * @return this preparer, for chaining
         */
        @Override
        public KafkaPublisher.Preparer add(ByteBuffer message, Object partitionKey) {
            int hash = partitionKey.hashCode();
            // Math.abs(Integer.MIN_VALUE) is still negative, so that single value is mapped
            // to 0 explicitly; every other hash yields the same key as Math.abs.
            // NOTE(review): presumably IntegerPartitioner expects non-negative keys - confirm.
            int key = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
            this.messages.add(new KeyedMessage<Integer, ByteBuffer>(this.topic, Integer.valueOf(key), message));
            return this;
        }

        /**
         * Sends all queued messages with the current producer. The queue is cleared
         * afterwards whether or not the send succeeded.
         *
         * @return a completed future holding the number of messages handed to the producer, or
         *         a failed future if no producer is available or the producer threw an exception
         */
        @Override
        public ListenableFuture<Integer> send() {
            try {
                int size = this.messages.size();
                Producer<Integer, ByteBuffer> kafkaProducer = SimpleKafkaPublisher.this.producer.get();
                if (kafkaProducer == null) {
                    return Futures.immediateFailedFuture(new IllegalStateException("No kafka producer available."));
                }
                kafkaProducer.send(this.messages);
                return Futures.immediateFuture(Integer.valueOf(size));
            } catch (Exception e) {
                return Futures.immediateFailedFuture(e);
            } finally {
                this.messages.clear();
            }
        }
    }
}
