package net.soundvibe.reacto.vertx.agent;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.function.Consumer;
import net.soundvibe.reacto.agent.Agent;
import net.soundvibe.reacto.agent.AgentOptions;
import net.soundvibe.reacto.metric.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/soundvibe/reacto/vertx/agent/AgentVerticle.class */
public abstract class AgentVerticle<T> extends AbstractVerticle implements Agent<T> {
    private static final Logger log = LoggerFactory.getLogger(AgentVerticle.class);
    private final Meter errorMeter;
    private final Timer timer;
    private final Meter eventMeter;
    private final String name;
    private final AgentOptions agentOptions;
    private Consumer<Throwable> onError;
    private Runnable onComplete;
    private Disposable disposable;

    protected AgentVerticle(AgentOptions agentOptions) {
        this.errorMeter = Metrics.REGISTRY.meter(MetricRegistry.name(getClass(), new String[]{"errorMeter"}));
        this.timer = Metrics.REGISTRY.timer(MetricRegistry.name(getClass(), new String[]{"flowDuration"}));
        this.eventMeter = Metrics.REGISTRY.meter(MetricRegistry.name(getClass(), new String[]{"eventMeter"}));
        this.name = getClass().getSimpleName();
        this.agentOptions = agentOptions;
    }

    protected AgentVerticle(String str, AgentOptions agentOptions) {
        this.errorMeter = Metrics.REGISTRY.meter(MetricRegistry.name(getClass(), new String[]{"errorMeter"}));
        this.timer = Metrics.REGISTRY.timer(MetricRegistry.name(getClass(), new String[]{"flowDuration"}));
        this.eventMeter = Metrics.REGISTRY.meter(MetricRegistry.name(getClass(), new String[]{"eventMeter"}));
        this.name = str;
        this.agentOptions = agentOptions;
    }

    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public abstract Flowable<T> m1run();

    public String name() {
        return this.name;
    }

    public AgentOptions options() {
        return this.agentOptions;
    }

    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        context.exceptionHandler(this::handleError);
    }

    public void start() {
        log.info("Starting {}", name());
        Timer.Context time = this.timer.time();
        try {
            this.disposable = m1run().subscribe(this::handleEvent, th -> {
                time.close();
                handleError(th);
            }, () -> {
                handleComplete(time);
            });
        } catch (Throwable th2) {
            log.error("Flow cannot be constructed for agent: " + this.name, th2);
            time.close();
            this.onError.accept(th2);
        }
    }

    public void stop() {
        if (this.disposable == null) {
            throw new IllegalStateException("Agent must be started before stopping");
        }
        log.info("Stopping {}: {}", name(), deploymentID());
        this.disposable.dispose();
        close();
    }

    private void handleEvent(T t) {
        this.eventMeter.mark();
    }

    private void handleError(Throwable th) {
        this.errorMeter.mark();
        this.onError.accept(th);
    }

    private void handleComplete(Timer.Context context) {
        log.info("{} flow completed", this.name);
        context.close();
        this.onComplete.run();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assign(Consumer<Throwable> consumer, Runnable runnable) {
        this.onError = consumer;
        this.onComplete = runnable;
    }
}
