package co.cask.cdap.kafka.flow;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaBrokerCache.class */
public final class KafkaBrokerCache extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
    private static final String BROKERS_PATH = "/brokers";
    private final ZKClient zkClient;
    private final Runnable invokeGetBrokers = new Runnable() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.1
        @Override // java.lang.Runnable
        public void run() {
            KafkaBrokerCache.this.getBrokers();
        }
    };
    private final Runnable invokeGetTopics = new Runnable() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.2
        @Override // java.lang.Runnable
        public void run() {
            KafkaBrokerCache.this.getTopics();
        }
    };
    private final Map<String, KafkaBroker> brokers = Maps.newConcurrentMap();
    private final Map<String, SortedMap<Integer, Collection<String>>> topicBrokers = Maps.newConcurrentMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaBrokerCache$BrokerPartition.class */
    public static final class BrokerPartition {
        private final String brokerId;
        private final int partitionSize;

        private BrokerPartition(String str, int i) {
            this.brokerId = str;
            this.partitionSize = i;
        }

        public String getBrokerId() {
            return this.brokerId;
        }

        public int getPartitionSize() {
            return this.partitionSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaBrokerCache$ExistsOnFailureFutureCallback.class */
    public abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
        private final String path;
        private final Runnable action;

        protected ExistsOnFailureFutureCallback(String str, Runnable runnable) {
            this.path = str;
            this.action = runnable;
        }

        public final void onFailure(Throwable th) {
            if (isNotExists(th)) {
                waitExists(this.path);
            } else {
                KafkaBrokerCache.LOG.error("Operation failed for path {}", this.path, th);
            }
        }

        private boolean isNotExists(Throwable th) {
            return (th instanceof KeeperException) && ((KeeperException) th).code() == KeeperException.Code.NONODE;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void waitExists(final String str) {
            KafkaBrokerCache.LOG.debug("Path {} not exists. Watch for creation.", str);
            Futures.addCallback(KafkaBrokerCache.this.zkClient.exists(str, new Watcher() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.ExistsOnFailureFutureCallback.1
                public void process(WatchedEvent watchedEvent) {
                    if (KafkaBrokerCache.this.isRunning()) {
                        if (watchedEvent.getType() == Watcher.Event.EventType.NodeCreated) {
                            ExistsOnFailureFutureCallback.this.action.run();
                        } else if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                            ExistsOnFailureFutureCallback.this.waitExists(str);
                        }
                    }
                }
            }), new FutureCallback<Stat>() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.ExistsOnFailureFutureCallback.2
                public void onSuccess(Stat stat) {
                    if (stat != null) {
                        ExistsOnFailureFutureCallback.this.action.run();
                    }
                }

                public void onFailure(Throwable th) {
                    KafkaBrokerCache.LOG.error("Failed to get existence of path {}", str, th);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaBrokerCache(ZKClient zKClient) {
        this.zkClient = zKClient;
    }

    protected void startUp() throws Exception {
        getBrokers();
        getTopics();
    }

    protected void shutDown() throws Exception {
    }

    public int getPartitionSize(String str) {
        SortedMap<Integer, Collection<String>> sortedMap = this.topicBrokers.get(str);
        if (sortedMap == null || sortedMap.isEmpty()) {
            return 1;
        }
        return sortedMap.lastKey().intValue();
    }

    public List<KafkaBroker> getBrokers(String str, int i) {
        SortedMap<Integer, Collection<String>> sortedMap = this.topicBrokers.get(str);
        if (sortedMap == null || sortedMap.isEmpty() || i >= sortedMap.lastKey().intValue()) {
            return ImmutableList.of();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = Iterables.concat(sortedMap.tailMap(Integer.valueOf(i + 1)).values()).iterator();
        while (it.hasNext()) {
            arrayList.add(this.brokers.get((String) it.next()));
        }
        Collections.sort(arrayList);
        return arrayList;
    }

    public KafkaBroker getRandomBroker() {
        Collection<KafkaBroker> values = this.brokers.values();
        if (values.isEmpty()) {
            return null;
        }
        return (KafkaBroker) Iterables.get(values, new Random().nextInt(values.size()), (Object) null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getBrokers() {
        Futures.addCallback(this.zkClient.getChildren("/brokers/ids", new Watcher() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.3
            public void process(WatchedEvent watchedEvent) {
                if (KafkaBrokerCache.this.isRunning()) {
                    KafkaBrokerCache.this.getBrokers();
                }
            }
        }), new ExistsOnFailureFutureCallback<NodeChildren>("/brokers/ids", this.invokeGetBrokers) { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.4
            public void onSuccess(NodeChildren nodeChildren) {
                ImmutableSet<String> copyOf = ImmutableSet.copyOf(nodeChildren.getChildren());
                for (String str : copyOf) {
                    KafkaBrokerCache.this.getBrokenData("/brokers/ids/" + str, str);
                }
                KafkaBrokerCache.this.removeDiff(copyOf, KafkaBrokerCache.this.brokers);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getTopics() {
        Futures.addCallback(this.zkClient.getChildren("/brokers/topics", new Watcher() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.5
            public void process(WatchedEvent watchedEvent) {
                if (KafkaBrokerCache.this.isRunning()) {
                    KafkaBrokerCache.this.getTopics();
                }
            }
        }), new ExistsOnFailureFutureCallback<NodeChildren>("/brokers/topics", this.invokeGetTopics) { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.6
            public void onSuccess(NodeChildren nodeChildren) {
                ImmutableSet copyOf = ImmutableSet.copyOf(nodeChildren.getChildren());
                Iterator it = Sets.difference(copyOf, KafkaBrokerCache.this.topicBrokers.keySet()).iterator();
                while (it.hasNext()) {
                    String str = (String) it.next();
                    KafkaBrokerCache.this.getTopicBrokers("/brokers/topics/" + str, str);
                }
                KafkaBrokerCache.this.removeDiff(copyOf, KafkaBrokerCache.this.topicBrokers);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getBrokenData(String str, final String str2) {
        Futures.addCallback(this.zkClient.getData(str), new FutureCallback<NodeData>() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.7
            public void onSuccess(NodeData nodeData) {
                Iterator it = Iterables.skip(Splitter.on(':').split(new String(nodeData.getData(), Charsets.UTF_8)), 1).iterator();
                KafkaBrokerCache.this.brokers.put(str2, new KafkaBroker(str2, (String) it.next(), Integer.parseInt((String) it.next())));
            }

            public void onFailure(Throwable th) {
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getTopicBrokers(final String str, final String str2) {
        Futures.addCallback(this.zkClient.getChildren(str, new Watcher() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.8
            public void process(WatchedEvent watchedEvent) {
                if (KafkaBrokerCache.this.isRunning() && watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    KafkaBrokerCache.this.getTopicBrokers(str, str2);
                }
            }
        }), new FutureCallback<NodeChildren>() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.9
            public void onSuccess(NodeChildren nodeChildren) {
                List<String> children = nodeChildren.getChildren();
                ArrayList arrayList = new ArrayList(children.size());
                for (final String str3 : children) {
                    arrayList.add(Futures.transform(KafkaBrokerCache.this.zkClient.getData(str + "/" + str3), new Function<NodeData, BrokerPartition>() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.9.1
                        public BrokerPartition apply(NodeData nodeData) {
                            return new BrokerPartition(str3, Integer.parseInt(new String(nodeData.getData(), Charsets.UTF_8)));
                        }
                    }));
                }
                Futures.addCallback(Futures.successfulAsList(arrayList), new FutureCallback<List<BrokerPartition>>() { // from class: co.cask.cdap.kafka.flow.KafkaBrokerCache.9.2
                    public void onSuccess(List<BrokerPartition> list) {
                        TreeMultimap create = TreeMultimap.create();
                        for (BrokerPartition brokerPartition : list) {
                            if (brokerPartition != null) {
                                create.put(Integer.valueOf(brokerPartition.getPartitionSize()), brokerPartition.getBrokerId());
                            }
                        }
                        KafkaBrokerCache.this.topicBrokers.put(str2, create.asMap());
                    }

                    public void onFailure(Throwable th) {
                    }
                }, Threads.SAME_THREAD_EXECUTOR);
            }

            public void onFailure(Throwable th) {
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <K, V> void removeDiff(Set<K> set, Map<K, V> map) {
        Iterator it = Sets.difference(map.keySet(), set).iterator();
        while (it.hasNext()) {
            map.remove(it.next());
        }
    }
}
