package co.cask.cdap.etl.realtime.kafka;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.cdap.etl.realtime.source.KafkaSource;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.Futures;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.twill.internal.kafka.client.ZKBrokerService;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.TopicPartition;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/etl/realtime/kafka/Kafka08SimpleApiConsumer.class */
public class Kafka08SimpleApiConsumer extends KafkaSimpleApiConsumer<String, ByteBuffer, Long> {
    private static final Logger LOG = LoggerFactory.getLogger(Kafka08SimpleApiConsumer.class);
    private ZKClientService zkClient;
    private BrokerService brokerService;
    private Cache<TopicPartition, SimpleConsumer> kafkaConsumers;

    public Kafka08SimpleApiConsumer(KafkaSource kafkaSource) {
        super(kafkaSource);
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    protected void configureKafka(KafkaConfigurer kafkaConfigurer) {
        KafkaSource.KafkaPluginConfig config = this.kafkaSource.getConfig();
        Preconditions.checkNotNull(config, "Could not have Kafka source plugin config to be null.");
        String zkConnect = config.getZkConnect();
        String kafkaBrokers = config.getKafkaBrokers();
        Preconditions.checkState((zkConnect == null && kafkaBrokers == null) ? false : true);
        if (zkConnect != null) {
            kafkaConfigurer.setZooKeeper(zkConnect);
        } else {
            kafkaConfigurer.setBrokers(kafkaBrokers);
        }
        setupTopicPartitions(kafkaConfigurer, config);
    }

    private void setupTopicPartitions(KafkaConsumerConfigurer kafkaConsumerConfigurer, KafkaSource.KafkaPluginConfig kafkaPluginConfig) {
        int intValue = kafkaPluginConfig.getPartitions().intValue();
        int instanceId = getContext().getInstanceId();
        int instanceCount = getContext().getInstanceCount();
        String topic = kafkaPluginConfig.getTopic();
        for (int i = 0; i < intValue; i++) {
            if (i % instanceCount == instanceId) {
                kafkaConsumerConfigurer.addTopicPartition(topic, i);
            }
        }
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    protected Iterator<KafkaMessage<Long>> readMessages(KafkaConsumerInfo<Long> kafkaConsumerInfo) throws Exception {
        final TopicPartition topicPartition = kafkaConsumerInfo.getTopicPartition();
        String topic = topicPartition.getTopic();
        int partition = topicPartition.getPartition();
        SimpleConsumer consumer = getConsumer(kafkaConsumerInfo);
        if (consumer == null) {
            return Iterators.emptyIterator();
        }
        long longValue = kafkaConsumerInfo.getReadOffset().longValue();
        if (longValue < 0) {
            longValue = getReadOffset(consumer, topic, partition, longValue);
            kafkaConsumerInfo.setReadOffset(Long.valueOf(longValue));
        }
        FetchResponse fetch = consumer.fetch(new FetchRequestBuilder().clientId(consumer.clientId()).addFetch(topic, partition, longValue, kafkaConsumerInfo.getFetchSize()).build());
        if (fetch.hasError()) {
            handleFetchError(kafkaConsumerInfo, consumer, longValue, fetch.errorCode(topic, partition));
            return Iterators.emptyIterator();
        }
        final long j = longValue;
        final Iterator it = fetch.messageSet(topic, partition).iterator();
        return new AbstractIterator<KafkaMessage<Long>>() { // from class: co.cask.cdap.etl.realtime.kafka.Kafka08SimpleApiConsumer.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
            public KafkaMessage<Long> m20computeNext() {
                while (it.hasNext()) {
                    MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                    if (messageAndOffset.offset() >= j) {
                        Message message = messageAndOffset.message();
                        return new KafkaMessage<>(topicPartition, Long.valueOf(messageAndOffset.nextOffset()), message.key(), message.payload());
                    }
                }
                return (KafkaMessage) endOfData();
            }
        };
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    public void initialize(RealtimeContext realtimeContext) throws Exception {
        super.initialize(realtimeContext);
        String zookeeper = getKafkaConfig().getZookeeper();
        if (zookeeper != null) {
            this.zkClient = ZKClientServices.delegate(ZKClients.reWatchOnExpire(ZKClients.retryOnFailure(ZKClientService.Builder.of(zookeeper).build(), RetryStrategies.fixDelay(2L, TimeUnit.SECONDS))));
            this.brokerService = new ZKBrokerService(this.zkClient);
            try {
                Futures.get(this.zkClient.start(), 3L, TimeUnit.SECONDS, TimeoutException.class);
                Futures.get(this.brokerService.start(), 3L, TimeUnit.SECONDS, TimeoutException.class);
            } catch (TimeoutException e) {
                Futures.get(this.brokerService.stop(), 3L, TimeUnit.SECONDS, TimeoutException.class);
                Futures.get(this.zkClient.stop(), 3L, TimeUnit.SECONDS, TimeoutException.class);
                throw new IllegalArgumentException(String.format("Timeout while trying to start ZookeeperClient/Broker Service. Check if the zookeeper connection string %s is correct.", zookeeper), e);
            }
        }
        this.kafkaConsumers = CacheBuilder.newBuilder().concurrencyLevel(1).expireAfterAccess(60L, TimeUnit.SECONDS).removalListener(consumerCacheRemovalListener()).build();
    }

    private RemovalListener<TopicPartition, SimpleConsumer> consumerCacheRemovalListener() {
        return new RemovalListener<TopicPartition, SimpleConsumer>() { // from class: co.cask.cdap.etl.realtime.kafka.Kafka08SimpleApiConsumer.2
            public void onRemoval(RemovalNotification<TopicPartition, SimpleConsumer> removalNotification) {
                SimpleConsumer simpleConsumer = (SimpleConsumer) removalNotification.getValue();
                if (simpleConsumer == null) {
                    return;
                }
                try {
                    simpleConsumer.close();
                } catch (Throwable th) {
                    Kafka08SimpleApiConsumer.LOG.error("Exception when closing Kafka consumer.", th);
                }
            }
        };
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    public void destroy() {
        super.destroy();
        if (this.kafkaConsumers != null) {
            this.kafkaConsumers.invalidateAll();
            this.kafkaConsumers.cleanUp();
        }
        if (this.brokerService != null) {
            stopService(this.brokerService);
        }
        if (this.zkClient != null) {
            stopService(this.zkClient);
        }
    }

    /* renamed from: processMessage, reason: avoid collision after fix types in other method */
    protected void processMessage2(String str, ByteBuffer byteBuffer, Emitter<StructuredRecord> emitter) {
        emitter.emit(byteBufferToStructuredRecord(str, byteBuffer));
    }

    /* renamed from: processMessage, reason: avoid collision after fix types in other method */
    protected void processMessage2(ByteBuffer byteBuffer, Emitter<StructuredRecord> emitter) {
        emitter.emit(byteBufferToStructuredRecord(null, byteBuffer));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    public Long getBeginOffset(TopicPartition topicPartition) {
        Map<String, byte[]> offsetStore = getOffsetStore();
        if (offsetStore == null) {
            return Long.valueOf(getDefaultOffset(topicPartition));
        }
        byte[] bArr = offsetStore.get(getStoreKey(topicPartition));
        return (bArr == null || bArr.length != 8) ? Long.valueOf(getDefaultOffset(topicPartition)) : Long.valueOf(Bytes.toLong(bArr));
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    protected void saveReadOffsets(Map<TopicPartition, Long> map) {
        Map<String, byte[]> offsetStore = getOffsetStore();
        if (offsetStore == null) {
            return;
        }
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            offsetStore.put(getStoreKey(entry.getKey()), Bytes.toBytes(entry.getValue().longValue()));
        }
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    protected long getDefaultOffset(TopicPartition topicPartition) {
        Long defaultOffset = this.kafkaSource.getConfig().getDefaultOffset();
        return defaultOffset != null ? defaultOffset.longValue() : OffsetRequest.EarliestTime();
    }

    private long getReadOffset(SimpleConsumer simpleConsumer, String str, int i, long j) {
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(ImmutableMap.of(new TopicAndPartition(str, i), new PartitionOffsetRequestInfo(j, 1)), OffsetRequest.CurrentVersion(), simpleConsumer.clientId()));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i)[0];
        }
        LOG.warn("Failed to fetch offset from broker {}:{} for topic-partition {}-{} with error code {}", new Object[]{simpleConsumer.host(), Integer.valueOf(simpleConsumer.port()), str, Integer.valueOf(i), Short.valueOf(offsetsBefore.errorCode(str, i))});
        return 0L;
    }

    @Nullable
    private SimpleConsumer getConsumer(KafkaConsumerInfo<Long> kafkaConsumerInfo) throws Exception {
        TopicPartition topicPartition = kafkaConsumerInfo.getTopicPartition();
        SimpleConsumer simpleConsumer = (SimpleConsumer) this.kafkaConsumers.getIfPresent(topicPartition);
        if (simpleConsumer != null) {
            return simpleConsumer;
        }
        InetSocketAddress leader = getLeader(topicPartition.getTopic(), topicPartition.getPartition());
        if (leader == null) {
            return null;
        }
        SimpleConsumer simpleConsumer2 = new SimpleConsumer(leader.getHostName(), leader.getPort(), 5000, kafkaConsumerInfo.getFetchSize(), String.format("%s-%d-kafka-consumer", getName(), Integer.valueOf(getContext().getInstanceId())));
        this.kafkaConsumers.put(topicPartition, simpleConsumer2);
        return simpleConsumer2;
    }

    private void handleFetchError(KafkaConsumerInfo<Long> kafkaConsumerInfo, SimpleConsumer simpleConsumer, long j, short s) {
        TopicPartition topicPartition = kafkaConsumerInfo.getTopicPartition();
        String topic = topicPartition.getTopic();
        int partition = topicPartition.getPartition();
        LOG.warn("Failed to fetch from broker {}:{} for topic-partition {}-{} with error code {}", new Object[]{simpleConsumer.host(), Integer.valueOf(simpleConsumer.port()), topic, Integer.valueOf(partition), Short.valueOf(s)});
        if (s != ErrorMapping.OffsetOutOfRangeCode()) {
            this.kafkaConsumers.invalidate(topicPartition);
            return;
        }
        long readOffset = getReadOffset(simpleConsumer, topic, partition, OffsetRequest.EarliestTime());
        if (j < readOffset) {
            kafkaConsumerInfo.setReadOffset(Long.valueOf(readOffset));
        } else {
            kafkaConsumerInfo.setReadOffset(Long.valueOf(getReadOffset(simpleConsumer, topic, partition, OffsetRequest.LatestTime())));
        }
    }

    @Nullable
    private InetSocketAddress getLeader(String str, int i) throws Exception {
        if (this.brokerService == null) {
            return findLeader(getKafkaConfig().getBrokers(), str, i);
        }
        BrokerInfo leader = this.brokerService.getLeader(str, i);
        if (leader == null) {
            return null;
        }
        return new InetSocketAddress(leader.getHost(), leader.getPort());
    }

    /* JADX WARN: Finally extract failed */
    @Nullable
    private InetSocketAddress findLeader(String str, String str2, int i) throws Exception {
        for (Map.Entry entry : Splitter.on(',').withKeyValueSeparator(":").split(str).entrySet()) {
            try {
                SimpleConsumer simpleConsumer = new SimpleConsumer((String) entry.getKey(), Integer.parseInt((String) entry.getValue()), 5000, KafkaConsumerConfigurer.DEFAULT_FETCH_SIZE, "leaderLookup");
                try {
                    Iterator it = simpleConsumer.send(new TopicMetadataRequest(ImmutableList.of(str2))).topicsMetadata().iterator();
                    while (it.hasNext()) {
                        for (PartitionMetadata partitionMetadata : ((TopicMetadata) it.next()).partitionsMetadata()) {
                            if (partitionMetadata.partitionId() == i) {
                                Broker leader = partitionMetadata.leader();
                                InetSocketAddress inetSocketAddress = new InetSocketAddress(leader.host(), leader.port());
                                simpleConsumer.close();
                                return inetSocketAddress;
                            }
                        }
                    }
                    simpleConsumer.close();
                } catch (Throwable th) {
                    simpleConsumer.close();
                    throw th;
                }
            } catch (Exception e) {
                throw new Exception(String.format("Failed to communicate with broker %s:%s for leader lookup for topic-partition %s-%s", entry.getKey(), entry.getValue(), str2, Integer.valueOf(i)), e);
            }
        }
        return null;
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    protected /* bridge */ /* synthetic */ void processMessage(ByteBuffer byteBuffer, Emitter emitter) {
        processMessage2(byteBuffer, (Emitter<StructuredRecord>) emitter);
    }

    @Override // co.cask.cdap.etl.realtime.kafka.KafkaSimpleApiConsumer
    protected /* bridge */ /* synthetic */ void processMessage(String str, ByteBuffer byteBuffer, Emitter emitter) {
        processMessage2(str, byteBuffer, (Emitter<StructuredRecord>) emitter);
    }
}
