package com.github.liuyehcf.framework.flow.engine.runtime.remote.io;

import com.github.liuyehcf.framework.compile.engine.utils.Assert;
import com.github.liuyehcf.framework.flow.engine.FlowEngine;
import com.github.liuyehcf.framework.flow.engine.promise.AbstractPromise;
import com.github.liuyehcf.framework.flow.engine.promise.Promise;
import com.github.liuyehcf.framework.flow.engine.promise.PromiseListener;
import com.github.liuyehcf.framework.flow.engine.runtime.config.FlowProperties;
import com.github.liuyehcf.framework.flow.engine.runtime.config.RuntimeMode;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.DefaultTopology;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.Identifier;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.Member;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.MemberIdentifier;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.MemberRole;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.MemberStatus;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.Topology;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.handler.FrameAggregatorHandler;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.handler.FrameChunkedWriteHandler;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.handler.FrameHandler;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.handler.MessageHandler;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.handler.NettyConstant;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.handler.SerializeWriteHandler;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.Election;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.ElectionResult;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.ElectionState;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.Greet;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.LeaderStatusRequest;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.LeaderStatusResponse;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.SyncMemberStatus;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.Vote;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.support.LeaderStatusSupport;
import com.github.liuyehcf.framework.flow.engine.util.StatisticsUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/liuyehcf/framework/flow/engine/runtime/remote/io/DefaultClusterEventLoop.class */
public class DefaultClusterEventLoop implements ClusterEventLoop {
    /** Logger bound to this class so log output is attributed to the event loop itself. */
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterEventLoop.class);

    /** Engine this event loop belongs to. */
    private final FlowEngine engine;

    /** Properties taken from the engine at construction time. */
    private final FlowProperties properties;

    /** Cluster topology as seen from this member. */
    private final Topology topology;

    /** Future of the bound server socket; stays {@code null} until {@code init()} binds, and is reset by {@code shutdown()}. */
    private ChannelFuture channelFuture;

    /** Accept group of the server bootstrap (one thread). */
    private final EventLoopGroup serverBossGroup = new NioEventLoopGroup(1);

    /** Child-channel group of the server bootstrap. */
    private final EventLoopGroup serverWorkerGroup = new NioEventLoopGroup();

    /** Group used by outgoing (client mode) connections. */
    private final EventLoopGroup clientWorkGroup = new NioEventLoopGroup();

    /** Registered channels keyed by the peer's identifier string. */
    private final Map<String, ClusterChannel> channels = Maps.newConcurrentMap();

    /** Bookkeeping for leader status request rounds. */
    private final LeaderStatusSupport leaderStatusSupport = new LeaderStatusSupport();

    /** First-state election proposed by this member, or {@code null} when none is in flight. */
    private final AtomicReference<Election> proposedFirstStateElection = new AtomicReference<>();

    /** Second-state election proposed by this member, or {@code null} when none is in flight. */
    private final AtomicReference<Election> proposedSecondStateElection = new AtomicReference<>();

    // The two "last accepted" references are only reset in this view of the file
    // (clearElectionStatus); presumably the vote methods write them -- TODO confirm.
    private final AtomicReference<Election> lastAcceptedFirstStateElection = new AtomicReference<>();
    private final AtomicReference<Election> lastAcceptedSecondStateElection = new AtomicReference<>();

    /** Set once the first-state vote count has reached a decision for the current proposal. */
    private final AtomicBoolean firstStateElectionFinished = new AtomicBoolean(false);

    /** Set once the second-state vote count has reached a decision for the current proposal. */
    private final AtomicBoolean secondStateElectionFinished = new AtomicBoolean(false);

    /** Votes collected for the first-state proposal of this member. */
    private final List<Vote> receivedFirstStateVotes = Lists.newCopyOnWriteArrayList();

    /** Votes collected for the second-state proposal of this member. */
    private final List<Vote> receivedSecondStateVotes = Lists.newCopyOnWriteArrayList();

    /** Promise completed by {@code shutdown()}; close listeners are attached to it. */
    private final Promise<Void> closePromise = new AbstractPromise<Void>() {
    };

    public DefaultClusterEventLoop(FlowEngine flowEngine) {
        Assert.assertNotNull(flowEngine, "engine");
        this.engine = flowEngine;
        this.properties = flowEngine.getProperties();
        this.topology = new DefaultTopology(this.properties.getSelfConfig());
        init();
    }

    /**
     * Binds the cluster server socket and registers the topology probe.
     *
     * <p>Does nothing except log a warning when the cluster config is missing
     * or lists no seed members; the log message describes this as a downgrade
     * to singleton mode.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} when
     *         the calling thread is interrupted while waiting for the bind; the
     *         thread's interrupt flag is restored before throwing
     */
    private void init() {
        if (this.properties.getClusterConfig() == null) {
            LOGGER.warn("[{}] mode missing cluster config, downgrade to [{}] mode", RuntimeMode.cluster, RuntimeMode.singleton);
            return;
        }
        if (CollectionUtils.isEmpty(this.properties.getClusterConfig().getSeeds())) {
            LOGGER.warn("[{}] mode missing seed member config, downgrade to [{}] mode", RuntimeMode.cluster, RuntimeMode.singleton);
            return;
        }
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.serverBossGroup, this.serverWorkerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    public void initChannel(SocketChannel socketChannel) {
                        // Accepted connections do not know the peer identifier yet, hence null.
                        DefaultClusterEventLoop.this.addHandlers(socketChannel, ChannelMode.server, null);
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_RCVBUF, Integer.valueOf(NettyConstant.MAX_RCVBUF))
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_REUSEADDR, true);
        try {
            this.channelFuture = serverBootstrap.bind(this.properties.getPort()).sync();
            TopologyProbe.register(this);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack;
            // wrapping alone would silently clear the interrupt status.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the flow engine this event loop was created for.
     */
    @Override
    public final FlowEngine getEngine() {
        return engine;
    }

    /**
     * Returns the topology instance created in the constructor.
     */
    @Override
    public final Topology getTopology() {
        return topology;
    }

    /**
     * Registers a channel for its peer unless an already registered channel
     * for the same peer takes precedence.
     *
     * <p>When a channel for the peer already exists:
     * <ul>
     *   <li>a new channel of the same mode is closed and rejected;</li>
     *   <li>otherwise the identifiers of self and peer are compared: if self
     *       sorts before the peer, a server-mode newcomer is rejected; if self
     *       sorts after the peer, a client-mode newcomer is rejected.</li>
     * </ul>
     * In every other case the new channel replaces the old one, which is then
     * closed asynchronously.
     *
     * @param clusterChannel channel to register
     * @return {@code true} when the channel was registered, {@code false} when
     *         it was rejected (and closed asynchronously)
     */
    @Override
    public final synchronized boolean addChannelIfAbsent(ClusterChannel clusterChannel) {
        final String peerId = clusterChannel.getPeerIdentifier().getIdentifier();
        final String selfId = this.topology.getSelf().getIdentifier();
        final ClusterChannel existing = this.channels.get(peerId);
        if (existing != null) {
            if (Objects.equals(clusterChannel.getChannelMode(), existing.getChannelMode())) {
                clusterChannel.closeAsync();
                return false;
            }
            final int order = StringUtils.compare(selfId, peerId);
            final boolean rejected = (order < 0 && clusterChannel.getChannelMode().isServerMode())
                    || (order > 0 && clusterChannel.getChannelMode().isClientMode());
            if (rejected) {
                clusterChannel.closeAsync();
                return false;
            }
        }
        final ClusterChannel replaced;
        synchronized (this.channels) {
            replaced = this.channels.put(peerId, clusterChannel);
        }
        if (replaced != null) {
            replaced.closeAsync();
        }
        clusterChannel.getChannel().closeFuture().addListener(future -> {
            synchronized (this.channels) {
                // The mapping may already be gone (for example, this channel was
                // replaced and the replacement closed first), so guard against
                // null before comparing. The close future belongs to this very
                // channel, so comparing against it is the same identity check.
                final ClusterChannel current = this.channels.get(peerId);
                if (current != null && Objects.equals(current.getChannel(), clusterChannel.getChannel())) {
                    this.channels.remove(peerId);
                }
            }
        });
        return true;
    }

    /**
     * Tells whether a channel is currently registered for the given peer.
     *
     * @param identifier peer to look up
     * @return {@code true} when the channel map holds an entry for the peer
     */
    @Override
    public final boolean hasChannel(Identifier identifier) {
        final String key = identifier.getIdentifier();
        return channels.containsKey(key);
    }

    /**
     * Looks up the channel registered for the given peer.
     *
     * @param identifier peer to look up
     * @return the registered channel, or {@code null} when there is none
     */
    @Override
    public final ClusterChannel getChannel(Identifier identifier) {
        final String key = identifier.getIdentifier();
        return channels.get(key);
    }

    /**
     * Returns the value view of the channel map (not a copy).
     */
    @Override
    public final Collection<ClusterChannel> getChannels() {
        return channels.values();
    }

    /**
     * Connects to the given member in client mode and, when the new channel is
     * accepted by {@link #addChannelIfAbsent(ClusterChannel)}, sends a greeting
     * and registers the heartbeat.
     *
     * <p>Any failure marks the member as unreachable (see
     * {@code handleConnectFailure}). Failures other than interruption are only
     * logged at debug level and not propagated.
     *
     * @param memberIdentifier member to connect to
     * @throws InterruptedException when the thread is interrupted while waiting
     *         for the connect to complete
     */
    @Override
    public final void createChannel(final MemberIdentifier memberIdentifier) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.clientWorkGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    public void initChannel(SocketChannel socketChannel) {
                        DefaultClusterEventLoop.this.addHandlers(socketChannel, ChannelMode.client, memberIdentifier);
                    }
                })
                .option(ChannelOption.SO_RCVBUF, Integer.valueOf(NettyConstant.MAX_RCVBUF))
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true);
        try {
            DefaultClusterChannel defaultClusterChannel = new DefaultClusterChannel(
                    memberIdentifier,
                    bootstrap.connect(memberIdentifier.getHost(), memberIdentifier.getPort().intValue()).sync().channel(),
                    ChannelMode.client);
            if (addChannelIfAbsent(defaultClusterChannel)) {
                defaultClusterChannel.write(new Greet(this.properties.getHost(), this.properties.getPort()));
                HeartBeat.register(this, defaultClusterChannel, this.properties.getHeartbeatInterval());
            }
        } catch (Throwable th) {
            handleConnectFailure(memberIdentifier);
            if (th instanceof InterruptedException) {
                throw ((InterruptedException) th);
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("create channel error, errorMsg={}", th.getMessage());
            }
        }
    }

    /**
     * Reacts to a failed connect: marks a known, not yet inactive member as
     * locally unreachable. If this member is the leader it broadcasts the
     * leaving and inactive transitions; if the unreachable member is the leader
     * it starts a leader status request round, at most once per 3 seconds.
     */
    private void handleConnectFailure(MemberIdentifier memberIdentifier) {
        final Member member = this.topology.getMember(memberIdentifier);
        if (member == null || member.getStatus().isInactive()) {
            return;
        }
        member.setLocalStatus(MemberStatus.unreachable);
        if (this.topology.isSelfLeader()) {
            updateAndBroadcastMemberStatus(member.clone(Long.valueOf(this.topology.generateNextMemberId()), MemberRole.follower, MemberStatus.leaving));
            updateAndBroadcastMemberStatus(member.clone(Long.valueOf(this.topology.generateNextMemberId()), MemberRole.follower, MemberStatus.inactive));
            return;
        }
        if (!this.topology.isLeader(member)) {
            return;
        }
        // The previous request id is compared against System.nanoTime(), so it is
        // treated here as the nano timestamp of the previous request round.
        final long elapsedNanos = System.nanoTime() - this.leaderStatusSupport.getPreviousRequestId();
        if (elapsedNanos > TimeUnit.SECONDS.toNanos(3)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("[{}] broadcast leader status request", this.topology.getLeader().getIdentifier());
            }
            sendLeaderStatusRequest(new LeaderStatusRequest(this.leaderStatusSupport.start()));
        } else if (LOGGER.isDebugEnabled()) {
            // NOTE(review): the decompiled source logged an uncompilable expression here
            // (an identifier object divided by 1000000); the time elapsed since the
            // previous request, in milliseconds, is assumed -- confirm against the
            // original sources.
            LOGGER.debug("[{}] broadcasting leader status request is forbidden because still in silence [{}]ms", this.topology.getLeader().getIdentifier(), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(elapsedNanos)));
        }
    }

    /**
     * Writes a message to the channel registered for a single peer.
     *
     * @param identifier receiving peer
     * @param obj        message to write
     * @return {@code true} when a channel was found and the message handed to
     *         it, {@code false} when no channel is registered for the peer
     */
    @Override
    public final boolean unicast(Identifier identifier, Object obj) {
        final ClusterChannel target = getChannel(identifier);
        if (target != null) {
            target.write(obj);
            return true;
        }
        return false;
    }

    /**
     * Writes the message to every currently registered channel.
     *
     * @param obj message to write
     */
    @Override
    public final void broadcast(Object obj) {
        for (ClusterChannel target : getChannels()) {
            target.write(obj);
        }
    }

    /**
     * Counts a leader status response and, once enough non-active responses
     * were collected, proposes a first-state election with this member as both
     * proposer and candidate.
     *
     * <p>Responses reporting the leader as active are ignored. The round is
     * finished as soon as {@code StatisticsUtils.isAccept} accepts the counter
     * against the number of active members.
     *
     * @param leaderStatusResponse response to account for
     */
    @Override
    public final void statisticsForLeaderStatus(LeaderStatusResponse leaderStatusResponse) {
        if (MemberStatus.active.equals(leaderStatusResponse.getStatus())) {
            return;
        }
        if (!StatisticsUtils.isAccept(leaderStatusSupport.increase(leaderStatusResponse.getRequestId()), topology.activeNum())) {
            return;
        }
        leaderStatusSupport.finish();

        final Member local = topology.getSelf();
        final Election proposal = new Election(ElectionState.first, topology.getTransactionId() + 1, local, local);
        // Only one first-state proposal may be in flight at a time.
        if (!proposedFirstStateElection.compareAndSet(null, proposal)) {
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}] launch leader election process, send first state election, {}", local.getIdentifier(), proposal);
        }
        sendElection(proposal);
    }

    /**
     * Stores the member in the local topology and then broadcasts a
     * {@code SyncMemberStatus} message for it to all registered channels.
     *
     * @param member member state to apply and publish
     */
    @Override
    public final void updateAndBroadcastMemberStatus(Member member) {
        topology.addOrReplaceMember(member);
        final SyncMemberStatus syncMessage = new SyncMemberStatus(member);
        broadcast(syncMessage);
    }

    /**
     * Votes on a first-state election.
     *
     * <p>WARNING: this body is a decompiler placeholder. The original
     * implementation (382 bytecode instructions according to the decompiler
     * note below) was not recovered, so every call throws
     * {@link UnsupportedOperationException}. {@code sendElection} in this class
     * invokes this method for first-state elections, so the placeholder must be
     * replaced with the real implementation before the class is usable.
     *
     * <p>The only recovered fragment of the original logic is the comparison
     * quoted in the decompiler warning below.
     */
    /* JADX WARN: Code restructure failed: missing block: B:11:0x009e, code lost:
    
        if (r0.longValue() < r0) goto L13;
     */
    /* JADX WARN: Removed duplicated region for block: B:15:0x00ad  */
    /* JADX WARN: Removed duplicated region for block: B:23:0x00d1  */
    /* JADX WARN: Removed duplicated region for block: B:30:0x0132  */
    /* JADX WARN: Removed duplicated region for block: B:35:0x0169  */
    /* JADX WARN: Removed duplicated region for block: B:37:0x0172  */
    @Override // com.github.liuyehcf.framework.flow.engine.runtime.remote.io.ClusterEventLoop
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final synchronized void voteForFirstStateElection(com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.Election r10) {
        /*
            Method dump skipped, instructions count: 382
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler; always throws.
        throw new UnsupportedOperationException("Method not decompiled: com.github.liuyehcf.framework.flow.engine.runtime.remote.io.DefaultClusterEventLoop.voteForFirstStateElection(com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.Election):void");
    }

    /**
     * Votes on a second-state election.
     *
     * <p>WARNING: this body is a decompiler placeholder. The original
     * implementation (369 bytecode instructions according to the decompiler
     * note below) was not recovered, so every call throws
     * {@link UnsupportedOperationException}. {@code sendElection} in this class
     * invokes this method for elections that are not in the first state, so the
     * placeholder must be replaced with the real implementation before the
     * class is usable.
     *
     * <p>The only recovered fragment of the original logic is the comparison
     * quoted in the decompiler warning below.
     */
    /* JADX WARN: Code restructure failed: missing block: B:11:0x00ac, code lost:
    
        if (r0.longValue() <= r0) goto L13;
     */
    /* JADX WARN: Removed duplicated region for block: B:15:0x00bb  */
    /* JADX WARN: Removed duplicated region for block: B:23:0x00e2  */
    /* JADX WARN: Removed duplicated region for block: B:30:0x015c  */
    /* JADX WARN: Removed duplicated region for block: B:32:0x0165  */
    @Override // com.github.liuyehcf.framework.flow.engine.runtime.remote.io.ClusterEventLoop
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final synchronized void voteForSecondStateElection(com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.Election r10) {
        /*
            Method dump skipped, instructions count: 369
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler; always throws.
        throw new UnsupportedOperationException("Method not decompiled: com.github.liuyehcf.framework.flow.engine.runtime.remote.io.DefaultClusterEventLoop.voteForSecondStateElection(com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.Election):void");
    }

    /**
     * Accounts for a vote on this member's first-state election proposal.
     *
     * <p>Votes are dropped when no first-state proposal is in flight or when
     * the vote's transaction id differs from the proposal's. Otherwise the vote
     * is recorded and the accept/reject counts are checked against the number
     * of active members:
     * <ul>
     *   <li>on rejection the transaction id is increased and the first-state
     *       bookkeeping is reset;</li>
     *   <li>on acceptance a second-state election is proposed and sent. Its
     *       candidate comes from the votes when they carry a candidate for the
     *       same transaction id, otherwise from the proposal itself.</li>
     * </ul>
     *
     * @param vote received first-state vote
     */
    @Override
    public final synchronized void statisticsForFirstStateElection(Vote vote) {
        final long voteTransactionId = vote.getTransactionId();
        final long proposerMemberId = vote.getProposer().getId();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}] receive vote [{}], state={}, transactionId={}, proposerMemberId={}",
                    topology.getSelf().getIdentifier(),
                    vote.isAccept() ? "accept" : "reject",
                    vote.getState(),
                    voteTransactionId,
                    proposerMemberId);
        }

        final int activeMembers = topology.activeNum();
        final Election proposal = proposedFirstStateElection.get();
        if (proposal == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("[{}] receive first state vote, but proposedFirstStateElection is null, transactionId={}, proposerMemberId={}",
                        topology.getSelf().getIdentifier(), voteTransactionId, proposerMemberId);
            }
            return;
        }

        final long proposalTransactionId = proposal.getTransactionId();
        if (voteTransactionId != proposalTransactionId) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("[{}] receive first state vote, but transaction id [{}] of this vote is not equal with proposer's transaction id [{}], proposerMemberId={}",
                        topology.getSelf().getIdentifier(), voteTransactionId, proposalTransactionId, proposerMemberId);
            }
            return;
        }

        receivedFirstStateVotes.add(vote);
        final int acceptCount = (int) receivedFirstStateVotes.stream().filter(Vote::isAccept).count();
        final int rejectCount = (int) receivedFirstStateVotes.stream().filter(Vote::isReject).count();
        final boolean accepted = StatisticsUtils.isAccept(acceptCount, activeMembers);
        final boolean rejected = StatisticsUtils.isReject(rejectCount, activeMembers);

        if (!(accepted && firstStateElectionFinished.compareAndSet(false, true))) {
            if (rejected && firstStateElectionFinished.compareAndSet(false, true)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("[{}]'s first state election is rejected [{}/{}/{}], {}",
                            topology.getSelf().getIdentifier(), acceptCount, rejectCount, activeMembers, proposal);
                }
                // Rejected: bump the transaction id and reset the first-state bookkeeping.
                topology.increaseTransactionId(topology.getTransactionId());
                proposedFirstStateElection.set(null);
                firstStateElectionFinished.set(false);
                receivedFirstStateVotes.clear();
            }
            return;
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}]'s first state election is accepted [{}/{}/{}], {}",
                    topology.getSelf().getIdentifier(), acceptCount, rejectCount, activeMembers, proposal);
        }

        final Member selfMember = topology.getMember(properties.getSelfConfig());
        final Pair<Long, Member> maxAccepted = selectMaximumAcceptedSecondStateElection(receivedFirstStateVotes);

        final Election secondStateElection;
        if (maxAccepted == null) {
            // No vote carried a candidate: continue with our own proposal.
            secondStateElection = proposal.toSecondState();
        } else if (proposalTransactionId == maxAccepted.getKey().longValue()) {
            final Member votedCandidate = maxAccepted.getValue();
            final Member proposer = proposal.getProposer();
            if (votedCandidate.getId() != proposer.getId()) {
                // Adopt the candidate carried by the votes, re-labelled with our candidate's id.
                secondStateElection = new Election(ElectionState.second, proposalTransactionId, proposer,
                        votedCandidate.clone(Long.valueOf(proposal.getCandidate().getId()), null, null));
            } else {
                secondStateElection = proposal.toSecondState();
            }
        } else {
            // Votes refer to a different transaction id: nothing is proposed.
            return;
        }

        if (proposedSecondStateElection.compareAndSet(null, secondStateElection)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("[{}] send second state election, {}", selfMember.getIdentifier(), secondStateElection);
            }
            sendElection(secondStateElection);
        }
    }

    /**
     * Accounts for a vote on this member's second-state election proposal.
     *
     * <p>Votes are dropped when no election is in flight or when the vote's
     * transaction id differs from the proposal's. Otherwise the vote is
     * recorded and the accept/reject counts are checked against the number of
     * active members:
     * <ul>
     *   <li>on rejection the second-state bookkeeping is reset;</li>
     *   <li>on acceptance, if the vote's candidate is this member and it is not
     *       already leader, it tries to assume leadership; on success it clears
     *       all election state and broadcasts the {@code ElectionResult}.</li>
     * </ul>
     *
     * @param vote received second-state vote
     */
    @Override
    public final synchronized void statisticsForSecondStateElection(Vote vote) {
        final long voteTransactionId = vote.getTransactionId();
        final long proposerMemberId = vote.getProposer().getId();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}] receive vote [{}], state={}, transactionId={}, proposerMemberId={}",
                    topology.getSelf().getIdentifier(),
                    vote.isAccept() ? "accept" : "reject",
                    vote.getState(),
                    voteTransactionId,
                    proposerMemberId);
        }

        final int activeMembers = topology.activeNum();
        // NOTE(review): the guard reads the FIRST-state proposal, not the second-state
        // one -- kept as found; confirm this is intended.
        final Election proposal = proposedFirstStateElection.get();
        if (proposal == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("[{}] receive second state vote, but election is already finished. transactionId={}, proposerMemberId={}",
                        topology.getSelf().getIdentifier(), voteTransactionId, proposerMemberId);
            }
            return;
        }

        final long proposalTransactionId = proposal.getTransactionId();
        if (vote.getTransactionId() != proposalTransactionId) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("[{}] receive second state vote, but transaction id [{}] of this vote is not equal with proposer's transaction id [{}], proposerMemberId={}",
                        topology.getSelf().getIdentifier(), voteTransactionId, proposalTransactionId, proposerMemberId);
            }
            return;
        }

        receivedSecondStateVotes.add(vote);
        final int acceptCount = (int) receivedSecondStateVotes.stream().filter(Vote::isAccept).count();
        final int rejectCount = (int) receivedSecondStateVotes.stream().filter(Vote::isReject).count();
        final boolean accepted = StatisticsUtils.isAccept(acceptCount, activeMembers);
        final boolean rejected = StatisticsUtils.isReject(rejectCount, activeMembers);

        if (!(accepted && secondStateElectionFinished.compareAndSet(false, true))) {
            if (rejected && secondStateElectionFinished.compareAndSet(false, true)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("[{}]'s second state election is rejected [{}/{}/{}], {}",
                            topology.getSelf().getIdentifier(), acceptCount, rejectCount, activeMembers, proposal);
                }
                // Rejected: reset the second-state bookkeeping only.
                proposedSecondStateElection.set(null);
                secondStateElectionFinished.set(false);
                receivedSecondStateVotes.clear();
            }
            return;
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}]'s second state election is accepted [{}/{}/{}], {}",
                    topology.getSelf().getIdentifier(), acceptCount, rejectCount, activeMembers, vote);
        }

        final Member elected = vote.getCandidate();
        if (!topology.isSelf(elected)) {
            LOGGER.info("cluster member [{}] is the new leader non-locally", elected.getIdentifier());
            return;
        }
        if (topology.isSelfLeader()) {
            return;
        }

        if (topology.assumeLeader(topology.getTransactionId(), proposalTransactionId)) {
            final Member newLeader = topology.getLeader();
            clearElectionStatus();
            broadcast(new ElectionResult(newLeader, topology.getTransactionId()));
            LOGGER.info("cluster member [{}] is the new leader locally", newLeader.getIdentifier());
        } else if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}]'s second state election is accepted but assume leader failed", topology.getSelf().getIdentifier());
        }
    }

    /**
     * Resets every piece of election bookkeeping held by this event loop:
     * proposals, last accepted elections, finished flags and collected votes.
     * The individual resets are not performed atomically as a group.
     */
    @Override
    public final void clearElectionStatus() {
        // proposals made by this member
        proposedFirstStateElection.set(null);
        proposedSecondStateElection.set(null);

        // elections last accepted by this member
        lastAcceptedFirstStateElection.set(null);
        lastAcceptedSecondStateElection.set(null);

        // decision flags
        firstStateElectionFinished.set(false);
        secondStateElectionFinished.set(false);

        // collected votes
        receivedFirstStateVotes.clear();
        receivedSecondStateVotes.clear();
    }

    /**
     * Attaches a listener to the close promise, which {@code shutdown()}
     * completes.
     *
     * @param promiseListener listener to attach
     * @return whatever the close promise returns from {@code addListener}
     */
    @Override
    public final Promise<Void> addCloseListener(PromiseListener<Void> promiseListener) {
        return closePromise.addListener(promiseListener);
    }

    /**
     * Shuts the event loop down: completes the close promise, closes the server
     * socket (if one was bound), closes every registered channel and finally
     * asks all three event loop groups to shut down gracefully.
     *
     * <p>Errors while closing sockets are logged and do not abort the shutdown.
     */
    @Override
    public final void shutdown() {
        closePromise.trySuccess(null);

        if (channelFuture != null) {
            try {
                channelFuture.channel().close().sync();
            } catch (Throwable serverCloseError) {
                LOGGER.error("server socket close catch unknown error, error={}", serverCloseError.getMessage(), serverCloseError);
            }
            channelFuture = null;
        }

        for (Iterator<ClusterChannel> remaining = getChannels().iterator(); remaining.hasNext(); ) {
            try {
                remaining.next().closeSync();
            } catch (Throwable channelCloseError) {
                LOGGER.error("socket close catch unknown error, error={}", channelCloseError.getMessage(), channelCloseError);
            }
        }

        serverBossGroup.shutdownGracefully();
        serverWorkerGroup.shutdownGracefully();
        clientWorkGroup.shutdownGracefully();
    }

    /**
     * Broadcasts the leader status request and then counts this member's own
     * answer for the same request id, reporting the leader as unreachable.
     */
    private void sendLeaderStatusRequest(LeaderStatusRequest leaderStatusRequest) {
        broadcast(leaderStatusRequest);
        final LeaderStatusResponse ownResponse =
                new LeaderStatusResponse(leaderStatusRequest.getRequestId(), MemberStatus.unreachable);
        statisticsForLeaderStatus(ownResponse);
    }

    /**
     * Broadcasts the election and then lets this member vote on it itself,
     * using the vote method that matches the election's state.
     */
    private void sendElection(Election election) {
        broadcast(election);
        final boolean firstState = election.getState().isFirstState();
        if (!firstState) {
            voteForSecondStateElection(election);
            return;
        }
        voteForFirstStateElection(election);
    }

    /**
     * Picks, among the votes that carry a candidate, the one with the highest
     * transaction id; ties on the transaction id are broken by the higher
     * candidate id.
     *
     * @param list votes to inspect
     * @return pair of (transaction id, candidate) of the selected vote, or
     *         {@code null} when no vote carries a candidate
     */
    private Pair<Long, Member> selectMaximumAcceptedSecondStateElection(List<Vote> list) {
        long bestTransactionId = -1;
        long bestCandidateId = -1;
        Member bestCandidate = null;

        for (Vote vote : list) {
            final long voteTransactionId = vote.getTransactionId();
            final Member candidate = vote.getCandidate();
            if (candidate == null) {
                // Votes without a candidate do not take part in the selection.
                continue;
            }
            final boolean newerTransaction = bestTransactionId < voteTransactionId;
            final boolean sameTransactionHigherId =
                    bestTransactionId == voteTransactionId && bestCandidateId < candidate.getId();
            if (newerTransaction || sameTransactionHigherId) {
                bestTransactionId = voteTransactionId;
                bestCandidateId = candidate.getId();
                bestCandidate = candidate;
            }
        }

        if (bestCandidate != null) {
            return new ImmutablePair<>(Long.valueOf(bestTransactionId), bestCandidate);
        }
        return null;
    }

    /**
     * Installs the cluster handler chain on a freshly created socket channel.
     * Handlers are appended in this order: idle detection, frame handling,
     * frame aggregation, chunked frame writing, message handling and
     * serialization.
     *
     * <p>The decompiler widened this method from {@code private} to
     * {@code public}; it is only called from this class's channel initializers.
     *
     * @param socketChannel channel whose pipeline is populated
     * @param channelMode   whether the channel was accepted or opened by this member
     * @param identifier    peer identifier handed to the message handler; the
     *                      server-side initializer passes {@code null}
     */
    public void addHandlers(SocketChannel socketChannel, ChannelMode channelMode, Identifier identifier) {
        final ChannelPipeline handlerChain = socketChannel.pipeline();

        addLast(handlerChain, new IdleStateHandler(0L, 0L, properties.getIdleTime(), TimeUnit.SECONDS));
        addLast(handlerChain, new FrameHandler());
        addLast(handlerChain, new FrameAggregatorHandler());
        addLast(handlerChain, new FrameChunkedWriteHandler());
        addLast(handlerChain, new MessageHandler(this, channelMode, identifier));
        addLast(handlerChain, new SerializeWriteHandler(properties.getProtocolSerializeType()));
    }

    /**
     * Appends a handler to the pipeline, registered under the simple name of
     * the handler's class.
     */
    private void addLast(ChannelPipeline channelPipeline, ChannelHandler channelHandler) {
        Assert.assertNotNull(channelHandler, "handler");
        final String handlerName = channelHandler.getClass().getSimpleName();
        channelPipeline.addLast(handlerName, channelHandler);
    }
}
