package org.apache.samza.system.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.admin.AdminClient;
import kafka.utils.ZkUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.system.ExtendedSystemAdmin;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.ScalaJavaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import scala.runtime.BoxedUnit;

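/**
 * Kafka implementation of {@link ExtendedSystemAdmin}. Partition and offset metadata are read through a Kafka
 * metadata {@link Consumer}; topic creation, clearing and message deletion are delegated to
 * {@code KafkaSystemAdminUtilsScala}.
 */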
public class KafkaSystemAdmin implements ExtendedSystemAdmin {
    /** First argument passed to every {@link ExponentialSleepStrategy} this class creates (presumably the back-off multiplier). */
    protected static final double DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER = 2.0d;
    /** Second argument passed to every {@link ExponentialSleepStrategy} this class creates (presumably the initial delay, in ms). */
    protected static final long DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS = 500;
    /** Third argument passed to every {@link ExponentialSleepStrategy} this class creates (presumably the maximum delay, in ms). */
    protected static final long DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS = 10000;
    /** Retry-loop sleep count at which the metadata fetches stop retrying and rethrow as a SamzaException. */
    protected static final int MAX_RETRIES_ON_EXCEPTION = 5;
    /** Replication factor used for a changelog topic when none is configured for its store. */
    protected static final int DEFAULT_REPL_FACTOR = 2;
    /** Name of the system this admin serves; used in log messages, config keys and created SystemStreamPartitions. */
    protected final String systemName;
    /** Kafka consumer used for all partition and offset lookups; closed by {@code stop()}. */
    protected final Consumer metadataConsumer;
    /** Configuration handed to the constructor; read again later for admin-client and ZooKeeper settings. */
    protected final Config config;
    /** Topic properties applied when building the spec of a coordinator stream. */
    private final Properties coordinatorStreamProperties;
    /** Replication factor applied when building the spec of a coordinator stream. */
    private final int coordinatorStreamReplicationFactor;
    /** Changelog topic name -> replication factor and Kafka properties, populated in the constructor. */
    private final Map<String, ChangelogInfo> changelogTopicMetaInformation;
    /** Stream id -> topic properties for intermediate streams, looked up in {@code toKafkaSpec}. */
    private final Map<String, Properties> intermediateStreamProperties;
    /** When false, {@code deleteMessages} does nothing. */
    protected final boolean deleteCommittedMessages;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemAdmin.class);

    /** Set to true by {@code deleteMessages} after it has delegated a delete; exposed for tests. */
    @VisibleForTesting
    public static volatile boolean deleteMessageCalled = false;
    /** Created on the first effective {@code deleteMessages} call; closed by {@code stop()} when non-null. */
    protected AdminClient adminClient = null;
    /** Flipped to true by {@code stop()}; {@code start()} refuses to run once it is set. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /**
     * Holder for the three per-partition offset maps produced by a single metadata fetch:
     * oldest, newest and upcoming offsets, each keyed by {@link SystemStreamPartition}.
     * The maps are stored as given (no defensive copies).
     */
    public static class OffsetsMaps {
        private final Map<SystemStreamPartition, String> oldestOffsets;
        private final Map<SystemStreamPartition, String> newestOffsets;
        private final Map<SystemStreamPartition, String> upcomingOffsets;

        private OffsetsMaps(
                Map<SystemStreamPartition, String> oldest,
                Map<SystemStreamPartition, String> newest,
                Map<SystemStreamPartition, String> upcoming) {
            oldestOffsets = oldest;
            newestOffsets = newest;
            upcomingOffsets = upcoming;
        }

        /** @return the oldest-offset map this holder was created with */
        public Map<SystemStreamPartition, String> getOldestOffsets() {
            return oldestOffsets;
        }

        /** @return the newest-offset map this holder was created with */
        public Map<SystemStreamPartition, String> getNewestOffsets() {
            return newestOffsets;
        }

        /** @return the upcoming-offset map this holder was created with */
        public Map<SystemStreamPartition, String> getUpcomingOffsets() {
            return upcomingOffsets;
        }
    }

    /**
     * Creates an admin for one Kafka system and pre-computes the topic settings it will need later:
     * coordinator stream replication factor and properties, per-changelog-topic replication factor and
     * properties, the delete-committed-messages flag, and intermediate stream properties.
     *
     * @param str name of the system this admin serves
     * @param config configuration from which all Kafka and system settings are read
     * @param consumer Kafka consumer used for metadata lookups; must not be null
     * @throws SamzaException if {@code consumer} is null
     */
    public KafkaSystemAdmin(String str, Config config, Consumer consumer) {
        this.systemName = str;
        this.config = config;
        if (consumer == null) {
            throw new SamzaException("Cannot construct KafkaSystemAdmin for system " + str + " with null metadataConsumer");
        }
        this.metadataConsumer = consumer;

        KafkaConfig kafkaConfig = new KafkaConfig(config);
        this.coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
        this.coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig);

        // Store name -> changelog topic name. The raw cast is kept because the converter's generic type is
        // not visible here; the typed variable makes the entry loop below type-check.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Map<String, String> storeToChangelog =
                (Map) JavaConverters.mapAsJavaMapConverter(kafkaConfig.getKafkaChangelogEnabledStores()).asJava();

        this.changelogTopicMetaInformation = new HashMap<>();
        for (Map.Entry<String, String> storeAndTopic : storeToChangelog.entrySet()) {
            String storeName = storeAndTopic.getKey();
            String topicName = storeAndTopic.getValue();
            String configuredFactor = kafkaConfig.getChangelogStreamReplicationFactor(storeName);
            // Fall back to DEFAULT_REPL_FACTOR when the store has no replication factor configured.
            int replicationFactor = StringUtils.isEmpty(configuredFactor) ? DEFAULT_REPL_FACTOR : Integer.parseInt(configuredFactor);
            ChangelogInfo changelogInfo = new ChangelogInfo(replicationFactor, kafkaConfig.getChangelogKafkaProperties(storeName));
            LOG.info(String.format("Creating topic meta information for topic: %s with replication factor: %s", topicName, Integer.valueOf(replicationFactor)));
            this.changelogTopicMetaInformation.put(topicName, changelogInfo);
        }

        this.deleteCommittedMessages = new SystemConfig(config).deleteCommittedMessages(str);
        this.intermediateStreamProperties = (Map) JavaConverters.mapAsJavaMapConverter(KafkaSystemAdminUtilsScala.getIntermediateStreamProperties(config)).asJava();
        LOG.info(String.format("Created KafkaSystemAdmin for system %s", str));
    }

    /**
     * Checks that this admin has not already been stopped. Nothing else happens here.
     *
     * @throws IllegalStateException if {@link #stop()} was called before this method
     */
    public void start() {
        boolean alreadyStopped = stopped.get();
        if (alreadyStopped) {
            throw new IllegalStateException("SamzaKafkaAdmin.start() is called after stop()");
        }
    }

    /**
     * Releases the Kafka clients held by this admin. The metadata consumer is closed only on the first
     * call (guarded by the {@code stopped} flag); the admin client, if one was created, is closed on
     * every call. Failures while closing are logged and not propagated.
     */
    public void stop() {
        boolean firstStop = stopped.compareAndSet(false, true);
        if (firstStop) {
            try {
                metadataConsumer.close();
            } catch (Exception closeFailure) {
                LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", closeFailure);
            }
        }

        if (adminClient == null) {
            return;
        }
        try {
            adminClient.close();
        } catch (Exception closeFailure) {
            LOG.warn("adminClient.close for system " + systemName + " failed with exception.", closeFailure);
        }
    }

    /**
     * Looks up the partitions of each requested stream and returns metadata that carries only the
     * partition ids. Every partition maps to one shared placeholder whose offset getters throw
     * {@link NotImplementedException}. The lookup runs inside an {@link ExponentialSleepStrategy} retry loop.
     *
     * @param set names of the streams (Kafka topics) to look up
     * @param j not used by this implementation
     * @return map from stream name to partition-only {@link SystemStreamMetadata}
     * @throws SamzaException once the retry loop has slept {@code MAX_RETRIES_ON_EXCEPTION} times and the
     *         lookup fails again
     */
    public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(final Set<String> set, long j) {
        // Placeholder shared by all partitions: this method reports partition counts, not offsets.
        final SystemStreamMetadata.SystemStreamPartitionMetadata countsOnlyMetadata = new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) {
            private final String msg = "getSystemStreamPartitionCounts does not populate SystemStreaMetadata info. Only number of partitions";

            @Override
            public String getOldestOffset() {
                throw new NotImplementedException(this.msg);
            }

            @Override
            public String getNewestOffset() {
                throw new NotImplementedException(this.msg);
            }

            @Override
            public String getUpcomingOffset() {
                throw new NotImplementedException(this.msg);
            }
        };

        ExponentialSleepStrategy retryStrategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER, DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);

        AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchCounts = new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
            @Override
            public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop retryLoop) {
                Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();
                for (String streamName : set) {
                    // metadataConsumer is a raw Consumer, so the element type has to be asserted here.
                    @SuppressWarnings("unchecked")
                    List<PartitionInfo> partitionInfos = KafkaSystemAdmin.this.metadataConsumer.partitionsFor(streamName);
                    LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
                    if (partitionInfos == null) {
                        // Fail with a descriptive message (same as fetchSystemStreamMetadata) instead of an NPE;
                        // the exception still goes through the retry handler below.
                        throw new SamzaException(String.format("Partition info not(yet?) available for system %s topic %s", KafkaSystemAdmin.this.systemName, streamName));
                    }
                    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        partitionMetadata.put(new Partition(partitionInfo.partition()), countsOnlyMetadata);
                    }
                    allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
                }
                retryLoop.done();
                return allMetadata;
            }
        };

        AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onFailure = new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Exception exc, ExponentialSleepStrategy.RetryLoop retryLoop) {
                if (retryLoop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
                    LOG.warn(String.format("Fetching systemstreampartition counts for: %s threw an exception. Retrying.", set), exc);
                    return null;
                }
                // Out of retries: end the loop and surface the last failure.
                LOG.error(String.format("Fetching systemstreampartition counts for: %s threw an exception.", set), exc);
                retryLoop.done();
                throw new SamzaException(exc);
            }
        };

        @SuppressWarnings({"unchecked", "rawtypes"})
        Map<String, SystemStreamMetadata> partitionCounts = (Map) retryStrategy.run(fetchCounts, onFailure).get();
        LOG.info("SystemStream partition counts for system {}: {}", this.systemName, partitionCounts);
        return partitionCounts;
    }

    /**
     * Returns, for every partition in the input, the offset immediately following the given one
     * (the given offset parsed as a long, plus one).
     *
     * @param map partition -> offset, where each offset is a decimal long
     * @return a new map with the same keys and each offset incremented by one
     * @throws NumberFormatException if an offset cannot be parsed as a long
     */
    public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> map) {
        Map<SystemStreamPartition, String> nextOffsets = new HashMap<>();
        for (Map.Entry<SystemStreamPartition, String> partitionAndOffset : map.entrySet()) {
            long following = Long.parseLong(partitionAndOffset.getValue()) + 1;
            nextOffsets.put(partitionAndOffset.getKey(), String.valueOf(following));
        }
        return nextOffsets;
    }

    /**
     * Fetches metadata for the given streams using a retry strategy built from this class's default
     * back-off constants.
     *
     * @param set names of the streams (Kafka topics) to look up
     * @return map from stream name to its metadata
     */
    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> set) {
        ExponentialSleepStrategy defaultStrategy = new ExponentialSleepStrategy(
                DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
                DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS,
                DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
        return getSystemStreamMetadata(set, defaultStrategy);
    }

    /**
     * Fetches oldest, newest and upcoming offsets for exactly the given partitions in one call.
     * An offset missing from the fetched maps is passed through as {@code null}.
     *
     * @param set partitions to look up
     * @return map from each requested partition to its offset metadata
     */
    public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(Set<SystemStreamPartition> set) {
        LOG.info("Fetching SSP metadata for: {}", set);

        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (SystemStreamPartition ssp : set) {
            topicPartitions.add(new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()));
        }
        OffsetsMaps offsets = fetchTopicPartitionsMetadata(topicPartitions);

        Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataBySsp = new HashMap<>();
        for (SystemStreamPartition ssp : set) {
            String oldest = offsets.getOldestOffsets().get(ssp);
            String newest = offsets.getNewestOffsets().get(ssp);
            String upcoming = offsets.getUpcomingOffsets().get(ssp);
            metadataBySsp.put(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata(oldest, newest, upcoming));
        }
        return metadataBySsp;
    }

    /**
     * Fetches metadata for the given streams, retrying through the supplied strategy.
     *
     * @param set names of the streams (Kafka topics) to look up
     * @param exponentialSleepStrategy retry strategy that drives the fetch
     * @return map from stream name to its metadata
     * @throws SamzaException once the retry loop has slept {@code MAX_RETRIES_ON_EXCEPTION} times and the
     *         fetch fails again, or if the strategy finishes without producing a result
     */
    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(final Set<String> set, ExponentialSleepStrategy exponentialSleepStrategy) {
        LOG.info("Fetching system stream metadata for {} from system {}", set, this.systemName);

        AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchOperation = new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
            @Override
            public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop retryLoop) {
                Map<String, SystemStreamMetadata> metadata = KafkaSystemAdmin.this.fetchSystemStreamMetadata(set);
                retryLoop.done();
                return metadata;
            }
        };

        AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onFailure = new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Exception exc, ExponentialSleepStrategy.RetryLoop retryLoop) {
                if (retryLoop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
                    LOG.warn(String.format("Fetching system stream metadata for: %s threw an exception. Retrying.", set), exc);
                    return null;
                }
                // Out of retries: end the loop and surface the last failure.
                LOG.error(String.format("Fetching system stream metadata for: %s threw an exception.", set), exc);
                retryLoop.done();
                throw new SamzaException(exc);
            }
        };

        // The fallback must be named apply() to implement Function0; it only runs when the strategy
        // returned no result.
        AbstractFunction0<Map<String, SystemStreamMetadata>> noResultFallback = new AbstractFunction0<Map<String, SystemStreamMetadata>>() {
            @Override
            public Map<String, SystemStreamMetadata> apply() {
                throw new SamzaException("Failed to get system stream metadata");
            }
        };

        return (Map) exponentialSleepStrategy.run(fetchOperation, onFailure).getOrElse(noResultFallback);
    }

    /**
     * Fetches the newest offset of one partition, retrying with the default exponential back-off.
     *
     * @param systemStreamPartition partition to look up
     * @param num retry-loop sleep count at which the fetch stops retrying
     * @return the newest offset, or {@code null} when the fetch reports the partition as empty
     * @throws SamzaException if the fetch fails again after the retry limit is reached
     */
    public String getNewestOffset(final SystemStreamPartition systemStreamPartition, final Integer num) {
        LOG.info("Fetching newest offset for: {}", systemStreamPartition);

        ExponentialSleepStrategy retryStrategy = new ExponentialSleepStrategy(
                DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
                DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS,
                DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);

        AbstractFunction1<ExponentialSleepStrategy.RetryLoop, String> fetchOperation = new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, String>() {
            public String apply(ExponentialSleepStrategy.RetryLoop loop) {
                String newest = KafkaSystemAdmin.this.fetchNewestOffset(systemStreamPartition);
                loop.done();
                return newest;
            }
        };

        AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onFailure = new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
            public BoxedUnit apply(Exception failure, ExponentialSleepStrategy.RetryLoop loop) {
                boolean retriesExhausted = loop.sleepCount() >= num.intValue();
                if (retriesExhausted) {
                    KafkaSystemAdmin.LOG.error(String.format("Fetching newest offset for: %s threw an exception.", systemStreamPartition), failure);
                    loop.done();
                    throw new SamzaException("Exception while trying to get newest offset", failure);
                }
                KafkaSystemAdmin.LOG.warn(String.format("Fetching newest offset for: %s threw an exception. Retrying.", systemStreamPartition), failure);
                return null;
            }
        };

        return (String) retryStrategy.run(fetchOperation, onFailure).get();
    }

    /**
     * Converts a Kafka topic-partition into the equivalent partition of this system.
     *
     * @param topicPartition Kafka topic and partition number
     * @return a {@link SystemStreamPartition} with this admin's system name, the topic as stream, and the same partition id
     */
    private SystemStreamPartition toSystemStreamPartition(TopicPartition topicPartition) {
        Partition partition = new Partition(topicPartition.partition());
        String stream = topicPartition.topic();
        return new SystemStreamPartition(systemName, stream, partition);
    }

    /**
     * Fetches beginning and end offsets for the given topic-partitions from the metadata consumer and
     * turns them into oldest / newest / upcoming offset maps.
     *
     * <p>The end offset is stored as the upcoming offset. When it is positive, the newest offset is the
     * end offset minus one. Otherwise the partition is treated as empty: no newest offset is recorded
     * and the oldest offset is forced to {@code "0"}.
     *
     * @param list topic-partitions to look up
     * @return the three offset maps, keyed by {@link SystemStreamPartition}
     */
    private OffsetsMaps fetchTopicPartitionsMetadata(List<TopicPartition> list) {
        Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
        Map<SystemStreamPartition, String> newestOffsets = new HashMap<>();
        Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>();

        // metadataConsumer is a raw Consumer, so the key/value types have to be asserted here; the typed
        // variables make the loops below type-check.
        @SuppressWarnings("unchecked")
        Map<TopicPartition, Long> beginningOffsets = this.metadataConsumer.beginningOffsets(list);
        LOG.debug("Kafka-fetched beginningOffsets: {}", beginningOffsets);
        @SuppressWarnings("unchecked")
        Map<TopicPartition, Long> endOffsets = this.metadataConsumer.endOffsets(list);
        LOG.debug("Kafka-fetched endOffsets: {}", endOffsets);

        for (Map.Entry<TopicPartition, Long> beginning : beginningOffsets.entrySet()) {
            oldestOffsets.put(toSystemStreamPartition(beginning.getKey()), String.valueOf(beginning.getValue()));
        }

        for (Map.Entry<TopicPartition, Long> end : endOffsets.entrySet()) {
            TopicPartition topicPartition = end.getKey();
            Long upcomingOffset = end.getValue();
            SystemStreamPartition ssp = toSystemStreamPartition(topicPartition);

            upcomingOffsets.put(ssp, String.valueOf(upcomingOffset));
            if (upcomingOffset.longValue() > 0) {
                newestOffsets.put(ssp, String.valueOf(upcomingOffset.longValue() - 1));
            } else {
                // Must run after the beginning offsets were copied so that it overrides them.
                LOG.warn("Empty Kafka topic partition {} with upcoming offset {}. Skipping newest offset and setting oldest offset to 0 to consume from beginning", topicPartition, upcomingOffset);
                oldestOffsets.put(ssp, "0");
            }
        }
        return new OffsetsMaps(oldestOffsets, newestOffsets, upcomingOffsets);
    }

    /**
     * Performs one (non-retrying) metadata fetch for the given topics: looks up each topic's partitions,
     * fetches their offsets, and assembles the result through {@code KafkaSystemAdminUtilsScala.assembleMetadata}.
     *
     * @param set names of the topics to look up
     * @return map from stream name to its metadata
     * @throws SamzaException if the consumer returns no partition info for a topic
     */
    public Map<String, SystemStreamMetadata> fetchSystemStreamMetadata(Set<String> set) {
        Map<SystemStreamPartition, String> allOldestOffsets = new HashMap<>();
        Map<SystemStreamPartition, String> allNewestOffsets = new HashMap<>();
        Map<SystemStreamPartition, String> allUpcomingOffsets = new HashMap<>();
        LOG.info("Fetching SystemStreamMetadata for topics {} on system {}", set, this.systemName);

        for (String topic : set) {
            // metadataConsumer is a raw Consumer, so the element type has to be asserted here.
            @SuppressWarnings("unchecked")
            List<PartitionInfo> partitionInfos = this.metadataConsumer.partitionsFor(topic);
            if (partitionInfos == null) {
                throw new SamzaException(String.format("Partition info not(yet?) available for system %s topic %s", this.systemName, topic));
            }

            List<TopicPartition> topicPartitions = partitionInfos.stream()
                    .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                    .collect(Collectors.toList());
            OffsetsMaps topicOffsets = fetchTopicPartitionsMetadata(topicPartitions);

            allOldestOffsets.putAll(topicOffsets.getOldestOffsets());
            allNewestOffsets.putAll(topicOffsets.getNewestOffsets());
            allUpcomingOffsets.putAll(topicOffsets.getUpcomingOffsets());
        }

        scala.collection.immutable.Map<String, SystemStreamMetadata> assembleMetadata = KafkaSystemAdminUtilsScala.assembleMetadata(ScalaJavaUtil.toScalaMap(allOldestOffsets), ScalaJavaUtil.toScalaMap(allNewestOffsets), ScalaJavaUtil.toScalaMap(allUpcomingOffsets));
        LOG.debug("assembled SystemStreamMetadata is: {}", assembleMetadata);
        return (Map) JavaConverters.mapAsJavaMapConverter(assembleMetadata).asJava();
    }

    /**
     * Performs one (non-retrying) lookup of the newest offset of a partition, derived from the end
     * offset reported by the metadata consumer.
     *
     * @param systemStreamPartition partition to look up
     * @return end offset minus one, or {@code null} when the end offset is zero or negative
     */
    public String fetchNewestOffset(SystemStreamPartition systemStreamPartition) {
        LOG.debug("Fetching newest offset for {}", systemStreamPartition);

        int partitionId = systemStreamPartition.getPartition().getPartitionId();
        TopicPartition topicPartition = new TopicPartition(systemStreamPartition.getStream(), partitionId);
        Long upcomingOffset = (Long) this.metadataConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);

        String newestOffset = null;
        if (upcomingOffset.longValue() > 0) {
            newestOffset = String.valueOf(upcomingOffset.longValue() - 1);
        } else {
            // A non-positive end offset means there is no message to point at.
            LOG.debug("Stripping newest offsets for {} because the topic appears empty.", topicPartition);
        }

        LOG.info("Newest offset for ssp {} is: {}", systemStreamPartition, newestOffset);
        return newestOffset;
    }

    /**
     * Compares two offsets numerically.
     *
     * @param str first offset, a decimal long, or {@code null}
     * @param str2 second offset, a decimal long, or {@code null}
     * @return a negative value, zero or a positive value when the first offset is smaller than, equal to or
     *         larger than the second; {@code -1} when either offset is {@code null}
     * @throws NumberFormatException if a non-null offset cannot be parsed as a long
     */
    public Integer offsetComparator(String str, String str2) {
        if (str != null && str2 != null) {
            long first = Long.parseLong(str);
            long second = Long.parseLong(str2);
            return Long.compare(first, second);
        }
        return -1;
    }

    /**
     * Creates the Kafka topic described by the given spec, after converting it with {@link #toKafkaSpec}.
     *
     * @param streamSpec stream to create
     * @return the result reported by {@code KafkaSystemAdminUtilsScala.createStream}
     */
    public boolean createStream(StreamSpec streamSpec) {
        LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
        KafkaStreamSpec kafkaSpec = toKafkaSpec(streamSpec);
        Supplier<ZkUtils> zkConnection = getZkConnection();
        return KafkaSystemAdminUtilsScala.createStream(kafkaSpec, zkConnection);
    }

    /**
     * Clears the Kafka topic behind the given stream and reports whether it is gone afterwards.
     *
     * @param streamSpec stream whose topic should be cleared
     * @return {@code true} when the metadata consumer reports no partitions for the topic after clearing
     *         (either an empty list or no partition info at all)
     */
    public boolean clearStream(StreamSpec streamSpec) {
        String topic = streamSpec.getPhysicalName();
        LOG.info("Clearing Kafka topic: {} on system: {}", topic, streamSpec.getSystemName());
        KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection());

        // partitionsFor may yield null for an unknown topic (see fetchSystemStreamMetadata); treat that as
        // "no partitions left" rather than failing with a NullPointerException.
        List<PartitionInfo> remainingPartitions = getTopicMetadata(ImmutableSet.of(topic)).get(topic);
        return remainingPartitions == null || remainingPartitions.isEmpty();
    }

    /**
     * Converts a generic stream spec into a Kafka-specific one, applying the settings this admin holds
     * for the kind of stream:
     * changelog streams get the replication factor and properties recorded for their topic,
     * coordinator streams get one partition plus the coordinator replication factor and properties,
     * intermediate streams with recorded properties get those properties, and everything else is converted as is.
     *
     * @param streamSpec spec to convert
     * @return the Kafka stream spec
     * @throws StreamValidationException if a changelog stream has no recorded topic information
     */
    public KafkaStreamSpec toKafkaSpec(StreamSpec streamSpec) {
        if (streamSpec.isChangeLogStream()) {
            String topicName = streamSpec.getPhysicalName();
            ChangelogInfo topicMeta = changelogTopicMetaInformation.get(topicName);
            if (topicMeta == null) {
                throw new StreamValidationException("Unable to find topic information for topic " + topicName);
            }
            return new KafkaStreamSpec(streamSpec.getId(), topicName, systemName, streamSpec.getPartitionCount(), topicMeta.replicationFactor(), topicMeta.kafkaProps());
        }

        if (streamSpec.isCoordinatorStream()) {
            return new KafkaStreamSpec(streamSpec.getId(), streamSpec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties);
        }

        if (intermediateStreamProperties.containsKey(streamSpec.getId())) {
            return KafkaStreamSpec.fromSpec(streamSpec).copyWithProperties(intermediateStreamProperties.get(streamSpec.getId()));
        }

        return KafkaStreamSpec.fromSpec(streamSpec);
    }

    /**
     * Verifies that the stream exists and has the partition count the spec asks for.
     *
     * @param streamSpec spec to validate against the actual topic
     * @throws StreamValidationException if no metadata is returned for the stream, or if the actual
     *         partition count differs from the expected one
     */
    public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
        LOG.info("About to validate stream = " + streamSpec);

        String streamName = streamSpec.getPhysicalName();
        Map<String, SystemStreamMetadata> fetched = getSystemStreamMetadata(Collections.singleton(streamName));
        SystemStreamMetadata metadata = fetched.get(streamName);
        if (metadata == null) {
            throw new StreamValidationException("Failed to obtain metadata for stream " + streamName + ". Validation failed.");
        }

        int actualCount = metadata.getSystemStreamPartitionMetadata().size();
        int expectedCount = streamSpec.getPartitionCount();
        LOG.info("actualCount=" + actualCount + "; expectedCount=" + expectedCount);
        if (actualCount == expectedCount) {
            return;
        }
        throw new StreamValidationException(String.format("Mismatch of partitions for stream %s. Expected %d, got %d. Validation failed.", streamName, expectedCount, actualCount));
    }

    /**
     * Asks the metadata consumer for the partitions of each given topic.
     *
     * @param set topic names to look up
     * @return map from topic name to whatever {@code partitionsFor} returned for it
     */
    Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> set) {
        HashMap partitionsByTopic = new HashMap();
        set.forEach(topic -> partitionsByTopic.put(topic, metadataConsumer.partitionsFor(topic)));
        return partitionsByTopic;
    }

    /**
     * Deletes messages for the given partitions and offsets, but only when deleting committed messages
     * is enabled for this system. The admin client is created on first use, and
     * {@code deleteMessageCalled} is set once the delete has been delegated.
     *
     * @param map partition -> offset, handed unchanged to {@code KafkaSystemAdminUtilsScala.deleteMessages}
     */
    public void deleteMessages(Map<SystemStreamPartition, String> map) {
        if (!deleteCommittedMessages) {
            return;
        }
        if (adminClient == null) {
            Properties adminProps = createAdminClientProperties();
            adminClient = AdminClient.create(adminProps);
        }
        KafkaSystemAdminUtilsScala.deleteMessages(adminClient, map);
        deleteMessageCalled = true;
    }

    /**
     * Builds the properties for the Kafka admin client: all {@code systems.<system>.consumer.} settings,
     * plus {@code bootstrap.servers} (consumer config first, producer config as fallback) and the
     * ZooKeeper connect string from the consumer config.
     *
     * @return properties for {@code AdminClient.create}
     * @throws SamzaException if no bootstrap servers are configured, or the ZooKeeper connect string is blank
     */
    protected Properties createAdminClientProperties() {
        Properties adminProps = new Properties();
        String consumerPrefix = String.format("systems.%s.consumer.", systemName);
        adminProps.putAll(config.subset(consumerPrefix, true));

        String consumerBootstrapKey = String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, "bootstrap.servers");
        String bootstrapServers = (String) config.get(consumerBootstrapKey);
        if (bootstrapServers == null) {
            String producerBootstrapKey = String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName, "bootstrap.servers");
            bootstrapServers = (String) config.get(producerBootstrapKey);
            if (bootstrapServers == null) {
                throw new SamzaException("bootstrap.servers is required for systemAdmin for system " + systemName);
            }
        }
        adminProps.put("bootstrap.servers", bootstrapServers);

        String zkConnectKey = String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, KafkaConsumerConfig.ZOOKEEPER_CONNECT);
        String zkConnect = (String) config.get(zkConnectKey);
        if (StringUtils.isBlank(zkConnect)) {
            throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
        }
        adminProps.put(KafkaConsumerConfig.ZOOKEEPER_CONNECT, zkConnect);
        return adminProps;
    }

    /**
     * Returns a supplier that opens a ZooKeeper connection using the connect string from this system's
     * consumer config. The config is validated eagerly; the connection itself is only opened when the
     * supplier is invoked.
     *
     * @return supplier of {@link ZkUtils} instances
     * @throws SamzaException if the ZooKeeper connect string is blank
     */
    private Supplier<ZkUtils> getZkConnection() {
        String zkConnectKey = String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, KafkaConsumerConfig.ZOOKEEPER_CONNECT);
        String zkConnect = (String) config.get(zkConnectKey);
        if (StringUtils.isBlank(zkConnect)) {
            throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
        }
        return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
    }
}
