package com.xunmo.utils;

import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.StrUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.xunmo.rabbitmq.MqChannelPool;
import com.xunmo.rabbitmq.MqChannelPoolConfig;
import com.xunmo.rabbitmq.entity.DeadConfig;
import com.xunmo.rabbitmq.entity.MqConfig;
import com.xunmo.rabbitmq.enums.ConsumeAction;
import com.xunmo.rabbitmq.enums.ExchangeType;
import com.xunmo.rabbitmq.enums.SendAction;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.noear.solon.Solon;
import org.noear.solon.SolonProps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xunmo/utils/MqHelper.class */
public class MqHelper {
    private static final int tryCountMax = 6;
    private static MqChannelPoolConfig mqChannelPoolConfig;
    private static final Logger log = LoggerFactory.getLogger(MqHelper.class);
    private static final AtomicBoolean isInit = new AtomicBoolean(false);
    private static final Object initLock = new Object();
    private static final Map<MqConfig, MqChannelPool> poolMap = new ConcurrentHashMap();

    @FunctionalInterface
    /* loaded from: input_file:com/xunmo/utils/MqHelper$TryDoing.class */
    public interface TryDoing {
        void tryDo() throws IOException, TimeoutException;
    }

    @FunctionalInterface
    /* loaded from: input_file:com/xunmo/utils/MqHelper$TryReturn.class */
    public interface TryReturn<T> {
        T get() throws IOException, TimeoutException;
    }

    private MqHelper() {
    }

    public static synchronized void initFromSolon() {
        if (isInit.get()) {
            return;
        }
        synchronized (initLock) {
            if (isInit.get()) {
                return;
            }
            if (Solon.app() != null) {
                SolonProps cfg = Solon.cfg();
                mqChannelPoolConfig = new MqChannelPoolConfig();
                mqChannelPoolConfig.setHost(cfg.get("xm.mq.host"));
                mqChannelPoolConfig.setUsername(cfg.get("xm.mq.username"));
                mqChannelPoolConfig.setPassword(cfg.get("xm.mq.password"));
                mqChannelPoolConfig.setPort(cfg.getInt("xm.mq.port", 5432));
                mqChannelPoolConfig.setMaxIdle(10);
                mqChannelPoolConfig.setMaxTotal(20);
                mqChannelPoolConfig.setMinIdle(1);
                isInit.compareAndSet(false, true);
            }
        }
    }

    public static synchronized void initArgs(String str, String str2, String str3, int i) {
        if (isInit.get()) {
            return;
        }
        synchronized (initLock) {
            if (isInit.get()) {
                return;
            }
            mqChannelPoolConfig = new MqChannelPoolConfig();
            mqChannelPoolConfig.setHost(str);
            mqChannelPoolConfig.setUsername(str2);
            mqChannelPoolConfig.setPassword(str3);
            mqChannelPoolConfig.setPort(i);
            mqChannelPoolConfig.setMaxIdle(10);
            mqChannelPoolConfig.setMaxTotal(20);
            mqChannelPoolConfig.setMinIdle(1);
            isInit.compareAndSet(false, true);
        }
    }

    public static void sendMsg(MqConfig mqConfig, String str, BiConsumer<Channel, SendAction> biConsumer) throws IOException, TimeoutException {
        sendMsg(mqConfig, str, null, biConsumer);
    }

    private static void sendMsg(MqConfig mqConfig, String str, Integer num, final BiConsumer<Channel, SendAction> biConsumer) throws IOException, TimeoutException {
        String str2;
        String str3;
        String changeName = mqConfig.getChangeName();
        String routingKey = mqConfig.getRoutingKey();
        Boolean durable = mqConfig.getDurable();
        Boolean isDelay = mqConfig.getIsDelay();
        ExchangeType exchangeType = mqConfig.getExchangeType();
        Long delayTime = mqConfig.getDelayTime();
        Boolean isAutoClose = mqConfig.getIsAutoClose();
        String queueName = mqConfig.getQueueName();
        DeadConfig deadConfig = mqConfig.getDeadConfig();
        if (exchangeType == null) {
            exchangeType = ExchangeType.direct;
        }
        if (durable == null) {
            durable = true;
        }
        if (isAutoClose == null) {
            isAutoClose = false;
        }
        if (StrUtil.isBlankOrUndefined(routingKey)) {
            routingKey = "#";
        }
        if (isDelay == null) {
            isDelay = false;
        } else {
            if (isDelay.booleanValue() && delayTime == null) {
                delayTime = 1000L;
            }
            if (isDelay.booleanValue() && StrUtil.isBlankOrUndefined(changeName)) {
                throw new NullPointerException("延迟队列, 交换机不能为空");
            }
        }
        MqConfig build = MqConfig.of().title(mqConfig.getTitle()).changeName(changeName).queueName(mqConfig.getQueueName()).routingKey(routingKey).durable(durable).ttl(mqConfig.getTtl()).max(mqConfig.getMax()).exchangeType(exchangeType).isAutoClose(isAutoClose).isDelay(isDelay).delayTime(delayTime).deadConfig(deadConfig).build();
        final AtomicReference atomicReference = new AtomicReference(getSendChannel(build));
        ConfirmListener confirmListener = new ConfirmListener() { // from class: com.xunmo.utils.MqHelper.1
            public void handleAck(long j, boolean z) {
                MqHelper.log.debug("已收到消息，标识：{}", Long.valueOf(j));
                biConsumer.accept(atomicReference.get(), SendAction.SUCCESS);
            }

            public void handleNack(long j, boolean z) {
                MqHelper.log.warn("未确认消息，标识：{}", Long.valueOf(j));
                biConsumer.accept(atomicReference.get(), SendAction.MQ_FAIL);
            }
        };
        ((Channel) atomicReference.get()).clearConfirmListeners();
        ((Channel) atomicReference.get()).addConfirmListener(confirmListener);
        try {
            try {
                AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
                builder.deliveryMode(2);
                if (num != null) {
                    builder.expiration(num.toString());
                }
                if (isDelay.booleanValue()) {
                    HashMap hashMap = new HashMap();
                    hashMap.put("x-delay", delayTime);
                    builder.headers(hashMap);
                    str2 = changeName;
                    str3 = routingKey;
                } else if (StrUtil.isNotBlank(changeName)) {
                    str2 = changeName;
                    str3 = routingKey;
                } else {
                    str2 = routingKey;
                    str3 = queueName;
                    builder.priority((Integer) null);
                }
                String str4 = str2;
                String str5 = str3;
                tryDo(() -> {
                    log.trace("系统发送发送消息前记录: {} {}", Integer.valueOf(((Channel) atomicReference.get()).getChannelNumber()), str);
                    ((Channel) atomicReference.get()).basicPublish(str4, str5, builder.build(), str.getBytes(StandardCharsets.UTF_8));
                    log.trace("系统发送发送消息: {} {}", Integer.valueOf(((Channel) atomicReference.get()).getChannelNumber()), str);
                }, () -> {
                    return "发送消息异常";
                }, () -> {
                    atomicReference.set(getSendChannel(build));
                });
                returnChannel(build, (Channel) atomicReference.get());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            returnChannel(build, (Channel) atomicReference.get());
            throw th;
        }
    }

    public static void consumeMsg(MqConfig mqConfig, final BiFunction<Channel, String, ConsumeAction> biFunction) throws IOException, TimeoutException {
        ExchangeType exchangeType = mqConfig.getExchangeType();
        Boolean durable = mqConfig.getDurable();
        String routingKey = mqConfig.getRoutingKey();
        Boolean isAutoClose = mqConfig.getIsAutoClose();
        String changeName = mqConfig.getChangeName();
        String queueName = mqConfig.getQueueName();
        DeadConfig deadConfig = mqConfig.getDeadConfig();
        if (exchangeType == null) {
            exchangeType = ExchangeType.direct;
        }
        if (durable == null) {
            durable = true;
        }
        if (isAutoClose == null) {
            isAutoClose = false;
        }
        if (StrUtil.isBlankOrUndefined(routingKey)) {
            routingKey = "#";
        }
        final MqConfig build = MqConfig.of().title(mqConfig.getTitle()).changeName(changeName).queueName(mqConfig.getQueueName()).routingKey(routingKey).durable(durable).ttl(mqConfig.getTtl()).max(mqConfig.getMax()).exchangeType(exchangeType).isAutoClose(isAutoClose).deadConfig(deadConfig).build();
        final AtomicReference atomicReference = new AtomicReference(getConsumeChannel(build));
        try {
            try {
                DefaultConsumer defaultConsumer = new DefaultConsumer((Channel) atomicReference.get()) { // from class: com.xunmo.utils.MqHelper.2
                    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                        long deliveryTag = envelope.getDeliveryTag();
                        AtomicReference atomicReference2 = new AtomicReference(ConsumeAction.RETRY);
                        try {
                            atomicReference2.set(biFunction.apply(atomicReference.get(), new String(bArr, StandardCharsets.UTF_8)));
                            if (atomicReference2.get() == ConsumeAction.ACCEPT) {
                                AtomicReference atomicReference3 = atomicReference;
                                TryDoing tryDoing = () -> {
                                    ((Channel) atomicReference3.get()).basicAck(deliveryTag, false);
                                };
                                Supplier supplier = () -> {
                                    return "确认消息消费失败";
                                };
                                AtomicReference atomicReference4 = atomicReference;
                                MqConfig mqConfig2 = build;
                                MqHelper.tryDo(tryDoing, supplier, () -> {
                                    atomicReference4.set(MqHelper.getConsumeChannel(mqConfig2));
                                });
                            } else {
                                AtomicReference atomicReference5 = atomicReference;
                                TryDoing tryDoing2 = () -> {
                                    ((Channel) atomicReference5.get()).basicNack(deliveryTag, false, atomicReference2.get() == ConsumeAction.RETRY);
                                };
                                Supplier supplier2 = () -> {
                                    return "消息重回队列失败失败";
                                };
                                AtomicReference atomicReference6 = atomicReference;
                                MqConfig mqConfig3 = build;
                                MqHelper.tryDo(tryDoing2, supplier2, () -> {
                                    atomicReference6.set(MqHelper.getConsumeChannel(mqConfig3));
                                });
                            }
                        } catch (Exception e) {
                            MqHelper.log.error("异常: {}", ExceptionUtil.stacktraceToString(e));
                            throw new RuntimeException(e);
                        }
                    }
                };
                tryDo(() -> {
                    ((Channel) atomicReference.get()).basicConsume(queueName, false, defaultConsumer);
                }, () -> {
                    return "消费回信应答失败";
                }, () -> {
                    atomicReference.set(getConsumeChannel(build));
                });
                returnChannel(build, (Channel) atomicReference.get());
            } catch (Exception e) {
                log.error("异常: {}", ExceptionUtil.stacktraceToString(e));
                throw e;
            }
        } catch (Throwable th) {
            returnChannel(build, (Channel) atomicReference.get());
            throw th;
        }
    }

    private static Channel getSendChannel(MqConfig mqConfig) throws IOException, TimeoutException {
        mqConfig.setType("0");
        return getMqChannelPool(mqConfig).getChannel();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Channel getConsumeChannel(MqConfig mqConfig) throws IOException, TimeoutException {
        mqConfig.setType("1");
        return getMqChannelPool(mqConfig).getChannel();
    }

    private static MqChannelPool getMqChannelPool(MqConfig mqConfig) throws IOException, TimeoutException {
        MqChannelPool mqChannelPool;
        if (poolMap.containsKey(mqConfig)) {
            mqChannelPool = poolMap.get(mqConfig);
        } else {
            mqChannelPool = new MqChannelPool(mqChannelPoolConfig, mqConfig);
            poolMap.put(mqConfig, mqChannelPool);
        }
        return mqChannelPool;
    }

    private static void returnChannel(MqConfig mqConfig, Channel channel) throws IOException, TimeoutException {
        getMqChannelPool(mqConfig).returnChannel(channel);
    }

    public static void tryDo(TryDoing tryDoing, Supplier<String> supplier) throws TimeoutException, IOException, AlreadyClosedException {
        int i = tryCountMax;
        do {
            try {
                tryDoing.tryDo();
                return;
            } catch (IOException e) {
                log.error("{}, 连接重试第{}次获取!", supplier.get(), Integer.valueOf(tryCountMax - i));
                i--;
                if (i <= 0) {
                    throw e;
                }
                try {
                    TimeUnit.SECONDS.sleep(3L);
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            } catch (AlreadyClosedException e3) {
                log.error("{}, 重连重试第{}次获取!", supplier.get(), Integer.valueOf(tryCountMax - i));
                i--;
                if (i <= 0) {
                    throw e3;
                }
                TimeUnit.SECONDS.sleep(3L);
            } catch (TimeoutException e4) {
                log.error("{}, 超时重试第{}次获取!", supplier.get(), Integer.valueOf(tryCountMax - i));
                i--;
                if (i <= 0) {
                    throw e4;
                }
                TimeUnit.SECONDS.sleep(3L);
            }
        } while (i > 0);
    }

    public static void tryDo(TryDoing tryDoing, Supplier<String> supplier, TryDoing tryDoing2) throws IOException, TimeoutException, AlreadyClosedException {
        int i = tryCountMax;
        while (true) {
            try {
                tryDoing.tryDo();
                return;
            } catch (TimeoutException e) {
                log.error("{}, 超时重试第{}次获取!", supplier.get(), Integer.valueOf(tryCountMax - i));
                i--;
                if (i <= 0) {
                    throw e;
                }
                try {
                    TimeUnit.SECONDS.sleep(3L);
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            } catch (AlreadyClosedException e3) {
                log.error("{}, 重连重试第{}次获取!", supplier.get(), Integer.valueOf(tryCountMax - i));
                if (i - 1 <= 0) {
                    throw e3;
                }
                tryDoing2.tryDo();
                return;
            } catch (IOException e4) {
                log.error("{}, 连接重试第{}次获取!", supplier.get(), Integer.valueOf(tryCountMax - i));
                i--;
                if (i <= 0) {
                    throw e4;
                }
                TimeUnit.SECONDS.sleep(3L);
            }
        }
    }

    public static <T> T tryReturn(TryReturn<T> tryReturn, Supplier<String> supplier) throws Exception {
        int i = tryCountMax;
        while (true) {
            try {
                return tryReturn.get();
            } catch (Exception e) {
                log.error("{}, 重试第{}次获取!", supplier.get(), Integer.valueOf(tryCountMax - i));
                i--;
                if (i <= 0) {
                    throw e;
                }
                try {
                    TimeUnit.SECONDS.sleep(3L);
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            }
        }
    }
}
