package org.apache.pulsar.websocket;

import java.io.IOException;
import java.time.format.DateTimeParseException;
import java.util.Base64;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.shade.com.google.common.base.Enums;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.apache.pulsar.shade.org.rocksdb.HashLinkedListMemTableConfig;
import org.apache.pulsar.shade.org.rocksdb.RateLimiter;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/websocket/ProducerHandler.class */
/**
 * WebSocket handler that publishes messages to a Pulsar topic.
 *
 * <p>Each incoming text frame is parsed as a JSON {@link ProducerMessage}, its Base64 payload is
 * decoded and sent through a {@code Producer<byte[]>}, and a JSON {@link ProducerAck} is written
 * back on the same session to report the outcome. The handler also accumulates publish counters
 * and a publish-latency histogram that can be read (and reset) through its getters.
 */
public class ProducerHandler extends AbstractWebSocketHandler {
    /** Producer created in the constructor; stays {@code null} if the auth check or creation failed. */
    private Producer<byte[]> producer;
    private final LongAdder numMsgsSent;
    private final LongAdder numMsgsFailed;
    private final LongAdder numBytesSent;
    private final StatsBuckets publishLatencyStatsUSec;
    /** Count of successfully published messages; only ever incremented by this class. */
    private volatile long msgPublishedCounter;
    private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");

    /**
     * Bucket bounds handed to the publish-latency {@link StatsBuckets} (microseconds, per the name).
     *
     * <p>Plain literals are used on purpose: these are latency thresholds and should not borrow
     * same-valued constants from unrelated libraries.
     */
    public static final long[] ENTRY_LATENCY_BUCKETS_USEC =
            {500, 1000, 5000, 10000, 20000, 50000, 100000, 200000, 1000000};

    private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

    /**
     * Initializes the statistics and, when {@code checkAuth} passes, creates the producer for
     * {@code this.topic} and registers this handler with the service.
     *
     * <p>If producer creation throws, the failure is logged and an HTTP error is sent on
     * {@code servletUpgradeResponse} using {@link #getErrorCode(Exception)} and
     * {@link #getErrorMessage(Exception)}; the constructor itself does not propagate the exception.
     *
     * @param webSocketService service providing the Pulsar client
     * @param httpServletRequest the upgrade request (its remote address/port are used for logging)
     * @param servletUpgradeResponse the upgrade response used for the auth check and error reporting
     */
    public ProducerHandler(WebSocketService webSocketService, HttpServletRequest httpServletRequest, ServletUpgradeResponse servletUpgradeResponse) {
        super(webSocketService, httpServletRequest, servletUpgradeResponse);
        this.msgPublishedCounter = 0L;
        this.numMsgsSent = new LongAdder();
        this.numBytesSent = new LongAdder();
        this.numMsgsFailed = new LongAdder();
        this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);

        if (!checkAuth(servletUpgradeResponse)) {
            return;
        }

        try {
            this.producer = getProducerBuilder(webSocketService.getPulsarClient())
                    .topic(this.topic.toString())
                    .create();
            if (!this.service.addProducer(this)) {
                log.warn("[{}:{}] Failed to add producer handler for topic {}",
                        httpServletRequest.getRemoteAddr(), httpServletRequest.getRemotePort(), this.topic);
            }
        } catch (Exception e) {
            log.warn("[{}:{}] Failed in creating producer on topic {}: {}",
                    httpServletRequest.getRemoteAddr(), httpServletRequest.getRemotePort(), this.topic, e.getMessage());
            try {
                servletUpgradeResponse.sendError(getErrorCode(e), getErrorMessage(e));
            } catch (IOException sendFailure) {
                // The trailing throwable has no placeholder, so it is logged as the exception.
                log.warn("[{}:{}] Failed to send error: {}",
                        httpServletRequest.getRemoteAddr(), httpServletRequest.getRemotePort(),
                        sendFailure.getMessage(), sendFailure);
            }
        }
    }

    /**
     * Maps a producer-creation failure to an HTTP status code.
     *
     * @param exc the exception thrown while building or creating the producer
     * @return 400 for {@link IllegalArgumentException}, 409 for a busy producer, 503 for the
     *         quota-exceeded variants, 500 otherwise
     */
    private static int getErrorCode(Exception exc) {
        if (exc instanceof IllegalArgumentException) {
            return 400;
        }
        if (exc instanceof PulsarClientException.ProducerBusyException) {
            return 409;
        }
        if (exc instanceof PulsarClientException.ProducerBlockedQuotaExceededError
                || exc instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
            return 503;
        }
        return 500;
    }

    /**
     * Builds the error text sent to the client when producer creation fails.
     *
     * @param exc the exception thrown while building or creating the producer
     * @return a message prefixed according to whether the failure was an invalid argument
     */
    private static String getErrorMessage(Exception exc) {
        if (exc instanceof IllegalArgumentException) {
            return "Invalid query params: " + exc.getMessage();
        }
        return "Failed to create producer: " + exc.getMessage();
    }

    /**
     * Unregisters this handler from the service and closes the producer asynchronously.
     * Does nothing when no producer was created.
     */
    @Override
    public void close() throws IOException {
        if (this.producer == null) {
            return;
        }
        if (!this.service.removeProducer(this)) {
            log.warn("[{}] Failed to remove producer handler", this.producer.getTopic());
        }
        this.producer.closeAsync().thenAccept(ignored -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Closed producer asynchronously", this.producer.getTopic());
            }
        }).exceptionally(error -> {
            log.warn("[{}] Failed to close producer", this.producer.getTopic(), error);
            return null;
        });
    }

    /**
     * Handles one text frame: parses it as a {@link ProducerMessage}, publishes it asynchronously
     * and reports the result to the client with a {@link ProducerAck}.
     *
     * <p>Failures are reported as acks rather than thrown:
     * JSON parsing errors as {@code FailedToDeserializeFromJSON}; Base64, schema, event-time and
     * null-value problems as {@code PayloadEncodingError}; asynchronous send failures as
     * {@code UnknownError}.
     *
     * @param str the raw JSON text received from the client
     */
    @Override
    public void onWebSocketText(String str) {
        // Kept outside the try so error acks can echo the context once it has been parsed.
        String context = null;
        try {
            ProducerMessage request = ObjectMapperFactory.getThreadLocal().readValue(str, ProducerMessage.class);
            context = request.context;
            byte[] payload = Base64.getDecoder().decode(request.payload);
            long payloadSize = payload.length;
            TypedMessageBuilder<byte[]> message = this.producer.newMessage();
            try {
                message.value(payload);
                if (request.properties != null) {
                    message.properties(request.properties);
                }
                if (request.key != null) {
                    message.key(request.key);
                }
                if (request.replicationClusters != null) {
                    message.replicationClusters(request.replicationClusters);
                }
                if (request.eventTime != null) {
                    try {
                        message.eventTime(DateFormatter.parse(request.eventTime));
                    } catch (DateTimeParseException e) {
                        sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, context));
                        return;
                    }
                }

                long sendStartNanos = System.nanoTime();
                message.sendAsync().thenAccept(messageId -> {
                    long latencyUsec = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendStartNanos);
                    updateSentMsgStats(payloadSize, latencyUsec);
                    if (isConnected()) {
                        String encodedId = Base64.getEncoder().encodeToString(messageId.toByteArray());
                        sendAckResponse(new ProducerAck(encodedId, request.context));
                    }
                }).exceptionally(error -> {
                    // Count the failure first so the metric is recorded even if the logging
                    // (which dereferences the remote endpoint) or the ack below throws.
                    this.numMsgsFailed.increment();
                    log.warn("[{}] Error occurred while producer handler was sending msg from {}: {}",
                            this.producer.getTopic(), getRemote().getInetSocketAddress().toString(), error.getMessage());
                    sendAckResponse(new ProducerAck(WebSocketError.UnknownError, error.getMessage(), null, request.context));
                    return null;
                });
            } catch (SchemaSerializationException e) {
                sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, context));
            }
        } catch (IOException e) {
            // The request could not be parsed, so there is no context to echo back.
            sendAckResponse(new ProducerAck(WebSocketError.FailedToDeserializeFromJSON, e.getMessage(), null, null));
        } catch (IllegalArgumentException e) {
            sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError,
                    String.format("Invalid Base64 message-payload error=%s", e.getMessage()), null, context));
        } catch (NullPointerException e) {
            // Covers null dereferences in the block above, e.g. a request without a payload.
            sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, context));
        }
    }

    /**
     * @return the underlying producer, or {@code null} if it was never created
     */
    public Producer<byte[]> getProducer() {
        return this.producer;
    }

    /**
     * @return the number of messages sent since the previous call; the counter is reset
     */
    public long getAndResetNumMsgsSent() {
        return this.numMsgsSent.sumThenReset();
    }

    /**
     * @return the number of payload bytes sent since the previous call; the counter is reset
     */
    public long getAndResetNumBytesSent() {
        return this.numBytesSent.sumThenReset();
    }

    /**
     * @return the number of failed sends since the previous call; the counter is reset
     */
    public long getAndResetNumMsgsFailed() {
        return this.numMsgsFailed.sumThenReset();
    }

    /**
     * Refreshes the latency histogram and returns its buckets.
     *
     * @return the bucket values reported by {@link StatsBuckets#getBuckets()} after a refresh
     */
    public long[] getAndResetPublishLatencyStatsUSec() {
        this.publishLatencyStatsUSec.refresh();
        return this.publishLatencyStatsUSec.getBuckets();
    }

    /**
     * @return the publish-latency histogram backing this handler
     */
    public StatsBuckets getPublishLatencyStatsUSec() {
        return this.publishLatencyStatsUSec;
    }

    /**
     * @return the number of messages successfully published by this handler
     */
    public long getMsgPublishedCounter() {
        return this.msgPublishedCounter;
    }

    /**
     * Asks the authorization service whether producing to {@code this.topic} is allowed.
     *
     * @param str identity passed through to {@code canProduce} (presumably the authenticated
     *            role; confirm against the base class)
     * @param authenticationDataSource authentication data passed through to {@code canProduce}
     * @return the result of {@code canProduce}
     * @throws Exception if the authorization check fails
     */
    @Override
    protected Boolean isAuthorized(String str, AuthenticationDataSource authenticationDataSource) throws Exception {
        return this.service.getAuthorizationService().canProduce(this.topic, str, authenticationDataSource);
    }

    /**
     * Serializes the ack to JSON and writes it asynchronously to the remote endpoint.
     * Serialization and send failures are logged, not propagated.
     *
     * @param producerAck the acknowledgement to send to the client
     */
    private void sendAckResponse(ProducerAck producerAck) {
        try {
            String json = ObjectMapperFactory.getThreadLocal().writeValueAsString(producerAck);
            getSession().getRemote().sendString(json, new WriteCallback() {
                @Override
                public void writeFailed(Throwable th) {
                    log.warn("[{}] Failed to send ack: {}", ProducerHandler.this.producer.getTopic(), th.getMessage());
                }

                @Override
                public void writeSuccess() {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Ack was sent successfully to {}",
                                ProducerHandler.this.producer.getTopic(),
                                ProducerHandler.this.getRemote().getInetSocketAddress().toString());
                    }
                }
            });
        } catch (JsonProcessingException e) {
            log.warn("[{}] Failed to generate ack json-response: {}", this.producer.getTopic(), e.getMessage());
        } catch (Exception e) {
            log.warn("[{}] Failed to send ack: {}", this.producer.getTopic(), e.getMessage());
        }
    }

    /**
     * Records one successful publish in the latency histogram and the counters.
     *
     * @param msgSize size of the published payload in bytes
     * @param latencyUsec publish latency in microseconds
     */
    private void updateSentMsgStats(long msgSize, long latencyUsec) {
        this.publishLatencyStatsUSec.addValue(latencyUsec);
        this.numBytesSent.add(msgSize);
        this.numMsgsSent.increment();
        MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
    }

    /**
     * Creates a producer builder with this handler's defaults (batching disabled, single-partition
     * routing, non-blocking when the queue is full) and applies any overrides found in
     * {@code queryParams}.
     *
     * @param pulsarClient client used to create the builder
     * @return the configured builder; the topic is not set here
     * @throws IllegalArgumentException if a numeric or enum query parameter cannot be parsed
     *         ({@link NumberFormatException} is a subtype)
     */
    private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient pulsarClient) {
        ProducerBuilder<byte[]> builder = pulsarClient.newProducer()
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition);
        builder.blockIfQueueFull(false);

        if (this.queryParams.containsKey("producerName")) {
            builder.producerName(this.queryParams.get("producerName"));
        }
        if (this.queryParams.containsKey("initialSequenceId")) {
            // Parse the parameter's value, not its name.
            builder.initialSequenceId(Long.parseLong(this.queryParams.get("initialSequenceId")));
        }
        if (this.queryParams.containsKey("hashingScheme")) {
            builder.hashingScheme(HashingScheme.valueOf(this.queryParams.get("hashingScheme")));
        }
        if (this.queryParams.containsKey("sendTimeoutMillis")) {
            builder.sendTimeout(Integer.parseInt(this.queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("batchingEnabled")) {
            builder.enableBatching(Boolean.parseBoolean(this.queryParams.get("batchingEnabled")));
        }
        if (this.queryParams.containsKey("batchingMaxMessages")) {
            builder.batchingMaxMessages(Integer.parseInt(this.queryParams.get("batchingMaxMessages")));
        }
        if (this.queryParams.containsKey("maxPendingMessages")) {
            builder.maxPendingMessages(Integer.parseInt(this.queryParams.get("maxPendingMessages")));
        }
        if (this.queryParams.containsKey("batchingMaxPublishDelay")) {
            builder.batchingMaxPublishDelay(Integer.parseInt(this.queryParams.get("batchingMaxPublishDelay")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("messageRoutingMode")) {
            Preconditions.checkArgument(
                    Enums.getIfPresent(MessageRoutingMode.class, this.queryParams.get("messageRoutingMode")).isPresent(),
                    "Invalid messageRoutingMode %s", this.queryParams.get("messageRoutingMode"));
            MessageRoutingMode requestedMode = MessageRoutingMode.valueOf(this.queryParams.get("messageRoutingMode"));
            // CustomPartition is accepted as a value but deliberately not applied to the builder.
            if (!MessageRoutingMode.CustomPartition.equals(requestedMode)) {
                builder.messageRoutingMode(requestedMode);
            }
        }
        if (this.queryParams.containsKey("compressionType")) {
            Preconditions.checkArgument(
                    Enums.getIfPresent(CompressionType.class, this.queryParams.get("compressionType")).isPresent(),
                    "Invalid compressionType %s", this.queryParams.get("compressionType"));
            builder.compressionType(CompressionType.valueOf(this.queryParams.get("compressionType")));
        }
        return builder;
    }
}
