package io.mantisrx.publish.netty.pipeline;

import com.netflix.mantis.discovery.proto.MantisWorker;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import io.mantisrx.publish.EventChannel;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.internal.exceptions.NonRetryableException;
import io.mantisrx.publish.internal.exceptions.RetryableException;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.proto.MantisEvent;
import io.mantisrx.shaded.io.netty.channel.Channel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/mantis-publish-netty-1.3.41.jar:io/mantisrx/publish/netty/pipeline/HttpEventChannel.class */
public class HttpEventChannel implements EventChannel {
    public static final String CHANNEL_TYPE = "netty";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HttpEventChannel.class);
    private final Registry registry;
    private final Counter writeSuccess;
    private final Counter writeFailure;
    private final Counter nettyChannelDropped;
    private final Timer nettyWriteTime;
    private final HttpEventChannelManager channelManager;

    public HttpEventChannel(Registry registry, HttpEventChannelManager httpEventChannelManager) {
        this.registry = registry;
        this.writeSuccess = SpectatorUtils.buildAndRegisterCounter(this.registry, "writeSuccess", "channel", CHANNEL_TYPE);
        this.writeFailure = SpectatorUtils.buildAndRegisterCounter(this.registry, "writeFailure", "channel", CHANNEL_TYPE);
        this.nettyChannelDropped = SpectatorUtils.buildAndRegisterCounter(this.registry, "mantisEventsDropped", "channel", CHANNEL_TYPE, "reason", "nettyBufferFull");
        this.nettyWriteTime = SpectatorUtils.buildAndRegisterTimer(this.registry, "writeTime", "channel", CHANNEL_TYPE);
        this.channelManager = httpEventChannelManager;
    }

    @Override // io.mantisrx.publish.EventChannel
    public CompletableFuture<Void> send(MantisWorker mantisWorker, Event event) {
        Channel findOrCreate = this.channelManager.findOrCreate(mantisWorker.toInetSocketAddress());
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (!findOrCreate.isActive()) {
            completableFuture.completeExceptionally(new NonRetryableException("channel not active"));
        } else if (findOrCreate.isWritable()) {
            LOG.debug("channel is writable: {} bytes remaining", Long.valueOf(findOrCreate.bytesBeforeUnwritable()));
            long wallTime = this.registry.clock().wallTime();
            findOrCreate.writeAndFlush(new MantisEvent(1, event.toJsonString())).addListener2(future -> {
                if (future.isSuccess()) {
                    this.writeSuccess.increment();
                    completableFuture.complete(null);
                } else {
                    LOG.debug("failed to send event over netty channel", future.cause());
                    this.writeFailure.increment();
                    completableFuture.completeExceptionally(new RetryableException(future.cause().getMessage()));
                }
            });
            this.nettyWriteTime.record(this.registry.clock().wallTime() - wallTime, TimeUnit.MILLISECONDS);
        } else {
            LOG.debug("channel not writable: {} bytes before writable", Long.valueOf(findOrCreate.bytesBeforeWritable()));
            this.nettyChannelDropped.increment();
            completableFuture.completeExceptionally(new RetryableException("channel not writable: " + findOrCreate.bytesBeforeWritable() + " bytes before writable"));
        }
        return completableFuture;
    }

    @Override // io.mantisrx.publish.EventChannel
    public double bufferSize(MantisWorker mantisWorker) {
        Channel findOrCreate = this.channelManager.findOrCreate(mantisWorker.toInetSocketAddress());
        return findOrCreate.bytesBeforeUnwritable() / findOrCreate.config().getWriteBufferHighWaterMark();
    }

    @Override // io.mantisrx.publish.EventChannel
    public void close(MantisWorker mantisWorker) {
        this.channelManager.close(mantisWorker.toInetSocketAddress());
    }
}
