package com.linkedin.d2.discovery.stores.zk;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
import com.linkedin.d2.discovery.PropertySerializer;
import com.linkedin.d2.discovery.stores.zk.SymlinkAwareZooKeeper;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.httpclient.cookie.CookieSpec;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKConnection.class */
public class ZKConnection {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ZKConnection.class);
    // Retry cap (10). NOTE(review): presumably the bound for the BADVERSION retry loops in the
    // set/remove helpers further down — confirm.
    private static final int MAX_RETRIES = 10;
    // Connect string and timeout (used as milliseconds in this class) handed to the client in start().
    private final String _connectString;
    private final int _timeout;
    // A value > 0 makes start() wrap the client in a RetryZooKeeper; the next three fields
    // are only passed to that wrapper.
    private final int _retryLimit;
    private final boolean _exponentialBackoff;
    private final ScheduledExecutorService _scheduler;
    private final long _initInterval;
    // When true, doShutdown() closes the client on a separate thread instead of the caller's.
    private final boolean _shutdownAsynchronously;
    // When true, start() wraps the client in a symlink-aware implementation.
    private final boolean _isSymlinkAware;
    // Applied to the base client in start(); never null (the constructor substitutes an identity function).
    private final Function<ZooKeeper, ZooKeeper> _zkDecorator;
    // Converts symlink targets to bytes; assigned once in the constructor, although not declared final.
    private PropertySerializer<String> _symlinkSerializer;
    // When true, start() blocks (up to _timeout ms) until a connected state is reported.
    private final boolean _isWaitForConnected;
    // Counted down by start() once _zkRef has been populated; zk() waits on it.
    private final CountDownLatch _zkRefLatch;
    // The live client: null before start() and again after shutdown().
    private final AtomicReference<ZooKeeper> _zkRef;
    // Monitor guarding _listeners and _currentState; also used for wait/notify in waitForState().
    private final Object _mutex;
    private final Set<StateListener> _listeners;
    // Last connection state seen by DefaultWatcher; null until the first state event arrives.
    private Watcher.Event.KeeperState _currentState;

    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKConnection$DefaultWatcher.class */
    /**
     * Watcher that {@link #start()} hands to the underlying client. It only acts on
     * connection-state notifications (event type {@code None}): the new state is recorded,
     * threads blocked in {@code waitForState} are woken, and registered listeners are notified.
     * Any other event type is logged and dropped.
     */
    private class DefaultWatcher implements Watcher {
        private DefaultWatcher() {
        }

        /**
         * Handles one event from the client.
         *
         * @param watchedEvent the event delivered by the client
         */
        @Override // org.apache.zookeeper.Watcher
        public void process(WatchedEvent watchedEvent) {
            try {
                // zk() throws IllegalStateException once the connection has been shut down;
                // that case is handled by the catch below.
                final String sessionHex = Long.toHexString(zk().getSessionId());
                final Watcher.Event.EventType eventType = watchedEvent.getType();
                if (eventType != Watcher.Event.EventType.None) {
                    LOG.warn("Received unexpected event of type {} for session 0x{}. This event is NOT propagated and NONE of the watchers will receive data for this event", eventType, sessionHex);
                    return;
                }

                final Watcher.Event.KeeperState newState = watchedEvent.getState();
                LOG.info("Received state notification {} for session 0x{}", newState, sessionHex);

                // Snapshot the listeners under the lock, but call them outside it.
                Set<StateListener> toNotify = Collections.emptySet();
                synchronized (_mutex) {
                    if (_currentState != newState) {
                        _currentState = newState;
                        _mutex.notifyAll();
                        toNotify = new HashSet<>(_listeners);
                    }
                }
                for (StateListener listener : toNotify) {
                    listener.notifyStateChange(newState);
                }
            } catch (IllegalStateException e) {
                LOG.debug("Watched event received after connection shutdown (type {}, state {}.", watchedEvent.getType(), watchedEvent.getState());
            }
        }
    }

    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKConnection$StateListener.class */
    /**
     * Callback for connection-state changes. Register with {@code addStateListener};
     * the default watcher invokes it when the recorded state changes.
     */
    public interface StateListener {
        /**
         * Called with the new connection state.
         *
         * @param keeperState the state reported by the client
         */
        void notifyStateChange(Watcher.Event.KeeperState keeperState);
    }

    /**
     * Connection with no retry, synchronous shutdown, no symlink support.
     *
     * @param str connect string
     * @param i   timeout (used as milliseconds by this class)
     */
    public ZKConnection(String str, int i) {
        this(str, i, false);
    }

    /**
     * As above, with a retry limit of 0.
     *
     * @param z whether shutdown closes the client on a separate thread
     */
    public ZKConnection(String str, int i, boolean z) {
        this(str, i, 0, z);
    }

    /**
     * Retry limit of 0.
     *
     * @param z  whether shutdown closes the client on a separate thread
     * @param z2 whether the client is wrapped to be symlink-aware
     */
    public ZKConnection(String str, int i, boolean z, boolean z2) {
        this(str, i, 0, z, z2);
    }

    /**
     * Synchronous shutdown.
     *
     * @param i2 retry limit; values greater than 0 enable the retrying client
     */
    public ZKConnection(String str, int i, int i2) {
        this(str, i, i2, false);
    }

    /**
     * No exponential backoff, no scheduler, initial interval 0.
     *
     * @param i2 retry limit
     * @param z  whether shutdown closes the client on a separate thread
     */
    public ZKConnection(String str, int i, int i2, boolean z) {
        this(str, i, i2, false, null, 0L, z);
    }

    /**
     * No exponential backoff, no scheduler, initial interval 0.
     *
     * @param i2 retry limit
     * @param z  whether shutdown closes the client on a separate thread
     * @param z2 whether the client is wrapped to be symlink-aware
     */
    public ZKConnection(String str, int i, int i2, boolean z, boolean z2) {
        this(str, i, i2, false, null, 0L, z, z2);
    }

    /**
     * Synchronous shutdown.
     *
     * @param i2                       retry limit
     * @param z                        whether exponential backoff is enabled for retries
     * @param scheduledExecutorService scheduler passed to the retrying client
     * @param j                        initial retry interval (logged as milliseconds)
     */
    public ZKConnection(String str, int i, int i2, boolean z, ScheduledExecutorService scheduledExecutorService, long j) {
        this(str, i, i2, z, scheduledExecutorService, j, false);
    }

    /**
     * No symlink support.
     *
     * @param z2 whether shutdown closes the client on a separate thread
     */
    public ZKConnection(String str, int i, int i2, boolean z, ScheduledExecutorService scheduledExecutorService, long j, boolean z2) {
        this(str, i, i2, z, scheduledExecutorService, j, z2, false);
    }

    /**
     * No client decorator; {@code start()} does not wait for the connection.
     *
     * @param z2 whether shutdown closes the client on a separate thread
     * @param z3 whether the client is wrapped to be symlink-aware
     */
    public ZKConnection(String str, int i, int i2, boolean z, ScheduledExecutorService scheduledExecutorService, long j, boolean z2, boolean z3) {
        this(str, i, i2, z, scheduledExecutorService, j, z2, z3, null, false);
    }

    /**
     * {@code start()} does not wait for the connection.
     *
     * @param function decorator applied to the base client in {@code start()}; may be null
     */
    public ZKConnection(String str, int i, int i2, boolean z, ScheduledExecutorService scheduledExecutorService, long j, boolean z2, boolean z3, Function<ZooKeeper, ZooKeeper> function) {
        this(str, i, i2, z, scheduledExecutorService, j, z2, z3, function, false);
    }

    /**
     * Primary constructor; every other constructor delegates here. Nothing is connected
     * until {@link #start()} is called.
     *
     * @param str                      connect string
     * @param i                        timeout (used as milliseconds by this class)
     * @param i2                       retry limit; values greater than 0 enable the retrying client
     * @param z                        whether exponential backoff is enabled for retries
     * @param scheduledExecutorService scheduler passed to the retrying client
     * @param j                        initial retry interval
     * @param z2                       whether shutdown closes the client on a separate thread
     * @param z3                       whether the client is wrapped to be symlink-aware
     * @param function                 decorator applied to the base client; null means "no decoration"
     * @param z4                       whether {@code start()} waits for a connected state
     */
    public ZKConnection(String str, int i, int i2, boolean z, ScheduledExecutorService scheduledExecutorService, long j, boolean z2, boolean z3, Function<ZooKeeper, ZooKeeper> function, boolean z4) {
        // Caller-supplied configuration.
        _connectString = str;
        _timeout = i;
        _retryLimit = i2;
        _exponentialBackoff = z;
        _scheduler = scheduledExecutorService;
        _initInterval = j;
        _shutdownAsynchronously = z2;
        _isSymlinkAware = z3;
        _isWaitForConnected = z4;
        if (function != null) {
            _zkDecorator = function;
        } else {
            _zkDecorator = client -> client;
        }

        // Internal state.
        _symlinkSerializer = new SymlinkAwareZooKeeper.DefaultSerializer();
        _zkRefLatch = new CountDownLatch(1);
        _zkRef = new AtomicReference<>();
        _mutex = new Object();
        _listeners = new HashSet<>();
    }

    /**
     * Creates the client, publishes it for use by the other methods, and — when configured
     * to wait for the connection — blocks up to the timeout until a connected state
     * ({@code SyncConnected} or {@code ConnectedReadOnly}) is reported. A timeout or an
     * interrupt during that wait is logged, not thrown; on interrupt the thread's interrupt
     * status is restored.
     *
     * @throws IOException           if the client cannot be created
     * @throws IllegalStateException if the connection has already been started
     */
    public void start() throws IOException {
        if (this._zkRef.get() != null) {
            throw new IllegalStateException("Already started");
        }
        final CountDownLatch connectedLatch = new CountDownLatch(1);
        final StateListener connectedListener = state -> {
            if (state == Watcher.Event.KeeperState.SyncConnected || state == Watcher.Event.KeeperState.ConnectedReadOnly) {
                connectedLatch.countDown();
            }
        };
        // Register before the client exists so the first state event cannot be missed.
        if (this._isWaitForConnected) {
            addStateListener(connectedListener);
        }
        try {
            final ZooKeeper zooKeeper = buildZooKeeper();
            LOG.debug("Going to set zkRef");
            if (!this._zkRef.compareAndSet(null, zooKeeper)) {
                // Lost a race with a concurrent start(): discard the client we just built.
                try {
                    doShutdown(zooKeeper);
                } catch (InterruptedException e) {
                    LOG.warn("Failed to shutdown extra ZooKeeperConnection", e);
                    Thread.currentThread().interrupt();
                }
                throw new IllegalStateException("Already started");
            }
            LOG.debug("counting down");
            this._zkRefLatch.countDown();
            if (this._isWaitForConnected) {
                try {
                    if (!connectedLatch.await(this._timeout, TimeUnit.MILLISECONDS)) {
                        LOG.error("Error: Timeout waiting for zk connection");
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Error: interrupted while waiting for zookeeper connecting", e);
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // Always drop the temporary listener, including when construction fails or
            // the start is rejected, so it cannot accumulate in the listener set.
            if (this._isWaitForConnected) {
                removeStateListener(connectedListener);
            }
        }
    }

    /** Assembles the client: decorated base client, optionally wrapped for retry and/or symlink support. */
    private ZooKeeper buildZooKeeper() throws IOException {
        final DefaultWatcher watcher = new DefaultWatcher();
        final ZooKeeper base = this._zkDecorator.apply(new VanillaZooKeeperAdapter(this._connectString, this._timeout, watcher));
        if (this._retryLimit > 0) {
            final RetryZooKeeper retrying = new RetryZooKeeper(base, this._retryLimit, this._exponentialBackoff, this._scheduler, this._initInterval);
            final ZooKeeper result;
            if (this._isSymlinkAware) {
                result = new SymlinkAwareRetryZooKeeper(retrying, watcher, this._symlinkSerializer);
                LOG.info("Using symlink aware RetryZooKeeper with retry limit set to {}", this._retryLimit);
            } else {
                result = retrying;
                LOG.info("Using RetryZooKeeper with retry limit set to {}", this._retryLimit);
            }
            if (this._exponentialBackoff) {
                LOG.info("Exponential backoff enabled. Initial retry interval set to {} ms.", this._initInterval);
            } else {
                LOG.info("Exponential backoff disabled.");
            }
            return result;
        }
        if (this._isSymlinkAware) {
            final ZooKeeper symlinkAware = new SymlinkAwareZooKeeper(base, watcher, this._symlinkSerializer);
            LOG.info("Using symlink aware ZooKeeper without retry");
            return symlinkAware;
        }
        LOG.info("Using vanilla ZooKeeper without retry.");
        return base;
    }

    /**
     * Detaches the current client from this connection and closes it.
     *
     * @throws IllegalStateException if there is no client to shut down (never started,
     *                               or already shut down)
     * @throws InterruptedException  if closing the client is interrupted
     */
    public void shutdown() throws InterruptedException {
        // Atomically claim the client so only one caller ever closes it.
        final ZooKeeper claimed = this._zkRef.getAndSet(null);
        if (claimed == null) {
            throw new IllegalStateException("Already shutdown");
        }
        doShutdown(claimed);
    }

    /**
     * Closes the given client, either on the calling thread or — when asynchronous
     * shutdown is configured — on a newly started thread.
     *
     * @param zooKeeper the client to close
     * @throws InterruptedException if a synchronous close is interrupted
     */
    private void doShutdown(final ZooKeeper zooKeeper) throws InterruptedException {
        if (this._shutdownAsynchronously) {
            final Runnable closeTask = () -> {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    LOG.warn("Failed to shutdown ZooKeeperConnection", e);
                }
            };
            LOG.info("Shutting down ZKConnection asynchronously");
            new Thread(closeTask, "Asynchronous ZooKeeperConnection shutdown thread").start();
            return;
        }
        LOG.info("Shutting down ZKConnection now");
        zooKeeper.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the live client, waiting up to the configured timeout for {@code start()}
     * to publish it.
     *
     * @return the current client, never null
     * @throws IllegalStateException if the client reference is empty after the wait
     *                               (for example after shutdown)
     * @throws RuntimeException      if the wait times out, or if the calling thread is
     *                               interrupted (the interrupt status is restored first)
     */
    public ZooKeeper zk() {
        try {
            if (!this._zkRefLatch.await(this._timeout, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Wait for zkRef timed out.");
            }
            LOG.debug("zkRefLatch complete");
            ZooKeeper zooKeeper = this._zkRef.get();
            if (zooKeeper == null) {
                throw new IllegalStateException("Null zkRef after countdownlatch. If this happened at shutdown, please check if your app has custom de-announcements. Mis-coordinating custom de-announcement with the default de-announcement could cause double de-announcing and lead to this exception.");
            }
            return zooKeeper;
        } catch (InterruptedException e) {
            // The checked exception is converted, so keep the interrupt visible to callers.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Got Interrupt Exception while waiting for zk", e);
        }
    }

    /**
     * Returns the live client; same waiting and failure behaviour as {@code zk()}.
     */
    public ZooKeeper getZooKeeper() {
        return zk();
    }

    /** Returns the connect string this connection was constructed with. */
    public String getConnectString() {
        return this._connectString;
    }

    /** Returns the timeout this connection was constructed with. */
    public int getTimeout() {
        return this._timeout;
    }

    /**
     * Blocks until the recorded connection state equals {@code keeperState} or the
     * timeout elapses.
     *
     * @param keeperState the state to wait for
     * @param j           maximum time to wait
     * @param timeUnit    unit of {@code j}
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException     if the state is not reached in time; the message
     *                              includes the current state, or {@code null} when no
     *                              state has been reported yet
     */
    public void waitForState(Watcher.Event.KeeperState keeperState, long j, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        final long timeoutMillis = timeUnit.toMillis(j);
        long deadline = System.currentTimeMillis() + timeoutMillis;
        if (timeoutMillis > 0 && deadline < 0) {
            // A very large timeout overflowed the addition; treat it as "effectively forever".
            deadline = Long.MAX_VALUE;
        }
        synchronized (this._mutex) {
            while (!keeperState.equals(this._currentState)) {
                final long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    // _currentState is null until the first state event, so do not dereference it blindly.
                    final Watcher.Event.KeeperState current = this._currentState;
                    throw new TimeoutException("timeout expired without state being reached, current state: " + (current == null ? "null" : current.name()));
                }
                this._mutex.wait(remaining);
            }
        }
    }

    /**
     * Registers a listener for connection-state changes. The listener set is guarded
     * by the same monitor the default watcher uses when it snapshots listeners.
     */
    public void addStateListener(StateListener stateListener) {
        synchronized (this._mutex) {
            this._listeners.add(stateListener);
        }
    }

    /** Unregisters a listener previously added with {@code addStateListener}. */
    public void removeStateListener(StateListener stateListener) {
        synchronized (this._mutex) {
            this._listeners.remove(stateListener);
        }
    }

    /**
     * Asynchronously makes sure a persistent node exists at the given path, creating
     * missing ancestors on the way. Trailing slashes are stripped first (the root path
     * is left alone).
     *
     * @param str      path of the node to create
     * @param callback completed with success when the node was created or already exists,
     *                 or with the failure otherwise
     */
    public void ensurePersistentNodeExists(String str, final Callback<None> callback) {
        final ZooKeeper zk = zk();
        String trimmed = str;
        while (trimmed.endsWith("/") && trimmed.length() > 1) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        final String path = trimmed;
        final AsyncCallback.StringCallback createCallback = new AsyncCallback.StringCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.StringCallback
            public void processResult(int rc, String unusedPath, Object ctx, String name) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                switch (code) {
                    case OK:
                    case NODEEXISTS:
                        callback.onSuccess(None.none());
                        return;
                    case NONODE:
                        // The parent is missing: create it first, then retry this node.
                        final int lastSlash = path.lastIndexOf('/');
                        if (lastSlash < 0) {
                            // No parent segment to create; report the failure instead of
                            // throwing from inside the client's callback.
                            callback.onError(KeeperException.create(code));
                            return;
                        }
                        ensurePersistentNodeExists(path.substring(0, lastSlash), new Callback<None>() {
                            @Override // com.linkedin.common.callback.SuccessCallback
                            public void onSuccess(None none) {
                                ensurePersistentNodeExists(path, callback);
                            }

                            @Override // com.linkedin.common.callback.Callback
                            public void onError(Throwable th) {
                                callback.onError(th);
                            }
                        });
                        return;
                    default:
                        callback.onError(KeeperException.create(code));
                        return;
                }
            }
        };
        try {
            zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createCallback, null);
        } catch (Exception e) {
            callback.onError(e);
        }
    }

    /**
     * Asynchronously overwrites the data of an existing node, whatever its current version.
     *
     * @param str      path of the node
     * @param bArr     data to write
     * @param callback completed with success once the data is written, or with the failure
     */
    public void setDataUnsafe(String str, byte[] bArr, Callback<None> callback) {
        setDataUnsafe(str, bArr, callback, 0);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Implementation with a retry counter: looks up the node's current version, writes
     * against that version, and starts over when the write reports BADVERSION, up to
     * {@code MAX_RETRIES} times.
     *
     * @param i number of retries already performed
     */
    public void setDataUnsafe(String str, final byte[] bArr, final Callback<None> callback, final int i) {
        final ZooKeeper client = zk();

        // Step 2: outcome of the versioned write.
        final AsyncCallback.StatCallback onWritten = new AsyncCallback.StatCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.StatCallback
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    callback.onSuccess(None.none());
                } else if (code == KeeperException.Code.BADVERSION && i < MAX_RETRIES) {
                    // Someone else wrote in between; re-read the version and try again.
                    LOG.info("setDataUnsafe: ignored BADVERSION for {}", path);
                    setDataUnsafe(path, bArr, callback, i + 1);
                } else {
                    callback.onError(KeeperException.create(code));
                }
            }
        };

        // Step 1: outcome of the existence check, which supplies the version to write against.
        final AsyncCallback.StatCallback onExists = new AsyncCallback.StatCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.StatCallback
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                if (code != KeeperException.Code.OK) {
                    callback.onError(KeeperException.create(code));
                    return;
                }
                client.setData(path, bArr, stat.getVersion(), onWritten, null);
            }
        };

        try {
            if (client instanceof SymlinkAwareZooKeeper) {
                ((SymlinkAwareZooKeeper) client).rawExists(str, false, onExists, (Object) null);
            } else {
                client.exists(str, false, onExists, (Object) null);
            }
        } catch (Exception e) {
            callback.onError(e);
        }
    }

    /**
     * Asynchronously deletes a node, whatever its current version. A node that does not
     * exist counts as success.
     *
     * @param str      path of the node
     * @param callback completed with success once the node is gone, or with the failure
     */
    public void removeNodeUnsafe(String str, Callback<None> callback) {
        removeNodeUnsafe(str, callback, 0);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Implementation with a retry counter: looks up the node's current version, deletes
     * against that version, and starts over when the delete reports BADVERSION, up to
     * {@code MAX_RETRIES} times.
     *
     * @param i number of retries already performed
     */
    public void removeNodeUnsafe(String str, final Callback<None> callback, final int i) {
        final ZooKeeper client = zk();

        // Step 2: outcome of the versioned delete.
        final AsyncCallback.VoidCallback onDeleted = new AsyncCallback.VoidCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.VoidCallback
            public void processResult(int rc, String path, Object ctx) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    callback.onSuccess(None.none());
                } else if (code == KeeperException.Code.BADVERSION && i < MAX_RETRIES) {
                    LOG.info("removeNodeUnsafe: retrying after ignoring BADVERSION for {}", path);
                    removeNodeUnsafe(path, callback, i + 1);
                } else {
                    callback.onError(KeeperException.create(code));
                }
            }
        };

        // Step 1: outcome of the existence check, which supplies the version to delete against.
        final AsyncCallback.StatCallback onExists = new AsyncCallback.StatCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.StatCallback
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    client.delete(path, stat.getVersion(), onDeleted, null);
                } else if (code == KeeperException.Code.NONODE) {
                    // Nothing to delete.
                    callback.onSuccess(None.none());
                } else {
                    callback.onError(KeeperException.create(code));
                }
            }
        };

        try {
            if (client instanceof SymlinkAwareZooKeeper) {
                ((SymlinkAwareZooKeeper) client).rawExists(str, false, onExists, (Object) null);
            } else {
                client.exists(str, false, onExists, (Object) null);
            }
        } catch (Exception e) {
            callback.onError(e);
        }
    }

    /**
     * Asynchronously deletes a node and everything below it: children are removed
     * first (recursively), then the node itself via {@code removeNodeUnsafe}.
     *
     * @param str      path of the subtree root
     * @param callback completed with success once the node is removed, or with the first
     *                 failure reported
     */
    public void removeNodeUnsafeRecursive(final String str, final Callback<None> callback) {
        final ZooKeeper zk = zk();

        // Runs once all children are gone: remove the node itself.
        final Callback<None> removeSelf = new Callback<None>() {
            @Override // com.linkedin.common.callback.SuccessCallback
            public void onSuccess(None none) {
                removeNodeUnsafe(str, callback);
            }

            @Override // com.linkedin.common.callback.Callback
            public void onError(Throwable th) {
                callback.onError(th);
            }
        };

        final AsyncCallback.ChildrenCallback onChildren = new AsyncCallback.ChildrenCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.ChildrenCallback
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                if (code != KeeperException.Code.OK) {
                    callback.onError(KeeperException.create(code));
                    return;
                }
                // NOTE(review): relies on Callbacks.countDown completing removeSelf right away
                // when there are no children — confirm against Callbacks.
                final Callback<None> allChildrenRemoved = Callbacks.countDown(removeSelf, children.size());
                // Avoid producing "//child" when the path already ends with a slash (e.g. the root).
                final String prefix = path.endsWith("/") ? path : path + "/";
                for (String child : children) {
                    removeNodeUnsafeRecursive(prefix + child, allChildrenRemoved);
                }
            }
        };

        try {
            if (zk instanceof SymlinkAwareZooKeeper) {
                ((SymlinkAwareZooKeeper) zk).rawGetChildren(str, false, onChildren, (Object) null);
            } else {
                zk.getChildren(str, false, onChildren, (Object) null);
            }
        } catch (Exception e) {
            callback.onError(e);
        }
    }

    /**
     * Asynchronously creates a persistent symlink node whose data is the serialized target path.
     * The path is rejected unless it contains a symlink and its first symlink index is not
     * before the end of the path. NOTE(review): presumably this means the symlink must be the
     * last path segment — confirm against SymlinkUtil.
     *
     * @param str      path of the symlink node to create
     * @param str2     target path stored in the node
     * @param callback completed with success on creation, with {@link IllegalArgumentException}
     *                 for a rejected path, or with the client failure
     */
    public void createSymlink(String str, String str2, final Callback<None> callback) {
        final boolean acceptablePath = SymlinkUtil.containsSymlink(str) && SymlinkUtil.firstSymlinkIndex(str) >= str.length();
        if (!acceptablePath) {
            callback.onError(new IllegalArgumentException("Cannot create symbolic link for path " + str));
            return;
        }

        final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.StringCallback
            public void processResult(int rc, String path, Object ctx, String name) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    callback.onSuccess(None.none());
                } else {
                    callback.onError(KeeperException.create(code));
                }
            }
        };

        try {
            zk().create(str, this._symlinkSerializer.toBytes(str2), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, onCreated, null);
        } catch (Exception e) {
            callback.onError(e);
        }
    }

    /**
     * Asynchronously points a symlink node at a new target, creating the node when it does
     * not exist yet. The path is validated with the same check as {@code createSymlink}.
     *
     * @param str      path of the symlink node
     * @param str2     target path to store
     * @param callback completed with success once written, with {@link IllegalArgumentException}
     *                 for a rejected path, or with the client failure
     */
    public void setSymlinkData(String str, String str2, Callback<None> callback) {
        if (!SymlinkUtil.containsSymlink(str) || SymlinkUtil.firstSymlinkIndex(str) < str.length()) {
            callback.onError(new IllegalArgumentException("Cannot set data to symbolic link " + str));
        } else {
            setSymlinkData(str, str2, callback, 0);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Implementation with a retry counter: looks up the node's current version, writes
     * against that version, and starts over when the write reports BADVERSION, up to
     * {@code MAX_RETRIES} times. Once the retries are used up the BADVERSION failure is
     * delivered to the callback.
     *
     * @param i number of retries already performed
     */
    public void setSymlinkData(final String str, final String str2, final Callback<None> callback, final int i) {
        final ZooKeeper zk = zk();

        // Step 2: outcome of the versioned write.
        final AsyncCallback.StatCallback onWritten = new AsyncCallback.StatCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.StatCallback
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                switch (code) {
                    case OK:
                        callback.onSuccess(None.none());
                        return;
                    case BADVERSION:
                        if (i < MAX_RETRIES) {
                            LOG.info("setSymlinkData: ignored BADVERSION for {}", str);
                            setSymlinkData(str, str2, callback, i + 1);
                        } else {
                            // Out of retries: the caller must still hear about the failure.
                            callback.onError(KeeperException.create(code));
                        }
                        return;
                    default:
                        callback.onError(KeeperException.create(code));
                        return;
                }
            }
        };

        // Step 1: outcome of the existence check, which supplies the version to write against.
        final AsyncCallback.StatCallback onExists = new AsyncCallback.StatCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.StatCallback
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                final KeeperException.Code code = KeeperException.Code.get(rc);
                switch (code) {
                    case OK:
                        zk.setData(str, _symlinkSerializer.toBytes(str2), stat.getVersion(), onWritten, null);
                        return;
                    case NONODE:
                        // Nothing to update yet: create the symlink instead.
                        createSymlink(str, str2, callback);
                        return;
                    default:
                        callback.onError(KeeperException.create(code));
                        return;
                }
            }
        };

        try {
            if (zk instanceof SymlinkAwareZooKeeper) {
                ((SymlinkAwareZooKeeper) zk).rawExists(str, (Watcher) null, onExists, (Object) null);
            } else {
                zk.exists(str, (Watcher) null, onExists, (Object) null);
            }
        } catch (Exception e) {
            callback.onError(e);
        }
    }
}
