package com.twitter.distributedlog.subscription;

import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:com/twitter/distributedlog/subscription/ZKSubscriptionsStore.class */
/**
 * {@link SubscriptionsStore} that keeps one {@link ZKSubscriptionStateStore} per subscriber id,
 * each rooted at {@code <zkPath>/<subscriberId>}.
 *
 * <p>Per-subscriber stores are created lazily on first use and cached in a concurrent map;
 * {@link #close()} closes every store that has been created so far.
 */
public class ZKSubscriptionsStore implements SubscriptionsStore {
    private final ZooKeeperClient zkc;
    private final String zkPath;
    private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers =
            new ConcurrentHashMap<String, ZKSubscriptionStateStore>();

    /**
     * @param zooKeeperClient client used for every ZooKeeper access made by this store
     * @param str             parent path under which each subscriber has a child node
     */
    public ZKSubscriptionsStore(ZooKeeperClient zooKeeperClient, String str) {
        this.zkc = zooKeeperClient;
        this.zkPath = str;
    }

    /**
     * Returns the cached state store for the given subscriber, creating it if absent.
     *
     * <p>If two callers race to create the store for the same id, the one that loses
     * {@code putIfAbsent} closes its own freshly built instance and returns the winner's.
     */
    private ZKSubscriptionStateStore getSubscriber(String str) {
        ZKSubscriptionStateStore cached = this.subscribers.get(str);
        if (cached != null) {
            return cached;
        }
        ZKSubscriptionStateStore created =
                new ZKSubscriptionStateStore(this.zkc, String.format("%s/%s", this.zkPath, str));
        ZKSubscriptionStateStore raced = this.subscribers.putIfAbsent(str, created);
        if (raced == null) {
            return created;
        }
        try {
            created.close();
        } catch (IOException ignored) {
            // Best effort: the discarded instance was never handed out to any caller,
            // so a failure to close it must not fail the lookup.
        }
        return raced;
    }

    /**
     * Returns the last commit position of the given subscriber, as reported by its state store.
     */
    @Override // com.twitter.distributedlog.subscription.SubscriptionsStore
    public Future<DLSN> getLastCommitPosition(String str) {
        return getSubscriber(str).getLastCommitPosition();
    }

    /**
     * Lists the children of {@code zkPath} and resolves the last commit position of each one.
     *
     * <p>The returned future is satisfied with an empty map when {@code zkPath} does not exist,
     * and fails with a {@link KeeperException} for any other non-OK ZooKeeper result code.
     * Failing to obtain the ZooKeeper handle, or being interrupted while doing so, also fails
     * the future rather than throwing.
     */
    @Override // com.twitter.distributedlog.subscription.SubscriptionsStore
    public Future<Map<String, DLSN>> getLastCommitPositions() {
        final Promise<Map<String, DLSN>> promise = new Promise<Map<String, DLSN>>();
        try {
            this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
                @Override
                public void processResult(int i, String str, Object obj, List<String> list, Stat stat) {
                    if (KeeperException.Code.NONODE.intValue() == i) {
                        // No parent node means no subscriber has been recorded yet.
                        promise.setValue(new HashMap<String, DLSN>());
                    } else if (KeeperException.Code.OK.intValue() != i) {
                        promise.setException(KeeperException.create(KeeperException.Code.get(i), str));
                    } else {
                        ZKSubscriptionsStore.this.getLastCommitPositions(promise, list);
                    }
                }
            }, (Object) null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            promise.setException(e);
        } catch (InterruptedException e2) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            promise.setException(new DLInterruptedException("getLastCommitPositions was interrupted", e2));
        }
        return promise;
    }

    /**
     * Reads the commit position of every listed subscriber from ZooKeeper and completes
     * {@code promise} with the resulting subscriber-to-position map.
     *
     * <p>If any individual read fails, {@code promise} is failed with that cause so that
     * callers are never left waiting on a future that cannot complete.
     *
     * <p>Internal helper used by the children callback of {@link #getLastCommitPositions()};
     * the decompiler widened its access from private.
     *
     * @param promise promise to complete with the collected positions
     * @param list    subscriber ids, i.e. the child node names under {@code zkPath}
     */
    public void getLastCommitPositions(final Promise<Map<String, DLSN>> promise, List<String> list) {
        List<Future<Pair<String, DLSN>>> lookups =
                new ArrayList<Future<Pair<String, DLSN>>>(list.size());
        for (final String str : list) {
            Future<Pair<String, DLSN>> lookup = getSubscriber(str).getLastCommitPositionFromZK().map(
                    new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
                        @Override
                        public Pair<String, DLSN> apply(DLSN dlsn) {
                            return Pair.of(str, dlsn);
                        }
                    });
            lookups.add(lookup);
        }
        Future.collect(lookups).onSuccess(new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
            @Override
            public BoxedUnit apply(List<Pair<String, DLSN>> list2) {
                Map<String, DLSN> positions = new HashMap<String, DLSN>();
                for (Pair<String, DLSN> pair : list2) {
                    positions.put(pair.getLeft(), pair.getRight());
                }
                promise.setValue(positions);
                return BoxedUnit.UNIT;
            }
        }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Throwable cause) {
                // Without this the promise would stay pending forever when a lookup fails.
                promise.setException(cause);
                return BoxedUnit.UNIT;
            }
        });
    }

    /**
     * Advances the commit position of the given subscriber via its state store.
     */
    @Override // com.twitter.distributedlog.subscription.SubscriptionsStore
    public Future<BoxedUnit> advanceCommitPosition(String str, DLSN dlsn) {
        return getSubscriber(str).advanceCommitPosition(dlsn);
    }

    /**
     * Closes every subscriber state store created so far.
     *
     * <p>All stores are attempted even if one of them fails to close.
     *
     * @throws IOException the first failure encountered, rethrown after all stores were attempted
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException firstFailure = null;
        for (ZKSubscriptionStateStore store : this.subscribers.values()) {
            try {
                store.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
