package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.Pair;
import com.google.common.collect.Sets;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kafka/AbstractExactlyOnceKafkaOutputOperator.class */
/**
 * Kafka output operator that drops incoming tuples which do not compare as newer than the
 * last message already stored in the destination partition.
 *
 * <p>During {@link #setup(Context.OperatorContext)} the operator reads the most recent message
 * of every partition of the topic. For each incoming tuple it determines the target partition
 * (via the configured {@link Partitioner}, or partition 0 when none is configured) and asks
 * {@link #compareToLastMsg(Pair, Pair)} whether the tuple should be sent.
 *
 * @param <T> type of the tuples received on {@link #inputPort}
 * @param <K> type of the Kafka message key
 * @param <V> type of the Kafka message value
 */
public abstract class AbstractExactlyOnceKafkaOutputOperator<T, K, V> extends AbstractKafkaOutputOperator<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractExactlyOnceKafkaOutputOperator.class);

    /** Socket timeout handed to the {@link SimpleConsumer} used while reading the last messages. */
    private static final int CONSUMER_SOCKET_TIMEOUT_MS = 100000;

    /** Buffer size handed to the {@link SimpleConsumer} used while reading the last messages. */
    private static final int CONSUMER_BUFFER_SIZE = 65536;

    /** Fetch size requested when reading the last message of a partition. */
    private static final int FETCH_SIZE = 100000;

    /**
     * Last message found in each partition when the operator was set up, keyed by partition id.
     * {@code first} holds the key bytes and {@code second} the payload bytes; either one is
     * {@code null} when the Kafka message carried no key / no payload.
     */
    private Map<Integer, Pair<byte[], byte[]>> lastMsgs;

    /** Partitioner named in the producer configuration, or {@code null} when none is configured. */
    private transient Partitioner partitioner;

    /** Number of partitions of the topic as reported by the topic metadata. */
    private transient int partitionNum = 1;

    /** Input port: every tuple is converted to a key/value pair and sent unless it is recognized as already written. */
    public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() {
        @Override
        public void process(T t) {
            AbstractExactlyOnceKafkaOutputOperator.this.processTuple(t);
        }
    };

    /**
     * Instantiates the configured partitioner (if any) and loads the last message of every
     * partition of the topic.
     *
     * @param operatorContext the operator context, passed through to the superclass
     * @throws RuntimeException if the partitioner cannot be instantiated or the last messages
     *         cannot be read; the underlying failure is preserved as the cause
     */
    @Override // com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        try {
            String partitionerClass = (String) getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER);
            if (partitionerClass != null) {
                this.partitioner = (Partitioner) Class.forName(partitionerClass).newInstance();
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to initialize partitioner", e);
        }
        try {
            initializeLastProcessingOffset();
        } catch (Exception e) {
            // Kept as a broad catch on purpose: the Kafka client is Scala code and may throw
            // checked exceptions that are not declared in its Java-visible signatures.
            throw new RuntimeException("Failed to read the last messages from Kafka", e);
        }
    }

    /**
     * Sends the tuple to Kafka unless {@link #compareToLastMsg(Pair, Pair)} reports it as not
     * newer than the last message known for its target partition.
     */
    private void processTuple(T tuple) {
        Pair<K, V> keyValue = tupleToKeyValue(tuple);
        int partitionId = 0;
        if (this.partitioner != null) {
            partitionId = this.partitioner.partition(keyValue.first, this.partitionNum);
        }
        Pair<byte[], byte[]> lastMsg = this.lastMsgs.get(partitionId);
        if (lastMsg != null && compareToLastMsg(keyValue, lastMsg) <= 0) {
            logger.debug("Ingore tuple {}", tuple);
            return;
        }
        getProducer().send(new KeyedMessage<K, V>(getTopic(), keyValue.first, keyValue.second));
        this.sendCount++;
    }

    /**
     * Reads the most recent message of every partition of the topic into {@link #lastMsgs}.
     * Partitions from which no message can be fetched get no entry.
     *
     * @throws RuntimeException if the topic metadata cannot be retrieved or a partition has no leader
     */
    private void initializeLastProcessingOffset() {
        String brokerList = (String) getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST);
        TopicMetadata topicMetadata = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet(new String[]{brokerList}), getTopic());
        if (topicMetadata == null) {
            throw new RuntimeException("Failed to retrieve topic metadata");
        }
        String topic = topicMetadata.topic();
        this.partitionNum = topicMetadata.partitionsMetadata().size();
        this.lastMsgs = new HashMap<Integer, Pair<byte[], byte[]>>(this.partitionNum);
        for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
            int partitionId = partitionMetadata.partitionId();
            if (partitionMetadata.leader() == null) {
                // Fail loudly: silently skipping the partition would let already-written tuples through.
                throw new RuntimeException("No leader available for partition " + partitionId + " of topic " + topic);
            }
            String clientName = getClass().getName().replace('$', '.') + "_Client_" + topic + "_" + partitionId;
            SimpleConsumer consumer = new SimpleConsumer(partitionMetadata.leader().host(), partitionMetadata.leader().port(),
                    CONSUMER_SOCKET_TIMEOUT_MS, CONSUMER_BUFFER_SIZE, clientName);
            try {
                long lastOffset = KafkaMetadataUtil.getLastOffset(consumer, topic, partitionId, OffsetRequest.LatestTime(), clientName);
                if (lastOffset <= 0) {
                    // lastOffset - 1 would be negative, so there is no previous message to read.
                    continue;
                }
                for (MessageAndOffset messageAndOffset : consumer.fetch(new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partitionId, lastOffset - 1, FETCH_SIZE)
                        .build()).messageSet(topic, partitionId)) {
                    Message message = messageAndOffset.message();
                    // Later messages overwrite earlier ones, so the last fetched message wins.
                    this.lastMsgs.put(partitionId, new Pair<byte[], byte[]>(copyBytes(message.key()), copyBytes(message.payload())));
                }
            } finally {
                consumer.close();
            }
        }
    }

    /**
     * Copies the remaining bytes of the buffer into a new array without moving the buffer's position.
     *
     * @param buffer the buffer to copy, may be {@code null}
     * @return the copied bytes, or {@code null} if {@code buffer} is {@code null}
     */
    private static byte[] copyBytes(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /**
     * Compares an outgoing key/value pair with the last message found in its target partition.
     *
     * @param pair the key/value pair about to be sent
     * @param pair2 the last message of the target partition: {@code first} is the key bytes,
     *        {@code second} the payload bytes; either may be {@code null} when the stored
     *        message had no key / no payload
     * @return a value {@code <= 0} if the tuple must be dropped, a positive value if it must be sent
     */
    protected abstract int compareToLastMsg(Pair<K, V> pair, Pair<byte[], byte[]> pair2);

    /**
     * Converts an incoming tuple into the key/value pair that is sent to Kafka.
     *
     * @param t the incoming tuple
     * @return the pair whose {@code first} is used as message key and {@code second} as message value
     */
    protected abstract Pair<K, V> tupleToKeyValue(T t);
}
