/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import java.io.IOException;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.shade.com.google.common.base.Enums;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.apache.pulsar.websocket.AbstractWebSocketHandler;
import org.apache.pulsar.websocket.WebSocketError;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WebSocket handler that turns each incoming text frame into a message published through a
 * Pulsar {@link Producer} and answers every frame with a JSON-encoded {@link ProducerAck}.
 *
 * <p>The producer is created in the constructor. When authorization or producer creation fails
 * the handler is left without a producer; {@link #close()} and {@link #onWebSocketText(String)}
 * both tolerate that state.
 *
 * <p>Publish statistics (messages sent/failed, bytes sent, publish latency) are accumulated here
 * and exposed through the {@code getAndReset*} accessors.
 */
public class ProducerHandler extends AbstractWebSocketHandler {
    /** Underlying producer; stays {@code null} when the constructor could not create one. */
    private Producer<byte[]> producer;
    private final LongAdder numMsgsSent = new LongAdder();
    private final LongAdder numMsgsFailed = new LongAdder();
    private final LongAdder numBytesSent = new LongAdder();
    private final StatsBuckets publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
    /** Total successfully published messages; only updated through {@link #MSG_PUBLISHED_COUNTER_UPDATER}. */
    private volatile long msgPublishedCounter = 0L;
    private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");
    /** Bucket boundaries, in microseconds, handed to the publish-latency {@link StatsBuckets}. */
    public static final List<Long> ENTRY_LATENCY_BUCKETS_USEC = Collections.unmodifiableList(Arrays.asList(500L, 1000L, 5000L, 10000L, 20000L, 50000L, 100000L, 200000L, 1000000L));
    private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

    /**
     * Authorizes the request and, if permitted, creates the producer for the requested topic and
     * registers this handler with the service.
     *
     * <p>If {@code checkAuth} rejects the request the constructor returns without a producer. If
     * building or creating the producer throws, the failure is logged and reported to the client
     * through {@code response.sendError}.
     *
     * @param service  owning WebSocket service; supplies the Pulsar client and the producer registry
     * @param request  the HTTP upgrade request
     * @param response the upgrade response, used to report errors to the client
     */
    public ProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
        super(service, request, response);
        if (!this.checkAuth(response)) {
            return;
        }
        try {
            // NOTE(review): getProducerBuilder is overridable and is invoked during construction;
            // subclasses must not rely on their own fields being initialized when it runs.
            this.producer = this.getProducerBuilder(service.getPulsarClient()).topic(this.topic.toString()).create();
            if (!this.service.addProducer(this)) {
                log.warn("[{}:{}] Failed to add producer handler for topic {}", request.getRemoteAddr(), request.getRemotePort(), this.topic);
            }
        } catch (Exception e) {
            log.warn("[{}:{}] Failed in creating producer on topic {}: {}", request.getRemoteAddr(), request.getRemotePort(), this.topic, e.getMessage());
            try {
                response.sendError(ProducerHandler.getErrorCode(e), ProducerHandler.getErrorMessage(e));
            } catch (IOException e1) {
                log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1);
            }
        }
    }

    /**
     * Unregisters this handler from the service and closes the producer asynchronously.
     * Does nothing when no producer was created. The outcome of the asynchronous close is only
     * logged; it is not awaited.
     */
    @Override
    public void close() throws IOException {
        Producer<byte[]> current = this.producer;
        if (current == null) {
            return;
        }
        if (!this.service.removeProducer(this)) {
            log.warn("[{}] Failed to remove producer handler", current.getTopic());
        }
        current.closeAsync().thenAccept(x -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Closed producer asynchronously", current.getTopic());
            }
        }).exceptionally(exception -> {
            log.warn("[{}] Failed to close producer", current.getTopic(), exception);
            return null;
        });
    }

    /**
     * Handles one text frame: parses it as a JSON {@link ProducerMessage}, Base64-decodes the
     * payload, copies the optional fields onto a new message and publishes it asynchronously.
     *
     * <p>Every outcome is reported to the client as a {@link ProducerAck}: a parse/encoding error
     * is acked immediately, a successful publish is acked with the Base64-encoded message id
     * (only while the socket is still connected), and a failed publish is acked with
     * {@code UnknownError}.
     *
     * @param message the raw JSON text received from the client
     */
    @Override
    public void onWebSocketText(String message) {
        ProducerMessage sendRequest;
        byte[] rawPayload = null;
        String requestContext = null;
        try {
            sendRequest = ObjectMapperFactory.getThreadLocal().readValue(message, ProducerMessage.class);
            requestContext = sendRequest.context;
            rawPayload = Base64.getDecoder().decode(sendRequest.payload);
        } catch (IOException e) {
            this.sendAckResponse(new ProducerAck(WebSocketError.FailedToDeserializeFromJSON, e.getMessage(), null, null));
            return;
        } catch (IllegalArgumentException e) {
            String msg = String.format("Invalid Base64 message-payload error=%s", e.getMessage());
            this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, msg, null, requestContext));
            return;
        } catch (NullPointerException e) {
            // Missing request body or missing payload; kept as a catch so the ack is unchanged.
            this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, requestContext));
            return;
        }
        if (this.producer == null) {
            // close() already tolerates a missing producer; answer with an error ack here
            // instead of failing with a NullPointerException.
            this.sendAckResponse(new ProducerAck(WebSocketError.UnknownError, "Producer is not available", null, requestContext));
            return;
        }
        long msgSize = rawPayload.length;
        TypedMessageBuilder<byte[]> builder = this.producer.newMessage();
        try {
            builder.value(rawPayload);
        } catch (SchemaSerializationException e) {
            this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, requestContext));
            return;
        }
        if (sendRequest.properties != null) {
            builder.properties(sendRequest.properties);
        }
        if (sendRequest.key != null) {
            builder.key(sendRequest.key);
        }
        if (sendRequest.replicationClusters != null) {
            builder.replicationClusters(sendRequest.replicationClusters);
        }
        if (sendRequest.eventTime != null) {
            try {
                builder.eventTime(DateFormatter.parse(sendRequest.eventTime));
            } catch (DateTimeParseException e) {
                this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, requestContext));
                return;
            }
        }
        if (sendRequest.deliverAt > 0L) {
            builder.deliverAt(sendRequest.deliverAt);
        }
        if (sendRequest.deliverAfterMs > 0L) {
            builder.deliverAfter(sendRequest.deliverAfterMs, TimeUnit.MILLISECONDS);
        }
        long now = System.nanoTime();
        builder.sendAsync().thenAccept(msgId -> {
            this.updateSentMsgStats(msgSize, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - now));
            if (this.isConnected()) {
                String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray());
                this.sendAckResponse(new ProducerAck(messageId, sendRequest.context));
            }
        }).exceptionally(exception -> {
            // Count the failure before anything else so the statistic cannot be lost if
            // logging or acking fails on a connection that has already gone away.
            this.numMsgsFailed.increment();
            log.warn("[{}] Error occurred while producer handler was sending msg from {}: {}", this.topicForLog(), this.remoteAddressForLog(), exception.getMessage());
            this.sendAckResponse(new ProducerAck(WebSocketError.UnknownError, exception.getMessage(), null, sendRequest.context));
            return null;
        });
    }

    /**
     * @return the underlying producer, or {@code null} if none was created
     */
    public Producer<byte[]> getProducer() {
        return this.producer;
    }

    /**
     * @return the number of messages published since the previous call; the counter is reset
     */
    public long getAndResetNumMsgsSent() {
        return this.numMsgsSent.sumThenReset();
    }

    /**
     * @return the number of payload bytes published since the previous call; the counter is reset
     */
    public long getAndResetNumBytesSent() {
        return this.numBytesSent.sumThenReset();
    }

    /**
     * @return the number of failed publishes since the previous call; the counter is reset
     */
    public long getAndResetNumMsgsFailed() {
        return this.numMsgsFailed.sumThenReset();
    }

    /**
     * Refreshes the publish-latency buckets and returns them.
     * NOTE(review): the name implies {@code refresh()} also resets the accumulation window —
     * confirm against {@link StatsBuckets}.
     *
     * @return the bucket counts reported by {@link StatsBuckets#getBuckets()}
     */
    public long[] getAndResetPublishLatencyStatsUSec() {
        this.publishLatencyStatsUSec.refresh();
        return this.publishLatencyStatsUSec.getBuckets();
    }

    /**
     * @return the live publish-latency statistics object (not a copy)
     */
    public StatsBuckets getPublishLatencyStatsUSec() {
        return this.publishLatencyStatsUSec;
    }

    /**
     * @return the total number of messages successfully published by this handler
     */
    public long getMsgPublishedCounter() {
        return this.msgPublishedCounter;
    }

    /**
     * Delegates to the authorization service's {@code canProduce} check for this handler's topic.
     *
     * @param authRole           the role to authorize
     * @param authenticationData authentication data accompanying the request
     * @return the result of the authorization service's produce check
     * @throws Exception if the authorization service fails
     */
    @Override
    protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
        return this.service.getAuthorizationService().canProduce(this.topic, authRole, authenticationData);
    }

    /**
     * Serializes the ack to JSON and sends it to the client. Best-effort: serialization and
     * send failures are logged and never propagated to the caller.
     *
     * @param response the ack to deliver
     */
    private void sendAckResponse(ProducerAck response) {
        try {
            String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
            this.getSession().getRemote().sendString(msg, new WriteCallback() {

                @Override
                public void writeFailed(Throwable th) {
                    log.warn("[{}] Failed to send ack: {}", ProducerHandler.this.topicForLog(), th.getMessage());
                }

                @Override
                public void writeSuccess() {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Ack was sent successfully to {}", ProducerHandler.this.topicForLog(), ProducerHandler.this.remoteAddressForLog());
                    }
                }
            });
        } catch (JsonProcessingException e) {
            log.warn("[{}] Failed to generate ack json-response: {}", this.topicForLog(), e.getMessage());
        } catch (Exception e) {
            // Also reached when the session or its remote endpoint is no longer available.
            log.warn("[{}] Failed to send ack: {}", this.topicForLog(), e.getMessage());
        }
    }

    /**
     * Records one successful publish in all statistics.
     *
     * @param msgSize     payload size in bytes
     * @param latencyUsec publish latency in microseconds
     */
    private void updateSentMsgStats(long msgSize, long latencyUsec) {
        this.publishLatencyStatsUSec.addValue(latencyUsec);
        this.numBytesSent.add(msgSize);
        this.numMsgsSent.increment();
        MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
    }

    /**
     * Returns the topic name for log messages without assuming the producer exists: the
     * producer's topic when available, otherwise the textual form of the requested topic.
     */
    private String topicForLog() {
        Producer<byte[]> current = this.producer;
        return current != null ? current.getTopic() : String.valueOf(this.topic);
    }

    /**
     * Returns the client's socket address for log messages, or {@code "<disconnected>"} when
     * the remote endpoint or its address is unavailable.
     * NOTE(review): presumably the endpoint becomes null once the socket is closed — confirm
     * against the Jetty adapter this handler extends.
     */
    private String remoteAddressForLog() {
        return Optional.ofNullable(this.getRemote())
                .map(remote -> remote.getInetSocketAddress())
                .map(Object::toString)
                .orElse("<disconnected>");
    }

    /**
     * Builds the producer configuration from the request's query parameters.
     *
     * <p>Defaults: batching disabled, {@code SinglePartition} routing, non-blocking when the
     * pending queue is full. Recognized parameters: {@code producerName},
     * {@code initialSequenceId}, {@code hashingScheme}, {@code sendTimeoutMillis},
     * {@code batchingEnabled}, {@code batchingMaxMessages}, {@code maxPendingMessages},
     * {@code batchingMaxPublishDelay} (milliseconds), {@code messageRoutingMode} and
     * {@code compressionType}. A requested {@code CustomPartition} routing mode is ignored and
     * the default is kept.
     *
     * @param client the Pulsar client used to start the builder
     * @return the configured builder; the topic is not yet set
     * @throws NumberFormatException    if a numeric parameter cannot be parsed
     * @throws IllegalArgumentException if an enum-valued parameter names an unknown constant
     */
    protected ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
        ProducerBuilder<byte[]> builder = client.newProducer().enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition);
        builder.blockIfQueueFull(false);
        // The (String) casts are kept because the declared value type of queryParams is not
        // visible from this class.
        if (this.queryParams.containsKey("producerName")) {
            builder.producerName((String) this.queryParams.get("producerName"));
        }
        if (this.queryParams.containsKey("initialSequenceId")) {
            builder.initialSequenceId(Long.parseLong((String) this.queryParams.get("initialSequenceId")));
        }
        if (this.queryParams.containsKey("hashingScheme")) {
            builder.hashingScheme(HashingScheme.valueOf((String) this.queryParams.get("hashingScheme")));
        }
        if (this.queryParams.containsKey("sendTimeoutMillis")) {
            builder.sendTimeout(Integer.parseInt((String) this.queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("batchingEnabled")) {
            builder.enableBatching(Boolean.parseBoolean((String) this.queryParams.get("batchingEnabled")));
        }
        if (this.queryParams.containsKey("batchingMaxMessages")) {
            builder.batchingMaxMessages(Integer.parseInt((String) this.queryParams.get("batchingMaxMessages")));
        }
        if (this.queryParams.containsKey("maxPendingMessages")) {
            builder.maxPendingMessages(Integer.parseInt((String) this.queryParams.get("maxPendingMessages")));
        }
        if (this.queryParams.containsKey("batchingMaxPublishDelay")) {
            builder.batchingMaxPublishDelay(Integer.parseInt((String) this.queryParams.get("batchingMaxPublishDelay")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("messageRoutingMode")) {
            Preconditions.checkArgument(Enums.getIfPresent(MessageRoutingMode.class, (String) this.queryParams.get("messageRoutingMode")).isPresent(), "Invalid messageRoutingMode %s", this.queryParams.get("messageRoutingMode"));
            MessageRoutingMode routingMode = MessageRoutingMode.valueOf((String) this.queryParams.get("messageRoutingMode"));
            if (routingMode != MessageRoutingMode.CustomPartition) {
                builder.messageRoutingMode(routingMode);
            }
        }
        if (this.queryParams.containsKey("compressionType")) {
            Preconditions.checkArgument(Enums.getIfPresent(CompressionType.class, (String) this.queryParams.get("compressionType")).isPresent(), "Invalid compressionType %s", this.queryParams.get("compressionType"));
            builder.compressionType(CompressionType.valueOf((String) this.queryParams.get("compressionType")));
        }
        return builder;
    }
}

