/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.io.HexDump;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.cli.PulsarClientTool;
import org.apache.pulsar.client.internal.PulsarClientImplementationBinding;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription="Consume messages from a specified topic")
public class CmdConsume {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class);
    private static final String MESSAGE_BOUNDARY = "----- got message -----";
    @Parameter(description="TopicName", required=true)
    private List<String> mainOptions = new ArrayList<String>();
    @Parameter(names={"-t", "--subscription-type"}, description="Subscription type.")
    private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
    @Parameter(names={"-m", "--subscription-mode"}, description="Subscription mode.")
    private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
    @Parameter(names={"-p", "--subscription-position"}, description="Subscription position.")
    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
    @Parameter(names={"-s", "--subscription-name"}, required=true, description="Subscription name.")
    private String subscriptionName;
    @Parameter(names={"-n", "--num-messages"}, description="Number of messages to consume, 0 means to consume forever.")
    private int numMessagesToConsume = 1;
    @Parameter(names={"--hex"}, description="Display binary messages in hex.")
    private boolean displayHex = false;
    @Parameter(names={"--hide-content"}, description="Do not write the message to console.")
    private boolean hideContent = false;
    @Parameter(names={"-r", "--rate"}, description="Rate (in msg/sec) at which to consume, value 0 means to consume messages as fast as possible.")
    private double consumeRate = 0.0;
    @Parameter(names={"--regex"}, description="Indicate the topic name is a regex pattern")
    private boolean isRegex = false;
    @Parameter(names={"-q", "--queue-size"}, description="Consumer receiver queue size.")
    private int receiverQueueSize = 0;
    @Parameter(names={"-mc", "--max_chunked_msg"}, description="Max pending chunk messages")
    private int maxPendingChunkedMessage = 0;
    @Parameter(names={"-ac", "--auto_ack_chunk_q_full"}, description="Auto ack for oldest message on queue is full")
    private boolean autoAckOldestChunkedMessageOnQueueFull = false;
    @Parameter(names={"-ekv", "--encryption-key-value"}, description="The URI of private key to decrypt payload, for example file:///path/to/private.key or data:application/x-pem-file;base64,*****")
    private String encKeyValue;
    @Parameter(names={"-st", "--schema-type"}, description="Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
    private String schematype = "bytes";
    @Parameter(names={"-pm", "--pool-messages"}, description="Use the pooled message")
    private boolean poolMessages = true;
    private ClientBuilder clientBuilder;
    private Authentication authentication;
    private String serviceURL;

    public void updateConfig(ClientBuilder clientBuilder, Authentication authentication, String serviceURL) {
        this.clientBuilder = clientBuilder;
        this.authentication = authentication;
        this.serviceURL = serviceURL;
    }

    private String interpretMessage(Message<?> message, boolean displayHex) throws IOException {
        String data;
        StringBuilder sb = new StringBuilder();
        String properties = Arrays.toString(message.getProperties().entrySet().toArray());
        Object value = message.getValue();
        if (value == null) {
            data = "null";
        } else if (value instanceof byte[]) {
            byte[] msgData = (byte[])value;
            data = CmdConsume.interpretByteArray(displayHex, msgData);
        } else if (value instanceof GenericObject) {
            Map<String, Object> asMap = CmdConsume.genericObjectToMap((GenericObject)value, displayHex);
            data = asMap.toString();
        } else {
            data = value instanceof ByteBuffer ? new String(PulsarClientImplementationBinding.getBytes((ByteBuffer)((ByteBuffer)value))) : value.toString();
        }
        String key = null;
        if (message.hasKey()) {
            key = message.getKey();
        }
        sb.append("key:[").append(key).append("], ");
        if (!properties.isEmpty()) {
            sb.append("properties:").append(properties).append(", ");
        }
        sb.append("content:").append(data);
        return sb.toString();
    }

    private static String interpretByteArray(boolean displayHex, byte[] msgData) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (!displayHex) {
            return new String(msgData);
        }
        HexDump.dump((byte[])msgData, (long)0L, (OutputStream)out, (int)0);
        return out.toString();
    }

    private static Map<String, Object> genericObjectToMap(GenericObject value, boolean displayHex) throws IOException {
        switch (value.getSchemaType()) {
            case AVRO: 
            case JSON: 
            case PROTOBUF_NATIVE: {
                return CmdConsume.genericRecordToMap((GenericRecord)value, displayHex);
            }
            case KEY_VALUE: {
                return CmdConsume.keyValueToMap((KeyValue)value.getNativeObject(), displayHex);
            }
        }
        return CmdConsume.primitiveValueToMap(value.getNativeObject(), displayHex);
    }

    private static Map<String, Object> keyValueToMap(KeyValue value, boolean displayHex) throws IOException {
        if (value == null) {
            return ImmutableMap.of((Object)"value", (Object)"NULL");
        }
        return ImmutableMap.of((Object)"key", CmdConsume.primitiveValueToMap(value.getKey(), displayHex), (Object)"value", CmdConsume.primitiveValueToMap(value.getValue(), displayHex));
    }

    private static Map<String, Object> primitiveValueToMap(Object value, boolean displayHex) throws IOException {
        if (value == null) {
            return ImmutableMap.of((Object)"value", (Object)"NULL");
        }
        if (value instanceof GenericObject) {
            return CmdConsume.genericObjectToMap((GenericObject)value, displayHex);
        }
        if (value instanceof byte[]) {
            value = CmdConsume.interpretByteArray(displayHex, (byte[])value);
        }
        return ImmutableMap.of((Object)"value", (Object)value.toString(), (Object)"type", value.getClass());
    }

    private static Map<String, Object> genericRecordToMap(GenericRecord value, boolean displayHex) throws IOException {
        HashMap<String, Object> res = new HashMap<String, Object>();
        for (Field f : value.getFields()) {
            Map<String, Object> fieldValue = value.getField(f);
            if (fieldValue instanceof GenericRecord) {
                fieldValue = CmdConsume.genericRecordToMap((GenericRecord)fieldValue, displayHex);
            } else if (fieldValue == null) {
                fieldValue = "NULL";
            } else if (fieldValue instanceof byte[]) {
                fieldValue = CmdConsume.interpretByteArray(displayHex, (byte[])fieldValue);
            }
            res.put(f.getName(), fieldValue);
        }
        return res;
    }

    public int run() throws PulsarClientException, IOException {
        if (this.mainOptions.size() != 1) {
            throw new ParameterException("Please provide one and only one topic name.");
        }
        if (this.subscriptionName == null || this.subscriptionName.isEmpty()) {
            throw new ParameterException("Subscription name is not provided.");
        }
        if (this.numMessagesToConsume < 0) {
            throw new ParameterException("Number of messages should be zero or positive.");
        }
        String topic = this.mainOptions.get(0);
        if (this.serviceURL.startsWith("ws")) {
            return this.consumeFromWebSocket(topic);
        }
        return this.consume(topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int consume(String topic) {
        int numMessagesConsumed = 0;
        int returnCode = 0;
        try {
            RateLimiter limiter;
            Schema schema;
            PulsarClient client = this.clientBuilder.build();
            Schema schema2 = schema = this.poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
            if ("auto_consume".equals(this.schematype)) {
                schema = Schema.AUTO_CONSUME();
            } else if (!"bytes".equals(this.schematype)) {
                throw new IllegalArgumentException("schema type must be 'bytes' or 'auto_consume");
            }
            ConsumerBuilder builder = client.newConsumer(schema).subscriptionName(this.subscriptionName).subscriptionType(this.subscriptionType).subscriptionMode(this.subscriptionMode).subscriptionInitialPosition(this.subscriptionInitialPosition).poolMessages(this.poolMessages);
            if (this.isRegex) {
                builder.topicsPattern(Pattern.compile(topic));
            } else {
                builder.topic(new String[]{topic});
            }
            if (this.maxPendingChunkedMessage > 0) {
                builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
            }
            if (this.receiverQueueSize > 0) {
                builder.receiverQueueSize(this.receiverQueueSize);
            }
            builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
            if (StringUtils.isNotBlank((CharSequence)this.encKeyValue)) {
                builder.defaultCryptoKeyReader(this.encKeyValue);
            }
            Consumer consumer = builder.subscribe();
            RateLimiter rateLimiter = limiter = this.consumeRate > 0.0 ? RateLimiter.create((double)this.consumeRate) : null;
            while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
                Message msg;
                if (limiter != null) {
                    limiter.acquire();
                }
                if ((msg = consumer.receive(5, TimeUnit.SECONDS)) == null) {
                    LOG.debug("No message to consume after waiting for 5 seconds.");
                    continue;
                }
                try {
                    ++numMessagesConsumed;
                    if (!this.hideContent) {
                        System.out.println(MESSAGE_BOUNDARY);
                        String output = this.interpretMessage(msg, this.displayHex);
                        System.out.println(output);
                    } else if (numMessagesConsumed % 1000 == 0) {
                        System.out.println("Received " + numMessagesConsumed + " messages");
                    }
                    consumer.acknowledge(msg);
                }
                finally {
                    msg.release();
                }
            }
            client.close();
        }
        catch (Exception e) {
            LOG.error("Error while consuming messages");
            LOG.error(e.getMessage(), (Throwable)e);
            returnCode = -1;
        }
        finally {
            LOG.info("{} messages successfully consumed", (Object)numMessagesConsumed);
        }
        return returnCode;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int consumeFromWebSocket(String topic) {
        int numMessagesConsumed = 0;
        int returnCode = 0;
        TopicName topicName = TopicName.get((String)topic);
        String wsTopic = String.format("%s/%s/" + (StringUtils.isEmpty((CharSequence)topicName.getCluster()) ? "" : topicName.getCluster() + "/") + "%s/%s/%s?subscriptionType=%s&subscriptionMode=%s", topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName(), this.subscriptionName, this.subscriptionType.toString(), this.subscriptionMode.toString());
        String consumerBaseUri = this.serviceURL + (this.serviceURL.endsWith("/") ? "" : "/") + "ws/consumer/" + wsTopic;
        URI consumerUri = URI.create(consumerBaseUri);
        WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
        ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
        try {
            if (this.authentication != null) {
                this.authentication.start();
                AuthenticationDataProvider authData = this.authentication.getAuthData();
                if (authData.hasDataForHttp()) {
                    for (Map.Entry kv : authData.getHttpHeaders()) {
                        produceRequest.setHeader((String)kv.getKey(), (String)kv.getValue());
                    }
                }
            }
        }
        catch (Exception e) {
            LOG.error("Authentication plugin error: " + e.getMessage());
            return -1;
        }
        CompletableFuture<Void> connected = new CompletableFuture<Void>();
        ConsumerSocket consumerSocket = new ConsumerSocket(connected);
        try {
            produceClient.start();
        }
        catch (Exception e) {
            LOG.error("Failed to start websocket-client", (Throwable)e);
            return -1;
        }
        try {
            LOG.info("Trying to create websocket session..{}", (Object)consumerUri);
            produceClient.connect((Object)consumerSocket, consumerUri, produceRequest);
            connected.get();
        }
        catch (Exception e) {
            LOG.error("Failed to create web-socket session", (Throwable)e);
            return -1;
        }
        try {
            RateLimiter limiter;
            RateLimiter rateLimiter = limiter = this.consumeRate > 0.0 ? RateLimiter.create((double)this.consumeRate) : null;
            while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
                String msg;
                if (limiter != null) {
                    limiter.acquire();
                }
                if ((msg = consumerSocket.receive(5L, TimeUnit.SECONDS)) == null) {
                    LOG.debug("No message to consume after waiting for 5 seconds.");
                    continue;
                }
                try {
                    String output = CmdConsume.interpretByteArray(this.displayHex, Base64.getDecoder().decode(msg));
                    System.out.println(output);
                }
                catch (Exception e) {
                    System.out.println(msg);
                }
                ++numMessagesConsumed;
            }
            consumerSocket.awaitClose(2, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOG.error("Error while consuming messages");
            LOG.error(e.getMessage(), (Throwable)e);
            returnCode = -1;
        }
        finally {
            LOG.info("{} messages successfully consumed", (Object)numMessagesConsumed);
        }
        return returnCode;
    }

    @WebSocket(maxTextMessageSize=65536)
    public static class ConsumerSocket {
        private static final String X_PULSAR_MESSAGE_ID = "messageId";
        private final CountDownLatch closeLatch = new CountDownLatch(1);
        private Session session;
        private CompletableFuture<Void> connected;
        final BlockingQueue<String> incomingMessages;
        private static final Logger log = LoggerFactory.getLogger(ConsumerSocket.class);

        public ConsumerSocket(CompletableFuture<Void> connected) {
            this.connected = connected;
            this.incomingMessages = new GrowableArrayBlockingQueue();
        }

        public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
            return this.closeLatch.await(duration, unit);
        }

        @OnWebSocketClose
        public void onClose(int statusCode, String reason) {
            log.info("Connection closed: {} - {}", (Object)statusCode, (Object)reason);
            this.session = null;
            this.closeLatch.countDown();
        }

        @OnWebSocketConnect
        public void onConnect(Session session) throws InterruptedException {
            log.info("Got connect: {}", (Object)session);
            this.session = session;
            this.connected.complete(null);
        }

        @OnWebSocketMessage
        public synchronized void onMessage(String msg) throws Exception {
            JsonObject message = (JsonObject)new Gson().fromJson(msg, JsonObject.class);
            JsonObject ack = new JsonObject();
            String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
            ack.add(X_PULSAR_MESSAGE_ID, (JsonElement)new JsonPrimitive(messageId));
            this.getRemote().sendString(ack.toString());
            this.incomingMessages.put(msg);
        }

        public String receive(long timeout, TimeUnit unit) throws Exception {
            return this.incomingMessages.poll(timeout, unit);
        }

        public RemoteEndpoint getRemote() {
            return this.session.getRemote();
        }

        public Session getSession() {
            return this.session;
        }

        public void close() {
            this.session.close();
        }
    }
}

