package net.kut3.mq.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import net.kut3.Kut3NetException;
import net.kut3.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/kut3/mq/rabbitmq/Consumer.class */
/**
 * RabbitMQ-backed consumer.
 *
 * <p>Each instance owns one {@link Channel} created from the supplied connection.
 * Messages are handed to a caller-provided function; when {@code autoAck()} is
 * {@code false} the function's result decides whether the delivery is acknowledged
 * ({@code true}) or rejected with requeue ({@code false}).
 *
 * <p>Subscription is performed on a separate thread (see {@link #consume0}), so
 * {@link #consume} and {@link #consumeAsync} return before the subscription is
 * necessarily established.
 */
public final class Consumer extends net.kut3.mq.amqp.Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    /** System property that overrides the upper bound accepted by {@link #threadCount(int)}. */
    private static final String MAX_THREAD_COUNT_PROPERTY = "rabbitmqConsumerMaxThreadCount";

    /** Upper bound for {@link #threadCount(int)}; resolved once when the class is loaded. */
    private static final int MAX_THREAD_COUNT = resolveMaxThreadCount();

    private final Channel channel;
    private volatile boolean isClosed;
    private int prefetchCount = 128;
    private int threadCount = 1;

    // Written by the subscription thread started in consume0 and read by close(),
    // hence volatile.
    private volatile String consumerTag;

    // Written by consumeAsync and read by close(), possibly on different threads.
    private volatile ExecutorService executor;

    /**
     * Creates a consumer with its own channel on the given connection.
     *
     * @param connection connection used to create the channel
     */
    public Consumer(Connection connection) {
        super(connection);
        this.channel = connection.createChannel();
    }

    /**
     * @return the prefetch count passed to {@code basicQos} when consuming starts
     */
    public int prefetchCount() {
        return this.prefetchCount;
    }

    /**
     * Sets the prefetch count applied when consuming starts.
     *
     * @param prefetchCount value later passed to {@code Channel.basicQos}
     * @return this consumer, for chaining
     */
    public Consumer prefetchCount(int prefetchCount) {
        this.prefetchCount = prefetchCount;
        return this;
    }

    /**
     * Sets the size of the worker pool used by {@link #consumeAsync}.
     *
     * @param threadCount number of worker threads; must be between 1 and the
     *                    configured maximum (inclusive)
     * @return this consumer, for chaining
     * @throws IllegalArgumentException if {@code threadCount} is out of range
     */
    public Consumer threadCount(int threadCount) {
        // The original check used '&&', which can never be true. Zero is rejected
        // as well because Executors.newFixedThreadPool(0) would fail later.
        if (threadCount < 1 || threadCount > MAX_THREAD_COUNT) {
            throw new IllegalArgumentException("Invalid threadCount: " + threadCount);
        }
        this.threadCount = threadCount;
        return this;
    }

    /**
     * @return the number of worker threads used by {@link #consumeAsync}
     */
    public int threadCount() {
        return this.threadCount;
    }

    /**
     * Starts consuming and processes every delivery on a fixed-size worker pool of
     * {@link #threadCount()} threads.
     *
     * @param function message handler; its result selects ack ({@code true}) or
     *                 reject-with-requeue ({@code false}) when auto-ack is off
     * @return this consumer, for chaining
     */
    @Override
    public Consumer consumeAsync(final Function<net.kut3.mq.amqp.Message, Boolean> function) {
        final ExecutorService workers = Executors.newFixedThreadPool(this.threadCount);
        this.executor = workers;
        consume0(new DefaultConsumer(this.channel) {
            @Override
            public void handleDelivery(String tag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                workers.submit(() -> {
                    Message message = new Message(body);
                    try {
                        boolean handled = function.apply(message);
                        Consumer.this.acknowledge(envelope.getDeliveryTag(), handled);
                    } catch (IOException | RuntimeException e) {
                        // submit() stores task failures in a Future nobody reads, so a
                        // handler exception must be logged here or it disappears.
                        LOGGER.error("Handle " + message.body(), e);
                    }
                });
            }
        });
        LOGGER.info("Consume {} asynchronous started", this);
        return this;
    }

    /**
     * Starts consuming and processes every delivery directly on the thread that
     * invokes the RabbitMQ consumer callback.
     *
     * @param function message handler; its result selects ack ({@code true}) or
     *                 reject-with-requeue ({@code false}) when auto-ack is off
     * @return this consumer, for chaining
     */
    @Override
    public Consumer consume(final Function<net.kut3.mq.amqp.Message, Boolean> function) {
        consume0(new DefaultConsumer(this.channel) {
            @Override
            public void handleDelivery(String tag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                boolean handled = function.apply(new Message(body));
                Consumer.this.acknowledge(envelope.getDeliveryTag(), handled);
            }
        });
        LOGGER.info("Consume {} started", this);
        return this;
    }

    /**
     * Cancels the subscription (if one was started), shuts down the worker pool and
     * closes the channel, then delegates to the superclass. Calls after the first
     * completed one are no-ops.
     */
    @Override
    public synchronized void close() {
        if (this.isClosed) {
            return;
        }
        final String tag = this.consumerTag;
        if (null != tag) {
            try {
                this.channel.basicCancel(tag);
                LOGGER.info("Consumer {} stopped", this);
            } catch (IOException e) {
                // A failed cancel must not prevent the executor and channel from
                // being released below.
                LOGGER.error("Stop consumer " + toString(), e);
            }
        }
        closeExecutor();
        closeChannel();
        super.close();
        this.isClosed = true;
    }

    /**
     * Extends the superclass representation with the prefetch and thread counts.
     * The last character of {@code super.toString()} is dropped and a {@code ')'}
     * is appended after the extra fields (presumably the superclass text ends
     * with {@code ')'} - confirm against the superclass).
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(super.toString());
        sb.deleteCharAt(sb.length() - 1);
        sb.append(",prefetch=").append(this.prefetchCount)
                .append(",threadCount=").append(this.threadCount);
        return sb.append(')').toString();
    }

    /**
     * Acknowledges or rejects (with requeue) a delivery unless auto-ack is enabled.
     *
     * @param deliveryTag tag of the delivery being settled
     * @param handled     {@code true} to ack, {@code false} to reject and requeue
     * @throws IOException if the channel operation fails
     */
    private void acknowledge(long deliveryTag, boolean handled) throws IOException {
        if (autoAck()) {
            return;
        }
        if (handled) {
            this.channel.basicAck(deliveryTag, false);
        } else {
            this.channel.basicReject(deliveryTag, true);
        }
    }

    /**
     * Subscribes the given RabbitMQ consumer on a newly started thread.
     *
     * <p>When no queue is configured, a server-named queue is declared, bound to the
     * exchange with an empty routing key if the exchange name is not blank, and
     * stored via {@code queue(String)}. If neither queue nor exchange is set a
     * {@link Kut3NetException} is thrown on that thread. {@link IOException}s are
     * logged and not propagated.
     *
     * @param consumer callback that receives the deliveries
     */
    private void consume0(com.rabbitmq.client.Consumer consumer) {
        new Thread(() -> {
            try {
                String queue = queue();
                if (null == queue) {
                    if (null == exchange()) {
                        throw new Kut3NetException("Cannot consume " + toString());
                    }
                    queue = this.channel.queueDeclare().getQueue();
                    if (!Strings.isNullOrBlank(exchange())) {
                        this.channel.queueBind(queue, exchange(), "");
                    }
                    queue(queue);
                }
                // The tag is derived from the queue name plus a nanoTime suffix and is
                // recorded before subscribing so that close() can cancel it.
                this.consumerTag = queue + System.nanoTime();
                this.channel.basicQos(this.prefetchCount, false);
                this.channel.basicConsume(queue, autoAck(), this.consumerTag, consumer);
            } catch (IOException e) {
                LOGGER.error("Consume " + toString(), e);
            }
        }).start();
    }

    /** Closes the channel, logging (not propagating) I/O and timeout failures. */
    private void closeChannel() {
        try {
            this.channel.close();
            LOGGER.info("Channel {} closed", this);
        } catch (IOException | TimeoutException e) {
            LOGGER.error("Close channel " + toString(), e);
        }
    }

    /** Initiates an orderly shutdown of the worker pool, if one was created. */
    private void closeExecutor() {
        final ExecutorService workers = this.executor;
        if (null != workers) {
            workers.shutdown();
        }
    }

    /**
     * Computes the maximum thread count: twice the number of available processors,
     * unless the {@value #MAX_THREAD_COUNT_PROPERTY} system property holds a
     * positive integer. A malformed property value is logged and ignored instead
     * of aborting class initialization.
     */
    private static int resolveMaxThreadCount() {
        final int fallback = Runtime.getRuntime().availableProcessors() * 2;
        final String property = System.getProperty(MAX_THREAD_COUNT_PROPERTY);
        if (null == property) {
            return fallback;
        }
        final int parsed;
        try {
            parsed = Integer.parseInt(property.trim());
        } catch (NumberFormatException e) {
            LOGGER.warn("Ignoring invalid " + MAX_THREAD_COUNT_PROPERTY + "=" + property, e);
            return fallback;
        }
        if (parsed <= 0) {
            return fallback;
        }
        LOGGER.info("MAX_THREAD_COUNT=" + parsed);
        return parsed;
    }
}
