package org.apache.eventmesh.storage.knative.consumer;

import com.google.common.collect.Lists;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.storage.knative.domain.NonStandardKeys;
import org.apache.eventmesh.storage.knative.patch.EventMeshConsumeConcurrentlyContext;
import org.apache.eventmesh.storage.knative.patch.EventMeshConsumeConcurrentlyStatus;
import org.apache.eventmesh.storage.knative.patch.EventMeshMessageListenerConcurrently;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/storage/knative/consumer/PullConsumerImpl.class */
public class PullConsumerImpl {
    private static final Logger LOG = LoggerFactory.getLogger(PullConsumerImpl.class);
    private transient List<SubscriptionItem> topicList;
    private final transient Properties properties;
    private transient EventListener eventListener;
    private final transient AtomicBoolean started = new AtomicBoolean(false);
    private transient ConcurrentMap<String, String> subscriptionInner = new ConcurrentHashMap();
    private final transient ConcurrentHashMap<String, AtomicLong> offsetMap = new ConcurrentHashMap<>();
    private final transient DefaultConsumer defaultConsumer = new DefaultConsumer();

    /* renamed from: org.apache.eventmesh.storage.knative.consumer.PullConsumerImpl$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/eventmesh/storage/knative/consumer/PullConsumerImpl$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$eventmesh$api$EventMeshAction = new int[EventMeshAction.values().length];

        static {
            try {
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.CommitMessage.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.ReconsumeLater.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.ManualAck.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/eventmesh/storage/knative/consumer/PullConsumerImpl$ClusteringMessageListener.class */
    private class ClusteringMessageListener extends EventMeshMessageListenerConcurrently {
        private ClusteringMessageListener() {
        }

        @Override // org.apache.eventmesh.storage.knative.patch.EventMeshMessageListenerConcurrently
        public EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext eventMeshConsumeConcurrentlyContext) {
            final Properties properties = new Properties();
            properties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() { // from class: org.apache.eventmesh.storage.knative.consumer.PullConsumerImpl.ClusteringMessageListener.1
                public void commit(EventMeshAction eventMeshAction) {
                    switch (AnonymousClass1.$SwitchMap$org$apache$eventmesh$api$EventMeshAction[eventMeshAction.ordinal()]) {
                        case 1:
                            properties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                            return;
                        case 2:
                            properties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
                            return;
                        case 3:
                            properties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
                            return;
                        default:
                            return;
                    }
                }
            };
            eventMeshAsyncConsumeContext.setAbstractContext((AbstractContext) eventMeshConsumeConcurrentlyContext);
            PullConsumerImpl.this.eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
            return EventMeshConsumeConcurrentlyStatus.valueOf(properties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
        }

        /* synthetic */ ClusteringMessageListener(PullConsumerImpl pullConsumerImpl, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    public PullConsumerImpl(Properties properties) throws Exception {
        this.topicList = null;
        this.properties = properties;
        this.topicList = Lists.newArrayList();
        this.defaultConsumer.registerMessageListener(new ClusteringMessageListener(this, null));
    }

    public void subscribe(String str) {
        try {
            this.topicList.add(new SubscriptionItem(str, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
            this.topicList.forEach(subscriptionItem -> {
                try {
                    this.subscriptionInner.put(subscriptionItem.getTopic(), this.defaultConsumer.pullMessage(subscriptionItem.getTopic(), this.properties.getProperty("serviceAddr")));
                } catch (Exception e) {
                    LOG.error("store received message error", e);
                }
            });
        } catch (Exception e) {
            LOG.error("other error", e);
        }
    }

    public void unsubscribe(String str) {
        try {
            this.topicList.remove(str);
        } catch (Exception e) {
            LOG.error("unsubscribe topic error", e);
        }
    }

    public void updateOffset(List<CloudEvent> list, AbstractContext abstractContext) {
        list.forEach(cloudEvent -> {
            updateOffset(cloudEvent.getSubject(), (Long) cloudEvent.getExtension("offset"));
        });
    }

    public void updateOffset(String str, Long l) {
        this.offsetMap.computeIfPresent(str, (str2, atomicLong) -> {
            atomicLong.set(l.longValue());
            return atomicLong;
        });
    }

    public void start() {
        this.started.set(true);
    }

    public synchronized void shutdown() {
        this.started.set(false);
    }

    public boolean isStarted() {
        return this.started.get();
    }

    public boolean isClosed() {
        return !isStarted();
    }

    public void registerEventListener(EventListener eventListener) {
        this.eventListener = eventListener;
    }
}
