/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;

public class SystemTopicBaseTxnBufferSnapshotService
implements TransactionBufferSnapshotService {
    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
        this.clients = new ConcurrentHashMap<TopicName, SystemTopicClient<TransactionBufferSnapshot>>();
    }

    @Override
    public CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
        return this.getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
    }

    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(TopicName topicName) {
        TopicName systemTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
        if (systemTopicName == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, because the topicName is null!"));
        }
        return CompletableFuture.completedFuture(this.clients.computeIfAbsent(systemTopicName, v -> this.namespaceEventsSystemTopicFactory.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
    }

    @Override
    public CompletableFuture<SystemTopicClient.Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
        return this.getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
    }

    @Override
    public void removeClient(TopicName topicName, TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
        if (transactionBufferSystemTopicClient.getReaders().size() == 0 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
            this.clients.remove(topicName);
        }
    }

    @Override
    public void close() throws Exception {
        for (Map.Entry<TopicName, SystemTopicClient<TransactionBufferSnapshot>> entry : this.clients.entrySet()) {
            entry.getValue().close();
        }
    }
}

