package gobblin.kafka.client;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import gobblin.configuration.ConfigurationKeys;
import gobblin.source.extractor.extract.kafka.KafkaTopic;
import gobblin.util.ConfigUtils;
import gobblin.util.DatasetFilterUtils;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;

/* loaded from: input_file:WEB-INF/lib/gobblin-kafka-common-0.11.0.jar:gobblin/kafka/client/AbstractBaseKafkaConsumerClient.class */
public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaConsumerClient {
    public static final String CONFIG_PREFIX = "source.kafka.";
    public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = "source.kafka.fetchTimeoutMillis";
    public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000;
    public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = "source.kafka.fetchMinBytes";
    private static final int CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES_DEFAULT = 1024;
    public static final String CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE = "source.kafka.socketTimeoutMillis";
    public static final int CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT = 30000;
    protected final List<String> brokers;
    protected final int fetchTimeoutMillis;
    protected final int fetchMinBytes;
    protected final int socketTimeoutMillis;

    public AbstractBaseKafkaConsumerClient(Config config) {
        this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
        if (this.brokers.isEmpty()) {
            throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
        }
        this.socketTimeoutMillis = ConfigUtils.getInt(config, CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE, 30000).intValue();
        this.fetchTimeoutMillis = ConfigUtils.getInt(config, CONFIG_KAFKA_FETCH_TIMEOUT_VALUE, 1000).intValue();
        this.fetchMinBytes = ConfigUtils.getInt(config, CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES, 1024).intValue();
        Preconditions.checkArgument(this.fetchTimeoutMillis < this.socketTimeoutMillis, "Kafka Source configuration error: FetchTimeout " + this.fetchTimeoutMillis + " must be smaller than SocketTimeout " + this.socketTimeoutMillis);
    }

    @Override // gobblin.kafka.client.GobblinKafkaConsumerClient
    public List<KafkaTopic> getFilteredTopics(final List<Pattern> list, final List<Pattern> list2) {
        return Lists.newArrayList(Iterables.filter(getTopics(), new Predicate<KafkaTopic>() { // from class: gobblin.kafka.client.AbstractBaseKafkaConsumerClient.1
            @Override // com.google.common.base.Predicate
            public boolean apply(@Nonnull KafkaTopic kafkaTopic) {
                return DatasetFilterUtils.survived(kafkaTopic.getName(), list, list2);
            }
        }));
    }

    public abstract List<KafkaTopic> getTopics();
}
