package group.idealworld.dew.core.cluster.spi.rocket;

import com.ecfront.dew.common.exception.RTUnsupportedEncodingException;
import group.idealworld.dew.core.cluster.AbsClusterMQ;
import group.idealworld.dew.core.cluster.dto.MessageWrap;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

/* loaded from: input_file:group/idealworld/dew/core/cluster/spi/rocket/RocketClusterMQ.class */
public class RocketClusterMQ extends AbsClusterMQ {
    private static SendBeforeFun sendBeforeFun = (str, map) -> {
        return null;
    };
    private static SendErrorFun sendErrorFun = (exc, obj) -> {
    };
    private static SendFinishFun sendFinishFun = obj -> {
    };
    private static ReceiveBeforeFun receiveBeforeFun = (str, map) -> {
        return null;
    };
    private static ReceiveErrorFun receiveErrorFun = (exc, obj) -> {
    };
    private static ReceiveFinishFun receiveFinishFun = obj -> {
    };
    private final RocketAdapter rocketAdapter;
    private final String nameServer;
    private final String producerGroupName;
    private final String consumerGroupName;

    public RocketClusterMQ(RocketAdapter rocketAdapter, String str, String str2, String str3) {
        this.rocketAdapter = rocketAdapter;
        this.nameServer = str;
        this.producerGroupName = str2;
        this.consumerGroupName = str3;
    }

    public static void setSendBeforeFun(SendBeforeFun sendBeforeFun2) {
        sendBeforeFun = sendBeforeFun2;
    }

    public static void setSendErrorFun(SendErrorFun sendErrorFun2) {
        sendErrorFun = sendErrorFun2;
    }

    public static void setSendFinishFun(SendFinishFun sendFinishFun2) {
        sendFinishFun = sendFinishFun2;
    }

    public static void setReceiveBeforeFun(ReceiveBeforeFun receiveBeforeFun2) {
        receiveBeforeFun = receiveBeforeFun2;
    }

    public static void setReceiveErrorFun(ReceiveErrorFun receiveErrorFun2) {
        receiveErrorFun = receiveErrorFun2;
    }

    public static void setReceiveFinishFun(ReceiveFinishFun receiveFinishFun2) {
        receiveFinishFun = receiveFinishFun2;
    }

    protected boolean doPublish(String str, String str2, Optional<Map<String, Object>> optional, boolean z) {
        RocketMQTemplate rocketMQTemplate = this.rocketAdapter.getRocketMQTemplate();
        Object obj = null;
        try {
            if (z) {
                throw new RTUnsupportedEncodingException("Rocket doesn't support confirm mode");
            }
            try {
                Map<String, Object> mQHeader = getMQHeader(str);
                mQHeader.getClass();
                optional.ifPresent(mQHeader::putAll);
                Message build = MessageBuilder.withPayload(str2).copyHeaders(mQHeader).build();
                obj = sendBeforeFun.invoke(str, mQHeader);
                rocketMQTemplate.syncSend(str, build);
                sendFinishFun.invoke(obj);
                return true;
            } catch (Exception e) {
                logger.error("[MQ] Rocket publish error.", e);
                sendErrorFun.invoke(e, obj);
                sendFinishFun.invoke(obj);
                return false;
            }
        } catch (Throwable th) {
            sendFinishFun.invoke(obj);
            throw th;
        }
    }

    protected void doSubscribe(String str, Consumer<MessageWrap> consumer) {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(this.producerGroupName);
        defaultMQPushConsumer.setNamesrvAddr(this.nameServer);
        defaultMQPushConsumer.setInstanceName(UUID.randomUUID().toString());
        try {
            defaultMQPushConsumer.subscribe(str, "*");
            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            receiveMessage(str, defaultMQPushConsumer, consumer);
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            logger.error("[MQ] Rocket response error.", e);
        }
    }

    protected boolean doRequest(String str, String str2, Optional<Map<String, Object>> optional, boolean z) {
        RocketMQTemplate rocketMQTemplate = this.rocketAdapter.getRocketMQTemplate();
        Object obj = null;
        try {
            if (z) {
                throw new RTUnsupportedEncodingException("Rocket doesn't support confirm mode");
            }
            try {
                Map<String, Object> mQHeader = getMQHeader(str);
                mQHeader.getClass();
                optional.ifPresent(mQHeader::putAll);
                obj = sendBeforeFun.invoke(str, mQHeader);
                rocketMQTemplate.syncSend(str, MessageBuilder.withPayload(str2).copyHeaders(mQHeader).build());
                sendFinishFun.invoke(obj);
                return true;
            } catch (Exception e) {
                logger.error("[MQ] Rocket publish error.", e);
                sendErrorFun.invoke(e, obj);
                sendFinishFun.invoke(obj);
                return false;
            }
        } catch (Throwable th) {
            sendFinishFun.invoke(obj);
            throw th;
        }
    }

    protected void doResponse(String str, Consumer<MessageWrap> consumer) {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(this.consumerGroupName);
        defaultMQPushConsumer.setNamesrvAddr(this.nameServer);
        defaultMQPushConsumer.setInstanceName(UUID.randomUUID().toString());
        try {
            defaultMQPushConsumer.subscribe(str, "*");
            defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            receiveMessage(str, defaultMQPushConsumer, consumer);
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            logger.error("[MQ] Rocket response error.", e);
        }
    }

    private void receiveMessage(String str, DefaultMQPushConsumer defaultMQPushConsumer, Consumer<MessageWrap> consumer) {
        defaultMQPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
            try {
                list.parallelStream().forEach(messageExt -> {
                    Map<String, Object> mQHeader = setMQHeader(str, (Map) messageExt.getProperties().entrySet().stream().collect(Collectors.toMap((v0) -> {
                        return v0.getKey();
                    }, (v0) -> {
                        return v0.getValue();
                    })));
                    copyOnWriteArrayList.add(receiveBeforeFun.invoke(str, mQHeader));
                    consumer.accept(new MessageWrap(str, Optional.of(mQHeader), new String(messageExt.getBody(), StandardCharsets.UTF_8)));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                copyOnWriteArrayList.forEach(obj -> {
                    receiveErrorFun.invoke(e, obj);
                });
                logger.error("[MQ] Rocket response error.", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } finally {
                ReceiveFinishFun receiveFinishFun2 = receiveFinishFun;
                receiveFinishFun2.getClass();
                copyOnWriteArrayList.forEach(receiveFinishFun2::invoke);
            }
        });
    }

    public boolean supportHeader() {
        return true;
    }
}
