package gobblin.runtime.locks;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/locks/ZookeeperBasedJobLock.class */
/**
 * A {@link ListenableJobLock} backed by a Curator {@link InterProcessSemaphoreMutex} stored in
 * ZooKeeper under {@code /locks/<job.name>}.
 *
 * <p>All instances in the JVM share one lazily created {@link CuratorFramework} client. The client
 * is configured from the {@link Properties} of whichever instance is constructed first while no
 * client exists; properties passed by later instances do not reconfigure it. A JVM shutdown hook
 * closes the shared client.
 *
 * <p>Listeners registered through {@link #setEventListener(JobLockEventListener)} are kept in a
 * static map keyed by lock path and are notified when the shared client reports the connection
 * state {@code LOST} or {@code SUSPENDED}.
 */
public class ZookeeperBasedJobLock implements ListenableJobLock {
    /** Property: how long {@link #tryLock()} waits for the lock, in milliseconds. */
    public static final String LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS = "gobblin.locks.zookeeper.acquire.timeout_milliseconds";
    /** Property: ZooKeeper connection string. */
    public static final String CONNECTION_STRING = "gobblin.locks.zookeeper.connection_string";
    /** Property: connection timeout, in seconds. */
    public static final String CONNECTION_TIMEOUT_SECONDS = "gobblin.locks.zookeeper.connection.timeout_seconds";
    /** Property: session timeout, in seconds. */
    public static final String SESSION_TIMEOUT_SECONDS = "gobblin.locks.zookeeper.session.timeout_seconds";
    /** Property: base back-off between retries, in seconds. */
    public static final String RETRY_BACKOFF_SECONDS = "gobblin.locks.zookeeper.retry.backoff_seconds";
    /** Property: maximum number of retries. */
    public static final String MAX_RETRY_COUNT = "gobblin.locks.zookeeper.retry.max_count";

    private static final String LOCKS_ROOT_PATH = "/locks";
    private static final String CONNECTION_STRING_DEFAULT = "localhost:2181";
    private static final int LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS_DEFAULT = 5000;
    private static final int CONNECTION_TIMEOUT_SECONDS_DEFAULT = 30;
    private static final int SESSION_TIMEOUT_SECONDS_DEFAULT = 180;
    private static final int RETRY_BACKOFF_SECONDS_DEFAULT = 1;
    private static final int MAX_RETRY_COUNT_DEFAULT = 10;

    private static final Logger log = LoggerFactory.getLogger(ZookeeperBasedJobLock.class);

    /** Listeners to notify on connection loss, keyed by lock path. */
    private static final ConcurrentMap<String, JobLockEventListener> lockEventListeners = Maps.newConcurrentMap();

    /** Shared client; guarded by the class monitor in the initialize/shutdown methods. */
    private static CuratorFramework curatorFramework;
    private static Thread curatorFrameworkShutdownHook;

    private final String lockPath;
    private final long lockAcquireTimeoutMilliseconds;
    private final InterProcessLock lock;

    /** JVM shutdown hook that closes the shared Curator client. */
    public static class CuratorFrameworkShutdownHook extends Thread {
        private CuratorFrameworkShutdownHook() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            log.info("Shutting down curator framework...");
            try {
                shutdownCuratorFramework();
                log.info("Curator framework shut down.");
            } catch (Exception e) {
                log.error("Error while shutting down curator framework.", e);
            }
        }
    }

    /**
     * Creates a lock for the job named by the {@code job.name} property, creating and connecting
     * the shared Curator client first if none exists yet.
     *
     * @param properties job configuration; must contain {@code job.name}
     * @throws JobLockException declared by the lock contract
     * @throws RuntimeException if the ZooKeeper connection times out or the wait is interrupted
     */
    public ZookeeperBasedJobLock(Properties properties) throws JobLockException {
        String jobName = properties.getProperty("job.name");
        this.lockAcquireTimeoutMilliseconds =
                getLong(properties, LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS, LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS_DEFAULT);
        this.lockPath = buildLockPath(jobName);
        initializeCuratorFramework(properties);
        this.lock = new InterProcessSemaphoreMutex(curatorFramework, this.lockPath);
    }

    /**
     * Registers the listener to notify when the ZooKeeper connection is lost. If a listener is
     * already registered for this lock path, the existing one is kept.
     *
     * @param jobLockEventListener listener to register
     */
    @Override // gobblin.runtime.locks.ListenableJobLock
    public void setEventListener(JobLockEventListener jobLockEventListener) {
        lockEventListeners.putIfAbsent(this.lockPath, jobLockEventListener);
    }

    /**
     * Acquires the lock, blocking until it is available.
     *
     * @throws JobLockException if the acquisition fails
     */
    @Override // gobblin.runtime.locks.JobLock
    public void lock() throws JobLockException {
        try {
            this.lock.acquire();
        } catch (Exception e) {
            throw new JobLockException("Failed to acquire lock " + this.lockPath, e);
        }
    }

    /**
     * Releases the lock if it is currently held through this instance; otherwise does nothing.
     *
     * @throws JobLockException if the release fails
     */
    @Override // gobblin.runtime.locks.JobLock
    public void unlock() throws JobLockException {
        if (!this.lock.isAcquiredInThisProcess()) {
            return;
        }
        try {
            this.lock.release();
        } catch (Exception e) {
            throw new JobLockException("Failed to release lock " + this.lockPath, e);
        }
    }

    /**
     * Tries to acquire the lock, waiting at most the configured acquire timeout.
     *
     * @return {@code true} if the lock was acquired, {@code false} if the wait timed out
     * @throws JobLockException if the acquisition fails
     */
    @Override // gobblin.runtime.locks.JobLock
    public boolean tryLock() throws JobLockException {
        try {
            return this.lock.acquire(this.lockAcquireTimeoutMilliseconds, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new JobLockException("Failed to acquire lock " + this.lockPath, e);
        }
    }

    /**
     * @return whether the lock is currently held through this instance
     * @throws JobLockException declared by the lock contract
     */
    @Override // gobblin.runtime.locks.JobLock
    public boolean isLocked() throws JobLockException {
        return this.lock.isAcquiredInThisProcess();
    }

    /**
     * Releases the lock if held and always unregisters this lock path's event listener, even
     * when the release fails.
     *
     * @throws IOException wrapping the {@link JobLockException} if the release fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            unlock();
        } catch (JobLockException e) {
            throw new IOException(e);
        } finally {
            lockEventListeners.remove(this.lockPath);
        }
    }

    /**
     * Builds the ZooKeeper node path for a job's lock.
     *
     * @param jobName value of the {@code job.name} property
     * @return the lock path under {@code /locks}, always using {@code '/'} as separator
     */
    private static String buildLockPath(String jobName) {
        // Paths renders with the platform separator; ZooKeeper paths must use '/'.
        // On platforms whose separator already is '/', this replace is a no-op.
        return Paths.get(LOCKS_ROOT_PATH, jobName).toString().replace(java.io.File.separatorChar, '/');
    }

    /**
     * Registers the shutdown hook (once) and, if no shared client exists, builds, starts and
     * connects one. The client is published only after the connection succeeded; on timeout or
     * interruption it is closed again so that no half-initialized client is leaked.
     *
     * @param properties source of the connection settings
     * @throws RuntimeException if connecting times out or the waiting thread is interrupted
     */
    private static synchronized void initializeCuratorFramework(Properties properties) {
        if (curatorFrameworkShutdownHook == null) {
            curatorFrameworkShutdownHook = new CuratorFrameworkShutdownHook();
            Runtime.getRuntime().addShutdownHook(curatorFrameworkShutdownHook);
        }
        if (curatorFramework != null) {
            return;
        }

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(properties.getProperty(CONNECTION_STRING, CONNECTION_STRING_DEFAULT))
                .connectionTimeoutMs(
                        getMilliseconds(properties, CONNECTION_TIMEOUT_SECONDS, CONNECTION_TIMEOUT_SECONDS_DEFAULT))
                .sessionTimeoutMs(
                        getMilliseconds(properties, SESSION_TIMEOUT_SECONDS, SESSION_TIMEOUT_SECONDS_DEFAULT))
                .retryPolicy(new ExponentialBackoffRetry(
                        getMilliseconds(properties, RETRY_BACKOFF_SECONDS, RETRY_BACKOFF_SECONDS_DEFAULT),
                        getInt(properties, MAX_RETRY_COUNT, MAX_RETRY_COUNT_DEFAULT)))
                .build();

        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework source, ConnectionState newState) {
                switch (newState) {
                    case LOST:
                    case SUSPENDED:
                        log.warn("Lost connection with zookeeper");
                        notifyLockLost();
                        break;
                    case CONNECTED:
                        log.info("Connected with zookeeper");
                        break;
                    case RECONNECTED:
                        log.warn("Regained connection with zookeeper");
                        break;
                    case READ_ONLY:
                        log.warn("Zookeeper connection went into read-only mode");
                        break;
                    default:
                        break;
                }
            }
        });

        client.start();
        boolean connected = false;
        try {
            connected = client.blockUntilConnected(
                    getInt(properties, CONNECTION_TIMEOUT_SECONDS, CONNECTION_TIMEOUT_SECONDS_DEFAULT),
                    TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting to connect to zookeeper", e);
        } finally {
            if (!connected) {
                // The client was started but never published; close it to release its resources.
                client.close();
            }
        }
        if (!connected) {
            throw new RuntimeException("Time out while waiting to connect to zookeeper");
        }
        curatorFramework = client;
    }

    /**
     * Tells every registered listener that its lock was lost. A listener that throws is logged
     * and skipped so that the remaining listeners are still notified.
     */
    private static void notifyLockLost() {
        for (Map.Entry<String, JobLockEventListener> entry : lockEventListeners.entrySet()) {
            log.warn("Informing job {} that lock was lost", entry.getKey());
            try {
                entry.getValue().onLost();
            } catch (RuntimeException e) {
                log.error("Lock event listener for " + entry.getKey() + " failed while handling lock loss", e);
            }
        }
    }

    /** Closes and discards the shared Curator client, if one exists. */
    @VisibleForTesting
    static synchronized void shutdownCuratorFramework() {
        if (curatorFramework != null) {
            curatorFramework.close();
            curatorFramework = null;
        }
    }

    /**
     * Reads an {@code int} property.
     *
     * @param properties source properties
     * @param str property key
     * @param i value used when the key is absent
     * @return the parsed value
     * @throws NumberFormatException if the configured value is not a valid {@code int}
     */
    private static int getInt(Properties properties, String str, int i) {
        return Integer.parseInt(properties.getProperty(str, Integer.toString(i)));
    }

    /**
     * Reads a {@code long} property.
     *
     * @param properties source properties
     * @param str property key
     * @param j value used when the key is absent
     * @return the parsed value
     * @throws NumberFormatException if the configured value is not a valid {@code long}
     */
    private static long getLong(Properties properties, String str, long j) {
        return Long.parseLong(properties.getProperty(str, Long.toString(j)));
    }

    /**
     * Reads a property expressed in seconds and converts it to milliseconds.
     *
     * @param properties source properties
     * @param str property key
     * @param i default in seconds, used when the key is absent
     * @return the configured number of seconds multiplied by 1000
     */
    private static int getMilliseconds(Properties properties, String str, int i) {
        return getInt(properties, str, i) * 1000;
    }
}
