package io.mantisrx.publish.netty.pipeline;

import com.google.common.net.HttpHeaders;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.impl.AtomicDouble;
import io.mantisrx.publish.internal.exceptions.RetryableException;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.proto.MantisEvent;
import io.mantisrx.publish.netty.proto.MantisEventEnvelope;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectWriter;
import io.mantisrx.shaded.io.netty.buffer.ByteBufOutputStream;
import io.mantisrx.shaded.io.netty.channel.ChannelHandlerContext;
import io.mantisrx.shaded.io.netty.channel.ChannelOutboundHandlerAdapter;
import io.mantisrx.shaded.io.netty.channel.ChannelPromise;
import io.mantisrx.shaded.io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.mantisrx.shaded.io.netty.handler.codec.http.FullHttpRequest;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpHeaderNames;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpHeaderValues;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpMethod;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpVersion;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Clock;
import java.util.ArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/mantis-publish-netty-1.3.53.jar:io/mantisrx/publish/netty/pipeline/MantisEventAggregator.class */
/**
 * Outbound handler that buffers {@link MantisEvent}s into a {@link MantisEventEnvelope} and
 * forwards the envelope downstream as a single HTTP POST.
 *
 * <p>A batch is flushed in two situations:
 * <ul>
 *   <li>an incoming event would push the accumulated size past {@code flushIntervalBytes}
 *       (see {@link #write}); or</li>
 *   <li>the periodic {@link WriterTimeoutTask}, scheduled every {@code flushIntervalMs}
 *       milliseconds on the handler's executor, finds a non-empty batch on a writable channel.</li>
 * </ul>
 *
 * <p>Messages that are not {@link MantisEvent}s are passed through untouched.
 *
 * <p>NOTE(review): the batch state is plain (non-volatile, unsynchronized) fields; this presumably
 * relies on {@code write} and the scheduled task both running on the channel's event loop —
 * confirm against the pipeline setup.
 */
class MantisEventAggregator extends ChannelOutboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(MantisEventAggregator.class);

    /** Request path every batch is posted to. */
    private static final String EVENTS_PATH = "/api/v1/events";

    private final Clock clock;
    private final Timer batchFlushTime;
    private final Counter droppedBatches;
    private final Counter flushSuccess;
    private final Counter flushFailure;
    private final AtomicDouble batchSize;
    private final ObjectWriter objectWriter = new ObjectMapper().writer();
    private final boolean compress;
    private final long flushIntervalMs;
    private final int flushIntervalBytes;
    private ScheduledFuture<?> writerTimeout;
    private MantisEventEnvelope currentMessage;
    private int currentMessageSize;

    /**
     * Periodic task that flushes the pending batch when the channel is active and writable and
     * at least one event has been buffered.
     */
    private class WriterTimeoutTask implements Runnable {
        private final ChannelHandlerContext ctx;

        WriterTimeoutTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            if (!ctx.channel().isActive()) {
                LOG.debug("channel not active");
                return;
            }
            boolean nothingBuffered = currentMessage == null || currentMessageSize == 0;
            if (!ctx.channel().isWritable() || nothingBuffered) {
                return;
            }
            // Nobody awaits a timer-driven flush, so a fresh promise is handed in.
            writeBatch(ctx, ctx.channel().newPromise());
        }
    }

    /**
     * Creates the aggregator and registers its metrics.
     *
     * @param registry           registry the timer, counters and gauge are registered with
     * @param clock              time source for envelope timestamps and flush latency
     * @param compress           when {@code true}, requests carry a {@code Content-Encoding: gzip}
     *                           header (the body is not compressed by this class)
     * @param flushIntervalMs    period, in milliseconds, of the time-based flush
     * @param flushIntervalBytes accumulated event size above which a batch is flushed
     */
    public MantisEventAggregator(Registry registry, Clock clock, boolean compress,
                                 long flushIntervalMs, int flushIntervalBytes) {
        this.clock = clock;
        this.batchFlushTime = SpectatorUtils.buildAndRegisterTimer(
                registry, "batchFlushTime", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.droppedBatches = SpectatorUtils.buildAndRegisterCounter(
                registry, "droppedBatches", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.flushSuccess = SpectatorUtils.buildAndRegisterCounter(
                registry, "flushSuccess", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.flushFailure = SpectatorUtils.buildAndRegisterCounter(
                registry, "flushFailure", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.batchSize = SpectatorUtils.buildAndRegisterGauge(
                registry, "batchSize", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.flushIntervalMs = flushIntervalMs;
        this.flushIntervalBytes = flushIntervalBytes;
        this.compress = compress;
        this.currentMessage = newEnvelope();
    }

    /** Returns an empty envelope stamped with the current clock time. */
    private MantisEventEnvelope newEnvelope() {
        // NOTE(review): a Guava Referrer-Policy constant is used as the envelope origin; this
        // looks like a decompiler substitution for a plain string literal — confirm.
        return new MantisEventEnvelope(
                clock.millis(), HttpHeaders.ReferrerPolicyValues.ORIGIN, new ArrayList<>());
    }

    private boolean acceptOutboundMessage(Object msg) {
        return msg instanceof MantisEvent;
    }

    /**
     * Buffers a {@link MantisEvent}, flushing the pending batch first if adding the event would
     * exceed {@code flushIntervalBytes}. Any other message type is forwarded downstream as-is.
     *
     * <p>When no flush is needed the promise is completed successfully right away; otherwise
     * the promise is tied to the outcome of flushing the <em>previous</em> batch, and the new
     * event becomes the first entry of the next batch.
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (!acceptOutboundMessage(msg)) {
            ctx.write(msg, promise);
            return;
        }
        MantisEvent event = (MantisEvent) msg;
        int eventSize = event.size();
        if (currentMessageSize + eventSize > flushIntervalBytes) {
            writeBatch(ctx, promise);
        } else {
            promise.setSuccess();
        }
        currentMessage.addEvent(event);
        currentMessageSize += eventSize;
    }

    /** Starts the periodic time-based flush on the handler's executor. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        writerTimeout = ctx.executor().scheduleAtFixedRate(
                new WriterTimeoutTask(ctx), flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Cancels the periodic flush, if one was scheduled. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        if (writerTimeout != null) {
            writerTimeout.cancel(false);
            writerTimeout = null;
        }
    }

    /**
     * Serializes the pending batch, writes and flushes it downstream, and starts a new empty
     * batch regardless of the outcome.
     *
     * <p>The promise succeeds when the downstream write succeeds and fails with a
     * {@link RetryableException} when it does not. If the batch cannot be serialized it is
     * dropped: the drop counter is incremented, the promise is failed with the
     * {@link IOException}, and the exception is fired through the pipeline.
     *
     * @param ctx     context used to write the request
     * @param promise promise completed with the outcome of this flush
     */
    void writeBatch(ChannelHandlerContext ctx, ChannelPromise promise) {
        try {
            FullHttpRequest request = buildRequest(ctx, currentMessage);
            batchSize.set(currentMessage.getEventList().size());
            final long start = clock.millis();
            ctx.writeAndFlush(request).addListener(future -> {
                batchFlushTime.record(clock.millis() - start, TimeUnit.MILLISECONDS);
                if (future.isSuccess()) {
                    promise.setSuccess();
                    flushSuccess.increment();
                } else {
                    promise.setFailure(new RetryableException(future.cause().getMessage()));
                    flushFailure.increment();
                }
            });
        } catch (IOException e) {
            LOG.debug("unable to serialize batch", e);
            droppedBatches.increment();
            // Complete the promise so listeners on the triggering write are not left pending.
            promise.tryFailure(e);
            ctx.fireExceptionCaught(e);
        } finally {
            // The batch is discarded on every path (sent, dropped, or unexpected failure).
            currentMessage = newEnvelope();
            currentMessageSize = 0;
        }
    }

    /**
     * Builds the HTTP POST carrying the JSON-serialized envelope in a direct buffer.
     *
     * <p>The envelope timestamp is refreshed to the current clock time before serialization.
     * If serialization fails, the request (and with it the buffer) is released before the
     * exception propagates.
     *
     * @param ctx                 context supplying the buffer allocator
     * @param mantisEventEnvelope batch to serialize; its timestamp is overwritten
     * @return the request, owned by the caller
     * @throws IOException if the envelope cannot be serialized
     */
    FullHttpRequest buildRequest(ChannelHandlerContext ctx, MantisEventEnvelope mantisEventEnvelope)
            throws IOException {
        // The request line only needs the path, which does not depend on the remote address.
        FullHttpRequest request = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1, HttpMethod.POST, EVENTS_PATH, ctx.alloc().directBuffer());
        try {
            request.headers().add(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON);
            request.headers().add(HttpHeaderNames.ORIGIN, "localhost");
            if (compress) {
                request.headers().add(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
            }
            request.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
            mantisEventEnvelope.setTs(clock.millis());

            // Declared as OutputStream: ByteBufOutputStream also implements DataOutput, which
            // would make the writeValue overload ambiguous.
            try (OutputStream out = new ByteBufOutputStream(request.content())) {
                objectWriter.writeValue(out, mantisEventEnvelope);
            }
            return request;
        } catch (Throwable t) {
            // Ownership never reached the caller; release to avoid leaking the direct buffer.
            request.release();
            throw t;
        }
    }
}
