package org.apache.pulsar.client.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.util.concurrent.RateLimiter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.HexDump;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "Consume messages from a specified topic")
/* loaded from: input_file:org/apache/pulsar/client/cli/CmdConsume.class */
public class CmdConsume {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class);
    private static final String MESSAGE_BOUNDARY = "----- got message -----";

    @Parameter(names = {"-s", "--subscription-name"}, required = true, description = "Subscription name.")
    private String subscriptionName;
    ClientBuilder clientBuilder;

    @Parameter(description = "TopicName", required = true)
    private List<String> mainOptions = new ArrayList();

    @Parameter(names = {"-t", "--subscription-type"}, description = "Subscription type: Exclusive, Shared, Failover.")
    private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

    @Parameter(names = {"-n", "--num-messages"}, description = "Number of messages to consume, 0 means to consume forever.")
    private int numMessagesToConsume = 1;

    @Parameter(names = {"--hex"}, description = "Display binary messages in hex.")
    private boolean displayHex = false;

    @Parameter(names = {"-r", "--rate"}, description = "Rate (in msg/sec) at which to consume, value 0 means to consume messages as fast as possible.")
    private double consumeRate = 0.0d;

    public void updateConfig(ClientBuilder clientBuilder) {
        this.clientBuilder = clientBuilder;
    }

    private String interpretMessage(Message<byte[]> message, boolean z) throws IOException {
        byte[] data = message.getData();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        if (!z) {
            return new String(data);
        }
        HexDump.dump(data, 0L, byteArrayOutputStream, 0);
        return new String(byteArrayOutputStream.toByteArray());
    }

    public int run() throws PulsarClientException, IOException {
        if (this.mainOptions.size() != 1) {
            throw new ParameterException("Please provide one and only one topic name.");
        }
        if (this.subscriptionName == null || this.subscriptionName.isEmpty()) {
            throw new ParameterException("Subscription name is not provided.");
        }
        if (this.numMessagesToConsume < 0) {
            throw new ParameterException("Number of messages should be zero or positive.");
        }
        String str = this.mainOptions.get(0);
        int i = 0;
        int i2 = 0;
        try {
            try {
                PulsarClient build = this.clientBuilder.build();
                Consumer subscribe = build.newConsumer().topic(new String[]{str}).subscriptionName(this.subscriptionName).subscriptionType(this.subscriptionType).subscribe();
                RateLimiter create = this.consumeRate > 0.0d ? RateLimiter.create(this.consumeRate) : null;
                while (true) {
                    if (this.numMessagesToConsume != 0 && i >= this.numMessagesToConsume) {
                        break;
                    }
                    if (create != null) {
                        create.acquire();
                    }
                    Message<byte[]> receive = subscribe.receive(5, TimeUnit.SECONDS);
                    if (receive == null) {
                        LOG.debug("No message to consume after waiting for 5 seconds.");
                    } else {
                        i++;
                        System.out.println(MESSAGE_BOUNDARY);
                        System.out.println(interpretMessage(receive, this.displayHex));
                        subscribe.acknowledge(receive);
                    }
                }
                build.close();
                LOG.info("{} messages successfully consumed", Integer.valueOf(i));
            } catch (Exception e) {
                LOG.error("Error while consuming messages");
                LOG.error(e.getMessage(), e);
                i2 = -1;
                LOG.info("{} messages successfully consumed", Integer.valueOf(i));
            }
            return i2;
        } catch (Throwable th) {
            LOG.info("{} messages successfully consumed", Integer.valueOf(i));
            throw th;
        }
    }
}
