/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LeaderElectionService} implementation that uses a Curator {@link LeaderLatch} to
 * elect a leader and publishes the elected leader's address and session ID in an ephemeral
 * ZooKeeper node at {@code leaderPath}.
 *
 * <p>The service registers itself as a {@link LeaderLatchListener} (to learn about granted and
 * revoked leadership) and as a {@link NodeCacheListener} on the leader node (to repair the
 * published leader information if it is removed or overwritten while this instance is leader).
 *
 * <p>The node payload is written with an {@link ObjectOutputStream}: the leader address via
 * {@code writeUTF} followed by the session {@link UUID} via {@code writeObject}.
 */
public class ZooKeeperLeaderElectionService
        implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);

    /** Client used for all ZooKeeper reads and writes; closed by {@link #stop()}. */
    private final CuratorFramework client;

    /** Latch that performs the actual leader election. */
    private final LeaderLatch leaderLatch;

    /** Cache watching the node that holds the published leader information. */
    private final NodeCache cache;

    /** Path of the node that holds the published leader information. */
    private final String leaderPath;

    /** Session ID handed to the contender on the most recent grant; guarded by {@link #lock}. */
    private UUID issuedLeaderSessionID;

    /**
     * Session ID that the contender confirmed. Written only while holding {@link #lock};
     * volatile so that {@link #getLeaderSessionID()} can read it without taking the lock.
     */
    private volatile UUID confirmedLeaderSessionID;

    /** The contender taking part in the election; set once by {@link #start(LeaderContender)}. */
    private volatile LeaderContender leaderContender;

    /** Guards updates of the issued/confirmed session IDs. */
    private final Object lock = new Object();

    /**
     * Creates the service. Nothing is started until {@link #start(LeaderContender)} is called.
     *
     * @param client Curator client used to talk to ZooKeeper
     * @param latchPath ZooKeeper path used by the leader latch
     * @param leaderPath ZooKeeper path of the node that stores the leader information
     */
    public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
        this.client = client;
        this.leaderPath = leaderPath;
        this.leaderLatch = new LeaderLatch(client, latchPath);
        this.cache = new NodeCache(client, leaderPath);
    }

    /**
     * Returns the currently confirmed leader session ID.
     *
     * @return the confirmed session ID, or {@code null} if none is confirmed at the moment
     */
    public UUID getLeaderSessionID() {
        return confirmedLeaderSessionID;
    }

    /**
     * Registers the contender and starts the leader latch and the leader node cache.
     *
     * @param contender the contender participating in the election; must not be {@code null}
     * @throws IllegalStateException if a contender was already set
     * @throws Exception if the latch or the cache cannot be started
     */
    @Override
    public void start(LeaderContender contender) throws Exception {
        Preconditions.checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");
        LOG.info("Starting ZooKeeperLeaderElectionService.");
        leaderContender = contender;
        leaderLatch.addListener(this);
        leaderLatch.start();
        cache.getListenable().addListener(this);
        cache.start();
    }

    /**
     * Closes the node cache, the leader latch and the client, then clears the session IDs.
     *
     * <p>Every step is attempted even if an earlier one fails, so a failing close cannot leak
     * the remaining resources. The first failure encountered is rethrown afterwards.
     *
     * @throws Exception the first exception raised while closing any of the resources
     */
    @Override
    public void stop() throws Exception {
        LOG.info("Stopping ZooKeeperLeaderElectionService.");
        Exception firstFailure = null;

        try {
            cache.close();
        } catch (Exception e) {
            firstFailure = e;
        }

        try {
            leaderLatch.close();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }

        try {
            client.close();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }

        synchronized (lock) {
            confirmedLeaderSessionID = null;
            issuedLeaderSessionID = null;
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Confirms a previously issued leader session ID and publishes the leader information.
     *
     * <p>The confirmation only takes effect if the latch still reports leadership and the ID
     * equals the most recently issued one; otherwise it is ignored.
     *
     * @param leaderSessionID the session ID being confirmed; must not be {@code null}
     */
    @Override
    public void confirmLeaderSessionID(UUID leaderSessionID) {
        Preconditions.checkNotNull(leaderSessionID);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Confirm leader session ID {} for leader {}.", leaderSessionID, leaderContender.getAddress());
        }

        if (!leaderLatch.hasLeadership()) {
            LOG.warn("The leader session ID {} was confirmed even though the corresponding JobManager was not elected as the leader.", leaderSessionID);
            return;
        }

        synchronized (lock) {
            if (leaderSessionID.equals(issuedLeaderSessionID)) {
                confirmedLeaderSessionID = leaderSessionID;
                writeLeaderInformation(confirmedLeaderSessionID);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring confirmation of leader session ID {} because it does not match the issued session ID {}.", leaderSessionID, issuedLeaderSessionID);
            }
        }
    }

    /**
     * @return whether the underlying leader latch currently reports leadership
     */
    @Override
    public boolean hasLeadership() {
        return leaderLatch.hasLeadership();
    }

    /**
     * Latch callback: leadership was granted. Issues a fresh random session ID, clears any
     * confirmed ID and notifies the contender while holding the lock.
     */
    @Override
    public void isLeader() {
        synchronized (lock) {
            issuedLeaderSessionID = UUID.randomUUID();
            confirmedLeaderSessionID = null;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Grant leadership to contender {} with session ID {}.", leaderContender.getAddress(), issuedLeaderSessionID);
            }
            leaderContender.grantLeadership(issuedLeaderSessionID);
        }
    }

    /**
     * Latch callback: leadership was revoked. Clears both session IDs and notifies the
     * contender while holding the lock.
     */
    @Override
    public void notLeader() {
        synchronized (lock) {
            issuedLeaderSessionID = null;
            confirmedLeaderSessionID = null;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Revoke leadership of {}.", leaderContender.getAddress());
            }
            leaderContender.revokeLeadership();
        }
    }

    /**
     * Node cache callback: the leader node changed. If this instance is the leader and has a
     * confirmed session ID, the leader information is rewritten whenever the node is missing,
     * empty, or does not carry this contender's address and confirmed session ID.
     *
     * @throws Exception any failure while handling the event; it is also reported to the
     *     contender via {@code handleError} before being rethrown
     */
    @Override
    public void nodeChanged() throws Exception {
        try {
            if (!leaderLatch.hasLeadership()) {
                return;
            }

            synchronized (lock) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Leader node changed while {} is the leader with session ID {}.", leaderContender.getAddress(), confirmedLeaderSessionID);
                }

                if (confirmedLeaderSessionID == null) {
                    // Nothing has been confirmed yet, so there is nothing to publish or repair.
                    return;
                }

                ChildData childData = cache.getCurrentData();
                if (childData == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Writing leader information into empty node by {}.", leaderContender.getAddress());
                    }
                    writeLeaderInformation(confirmedLeaderSessionID);
                    return;
                }

                byte[] data = childData.getData();
                if (data == null || data.length == 0) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Writing leader information into node with empty data field by {}.", leaderContender.getAddress());
                    }
                    writeLeaderInformation(confirmedLeaderSessionID);
                    return;
                }

                if (!isOwnLeaderInformation(data)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Correcting leader information by {}.", leaderContender.getAddress());
                    }
                    writeLeaderInformation(confirmedLeaderSessionID);
                }
            }
        } catch (Exception e) {
            leaderContender.handleError(new Exception("Could not handle node changed event.", e));
            throw e;
        }
    }

    /**
     * Decodes a leader node payload and checks whether it names this contender's address and
     * the currently confirmed session ID. Must be called while holding {@link #lock}.
     */
    private boolean isOwnLeaderInformation(byte[] data) throws Exception {
        // NOTE(review): this Java-deserializes bytes read from ZooKeeper. If the ensemble can
        // be written to by untrusted parties, consider a non-native encoding or an input filter.
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
        try {
            String leaderAddress = ois.readUTF();
            UUID leaderSessionID = (UUID) ois.readObject();
            return leaderAddress.equals(leaderContender.getAddress())
                    && leaderSessionID != null
                    && leaderSessionID.equals(confirmedLeaderSessionID);
        } finally {
            ois.close();
        }
    }

    /**
     * Writes the contender's address and the given session ID into the ephemeral leader node.
     *
     * <p>The write is retried for as long as this instance holds leadership: an existing node
     * owned by this client's ZooKeeper session is updated in place, a node owned by another
     * session is deleted so that it can be recreated, and a missing node is created as
     * ephemeral. Races with concurrent creation or deletion simply trigger another iteration.
     * Any other failure is reported to the contender via {@code handleError}.
     *
     * @param leaderSessionID the session ID to publish together with the leader address
     */
    protected void writeLeaderInformation(UUID leaderSessionID) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Write leader information: Leader={}, session ID={}.", leaderContender.getAddress(), leaderSessionID);
            }

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeUTF(leaderContender.getAddress());
            oos.writeObject(leaderSessionID);
            oos.close();
            // The payload does not change between retries, so materialize it once.
            byte[] payload = baos.toByteArray();

            boolean dataWritten = false;
            while (!dataWritten && leaderLatch.hasLeadership()) {
                Stat stat = client.checkExists().forPath(leaderPath);

                if (stat == null) {
                    try {
                        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(leaderPath, payload);
                        dataWritten = true;
                    } catch (KeeperException.NodeExistsException ignored) {
                        // Somebody created the node in the meantime; re-check on the next iteration.
                    }
                    continue;
                }

                long owner = stat.getEphemeralOwner();
                long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();

                if (owner == sessionID) {
                    try {
                        client.setData().forPath(leaderPath, payload);
                        dataWritten = true;
                    } catch (KeeperException.NoNodeException ignored) {
                        // The node vanished between the existence check and the update; retry.
                    }
                } else {
                    try {
                        // The node belongs to a different session: remove it so that the next
                        // iteration can recreate it under this client's session.
                        client.delete().forPath(leaderPath);
                    } catch (KeeperException.NoNodeException ignored) {
                        // Already gone; the next iteration will create it.
                    }
                }
            }

            if (LOG.isDebugEnabled()) {
                if (dataWritten) {
                    LOG.debug("Successfully wrote leader information: Leader={}, session ID={}.", leaderContender.getAddress(), leaderSessionID);
                } else {
                    LOG.debug("Did not write leader information because leadership was lost: Leader={}, session ID={}.", leaderContender.getAddress(), leaderSessionID);
                }
            }
        } catch (Exception e) {
            leaderContender.handleError(new Exception("Could not write leader address and leader session ID to ZooKeeper.", e));
        }
    }
}

