package gobblin.source.extractor.extract.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import gobblin.configuration.State;
import gobblin.source.extractor.extract.kafka.KafkaPartition;
import gobblin.util.ConfigUtils;
import gobblin.util.DatasetFilterUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import kafka.api.OffsetRequest;
import kafka.api.PartitionFetchInfo;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchRequest;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
/* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaWrapper.class */
public class KafkaWrapper implements Closeable {
    /** Logger used by this wrapper and by its nested API implementations. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaWrapper.class);
    /** Job property read by {@code create(State)} to decide whether the new Kafka API is requested. */
    private static final String USE_NEW_KAFKA_API = "use.new.kafka.api";
    /** Presumably the default for {@link #USE_NEW_KAFKA_API} (new API disabled) - confirm against the original source. */
    private static final boolean DEFAULT_USE_NEW_KAFKA_API = false;
    /** Broker addresses taken from the builder; each entry was validated as {@code address:port}. */
    private final List<String> brokers;
    /** The API implementation every public operation of this wrapper delegates to. */
    private final KafkaAPI kafkaAPI;
    /** Whether the new-API implementation was selected when this wrapper was built. */
    private final boolean useNewKafkaAPI;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaWrapper$Builder.class */
    /**
     * Collects the settings needed to construct a {@link KafkaWrapper}: the API flavour,
     * the broker list and the configuration handed to the API implementation.
     */
    public static class Builder {
        private boolean useNewKafkaAPI = false;
        private List<String> brokers = Lists.newArrayList();
        private Config config = ConfigFactory.empty();

        /** Only the enclosing class instantiates builders. */
        private Builder() {
        }

        /**
         * Selects the new Kafka API implementation.
         *
         * @return this builder
         */
        public Builder withNewKafkaAPI() {
            this.useNewKafkaAPI = true;
            return this;
        }

        /**
         * Validates and stores a copy of the broker list.
         *
         * @param list broker addresses, each in the form {@code address:port}
         * @return this builder
         * @throws IllegalArgumentException if any entry does not match {@code address:port}
         */
        public Builder withBrokers(List<String> list) {
            for (String broker : list) {
                boolean wellFormed = broker.matches(".+:\\d+");
                String complaint = String.format("Invalid broker: %s. Must be in the format of address:port.", broker);
                Preconditions.checkArgument(wellFormed, complaint);
            }
            // Copy so later changes to the caller's list do not affect the builder.
            this.brokers = new ArrayList<String>(list);
            return this;
        }

        /**
         * Sets the configuration passed to the Kafka API implementation.
         *
         * @param config configuration to use
         * @return this builder
         */
        public Builder withConfig(Config config) {
            this.config = config;
            return this;
        }

        /**
         * Creates the wrapper from the collected settings.
         *
         * @return a new {@link KafkaWrapper}
         * @throws IllegalArgumentException if no broker has been configured
         */
        public KafkaWrapper build() {
            boolean hasBroker = !this.brokers.isEmpty();
            Preconditions.checkArgument(hasBroker, "Need to specify at least one Kafka broker.");
            return new KafkaWrapper(this);
        }
    }

    /**
     * Contract implemented by the concrete Kafka client flavours that {@link KafkaWrapper}
     * delegates to. Implementations must also release their resources in {@code close()}.
     */
    @Deprecated
    private abstract class KafkaAPI implements Closeable {
        /**
         * @param config configuration for the implementation; this base class does not read it
         */
        protected KafkaAPI(Config config) {
        }

        /**
         * @param kafkaPartition partition to query
         * @return the earliest offset of the partition
         * @throws KafkaOffsetRetrievalFailureException if the offset cannot be retrieved
         */
        protected abstract long getEarliestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException;

        /**
         * @param kafkaPartition partition to query
         * @return the latest offset of the partition
         * @throws KafkaOffsetRetrievalFailureException if the offset cannot be retrieved
         */
        protected abstract long getLatestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException;

        /**
         * @param list  first pattern list used for topic filtering (presumably the blacklist - confirm)
         * @param list2 second pattern list used for topic filtering (presumably the whitelist - confirm)
         * @return the topics that pass the filter
         */
        protected abstract List<KafkaTopic> getFilteredTopics(List<Pattern> list, List<Pattern> list2);

        /**
         * @param kafkaPartition partition to read from
         * @param j              offset to start fetching at
         * @param j2             upper bound for {@code j}
         * @return an iterator over the fetched messages
         */
        protected abstract Iterator<MessageAndOffset> fetchNextMessageBuffer(KafkaPartition kafkaPartition, long j, long j2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Placeholder for the new Kafka consumer API. Every operation, including
     * {@code close()}, throws {@link NotImplementedException}.
     */
    @Deprecated
    public class KafkaNewAPI extends KafkaAPI {
        private static final String NOT_IMPLEMENTED_MESSAGE = "kafka new API has not been implemented";

        protected KafkaNewAPI(Config config) {
            super(config);
        }

        /** Builds the exception thrown by every operation of this placeholder. */
        private NotImplementedException notImplemented() {
            return new NotImplementedException(NOT_IMPLEMENTED_MESSAGE);
        }

        /** Always throws {@link NotImplementedException}. */
        @Override
        public List<KafkaTopic> getFilteredTopics(List<Pattern> list, List<Pattern> list2) {
            throw notImplemented();
        }

        /** Always throws {@link NotImplementedException}. */
        @Override
        protected long getEarliestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
            throw notImplemented();
        }

        /** Always throws {@link NotImplementedException}. */
        @Override
        protected long getLatestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
            throw notImplemented();
        }

        /** Always throws {@link NotImplementedException}. */
        @Override
        protected Iterator<MessageAndOffset> fetchNextMessageBuffer(KafkaPartition kafkaPartition, long j, long j2) {
            throw notImplemented();
        }

        /** Always throws {@link NotImplementedException}. */
        @Override
        public void close() throws IOException {
            throw notImplemented();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Deprecated
    /* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaWrapper$KafkaOldAPI.class */
    public class KafkaOldAPI extends KafkaAPI {
        // Configuration keys (all under the "source.kafka." prefix) and the defaults the
        // constructor passes to ConfigUtils when reading each key.
        public static final String CONFIG_PREFIX = "source.kafka.";
        // Socket timeout handed to every SimpleConsumer, in milliseconds.
        public static final String CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE = "source.kafka.socketTimeoutMillis";
        public static final int CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT = 30000;
        // Buffer size handed to every SimpleConsumer and used as the per-fetch size.
        public static final String CONFIG_KAFKA_BUFFER_SIZE_BYTES = "source.kafka.bufferSizeBytes";
        public static final int CONFIG_KAFKA_BUFFER_SIZE_BYTES_DEFAULT = 1048576;
        // Client name used for consumers, offset requests and fetch requests.
        public static final String CONFIG_KAFKA_CLIENT_NAME = "source.kafka.clientName";
        public static final String CONFIG_KAFKA_CLIENT_NAME_DEFAULT = "gobblin-kafka";
        // Correlation id placed in fetch requests.
        public static final String CONFIG_KAFKA_FETCH_REQUEST_CORRELATION_ID = "source.kafka.fetchCorrelationId";
        private static final int CONFIG_KAFKA_FETCH_REQUEST_CORRELATION_ID_DEFAULT = -1;
        // Fetch timeout in milliseconds; the constructor requires it to be below the socket timeout.
        public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = "source.kafka.fetchTimeoutMillis";
        public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000;
        // Minimum bytes placed in fetch requests.
        public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = "source.kafka.fetchMinBytes";
        private static final int CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES_DEFAULT = 1024;
        // Number of attempts per broker when fetching topic metadata.
        public static final String CONFIG_KAFKA_FETCH_TOPIC_NUM_TRIES = "source.kafka.fetchTopicNumTries";
        private static final int CONFIG_KAFKA_FETCH_TOPIC_NUM_TRIES_DEFAULT = 3;
        // Number of attempts when fetching a partition offset.
        public static final String CONFIG_KAFKA_FETCH_OFFSET_NUM_TRIES = "source.kafka.fetchOffsetNumTries";
        private static final int CONFIG_KAFKA_FETCH_OFFSET_NUM_TRIES_DEFAULT = 3;
        // Values resolved from the configuration in the constructor.
        private final int socketTimeoutMillis;
        private final int bufferSize;
        private final String clientName;
        private final int fetchCorrelationId;
        private final int fetchTimeoutMillis;
        private final int fetchMinBytes;
        private final int fetchTopicRetries;
        private final int fetchOffsetRetries;
        // Consumers created so far, keyed by the "host:port" string they were created for.
        private final ConcurrentMap<String, SimpleConsumer> activeConsumers;

        protected KafkaOldAPI(Config config) {
            super(config);
            this.activeConsumers = Maps.newConcurrentMap();
            this.socketTimeoutMillis = ConfigUtils.getInt(config, CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE, Integer.valueOf(CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT)).intValue();
            this.bufferSize = ConfigUtils.getInt(config, "source.kafka.bufferSizeBytes", 1048576).intValue();
            this.clientName = ConfigUtils.getString(config, "source.kafka.clientName", "gobblin-kafka");
            this.fetchCorrelationId = ConfigUtils.getInt(config, "source.kafka.fetchCorrelationId", Integer.valueOf(CONFIG_KAFKA_FETCH_REQUEST_CORRELATION_ID_DEFAULT)).intValue();
            this.fetchTimeoutMillis = ConfigUtils.getInt(config, CONFIG_KAFKA_FETCH_TIMEOUT_VALUE, Integer.valueOf(CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT)).intValue();
            this.fetchMinBytes = ConfigUtils.getInt(config, CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES, Integer.valueOf(CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES_DEFAULT)).intValue();
            this.fetchTopicRetries = ConfigUtils.getInt(config, "source.kafka.fetchTopicNumTries", 3).intValue();
            this.fetchOffsetRetries = ConfigUtils.getInt(config, "source.kafka.fetchOffsetNumTries", 3).intValue();
            Preconditions.checkArgument(this.fetchTimeoutMillis < this.socketTimeoutMillis, "Kafka Source configuration error: FetchTimeout " + this.fetchTimeoutMillis + " must be smaller than SocketTimeout " + this.socketTimeoutMillis);
        }

        /**
         * Builds a {@link KafkaTopic} (name plus partitions) for every topic whose metadata
         * passes the filter.
         *
         * @param list  first pattern list forwarded to the metadata filter
         * @param list2 second pattern list forwarded to the metadata filter
         * @return the filtered topics
         */
        @Override
        public List<KafkaTopic> getFilteredTopics(List<Pattern> list, List<Pattern> list2) {
            List<KafkaTopic> topics = Lists.newArrayList();
            for (TopicMetadata metadata : getFilteredMetadataList(list, list2)) {
                String topicName = metadata.topic();
                List<KafkaPartition> partitions = getPartitionsForTopic(metadata);
                topics.add(new KafkaTopic(topicName, partitions));
            }
            return topics;
        }

        /**
         * Converts the partition metadata of a topic into {@link KafkaPartition} objects.
         *
         * <p>If any partition has no metadata or no leader, the whole topic is ignored:
         * an error is logged and an empty list is returned.
         *
         * @param topicMetadata metadata of the topic
         * @return the partitions of the topic, or an empty list if the topic is ignored
         */
        private List<KafkaPartition> getPartitionsForTopic(TopicMetadata topicMetadata) {
            List<KafkaPartition> partitions = Lists.newArrayList();
            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                if (partitionMetadata == null) {
                    KafkaWrapper.LOG.error("Ignoring topic with null partition metadata " + topicMetadata.topic());
                    return Collections.emptyList();
                }
                if (partitionMetadata.leader() == null) {
                    KafkaWrapper.LOG.error("Ignoring topic with null partition leader " + topicMetadata.topic() + " metatada=" + partitionMetadata);
                    return Collections.emptyList();
                }
                partitions.add(new KafkaPartition.Builder()
                        .withId(partitionMetadata.partitionId())
                        .withTopicName(topicMetadata.topic())
                        .withLeaderId(partitionMetadata.leader().id())
                        .withLeaderHostAndPort(partitionMetadata.leader().host(), partitionMetadata.leader().port())
                        .build());
            }
            return partitions;
        }

        /**
         * Asks the configured brokers, in order, for filtered topic metadata and returns
         * the first non-null answer.
         *
         * @param list  first pattern list forwarded to the topic filter
         * @param list2 second pattern list forwarded to the topic filter
         * @return the filtered topic metadata from the first broker that answered
         * @throws RuntimeException if no broker returned metadata
         */
        private List<TopicMetadata> getFilteredMetadataList(List<Pattern> list, List<Pattern> list2) {
            for (String broker : KafkaWrapper.this.getBrokers()) {
                List<TopicMetadata> filtered = fetchTopicMetadataFromBroker(broker, list, list2);
                if (filtered != null) {
                    return filtered;
                }
            }
            throw new RuntimeException("Fetching topic metadata from all brokers failed. See log warning for more information.");
        }

        /**
         * Fetches topic metadata from one broker and keeps only the topics that survive
         * {@code DatasetFilterUtils.survived}.
         *
         * @param str   broker address in the form {@code host:port}
         * @param list  first pattern list forwarded to the filter
         * @param list2 second pattern list forwarded to the filter
         * @return the surviving topic metadata, or {@code null} if the broker could not be queried
         */
        private List<TopicMetadata> fetchTopicMetadataFromBroker(String str, List<Pattern> list, List<Pattern> list2) {
            // No topic names selected: the request carries an empty topic list.
            List<TopicMetadata> allTopics = fetchTopicMetadataFromBroker(str, new String[0]);
            if (allTopics == null) {
                return null;
            }
            List<TopicMetadata> survivors = Lists.newArrayList();
            for (TopicMetadata topicMetadata : allTopics) {
                if (DatasetFilterUtils.survived(topicMetadata.topic(), list, list2)) {
                    survivors.add(topicMetadata);
                }
            }
            return survivors;
        }

        /**
         * Sends a topic metadata request to one broker, retrying up to
         * {@code fetchTopicRetries} times with a growing, randomized pause between attempts.
         *
         * <p>The consumer used for the request is closed before returning, whether the
         * request succeeded or not. If the thread is interrupted while waiting between
         * attempts, the interrupt flag is restored and no further attempts are made.
         *
         * @param str    broker address in the form {@code host:port}
         * @param strArr topic names to request metadata for (may be empty)
         * @return the topic metadata, or {@code null} if every attempt failed
         */
        private List<TopicMetadata> fetchTopicMetadataFromBroker(String str, String... strArr) {
            KafkaWrapper.LOG.info(String.format("Fetching topic metadata from broker %s", str));
            SimpleConsumer simpleConsumer = null;
            try {
                simpleConsumer = getSimpleConsumer(str);
                for (int i = 0; i < this.fetchTopicRetries; i++) {
                    try {
                        return simpleConsumer.send(new TopicMetadataRequest(Arrays.asList(strArr))).topicsMetadata();
                    } catch (Exception e) {
                        KafkaWrapper.LOG.warn(String.format("Fetching topic metadata from broker %s has failed %d times.", str, Integer.valueOf(i + 1)), e);
                        // Pausing only makes sense when another attempt follows.
                        if (i < this.fetchTopicRetries - 1) {
                            try {
                                Thread.sleep((long) ((i + Math.random()) * 1000.0d));
                            } catch (InterruptedException e2) {
                                KafkaWrapper.LOG.warn("Caught InterruptedException: " + e2);
                                // Keep the interrupt visible to callers and stop retrying.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
                return null;
            } finally {
                // NOTE(review): the closed consumer stays in activeConsumers; confirm that
                // SimpleConsumer reconnects on its next request.
                if (simpleConsumer != null) {
                    simpleConsumer.close();
                }
            }
        }

        /**
         * Returns the cached consumer for a broker, creating and caching one on first use.
         *
         * <p>When two threads create a consumer for the same broker at the same time, the
         * one already registered in the cache wins; the surplus consumer is closed so it
         * does not leak outside the cache.
         *
         * @param str broker address in the form {@code host:port}
         * @return the consumer registered for {@code str}
         */
        private SimpleConsumer getSimpleConsumer(String str) {
            SimpleConsumer cached = this.activeConsumers.get(str);
            if (cached != null) {
                return cached;
            }
            SimpleConsumer created = createSimpleConsumer(str);
            SimpleConsumer winner = this.activeConsumers.putIfAbsent(str, created);
            if (winner == null) {
                return created;
            }
            // Another thread registered a consumer first; use theirs and discard ours.
            created.close();
            return winner;
        }

        /**
         * Looks up (or creates) the consumer for a broker given as {@link HostAndPort},
         * using its string form as the cache key.
         *
         * @param hostAndPort broker address
         * @return the consumer for that broker
         */
        private SimpleConsumer getSimpleConsumer(HostAndPort hostAndPort) {
            String brokerAddress = hostAndPort.toString();
            return getSimpleConsumer(brokerAddress);
        }

        /**
         * Creates a consumer for a broker address written as {@code host:port}.
         *
         * <p>The address is split on {@code ':'}; the first piece is the host and the
         * second is parsed as the port.
         *
         * @param str broker address in the form {@code host:port}
         * @return a new consumer for that broker
         * @throws IllegalArgumentException if the address has no port part or the port is not a number
         */
        private SimpleConsumer createSimpleConsumer(String str) {
            List<String> hostAndPort = Splitter.on(':').trimResults().omitEmptyStrings().splitToList(str);
            Preconditions.checkArgument(hostAndPort.size() >= 2, "Invalid broker: %s. Must be in the format of address:port.", str);
            return createSimpleConsumer(hostAndPort.get(0), Integer.parseInt(hostAndPort.get(1)));
        }

        /**
         * Creates a consumer for the given host and port using the configured socket
         * timeout, buffer size and client name.
         *
         * @param str broker host
         * @param i   broker port
         * @return a new consumer
         */
        private SimpleConsumer createSimpleConsumer(String str, int i) {
            final int soTimeout = this.socketTimeoutMillis;
            final int receiveBufferSize = this.bufferSize;
            final String clientId = this.clientName;
            return new SimpleConsumer(str, i, soTimeout, receiveBufferSize, clientId);
        }

        /**
         * Requests a single offset for the partition using {@code OffsetRequest.EarliestTime()}.
         *
         * @param kafkaPartition partition to query
         * @return the earliest offset reported by the partition leader
         * @throws KafkaOffsetRetrievalFailureException if all attempts fail
         */
        @Override
        protected long getEarliestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
            TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaPartition.getTopicName(), kafkaPartition.getId());
            PartitionOffsetRequestInfo earliestOnly = new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Collections.singletonMap(topicAndPartition, earliestOnly);
            return getOffset(kafkaPartition, requestInfo);
        }

        /**
         * Requests a single offset for the partition using {@code OffsetRequest.LatestTime()}.
         *
         * @param kafkaPartition partition to query
         * @return the latest offset reported by the partition leader
         * @throws KafkaOffsetRetrievalFailureException if all attempts fail
         */
        @Override
        protected long getLatestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
            TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaPartition.getTopicName(), kafkaPartition.getId());
            PartitionOffsetRequestInfo latestOnly = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Collections.singletonMap(topicAndPartition, latestOnly);
            return getOffset(kafkaPartition, requestInfo);
        }

        /**
         * Sends an offset request to the partition leader and returns the first offset in
         * the response, retrying up to {@code fetchOffsetRetries} times with a growing,
         * randomized pause between attempts.
         *
         * <p>If the thread is interrupted while waiting between attempts, the interrupt
         * flag is restored and no further attempts are made.
         *
         * @param kafkaPartition partition to query
         * @param map            offset request info keyed by topic and partition
         * @return the first offset returned for the partition
         * @throws KafkaOffsetRetrievalFailureException if no attempt succeeded
         */
        private long getOffset(KafkaPartition kafkaPartition, Map<TopicAndPartition, PartitionOffsetRequestInfo> map) throws KafkaOffsetRetrievalFailureException {
            SimpleConsumer simpleConsumer = getSimpleConsumer(kafkaPartition.getLeader().getHostAndPort());
            for (int i = 0; i < this.fetchOffsetRetries; i++) {
                try {
                    OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(map, OffsetRequest.CurrentVersion(), this.clientName));
                    if (offsetsBefore.hasError()) {
                        throw new RuntimeException("offsetReponse has error: " + ((int) offsetsBefore.errorCode(kafkaPartition.getTopicName(), kafkaPartition.getId())));
                    }
                    return offsetsBefore.offsets(kafkaPartition.getTopicName(), kafkaPartition.getId())[0];
                } catch (Exception e) {
                    KafkaWrapper.LOG.warn(String.format("Fetching offset for partition %s has failed %d time(s). Reason: %s", kafkaPartition, Integer.valueOf(i + 1), e));
                    // Pause only when another attempt follows.
                    if (i < this.fetchOffsetRetries - 1) {
                        try {
                            Thread.sleep((long) ((i + Math.random()) * 1000.0d));
                        } catch (InterruptedException e2) {
                            KafkaWrapper.LOG.error("Caught interrupted exception between retries of getting latest offsets. " + e2);
                            // Keep the interrupt visible to callers and stop retrying.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
            throw new KafkaOffsetRetrievalFailureException(String.format("Fetching offset for partition %s has failed.", kafkaPartition));
        }

        /**
         * Fetches the next batch of messages of a partition starting at offset {@code j}.
         *
         * <p>If the first fetch fails, the partition's leader is refreshed from topic
         * metadata and the fetch is retried once.
         *
         * @param kafkaPartition partition to read from
         * @param j              offset to start fetching at
         * @param j2             upper bound; nothing is fetched when {@code j > j2}
         * @return an iterator over the fetched messages, or {@code null} when {@code j > j2}
         *         or the messages could not be fetched
         */
        @Override
        protected Iterator<MessageAndOffset> fetchNextMessageBuffer(KafkaPartition kafkaPartition, long j, long j2) {
            if (j > j2) {
                return null;
            }
            FetchRequest request = createFetchRequest(kafkaPartition, j);
            Iterator<MessageAndOffset> messages;
            try {
                FetchResponse response = getFetchResponseForFetchRequest(request, kafkaPartition);
                messages = getIteratorFromFetchResponse(response, kafkaPartition);
            } catch (Exception e) {
                KafkaWrapper.LOG.warn(String.format("Fetch message buffer for partition %s has failed: %s. Will refresh topic metadata and retry", kafkaPartition, e));
                messages = refreshTopicMetadataAndRetryFetch(kafkaPartition, request);
            }
            return messages;
        }

        /**
         * Sends the fetch request to the current leader of the partition.
         *
         * @param fetchRequest   request to send
         * @param kafkaPartition partition whose leader receives the request
         * @return the fetch response
         * @throws RuntimeException if the response reports an error
         */
        private synchronized FetchResponse getFetchResponseForFetchRequest(FetchRequest fetchRequest, KafkaPartition kafkaPartition) {
            SimpleConsumer leaderConsumer = getSimpleConsumer(kafkaPartition.getLeader().getHostAndPort());
            FetchResponse response = leaderConsumer.fetch(fetchRequest);
            if (!response.hasError()) {
                return response;
            }
            short errorCode = response.errorCode(kafkaPartition.getTopicName(), kafkaPartition.getId());
            throw new RuntimeException(String.format("error code %d", errorCode));
        }

        /**
         * Extracts the message iterator for the partition from a fetch response.
         *
         * @param fetchResponse  response to read
         * @param kafkaPartition partition whose messages are wanted
         * @return the message iterator, or {@code null} (after logging a warning) if it
         *         could not be obtained
         */
        private Iterator<MessageAndOffset> getIteratorFromFetchResponse(FetchResponse fetchResponse, KafkaPartition kafkaPartition) {
            Iterator<MessageAndOffset> messages = null;
            try {
                messages = fetchResponse.messageSet(kafkaPartition.getTopicName(), kafkaPartition.getId()).iterator();
            } catch (Exception e) {
                KafkaWrapper.LOG.warn(String.format("Failed to retrieve next message buffer for partition %s: %s.The remainder of this partition will be skipped.", kafkaPartition, e));
            }
            return messages;
        }

        /**
         * Refreshes the partition's leader from topic metadata and sends the fetch request
         * once more.
         *
         * @param kafkaPartition partition to refresh and read from
         * @param fetchRequest   request to resend
         * @return the message iterator, or {@code null} (after logging a warning) if the
         *         refresh or the retry failed
         */
        private Iterator<MessageAndOffset> refreshTopicMetadataAndRetryFetch(KafkaPartition kafkaPartition, FetchRequest fetchRequest) {
            Iterator<MessageAndOffset> messages = null;
            try {
                refreshTopicMetadata(kafkaPartition);
                FetchResponse response = getFetchResponseForFetchRequest(fetchRequest, kafkaPartition);
                messages = getIteratorFromFetchResponse(response, kafkaPartition);
            } catch (Exception e) {
                KafkaWrapper.LOG.warn(String.format("Fetch message buffer for partition %s has failed: %s. This partition will be skipped.", kafkaPartition, e));
            }
            return messages;
        }

        /**
         * Updates the leader of {@code kafkaPartition} from freshly fetched topic metadata.
         *
         * <p>Brokers are asked in order; the first one that returns non-empty metadata is
         * used and no further broker is contacted. If that metadata has no entry for the
         * partition, or the entry has no leader, the partition keeps its current leader.
         *
         * @param kafkaPartition partition whose leader should be refreshed
         */
        private void refreshTopicMetadata(KafkaPartition kafkaPartition) {
            for (String broker : KafkaWrapper.this.getBrokers()) {
                List<TopicMetadata> topicMetadata = fetchTopicMetadataFromBroker(broker, kafkaPartition.getTopicName());
                if (topicMetadata == null || topicMetadata.isEmpty()) {
                    continue;
                }
                // Only one topic was requested, so its metadata is the first entry.
                for (PartitionMetadata partitionMetadata : topicMetadata.get(0).partitionsMetadata()) {
                    if (partitionMetadata == null || partitionMetadata.partitionId() != kafkaPartition.getId()) {
                        continue;
                    }
                    if (partitionMetadata.leader() == null) {
                        KafkaWrapper.LOG.warn(String.format("Partition %s currently has no leader; keeping the previous leader.", kafkaPartition));
                        return;
                    }
                    kafkaPartition.setLeader(partitionMetadata.leader().id(), partitionMetadata.leader().host(), partitionMetadata.leader().port());
                    return;
                }
                return;
            }
        }

        /**
         * Builds a fetch request for one partition using the configured correlation id,
         * client name, fetch timeout, minimum bytes and buffer size.
         *
         * @param kafkaPartition partition to fetch from
         * @param j              offset to start fetching at
         * @return the fetch request
         */
        private FetchRequest createFetchRequest(KafkaPartition kafkaPartition, long j) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaPartition.getTopicName(), kafkaPartition.getId());
            PartitionFetchInfo fetchInfo = new PartitionFetchInfo(j, this.bufferSize);
            Map<TopicAndPartition, PartitionFetchInfo> requestInfo = Collections.singletonMap(topicAndPartition, fetchInfo);
            return new FetchRequest(this.fetchCorrelationId, this.clientName, this.fetchTimeoutMillis, this.fetchMinBytes, requestInfo);
        }

        /**
         * Closes every cached consumer and empties the cache.
         *
         * <p>A consumer that fails to close does not stop the others from being closed;
         * failures are logged and counted.
         *
         * @throws IOException if at least one consumer failed to close
         */
        @Override
        public void close() throws IOException {
            int failedCloses = 0;
            for (SimpleConsumer simpleConsumer : this.activeConsumers.values()) {
                if (simpleConsumer != null) {
                    try {
                        simpleConsumer.close();
                    } catch (Exception e) {
                        KafkaWrapper.LOG.warn(String.format("Failed to close Kafka Consumer %s:%d", simpleConsumer.host(), Integer.valueOf(simpleConsumer.port())), e);
                        failedCloses++;
                    }
                }
            }
            this.activeConsumers.clear();
            if (failedCloses > 0) {
                throw new IOException(failedCloses + " consumer(s) failed to close.");
            }
        }
    }

    private KafkaWrapper(Builder builder) {
        this.useNewKafkaAPI = builder.useNewKafkaAPI;
        this.brokers = builder.brokers;
        this.kafkaAPI = getKafkaAPI(builder.config);
    }

    /**
     * Builds a {@link KafkaWrapper} from job state.
     *
     * <p>Reads the broker list from {@code kafka.brokers}, the API flag from
     * {@code use.new.kafka.api} (default {@code false}) and converts all state properties
     * into the configuration for the API implementation.
     *
     * @param state job state holding the Kafka properties
     * @return a new wrapper
     * @throws NullPointerException     if {@code kafka.brokers} is not set
     * @throws IllegalArgumentException if a broker is malformed or the list is empty
     */
    public static KafkaWrapper create(State state) {
        final String brokersKey = "kafka.brokers";
        Preconditions.checkNotNull(state.getProp(brokersKey), "Need to specify at least one Kafka broker.");

        Builder builder = new Builder();
        boolean newApiRequested = state.getPropAsBoolean(USE_NEW_KAFKA_API, DEFAULT_USE_NEW_KAFKA_API);
        if (newApiRequested) {
            builder.withNewKafkaAPI();
        }
        builder.withBrokers(state.getPropAsList(brokersKey));
        Config config = ConfigUtils.propertiesToConfig(state.getProperties());
        builder.withConfig(config);
        return builder.build();
    }

    /**
     * @return the broker addresses this wrapper was built with (the internal list, not a copy)
     */
    public List<String> getBrokers() {
        return brokers;
    }

    /**
     * Delegates to the selected Kafka API implementation.
     *
     * @param list  first pattern list used for topic filtering
     * @param list2 second pattern list used for topic filtering
     * @return the topics that pass the filter
     */
    public List<KafkaTopic> getFilteredTopics(List<Pattern> list, List<Pattern> list2) {
        return kafkaAPI.getFilteredTopics(list, list2);
    }

    /**
     * Delegates to the selected Kafka API implementation.
     *
     * @param kafkaPartition partition to query
     * @return the earliest offset of the partition
     * @throws KafkaOffsetRetrievalFailureException if the offset cannot be retrieved
     */
    public long getEarliestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
        return kafkaAPI.getEarliestOffset(kafkaPartition);
    }

    /**
     * Delegates to the selected Kafka API implementation.
     *
     * @param kafkaPartition partition to query
     * @return the latest offset of the partition
     * @throws KafkaOffsetRetrievalFailureException if the offset cannot be retrieved
     */
    public long getLatestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
        return kafkaAPI.getLatestOffset(kafkaPartition);
    }

    /**
     * Delegates to the selected Kafka API implementation.
     *
     * @param kafkaPartition partition to read from
     * @param j              offset to start fetching at
     * @param j2             upper bound for {@code j}
     * @return an iterator over the fetched messages; may be {@code null}
     */
    public Iterator<MessageAndOffset> fetchNextMessageBuffer(KafkaPartition kafkaPartition, long j, long j2) {
        return kafkaAPI.fetchNextMessageBuffer(kafkaPartition, j, j2);
    }

    /**
     * Instantiates the API implementation selected by {@code useNewKafkaAPI}.
     *
     * @param config configuration passed to the implementation
     * @return a {@code KafkaNewAPI} when the new API is requested, otherwise a {@code KafkaOldAPI}
     */
    private KafkaAPI getKafkaAPI(Config config) {
        if (this.useNewKafkaAPI) {
            return new KafkaNewAPI(config);
        }
        return new KafkaOldAPI(config);
    }

    /**
     * Closes the underlying Kafka API implementation.
     *
     * @throws IOException if the implementation fails to close
     */
    @Override
    public void close() throws IOException {
        kafkaAPI.close();
    }
}
