package org.apache.pulsar.zookeeper;

import java.lang.reflect.Constructor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.class */
/**
 * {@link ZooKeeperClientFactory} that hands out BookKeeper {@link ZooKeeperClient} instances.
 *
 * <p>The client is instantiated reflectively through its
 * {@code (String, int, ZooKeeperWatcherBase, RetryPolicy)} constructor. The future returned by
 * {@link #create} is completed once the client reports a connected state, or failed when the
 * session timeout elapses first.
 */
public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
    private static final Logger log = LoggerFactory.getLogger(ZookeeperBkClientFactoryImpl.class);

    /** Delay between two consecutive connection-state checks, in milliseconds. */
    private static final long CONNECT_POLL_INTERVAL_MS = 100L;

    /**
     * Creates a ZooKeeper client and asynchronously waits for it to become connected.
     *
     * @param str         connect string passed to the {@link ZooKeeperClient} constructor
     * @param sessionType when equal to {@code SessionType.AllowReadOnly}, a
     *                    {@code ConnectedReadOnly} event is accepted by the session watcher;
     *                    otherwise the watcher's precondition check fails on such an event
     * @param i           session timeout in milliseconds; also used as the upper bound on how
     *                    long this method waits for the client to connect
     * @return a future completed with the connected client, or completed exceptionally if the
     *         client could not be constructed or did not connect within {@code i} milliseconds
     */
    @Override // org.apache.pulsar.zookeeper.ZooKeeperClientFactory
    public CompletableFuture<ZooKeeper> create(String str, ZooKeeperClientFactory.SessionType sessionType, int i) {
        final String connectString = str;
        final int sessionTimeoutMs = i;
        final boolean allowReadOnly = sessionType == ZooKeeperClientFactory.SessionType.AllowReadOnly;
        final CompletableFuture<ZooKeeper> result = new CompletableFuture<>();
        try {
            final CompletableFuture<Void> sessionFuture = new CompletableFuture<>();

            // The required constructor is not publicly accessible, hence the reflective access.
            Constructor<ZooKeeperClient> constructor = ZooKeeperClient.class.getDeclaredConstructor(
                    String.class, Integer.TYPE, ZooKeeperWatcherBase.class, RetryPolicy.class);
            constructor.setAccessible(true);

            ZooKeeperWatcherBase watcherManager = new ZooKeeperWatcherBase(sessionTimeoutMs);
            watcherManager.addChildWatcher(watchedEvent -> {
                if (watchedEvent.getType() == Watcher.Event.EventType.None) {
                    switch (watchedEvent.getState()) {
                        case ConnectedReadOnly:
                            Preconditions.checkArgument(allowReadOnly);
                            break;
                        case SyncConnected:
                            break;
                        case Expired:
                            sessionFuture.completeExceptionally(
                                    KeeperException.create(KeeperException.Code.SESSIONEXPIRED));
                            return;
                        default:
                            log.warn("Unexpected ZK event received: {}", watchedEvent);
                            return;
                    }
                    sessionFuture.complete(null);
                }
            });

            final ZooKeeperClient zkClient = constructor.newInstance(connectString, sessionTimeoutMs,
                    watcherManager, new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0));

            sessionFuture.thenRun(() -> {
                log.info("ZooKeeper session established: {}", zkClient);
                // Monotonic deadline: unaffected by wall-clock adjustments while we wait.
                long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(sessionTimeoutMs);
                completeFutureOnConnect(result, zkClient, deadlineNanos,
                        Executors.newSingleThreadScheduledExecutor());
            }).exceptionally(th -> {
                // Keep the original message format and append the throwable so the stack trace is logged.
                log.error("Failed to establish ZooKeeper session: {}", th.getMessage(), th);
                result.completeExceptionally(th);
                return null;
            });

            // The internal future is completed right away, so the callback above starts polling
            // immediately on the calling thread; later complete()/completeExceptionally() calls
            // made by the watcher are no-ops. Readiness is decided by completeFutureOnConnect.
            sessionFuture.complete(null);
        } catch (Exception e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Polls the client's state until it is connected or the deadline passes.
     *
     * <p>On success the future is completed with the client. On timeout, or if the state check
     * itself fails, the future is completed exceptionally and the client is closed, because the
     * caller never receives a reference it could close itself. The scheduler is shut down in
     * every terminal case.
     *
     * @param completableFuture        future to complete with the outcome
     * @param zooKeeperClient          client whose connection state is polled
     * @param j                        deadline expressed as a {@link System#nanoTime()} value
     * @param scheduledExecutorService scheduler used to re-run this check while still waiting
     */
    private void completeFutureOnConnect(CompletableFuture<ZooKeeper> completableFuture, ZooKeeperClient zooKeeperClient, long j, ScheduledExecutorService scheduledExecutorService) {
        final boolean connected;
        try {
            connected = zooKeeperClient.getState().isConnected();
        } catch (RuntimeException e) {
            // An exception escaping a scheduled task would be swallowed by the scheduler, leaving
            // the future pending forever and the scheduler thread alive.
            failAndRelease(completableFuture, zooKeeperClient, scheduledExecutorService, e);
            return;
        }

        if (connected) {
            boolean delivered = completableFuture.complete(zooKeeperClient);
            scheduledExecutorService.shutdown();
            if (!delivered) {
                // The future was already completed or cancelled elsewhere; nobody will use this client.
                closeQuietly(zooKeeperClient);
            }
        } else if (System.nanoTime() - j < 0) {
            scheduledExecutorService.schedule(
                    () -> completeFutureOnConnect(completableFuture, zooKeeperClient, j, scheduledExecutorService),
                    CONNECT_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
        } else {
            failAndRelease(completableFuture, zooKeeperClient, scheduledExecutorService,
                    new IllegalStateException("zookeeper couldn't connect with in given zkSessionTimeout"));
        }
    }

    /** Fails the future with {@code cause}, stops the scheduler and closes the unused client. */
    private static void failAndRelease(CompletableFuture<ZooKeeper> future, ZooKeeperClient client,
            ScheduledExecutorService scheduler, Throwable cause) {
        future.completeExceptionally(cause);
        scheduler.shutdown();
        closeQuietly(client);
    }

    /** Closes the client, logging instead of propagating any failure. */
    private static void closeQuietly(ZooKeeperClient client) {
        try {
            client.close();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while closing unused ZooKeeper client {}", client, e);
        } catch (RuntimeException e) {
            log.warn("Failed to close unused ZooKeeper client {}", client, e);
        }
    }
}
