package io.zeebe.broker.clustering.management;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.ClusterServiceNames;
import io.zeebe.broker.clustering.handler.Topology;
import io.zeebe.broker.clustering.management.handler.ClusterManagerFragmentHandler;
import io.zeebe.broker.clustering.management.memberList.ClusterMemberListManager;
import io.zeebe.broker.clustering.management.memberList.MemberRaftComposite;
import io.zeebe.broker.clustering.management.message.CreatePartitionRequest;
import io.zeebe.broker.clustering.management.message.InvitationRequest;
import io.zeebe.broker.clustering.management.message.InvitationResponse;
import io.zeebe.broker.clustering.raft.RaftPersistentFileStorage;
import io.zeebe.broker.clustering.raft.RaftService;
import io.zeebe.broker.logstreams.LogStreamsManager;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.transport.TransportServiceNames;
import io.zeebe.broker.transport.cfg.SocketBindingCfg;
import io.zeebe.broker.transport.cfg.TransportComponentCfg;
import io.zeebe.logstreams.impl.log.fs.FsLogStorage;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.protocol.Protocol;
import io.zeebe.raft.Raft;
import io.zeebe.raft.RaftPersistentStorage;
import io.zeebe.raft.state.RaftState;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.RequestResponseController;
import io.zeebe.transport.ServerInputSubscription;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerResponse;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.actor.Actor;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/management/ClusterManager.class */
public class ClusterManager implements Actor {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;
    private static final DirectBuffer EMPTY_BUF = new UnsafeBuffer(0, 0);
    private final ClusterManagerContext context;
    private final ServiceContainer serviceContainer;
    private TransportComponentCfg transportComponentCfg;
    private final ServerInputSubscription inputSubscription;
    private final LogStreamsManager logStreamsManager;
    private final ClusterMemberListManager clusterMemberListManager;
    private final CreatePartitionRequest createPartitionRequest = new CreatePartitionRequest();
    private final ServerResponse response = new ServerResponse();
    private final List<Raft> rafts = new CopyOnWriteArrayList();
    private final List<StartLogStreamServiceController> startLogStreamServiceControllers = new CopyOnWriteArrayList();
    private final DeferredCommandContext commandQueue = new DeferredCommandContext();
    private final List<RequestResponseController> activeRequestControllers = new CopyOnWriteArrayList();
    private final InvitationRequest invitationRequest = new InvitationRequest();
    private final InvitationResponse invitationResponse = new InvitationResponse();

    public ClusterManager(ClusterManagerContext clusterManagerContext, ServiceContainer serviceContainer, TransportComponentCfg transportComponentCfg) {
        this.context = clusterManagerContext;
        this.serviceContainer = serviceContainer;
        this.transportComponentCfg = transportComponentCfg;
        this.logStreamsManager = clusterManagerContext.getLogStreamsManager();
        ClusterManagerFragmentHandler clusterManagerFragmentHandler = new ClusterManagerFragmentHandler(this, clusterManagerContext.getWorkflowRequestMessageHandler());
        this.inputSubscription = clusterManagerContext.getServerTransport().openSubscription("cluster-management", clusterManagerFragmentHandler, clusterManagerFragmentHandler).join();
        this.clusterMemberListManager = new ClusterMemberListManager(clusterManagerContext, transportComponentCfg, this::inviteUpdatedMember);
        List<SocketAddress> list = (List) Arrays.stream(transportComponentCfg.gossip.initialContactPoints).map(SocketAddress::from).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        clusterManagerContext.getGossip().join(list);
    }

    public void open() {
        this.clusterMemberListManager.publishNodeAPIAddresses();
        LogStreamsManager logStreamsManager = this.context.getLogStreamsManager();
        File file = new File(this.transportComponentCfg.management.directory);
        if (!file.exists()) {
            try {
                file.getParentFile().mkdirs();
                Files.createDirectory(file.toPath(), new FileAttribute[0]);
            } catch (IOException e) {
                LOG.error("Unable to create directory {}", file, e);
            }
        }
        SocketBindingCfg socketBindingCfg = this.transportComponentCfg.replicationApi;
        SocketAddress socketAddress = new SocketAddress(socketBindingCfg.getHost(this.transportComponentCfg.host), socketBindingCfg.port);
        File[] listFiles = file.listFiles();
        if (listFiles == null || listFiles.length <= 0) {
            if (this.transportComponentCfg.gossip.initialContactPoints.length == 0) {
                LOG.debug("Broker bootstraps the system topic");
                createPartition(Protocol.SYSTEM_TOPIC_BUF, 0);
                return;
            }
            return;
        }
        for (File file2 : listFiles) {
            RaftPersistentFileStorage raftPersistentFileStorage = new RaftPersistentFileStorage(file2.getAbsolutePath());
            DirectBuffer topicName = raftPersistentFileStorage.getTopicName();
            int partitionId = raftPersistentFileStorage.getPartitionId();
            LogStream logStream = logStreamsManager.getLogStream(partitionId);
            if (logStream == null) {
                logStream = logStreamsManager.createLogStream(topicName, partitionId, raftPersistentFileStorage.getLogDirectory());
            }
            raftPersistentFileStorage.setLogStream(logStream);
            createRaft(socketAddress, logStream, raftPersistentFileStorage.getMembers(), raftPersistentFileStorage);
        }
    }

    @Override // io.zeebe.util.actor.Actor
    public String name() {
        return "management";
    }

    @Override // io.zeebe.util.actor.Actor
    public int doWork() {
        int doWork = 0 + this.commandQueue.doWork() + this.clusterMemberListManager.doWork() + this.inputSubscription.poll();
        int i = 0;
        while (i < this.activeRequestControllers.size()) {
            RequestResponseController requestResponseController = this.activeRequestControllers.get(i);
            doWork += requestResponseController.doWork();
            if (requestResponseController.isFailed()) {
                LOG.debug("Invitation request failed");
            }
            if (requestResponseController.isFailed() || requestResponseController.isResponseAvailable()) {
                requestResponseController.close();
            }
            if (requestResponseController.isClosed()) {
                this.activeRequestControllers.remove(i);
            } else {
                i++;
            }
        }
        for (int i2 = 0; i2 < this.startLogStreamServiceControllers.size(); i2++) {
            doWork += this.startLogStreamServiceControllers.get(i2).doWork();
        }
        return doWork;
    }

    private void inviteUpdatedMember(SocketAddress socketAddress) {
        LOG.debug("Send raft invitations to member {}.", socketAddress);
        for (Raft raft : this.rafts) {
            if (raft.getState() == RaftState.LEADER) {
                inviteMemberToRaft(socketAddress, raft);
            }
        }
    }

    protected void inviteMemberToRaft(SocketAddress socketAddress, Raft raft) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(raft.getSocketAddress());
        raft.getMembers().forEach(raftMember -> {
            arrayList.add(raftMember.getRemoteAddress().getAddress());
        });
        LogStream logStream = raft.getLogStream();
        InvitationRequest members = new InvitationRequest().topicName(logStream.getTopicName()).partitionId(logStream.getPartitionId()).term(raft.getTerm()).members(arrayList);
        LOG.debug("Send invitation request to {} for partition {} in term {}", socketAddress, Integer.valueOf(logStream.getPartitionId()), Integer.valueOf(raft.getTerm()));
        RequestResponseController requestResponseController = new RequestResponseController(this.context.getManagementClient());
        requestResponseController.open(socketAddress, members, (directBuffer, i, i2) -> {
            LOG.debug("Got invitation response from {} for partition id {}.", socketAddress, Integer.valueOf(logStream.getPartitionId()));
        });
        this.activeRequestControllers.add(requestResponseController);
    }

    public void createRaft(SocketAddress socketAddress, LogStream logStream, List<SocketAddress> list) {
        String path = ((FsLogStorage) logStream.getLogStorage()).getConfig().getPath();
        RaftPersistentFileStorage raftPersistentFileStorage = new RaftPersistentFileStorage(String.format("%s%s.meta", this.transportComponentCfg.management.directory, logStream.getLogName()));
        raftPersistentFileStorage.setLogStream(logStream).setLogDirectory(path).save();
        createRaft(socketAddress, logStream, list, raftPersistentFileStorage);
    }

    public void createRaft(SocketAddress socketAddress, LogStream logStream, List<SocketAddress> list, RaftPersistentStorage raftPersistentStorage) {
        RaftService raftService = new RaftService(socketAddress, logStream, list, raftPersistentStorage, this.clusterMemberListManager);
        this.serviceContainer.createService(ClusterServiceNames.raftServiceName(logStream.getLogName()), raftService).group(ClusterServiceNames.RAFT_SERVICE_GROUP).dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, raftService.getActorSchedulerInjector()).dependency(TransportServiceNames.bufferingServerTransport(TransportServiceNames.REPLICATION_API_SERVER_NAME), raftService.getServerTransportInjector()).dependency(TransportServiceNames.clientTransport(TransportServiceNames.REPLICATION_API_CLIENT_NAME), raftService.getClientTransportInjector()).install();
    }

    protected boolean partitionExists(int i) {
        return this.logStreamsManager.hasLogStream(i);
    }

    protected void createPartition(DirectBuffer directBuffer, int i) {
        createPartition(directBuffer, i, Collections.emptyList());
    }

    protected void createPartition(DirectBuffer directBuffer, int i, List<SocketAddress> list) {
        LogStream createLogStream = this.logStreamsManager.createLogStream(directBuffer, i);
        SocketBindingCfg socketBindingCfg = this.transportComponentCfg.replicationApi;
        createRaft(new SocketAddress(socketBindingCfg.getHost(this.transportComponentCfg.host), socketBindingCfg.port), createLogStream, list);
    }

    public boolean onInvitationRequest(DirectBuffer directBuffer, int i, int i2, ServerOutput serverOutput, RemoteAddress remoteAddress, long j) {
        this.invitationRequest.reset();
        this.invitationRequest.wrap(directBuffer, i, i2);
        LOG.debug("Received invitation request from {} for partition {}", remoteAddress.getAddress(), Integer.valueOf(this.invitationRequest.partitionId()));
        createPartition(this.invitationRequest.topicName(), this.invitationRequest.partitionId(), new ArrayList(this.invitationRequest.members()));
        this.invitationResponse.reset();
        this.response.reset().remoteAddress(remoteAddress).requestId(j).writer(this.invitationResponse);
        return serverOutput.sendResponse(this.response);
    }

    public boolean onCreatePartitionRequest(DirectBuffer directBuffer, int i, int i2, ServerOutput serverOutput, RemoteAddress remoteAddress, long j) {
        this.createPartitionRequest.wrap(directBuffer, i, i2);
        LOG.debug("Received create partition request for partition {}", Integer.valueOf(this.createPartitionRequest.getPartitionId()));
        int partitionId = this.createPartitionRequest.getPartitionId();
        if (partitionExists(partitionId)) {
            LOG.debug("Partition {} exists already. Ignoring creation request.", Integer.valueOf(this.createPartitionRequest.getPartitionId()));
        } else {
            LOG.debug("Creating partition {}", Integer.valueOf(this.createPartitionRequest.getPartitionId()));
            createPartition(this.createPartitionRequest.getTopicName(), partitionId);
        }
        this.response.reset().remoteAddress(remoteAddress).requestId(j).buffer(EMPTY_BUF);
        return serverOutput.sendResponse(this.response);
    }

    public CompletableFuture<Topology> requestTopology() {
        return this.clusterMemberListManager.createTopology();
    }

    public void addRaftCallback(ServiceName<Raft> serviceName, Raft raft) {
        boolean z = raft.getMemberSize() == 0;
        this.commandQueue.runAsync(() -> {
            LOG.trace("ADD raft {} for partition {} state {}.", raft.getSocketAddress(), Integer.valueOf(raft.getLogStream().getPartitionId()), raft.getState());
            this.rafts.add(raft);
            this.startLogStreamServiceControllers.add(new StartLogStreamServiceController(serviceName, raft, this.serviceContainer, this.clusterMemberListManager));
            if (z) {
                Iterator<MemberRaftComposite> it = this.context.getMemberListService().iterator();
                while (it.hasNext()) {
                    MemberRaftComposite next = it.next();
                    if (!next.getMember().getAddress().equals(this.transportComponentCfg.managementApi.toSocketAddress(this.transportComponentCfg.host))) {
                        inviteMemberToRaft(next.getMember().getAddress(), raft);
                    }
                }
            }
        });
    }

    public void removeRaftCallback(Raft raft) {
        int partitionId = raft.getLogStream().getPartitionId();
        this.commandQueue.runAsync(() -> {
            int i = 0;
            while (true) {
                if (i >= this.rafts.size()) {
                    break;
                }
                if (partitionId == this.rafts.get(i).getLogStream().getPartitionId()) {
                    this.rafts.remove(i);
                    break;
                }
                i++;
            }
            for (int i2 = 0; i2 < this.startLogStreamServiceControllers.size(); i2++) {
                if (partitionId == this.startLogStreamServiceControllers.get(i2).getRaft().getLogStream().getPartitionId()) {
                    this.startLogStreamServiceControllers.remove(i2);
                    return;
                }
            }
        });
    }
}
