package org.apache.zookeeper.server.quorum;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.TxnLogEntry;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.8.1.jar:org/apache/zookeeper/server/quorum/Learner.class */
public class Learner {
    QuorumPeer self;
    LearnerZooKeeperServer zk;
    protected BufferedOutputStream bufferedOutput;
    protected Socket sock;
    protected MultipleAddresses leaderAddr;
    protected InputArchive leaderIs;
    protected OutputArchive leaderOs;
    private static final int BUFFERED_MESSAGE_SIZE = 10;
    protected static final Logger LOG = LoggerFactory.getLogger((Class<?>) Learner.class);
    private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100).intValue();
    private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
    public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
    private static boolean asyncSending = Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
    public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
    public static final boolean closeSocketAsync = Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC));
    protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
    LearnerSender sender = null;
    protected int leaderProtocolVersion = 1;
    protected final MessageTracker messageTracker = new MessageTracker(10);
    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.8.1.jar:org/apache/zookeeper/server/quorum/Learner$LeaderConnector.class */
    public class LeaderConnector implements Runnable {
        private AtomicReference<Socket> socket;
        private InetSocketAddress address;
        private CountDownLatch latch;

        LeaderConnector(InetSocketAddress inetSocketAddress, AtomicReference<Socket> atomicReference, CountDownLatch countDownLatch) {
            this.address = inetSocketAddress;
            this.socket = atomicReference;
            this.latch = countDownLatch;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Thread.currentThread().setName("LeaderConnector-" + this.address);
                Socket connectToLeader = connectToLeader();
                if (connectToLeader != null && connectToLeader.isConnected()) {
                    if (this.socket.compareAndSet(null, connectToLeader)) {
                        Learner.LOG.info("Successfully connected to leader, using address: {}", this.address);
                    } else {
                        Learner.LOG.info("Connection to the leader is already established, close the redundant connection");
                        connectToLeader.close();
                    }
                }
            } catch (Exception e) {
                Learner.LOG.error("Failed connect to {}", this.address, e);
            } finally {
                this.latch.countDown();
            }
        }

        /* JADX WARN: Code restructure failed: missing block: B:20:0x016e, code lost:
        
            return r8;
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        private java.net.Socket connectToLeader() throws java.io.IOException, org.apache.zookeeper.common.X509Exception, java.lang.InterruptedException {
            /*
                Method dump skipped, instructions count: 367
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.zookeeper.server.quorum.Learner.LeaderConnector.connectToLeader():java.net.Socket");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.8.1.jar:org/apache/zookeeper/server/quorum/Learner$PacketInFlight.class */
    public static class PacketInFlight {
        TxnHeader hdr;
        Record rec;
        TxnDigest digest;

        PacketInFlight() {
        }
    }

    public Socket getSocket() {
        return this.sock;
    }

    public int getPendingRevalidationsCount() {
        return this.pendingRevalidations.size();
    }

    protected static void setAsyncSending(boolean z) {
        asyncSending = z;
        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, Boolean.valueOf(asyncSending));
    }

    protected static boolean getAsyncSending() {
        return asyncSending;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void validateSession(ServerCnxn serverCnxn, long j, int i) throws IOException {
        LOG.info("Revalidating client: 0x{}", Long.toHexString(j));
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeLong(j);
        dataOutputStream.writeInt(i);
        dataOutputStream.close();
        QuorumPacket quorumPacket = new QuorumPacket(6, -1L, byteArrayOutputStream.toByteArray(), null);
        this.pendingRevalidations.put(Long.valueOf(j), serverCnxn);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, 32L, "To validate session 0x" + Long.toHexString(j));
        }
        writePacket(quorumPacket, true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writePacket(QuorumPacket quorumPacket, boolean z) throws IOException {
        if (asyncSending) {
            this.sender.queuePacket(quorumPacket);
        } else {
            writePacketNow(quorumPacket, z);
        }
    }

    void writePacketNow(QuorumPacket quorumPacket, boolean z) throws IOException {
        synchronized (this.leaderOs) {
            if (quorumPacket != null) {
                this.messageTracker.trackSent(quorumPacket.getType());
                this.leaderOs.writeRecord(quorumPacket, "packet");
            }
            if (z) {
                this.bufferedOutput.flush();
            }
        }
    }

    protected void startSendingThread() {
        this.sender = new LearnerSender(this);
        this.sender.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void readPacket(QuorumPacket quorumPacket) throws IOException {
        synchronized (this.leaderIs) {
            this.leaderIs.readRecord(quorumPacket, "packet");
            this.messageTracker.trackReceived(quorumPacket.getType());
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logQuorumPacket(LOG, quorumPacket.getType() == 5 ? 128L : 16L, 'i', quorumPacket);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void request(Request request) throws IOException {
        if (request.isThrottled()) {
            LOG.error("Throttled request sent to leader: {}. Exiting", request);
            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeLong(request.sessionId);
        dataOutputStream.writeInt(request.cxid);
        dataOutputStream.writeInt(request.type);
        if (request.request != null) {
            request.request.rewind();
            byte[] bArr = new byte[request.request.remaining()];
            request.request.get(bArr);
            request.request.rewind();
            dataOutputStream.write(bArr);
        }
        dataOutputStream.close();
        writePacket(new QuorumPacket(1, -1L, byteArrayOutputStream.toByteArray(), request.authInfo), true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public QuorumPeer.QuorumServer findLeader() {
        QuorumPeer.QuorumServer quorumServer = null;
        Vote currentVote = this.self.getCurrentVote();
        Iterator<QuorumPeer.QuorumServer> it = this.self.getView().values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            QuorumPeer.QuorumServer next = it.next();
            if (next.id == currentVote.getId()) {
                next.recreateSocketAddresses();
                quorumServer = next;
                break;
            }
        }
        if (quorumServer == null) {
            LOG.warn("Couldn't find the leader with id = {}", Long.valueOf(currentVote.getId()));
        }
        return quorumServer;
    }

    protected long nanoTime() {
        return System.nanoTime();
    }

    protected void sockConnect(Socket socket, InetSocketAddress inetSocketAddress, int i) throws IOException {
        socket.connect(inetSocketAddress, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void connectToLeader(MultipleAddresses multipleAddresses, String str) throws IOException {
        this.leaderAddr = multipleAddresses;
        Set<InetSocketAddress> allReachableAddressesOrAll = this.self.isMultiAddressReachabilityCheckEnabled() ? multipleAddresses.getAllReachableAddressesOrAll() : multipleAddresses.getAllAddresses();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(allReachableAddressesOrAll.size());
        CountDownLatch countDownLatch = new CountDownLatch(allReachableAddressesOrAll.size());
        AtomicReference atomicReference = new AtomicReference(null);
        Stream<R> map = allReachableAddressesOrAll.stream().map(inetSocketAddress -> {
            return new LeaderConnector(inetSocketAddress, atomicReference, countDownLatch);
        });
        newFixedThreadPool.getClass();
        map.forEach((v1) -> {
            r1.submit(v1);
        });
        try {
            try {
                countDownLatch.await();
                newFixedThreadPool.shutdown();
                try {
                    if (!newFixedThreadPool.awaitTermination(1L, TimeUnit.SECONDS)) {
                        LOG.error("not all the LeaderConnector terminated properly");
                    }
                } catch (InterruptedException e) {
                    LOG.error("Interrupted while terminating LeaderConnector executor.", (Throwable) e);
                }
            } catch (InterruptedException e2) {
                LOG.warn("Interrupted while trying to connect to Leader", (Throwable) e2);
                newFixedThreadPool.shutdown();
                try {
                    if (!newFixedThreadPool.awaitTermination(1L, TimeUnit.SECONDS)) {
                        LOG.error("not all the LeaderConnector terminated properly");
                    }
                } catch (InterruptedException e3) {
                    LOG.error("Interrupted while terminating LeaderConnector executor.", (Throwable) e3);
                }
            }
            if (atomicReference.get() == null) {
                throw new IOException("Failed connect to " + multipleAddresses);
            }
            this.sock = (Socket) atomicReference.get();
            this.sockBeingClosed.set(false);
            this.self.authLearner.authenticate(this.sock, str);
            this.leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(this.sock.getInputStream()));
            this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
            this.leaderOs = BinaryOutputArchive.getArchive(this.bufferedOutput);
            if (asyncSending) {
                startSendingThread();
            }
        } catch (Throwable th) {
            newFixedThreadPool.shutdown();
            try {
                if (!newFixedThreadPool.awaitTermination(1L, TimeUnit.SECONDS)) {
                    LOG.error("not all the LeaderConnector terminated properly");
                }
            } catch (InterruptedException e4) {
                LOG.error("Interrupted while terminating LeaderConnector executor.", (Throwable) e4);
            }
            throw th;
        }
    }

    protected Socket createSocket() throws X509Exception, IOException {
        Socket createSSLSocket = this.self.isSslQuorum() ? this.self.getX509Util().createSSLSocket() : new Socket();
        createSSLSocket.setSoTimeout(this.self.tickTime * this.self.initLimit);
        return createSSLSocket;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long registerWithLeader(int i) throws IOException {
        long lastLoggedZxid = this.self.getLastLoggedZxid();
        QuorumPacket quorumPacket = new QuorumPacket();
        quorumPacket.setType(i);
        quorumPacket.setZxid(ZxidUtils.makeZxid(this.self.getAcceptedEpoch(), 0L));
        LearnerInfo learnerInfo = new LearnerInfo(this.self.getMyId(), KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE, this.self.getQuorumVerifier().getVersion());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryOutputArchive.getArchive(byteArrayOutputStream).writeRecord(learnerInfo, "LearnerInfo");
        quorumPacket.setData(byteArrayOutputStream.toByteArray());
        writePacket(quorumPacket, true);
        readPacket(quorumPacket);
        long epochFromZxid = ZxidUtils.getEpochFromZxid(quorumPacket.getZxid());
        if (quorumPacket.getType() != 17) {
            if (epochFromZxid > this.self.getAcceptedEpoch()) {
                this.self.setAcceptedEpoch(epochFromZxid);
            }
            if (quorumPacket.getType() == 10) {
                return quorumPacket.getZxid();
            }
            LOG.error("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        this.leaderProtocolVersion = ByteBuffer.wrap(quorumPacket.getData()).getInt();
        byte[] bArr = new byte[4];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        if (epochFromZxid > this.self.getAcceptedEpoch()) {
            wrap.putInt((int) this.self.getCurrentEpoch());
            this.self.setAcceptedEpoch(epochFromZxid);
        } else {
            if (epochFromZxid != this.self.getAcceptedEpoch()) {
                throw new IOException("Leaders epoch, " + epochFromZxid + " is less than accepted epoch, " + this.self.getAcceptedEpoch());
            }
            wrap.putInt(-1);
        }
        writePacket(new QuorumPacket(18, lastLoggedZxid, bArr, null), true);
        return ZxidUtils.makeZxid(epochFromZxid, 0L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Failed to find 'out' block for switch in B:16:0x01e8. Please report as an issue. */
    public void syncWithLeader(long j) throws Exception {
        QuorumPacket quorumPacket = new QuorumPacket(3, 0L, null, null);
        QuorumPacket quorumPacket2 = new QuorumPacket();
        long epochFromZxid = ZxidUtils.getEpochFromZxid(j);
        QuorumVerifier quorumVerifier = null;
        boolean z = true;
        boolean z2 = false;
        readPacket(quorumPacket2);
        ArrayDeque arrayDeque = new ArrayDeque();
        ArrayDeque<PacketInFlight> arrayDeque2 = new ArrayDeque();
        synchronized (this.zk) {
            if (quorumPacket2.getType() == 13) {
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(quorumPacket2.getZxid()));
                this.self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                if (this.zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
                    LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
                    z = true;
                    z2 = true;
                } else {
                    z = false;
                }
            } else if (quorumPacket2.getType() == 15) {
                this.self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(quorumPacket2.getZxid()));
                this.zk.getZKDatabase().deserializeSnapshot(this.leaderIs);
                if (!this.self.isReconfigEnabled()) {
                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                    this.zk.getZKDatabase().initConfigInZKDatabase(this.self.getQuorumVerifier());
                }
                String readString = this.leaderIs.readString("signature");
                if (!readString.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got {}", readString);
                    throw new IOException("Missing signature");
                }
                this.zk.getZKDatabase().setlastProcessedZxid(quorumPacket2.getZxid());
                z2 = true;
            } else if (quorumPacket2.getType() == 14) {
                this.self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(quorumPacket2.getZxid()));
                if (!this.zk.getZKDatabase().truncateLog(quorumPacket2.getZxid())) {
                    LOG.error("Not able to truncate the log 0x{}", Long.toHexString(quorumPacket2.getZxid()));
                    ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
                }
                this.zk.getZKDatabase().setlastProcessedZxid(quorumPacket2.getZxid());
            } else {
                LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(quorumPacket2));
                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
            }
            this.zk.getZKDatabase().initConfigInZKDatabase(this.self.getQuorumVerifier());
            this.zk.createSessionTracker();
            long j2 = 0;
            boolean z3 = true;
            boolean z4 = !z;
            while (true) {
                if (this.self.isRunning()) {
                    readPacket(quorumPacket2);
                    switch (quorumPacket2.getType()) {
                        case 2:
                            PacketInFlight packetInFlight = new PacketInFlight();
                            TxnLogEntry deserializeTxn = SerializeUtils.deserializeTxn(quorumPacket2.getData());
                            packetInFlight.hdr = deserializeTxn.getHeader();
                            packetInFlight.rec = deserializeTxn.getTxn();
                            packetInFlight.digest = deserializeTxn.getDigest();
                            if (packetInFlight.hdr.getZxid() != j2 + 1) {
                                LOG.warn("Got zxid 0x{} expected 0x{}", Long.toHexString(packetInFlight.hdr.getZxid()), Long.toHexString(j2 + 1));
                            }
                            j2 = packetInFlight.hdr.getZxid();
                            if (packetInFlight.hdr.getType() == 16) {
                                this.self.setLastSeenQuorumVerifier(this.self.configFromString(new String(((SetDataTxn) packetInFlight.rec).getData(), StandardCharsets.UTF_8)), true);
                            }
                            arrayDeque2.add(packetInFlight);
                        case 4:
                        case 9:
                            PacketInFlight packetInFlight2 = (PacketInFlight) arrayDeque2.peekFirst();
                            if (packetInFlight2.hdr.getZxid() == quorumPacket2.getZxid() && quorumPacket2.getType() == 9) {
                                if (this.self.processReconfig(this.self.configFromString(new String(((SetDataTxn) packetInFlight2.rec).getData(), StandardCharsets.UTF_8)), Long.valueOf(ByteBuffer.wrap(quorumPacket2.getData()).getLong()), Long.valueOf(quorumPacket2.getZxid()), true)) {
                                    throw new Exception("changes proposed in reconfig");
                                }
                            }
                            if (z4) {
                                arrayDeque.add(Long.valueOf(quorumPacket2.getZxid()));
                            } else if (packetInFlight2.hdr.getZxid() != quorumPacket2.getZxid()) {
                                LOG.warn("Committing 0x{}, but next proposal is 0x{}", Long.toHexString(quorumPacket2.getZxid()), Long.toHexString(packetInFlight2.hdr.getZxid()));
                            } else {
                                this.zk.processTxn(packetInFlight2.hdr, packetInFlight2.rec);
                                arrayDeque2.remove();
                            }
                            break;
                        case 8:
                        case 19:
                            PacketInFlight packetInFlight3 = new PacketInFlight();
                            if (quorumPacket2.getType() == 19) {
                                ByteBuffer wrap = ByteBuffer.wrap(quorumPacket2.getData());
                                long j3 = wrap.getLong();
                                byte[] bArr = new byte[wrap.remaining()];
                                wrap.get(bArr);
                                TxnLogEntry deserializeTxn2 = SerializeUtils.deserializeTxn(bArr);
                                packetInFlight3.hdr = deserializeTxn2.getHeader();
                                packetInFlight3.rec = deserializeTxn2.getTxn();
                                packetInFlight3.digest = deserializeTxn2.getDigest();
                                if (this.self.processReconfig(this.self.configFromString(new String(((SetDataTxn) packetInFlight3.rec).getData(), StandardCharsets.UTF_8)), Long.valueOf(j3), Long.valueOf(quorumPacket2.getZxid()), true)) {
                                    throw new Exception("changes proposed in reconfig");
                                }
                            } else {
                                TxnLogEntry deserializeTxn3 = SerializeUtils.deserializeTxn(quorumPacket2.getData());
                                packetInFlight3.rec = deserializeTxn3.getTxn();
                                packetInFlight3.hdr = deserializeTxn3.getHeader();
                                packetInFlight3.digest = deserializeTxn3.getDigest();
                                if (packetInFlight3.hdr.getZxid() != j2 + 1) {
                                    LOG.warn("Got zxid 0x{} expected 0x{}", Long.toHexString(packetInFlight3.hdr.getZxid()), Long.toHexString(j2 + 1));
                                }
                                j2 = packetInFlight3.hdr.getZxid();
                            }
                            if (z4) {
                                arrayDeque2.add(packetInFlight3);
                                arrayDeque.add(Long.valueOf(quorumPacket2.getZxid()));
                            } else {
                                this.zk.processTxn(packetInFlight3.hdr, packetInFlight3.rec);
                            }
                        case 10:
                            LOG.info("Learner received NEWLEADER message");
                            if (quorumPacket2.getData() != null && quorumPacket2.getData().length > 1) {
                                try {
                                    QuorumVerifier configFromString = this.self.configFromString(new String(quorumPacket2.getData(), StandardCharsets.UTF_8));
                                    this.self.setLastSeenQuorumVerifier(configFromString, true);
                                    quorumVerifier = configFromString;
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                            if (z) {
                                this.zk.takeSnapshot(z2);
                            }
                            this.self.setCurrentEpoch(epochFromZxid);
                            z4 = true;
                            z3 = false;
                            this.sock.setSoTimeout(this.self.tickTime * this.self.syncLimit);
                            this.self.setSyncMode(QuorumPeer.SyncMode.NONE);
                            this.zk.startupWithoutServing();
                            if (this.zk instanceof FollowerZooKeeperServer) {
                                FollowerZooKeeperServer followerZooKeeperServer = (FollowerZooKeeperServer) this.zk;
                                for (PacketInFlight packetInFlight4 : arrayDeque2) {
                                    followerZooKeeperServer.logRequest(packetInFlight4.hdr, packetInFlight4.rec, packetInFlight4.digest);
                                }
                                arrayDeque2.clear();
                            }
                            writePacket(new QuorumPacket(3, j, null, null), true);
                            break;
                        case 12:
                            LOG.info("Learner received UPTODATE message");
                            if (quorumVerifier != null && this.self.processReconfig(quorumVerifier, null, null, true)) {
                                throw new Exception("changes proposed in reconfig");
                            }
                            if (z3) {
                                this.zk.takeSnapshot(z2);
                                this.self.setCurrentEpoch(epochFromZxid);
                            }
                            this.self.setZooKeeperServer(this.zk);
                            this.self.adminServer.setZooKeeperServer(this.zk);
                            break;
                    }
                }
            }
        }
        quorumPacket.setZxid(ZxidUtils.makeZxid(epochFromZxid, 0L));
        writePacket(quorumPacket, true);
        this.zk.startServing();
        this.self.updateElectionVote(epochFromZxid);
        if (this.zk instanceof FollowerZooKeeperServer) {
            FollowerZooKeeperServer followerZooKeeperServer2 = (FollowerZooKeeperServer) this.zk;
            for (PacketInFlight packetInFlight5 : arrayDeque2) {
                followerZooKeeperServer2.logRequest(packetInFlight5.hdr, packetInFlight5.rec, packetInFlight5.digest);
            }
            Iterator it = arrayDeque.iterator();
            while (it.hasNext()) {
                followerZooKeeperServer2.commit(((Long) it.next()).longValue());
            }
            return;
        }
        if (!(this.zk instanceof ObserverZooKeeperServer)) {
            throw new UnsupportedOperationException("Unknown server type");
        }
        ObserverZooKeeperServer observerZooKeeperServer = (ObserverZooKeeperServer) this.zk;
        for (PacketInFlight packetInFlight6 : arrayDeque2) {
            Long l = (Long) arrayDeque.peekFirst();
            if (packetInFlight6.hdr.getZxid() != l.longValue()) {
                LOG.warn("Committing 0x{}, but next proposal is 0x{}", Long.toHexString(l.longValue()), Long.toHexString(packetInFlight6.hdr.getZxid()));
            } else {
                arrayDeque.remove();
                Request request = new Request((ServerCnxn) null, packetInFlight6.hdr.getClientId(), packetInFlight6.hdr.getCxid(), packetInFlight6.hdr.getType(), (ByteBuffer) null, (List<Id>) null);
                request.setTxn(packetInFlight6.rec);
                request.setHdr(packetInFlight6.hdr);
                request.setTxnDigest(packetInFlight6.digest);
                observerZooKeeperServer.commitRequest(request);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void revalidate(QuorumPacket quorumPacket) throws IOException {
        DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(quorumPacket.getData()));
        long readLong = dataInputStream.readLong();
        boolean readBoolean = dataInputStream.readBoolean();
        ServerCnxn remove = this.pendingRevalidations.remove(Long.valueOf(readLong));
        if (remove == null) {
            LOG.warn("Missing session 0x{} for validation", Long.toHexString(readLong));
        } else {
            this.zk.finishSessionInit(remove, readBoolean);
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, 32L, "Session 0x" + Long.toHexString(readLong) + " is valid: " + readBoolean);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void ping(QuorumPacket quorumPacket) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        for (Map.Entry<Long, Integer> entry : this.zk.getTouchSnapshot().entrySet()) {
            dataOutputStream.writeLong(entry.getKey().longValue());
            dataOutputStream.writeInt(entry.getValue().intValue());
        }
        writePacket(new QuorumPacket(quorumPacket.getType(), quorumPacket.getZxid(), byteArrayOutputStream.toByteArray(), quorumPacket.getAuthinfo()), true);
    }

    public void shutdown() {
        this.self.setZooKeeperServer(null);
        this.self.closeAllConnections();
        this.self.adminServer.setZooKeeperServer(null);
        if (this.sender != null) {
            this.sender.shutdown();
        }
        closeSocket();
        if (this.zk != null) {
            this.zk.shutdown(this.self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isRunning() {
        return this.self.isRunning() && this.zk.isRunning();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeSocket() {
        if (this.sock == null || !this.sockBeingClosed.compareAndSet(false, true)) {
            return;
        }
        if (!closeSocketAsync) {
            closeSockSync();
            return;
        }
        Thread thread = new Thread(() -> {
            closeSockSync();
        }, "CloseSocketThread(sid:" + this.zk.getServerId());
        thread.setDaemon(true);
        thread.start();
    }

    void closeSockSync() {
        try {
            long currentElapsedTime = Time.currentElapsedTime();
            if (this.sock != null) {
                this.sock.close();
                this.sock = null;
            }
            ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - currentElapsedTime);
        } catch (IOException e) {
            LOG.warn("Ignoring error closing connection to leader", (Throwable) e);
        }
    }

    static {
        LOG.info("leaderConnectDelayDuringRetryMs: {}", Integer.valueOf(leaderConnectDelayDuringRetryMs));
        LOG.info("TCP NoDelay set to: {}", Boolean.valueOf(nodelay));
        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, Boolean.valueOf(asyncSending));
        LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, Boolean.valueOf(closeSocketAsync));
    }
}
