package com.twitter.distributedlog.lock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:com/twitter/distributedlog/lock/DistributedLock.class */
/**
 * A lock identified by {@code lockPath} that is backed by a {@link SessionLock} created through a
 * {@link SessionLockFactory}.
 *
 * <p>State transitions (acquire, try-lock, waiting, close, re-acquire) are submitted to the
 * {@link OrderedScheduler} using {@code lockPath} as the ordering key. Fields that are also read
 * from caller threads ({@code closed}, {@code internalLock}, the re-acquire state) are guarded by
 * this object's monitor.
 *
 * <p>When the underlying session lock reports expiry via {@link #onExpired()}, a re-acquire is
 * started; its failure is remembered and surfaced by later state checks.
 */
public class DistributedLock implements LockListener, AsyncCloseable {
    static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);

    private final SessionLockFactory lockFactory;
    private final OrderedScheduler lockStateExecutor;
    private final String lockPath;
    /** Timeout, in milliseconds, handed to the session lock on the initial acquire. */
    private final long lockTimeout;
    private final DistributedLockContext lockContext = new DistributedLockContext();

    /** Non-null once {@link #asyncAcquire()} has been called; it is never reset. */
    private Future<DistributedLock> lockAcquireFuture = null;
    /** Non-null while a re-acquire is in flight; cleared when the re-acquire succeeds. */
    private Future<DistributedLock> lockReacquireFuture = null;
    private SessionLock internalLock = null;
    private Future<LockWaiter> tryLockFuture = null;
    private LockWaiter lockWaiter = null;
    /** Failure of the last re-acquire; once set, state checks keep throwing it. */
    private LockingException lockReacquireException = null;
    private volatile boolean closed = false;
    private Future<Void> closeFuture = null;

    private final AtomicInteger reacquireCount = new AtomicInteger(0);
    private final StatsLogger lockStatsLogger;
    private final OpStatsLogger acquireStats;
    private final OpStatsLogger reacquireStats;
    private final Counter internalTryRetries;

    /**
     * Creates a lock handle; nothing is acquired until {@link #asyncAcquire()} is called.
     *
     * @param lockStateExecutor scheduler on which lock state transitions are run, keyed by {@code lockPath}
     * @param lockFactory factory used to create the underlying session lock
     * @param lockPath path identifying the lock
     * @param lockTimeout timeout in milliseconds used for the initial acquire
     * @param statsLogger parent stats logger; metrics are registered under its {@code "lock"} scope
     */
    public DistributedLock(OrderedScheduler lockStateExecutor, SessionLockFactory lockFactory, String lockPath,
                           long lockTimeout, StatsLogger statsLogger) {
        this.lockStateExecutor = lockStateExecutor;
        this.lockPath = lockPath;
        this.lockTimeout = lockTimeout;
        this.lockFactory = lockFactory;
        this.lockStatsLogger = statsLogger.scope("lock");
        this.acquireStats = this.lockStatsLogger.getOpStatsLogger("acquire");
        this.reacquireStats = this.lockStatsLogger.getOpStatsLogger("reacquire");
        this.internalTryRetries = this.lockStatsLogger.getCounter("internalTryRetries");
    }

    /**
     * Builds the exception used whenever an operation is attempted on a closed lock.
     *
     * @return a new {@link LockClosedException} for this lock path
     */
    public LockClosedException newLockClosedException() {
        return new LockClosedException(lockPath, "Lock is already closed");
    }

    /**
     * Verifies that the lock is usable.
     *
     * @throws LockingException if the lock is closed or a previous re-acquire failed
     */
    private synchronized void checkLockState() throws LockingException {
        if (closed) {
            throw newLockClosedException();
        }
        if (null != lockReacquireException) {
            throw lockReacquireException;
        }
    }

    /**
     * Starts acquiring the lock asynchronously.
     *
     * <p>Only one acquire is allowed per instance: a second call returns a future failed with
     * {@link UnexpectedException}. If the returned future is interrupted, or the acquire fails,
     * the lock is closed.
     *
     * @return a future satisfied with this lock once it is acquired
     */
    public synchronized Future<DistributedLock> asyncAcquire() {
        if (null != lockAcquireFuture) {
            return Future.exception(
                    new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath));
        }
        // The interrupt handler closes the lock on the state executor when the caller cancels.
        final Promise<DistributedLock> acquirePromise =
                new Promise<DistributedLock>(new Function<Throwable, BoxedUnit>() {
                    @Override
                    public BoxedUnit apply(Throwable cause) {
                        lockStateExecutor.submit(lockPath, new Runnable() {
                            @Override
                            public void run() {
                                asyncClose();
                            }
                        });
                        return BoxedUnit.UNIT;
                    }
                });
        final Stopwatch stopwatch = Stopwatch.createStarted();
        acquirePromise.addEventListener(new FutureEventListener<DistributedLock>() {
            @Override
            public void onSuccess(DistributedLock lock) {
                acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }

            @Override
            public void onFailure(Throwable cause) {
                acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
                // Release whatever was obtained so far when the acquire fails.
                asyncClose();
            }
        });
        this.lockAcquireFuture = acquirePromise;
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                doAsyncAcquire(acquirePromise, lockTimeout);
            }
        });
        return acquirePromise;
    }

    /**
     * Creates the session lock and, once it is available, starts the try-lock step.
     *
     * @param promise promise to satisfy with the outcome of the acquire
     * @param timeoutMs timeout in milliseconds passed to the try-lock step
     */
    void doAsyncAcquire(final Promise<DistributedLock> promise, final long timeoutMs) {
        LOG.trace("Async Lock Acquire {}", lockPath);
        try {
            checkLockState();
            lockFactory.createLock(lockPath, lockContext).addEventListener(
                    FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<SessionLock>() {
                        @Override
                        public void onSuccess(SessionLock lock) {
                            // Check-and-publish atomically with respect to asyncClose().
                            synchronized (DistributedLock.this) {
                                if (closed) {
                                    LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath);
                                    FutureUtils.setException(promise, newLockClosedException());
                                    return;
                                }
                                internalLock = lock;
                                internalLock.setLockListener(DistributedLock.this);
                            }
                            // Call into the session lock without holding this object's monitor.
                            asyncTryLock(lock, promise, timeoutMs);
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            FutureUtils.setException(promise, cause);
                        }
                    }, lockStateExecutor, lockPath));
        } catch (IOException ioe) {
            FutureUtils.setException(promise, ioe);
        }
    }

    /**
     * Issues a try-lock on the session lock, cancelling any try-lock that is still pending.
     *
     * @param lock session lock to try
     * @param promise promise to satisfy with the outcome of the acquire
     * @param timeoutMs try-lock timeout in milliseconds
     */
    void asyncTryLock(SessionLock lock, final Promise<DistributedLock> promise, long timeoutMs) {
        if (null != tryLockFuture) {
            tryLockFuture.cancel();
        }
        tryLockFuture = lock.asyncTryLock(timeoutMs, TimeUnit.MILLISECONDS);
        tryLockFuture.addEventListener(
                FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<LockWaiter>() {
                    @Override
                    public void onSuccess(LockWaiter waiter) {
                        synchronized (DistributedLock.this) {
                            if (closed) {
                                LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
                                waiter.getAcquireFuture().raise(
                                        new LockingException(lockPath, "lock is already closed."));
                                FutureUtils.setException(promise, newLockClosedException());
                            } else {
                                tryLockFuture = null;
                                lockWaiter = waiter;
                                waitForAcquire(waiter, promise);
                            }
                        }
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        FutureUtils.setException(promise, cause);
                    }
                }, lockStateExecutor, lockPath));
    }

    /**
     * Waits for the waiter's acquire future and translates its result onto {@code promise}.
     *
     * @param waiter waiter returned by the try-lock step
     * @param promise promise satisfied with this lock on success, or failed with
     *                {@link OwnershipAcquireFailedException} when the waiter reports {@code false}
     */
    void waitForAcquire(final LockWaiter waiter, final Promise<DistributedLock> promise) {
        waiter.getAcquireFuture().addEventListener(
                FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<Boolean>() {
                    @Override
                    public void onSuccess(Boolean acquired) {
                        if (acquired) {
                            // Only report acquisition when the waiter actually obtained the lock.
                            LOG.info("{} acquired lock {}", waiter, lockPath);
                            FutureUtils.setValue(promise, DistributedLock.this);
                        } else {
                            FutureUtils.setException(promise,
                                    new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
                        }
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        FutureUtils.setException(promise, cause);
                    }
                }, lockStateExecutor, lockPath));
    }

    /**
     * Called when the underlying session lock expires; starts a re-acquire and logs (rather than
     * propagates) a {@link LockingException} raised while starting it.
     */
    @Override // com.twitter.distributedlog.lock.LockListener
    public void onExpired() {
        try {
            reacquireLock(false);
        } catch (LockingException e) {
            LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, e);
        }
    }

    /**
     * Ensures the initial acquire has completed before ownership is inspected.
     *
     * @throws LockingException if {@link #asyncAcquire()} was never called or has not completed
     */
    private synchronized void ensureAcquireCompleted() throws LockingException {
        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
            throw new LockingException(lockPath, "check ownership before acquiring");
        }
    }

    /**
     * Checks ownership and, if the lock is not currently held, starts a re-acquire.
     *
     * @throws LockingException if the initial acquire has not completed, the lock is closed, or a
     *                          previous re-acquire failed
     */
    public synchronized void checkOwnershipAndReacquire() throws LockingException {
        ensureAcquireCompleted();
        if (haveLock()) {
            return;
        }
        reacquireLock(true);
    }

    /**
     * Checks that the lock is currently held.
     *
     * @throws LockingException if the initial acquire has not completed or the lock is not held
     */
    public synchronized void checkOwnership() throws LockingException {
        ensureAcquireCompleted();
        if (!haveLock()) {
            throw new LockingException(lockPath, "Lost lock ownership");
        }
    }

    @VisibleForTesting
    int getReacquireCount() {
        return reacquireCount.get();
    }

    @VisibleForTesting
    Future<DistributedLock> getLockReacquireFuture() {
        return lockReacquireFuture;
    }

    @VisibleForTesting
    Future<DistributedLock> getLockAcquireFuture() {
        return lockAcquireFuture;
    }

    @VisibleForTesting
    synchronized SessionLock getInternalLock() {
        return internalLock;
    }

    @VisibleForTesting
    LockWaiter getLockWaiter() {
        return lockWaiter;
    }

    /**
     * @return {@code true} when the lock is not closed and the session lock reports it is held
     */
    synchronized boolean haveLock() {
        return !closed && internalLock != null && internalLock.isLockHeld();
    }

    /**
     * Cancels the given waiter (or the pending try-lock when there is no waiter yet) and then
     * unlocks the session lock.
     *
     * @param waiter waiter to cancel; may be {@code null}
     * @param promise promise satisfied once the unlock step finishes
     */
    void closeWaiter(LockWaiter waiter, final Promise<Void> promise) {
        if (null == waiter) {
            interruptTryLock(tryLockFuture, promise);
            return;
        }
        // Register the listener before cancelling so the unlock runs whichever way the future ends.
        waiter.getAcquireFuture().addEventListener(
                FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<Boolean>() {
                    @Override
                    public void onSuccess(Boolean acquired) {
                        unlockInternalLock(promise);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        unlockInternalLock(promise);
                    }
                }, lockStateExecutor, lockPath));
        FutureUtils.cancel(waiter.getAcquireFuture());
    }

    /**
     * Cancels a pending try-lock, if any, and continues the close sequence.
     *
     * @param future pending try-lock future; may be {@code null}
     * @param promise promise satisfied once the unlock step finishes
     */
    void interruptTryLock(Future<LockWaiter> future, final Promise<Void> promise) {
        if (null == future) {
            unlockInternalLock(promise);
            return;
        }
        future.addEventListener(
                FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<LockWaiter>() {
                    @Override
                    public void onSuccess(LockWaiter waiter) {
                        // The try-lock completed before the cancel took effect: close its waiter.
                        closeWaiter(waiter, promise);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        unlockInternalLock(promise);
                    }
                }, lockStateExecutor, lockPath));
        FutureUtils.cancel(future);
    }

    /**
     * Unlocks the session lock, if one was created, and completes {@code promise} afterwards
     * regardless of whether the unlock succeeded.
     *
     * @param promise promise satisfied with {@code null} once unlocking is done
     */
    synchronized void unlockInternalLock(final Promise<Void> promise) {
        if (internalLock == null) {
            FutureUtils.setValue(promise, null);
            return;
        }
        internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() {
            // Must be named apply() to implement the function invoked by ensure().
            @Override
            public BoxedUnit apply() {
                FutureUtils.setValue(promise, null);
                return BoxedUnit.UNIT;
            }
        });
    }

    /**
     * Closes the lock: marks it closed, cancels any in-flight acquire and unlocks the session lock.
     * Subsequent calls return the future created by the first call.
     *
     * @return a future satisfied once the close sequence has finished; it is completed with a
     *         value even if the unlock step fails
     */
    @Override // com.twitter.distributedlog.io.AsyncCloseable
    public Future<Void> asyncClose() {
        synchronized (this) {
            if (closed) {
                return closeFuture;
            }
            closed = true;
            final Promise<Void> closePromise = new Promise<Void>();
            closeFuture = closePromise;
            final Promise<Void> closeWaiterPromise = new Promise<Void>();
            closeWaiterPromise.addEventListener(
                    FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<Void>() {
                        @Override
                        public void onSuccess(Void value) {
                            complete();
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            complete();
                        }

                        private void complete() {
                            FutureUtils.setValue(closePromise, null);
                        }
                    }, lockStateExecutor, lockPath));
            lockStateExecutor.submit(lockPath, new Runnable() {
                @Override
                public void run() {
                    closeWaiter(lockWaiter, closeWaiterPromise);
                }
            });
            return closePromise;
        }
    }

    /**
     * Schedules one re-acquire attempt on the lock state executor.
     *
     * @param numRetries remaining retry budget, shared across attempts
     * @param timeout value forwarded unchanged to subsequent attempts
     * @param reacquirePromise promise satisfied with the final outcome of the re-acquire
     */
    void internalReacquireLock(final AtomicInteger numRetries, final long timeout,
                               final Promise<DistributedLock> reacquirePromise) {
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                doInternalReacquireLock(numRetries, timeout, reacquirePromise);
            }
        });
    }

    /**
     * Performs one re-acquire attempt (using a try-lock timeout of 0) and retries on failure
     * until the budget is exhausted, the lock is closed, or ownership is held by someone else.
     *
     * @param numRetries remaining retry budget, decremented on each retriable failure
     * @param timeout value forwarded unchanged to subsequent attempts
     * @param reacquirePromise promise satisfied with the final outcome of the re-acquire
     */
    void doInternalReacquireLock(final AtomicInteger numRetries, final long timeout,
                                 final Promise<DistributedLock> reacquirePromise) {
        internalTryRetries.inc();
        Promise<DistributedLock> tryPromise = new Promise<>();
        tryPromise.addEventListener(new FutureEventListener<DistributedLock>() {
            @Override
            public void onSuccess(DistributedLock lock) {
                FutureUtils.setValue(reacquirePromise, lock);
            }

            @Override
            public void onFailure(Throwable cause) {
                if (cause instanceof OwnershipAcquireFailedException) {
                    // Another owner holds the lock: retrying would not help.
                    FutureUtils.setException(reacquirePromise, cause);
                } else if (numRetries.getAndDecrement() <= 0 || closed) {
                    FutureUtils.setException(reacquirePromise, cause);
                } else {
                    internalReacquireLock(numRetries, timeout, reacquirePromise);
                }
            }
        });
        doAsyncAcquire(tryPromise, 0L);
    }

    /**
     * Starts a re-acquire unless one is already in flight.
     *
     * @param throwLockAcquireException when {@code true}, a remembered re-acquire failure is
     *                                  rethrown; when {@code false}, {@code null} is returned instead
     * @return the in-flight or newly started re-acquire future, or {@code null} when a previous
     *         re-acquire failed and {@code throwLockAcquireException} is {@code false}
     * @throws LockingException if the lock is closed, or a previous re-acquire failed and
     *                          {@code throwLockAcquireException} is {@code true}
     */
    private Future<DistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        synchronized (this) {
            if (closed) {
                throw newLockClosedException();
            }
            if (null != lockReacquireException) {
                if (throwLockAcquireException) {
                    throw lockReacquireException;
                }
                return null;
            }
            if (null != lockReacquireFuture) {
                return lockReacquireFuture;
            }
            LOG.info("reacquiring lock at {}", lockPath);
            Promise<DistributedLock> reacquirePromise = new Promise<>();
            lockReacquireFuture = reacquirePromise;
            reacquirePromise.addEventListener(new FutureEventListener<DistributedLock>() {
                @Override
                public void onSuccess(DistributedLock lock) {
                    synchronized (DistributedLock.this) {
                        lockReacquireFuture = null;
                    }
                    reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                }

                @Override
                public void onFailure(Throwable cause) {
                    synchronized (DistributedLock.this) {
                        if (cause instanceof LockingException) {
                            lockReacquireException = (LockingException) cause;
                        } else {
                            lockReacquireException =
                                    new LockingException(lockPath, "Exception on re-acquiring lock", cause);
                        }
                    }
                    reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                }
            });
            reacquireCount.incrementAndGet();
            // NOTE(review): the retry budget is taken from a reader-idle-threshold constant, which
            // looks unrelated to locking; presumably a constant substituted for an equal literal
            // (e.g. Integer.MAX_VALUE) — confirm before changing.
            internalReacquireLock(
                    new AtomicInteger(DistributedLogConfiguration.BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS_DEFAULT),
                    0L, reacquirePromise);
            return reacquirePromise;
        }
    }
}
