package io.mantisrx.publish;

import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.api.EventPublisher;
import io.mantisrx.publish.config.MrePublishConfiguration;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/mantis-publish-netty-1.3.39.jar:io/mantisrx/publish/MrePublishClientInitializer.class
 */
/* loaded from: input_file:WEB-INF/lib/mantis-publish-core-1.3.39.jar:io/mantisrx/publish/MrePublishClientInitializer.class */
/**
 * Holds the collaborators of the Mantis publish client and manages the two periodic
 * background tasks it needs: refreshing subscriptions and draining events.
 *
 * <p>The tasks run on two single-threaded schedulers that are static, and therefore shared by
 * every instance of this class. {@link #start()} schedules the tasks and {@link #stop()}
 * cancels the ones scheduled by this instance.
 *
 * <p>{@code start()} and {@code stop()} perform no synchronization of their own, so they
 * should not be invoked concurrently on the same instance.
 */
public class MrePublishClientInitializer {
    private static final Logger LOG = LoggerFactory.getLogger(MrePublishClientInitializer.class);

    /** Single-threaded scheduler that runs the event drainer task. */
    private static final ScheduledThreadPoolExecutor DRAINER_EXECUTOR =
            new ScheduledThreadPoolExecutor(1, runnable -> new Thread(runnable, "MantisDrainer"));

    /** Single-threaded scheduler that runs the subscription refresh task. */
    private static final ScheduledThreadPoolExecutor SUBSCRIPTIONS_EXECUTOR =
            new ScheduledThreadPoolExecutor(1, runnable -> new Thread(runnable, "MantisSubscriptionsTracker"));

    static {
        // Drop cancelled periodic tasks from the work queues at cancellation time instead of
        // leaving them queued until their next scheduled run.
        DRAINER_EXECUTOR.setRemoveOnCancelPolicy(true);
        SUBSCRIPTIONS_EXECUTOR.setRemoveOnCancelPolicy(true);
        // Shut both schedulers down when the JVM begins its shutdown sequence.
        // NOTE(review): worker threads inherit their daemon flag from the thread that creates
        // them; if that is a non-daemon thread they keep the JVM alive until shutdown is
        // triggered some other way (e.g. System.exit) - confirm this is intended.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            DRAINER_EXECUTOR.shutdown();
            SUBSCRIPTIONS_EXECUTOR.shutdown();
        }));
    }

    private final MrePublishConfiguration config;
    private final Registry registry;
    private final StreamManager streamManager;
    private final EventPublisher eventPublisher;
    private final SubscriptionTracker subscriptionsTracker;
    private final EventTransmitter eventTransmitter;
    private final Tee tee;

    /** Handles of the periodic tasks scheduled by this instance and not yet stopped. */
    private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();

    /**
     * Stores the supplied collaborators; nothing is scheduled until {@link #start()} is called.
     *
     * @param mrePublishConfiguration supplies the drainer and subscription refresh intervals and
     *                                is passed on to the event drainer and event processor
     * @param registry                metrics registry passed on to the event drainer
     * @param streamManager           passed on to the event drainer and event processor
     * @param eventPublisher          publisher exposed through {@link #getEventPublisher()}
     * @param subscriptionTracker     tracker whose subscriptions are refreshed periodically
     * @param eventTransmitter        passed on to the event drainer
     * @param tee                     passed on to the event processor
     */
    public MrePublishClientInitializer(MrePublishConfiguration mrePublishConfiguration, Registry registry, StreamManager streamManager, EventPublisher eventPublisher, SubscriptionTracker subscriptionTracker, EventTransmitter eventTransmitter, Tee tee) {
        this.config = mrePublishConfiguration;
        this.registry = registry;
        this.streamManager = streamManager;
        this.eventPublisher = eventPublisher;
        this.subscriptionsTracker = subscriptionTracker;
        this.eventTransmitter = eventTransmitter;
        this.tee = tee;
    }

    /**
     * Schedules the subscription refresh task and the event drainer task.
     *
     * <p>Calling this method while tasks from a previous call are still registered is a no-op,
     * so repeated calls do not stack duplicate periodic tasks on the shared schedulers. If
     * scheduling fails part-way, any task that was already scheduled by this call is cancelled
     * before the exception is rethrown, leaving the instance in the stopped state.
     *
     * @throws RuntimeException whatever the underlying scheduler throws when it rejects a task,
     *                          for example because of an invalid interval or because the
     *                          scheduler has already been shut down
     */
    public void start() {
        if (!this.scheduledFutures.isEmpty()) {
            // Already started and not stopped since; scheduling again would double the work.
            return;
        }
        try {
            this.scheduledFutures.add(setupSubscriptionTracker(this.subscriptionsTracker));
            this.scheduledFutures.add(setupDrainer(this.streamManager, this.eventTransmitter, this.tee));
        } catch (RuntimeException e) {
            // Do not leave only one of the two tasks running.
            stop();
            throw e;
        }
    }

    /**
     * Cancels every task scheduled by this instance and forgets their handles, after which
     * {@link #start()} may be called again. A task run that is already in progress is not
     * interrupted. The shared schedulers themselves are left running.
     */
    public void stop() {
        for (ScheduledFuture<?> future : this.scheduledFutures) {
            // cancel() has no effect on a task that is already cancelled.
            if (future != null) {
                future.cancel(false);
            }
        }
        this.scheduledFutures.clear();
    }

    /**
     * @return the event publisher supplied at construction time
     */
    public EventPublisher getEventPublisher() {
        return this.eventPublisher;
    }

    /**
     * Builds an event drainer and schedules it at a fixed rate, starting immediately and
     * repeating every {@code config.drainerIntervalMsec()} milliseconds.
     *
     * @return the handle of the scheduled periodic task
     */
    private ScheduledFuture<?> setupDrainer(StreamManager streamManager, EventTransmitter eventTransmitter, Tee tee) {
        EventProcessor eventProcessor = new EventProcessor(this.config, streamManager, tee);
        EventDrainer eventDrainer = new EventDrainer(this.config, streamManager, this.registry, eventProcessor, eventTransmitter, Clock.systemUTC());
        return DRAINER_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                eventDrainer.run();
            } catch (Throwable th) {
                // An exception escaping a fixed-rate task suppresses all of its future runs,
                // so every failure is contained here and only reported at debug level.
                LOG.debug("event drainer caught exception", th);
            }
        }, 0L, this.config.drainerIntervalMsec(), TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules the subscription refresh at a fixed rate, first after one second and then
     * every {@code config.subscriptionRefreshIntervalSec()} seconds.
     *
     * @return the handle of the scheduled periodic task
     */
    private ScheduledFuture<?> setupSubscriptionTracker(SubscriptionTracker subscriptionTracker) {
        return SUBSCRIPTIONS_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                subscriptionTracker.refreshSubscriptions();
            } catch (Throwable th) {
                // An exception escaping a fixed-rate task suppresses all of its future runs,
                // so every failure is contained here and only reported at debug level.
                LOG.debug("failed to refresh subscriptions", th);
            }
        }, 1L, this.config.subscriptionRefreshIntervalSec(), TimeUnit.SECONDS);
    }
}
