package net.soundvibe.reacto.vertx.agent;

import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.Json;
import io.vertx.core.spi.cluster.ClusterManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.soundvibe.reacto.agent.AgentIsInDesiredClusterState;
import net.soundvibe.reacto.agent.AgentSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/soundvibe/reacto/vertx/agent/VertxAgentSystem.class */
public final class VertxAgentSystem implements AgentSystem<VertxAgentFactory>, Closeable {
    private static final Logger log = LoggerFactory.getLogger(VertxAgentSystem.class);
    private static final String GROUP_DEFAULT = "__DEFAULT__";
    public final Vertx vertx;
    public final String group;
    private final ConcurrentMap<String, VertxSupervisorAgent> deployedSupervisors = new ConcurrentHashMap();
    private final AtomicReference<Disposable> syncRef = new AtomicReference<>();

    private VertxAgentSystem(Vertx vertx, String str) {
        this.vertx = vertx;
        this.group = str;
    }

    public static VertxAgentSystem of(Vertx vertx) {
        return of(vertx, GROUP_DEFAULT);
    }

    public static VertxAgentSystem ofClustered(VertxOptions vertxOptions) {
        return ofClustered(vertxOptions, Duration.ofSeconds(30L));
    }

    public static VertxAgentSystem of(VertxOptions vertxOptions) {
        return vertxOptions.getClusterManager() != null ? ofClustered(vertxOptions) : of(Vertx.vertx(vertxOptions), vertxOptions.getHAGroup());
    }

    public static VertxAgentSystem ofClustered(VertxOptions vertxOptions, Duration duration) {
        if (vertxOptions.getClusterManager() == null) {
            throw new IllegalArgumentException("ClusterManager must be set for VertxOptions");
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        Vertx.clusteredVertx(vertxOptions, asyncResult -> {
            atomicReference.set(asyncResult);
            countDownLatch.countDown();
        });
        try {
            countDownLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
            AsyncResult asyncResult2 = (AsyncResult) atomicReference.get();
            if (asyncResult2 == null) {
                throw new RuntimeException("Unable to obtain clustered vertx");
            }
            if (asyncResult2.failed()) {
                throw new RuntimeException(asyncResult2.cause());
            }
            return of((Vertx) asyncResult2.result(), vertxOptions.getHAGroup());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static VertxAgentSystem of(Vertx vertx, String str) {
        return new VertxAgentSystem(vertx, str);
    }

    public Maybe<String> run(VertxAgentFactory vertxAgentFactory) {
        return Maybe.create(maybeEmitter -> {
            String uuid = UUID.randomUUID().toString();
            VertxSupervisorAgent vertxSupervisorAgent = new VertxSupervisorAgent(this, vertxAgentFactory);
            this.vertx.deployVerticle(vertxSupervisorAgent, new DeploymentOptions().setWorker(true).setWorkerPoolName(uuid + "-pool").setWorkerPoolSize(1).setInstances(1).setHa(true), asyncResult -> {
                if (asyncResult.succeeded()) {
                    log.info("Supervisor deployed for {}. ID: {}", uuid, asyncResult.result());
                    this.deployedSupervisors.put(asyncResult.result(), vertxSupervisorAgent);
                    initSyncIfNeeded();
                    maybeEmitter.onSuccess(asyncResult.result());
                    return;
                }
                if (asyncResult.failed()) {
                    Throwable cause = asyncResult.cause();
                    if (cause instanceof AgentIsInDesiredClusterState) {
                        initSyncIfNeeded();
                        maybeEmitter.onComplete();
                    } else {
                        log.error("Unable to deploy agent: " + uuid, cause);
                        maybeEmitter.onError(cause);
                    }
                }
            });
        });
    }

    public int deployedSupervisorsCount() {
        return this.deployedSupervisors.size();
    }

    public List<VertxSupervisorAgent> deployedSupervisors() {
        return new ArrayList(this.deployedSupervisors.values());
    }

    public List<AgentVerticle<?>> deployedAgents() {
        return (List) this.deployedSupervisors.values().stream().flatMap(vertxSupervisorAgent -> {
            return (Stream) vertxSupervisorAgent.agentDeploymentId().map(str -> {
                return (Stream) vertxSupervisorAgent.agentVerticle().map((v0) -> {
                    return Stream.of(v0);
                }).orElse(Stream.empty());
            }).orElse(Stream.empty());
        }).collect(Collectors.toList());
    }

    public Optional<ClusterManager> clusterManager() {
        if (this.vertx.isClustered() && (this.vertx instanceof VertxInternal)) {
            return Optional.ofNullable(this.vertx.getClusterManager());
        }
        return Optional.empty();
    }

    public void close() {
        closeSync();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.vertx.close(asyncResult -> {
            countDownLatch.countDown();
        });
        try {
            countDownLatch.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("Unable to close agent system gracefully: ", e);
        }
    }

    public void close(Handler<AsyncResult<Void>> handler) {
        closeSync();
        this.vertx.close(handler);
    }

    private synchronized void initSyncIfNeeded() {
        if (clusterManager().isPresent()) {
            ClusterManager clusterManager = clusterManager().get();
            Disposable disposable = this.syncRef.get();
            if (disposable == null || disposable.isDisposed()) {
                this.syncRef.set(Flowable.interval(1L, 1L, TimeUnit.MINUTES).flatMapIterable(l -> {
                    return deployedSupervisors();
                }).subscribe(vertxSupervisorAgent -> {
                    try {
                        vertxSupervisorAgent.checkForMissingAgents(clusterManager);
                    } catch (Throwable th) {
                        log.warn("CheckForMissingAgents threw an error: ", th);
                    }
                }, th -> {
                    log.error("Agent System sync failed:", th);
                }, () -> {
                    log.info("Agent System sync completed");
                }));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeSupervisor(String str) {
        this.deployedSupervisors.remove(str);
    }

    private synchronized void closeSync() {
        Disposable disposable = this.syncRef.get();
        if (disposable != null) {
            disposable.dispose();
        }
    }

    static {
        Json.mapper.registerModule(new JavaTimeModule());
        Json.prettyMapper.registerModule(new JavaTimeModule());
        Json.mapper.registerModule(new Jdk8Module());
        Json.prettyMapper.registerModule(new Jdk8Module());
    }
}
