package com.github.liuyehcf.framework.flow.engine.runtime.remote.io;

import com.github.liuyehcf.framework.flow.engine.runtime.config.FlowProperties;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.Member;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.MemberConfig;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.MemberIdentifier;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.MemberRole;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.MemberStatus;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.cluster.Topology;
import com.github.liuyehcf.framework.flow.engine.runtime.remote.io.message.JoiningRequest;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/liuyehcf/framework/flow/engine/runtime/remote/io/TopologyProbe.class */
public abstract class TopologyProbe extends BaseScheduler {
    /** Logger used by this class and by the nested {@code TopologyProbeJob}. */
    private static final Logger LOGGER = LoggerFactory.getLogger(TopologyProbe.class);
    /** Quartz group name used for both the job identity and the trigger identity built in {@code register}. */
    private static final String DEFAULT_GROUP = "topologyProbe";
    /** {@code JobDataMap} key under which {@code register} stores the {@code ClusterEventLoop}. */
    private static final String KEY_EVENT_LOOP = "eventLoop";
    /** {@code JobDataMap} key under which {@code register} stores the list returned by {@code initOtherSeeds}. */
    private static final String KEY_OTHER_SEEDS = "seeds";

    /* loaded from: input_file:com/github/liuyehcf/framework/flow/engine/runtime/remote/io/TopologyProbe$TopologyProbeJob.class */
    public static final class TopologyProbeJob implements Job {
        /**
         * Runs one probe tick.
         *
         * <p>Reads the cluster event loop and the seed list from the job's {@code JobDataMap}
         * and then runs the probe steps in a fixed order. An exception escaping any step
         * skips the remaining steps of this tick.
         *
         * @param jobExecutionContext Quartz context whose job detail carries the data map
         */
        public void execute(JobExecutionContext jobExecutionContext) {
            final JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
            final ClusterEventLoop eventLoop = (ClusterEventLoop) dataMap.get(TopologyProbe.KEY_EVENT_LOOP);
            final List<MemberConfig> otherSeeds = (List) dataMap.get(TopologyProbe.KEY_OTHER_SEEDS);

            // Steps that only need the event loop.
            keepAliveWithClusterMember(eventLoop);
            leaderRepairMemberWithAbnormalStatus(eventLoop);
            seedInit(eventLoop);

            // Steps that also need the configured seeds.
            nonSeedInit(eventLoop, otherSeeds);
            everyMemberKeepAliveWithSeedToAvoidIsolatedIslands(eventLoop, otherSeeds);
        }

        /**
         * Keeps this node connected to every other active member of the topology.
         *
         * <p>For each member that is not this node and whose status is active: when a channel
         * to it already exists, its local status is set to {@code MemberStatus.active};
         * otherwise a channel is created. A failure while creating a channel is logged and
         * does not stop the loop.
         *
         * @param clusterEventLoop event loop providing the topology and the channels
         */
        private void keepAliveWithClusterMember(ClusterEventLoop clusterEventLoop) {
            Topology<Member> topology = clusterEventLoop.getTopology();
            for (Member member : topology) {
                if (topology.isSelf(member) || !member.getStatus().isActive()) {
                    continue;
                }
                if (clusterEventLoop.hasChannel(member)) {
                    member.setLocalStatus(MemberStatus.active);
                    continue;
                }
                try {
                    clusterEventLoop.createChannel(member);
                } catch (Throwable th) {
                    // The throwable is passed as the trailing argument so the stack trace is logged,
                    // matching the other probe steps; the message alone is rarely enough to diagnose.
                    TopologyProbe.LOGGER.error("create cluster channel catch unknown error, errorMsg={}", th.getMessage(), th);
                }
            }
        }

        /**
         * When this node is the leader, updates and broadcasts the status of members whose
         * state looks abnormal.
         *
         * <p>For each member of the topology:
         * <ul>
         *   <li>if its local status is unreachable, a clone with status {@code leaving} and then a
         *       clone with status {@code inactive} are broadcast, each with a freshly generated id;</li>
         *   <li>afterwards, if its status is inactive and a channel to it exists, a clone with
         *       status {@code active} is broadcast.</li>
         * </ul>
         * Does nothing when this node is not the leader.
         *
         * @param clusterEventLoop event loop providing the topology, channels and broadcast
         */
        private void leaderRepairMemberWithAbnormalStatus(ClusterEventLoop clusterEventLoop) {
            Topology topology = clusterEventLoop.getTopology();
            if (!topology.isSelfLeader()) {
                return;
            }

            for (Member member : topology) {
                boolean unreachable = member.getLocalStatus().isUnreachable();
                if (unreachable) {
                    // Two consecutive transitions: leaving first, then inactive.
                    clusterEventLoop.updateAndBroadcastMemberStatus(
                            member.clone(Long.valueOf(topology.generateNextMemberId()), MemberRole.follower, MemberStatus.leaving));
                    clusterEventLoop.updateAndBroadcastMemberStatus(
                            member.clone(Long.valueOf(topology.generateNextMemberId()), MemberRole.follower, MemberStatus.inactive));
                }

                // Evaluated after the block above, exactly as the two checks were ordered before.
                boolean inactiveButConnected = member.getStatus().isInactive() && clusterEventLoop.hasChannel(member);
                if (inactiveButConnected) {
                    clusterEventLoop.updateAndBroadcastMemberStatus(
                            member.clone(Long.valueOf(topology.generateNextMemberId()), MemberRole.follower, MemberStatus.active));
                }
            }
        }

        /**
         * Lets a seed node take leadership when the topology has no leader, or when the
         * current leader's local status is not active.
         *
         * <p>Does nothing on non-seed nodes. Otherwise logs the attempt and keeps calling
         * {@code assumeLeader} with the topology's current transaction id until it returns
         * {@code true}.
         *
         * @param clusterEventLoop event loop providing the topology and engine properties
         */
        private void seedInit(ClusterEventLoop clusterEventLoop) {
            Topology topology = clusterEventLoop.getTopology();
            if (!clusterEventLoop.getEngine().getProperties().isSeed()) {
                return;
            }
            if (topology.getLeader() != null && topology.getLeader().getLocalStatus().isActive()) {
                // An active leader already exists; nothing to do.
                return;
            }

            TopologyProbe.LOGGER.info("seed [{}] start as leader", topology.getSelf().getIdentifier());
            while (!topology.assumeLeader(topology.getTransactionId())) {
                // Retry with a freshly read transaction id until the call succeeds.
            }
        }

        /**
         * Lets a non-seed node that is still in {@code init} status ask a seed to join the cluster.
         *
         * <p>Does nothing when this node is configured as a seed or when its own status is not
         * {@code init}. Otherwise the seeds are visited in random order; seeds that are already
         * members of the topology are skipped. For the first seed to which a channel can be
         * obtained, a {@code JoiningRequest} is written and the method returns. Unreachable seeds
         * and unexpected errors are logged and the next seed is tried.
         *
         * @param clusterEventLoop event loop providing the topology, engine properties and channels
         * @param list             configured seeds; this list is not modified
         */
        private void nonSeedInit(ClusterEventLoop clusterEventLoop, List<MemberConfig> list) {
            Topology topology = clusterEventLoop.getTopology();
            FlowProperties properties = clusterEventLoop.getEngine().getProperties();
            if (properties.isSeed() || !topology.getSelf().getStatus().isInit()) {
                return;
            }

            // Shuffle a private copy: the incoming list is the instance stored in the JobDataMap and
            // is shared with the other probe steps and with every later (possibly overlapping) run.
            List<MemberConfig> candidates = new ArrayList<>(list);
            Collections.shuffle(candidates);

            for (MemberConfig memberConfig : candidates) {
                if (topology.hasMember(memberConfig)) {
                    continue;
                }
                try {
                    clusterEventLoop.createChannel(memberConfig);
                    ClusterChannel channel = clusterEventLoop.getChannel(memberConfig);
                    if (channel == null) {
                        if (TopologyProbe.LOGGER.isDebugEnabled()) {
                            TopologyProbe.LOGGER.debug("member [{}] is joining cluster failed due to seed [{}] is unreachable", properties.getSelfConfig().getIdentifier(), memberConfig.getIdentifier());
                        }
                        continue;
                    }

                    Member self = topology.getSelf();
                    if (!self.getStatus().isInit()) {
                        // Self left init status since the guard above; put a clone with status init
                        // back into the topology before announcing it.
                        topology.addOrReplaceMember(self.clone(-1L, null, MemberStatus.init));
                        self = topology.getSelf();
                    }
                    channel.write(new JoiningRequest(self, false));
                    TopologyProbe.LOGGER.info("member [{}] is joining cluster through [{}]", properties.getSelfConfig().getIdentifier(), memberConfig.getIdentifier());
                    return;
                } catch (Throwable th) {
                    // The trailing throwable is logged by SLF4J with its stack trace.
                    TopologyProbe.LOGGER.warn("member [{}] joining cluster catch unknown error, errorMsg={}", properties.getSelfConfig().getIdentifier(), th.getMessage(), th);
                }
            }
        }

        /**
         * Makes sure this node has a channel to every configured seed.
         *
         * <p>For each seed without an existing channel, a channel is created. When no channel
         * is available afterwards, the seed is treated as unreachable and the next seed is
         * tried. An unreachable seed no longer ends the whole pass, so one seed being down does
         * not prevent connecting to the seeds listed after it. Outcomes are logged at debug
         * level; unexpected errors are logged at warn level and the loop continues.
         *
         * @param clusterEventLoop event loop providing the engine properties and channels
         * @param list             configured seeds to stay connected to
         */
        private void everyMemberKeepAliveWithSeedToAvoidIsolatedIslands(ClusterEventLoop clusterEventLoop, List<MemberConfig> list) {
            FlowProperties properties = clusterEventLoop.getEngine().getProperties();
            for (MemberIdentifier memberIdentifier : list) {
                if (clusterEventLoop.hasChannel(memberIdentifier)) {
                    continue;
                }
                try {
                    clusterEventLoop.createChannel(memberIdentifier);
                    boolean connected = clusterEventLoop.getChannel(memberIdentifier) != null;
                    if (!connected) {
                        if (TopologyProbe.LOGGER.isDebugEnabled()) {
                            TopologyProbe.LOGGER.debug("member [{}] keep alive with seed failed due to seed [{}] is unreachable", properties.getSelfConfig().getIdentifier(), memberIdentifier.getIdentifier());
                        }
                        // Move on to the next seed instead of abandoning the rest of the list.
                        continue;
                    }
                    if (TopologyProbe.LOGGER.isDebugEnabled()) {
                        TopologyProbe.LOGGER.debug("member [{}] is keep alive with seed [{}]", properties.getSelfConfig().getIdentifier(), memberIdentifier.getIdentifier());
                    }
                } catch (Throwable th) {
                    // The trailing throwable is logged by SLF4J with its stack trace.
                    TopologyProbe.LOGGER.warn("member [{}] keep alive with seed catch unknown error, errorMsg={}", properties.getSelfConfig().getIdentifier(), th.getMessage(), th);
                }
            }
        }
    }

    /** Package-private no-argument constructor; the body is empty. */
    TopologyProbe() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules a repeating {@code TopologyProbeJob} for the given cluster event loop.
     *
     * <p>The job and its trigger share a random UUID name within {@code DEFAULT_GROUP}. The
     * event loop and the seed list computed by {@code initOtherSeeds} are placed in the job's
     * data map. The trigger first fires one probe interval from now and then repeats forever
     * at that interval. A close listener is added to the event loop that unschedules the
     * trigger.
     *
     * @param clusterEventLoop event loop to probe; also the source of the engine properties
     * @throws RuntimeException wrapping the {@code SchedulerException} if scheduling fails
     */
    public static void register(ClusterEventLoop clusterEventLoop) {
        FlowProperties properties = clusterEventLoop.getEngine().getProperties();
        int intervalSeconds = properties.getTopologyProbeInterval();
        List<MemberConfig> otherSeeds = initOtherSeeds(properties);

        String identityName = UUID.randomUUID().toString();
        JobDetail jobDetail = JobBuilder.newJob(TopologyProbeJob.class)
                .withIdentity(identityName, DEFAULT_GROUP)
                .build();
        jobDetail.getJobDataMap().put(KEY_EVENT_LOOP, clusterEventLoop);
        jobDetail.getJobDataMap().put(KEY_OTHER_SEEDS, otherSeeds);

        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(identityName, DEFAULT_GROUP)
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(intervalSeconds).repeatForever())
                // 1000L forces long arithmetic; int multiplication would overflow for large intervals.
                .startAt(new Date(System.currentTimeMillis() + (intervalSeconds * 1000L)))
                .build();

        try {
            scheduler.scheduleJob(jobDetail, trigger);
            clusterEventLoop.addCloseListener(promise -> {
                if (scheduler.unscheduleJob(trigger.getKey())) {
                    LOGGER.info("unregister topology probe due to cluster event loop shutdown");
                }
            });
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Collects the configured seeds that do not point at this node.
     *
     * <p>A seed is considered to be this node when both its host and its port equal the
     * host and port from the given properties; every other seed is returned, in
     * configuration order.
     *
     * @param flowProperties properties providing this node's host/port and the cluster seeds
     * @return a new mutable list of the seeds other than this node
     */
    private static List<MemberConfig> initOtherSeeds(FlowProperties flowProperties) {
        final String selfHost = flowProperties.getHost();
        final int selfPort = flowProperties.getPort();

        List<MemberConfig> otherSeeds = new ArrayList<>();
        for (MemberConfig seed : flowProperties.getClusterConfig().getSeeds()) {
            boolean pointsAtSelf = Objects.equals(selfHost, seed.getHost())
                    && Objects.equals(Integer.valueOf(selfPort), seed.getPort());
            if (pointsAtSelf) {
                continue;
            }
            otherSeeds.add(seed);
        }
        return otherSeeds;
    }
}
