package io.mantisrx.publish.netty.transmitters;

import com.netflix.mantis.discovery.proto.JobDiscoveryInfo;
import com.netflix.mantis.discovery.proto.MantisWorker;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import io.mantisrx.publish.EventChannel;
import io.mantisrx.publish.EventTransmitter;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import io.mantisrx.publish.internal.exceptions.NonRetryableException;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.pipeline.HttpEventChannel;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/mantis-publish-netty-1.3.51.jar:io/mantisrx/publish/netty/transmitters/ChoiceOfTwoEventTransmitter.class */
public class ChoiceOfTwoEventTransmitter implements EventTransmitter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ChoiceOfTwoEventTransmitter.class);
    private final MrePublishConfiguration configuration;
    private final Registry registry;
    private final Timer channelSendTime;
    private final MantisJobDiscovery jobDiscovery;
    private final EventChannel eventChannel;
    private final ChoiceOfTwoWorkerPool workerPool;
    private final Counter noWorkersDroppedCount;
    private final Counter noDiscoveryDroppedCount;

    public ChoiceOfTwoEventTransmitter(MrePublishConfiguration mrePublishConfiguration, Registry registry, MantisJobDiscovery mantisJobDiscovery, EventChannel eventChannel) {
        this.configuration = mrePublishConfiguration;
        this.registry = registry;
        this.channelSendTime = SpectatorUtils.buildAndRegisterTimer(registry, "sendTime", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.noWorkersDroppedCount = SpectatorUtils.buildAndRegisterCounter(registry, "mantisEventsDropped", "reason", "transmitterNoWorkers");
        this.noDiscoveryDroppedCount = SpectatorUtils.buildAndRegisterCounter(registry, "mantisEventsDropped", "reason", "transmitterNoDiscoveryInfo");
        this.jobDiscovery = mantisJobDiscovery;
        this.eventChannel = eventChannel;
        this.workerPool = new ChoiceOfTwoWorkerPool(mrePublishConfiguration, registry, this.eventChannel);
    }

    @Override // io.mantisrx.publish.EventTransmitter
    public void send(Event event, String str) {
        String jobCluster = this.jobDiscovery.getJobCluster(this.configuration.appName(), str);
        Optional<JobDiscoveryInfo> currentJobWorkers = this.jobDiscovery.getCurrentJobWorkers(jobCluster);
        if (!currentJobWorkers.isPresent()) {
            LOG.trace("No job discovery info for job cluster {}, dropping event", jobCluster);
            this.noDiscoveryDroppedCount.increment();
            return;
        }
        List<MantisWorker> workers = currentJobWorkers.get().getIngestStageWorkers().getWorkers();
        if (workers.size() <= 0) {
            LOG.trace("No workers for job cluster {}, dropping event", jobCluster);
            this.noWorkersDroppedCount.increment();
            return;
        }
        this.workerPool.refresh(workers);
        long wallTime = this.registry.clock().wallTime();
        try {
            ChoiceOfTwoWorkerPool choiceOfTwoWorkerPool = this.workerPool;
            EventChannel eventChannel = this.eventChannel;
            eventChannel.getClass();
            choiceOfTwoWorkerPool.record(event, eventChannel::send);
        } catch (NonRetryableException e) {
            LOG.trace("No workers for job cluster {}, dropping event", jobCluster);
            this.noWorkersDroppedCount.increment();
        }
        this.channelSendTime.record(this.registry.clock().wallTime() - wallTime, TimeUnit.MILLISECONDS);
    }
}
