package org.apache.fluo.core.oracle;

import com.codahale.metrics.Histogram;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.core.impl.CuratorCnxnListener;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.metrics.MetricsUtil;
import org.apache.fluo.core.thrift.OracleService;
import org.apache.fluo.core.thrift.Stamps;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.core.util.Halt;
import org.apache.fluo.core.util.HostUtil;
import org.apache.fluo.core.util.PortUtils;
import org.apache.rya.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.rya.shaded.com.google.common.base.Preconditions;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/fluo/core/oracle/OracleServer.class */
public class OracleServer extends LeaderSelectorListenerAdapter implements OracleService.Iface, PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) OracleServer.class);
    private final Histogram stampsHistogram;
    public static final long ORACLE_MAX_READ_BUFFER_BYTES = 2048;
    private final Environment env;
    private Thread serverThread;
    private THsHaServer server;
    private LeaderSelector leaderSelector;
    private PathChildrenCache pathChildrenCache;
    private CuratorFramework curatorFramework;
    private Participant currentLeader;
    private GcTimestampTracker gcTsTracker;
    private volatile long currentTs = 0;
    private volatile long maxTs = 0;
    private volatile boolean started = false;
    private int port = 0;
    private volatile boolean isLeader = false;
    private CuratorCnxnListener cnxnListener = new CuratorCnxnListener();
    private final String maxTsPath = ZookeeperPath.ORACLE_MAX_TIMESTAMP;
    private final String oraclePath = ZookeeperPath.ORACLE_SERVER;

    /* loaded from: input_file:org/apache/fluo/core/oracle/OracleServer$GcTimestampTracker.class */
    private class GcTimestampTracker {
        private volatile long advertisedGcTimetamp;
        private CuratorFramework curator;
        private Timer timer;

        GcTimestampTracker() throws Exception {
            this.curator = OracleServer.this.env.getSharedResources().getCurator();
        }

        private void updateAdvertisedGcTimestamp(long j) throws Exception {
            if (j <= this.advertisedGcTimetamp || !OracleServer.this.isLeader) {
                return;
            }
            this.advertisedGcTimetamp = j;
            this.curator.setData().forPath(ZookeeperPath.ORACLE_GC_TIMESTAMP, LongUtil.toByteArray(Long.valueOf(this.advertisedGcTimetamp)));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void updateGcTimestamp() throws Exception {
            List<String> emptyList;
            try {
                emptyList = this.curator.getChildren().forPath(ZookeeperPath.TRANSACTOR_TIMESTAMPS);
            } catch (KeeperException.NoNodeException e) {
                emptyList = Collections.emptyList();
            }
            long j = Long.MAX_VALUE;
            boolean z = false;
            Iterator<String> it = emptyList.iterator();
            while (it.hasNext()) {
                Long fromByteArray = LongUtil.fromByteArray(this.curator.getData().forPath("/transactor/timestamps/" + it.next()));
                z = true;
                if (fromByteArray.longValue() < j) {
                    j = fromByteArray.longValue();
                }
            }
            if (z) {
                updateAdvertisedGcTimestamp(j);
            } else {
                updateAdvertisedGcTimestamp(OracleServer.this.currentTs);
            }
        }

        void start() throws Exception {
            this.advertisedGcTimetamp = LongUtil.fromByteArray(this.curator.getData().forPath(ZookeeperPath.ORACLE_GC_TIMESTAMP)).longValue();
            TimerTask timerTask = new TimerTask() { // from class: org.apache.fluo.core.oracle.OracleServer.GcTimestampTracker.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    try {
                        GcTimestampTracker.this.updateGcTimestamp();
                    } catch (Exception e) {
                        OracleServer.log.warn("Failed to update GC timestamp.", (Throwable) e);
                    }
                }
            };
            TimerTask timerTask2 = new TimerTask() { // from class: org.apache.fluo.core.oracle.OracleServer.GcTimestampTracker.2
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    OracleServer.log.info("Current timestamp: {}", Long.valueOf(OracleServer.this.currentTs));
                }
            };
            this.timer = new Timer("Oracle gc update timer", true);
            long j = OracleServer.this.env.getConfiguration().getLong(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, FluoConfigurationImpl.ZK_UPDATE_PERIOD_MS_DEFAULT);
            this.timer.schedule(timerTask, j, j);
            this.timer.schedule(timerTask2, 0L, FluoConfiguration.TRANSACTION_ROLLBACK_TIME_DEFAULT);
        }

        void stop() {
            if (this.timer != null) {
                this.timer.cancel();
                this.timer = null;
            }
        }
    }

    public OracleServer(Environment environment) throws Exception {
        this.env = environment;
        this.stampsHistogram = MetricsUtil.getHistogram(environment.getConfiguration(), environment.getSharedResources().getMetricRegistry(), environment.getMetricNames().getOracleServerStamps());
    }

    private void allocateTimestamp() throws Exception {
        Stat stat = new Stat();
        long parseLong = Long.parseLong(new String(this.curatorFramework.getData().storingStatIn(stat).forPath(this.maxTsPath))) + 1000;
        this.curatorFramework.setData().withVersion(stat.getVersion()).forPath(this.maxTsPath, LongUtil.toByteArray(Long.valueOf(parseLong)));
        this.maxTs = parseLong;
        if (!this.isLeader) {
            throw new IllegalStateException();
        }
    }

    @Override // org.apache.fluo.core.thrift.OracleService.Iface
    public Stamps getTimestamps(String str, int i) throws TException {
        long timestampsImpl = getTimestampsImpl(str, i);
        this.stampsHistogram.update(i);
        return new Stamps(timestampsImpl, this.gcTsTracker.advertisedGcTimetamp);
    }

    private synchronized long getTimestampsImpl(String str, int i) throws TException {
        if (!this.started) {
            throw new IllegalStateException("Received timestamp request but Oracle has not started");
        }
        if (!str.equals(this.env.getFluoApplicationID())) {
            throw new IllegalArgumentException("Received timestamp request with a Fluo application ID [" + str + "] that does not match the application ID [" + this.env.getFluoApplicationID() + "] of the Oracle");
        }
        if (!this.isLeader) {
            throw new IllegalStateException("Received timestamp request but Oracle is not leader");
        }
        while (i + this.currentTs >= this.maxTs) {
            try {
                allocateTimestamp();
            } catch (Exception e) {
                throw new TException(e);
            }
        }
        long j = this.currentTs;
        this.currentTs += i;
        return j;
    }

    @Override // org.apache.fluo.core.thrift.OracleService.Iface
    public boolean isLeader() throws TException {
        return this.isLeader;
    }

    private boolean isLeader(Participant participant) {
        return participant != null && participant.isLeader();
    }

    @VisibleForTesting
    public int getPort() {
        return this.port;
    }

    @VisibleForTesting
    public boolean isConnected() {
        return this.started && this.cnxnListener.isConnected();
    }

    private InetSocketAddress startServer() throws TTransportException {
        if (this.env.getConfiguration().containsKey(FluoConfigurationImpl.ORACLE_PORT_PROP)) {
            this.port = this.env.getConfiguration().getInt(FluoConfigurationImpl.ORACLE_PORT_PROP);
            Preconditions.checkArgument(this.port >= 1 && this.port <= 65535, "fluo.impl.oracle.port must be valid port (1-65535)");
        } else {
            this.port = PortUtils.getRandomFreePort();
        }
        InetSocketAddress inetSocketAddress = new InetSocketAddress(this.port);
        THsHaServer.Args args = new THsHaServer.Args(new TNonblockingServerSocket(inetSocketAddress));
        args.processor(new OracleService.Processor(this));
        args.maxReadBufferBytes = ORACLE_MAX_READ_BUFFER_BYTES;
        args.inputProtocolFactory(new TCompactProtocol.Factory());
        args.outputProtocolFactory(new TCompactProtocol.Factory());
        this.server = new THsHaServer(args);
        this.serverThread = new Thread(new Runnable() { // from class: org.apache.fluo.core.oracle.OracleServer.1
            @Override // java.lang.Runnable
            public void run() {
                OracleServer.this.server.serve();
            }
        });
        this.serverThread.setDaemon(true);
        this.serverThread.start();
        return inetSocketAddress;
    }

    public synchronized void start() throws Exception {
        if (this.started) {
            throw new IllegalStateException();
        }
        InetSocketAddress startServer = startServer();
        this.curatorFramework = CuratorUtil.newAppCurator(this.env.getConfiguration());
        this.curatorFramework.getConnectionStateListenable().addListener(this.cnxnListener);
        this.curatorFramework.start();
        while (!this.cnxnListener.isConnected()) {
            Thread.sleep(200L);
        }
        this.leaderSelector = new LeaderSelector(this.curatorFramework, ZookeeperPath.ORACLE_SERVER, this);
        String str = HostUtil.getHostName() + ":" + startServer.getPort();
        this.leaderSelector.setId(str);
        log.info("Leader ID = " + str);
        this.leaderSelector.start();
        this.pathChildrenCache = new PathChildrenCache(this.curatorFramework, this.oraclePath, true);
        this.pathChildrenCache.getListenable().addListener(this);
        this.pathChildrenCache.start();
        while (!this.cnxnListener.isConnected()) {
            Thread.sleep(200L);
        }
        log.info("Listening " + startServer);
        this.started = true;
    }

    public synchronized void stop() throws Exception {
        if (this.started) {
            this.server.stop();
            this.serverThread.join();
            if (this.gcTsTracker != null) {
                this.gcTsTracker.stop();
            }
            this.started = false;
            this.currentLeader = null;
            if (this.curatorFramework.getState().equals(CuratorFrameworkState.STARTED)) {
                this.pathChildrenCache.getListenable().removeListener(this);
                this.pathChildrenCache.close();
                this.leaderSelector.close();
                this.curatorFramework.getConnectionStateListenable().removeListener(this);
                this.curatorFramework.close();
            }
            log.info("Oracle server has been stopped.");
        }
    }

    private OracleService.Client getOracleClient(String str, int i) {
        try {
            TFastFramedTransport tFastFramedTransport = new TFastFramedTransport(new TSocket(str, i));
            tFastFramedTransport.open();
            TCompactProtocol tCompactProtocol = new TCompactProtocol(tFastFramedTransport);
            log.info("Former leader was reachable at " + str + ":" + i);
            return new OracleService.Client(tCompactProtocol);
        } catch (TTransportException e) {
            log.debug("Exception thrown in getOracleClient()", (Throwable) e);
            return null;
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    @Override // org.apache.curator.framework.recipes.leader.LeaderSelectorListener
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        try {
            if (this.currentLeader != null) {
                String[] split = this.currentLeader.getId().split(":");
                OracleService.Client oracleClient = getOracleClient(split[0], Integer.parseInt(split[1]));
                if (oracleClient != null) {
                    while (oracleClient.isLeader()) {
                        try {
                            Thread.sleep(500L);
                        } catch (Exception e) {
                            log.debug("Exception thrown in takeLeadership()", (Throwable) e);
                        }
                    }
                }
            }
            synchronized (this) {
                long longValue = LongUtil.fromByteArray(curatorFramework.getData().forPath(this.maxTsPath)).longValue();
                this.maxTs = longValue;
                this.currentTs = longValue;
            }
            this.gcTsTracker = new GcTimestampTracker();
            this.gcTsTracker.start();
            this.isLeader = true;
            while (this.started) {
                Thread.sleep(100L);
            }
        } finally {
            this.isLeader = false;
            if (this.started) {
                Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
            }
        }
    }

    @Override // org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        try {
            if (isConnected() && (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED) || pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))) {
                synchronized (this) {
                    Participant leader = this.leaderSelector.getLeader();
                    if (isLeader(leader) && !this.leaderSelector.hasLeadership()) {
                        this.currentLeader = leader;
                    }
                }
            }
        } catch (InterruptedException e) {
            log.warn("Oracle leadership watcher has been interrupted unexpectedly");
        }
    }
}
