package org.apache.eventmesh.storage.rocketmq.consumer;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.exception.StorageRuntimeException;
import org.apache.eventmesh.storage.rocketmq.cloudevent.RocketMQMessageFactory;
import org.apache.eventmesh.storage.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.storage.rocketmq.config.ClientConfig;
import org.apache.eventmesh.storage.rocketmq.domain.NonStandardKeys;
import org.apache.eventmesh.storage.rocketmq.patch.EventMeshConsumeConcurrentlyContext;
import org.apache.eventmesh.storage.rocketmq.patch.EventMeshConsumeConcurrentlyStatus;
import org.apache.eventmesh.storage.rocketmq.patch.EventMeshMessageListenerConcurrently;
import org.apache.eventmesh.storage.rocketmq.utils.BeanUtils;
import org.apache.eventmesh.storage.rocketmq.utils.CloudEventUtils;
import org.apache.eventmesh.storage.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.LanguageCode;

/* loaded from: input_file:org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl.class */
public class PushConsumerImpl {
    private final Properties properties;
    private EventListener eventListener;
    private final ClientConfig clientConfig;
    private AtomicBoolean started = new AtomicBoolean(false);
    private final DefaultMQPushConsumer rocketmqPushConsumer = new DefaultMQPushConsumer();

    /* renamed from: org.apache.eventmesh.storage.rocketmq.consumer.PushConsumerImpl$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl$1.class */
    // Holder for an int lookup table indexed by EventMeshAction ordinal.
    // NOTE(review): this looks like the decompiled form of a compiler-generated
    // enum switch map rather than hand-written code -- confirm before editing.
    static /* synthetic */ class AnonymousClass1 {
        // One slot per EventMeshAction constant; slots never assigned below stay 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$eventmesh$api$EventMeshAction = new int[EventMeshAction.values().length];

        static {
            // Each assignment is guarded separately so that a failure to resolve
            // one constant does not prevent the remaining slots from being filled.
            try {
                // CommitMessage -> 1
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.CommitMessage.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // ignored: the slot for this constant is simply left unassigned
            }
            try {
                // ReconsumeLater -> 2
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.ReconsumeLater.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // ignored: the slot for this constant is simply left unassigned
            }
            try {
                // ManualAck -> 3
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.ManualAck.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // ignored: the slot for this constant is simply left unassigned
            }
        }
    }

    /* loaded from: input_file:org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl$BroadCastingMessageListener.class */
    /**
     * Listener registered by the enclosing consumer when its configured message model is
     * {@code BROADCASTING}. Converts each RocketMQ message into a {@link CloudEvent} and hands it
     * to the {@link EventListener} registered on the enclosing {@link PushConsumerImpl}.
     */
    private class BroadCastingMessageListener extends EventMeshMessageListenerConcurrently {
        private BroadCastingMessageListener() {
        }

        /**
         * Converts {@code messageExt} to a CloudEvent, dispatches it to the registered event
         * listener and returns the consume status selected through the consume context.
         *
         * <p>The status defaults to {@code RECONSUME_LATER}; it only changes if the listener calls
         * {@code commit} on the supplied context before {@code consume} returns.
         *
         * @param messageExt the received message; {@code null} is acknowledged as {@code CONSUME_SUCCESS}
         * @param eventMeshConsumeConcurrentlyContext context attached to the async consume context
         * @return the status recorded by the listener, or {@code RECONSUME_LATER} if none was recorded
         * @throws StorageRuntimeException if no event listener has been registered
         */
        @Override // org.apache.eventmesh.storage.rocketmq.patch.EventMeshMessageListenerConcurrently
        public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, EventMeshConsumeConcurrentlyContext eventMeshConsumeConcurrentlyContext) {
            if (messageExt == null) {
                return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            messageExt.putUserProperty("borntimestamp", String.valueOf(messageExt.getBornTimestamp()));
            messageExt.putUserProperty("storetimestamp", String.valueOf(messageExt.getStoreTimestamp()));
            CloudEvent event = withSystemProperties(
                    RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(messageExt)).toEvent(), messageExt);

            // Read the field once so the null check and the call below see the same listener.
            final EventListener listener = PushConsumerImpl.this.eventListener;
            if (listener == null) {
                throw new StorageRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", messageExt.getTopic()));
            }

            // Mutable holder for the outcome; the anonymous context below overwrites the default.
            final Properties consumeResult = new Properties();
            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
            EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
                @Override
                public void commit(EventMeshAction eventMeshAction) {
                    switch (eventMeshAction) {
                        case CommitMessage:
                            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                            break;
                        case ReconsumeLater:
                            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
                            break;
                        case ManualAck:
                            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
                            break;
                        default:
                            // Any other action leaves the previously recorded status untouched.
                            break;
                    }
                }
            };
            consumeContext.setAbstractContext(eventMeshConsumeConcurrentlyContext);
            listener.consume(event, consumeContext);
            return EventMeshConsumeConcurrentlyStatus.valueOf(consumeResult.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
        }

        /**
         * Copies every non-empty RocketMQ system property of {@code messageExt} onto {@code event}
         * as a CloudEvent extension. Extension names are the lower-cased property key with each
         * {@code "_"} replaced by {@code "99"}.
         *
         * <p>A single builder is reused for all properties. Creating a fresh builder from the
         * original event on every iteration would keep only the last property.
         *
         * @return {@code event} itself when no property qualified, otherwise a rebuilt event
         */
        private CloudEvent withSystemProperties(CloudEvent event, MessageExt messageExt) {
            CloudEventBuilder builder = null;
            for (String propertyKey : MessageConst.STRING_HASH_SET) {
                String propertyValue = messageExt.getProperty(propertyKey);
                if (StringUtils.isNotEmpty(propertyValue)) {
                    if (builder == null) {
                        builder = CloudEventBuilder.from(event);
                    }
                    // Locale.ROOT keeps the generated name independent of the JVM default locale.
                    builder = builder.withExtension(propertyKey.toLowerCase(Locale.ROOT).replace("_", "99"), propertyValue);
                }
            }
            return builder == null ? event : builder.build();
        }

        /* synthetic */ BroadCastingMessageListener(PushConsumerImpl pushConsumerImpl, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl$ClusteringMessageListener.class */
    /**
     * Listener registered by the enclosing consumer when its configured message model is not
     * {@code BROADCASTING}. Converts each RocketMQ message into a {@link CloudEvent} and hands it
     * to the {@link EventListener} registered on the enclosing {@link PushConsumerImpl}.
     */
    private class ClusteringMessageListener extends EventMeshMessageListenerConcurrently {
        private ClusteringMessageListener() {
        }

        /**
         * Converts {@code messageExt} to a CloudEvent, dispatches it to the registered event
         * listener and returns the consume status selected through the consume context.
         *
         * <p>The status defaults to {@code RECONSUME_LATER}; it only changes if the listener calls
         * {@code commit} on the supplied context before {@code consume} returns.
         *
         * @param messageExt the received message; {@code null} is acknowledged as {@code CONSUME_SUCCESS}
         * @param eventMeshConsumeConcurrentlyContext context attached to the async consume context
         * @return the status recorded by the listener, or {@code RECONSUME_LATER} if none was recorded
         * @throws StorageRuntimeException if no event listener has been registered
         */
        @Override // org.apache.eventmesh.storage.rocketmq.patch.EventMeshMessageListenerConcurrently
        public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, EventMeshConsumeConcurrentlyContext eventMeshConsumeConcurrentlyContext) {
            if (messageExt == null) {
                return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            messageExt.putUserProperty("borntimestamp", String.valueOf(messageExt.getBornTimestamp()));
            messageExt.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(messageExt.getStoreTimestamp()));
            CloudEvent event = withSystemProperties(
                    RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(messageExt)).toEvent(), messageExt);

            // Read the field once so the null check and the call below see the same listener.
            final EventListener listener = PushConsumerImpl.this.eventListener;
            if (listener == null) {
                throw new StorageRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", messageExt.getTopic()));
            }

            // Mutable holder for the outcome; the anonymous context below overwrites the default.
            final Properties consumeResult = new Properties();
            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
            EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
                @Override
                public void commit(EventMeshAction eventMeshAction) {
                    switch (eventMeshAction) {
                        case CommitMessage:
                            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                            break;
                        case ReconsumeLater:
                            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
                            break;
                        case ManualAck:
                            consumeResult.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
                            break;
                        default:
                            // Any other action leaves the previously recorded status untouched.
                            break;
                    }
                }
            };
            consumeContext.setAbstractContext(eventMeshConsumeConcurrentlyContext);
            listener.consume(event, consumeContext);
            return EventMeshConsumeConcurrentlyStatus.valueOf(consumeResult.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
        }

        /**
         * Copies every non-empty RocketMQ system property of {@code messageExt} onto {@code event}
         * as a CloudEvent extension. Extension names are the lower-cased property key with each
         * {@code "_"} replaced by {@code "99"}.
         *
         * <p>A single builder is reused for all properties. Creating a fresh builder from the
         * original event on every iteration would keep only the last property.
         *
         * @return {@code event} itself when no property qualified, otherwise a rebuilt event
         */
        private CloudEvent withSystemProperties(CloudEvent event, MessageExt messageExt) {
            CloudEventBuilder builder = null;
            for (String propertyKey : MessageConst.STRING_HASH_SET) {
                String propertyValue = messageExt.getProperty(propertyKey);
                if (StringUtils.isNotEmpty(propertyValue)) {
                    if (builder == null) {
                        builder = CloudEventBuilder.from(event);
                    }
                    // Locale.ROOT keeps the generated name independent of the JVM default locale.
                    builder = builder.withExtension(propertyKey.toLowerCase(Locale.ROOT).replace("_", "99"), propertyValue);
                }
            }
            return builder == null ? event : builder.build();
        }

        /* synthetic */ ClusteringMessageListener(PushConsumerImpl pushConsumerImpl, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /**
     * Builds a push consumer from the supplied properties.
     *
     * <p>The properties are bound to a {@link ClientConfig}, whose values configure the wrapped
     * {@link DefaultMQPushConsumer}. Commas in the access points are replaced by semicolons before
     * they are used as the name-server address. A message listener is registered according to the
     * configured message model.
     *
     * <p>Side effect: a generated instance name is stored in {@code properties} under
     * {@code "CONSUMER_ID"}. The consumer's own instance name is read from the
     * {@code "instanceName"} property.
     *
     * @param properties consumer configuration; retained and returned by {@link #attributes()}
     * @throws StorageRuntimeException if the access points or the consumer group are null or empty
     */
    public PushConsumerImpl(Properties properties) {
        this.properties = properties;
        this.clientConfig = (ClientConfig) BeanUtils.populate(properties, ClientConfig.class);

        final String nameServers = this.clientConfig.getAccessPoints();
        if (StringUtils.isEmpty(nameServers)) {
            throw new StorageRuntimeException("OMS AccessPoints is null or empty.");
        }
        this.rocketmqPushConsumer.setNamesrvAddr(nameServers.replace(',', ';'));

        final String consumerGroup = this.clientConfig.getConsumerId();
        if (StringUtils.isEmpty(consumerGroup)) {
            throw new StorageRuntimeException("Consumer Group is necessary for RocketMQ, please set it.");
        }
        this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);

        // Redelivery, timeout and thread-pool limits all come straight from the client config.
        this.rocketmqPushConsumer.setMaxReconsumeTimes(this.clientConfig.getRmqMaxRedeliveryTimes());
        this.rocketmqPushConsumer.setConsumeTimeout(this.clientConfig.getRmqMessageConsumeTimeout());
        this.rocketmqPushConsumer.setConsumeThreadMax(this.clientConfig.getRmqMaxConsumeThreadNums());
        this.rocketmqPushConsumer.setConsumeThreadMin(this.clientConfig.getRmqMinConsumeThreadNums());
        this.rocketmqPushConsumer.setMessageModel(MessageModel.valueOf(this.clientConfig.getMessageModel()));

        final String generatedInstanceName = OMSUtil.buildInstanceName();
        this.rocketmqPushConsumer.setInstanceName(properties.getProperty("instanceName"));
        properties.put("CONSUMER_ID", generatedInstanceName);
        this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);

        final boolean broadcasting = this.clientConfig.getMessageModel().equalsIgnoreCase(MessageModel.BROADCASTING.name());
        final EventMeshMessageListenerConcurrently listener;
        if (broadcasting) {
            listener = new BroadCastingMessageListener(this, null);
        } else {
            listener = new ClusteringMessageListener(this, null);
        }
        this.rocketmqPushConsumer.registerMessageListener(listener);
    }

    /**
     * Returns the properties this consumer was constructed with.
     *
     * @return the same {@link Properties} instance passed to the constructor (not a copy)
     */
    public Properties attributes() {
        final Properties attributes = this.properties;
        return attributes;
    }

    /**
     * Starts the wrapped RocketMQ push consumer unless this instance is already marked as started.
     *
     * <p>If the underlying start fails, the started flag is rolled back so that
     * {@link #isStarted()} and {@link #isClosed()} do not report a consumer that never came up.
     *
     * @throws StorageRuntimeException if the underlying consumer fails to start; the original
     *         exception is attached as the cause
     */
    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        try {
            this.rocketmqPushConsumer.start();
        } catch (Exception e) {
            // Undo the optimistic flag flip performed by compareAndSet above.
            this.started.set(false);
            StorageRuntimeException failure = new StorageRuntimeException(e.getMessage());
            // Keep the original stack trace reachable for diagnostics.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Shuts down the wrapped RocketMQ push consumer if this instance is currently marked as
     * started; otherwise does nothing.
     */
    public synchronized void shutdown() {
        final boolean wasStarted = this.started.compareAndSet(true, false);
        if (!wasStarted) {
            return;
        }
        this.rocketmqPushConsumer.shutdown();
    }

    /**
     * Reports the current value of the started flag.
     *
     * @return {@code true} if the flag is set, {@code false} otherwise
     */
    public boolean isStarted() {
        final boolean running = this.started.get();
        return running;
    }

    /**
     * Reports the negation of {@link #isStarted()}.
     *
     * @return {@code true} when {@link #isStarted()} returns {@code false}
     */
    public boolean isClosed() {
        if (isStarted()) {
            return false;
        }
        return true;
    }

    /**
     * Exposes the wrapped RocketMQ consumer.
     *
     * @return the {@link DefaultMQPushConsumer} instance owned by this object
     */
    public DefaultMQPushConsumer getRocketmqPushConsumer() {
        final DefaultMQPushConsumer delegate = this.rocketmqPushConsumer;
        return delegate;
    }

    /**
     * Subscribes the wrapped RocketMQ consumer using the two given arguments.
     *
     * @param str the name reported in the error message when subscribing fails; passed as the
     *        first argument to {@code DefaultMQPushConsumer.subscribe}
     * @param str2 passed unchanged as the second argument to {@code DefaultMQPushConsumer.subscribe}
     * @throws StorageRuntimeException if the underlying client rejects the subscription; the
     *         original {@link MQClientException} is attached as the cause
     */
    public void subscribe(String str, String str2) {
        try {
            this.rocketmqPushConsumer.subscribe(str, str2);
        } catch (MQClientException e) {
            StorageRuntimeException failure =
                    new StorageRuntimeException(String.format("RocketMQ push consumer can't attach to %s.", str));
            // Preserve the client error so the root cause is not lost.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Removes the subscription for the given topic from the wrapped RocketMQ consumer.
     *
     * @param str the topic to unsubscribe from
     * @throws StorageRuntimeException if the underlying call fails; the original exception is
     *         attached as the cause
     */
    public void unsubscribe(String str) {
        try {
            this.rocketmqPushConsumer.unsubscribe(str);
        } catch (Exception e) {
            StorageRuntimeException failure =
                    new StorageRuntimeException(String.format("RocketMQ push consumer fails to unsubscribe topic: %s", str));
            // Preserve the underlying error so the root cause is not lost.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Converts the given CloudEvents back into RocketMQ messages and forwards them, together with
     * the context, to the consume-message service of the wrapped consumer.
     *
     * <p>{@code null} entries in the list are skipped. Each remaining event must have a non-null
     * subject, which is passed to {@code RocketMQMessageFactory.createWriter}.
     *
     * @param list the events to convert; {@code null} elements are ignored
     * @param abstractContext cast to {@link EventMeshConsumeConcurrentlyContext} before forwarding
     * @throws NullPointerException if a non-null event has a {@code null} subject
     */
    public void updateOffset(List<CloudEvent> list, AbstractContext abstractContext) {
        final ConsumeMessageService service =
                this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().getConsumeMessageService();

        final List<MessageExt> messages = new ArrayList<>(list.size());
        for (CloudEvent cloudEvent : list) {
            if (cloudEvent == null) {
                continue;
            }
            final String subject = Objects.requireNonNull(cloudEvent.getSubject());
            final Message message = (Message) RocketMQMessageFactory.createWriter(subject).writeBinary(cloudEvent);
            messages.add(CloudEventUtils.msgConvertExt(message));
        }

        final ConsumeMessageConcurrentlyService concurrentService = (ConsumeMessageConcurrentlyService) service;
        final EventMeshConsumeConcurrentlyContext consumeContext = (EventMeshConsumeConcurrentlyContext) abstractContext;
        concurrentService.updateOffset(messages, consumeContext);
    }

    /**
     * Sets the listener that receives converted events from the message listeners of this
     * consumer, replacing any previously registered one.
     *
     * @param eventListener the listener to dispatch events to
     */
    public void registerEventListener(EventListener eventListener) {
        final EventListener replacement = eventListener;
        this.eventListener = replacement;
    }
}
