package io.zeebe.broker.clustering.management.memberList;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.handler.Topology;
import io.zeebe.broker.clustering.management.ClusterManagerContext;
import io.zeebe.broker.clustering.management.OnOpenLogStreamListener;
import io.zeebe.broker.transport.cfg.TransportComponentCfg;
import io.zeebe.gossip.Gossip;
import io.zeebe.gossip.GossipCustomEventListener;
import io.zeebe.gossip.GossipMembershipListener;
import io.zeebe.gossip.membership.Member;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.raft.RaftStateListener;
import io.zeebe.raft.state.RaftState;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.buffer.BufferUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/management/memberList/ClusterMemberListManager.class */
public class ClusterMemberListManager implements RaftStateListener, OnOpenLogStreamListener {
    /** Logger used by this manager and its inner listeners. */
    public static final Logger LOG = Loggers.CLUSTERING_LOGGER;

    /** Gossip custom event type under which this node publishes and receives API addresses. */
    public static final DirectBuffer API_EVENT_TYPE = BufferUtil.wrapString("apis");

    /** Gossip custom event type under which this node publishes and receives member raft states. */
    public static final DirectBuffer MEMBER_RAFT_STATES_EVENT_TYPE = BufferUtil.wrapString("memberRaftStates");

    /** Provides access to the member list service, the gossip instance and the client transports. */
    private final ClusterManagerContext context;

    /** Transport configuration of the local node; assigned exactly once in the constructor. */
    private final TransportComponentCfg transportComponentCfg;

    /** Notified with the sender's address after an API event has been processed. */
    private final Consumer<SocketAddress> updatedMemberConsumer;

    /** Builds the topology on behalf of {@code createTopology()}. */
    private final TopologyCreator topologyCreator;

    /** Scratch buffer reused when writing the local API addresses for publication. */
    private final ExpandableArrayBuffer apiAddressBuffer;

    /** Scratch buffer reused when writing the local raft states for publication. */
    private final ExpandableArrayBuffer memberRaftStatesBuffer;

    /** Members removed from the member list, kept so they can be re-added later. */
    private final List<MemberRaftComposite> deadMembers = new ArrayList<>();

    /** Queue of deferred commands; drained by {@code doWork()}. */
    private final DeferredCommandContext commandQueue = new DeferredCommandContext();

    /* loaded from: input_file:io/zeebe/broker/clustering/management/memberList/ClusterMemberListManager$APIEventListener.class */
    /**
     * Handles gossip custom events of type {@code API_EVENT_TYPE}: decodes the three API
     * addresses from the payload, stores them in the member list and registers the
     * management and replication addresses with the matching client transports.
     */
    private final class APIEventListener implements GossipCustomEventListener {
        private APIEventListener() {
        }

        /**
         * Copies the event data and queues its processing on the command queue.
         *
         * @param socketAddress address of the member the event originates from
         * @param directBuffer event payload holding three consecutive socket addresses
         */
        @Override // io.zeebe.gossip.GossipCustomEventListener
        public void onEvent(SocketAddress socketAddress, DirectBuffer directBuffer) {
            // Both arguments are copied before the command is queued, so the deferred
            // command works on its own snapshot of the event.
            final DirectBuffer payload = BufferUtil.cloneBuffer(directBuffer);
            final SocketAddress sender = new SocketAddress(socketAddress);

            commandQueue.runAsync(() -> {
                ClusterMemberListManager.LOG.debug("Received API event from member {}.", sender);

                final SocketAddress managementApi = new SocketAddress();
                final SocketAddress clientApi = new SocketAddress();
                final SocketAddress replicationApi = new SocketAddress();

                // Each read returns the offset at which the next address starts.
                int offset = GossipEventCreationHelper.readFromBufferIntoSocketAddress(0, payload, managementApi);
                offset = GossipEventCreationHelper.readFromBufferIntoSocketAddress(offset, payload, clientApi);
                GossipEventCreationHelper.readFromBufferIntoSocketAddress(offset, payload, replicationApi);

                final boolean applied = context.getMemberListService().setApis(clientApi, replicationApi, managementApi);
                ClusterMemberListManager.LOG.debug("Setting API's for member {} was {}successful.", sender, applied ? "" : "not ");

                updatedMemberConsumer.accept(sender);
                context.getManagementClient().registerRemoteAddress(managementApi);
                context.getReplicationClient().registerRemoteAddress(replicationApi);
            });
        }
    }

    /* loaded from: input_file:io/zeebe/broker/clustering/management/memberList/ClusterMemberListManager$MemberRaftStatesEventListener.class */
    /**
     * Handles gossip custom events of type {@code MEMBER_RAFT_STATES_EVENT_TYPE} by applying
     * the received raft states to the matching entry of the member list.
     */
    private final class MemberRaftStatesEventListener implements GossipCustomEventListener {
        private MemberRaftStatesEventListener() {
        }

        /**
         * Copies the event data and queues the member update on the command queue.
         *
         * @param socketAddress address of the member whose raft states changed
         * @param directBuffer event payload describing the member's raft states
         */
        @Override // io.zeebe.gossip.GossipCustomEventListener
        public void onEvent(SocketAddress socketAddress, DirectBuffer directBuffer) {
            final DirectBuffer payload = BufferUtil.cloneBuffer(directBuffer);
            final SocketAddress sender = new SocketAddress(socketAddress);

            commandQueue.runAsync(() -> {
                ClusterMemberListManager.LOG.debug("Received raft state change event for member {}", sender);

                final MemberRaftComposite known = context.getMemberListService().getMember(sender);
                if (known == null) {
                    // Unknown sender: only log, together with the dead members for diagnosis.
                    ClusterMemberListManager.LOG.debug("Member {} does not exist. Maybe dead? List of dead members: {}", sender, deadMembers);
                    return;
                }

                GossipEventCreationHelper.updateMemberWithNewRaftState(known, payload);
                ClusterMemberListManager.LOG.debug("Handled raft state change event for member {} - local member state: {}", sender, context.getMemberListService());
            });
        }
    }

    /* loaded from: input_file:io/zeebe/broker/clustering/management/memberList/ClusterMemberListManager$MembershipListener.class */
    /**
     * Keeps the member list in sync with gossip membership changes. Removed members are
     * remembered in {@code deadMembers} so that a later re-add reuses the remembered entry.
     */
    private class MembershipListener implements GossipMembershipListener {
        private MembershipListener() {
        }

        /**
         * Queues the addition of {@code member} to the member list. If an equal entry exists
         * among the dead members, that entry is taken out of the dead list and added instead
         * of the freshly created composite.
         *
         * @param member the member reported as added by gossip
         */
        @Override // io.zeebe.gossip.GossipMembershipListener
        public void onAdd(Member member) {
            final MemberRaftComposite newMember = new MemberRaftComposite(member);
            commandQueue.runAsync(() -> {
                ClusterMemberListManager.LOG.debug("Add member {} to member list.", newMember);

                MemberRaftComposite memberToAdd = newMember;
                final int deadIndex = deadMembers.indexOf(newMember);
                if (deadIndex > -1) {
                    memberToAdd = deadMembers.remove(deadIndex);
                    ClusterMemberListManager.LOG.debug("Re-add dead member {} to member list", memberToAdd);
                }

                context.getMemberListService().add(memberToAdd);
            });
        }

        /**
         * Queues the removal of {@code member} from the member list. A removed entry is
         * remembered as dead and its management and replication remotes are deactivated.
         *
         * @param member the member reported as removed by gossip
         */
        @Override // io.zeebe.gossip.GossipMembershipListener
        public void onRemove(Member member) {
            final SocketAddress memberAddress = member.getAddress();
            commandQueue.runAsync(() -> {
                final MemberRaftComposite removed = context.getMemberListService().remove(memberAddress);
                if (removed == null) {
                    // NOTE(review): assumes remove() yields null for an unknown address, as
                    // getMember() does. Without this guard a null entry would be stored in
                    // the dead list and the accessor calls below would throw.
                    ClusterMemberListManager.LOG.debug("Member {} is not part of the member list, nothing to remove.", memberAddress);
                    return;
                }

                ClusterMemberListManager.LOG.debug("Remove member {} from member list.", removed);
                deadMembers.add(removed);

                deactivateRemote(context.getManagementClient(), removed.getManagementApi());
                deactivateRemote(context.getReplicationClient(), removed.getReplicationApi());
            });
        }
    }

    /**
     * Creates the manager, registers the local node in the member list and hooks the
     * membership listener, custom event listeners and sync request handlers into gossip.
     *
     * @param clusterManagerContext context giving access to the member list service and gossip
     * @param transportComponentCfg transport configuration providing the local host and API bindings
     * @param consumer callback invoked with a member's address after its API event was processed
     */
    public ClusterMemberListManager(ClusterManagerContext clusterManagerContext, TransportComponentCfg transportComponentCfg, Consumer<SocketAddress> consumer) {
        this.context = clusterManagerContext;
        this.transportComponentCfg = transportComponentCfg;
        this.updatedMemberConsumer = consumer;

        // The local node is identified in the member list by its management API address.
        final MemberListService members = clusterManagerContext.getMemberListService();
        final String host = transportComponentCfg.host;
        members.add(new Member(transportComponentCfg.managementApi.toSocketAddress(host)));
        members.setApis(
                transportComponentCfg.clientApi.toSocketAddress(host),
                transportComponentCfg.replicationApi.toSocketAddress(host),
                transportComponentCfg.managementApi.toSocketAddress(host));

        final Gossip gossip = clusterManagerContext.getGossip();
        gossip.addMembershipListener(new MembershipListener());
        gossip.addCustomEventListener(API_EVENT_TYPE, new APIEventListener());
        gossip.addCustomEventListener(MEMBER_RAFT_STATES_EVENT_TYPE, new MemberRaftStatesEventListener());
        gossip.registerSyncRequestHandler(API_EVENT_TYPE, new APISyncHandler(this.commandQueue, clusterManagerContext));
        gossip.registerSyncRequestHandler(MEMBER_RAFT_STATES_EVENT_TYPE, new MemberRaftStatesSyncHandler(this.commandQueue, clusterManagerContext));

        this.topologyCreator = new TopologyCreator(clusterManagerContext);
        this.apiAddressBuffer = new ExpandableArrayBuffer();
        this.memberRaftStatesBuffer = new ExpandableArrayBuffer();
    }

    /**
     * Runs the deferred command queue.
     *
     * @return the value returned by the command queue's own {@code doWork()}
     */
    public int doWork() {
        final int workCount = commandQueue.doWork();
        return workCount;
    }

    /**
     * Publishes the local node's management, replication and client API addresses as a
     * gossip custom event of type {@code API_EVENT_TYPE}.
     */
    public void publishNodeAPIAddresses() {
        final Gossip gossip = context.getGossip();
        final String host = transportComponentCfg.host;

        final SocketAddress managementApi = transportComponentCfg.managementApi.toSocketAddress(host);
        final SocketAddress replicationApi = transportComponentCfg.replicationApi.toSocketAddress(host);
        final SocketAddress clientApi = transportComponentCfg.clientApi.toSocketAddress(host);

        // The addresses are written into the reusable scratch buffer before publishing.
        gossip.publishEvent(
                API_EVENT_TYPE,
                GossipEventCreationHelper.writeAPIAddressesIntoBuffer(managementApi, replicationApi, clientApi, apiAddressBuffer));
    }

    /**
     * Queues the creation of the topology on the command queue.
     *
     * @return the future returned by the command queue for the topology creation command
     */
    public CompletableFuture<Topology> createTopology() {
        // Evaluating the bound method reference already null-checks its target, so no
        // separate check on topologyCreator is needed.
        return this.commandQueue.runAsync(this.topologyCreator::createTopology);
    }

    /**
     * Deactivates the remote registered for {@code socketAddress} on the given transport.
     * Does nothing when the transport has no remote for that address.
     *
     * @param clientTransport transport on which the remote is looked up and deactivated
     * @param socketAddress address whose remote should be deactivated
     */
    protected void deactivateRemote(ClientTransport clientTransport, SocketAddress socketAddress) {
        final RemoteAddress registered = clientTransport.getRemoteAddress(socketAddress);
        if (registered == null) {
            return;
        }
        clientTransport.deactivateRemoteAddress(registered);
    }

    /**
     * Queues a topology update that records the local node with {@link RaftState#LEADER}
     * for the partition of the opened log stream.
     *
     * @param logStream the log stream whose partition id and topic name are used
     */
    @Override // io.zeebe.broker.clustering.management.OnOpenLogStreamListener
    public void onOpenLogStreamService(LogStream logStream) {
        final int partition = logStream.getPartitionId();
        // The topic name is cloned before queueing so the command has its own copy.
        final DirectBuffer topicName = BufferUtil.cloneBuffer(logStream.getTopicName());

        commandQueue.runAsync(() -> updateTopologyOnRaftStateChangeForPartition(RaftState.LEADER, partition, topicName));
    }

    /**
     * Queues a topology update for the given partition. The queued command only acts when
     * the new state is {@link RaftState#FOLLOWER}; any other state is ignored.
     *
     * @param i id of the partition whose raft state changed
     * @param directBuffer topic name of the partition; cloned before the command is queued
     * @param socketAddress not used by this implementation
     * @param raftState the new raft state
     */
    @Override // io.zeebe.raft.RaftStateListener
    public void onStateChange(int i, DirectBuffer directBuffer, SocketAddress socketAddress, RaftState raftState) {
        final DirectBuffer topicName = BufferUtil.cloneBuffer(directBuffer);

        commandQueue.runAsync(() -> {
            if (raftState != RaftState.FOLLOWER) {
                return;
            }
            updateTopologyOnRaftStateChangeForPartition(raftState, i, topicName);
        });
    }

    /**
     * Records the new raft state of a partition on the local member's entry and publishes
     * all raft states of the local member as a gossip custom event of type
     * {@code MEMBER_RAFT_STATES_EVENT_TYPE}.
     *
     * @param raftState the raft state to record for the partition
     * @param i id of the partition
     * @param directBuffer topic name of the partition
     */
    private void updateTopologyOnRaftStateChangeForPartition(RaftState raftState, int i, DirectBuffer directBuffer) {
        // The local member is looked up by the local management API address.
        final MemberListService memberListService = context.getMemberListService();
        final SocketAddress localAddress = transportComponentCfg.managementApi.toSocketAddress(transportComponentCfg.host);
        final MemberRaftComposite localMember = memberListService.getMember(localAddress);

        localMember.updateRaft(i, directBuffer, raftState);
        LOG.trace("On raft state change for {} - local member states: {}", localMember.getMember().getAddress(), context.getMemberListService());

        final DirectBuffer payload = GossipEventCreationHelper.writeRaftsIntoBuffer(localMember.getRafts(), memberRaftStatesBuffer);
        LOG.trace("Publish event for partition {} state change {}", i, raftState);
        context.getGossip().publishEvent(MEMBER_RAFT_STATES_EVENT_TYPE, payload);
    }
}
