package org.apache.eventmesh.storage.rocketmq.producer;

import io.cloudevents.CloudEvent;
import java.util.Iterator;
import java.util.Locale;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.api.exception.StorageRuntimeException;
import org.apache.eventmesh.storage.rocketmq.cloudevent.RocketMQMessageFactory;
import org.apache.eventmesh.storage.rocketmq.utils.CloudEventUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/storage/rocketmq/producer/ProducerImpl.class */
/**
 * Producer that converts {@link CloudEvent}s into RocketMQ {@link Message}s and
 * hands them to the RocketMQ producer held by the superclass.
 *
 * <p>Every send-style method first verifies the producer service state, builds the
 * RocketMQ message from the event, copies RocketMQ system properties carried as
 * CloudEvent extensions back onto the message, and then dispatches it.
 */
public class ProducerImpl extends AbstractProducer {
    private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);

    /** Value applied to the Netty client's async semaphore in {@link #setExtFields()}. */
    public static final int eventMeshServerAsyncAccumulationThreshold = 1000;

    /** Replacement for {@code '_'} when a RocketMQ property key is mapped to a CloudEvent extension key. */
    private static final String EXTENSION_KEY_SEPARATOR = "99";

    /**
     * Creates a producer configured from the given properties.
     *
     * @param properties configuration passed through to the superclass
     */
    public ProducerImpl(Properties properties) {
        super(properties);
    }

    /**
     * Returns the properties held by this producer.
     *
     * @return the {@code properties} field (not a copy)
     */
    public Properties attributes() {
        return this.properties;
    }

    /**
     * Applies the fixed tuning this producer uses on the underlying RocketMQ producer:
     * no client-side retries, a name-server poll interval of 60000, the async semaphore
     * limit {@link #eventMeshServerAsyncAccumulationThreshold}, and a body-compression
     * threshold of 10.
     */
    public void setExtFields() {
        // Retries are disabled for both synchronous and asynchronous sends.
        super.getRocketmqProducer().setRetryTimesWhenSendFailed(0);
        super.getRocketmqProducer().setRetryTimesWhenSendAsyncFailed(0);
        // NOTE(review): RocketMQ documents this interval in milliseconds (i.e. 60 s) - confirm.
        super.getRocketmqProducer().setPollNameServerInterval(60000);
        super.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getNettyClientConfig().setClientAsyncSemaphoreValue(eventMeshServerAsyncAccumulationThreshold);
        // NOTE(review): RocketMQ documents this threshold in bytes - confirm that 10 is intended.
        super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10);
    }

    /**
     * Sends the event synchronously.
     *
     * @param cloudEvent event to send; its subject must not be {@code null}
     * @return result carrying the topic and message id reported by RocketMQ
     * @throws NullPointerException if the event has no subject
     * @throws StorageRuntimeException (as produced by {@code checkProducerException}) if sending fails
     */
    public SendResult send(CloudEvent cloudEvent) {
        checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
        Message message = createMessage(cloudEvent);
        supplySysProp(message, cloudEvent);
        try {
            org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(message);
            SendResult sendResult = new SendResult();
            sendResult.setTopic(rmqResult.getMessageQueue().getTopic());
            sendResult.setMessageId(rmqResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("Send message Exception, {}", message, e);
            // No broker result exists on failure, so report the client-side unique id,
            // exactly as sendOneway/sendAsync/reply do.
            throw checkProducerException(message.getTopic(), MessageClientIDSetter.getUniqID(message), e);
        }
    }

    /**
     * Sends the event without waiting for any acknowledgement.
     *
     * @param cloudEvent event to send; its subject must not be {@code null}
     * @throws NullPointerException if the event has no subject
     * @throws StorageRuntimeException (as produced by {@code checkProducerException}) if sending fails
     */
    public void sendOneway(CloudEvent cloudEvent) {
        checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
        Message message = createMessage(cloudEvent);
        supplySysProp(message, cloudEvent);
        try {
            this.rocketmqProducer.sendOneway(message);
        } catch (Exception e) {
            log.error("Send message oneway Exception, {}", message, e);
            throw checkProducerException(message.getTopic(), MessageClientIDSetter.getUniqID(message), e);
        }
    }

    /**
     * Sends the event asynchronously and reports the outcome through the callback.
     *
     * @param cloudEvent event to send; its subject must not be {@code null}
     * @param sendCallback receives the converted send result or an exception context
     * @throws NullPointerException if the event has no subject
     * @throws StorageRuntimeException (as produced by {@code checkProducerException}) if the
     *         send call itself throws
     */
    public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
        checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
        Message message = supplySysProp(createMessage(cloudEvent), cloudEvent);
        try {
            this.rocketmqProducer.send(message, sendCallbackConvert(message, sendCallback));
        } catch (Exception e) {
            log.error("Send message async Exception, {}", message, e);
            throw checkProducerException(message.getTopic(), MessageClientIDSetter.getUniqID(message), e);
        }
    }

    /**
     * Issues a request and delivers the reply (or failure) to the callback.
     *
     * <p>Unlike the send methods, exceptions from the RocketMQ client are propagated
     * unchanged rather than converted.
     *
     * @param cloudEvent request event; its subject must not be {@code null}
     * @param requestReplyCallback receives the reply event or the failure cause
     * @param j timeout value handed to the RocketMQ {@code request} call
     * @throws NullPointerException if the event has no subject
     */
    public void request(CloudEvent cloudEvent, RequestReplyCallback requestReplyCallback, long j) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
        Message message = createMessage(cloudEvent);
        supplySysProp(message, cloudEvent);
        this.rocketmqProducer.request(message, rrCallbackConvert(requestReplyCallback), j);
    }

    /**
     * Sends the event asynchronously after tagging it with {@code MSG_TYPE=reply}.
     *
     * @param cloudEvent reply event; its subject must not be {@code null}
     * @param sendCallback receives the converted send result or an exception context
     * @throws NullPointerException if the event has no subject
     * @throws StorageRuntimeException (as produced by {@code checkProducerException}) if the
     *         send call itself throws
     */
    public void reply(CloudEvent cloudEvent, SendCallback sendCallback) {
        checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
        Message message = createMessage(cloudEvent);
        // The reply marker is set before extension-carried properties are applied, so an
        // extension mapped to the same key would overwrite it.
        MessageAccessor.putProperty(message, "MSG_TYPE", "reply");
        supplySysProp(message, cloudEvent);
        try {
            this.rocketmqProducer.send(message, sendCallbackConvert(message, sendCallback));
        } catch (Exception e) {
            log.error("Send message async Exception, {}", message, e);
            throw checkProducerException(message.getTopic(), MessageClientIDSetter.getUniqID(message), e);
        }
    }

    /** Builds the RocketMQ message for an event, using the event's (required) subject to create the writer. */
    private static Message createMessage(CloudEvent cloudEvent) {
        String subject = Objects.requireNonNull(cloudEvent.getSubject());
        return (Message) RocketMQMessageFactory.createWriter(subject).writeBinary(cloudEvent);
    }

    /**
     * Maps a RocketMQ property key to its CloudEvent extension form: lower-cased, with
     * every {@code '_'} replaced by {@value #EXTENSION_KEY_SEPARATOR}.
     *
     * <p>{@link Locale#ROOT} keeps the mapping independent of the JVM's default locale.
     */
    private static String toExtensionKey(String sysPropKey) {
        return sysPropKey.toLowerCase(Locale.ROOT).replace("_", EXTENSION_KEY_SEPARATOR);
    }

    /**
     * Copies values carried as CloudEvent extensions back onto the message under their
     * original RocketMQ property keys, for every key in {@code MessageConst.STRING_HASH_SET}.
     *
     * <p>An extension is applied only when it is present and its string form is non-empty;
     * the extension-form key is then removed from the message properties.
     *
     * @param message message to update in place
     * @param cloudEvent event whose extensions are inspected
     * @return the same {@code message} instance
     */
    private Message supplySysProp(Message message, CloudEvent cloudEvent) {
        for (String sysPropKey : MessageConst.STRING_HASH_SET) {
            String extensionKey = toExtensionKey(sysPropKey);
            Object extensionValue = cloudEvent.getExtension(extensionKey);
            if (extensionValue == null) {
                continue;
            }
            String propValue = extensionValue.toString();
            if (StringUtils.isNotEmpty(propValue)) {
                MessageAccessor.putProperty(message, sysPropKey, propValue);
                message.getProperties().remove(extensionKey);
            }
        }
        return message;
    }

    /**
     * Adapts a {@link RequestReplyCallback} to RocketMQ's {@link RequestCallback}.
     *
     * @param requestReplyCallback callback to notify
     * @return adapter that converts the reply message to a CloudEvent before delegating
     */
    private RequestCallback rrCallbackConvert(final RequestReplyCallback requestReplyCallback) {
        return new RequestCallback() {
            public void onSuccess(Message reply) {
                // Move each non-empty RocketMQ property to its extension-form key before
                // the message is read back as a CloudEvent.
                for (String sysPropKey : MessageConst.STRING_HASH_SET) {
                    String propValue = reply.getProperty(sysPropKey);
                    if (StringUtils.isNotEmpty(propValue)) {
                        MessageAccessor.putProperty(reply, toExtensionKey(sysPropKey), propValue);
                        reply.getProperties().remove(sysPropKey);
                    }
                }
                requestReplyCallback.onSuccess(RocketMQMessageFactory.createReader(reply).toEvent());
            }

            public void onException(Throwable th) {
                // The failure cause is forwarded unchanged.
                requestReplyCallback.onException(th);
            }
        };
    }

    /**
     * Adapts a {@link SendCallback} to RocketMQ's send callback.
     *
     * @param message message being sent; its topic is reported on failure
     * @param sendCallback callback to notify
     * @return adapter that converts results and wraps failures in an {@link OnExceptionContext}
     */
    private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message, final SendCallback sendCallback) {
        return new org.apache.rocketmq.client.producer.SendCallback() {
            public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
                sendCallback.onSuccess(CloudEventUtils.convertSendResult(sendResult));
            }

            public void onException(Throwable th) {
                String topic = message.getTopic();
                StorageRuntimeException converted = ProducerImpl.this.checkProducerException(topic, null, th);
                OnExceptionContext context = new OnExceptionContext();
                context.setTopic(topic);
                context.setException(converted);
                sendCallback.onException(context);
            }
        };
    }
}
