package org.apache.submarine.commons.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.cluster.protocol.HeartbeatMembershipProtocol;
import io.atomix.cluster.protocol.HeartbeatMembershipProtocolConfig;
import io.atomix.primitive.PrimitiveState;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import io.atomix.utils.Version;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Serializer;
import java.io.File;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.submarine.commons.cluster.meta.ClusterMeta;
import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
import org.apache.submarine.commons.cluster.protocol.RaftServerMessagingProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/submarine/commons/cluster/ClusterServer.class */
/**
 * Server-side member of the cluster: runs an embedded Atomix {@link RaftServer} and a
 * {@link MessagingService} used to send events to the other cluster nodes.
 *
 * <p>The class is a process-wide singleton, obtained through {@link #getInstance()}. The Raft
 * server and the messaging service are created asynchronously on a background thread started
 * from {@link #start()}, so both fields may still be {@code null} shortly after start-up.
 */
public class ClusterServer extends ClusterManager {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterServer.class);

    /** Timeout applied to every request sent through the messaging service. */
    private static final Duration MESSAGE_TIMEOUT = Duration.ofSeconds(2L);

    /** Pause, in milliseconds, inserted between the individual shutdown steps. */
    private static final long SHUTDOWN_PAUSE_MS = 500L;

    /** Maximum time, in seconds, to wait for the Raft server to shut down. */
    private static final long RAFT_SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /** Maximum size of a Raft log segment (1 MiB). */
    private static final int RAFT_MAX_SEGMENT_SIZE = 1024 * 1024;

    private static ClusterServer instance = null;

    // Both fields are assigned on the bootstrap thread (see initThread()) and read from
    // caller threads, so they are volatile to make the assignment visible to readers.
    protected volatile RaftServer raftServer = null;
    protected volatile MessagingService messagingService = null;

    private ClusterServer() {
    }

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * @return the shared {@code ClusterServer}
     */
    public static ClusterServer getInstance() {
        synchronized (ClusterServer.class) {
            if (instance == null) {
                instance = new ClusterServer();
            }
            return instance;
        }
    }

    /**
     * Starts the Raft bootstrap thread and the cluster monitor, then delegates to
     * {@link ClusterManager#start()}. Does nothing when cluster mode is disabled.
     */
    @Override
    public void start() {
        if (!this.sconf.isClusterMode()) {
            return;
        }
        initThread();
        String nodeName = getClusterNodeName();
        this.clusterMonitor = new ClusterMonitor(this);
        this.clusterMonitor.start(ClusterMetaType.SERVER_META, nodeName);
        super.start();
    }

    /**
     * Replaces the configured cluster topology with the given one; intended for tests only.
     *
     * @param str  comma-separated list of {@code host:port} entries describing every cluster node
     * @param str2 host name of this server
     * @param i    Raft port of this server
     * @throws NumberFormatException          if a port in {@code str} is not an integer
     * @throws ArrayIndexOutOfBoundsException if an entry in {@code str} has no {@code :port} part
     */
    @VisibleForTesting
    void initTestCluster(String str, String str2, int i) {
        this.isTest = true;
        this.serverHost = str2;
        this.raftServerPort = i;
        this.clusterNodes.clear();
        this.raftAddressMap.clear();
        this.clusterMemberIds.clear();
        for (String nodeSpec : str.split(",")) {
            String[] hostAndPort = nodeSpec.split(":");
            String nodeHost = hostAndPort[0];
            int nodePort = Integer.parseInt(hostAndPort[1]);
            // Rebuilt from the parsed values so the id is normalised (e.g. "host:07" -> "host:7").
            String nodeId = nodeHost + ":" + nodePort;
            Address nodeAddress = Address.from(nodeHost, nodePort);
            this.clusterNodes.add(Node.builder().withId(nodeId).withAddress(nodeAddress).build());
            this.raftAddressMap.put(MemberId.from(nodeId), nodeAddress);
            this.clusterMemberIds.add(MemberId.from(nodeId));
        }
    }

    /**
     * Reports whether the Raft server is running and the Raft session client is connected.
     *
     * @return {@code true} only when the server runs and the session state is
     *         {@link PrimitiveState#CONNECTED}
     */
    @Override
    public boolean raftInitialized() {
        RaftServer server = this.raftServer;
        return server != null
            && server.isRunning()
            && this.raftClient != null
            && this.raftSessionClient != null
            && this.raftSessionClient.getState() == PrimitiveState.CONNECTED;
    }

    /**
     * Reports whether this node currently is the Raft leader.
     *
     * @return {@code true} when the Raft server is running and is the leader
     */
    @Override
    public boolean isClusterLeader() {
        RaftServer server = this.raftServer;
        return server != null && server.isRunning() && server.isLeader();
    }

    /** Starts the background thread that brings up messaging and the Raft server. */
    private void initThread() {
        new Thread(this::bootstrapRaftServer).start();
    }

    /**
     * Starts the messaging service, builds and bootstraps the Raft server, and finally
     * publishes this node's metadata under {@link ClusterMetaType#SERVER_META}.
     */
    private void bootstrapRaftServer() {
        LOG.info("RaftServer run({}:{}) >>>", this.serverHost, this.raftServerPort);

        Member localMember = Member.builder(MemberId.from(this.serverHost + ":" + this.raftServerPort))
            .withAddress(Address.from(this.serverHost, this.raftServerPort))
            .build();

        // join() blocks this background thread until the messaging service is started.
        this.messagingService = (MessagingService) new NettyMessagingService(
            this.MESSAGING_SERVICE_NAME, localMember.address(), new MessagingConfig()).start().join();

        Map<MemberId, Address> addressMap = this.raftAddressMap;
        RaftServerMessagingProtocol protocol = new RaftServerMessagingProtocol(
            this.messagingService, ClusterManager.protocolSerializer, addressMap::get);

        BootstrapService bootstrapService = new BootstrapService() {
            @Override
            public MessagingService getMessagingService() {
                return ClusterServer.this.messagingService;
            }

            @Override
            public UnicastService getUnicastService() {
                return new UnicastServiceAdapter();
            }

            @Override
            public BroadcastService getBroadcastService() {
                return new BroadcastServiceAdapter();
            }
        };

        DefaultClusterMembershipService membershipService = new DefaultClusterMembershipService(
            localMember,
            Version.from("1.0.0"),
            new DefaultNodeDiscoveryService(
                bootstrapService, localMember, new BootstrapDiscoveryProvider(this.clusterNodes)),
            bootstrapService,
            new HeartbeatMembershipProtocol(new HeartbeatMembershipProtocolConfig()));

        File storageDir = Files.createTempDir();
        storageDir.deleteOnExit();
        RaftStorage storage = RaftStorage.builder()
            .withStorageLevel(StorageLevel.MEMORY)
            .withDirectory(storageDir)
            .withNamespace(ClusterManager.storageNamespace)
            .withMaxSegmentSize(RAFT_MAX_SEGMENT_SIZE)
            .build();

        this.raftServer = (RaftServer) RaftServer.builder(localMember.id())
            .withMembershipService(membershipService)
            .withProtocol(protocol)
            .withStorage(storage)
            .build();
        this.raftServer.bootstrap(this.clusterMemberIds);

        HashMap<String, Object> serverMeta = new HashMap<>();
        String nodeName = getClusterNodeName();
        serverMeta.put(ClusterMeta.NODE_NAME, nodeName);
        serverMeta.put(ClusterMeta.SERVER_HOST, this.serverHost);
        serverMeta.put(ClusterMeta.SERVER_PORT, this.raftServerPort);
        serverMeta.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now());
        putClusterMeta(ClusterMetaType.SERVER_META, nodeName, serverMeta);

        LOG.info("RaftServer run() <<<");
    }

    /**
     * Removes this node's metadata, stops the cluster monitor, delegates to
     * {@link ClusterManager#shutdown()} and finally shuts the Raft server down, waiting a
     * bounded time for it to stop. Does nothing when cluster mode is disabled.
     *
     * <p>If the calling thread is interrupted during the sequence, the remaining steps still
     * run and the interrupt flag is restored before returning.
     */
    @Override
    public void shutdown() {
        if (!this.sconf.isClusterMode()) {
            return;
        }
        LOG.info("ClusterServer::shutdown()");

        // The interrupt is recorded and re-asserted only at the end: restoring it immediately
        // would make the later blocking shutdown steps fail straight away.
        boolean interrupted = false;
        try {
            try {
                deleteClusterMeta(ClusterMetaType.SERVER_META, getClusterNodeName());
                Thread.sleep(SHUTDOWN_PAUSE_MS);
                if (null != this.clusterMonitor) {
                    this.clusterMonitor.shutdown();
                }
                Thread.sleep(SHUTDOWN_PAUSE_MS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.error(e.getMessage(), e);
            }

            super.shutdown();

            RaftServer server = this.raftServer;
            if (null != server && server.isRunning()) {
                try {
                    LOG.info("ClusterServer::raftServer.shutdown()");
                    server.shutdown().get(RAFT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.error(e.getMessage(), e);
                } catch (ExecutionException | TimeoutException e) {
                    LOG.error(e.getMessage(), e);
                }
            }
            LOG.info("ClusterServer::super.shutdown()");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Picks the server node with the most free memory (capacity minus used).
     *
     * <p>Nodes whose status is missing, empty or {@link ClusterMeta#OFFLINE_STATUS} are
     * ignored, as are nodes that do not report numeric memory figures or that have no free
     * memory left.
     *
     * @return the metadata of the chosen node, or {@code null} when no node qualifies
     */
    public HashMap<String, Object> getIdleNodeMeta() {
        Map<String, HashMap<String, Object>> serverMetas = getClusterMeta(ClusterMetaType.SERVER_META, "");
        if (serverMetas == null) {
            return null;
        }

        HashMap<String, Object> idlestNode = null;
        long largestFreeMemory = 0L;
        for (HashMap<String, Object> nodeMeta : serverMetas.values()) {
            if (nodeMeta == null) {
                continue;
            }
            Object status = nodeMeta.get(ClusterMeta.STATUS);
            if (!(status instanceof String)
                || StringUtils.isEmpty((String) status)
                || ((String) status).equals(ClusterMeta.OFFLINE_STATUS)) {
                continue;
            }
            Object capacity = nodeMeta.get(ClusterMeta.MEMORY_CAPACITY);
            Object used = nodeMeta.get(ClusterMeta.MEMORY_USED);
            if (!(capacity instanceof Number) || !(used instanceof Number)) {
                // A node that has not reported its memory yet cannot be ranked; skip it
                // instead of failing the whole lookup.
                continue;
            }
            long freeMemory = ((Number) capacity).longValue() - ((Number) used).longValue();
            if (freeMemory > largestFreeMemory) {
                largestFreeMemory = freeMemory;
                idlestNode = nodeMeta;
            }
        }
        return idlestNode;
    }

    /**
     * Sends an event to a single node. The send is asynchronous; a failure is logged, it is
     * not reported to the caller.
     *
     * @param str  host of the target node
     * @param i    port of the target node
     * @param str2 messaging topic (subject) the event is published on
     * @param str3 event payload; encoded with the platform default charset
     */
    public void unicastClusterEvent(String str, int i, String str2, String str3) {
        LOG.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}", str, i, str2, str3);

        MessagingService service = this.messagingService;
        if (service == null) {
            LOG.warn("Messaging service is not started yet, dropping unicast event on topic {}", str2);
            return;
        }
        service.sendAndReceive(Address.from(str, i), str2, str3.getBytes(), MESSAGE_TIMEOUT)
            .whenComplete((reply, error) -> {
                // 'error' is non-null only when the request failed.
                if (error != null) {
                    LOG.error(error.getMessage(), error);
                }
            });
    }

    /**
     * Sends an event to every cluster node except this one. Each send is asynchronous and its
     * outcome is only logged.
     *
     * @param str  messaging topic (subject) the event is published on
     * @param str2 event payload; encoded with the platform default charset
     */
    public void broadcastClusterEvent(String str, String str2) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("send broadcastClusterEvent message {}", str2);
        }

        MessagingService service = this.messagingService;
        if (service == null) {
            LOG.warn("Messaging service is not started yet, dropping broadcast event on topic {}", str);
            return;
        }
        for (Node node : this.clusterNodes) {
            boolean isLocalNode = StringUtils.equals(node.address().host(), this.serverHost)
                && node.address().port() == this.raftServerPort;
            if (isLocalNode) {
                continue;
            }
            service.sendAndReceive(node.address(), str, str2.getBytes(), MESSAGE_TIMEOUT)
                .whenComplete((reply, error) -> {
                    // 'error' is non-null only when the request failed.
                    if (error != null) {
                        LOG.error(error.getMessage(), error);
                    } else {
                        LOG.info("broadcastClusterNoteEvent success! {}", str2);
                    }
                });
        }
    }
}
