package org.apache.eventmesh.connector.standalone.broker;

import io.cloudevents.CloudEvent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity;
import org.apache.eventmesh.connector.standalone.broker.model.TopicMetadata;
import org.apache.eventmesh.connector.standalone.broker.task.HistoryMessageClearTask;

/* loaded from: input_file:org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.class */
/**
 * Process-wide broker that keeps, per topic, a {@link MessageQueue} and an
 * {@link AtomicLong} offset counter in two {@link ConcurrentHashMap}s keyed by
 * {@link TopicMetadata}.
 *
 * <p>The single instance is obtained through {@link #getInstance()}. Constructing it
 * starts a daemon thread running a {@link HistoryMessageClearTask} over the queue map.
 */
public class StandaloneBroker {
    /** Topic -> queue holding that topic's messages. */
    private final ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = new ConcurrentHashMap<>();
    /** Topic -> counter supplying the offset for the next published message. */
    private final ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap = new ConcurrentHashMap<>();

    /**
     * Lazy-initialization holder: the broker is created when this nested class is
     * first initialized, i.e. on the first call to {@link StandaloneBroker#getInstance()}.
     */
    private static class StandaloneBrokerInstanceHolder {
        private static final StandaloneBroker instance = new StandaloneBroker();

        private StandaloneBrokerInstanceHolder() {
        }
    }

    /** Creates the broker and launches the background history-cleaning thread. */
    private StandaloneBroker() {
        startHistoryMessageCleanTask();
    }

    /**
     * Returns the shared broker instance.
     *
     * @return the singleton {@code StandaloneBroker}
     */
    public static StandaloneBroker getInstance() {
        return StandaloneBrokerInstanceHolder.instance;
    }

    /**
     * Publishes an event to a topic, creating the topic's queue and offset counter
     * first when they do not exist yet.
     *
     * @param str        topic name
     * @param cloudEvent event to store
     * @return the entity that was put on the queue; it carries the offset taken from
     *         the topic's counter and the current wall-clock time in milliseconds
     * @throws InterruptedException propagated from {@code MessageQueue.put}
     */
    public MessageEntity putMessage(String str, CloudEvent cloudEvent) throws InterruptedException {
        Pair<MessageQueue, AtomicLong> topicState = createTopicIfAbsent(str);
        AtomicLong offsetCounter = topicState.getRight();
        MessageQueue queue = topicState.getLeft();

        TopicMetadata topic = new TopicMetadata(str);
        long offset = offsetCounter.getAndIncrement();
        long createTime = System.currentTimeMillis();
        MessageEntity entity = new MessageEntity(topic, cloudEvent, offset, createTime);

        queue.put(entity);
        return entity;
    }

    /**
     * Takes the next entity from the topic's queue via {@code MessageQueue.take} and
     * returns its event. The queue is created if the topic has none yet; no offset
     * counter is created by this method.
     *
     * @param str topic name
     * @return the event carried by the taken entity
     * @throws InterruptedException propagated from {@code MessageQueue.take}
     */
    public CloudEvent takeMessage(String str) throws InterruptedException {
        MessageEntity taken = queueFor(str).take();
        return taken.getMessage();
    }

    /**
     * Returns the event of the entity reported by {@code MessageQueue.getHead} for the
     * topic. The queue is created if the topic has none yet; no offset counter is
     * created by this method.
     *
     * @param str topic name
     * @return the head entity's event, or {@code null} when {@code getHead} returns {@code null}
     */
    public CloudEvent getMessage(String str) {
        MessageEntity headEntity = queueFor(str).getHead();
        return headEntity == null ? null : headEntity.getMessage();
    }

    /**
     * Returns the event of the entity reported by {@code MessageQueue.getByOffset} for
     * the topic. The queue is created if the topic has none yet; no offset counter is
     * created by this method.
     *
     * @param str topic name
     * @param j   offset to look up
     * @return the matching entity's event, or {@code null} when {@code getByOffset} returns {@code null}
     */
    public CloudEvent getMessage(String str, long j) {
        MessageEntity located = queueFor(str).getByOffset(j);
        return located == null ? null : located.getMessage();
    }

    /** Starts a named daemon thread that runs {@link HistoryMessageClearTask} on the queue map. */
    private void startHistoryMessageCleanTask() {
        HistoryMessageClearTask clearTask = new HistoryMessageClearTask(this.messageContainer);
        Thread cleaner = new Thread(clearTask);
        cleaner.setName("StandaloneBroker-HistoryMessageCleanTask");
        cleaner.setDaemon(true);
        cleaner.start();
    }

    /**
     * Tells whether a queue is registered for the topic.
     *
     * @param str topic name
     * @return {@code true} when the queue map contains the topic
     */
    public boolean checkTopicExist(String str) {
        TopicMetadata topic = new TopicMetadata(str);
        return this.messageContainer.containsKey(topic);
    }

    /**
     * Looks up the topic's queue and offset counter, creating whichever is missing.
     *
     * @param str topic name
     * @return pair of (queue, offset counter) for the topic
     */
    public Pair<MessageQueue, AtomicLong> createTopicIfAbsent(String str) {
        TopicMetadata topic = new TopicMetadata(str);
        // Queue first, counter second - same order as the maps are touched everywhere else.
        MessageQueue queue = this.messageContainer.computeIfAbsent(topic, key -> new MessageQueue());
        AtomicLong offsetCounter = this.offsetMap.computeIfAbsent(topic, key -> new AtomicLong());
        return Pair.of(queue, offsetCounter);
    }

    /**
     * Overwrites the offset counter of an existing topic; does nothing when the topic
     * has no counter.
     *
     * @param topicMetadata topic whose counter is updated
     * @param j             new counter value
     */
    public void updateOffset(TopicMetadata topicMetadata, long j) {
        this.offsetMap.computeIfPresent(topicMetadata, (key, counter) -> {
            counter.set(j);
            return counter;
        });
    }

    /** Returns the queue registered for the named topic, registering a new one if absent. */
    private MessageQueue queueFor(String topicName) {
        TopicMetadata topic = new TopicMetadata(topicName);
        return this.messageContainer.computeIfAbsent(topic, key -> new MessageQueue());
    }
}
