package net.soundvibe.reacto.vertx.agent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import net.soundvibe.reacto.agent.AgentIsInDesiredClusterState;
import net.soundvibe.reacto.agent.AgentOptions;
import net.soundvibe.reacto.metric.Metrics;
import net.soundvibe.reacto.utils.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/soundvibe/reacto/vertx/agent/VertxSupervisorAgent.class */
public final class VertxSupervisorAgent extends AbstractVerticle {
    /** Name of the cluster sync map holding, per node id, the JSON-encoded {@code VertxNode}. */
    static final String MAP_NODES = "__reactoNodes__";

    /** Name of the cluster sync map holding, per agent name, the JSON-encoded agent options. */
    private static final String MAP_AGENTS_OPTIONS = "__reactoAgentOptions__";

    /** Event bus address on which the encoded {@code VertxAgent} is published after a deployment. */
    private static final String ADDRESS_REACTO_AGENT_DEPLOYED = "reactoAgentDeployed";

    /** Prefix of the cluster lock name; the agent name is appended to it. */
    private static final String LOCK_REACTO_SUPERVISOR = "reactoSupervisorLock_";

    private final VertxAgentSystem vertxAgentSystem;

    /** Factory used to create (and re-create on restart) the supervised agent. */
    private final VertxAgentFactory agentFactory;

    /** The currently supervised agent verticle; replaced whenever the agent is restarted. */
    private AgentVerticle<?> agent;

    /** Descriptor of the deployed agent; stays {@code null} until a deployment succeeds. */
    private VertxAgent vertxAgent;

    /** Deployment id of the agent verticle; holds {@code null} while this supervisor is idle. */
    private final AtomicReference<String> deploymentId = new AtomicReference<>();

    /** Cluster map obtained under {@link #MAP_NODES}; {@code null} when no cluster manager is used. */
    private Map<String, String> nodes;

    /** Cluster map obtained under {@link #MAP_AGENTS_OPTIONS}; {@code null} when not clustered. */
    private Map<String, String> agentOptions;

    private VertxAgentOptions vertxAgentOptions;

    /** Subscription of the periodic cluster state check, if one has been started. */
    private Disposable clusterSyncSubscription;

    /** Counts how many times the supervised agent has been restarted. */
    private Counter restartCounter;

    private AgentOptions.AgentRestartStrategy onErrorRestartStrategy;
    private AgentOptions.AgentRestartStrategy onCompleteRestartStrategy;
    private static final Logger log = LoggerFactory.getLogger(VertxSupervisorAgent.class);

    /** Per-JVM registry (agent name -> flag) of agents that already have an idle supervisor. */
    private static final ConcurrentMap<String, Boolean> idleSupervisors = new ConcurrentHashMap<>();

    /** JVM-wide monitor guarding the read-modify-write sequences on {@link #nodes}. */
    private static final Object lock = new Object();

    /**
     * Decompiler rendering of the compiler-generated lookup table for a {@code switch} over
     * {@link AgentOptions.OnCompleteAction}. The table is indexed by the constant's ordinal and
     * maps {@code undeploy} to case 1 and {@code restart} to case 2; {@code handleChildComplete}
     * reads it. Ordinals without an entry keep the array default of 0.
     */
    /* renamed from: net.soundvibe.reacto.vertx.agent.VertxSupervisorAgent$2, reason: invalid class name */
    /* loaded from: input_file:net/soundvibe/reacto/vertx/agent/VertxSupervisorAgent$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$net$soundvibe$reacto$agent$AgentOptions$OnCompleteAction = new int[AgentOptions.OnCompleteAction.values().length];

        static {
            try {
                $SwitchMap$net$soundvibe$reacto$agent$AgentOptions$OnCompleteAction[AgentOptions.OnCompleteAction.undeploy.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored; presumably tolerates an enum constant missing at runtime.
            }
            try {
                $SwitchMap$net$soundvibe$reacto$agent$AgentOptions$OnCompleteAction[AgentOptions.OnCompleteAction.restart.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored; presumably tolerates an enum constant missing at runtime.
            }
        }
    }

    /**
     * Tells which kind of deployment id is being passed around when undeploying and when
     * cleaning up the shared node map.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/soundvibe/reacto/vertx/agent/VertxSupervisorAgent$DeploymentIdType.class */
    public enum DeploymentIdType {
        /** The id is matched against an agent's {@code supervisorDeploymentId}. */
        SUPERVISOR,
        /** The id is matched against an agent's {@code agentDeploymentId}. */
        AGENT
    }

    /**
     * Creates a supervisor bound to the given agent system.
     *
     * @param vertxAgentSystem the system that provides the optional cluster manager and group
     * @param vertxAgentFactory factory producing the agent verticle to supervise
     */
    public VertxSupervisorAgent(VertxAgentSystem vertxAgentSystem, VertxAgentFactory vertxAgentFactory) {
        this.agentFactory = vertxAgentFactory;
        this.vertxAgentSystem = vertxAgentSystem;
    }

    /**
     * Initialises the verticle: creates the supervised agent, reads its options, registers the
     * restart counter metric and, when a cluster manager is present, binds the shared cluster maps.
     *
     * @param vertx the Vert.x instance passed on to the superclass
     * @param context the verticle context passed on to the superclass
     * @throws IllegalArgumentException if the agent's options are not {@code VertxAgentOptions}
     */
    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        initAgent();
        initAgentOptions();
        this.restartCounter = Metrics.REGISTRY.counter(
                MetricRegistry.name(getClass(), this.agent.name(), "restartCount"));
        this.vertxAgentSystem.clusterManager().ifPresent(this::setAgentInfo);
    }

    /**
     * Deploys the supervised agent when there is room for it; otherwise becomes an idle supervisor.
     *
     * <p>Without a cluster manager the agent is always deployed. With one, it is deployed only
     * while the cluster runs fewer instances than desired and this node hosts fewer than the
     * per-node maximum. When deployment is skipped, the first supervisor registered for the agent
     * name in this JVM starts listening for cluster changes and completes; any later one fails
     * with {@link AgentIsInDesiredClusterState}.
     *
     * @param future completed or failed once the outcome of the start is known
     */
    @Override
    public void start(Future<Void> future) {
        try {
            // Reuse the same capacity rule the periodic cluster check applies.
            final boolean shouldDeploy = this.vertxAgentSystem.clusterManager()
                    .map(clusterManager -> canBeDeployedLocally(
                            findRunningAgents(this.nodes, this.agent.name(), this.agent.version()),
                            clusterManager.getNodeID()))
                    .orElse(true);

            if (shouldDeploy) {
                this.vertx.deployVerticle(this.agent, this.vertxAgentOptions.toDeploymentOptions(),
                        asyncResult -> handleDeployment(asyncResult, future));
                return;
            }

            log.warn("There are already desired number of instances running in the cluster, skipping deployment for {}", this.agent.name());
            final Boolean alreadyIdle = idleSupervisors.putIfAbsent(this.agent.name(), true);
            if (Boolean.TRUE.equals(alreadyIdle)) {
                // Another idle supervisor already watches this agent in this JVM.
                future.fail(new AgentIsInDesiredClusterState(this.agent.name(), this.vertxAgentOptions.getDesiredNumberOfInstances()));
                return;
            }
            this.vertxAgentSystem.clusterManager().ifPresent(this::listenForClusterChanges);
            future.complete();
        } catch (Throwable th) {
            future.fail(th);
        }
    }

    /**
     * Stops the supervisor: disposes the periodic cluster check if one is running, then
     * completes the given future.
     *
     * @param future completed once the supervisor has stopped
     */
    @Override
    public void stop(Future<Void> future) {
        log.info("Stopping supervisor. DeploymentID: " + deploymentID());
        if (isClusterSyncRunning()) {
            this.clusterSyncSubscription.dispose();
        }
        future.complete();
    }

    /**
     * Returns the deployment id of the supervised agent.
     *
     * @return the id, or an empty optional when no agent deployment id has been recorded
     */
    public Optional<String> agentDeploymentId() {
        final String currentId = this.deploymentId.get();
        return Optional.ofNullable(currentId);
    }

    /**
     * Returns the agent verticle currently held by this supervisor.
     *
     * @return the agent, or an empty optional when none has been created yet
     */
    public Optional<AgentVerticle<?>> agentVerticle() {
        final AgentVerticle<?> current = this.agent;
        return Optional.ofNullable(current);
    }

    /**
     * Creates a fresh agent from the factory and wires this supervisor's error and completion
     * callbacks into it.
     */
    private void initAgent() {
        final AgentVerticle<?> created = (AgentVerticle<?>) this.agentFactory.create();
        // Publish the new agent first: the callbacks read this.agent.
        this.agent = created;
        created.assign(this::handleChildError, this::handleChildComplete);
    }

    /**
     * Reads the agent's options and caches them together with both restart strategies.
     *
     * @throws IllegalArgumentException if the options are {@code null} or not an instance of
     *     {@code VertxAgentOptions}
     */
    private void initAgentOptions() {
        final AgentOptions options = this.agent.options();
        if (!(options instanceof VertxAgentOptions)) {
            // instanceof is false for null, so guard getClass() to report the real problem
            // instead of failing with a NullPointerException while building the message.
            final String actualType = options == null ? "null" : options.getClass().getName();
            throw new IllegalArgumentException("agent.deploymentOptions() should be of VertxAgentOptions class but was: " + actualType);
        }
        this.vertxAgentOptions = (VertxAgentOptions) options;
        this.onErrorRestartStrategy = options.getAgentRestartStrategy();
        this.onCompleteRestartStrategy = options.getOnCompleteRestartStrategy();
    }

    /**
     * Handles the outcome of deploying the agent verticle and settles the start future.
     *
     * <p>On success the deployment id is recorded, the agent descriptor is created (using the
     * cluster node id, or a random UUID when not clustered), HA registration and cluster
     * listening are set up when a cluster manager is present and the options request HA, and
     * the deployed event is published. On failure the cause is logged and passed to the future.
     *
     * @param asyncResult result of the deployment; carries the deployment id on success
     * @param future the start future to complete or fail
     */
    private void handleDeployment(AsyncResult<String> asyncResult, Future<Void> future) {
        try {
            if (!asyncResult.succeeded()) {
                // Always settle the future: a plain "else" cannot leave it hanging.
                log.error("Supervisor was unable to deploy agent: " + this.agent.name(), asyncResult.cause());
                future.fail(asyncResult.cause());
                return;
            }
            this.deploymentId.set(asyncResult.result());
            // orElseGet: only generate a UUID when there is no cluster node id to use.
            final String nodeId = this.vertxAgentSystem.clusterManager()
                    .map(ClusterManager::getNodeID)
                    .orElseGet(() -> UUID.randomUUID().toString());
            setAgent(nodeId);
            this.vertxAgentSystem.clusterManager()
                    .filter(clusterManager -> this.vertxAgentOptions.isHA())
                    .ifPresent(clusterManager -> {
                        addToHA();
                        listenForClusterChanges(clusterManager);
                    });
            log.info("{} deployed successfully: {}", this.agent.name(), asyncResult.result());
            publishDeployedEvent();
            future.complete();
        } catch (Throwable th) {
            log.error("Deployment error: ", th);
            future.fail(th);
        }
    }

    /**
     * Builds the descriptor of the supervised agent for the given node, stamped with the
     * current time.
     *
     * @param str id of the node the agent runs on
     * @return a new agent descriptor
     */
    private VertxAgent toVertxAgent(String str) {
        final String agentDeployment = this.deploymentId.get();
        final String agentName = this.agent.name();
        return new VertxAgent(
                str,
                agentDeployment,
                agentName,
                this.vertxAgentSystem.group,
                deploymentID(),
                this.agent.version(),
                Instant.now());
    }

    /**
     * Registers the supervised agent in the shared node map.
     *
     * <p>Does nothing when no agent descriptor or node map is available. If this node has no
     * entry yet, one is created containing just this agent; otherwise the agent is appended
     * unless an agent with the same agent deployment id is already listed.
     */
    private void addToHA() {
        synchronized (lock) {
            if (this.vertxAgent == null || this.nodes == null) {
                return;
            }
            final String nodeId = this.vertxAgent.nodeId;
            final String encodedNode = this.nodes.get(nodeId);
            if (encodedNode == null) {
                this.nodes.put(nodeId, toVertxNode().encode());
                return;
            }
            final VertxNode node = VertxNode.fromJson(encodedNode);
            final boolean alreadyListed = node.agents.stream()
                    .anyMatch(listed -> listed.agentDeploymentId.equals(this.vertxAgent.agentDeploymentId));
            if (!alreadyListed) {
                node.agents.add(this.vertxAgent);
                this.nodes.put(node.nodeId, node.encode());
            }
        }
    }

    /**
     * Removes agents with the given deployment id from this node's entry in the shared node map.
     *
     * <p>Does nothing when no agent descriptor or node map is available, or when this node has
     * no entry. The entry is written back only if at least one agent was removed.
     *
     * @param str the deployment id to remove
     * @param deploymentIdType whether {@code str} is a supervisor or an agent deployment id
     */
    private void removeFromHA(String str, DeploymentIdType deploymentIdType) {
        synchronized (lock) {
            if (this.vertxAgent == null || this.nodes == null) {
                return;
            }
            final String encodedNode = this.nodes.get(this.vertxAgent.nodeId);
            if (encodedNode == null) {
                return;
            }
            final VertxNode node = VertxNode.fromJson(encodedNode);
            final boolean removedAny = node.agents.removeIf(listed -> {
                if (deploymentIdType == DeploymentIdType.SUPERVISOR) {
                    return listed.supervisorDeploymentId.equals(str);
                }
                return listed.agentDeploymentId.equals(str);
            });
            if (removedAny) {
                this.nodes.put(this.vertxAgent.nodeId, node.encode());
            }
        }
    }

    /**
     * Drops the entry of a node from the shared node map. Best effort: failures are logged
     * and never propagated.
     *
     * @param str id of the node to remove
     */
    public void removeNodeFromHA(String str) {
        synchronized (lock) {
            // Same guard as addToHA/removeFromHA: nothing to clean up without the node map.
            if (this.nodes == null) {
                return;
            }
            try {
                // remove() is already a no-op for absent keys, so a preceding containsKey()
                // would only add a second call against the map.
                this.nodes.remove(str);
            } catch (Throwable th) {
                log.warn("Cannot remove node from HA: ", th);
            }
        }
    }

    /**
     * Builds the node descriptor for the agent's node, listing only the supervised agent.
     *
     * @return a new node descriptor carrying the local address and the system's group
     */
    private VertxNode toVertxNode() {
        final List<VertxAgent> onlyThisAgent = Collections.singletonList(this.vertxAgent);
        return new VertxNode(
                this.vertxAgent.nodeId,
                WebUtils.getLocalAddress(),
                this.vertxAgentSystem.group,
                onlyThisAgent);
    }

    /** Publishes the encoded agent descriptor on the "agent deployed" event bus address. */
    private void publishDeployedEvent() {
        final String encodedAgent = this.vertxAgent.encode();
        this.vertx.eventBus().publish(ADDRESS_REACTO_AGENT_DEPLOYED, encodedAgent);
    }

    /**
     * Synchronises the local agent options with the ones stored in the shared options map.
     *
     * <p>If the map has no entry for this agent, the local options are stored there. Otherwise
     * the stored options replace the local ones. Each restart strategy is swapped only when the
     * stored one is not equal to the current one, so an equal strategy instance is kept as is.
     */
    private synchronized void syncDeploymentOptions() {
        final String agentName = this.agent.name();
        final String sharedJson = this.agentOptions.get(agentName);
        if (sharedJson == null) {
            this.agentOptions.put(agentName, this.vertxAgentOptions.toJson().encode());
            return;
        }

        final VertxAgentOptions shared = VertxAgentOptions.from(sharedJson);

        final AgentOptions.AgentRestartStrategy sharedOnError = shared.getAgentRestartStrategy();
        if (!sharedOnError.equals(this.onErrorRestartStrategy)) {
            this.onErrorRestartStrategy = sharedOnError;
        }

        final AgentOptions.AgentRestartStrategy sharedOnComplete = shared.getOnCompleteRestartStrategy();
        if (!sharedOnComplete.equals(this.onCompleteRestartStrategy)) {
            this.onCompleteRestartStrategy = sharedOnComplete;
        }

        this.vertxAgentOptions = shared;
    }

    /**
     * Binds the two shared cluster maps (agent options and nodes) from the cluster manager.
     *
     * @param clusterManager the cluster manager providing the sync maps
     */
    private synchronized void setAgentInfo(ClusterManager clusterManager) {
        final Map<String, String> sharedNodes = clusterManager.getSyncMap(MAP_NODES);
        final Map<String, String> sharedOptions = clusterManager.getSyncMap(MAP_AGENTS_OPTIONS);
        this.nodes = sharedNodes;
        this.agentOptions = sharedOptions;
    }

    /**
     * Creates and stores the descriptor of the supervised agent for the given node.
     *
     * @param str id of the node the agent runs on
     */
    private synchronized void setAgent(String str) {
        final VertxAgent descriptor = toVertxAgent(str);
        this.vertxAgent = descriptor;
    }

    /**
     * Starts reacting to cluster changes for the supervised agent.
     *
     * <p>Registers an event bus consumer that checks for excessive agents whenever an agent
     * deployment is announced, and a node listener that cleans up a departed node's entry and
     * then checks for missing agents.
     *
     * @param clusterManager the cluster manager to register the node listener on
     */
    private void listenForClusterChanges(final ClusterManager clusterManager) {
        log.info("Starting to listen for cluster changes for agent: {}", this.agent.name());

        this.vertx.eventBus()
                .consumer(ADDRESS_REACTO_AGENT_DEPLOYED)
                .exceptionHandler(error ->
                        log.error("Supervisor eventBus consumer error in agent " + this.agent.name(), error))
                .handler(message -> {
                    final VertxAgent deployed = VertxAgent.fromJson((String) message.body());
                    log.info("New agent deployed: {}", deployed);
                    checkForExcessiveAgents(clusterManager, deployed);
                });

        clusterManager.nodeListener(new NodeListener() {
            public void nodeAdded(String nodeId) {
                log.info("New node added to cluster: {}", nodeId);
            }

            public void nodeLeft(String nodeId) {
                log.info("Node left the cluster: {}", nodeId);
                removeNodeFromHA(nodeId);
                checkForMissingAgents(clusterManager);
            }
        });
    }

    /**
     * Starts a periodic check (immediately, then every minute) that redeploys the agent on this
     * node while the cluster runs fewer instances than desired and this node still has room.
     *
     * <p>Does nothing if a cluster check is already running. On every tick the running agents
     * are looked up and the deployment options are synchronised; the check ends as soon as the
     * agent can no longer be deployed locally.
     *
     * @param clusterManager the cluster manager supplying the local node id and the lock
     */
    public void checkForMissingAgents(ClusterManager clusterManager) {
        if (isClusterSyncRunning()) {
            return;
        }
        log.debug("Checking for missing agents...");
        this.clusterSyncSubscription = Flowable.interval(0L, 1L, TimeUnit.MINUTES)
                .map(tick -> findRunningAgents(this.nodes, this.agent.name(), this.agent.version()))
                .doOnNext(running -> syncDeploymentOptions())
                .takeWhile(running -> canBeDeployedLocally(running, clusterManager.getNodeID()))
                .subscribe(
                        running -> {
                            log.info("There are less nodes [{}] than desired [{}], will try to redeploy agent: {}:{}",
                                    running.size(),
                                    this.vertxAgentOptions.getDesiredNumberOfInstances(),
                                    this.agent.name(),
                                    this.agent.version());
                            deployAgent(clusterManager);
                        },
                        error -> log.error("Error when trying to set desired cluster state: ", error),
                        () -> log.debug("Cluster agent [{}: {}] is in it's desired state",
                                this.agent.name(), this.agent.version()));
    }

    /**
     * Tells whether a periodic cluster check is currently active.
     *
     * @return {@code true} if a subscription exists and has not been disposed
     */
    private boolean isClusterSyncRunning() {
        return this.clusterSyncSubscription != null && !this.clusterSyncSubscription.isDisposed();
    }

    /**
     * Decides whether one more instance of the agent may be deployed on the given node.
     *
     * @param list the agents currently running in the cluster
     * @param str id of the node to check
     * @return {@code true} if fewer instances than desired are running and the node hosts fewer
     *     than the per-node maximum
     */
    private boolean canBeDeployedLocally(List<VertxAgent> list, String str) {
        if (list.size() >= this.vertxAgentOptions.getDesiredNumberOfInstances()) {
            return false;
        }
        final long runningOnNode = list.stream()
                .filter(running -> running.nodeId.equals(str))
                .count();
        return runningOnNode < (long) this.vertxAgentOptions.getMaxInstancesOnNode();
    }

    /**
     * Starts a periodic check (immediately, then every minute) that undeploys excessive
     * instances while more agents than desired are running.
     *
     * <p>Does nothing if a cluster check is already running or the announced agent has a
     * different name than the supervised one.
     *
     * @param clusterManager the cluster manager used to identify this node and to lock
     * @param vertxAgent the agent whose deployment was announced
     */
    private void checkForExcessiveAgents(ClusterManager clusterManager, VertxAgent vertxAgent) {
        if (isClusterSyncRunning() || !vertxAgent.name.equals(this.agent.name())) {
            return;
        }
        this.clusterSyncSubscription = Flowable.interval(0L, 1L, TimeUnit.MINUTES)
                .map(tick -> findRunningAgents(this.nodes, this.agent.name(), this.agent.version()))
                .doOnNext(running -> syncDeploymentOptions())
                .takeWhile(running -> running.size() > this.vertxAgentOptions.getDesiredNumberOfInstances())
                .subscribe(
                        running -> {
                            log.info("There are more nodes [{}] than desired [{}], undeploying excessive agent {}...",
                                    running.size(),
                                    this.vertxAgentOptions.getDesiredNumberOfInstances(),
                                    this.agent.name());
                            undeployAgentIfNeeded(running, clusterManager);
                        },
                        error -> log.error("Error when trying to set excessive cluster state: ", error),
                        () -> log.debug("Cluster agent [{}] is in it's desired state", this.agent.name()));
    }

    /**
     * Picks the node hosting the most instances, chooses one of its agents and undeploys it,
     * but only if that agent runs on this node.
     *
     * @param list the agents currently running in the cluster
     * @param clusterManager the cluster manager used to identify this node and to lock
     */
    private void undeployAgentIfNeeded(List<VertxAgent> list, ClusterManager clusterManager) {
        final Map<String, List<VertxAgent>> agentsByNode = list.stream()
                .collect(Collectors.groupingBy(running -> running.nodeId));

        agentsByNode.values().stream()
                .max(Comparator.comparingInt(List::size))
                .map(this::chooseAgent)
                .filter(candidate -> isThisNode(candidate, clusterManager))
                .ifPresent(candidate -> undeployAgent(clusterManager, candidate));
    }

    /**
     * Tells whether the agent runs on the node of the given cluster manager.
     *
     * @param vertxAgent the agent to check
     * @param clusterManager the cluster manager whose node id is compared
     * @return {@code true} if the agent's node id equals the cluster manager's node id
     */
    private boolean isThisNode(VertxAgent vertxAgent, ClusterManager clusterManager) {
        final String localNodeId = clusterManager.getNodeID();
        return vertxAgent.nodeId.equals(localNodeId);
    }

    /**
     * Chooses the agent with the earliest {@code updatedOn} timestamp.
     *
     * @param list the candidates
     * @return the agent with the smallest update time
     * @throws NoSuchElementException if {@code list} is empty
     */
    private VertxAgent chooseAgent(List<VertxAgent> list) {
        final Optional<VertxAgent> earliestUpdated = list.stream()
                .min(Comparator.comparingLong(candidate -> candidate.updatedOn.toEpochMilli()));
        return earliestUpdated.orElseThrow(() -> new NoSuchElementException("No agents to choose from!"));
    }

    /**
     * Redeploys the agent through the agent system while holding the per-agent cluster lock.
     *
     * <p>The lock is requested with a 2000 ms timeout; if it cannot be obtained only a warning
     * is logged. After a successful redeployment, a supervisor that never deployed an agent
     * itself (no agent deployment id) undeploys itself, waiting up to 30 seconds, and on success
     * removes the agent name from the idle supervisor registry. The lock is released on every
     * outcome and on disposal.
     *
     * @param clusterManager the cluster manager providing the lock
     */
    private void deployAgent(ClusterManager clusterManager) {
        final String lockName = LOCK_REACTO_SUPERVISOR + this.agent.name();
        clusterManager.getLockWithTimeout(lockName, 2000L, lockResult -> {
            if (!lockResult.succeeded()) {
                log.warn("Unable to obtain lock", lockResult.cause());
                return;
            }
            final Lock clusterLock = (Lock) lockResult.result();
            this.vertxAgentSystem.run(this.agentFactory)
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .doOnDispose(() -> release(clusterLock))
                    .subscribe(
                            redeployedId -> {
                                log.info("New agent was redeployed successfully: {}", this.agent.name());
                                try {
                                    if (this.deploymentId.get() != null) {
                                        // This supervisor owns a deployed agent; keep it.
                                        return;
                                    }
                                    final String agentName = this.agent.name();
                                    log.info("No need for idle supervisor agent, undeploy itself...");
                                    final Throwable failure = undeploy(deploymentID(), DeploymentIdType.SUPERVISOR)
                                            .blockingGet(30L, TimeUnit.SECONDS);
                                    if (failure == null) {
                                        log.info("Idle supervisor undelployed successfully");
                                        idleSupervisors.remove(agentName);
                                    } else {
                                        log.warn("Unable to undeploy not needed idle supervisor: ", failure);
                                    }
                                } finally {
                                    release(clusterLock);
                                }
                            },
                            error -> {
                                log.error("Unable to redeploy failed agent: " + this.agent.name(), error);
                                release(clusterLock);
                            },
                            () -> {
                                log.info("No need to redeploy new agent: {}", this.agent.name());
                                release(clusterLock);
                            });
        });
    }

    /**
     * Releases a cluster lock without ever throwing; a failure is only logged as a warning.
     *
     * @param clusterLock the lock to release
     */
    private void release(Lock clusterLock) {
        try {
            clusterLock.release();
        } catch (Throwable releaseFailure) {
            final String reason = releaseFailure.getMessage();
            log.warn("Lock was not released: {}", reason);
        }
    }

    /**
     * Undeploys an excessive agent while holding the per-agent cluster lock.
     *
     * <p>The lock is requested with a 2000 ms timeout. Once it is held, the number of running
     * agents is re-checked; only if it still exceeds the desired count is something undeployed.
     * When this node runs fewer than two instances the agent deployment is undeployed,
     * otherwise the agent's supervisor deployment is.
     *
     * <p>The lock is always released: by the undeploy pipeline when it terminates, or right
     * away when nothing is undeployed or an exception occurs before the pipeline is subscribed.
     *
     * @param clusterManager the cluster manager providing the lock and the local node id
     * @param vertxAgent the agent chosen for undeployment
     */
    private void undeployAgent(ClusterManager clusterManager, VertxAgent vertxAgent) {
        final String lockName = LOCK_REACTO_SUPERVISOR + this.agent.name();
        clusterManager.getLockWithTimeout(lockName, 2000L, asyncResult -> {
            if (!asyncResult.succeeded()) {
                log.warn("Unable to acquire lock", asyncResult.cause());
                return;
            }
            final Lock clusterLock = (Lock) asyncResult.result();
            // Becomes true once the undeploy pipeline has taken over releasing the lock.
            boolean releaseDelegated = false;
            try {
                log.info("Lock acquired successfully: {}", LOCK_REACTO_SUPERVISOR + this.agent.name());
                final List<VertxAgent> running = findRunningAgents(this.nodes, this.agent.name(), this.agent.version());
                if (running.size() <= this.vertxAgentOptions.getDesiredNumberOfInstances()) {
                    // Nothing excessive any more; the finally block hands the lock back.
                    return;
                }
                log.info("Starting to undeploy excessive agent: {}", this.agent.name());
                final long runningOnThisNode = running.stream()
                        .filter(candidate -> candidate.nodeId.equals(clusterManager.getNodeID()))
                        .count();
                final boolean agentOnly = runningOnThisNode < 2;
                final String targetId = agentOnly ? vertxAgent.agentDeploymentId : vertxAgent.supervisorDeploymentId;
                final DeploymentIdType targetType = agentOnly ? DeploymentIdType.AGENT : DeploymentIdType.SUPERVISOR;
                undeploy(targetId, targetType)
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
                        .doFinally(() -> release(clusterLock))
                        .subscribe(
                                () -> log.info("Excessive agent undeployed successfully: {}", vertxAgent),
                                error -> log.warn("Error on excessive agent undeployment: ", error));
                releaseDelegated = true;
            } finally {
                if (!releaseDelegated) {
                    release(clusterLock);
                }
            }
        });
    }

    /**
     * Collects, from all node entries of the given map, the agents with the given name and
     * version.
     *
     * @param map node id to JSON-encoded {@code VertxNode}
     * @param str the agent name to look for
     * @param i the agent version to look for
     * @return the matching agents; an empty list if anything fails while reading the map
     *     (the failure is logged as a warning)
     */
    public static List<VertxAgent> findRunningAgents(Map<String, String> map, String str, int i) {
        List<VertxAgent> matching;
        try {
            matching = map.values().stream()
                    .map(VertxNode::fromJson)
                    .flatMap(node -> node.agents.stream())
                    .filter(candidate -> candidate.name.equals(str) && candidate.version == i)
                    .collect(Collectors.toList());
        } catch (Throwable th) {
            log.warn("No running agents found: ", th);
            matching = Collections.emptyList();
        }
        return matching;
    }

    /**
     * Callback for an error raised by the supervised agent: tries to restart the agent with the
     * on-error restart strategy and undeploys when the strategy reports that no restart happened.
     *
     * @param th the error reported by the agent
     */
    private void handleChildError(Throwable th) {
        final String agentName = this.agent.name();
        log.error("Error in child agent " + agentName, th);
        log.info("Restarting agent {}", this.agent.name());
        log.info("Using [{}'s] restart strategy: {}", this.agent.name(), this.onErrorRestartStrategy.getClass().getSimpleName());

        final boolean restarted = restartAgent(this.onErrorRestartStrategy);
        if (!restarted) {
            undeploy()
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .subscribe(
                            () -> log.info("Undeployed successfully on child error: ", th),
                            undeployError -> log.warn("Unable to undeploy on child error: ", undeployError));
        }
    }

    /**
     * Asks the given strategy to restart the agent. The restart action, run while holding this
     * supervisor's monitor, re-creates the agent, increments the restart counter and starts it.
     *
     * @param agentRestartStrategy the strategy deciding whether and how to run the restart
     * @return whatever the strategy's {@code restart} call returns
     */
    private boolean restartAgent(AgentOptions.AgentRestartStrategy agentRestartStrategy) {
        final boolean restarted = agentRestartStrategy.restart(() -> {
            synchronized (this) {
                initAgent();
                this.restartCounter.inc();
                this.agent.start();
            }
        });
        return restarted;
    }

    /**
     * Callback for completion of the supervised agent. Depending on the configured on-complete
     * action the agent is undeployed, restarted with the on-complete restart strategy, or left
     * alone (any other action).
     */
    private void handleChildComplete() {
        log.info("Got child completed: {}", this.agent.name());
        // Switch on the enum itself rather than through an ordinal-indexed lookup table.
        switch (this.vertxAgentOptions.getOnCompleteAction()) {
            case undeploy:
                undeploy()
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
                        .subscribe(
                                () -> log.info("Undeployed successfully on child complete"),
                                th -> log.warn("Unable to undeploy on child complete: ", th));
                break;
            case restart:
                log.info("Using [{}'s] on complete restart strategy: {}", this.agent.name(), this.onCompleteRestartStrategy.getClass().getSimpleName());
                restartAgent(this.onCompleteRestartStrategy);
                break;
            default:
                break;
        }
    }

    /**
     * Creates a completable that undeploys the given deployment when subscribed.
     *
     * <p>On success the deployment is removed from the shared node map and, for a supervisor
     * deployment, also from the agent system; then the completable completes. On failure the
     * cause is logged and emitted as the error.
     *
     * @param str the deployment id to undeploy
     * @param deploymentIdType whether {@code str} identifies a supervisor or an agent deployment
     * @return a completable performing the undeployment
     */
    private Completable undeploy(String str, DeploymentIdType deploymentIdType) {
        return Completable.create(emitter -> this.vertx.undeploy(str, undeployResult -> {
            if (undeployResult.succeeded()) {
                log.info("Undeployed {}: {}", deploymentIdType, this.agent.name());
                removeFromHA(str, deploymentIdType);
                if (deploymentIdType == DeploymentIdType.SUPERVISOR) {
                    this.vertxAgentSystem.removeSupervisor(str);
                }
                emitter.onComplete();
            } else {
                log.error("Unable to undeploy " + this.agent.name(), undeployResult.cause());
                emitter.onError(undeployResult.cause());
            }
        }));
    }

    /**
     * Undeploys this supervisor's deployment as recorded in the agent descriptor.
     *
     * @return an already completed completable when no agent descriptor exists, otherwise the
     *     completable undeploying the supervisor deployment
     */
    private Completable undeploy() {
        if (this.vertxAgent == null) {
            return Completable.complete();
        }
        return undeploy(this.vertxAgent.supervisorDeploymentId, DeploymentIdType.SUPERVISOR);
    }
}
