package org.apache.fluo.core.oracle;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.impl.CuratorCnxnListener;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.metrics.MetricsUtil;
import org.apache.fluo.core.thrift.OracleService;
import org.apache.fluo.core.thrift.Stamps;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/fluo/core/oracle/OracleClient.class */
/**
 * Client used to obtain {@link Stamp}s from the oracle server.
 *
 * <p>Callers enqueue requests through {@link #getStamp()} or {@link #getStampAsync()}. A single
 * daemon thread drains the queue, asks the current oracle leader (discovered through the
 * {@code /oracle/server} ZooKeeper path) for a contiguous block of timestamps over Thrift, and
 * hands one stamp to each queued request.
 *
 * <p>The background thread is started by the constructor and stopped by {@link #close()}.
 */
public class OracleClient implements AutoCloseable {
    public static final Logger log = LoggerFactory.getLogger(OracleClient.class);

    /** Largest interval, in seconds, that {@link #getStamp()} waits between "still waiting" warnings. */
    private static final int MAX_ORACLE_WAIT_PERIOD = 60;

    private final Timer responseTimer;
    private final Histogram stampsHistogram;

    /**
     * Last participant known to be the oracle leader, or {@code null} when none is known. Written
     * by the retriever (under its own monitor) and read by {@link #getOracle()} (under this
     * object's monitor); volatile because those are two different locks.
     */
    private volatile Participant currentLeader;

    private final Environment env;
    private final ArrayBlockingQueue<TimeRequest> queue = new ArrayBlockingQueue<>(10000);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final TimestampRetriever timestampRetriever = new TimestampRetriever();
    private final Thread thread = new Thread(this.timestampRetriever);

    /**
     * A single pending timestamp request. Synchronous callers wait on {@link #cdl}; asynchronous
     * callers are completed by running {@link #lf}, which invokes {@link #call()}.
     */
    public static final class TimeRequest implements Callable<Stamp> {
        /** Released by the retriever once {@link #stampRef} has been populated (synchronous path). */
        final CountDownLatch cdl;
        /** Holds the stamp assigned to this request. */
        final AtomicReference<Stamp> stampRef;
        /** Future task to run on completion; {@code null} for synchronous requests. */
        ListenableFutureTask<Stamp> lf;

        private TimeRequest() {
            this.cdl = new CountDownLatch(1);
            this.stampRef = new AtomicReference<>();
            this.lf = null;
        }

        /** Returns the stamp stored in {@link #stampRef}; may be {@code null} if none was set yet. */
        @Override
        public Stamp call() throws Exception {
            return this.stampRef.get();
        }
    }

    /**
     * Background worker: sets up the Curator connection, tracks the oracle leader, and services
     * queued {@link TimeRequest}s in batches.
     */
    public class TimestampRetriever extends LeaderSelectorListenerAdapter implements Runnable, PathChildrenCacheListener {
        private LeaderSelector leaderSelector;
        private CuratorFramework curatorFramework;
        private OracleService.Client client;
        private PathChildrenCache pathChildrenCache;
        private TTransport transport;

        private TimestampRetriever() {
        }

        /**
         * Connects to ZooKeeper and the oracle leader, then services requests until the client is
         * closed. Any exception ends the thread; it is logged at debug level when the client is
         * already closed and at error level otherwise.
         */
        @Override
        public void run() {
            try {
                // Setup is mutually exclusive with close(): a concurrent close() waits until the
                // resources below are fully created before tearing them down.
                synchronized (this) {
                    if (OracleClient.this.closed.get()) {
                        return;
                    }
                    this.curatorFramework = CuratorUtil.newAppCurator(OracleClient.this.env.getConfiguration());
                    CuratorCnxnListener connectionListener = new CuratorCnxnListener();
                    this.curatorFramework.getConnectionStateListenable().addListener(connectionListener);
                    this.curatorFramework.start();
                    while (!connectionListener.isConnected()) {
                        Thread.sleep(200L);
                    }
                    // Create the selector before starting the cache so that childEvent() never
                    // observes a null leaderSelector.
                    this.leaderSelector = new LeaderSelector(this.curatorFramework, "/oracle/server", this);
                    this.pathChildrenCache = new PathChildrenCache(this.curatorFramework, "/oracle/server", true);
                    this.pathChildrenCache.getListenable().addListener(this);
                    this.pathChildrenCache.start();
                    connect();
                }
                // Must run outside the monitor: childEvent() and reconnect() synchronize on this
                // object, and holding the lock for the life of the loop would block leader updates.
                doWork();
            } catch (Exception e) {
                if (OracleClient.this.closed.get()) {
                    OracleClient.log.debug("Exception occurred in run() method", e);
                } else {
                    OracleClient.log.error("Exception occurred in run() method", e);
                }
            }
        }

        /**
         * Refreshes {@code currentLeader} whenever a child of the watched path is added, removed or
         * updated. The leader is cleared when the selector reports no participant that is leader.
         */
        @Override
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
            boolean childrenChanged = type == PathChildrenCacheEvent.Type.CHILD_REMOVED
                    || type == PathChildrenCacheEvent.Type.CHILD_ADDED
                    || type == PathChildrenCacheEvent.Type.CHILD_UPDATED;
            if (!childrenChanged) {
                return;
            }
            LeaderSelector selector = this.leaderSelector;
            if (selector == null) {
                // close() has already released the selector; nothing to update.
                return;
            }
            Participant leader = selector.getLeader();
            synchronized (this) {
                // Store the participant that was actually checked rather than querying again,
                // which could return a different (unchecked) participant.
                OracleClient.this.currentLeader = isLeader(leader) ? leader : null;
            }
        }

        /**
         * Main service loop. Blocks for at least one request, drains whatever else is queued,
         * fetches one contiguous block of stamps for the batch, and completes each request with
         * {@code txStampsStart + index}. Returns only once the client has been closed.
         */
        private void doWork() {
            ArrayList<TimeRequest> batch = new ArrayList<>();
            while (true) {
                try {
                    batch.clear();
                    TimeRequest first = null;
                    while (first == null) {
                        if (OracleClient.this.closed.get()) {
                            return;
                        }
                        // Poll with a timeout so the closed flag is re-checked every second.
                        first = OracleClient.this.queue.poll(1L, TimeUnit.SECONDS);
                    }
                    batch.add(first);
                    OracleClient.this.queue.drainTo(batch);

                    Stamps stamps = fetchStamps(batch.size());
                    long txStampsStart = stamps.txStampsStart;
                    long gcStamp = stamps.gcStamp;

                    for (int i = 0; i < batch.size(); i++) {
                        TimeRequest request = batch.get(i);
                        request.stampRef.set(new Stamp(txStampsStart + i, gcStamp));
                        if (request.lf == null) {
                            request.cdl.countDown();
                        } else {
                            request.lf.run();
                        }
                    }
                } catch (InterruptedException e) {
                    if (OracleClient.this.closed.get()) {
                        OracleClient.log.debug("InterruptedException occurred in doWork() method", e);
                    } else {
                        OracleClient.log.error("InterruptedException occurred in doWork() method", e);
                    }
                } catch (Exception e) {
                    // NOTE(review): the batch drained above is discarded by the next clear(), so
                    // its waiters are never signalled on this path - confirm this is intended.
                    OracleClient.log.error("Exception occurred in doWork() method", e);
                }
            }
        }

        /**
         * Requests {@code count} timestamps from the oracle, retrying until a response is obtained
         * from a server that is still the leader after the call returns.
         *
         * @param count number of transaction stamps to allocate
         * @return the stamps returned by the oracle
         * @throws InterruptedException if declared-thrown by a reconnect attempt
         * @throws TTransportException if declared-thrown by a reconnect attempt
         * @throws KeeperException if declared-thrown by a reconnect attempt
         * @throws IOException if declared-thrown by a reconnect attempt
         */
        private Stamps fetchStamps(int count) throws InterruptedException, TTransportException, KeeperException, IOException {
            while (true) {
                try {
                    String leaderBefore;
                    OracleService.Client localClient;
                    // Snapshot leader id and client together so they refer to the same connection.
                    synchronized (this) {
                        leaderBefore = OracleClient.this.getOracle();
                        localClient = this.client;
                    }
                    Timer.Context timerContext = OracleClient.this.responseTimer.time();
                    Stamps stamps = localClient.getTimestamps(OracleClient.this.env.getFluoApplicationID(), count);
                    String leaderAfter = OracleClient.this.getOracle();
                    if (leaderAfter != null && !leaderAfter.equals(leaderBefore)) {
                        // Leadership moved while the request was in flight; ask the new leader.
                        reconnect();
                        continue;
                    }
                    OracleClient.this.stampsHistogram.update(count);
                    timerContext.close();
                    return stamps;
                } catch (TTransportException e) {
                    // Must precede the TException handler: TTransportException is its subclass.
                    OracleClient.log.info("Oracle connection lost. Retrying...");
                    reconnect();
                } catch (TException e) {
                    OracleClient.log.error("TException occurred in doWork() method", e);
                }
            }
        }

        /**
         * Waits for an oracle leader and opens a Thrift connection to it. A transport failure
         * triggers a randomized sleep and a fresh leader lookup; any other failure is rethrown
         * wrapped in a {@link RuntimeException}.
         */
        private synchronized void connect() throws IOException, KeeperException, InterruptedException, TTransportException {
            getLeader();
            while (true) {
                String leaderId = OracleClient.this.currentLeader.getId();
                OracleClient.log.debug("Connecting to oracle at " + leaderId);
                // The participant id is split on ':' into host and port.
                String[] hostAndPort = leaderId.split(":");
                try {
                    this.transport = new TFastFramedTransport(new TSocket(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
                    this.transport.open();
                    this.client = new OracleService.Client(new TCompactProtocol(this.transport));
                    OracleClient.log.info("Connected to oracle at " + OracleClient.this.getOracle());
                    return;
                } catch (TTransportException e) {
                    // Must precede the Exception handler, otherwise this retry path is unreachable.
                    sleepRandom();
                    getLeader();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /** Closes the current transport if it is open, then connects again via {@link #connect()}. */
        private synchronized void reconnect() throws InterruptedException, TTransportException, KeeperException, IOException {
            if (this.transport != null && this.transport.isOpen()) {
                this.transport.close();
            }
            connect();
        }

        /**
         * Releases the Thrift transport, the path cache and the Curator client, and clears the
         * corresponding fields. Every resource is attempted even if an earlier one fails.
         *
         * @throws RuntimeException wrapping an {@link IOException} raised while closing the cache
         */
        public synchronized void close() {
            try {
                if (this.transport != null && this.transport.isOpen()) {
                    this.transport.close();
                }
                if (this.pathChildrenCache != null) {
                    this.pathChildrenCache.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (this.curatorFramework != null) {
                        this.curatorFramework.close();
                    }
                } finally {
                    this.transport = null;
                    this.pathChildrenCache = null;
                    this.leaderSelector = null;
                    this.curatorFramework = null;
                }
            }
        }

        /**
         * Asks the selector for the leader once.
         *
         * @return {@code true} and updates {@code currentLeader} if a leader was found;
         *         {@code false} if none was found or a {@link KeeperException} occurred
         * @throws RuntimeException wrapping any other exception from the selector
         */
        private boolean getLeaderAttempt() {
            Participant candidate = null;
            try {
                candidate = this.leaderSelector.getLeader();
            } catch (KeeperException e) {
                // Must precede the Exception handler, otherwise this branch is unreachable.
                OracleClient.log.debug("Exception throw in getLeaderAttempt()", e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            if (!isLeader(candidate)) {
                return false;
            }
            OracleClient.this.currentLeader = candidate;
            return true;
        }

        /**
         * Repeats {@link #getLeaderAttempt()} with randomized sleeps until a leader is found.
         *
         * @throws IllegalStateException if the client is closed while no leader is available
         */
        private void getLeader() {
            while (!getLeaderAttempt()) {
                // Stop searching once the client is closed so close() is not left waiting on a
                // worker thread that can never find a leader.
                OracleClient.this.checkClosed();
                sleepRandom();
            }
        }

        /** Sleeps between 100 ms and roughly 1100 ms via {@link UtilWaitThread}. */
        private void sleepRandom() {
            UtilWaitThread.sleep(100 + ((long) (1000.0d * Math.random())), OracleClient.this.closed);
        }

        /** Returns {@code true} when {@code participant} is non-null and reports itself as leader. */
        private boolean isLeader(Participant participant) {
            return participant != null && participant.isLeader();
        }

        /**
         * Intentionally empty: within this class the selector is only used to query the current
         * leader, never to take leadership.
         */
        @Override
        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        }
    }

    /**
     * Creates the client, registers its metrics, and starts the daemon retriever thread.
     *
     * @param environment source of configuration, metric registry, metric names and application id
     */
    public OracleClient(Environment environment) {
        this.env = environment;
        this.responseTimer = MetricsUtil.getTimer(environment.getConfiguration(), environment.getSharedResources().getMetricRegistry(), environment.getMetricNames().getOracleResponseTime());
        this.stampsHistogram = MetricsUtil.getHistogram(environment.getConfiguration(), environment.getSharedResources().getMetricRegistry(), environment.getMetricNames().getOracleClientStamps());
        this.thread.setDaemon(true);
        this.thread.start();
    }

    /**
     * Blocks until a stamp has been obtained from the oracle.
     *
     * <p>When the configured client retry timeout is negative the call waits indefinitely, logging
     * a warning after each wait period; the period doubles up to {@link #MAX_ORACLE_WAIT_PERIOD}
     * seconds. Otherwise the call waits at most that many milliseconds.
     *
     * @return the stamp assigned to this request
     * @throws IllegalStateException if the client is closed before or while waiting
     * @throws FluoException if the timeout elapses or the calling thread is interrupted
     */
    public Stamp getStamp() {
        checkClosed();
        TimeRequest request = new TimeRequest();
        try {
            this.queue.put(request);
            int timeoutMillis = this.env.getConfiguration().getClientRetryTimeout();
            if (timeoutMillis < 0) {
                long waitPeriod = 1;
                long waitTotal = 0;
                while (!request.cdl.await(waitPeriod, TimeUnit.SECONDS)) {
                    checkClosed();
                    waitTotal += waitPeriod;
                    if (waitPeriod < MAX_ORACLE_WAIT_PERIOD) {
                        // Clamp so the back-off never exceeds the documented maximum.
                        waitPeriod = Math.min(waitPeriod * 2, MAX_ORACLE_WAIT_PERIOD);
                    }
                    log.warn("Waiting for timestamp from Oracle. Is it running? waitTotal={}s waitPeriod={}s", waitTotal, waitPeriod);
                }
            } else if (!request.cdl.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new FluoException("Timed out (after " + timeoutMillis + "ms) trying to retrieve timestamp from Oracle.  Is the Oracle running?");
            }
            return request.stampRef.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new FluoException("Interrupted while retrieving timestamp from Oracle", e);
        }
    }

    /**
     * Queues a stamp request and returns immediately.
     *
     * @return a future that the retriever thread completes with the assigned stamp
     * @throws IllegalStateException if the client is closed
     * @throws RuntimeException if the calling thread is interrupted while enqueueing
     */
    public ListenableFuture<Stamp> getStampAsync() {
        checkClosed();
        TimeRequest request = new TimeRequest();
        ListenableFutureTask<Stamp> future = ListenableFutureTask.create(request);
        request.lf = future;
        try {
            this.queue.put(request);
            return future;
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the id of the last known oracle leader.
     *
     * @return the leader's participant id, or {@code null} when no leader is currently known
     * @throws IllegalStateException if the client is closed
     */
    public synchronized String getOracle() {
        checkClosed();
        // Read once: the retriever may clear the field between a null check and a second read.
        Participant leader = this.currentLeader;
        return leader != null ? leader.getId() : null;
    }

    /** Throws {@link IllegalStateException} if {@link #close()} has been called. */
    private void checkClosed() {
        if (this.closed.get()) {
            throw new IllegalStateException(OracleClient.class.getSimpleName() + " is closed");
        }
    }

    /**
     * Stops the retriever thread and releases its resources. Calls after the first are no-ops.
     *
     * @throws FluoException if interrupted while waiting for the retriever thread to finish; the
     *         retriever's resources are still released in that case
     */
    @Override
    public void close() {
        // Atomic transition so that concurrent callers cannot both run the shutdown sequence.
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            this.thread.interrupt();
            this.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FluoException("Interrupted during close", e);
        } finally {
            // Always release ZooKeeper/Thrift resources, even if join() was interrupted.
            this.timestampRetriever.close();
        }
    }
}
