package org.apache.eventmesh.connector.standalone.consumer;

import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.connector.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.connector.standalone.broker.model.TopicMetadata;
import org.apache.eventmesh.connector.standalone.broker.task.SubScribeTask;

/* loaded from: input_file:org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.class */
/**
 * {@link Consumer} implementation that reads from the in-process {@link StandaloneBroker}.
 *
 * <p>Every subscribed topic is served by one {@link SubScribeTask}, which is registered in
 * {@link #subscribeTaskTable} and handed to {@link #consumeExecutorService} for execution.
 *
 * <p>All mutations of {@link #subscribeTaskTable} (subscribe, unsubscribe, shutdown) are performed
 * while holding the table's monitor so that a task is never registered without being started, and
 * never dropped from the table without being shut down.
 */
public class StandaloneConsumer implements Consumer {
    /** Listener passed to every {@link SubScribeTask} created by {@link #subscribe(String)}. */
    private EventListener listener;

    /** Broker instance obtained from {@link StandaloneBroker#getInstance()}. */
    private final StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();

    /** Active subscribe tasks keyed by topic name. */
    private final ConcurrentHashMap<String, SubScribeTask> subscribeTaskTable = new ConcurrentHashMap<>(16);

    /** Lifecycle flag: {@code true} between {@link #start()} and {@link #shutdown()}. */
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    /**
     * Executor that runs the subscribe tasks. Both numeric arguments are twice the number of
     * available processors (presumably core and maximum pool size -- confirm against
     * {@code ThreadPoolFactory}).
     */
    private final ExecutorService consumeExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors() * 2,
            Runtime.getRuntime().availableProcessors() * 2,
            "StandaloneConsumerThread");

    /**
     * Creates a consumer.
     *
     * @param properties currently ignored
     */
    public StandaloneConsumer(Properties properties) {
    }

    /**
     * @return {@code true} if {@link #start()} has been called and {@link #shutdown()} has not
     */
    public boolean isStarted() {
        return this.isStarted.get();
    }

    /**
     * @return the negation of {@link #isStarted()}
     */
    public boolean isClosed() {
        return !this.isStarted.get();
    }

    /** Marks the consumer as started; a no-op if it is already started. */
    public void start() {
        this.isStarted.compareAndSet(false, true);
    }

    /**
     * Marks the consumer as stopped, shuts down every registered subscribe task and empties the
     * task table.
     */
    public void shutdown() {
        this.isStarted.compareAndSet(true, false);
        // Hold the same monitor as subscribe()/unsubscribe() so that a task registered concurrently
        // cannot slip in between the iteration and clear() and be dropped without being stopped.
        synchronized (this.subscribeTaskTable) {
            for (SubScribeTask subScribeTask : this.subscribeTaskTable.values()) {
                subScribeTask.shutdown();
            }
            this.subscribeTaskTable.clear();
        }
        // NOTE(review): consumeExecutorService is intentionally left running, as in the original
        // code; confirm whether the consumer must remain usable after shutdown() before changing it.
    }

    /**
     * Initialization hook; this implementation does nothing.
     *
     * @param properties currently ignored
     * @throws Exception never thrown by this implementation
     */
    public void init(Properties properties) throws Exception {
    }

    /**
     * Reports the offset carried by each event to the broker, using the event's subject as the
     * topic name.
     *
     * @param list            events whose {@code "offset"} extension should be committed
     * @param abstractContext not used by this implementation
     * @throws NullPointerException if an event has no {@code "offset"} extension
     * @throws ClassCastException   if the {@code "offset"} extension is not a {@link Long}
     */
    public void updateOffset(List<CloudEvent> list, AbstractContext abstractContext) {
        list.forEach(cloudEvent -> {
            Long offset = (Long) Objects.requireNonNull(
                    cloudEvent.getExtension("offset"),
                    "CloudEvent is missing the 'offset' extension");
            this.standaloneBroker.updateOffset(new TopicMetadata(cloudEvent.getSubject()), offset.longValue());
        });
    }

    /**
     * Subscribes to a topic: creates the topic in the broker if needed, then registers and starts
     * a {@link SubScribeTask} for it. Subscribing to an already-subscribed topic is a no-op.
     *
     * @param str topic name
     * @throws Exception if the broker or task creation fails
     */
    public void subscribe(String str) throws Exception {
        // Lock-free fast path for the common "already subscribed" case.
        if (this.subscribeTaskTable.containsKey(str)) {
            return;
        }
        synchronized (this.subscribeTaskTable) {
            // Re-check under the lock: another thread may have subscribed to the same topic after
            // the fast-path check, and starting a second task would orphan the first one.
            if (this.subscribeTaskTable.containsKey(str)) {
                return;
            }
            this.standaloneBroker.createTopicIfAbsent(str);
            SubScribeTask subScribeTask = new SubScribeTask(str, this.standaloneBroker, this.listener);
            this.subscribeTaskTable.put(str, subScribeTask);
            this.consumeExecutorService.execute(subScribeTask);
        }
    }

    /**
     * Unsubscribes from a topic by removing its {@link SubScribeTask} from the table and shutting
     * it down. Unsubscribing from a topic that is not subscribed is a no-op.
     *
     * @param str topic name
     */
    public void unsubscribe(String str) {
        synchronized (this.subscribeTaskTable) {
            // remove() returns the task atomically, so a concurrent unsubscribe()/shutdown() that
            // already dropped the entry yields null here instead of a NullPointerException.
            SubScribeTask subScribeTask = this.subscribeTaskTable.remove(str);
            if (subScribeTask != null) {
                subScribeTask.shutdown();
            }
        }
    }

    /**
     * Sets the listener that will be passed to subscribe tasks created afterwards. Tasks that
     * were already created keep the listener they were constructed with.
     *
     * @param eventListener listener to hand to new subscribe tasks
     */
    public void registerEventListener(EventListener eventListener) {
        this.listener = eventListener;
    }
}
