/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageConsumer;
import org.apache.pulsar.broker.resourcegroup.ResourceUsagePublisher;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceUsageTopicTransportManager
implements ResourceUsageTransportManager {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTopicTransportManager.class);
    public static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage";
    private final PulsarService pulsarService;
    private final PulsarClient pulsarClient;
    private final ResourceUsageWriterTask pTask;
    private final ResourceUsageReader consumer;
    private final Map<String, ResourceUsagePublisher> publisherMap = new ConcurrentHashMap<String, ResourceUsagePublisher>();
    private final Map<String, ResourceUsageConsumer> consumerMap = new ConcurrentHashMap<String, ResourceUsageConsumer>();
    private long staleMessageCount = 0L;

    private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
        block7: {
            List<String> nsList;
            String namespace;
            String tenant;
            PulsarAdmin admin;
            block6: {
                TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
                admin = this.pulsarService.getAdminClient();
                ServiceConfiguration config = this.pulsarService.getConfig();
                String cluster = config.getClusterName();
                tenant = topicName.getTenant();
                namespace = topicName.getNamespace();
                List<String> tenantList = admin.tenants().getTenants();
                if (!tenantList.contains(tenant)) {
                    try {
                        admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
                    }
                    catch (PulsarAdminException ex1) {
                        if (ex1 instanceof PulsarAdminException.ConflictException) break block6;
                        LOG.error("Unexpected exception {} when creating tenant {}", (Object)ex1, (Object)tenant);
                        throw ex1;
                    }
                }
            }
            if (!(nsList = admin.namespaces().getNamespaces(tenant)).contains(namespace)) {
                try {
                    admin.namespaces().createNamespace(namespace);
                }
                catch (PulsarAdminException ex1) {
                    if (ex1 instanceof PulsarAdminException.ConflictException) break block7;
                    LOG.error("Unexpected exception {} when creating namespace {}", (Object)ex1, (Object)namespace);
                    throw ex1;
                }
            }
        }
    }

    public ResourceUsageTopicTransportManager(PulsarService pulsarService) throws PulsarServerException, PulsarAdminException, PulsarClientException {
        this.pulsarService = pulsarService;
        this.pulsarClient = pulsarService.getClient();
        try {
            this.createTenantAndNamespace();
            this.consumer = new ResourceUsageReader();
            this.pTask = new ResourceUsageWriterTask();
        }
        catch (Exception ex) {
            LOG.error("Error initializing resource usage transport manager", (Throwable)ex);
            throw ex;
        }
    }

    @Override
    public void registerResourceUsagePublisher(ResourceUsagePublisher r) {
        this.publisherMap.put(r.getID(), r);
    }

    @Override
    public void unregisterResourceUsagePublisher(ResourceUsagePublisher r) {
        this.publisherMap.remove(r.getID());
    }

    @Override
    public void registerResourceUsageConsumer(ResourceUsageConsumer r) {
        this.consumerMap.put(r.getID(), r);
    }

    @Override
    public void unregisterResourceUsageConsumer(ResourceUsageConsumer r) {
        this.consumerMap.remove(r.getID());
    }

    @Override
    public void close() throws Exception {
        try {
            this.pTask.close();
            this.consumer.close();
        }
        catch (Exception ex1) {
            LOG.error("Error closing producer/consumer for resource-usage topic", (Throwable)ex1);
        }
    }

    private class ResourceUsageReader
    implements ReaderListener<byte[]>,
    AutoCloseable {
        private final ResourceUsageInfo recdUsageInfo = new ResourceUsageInfo();
        private final Reader<byte[]> consumer;

        public ResourceUsageReader() throws PulsarClientException {
            this.consumer = ResourceUsageTopicTransportManager.this.pulsarClient.newReader().topic(ResourceUsageTopicTransportManager.RESOURCE_USAGE_TOPIC_NAME).startMessageId(MessageId.latest).readerListener(this).create();
        }

        @Override
        public void close() throws Exception {
            this.consumer.close();
        }

        @Override
        public void received(Reader<byte[]> reader, Message<byte[]> msg) {
            long publishTime = msg.getPublishTime();
            long currentTime = System.currentTimeMillis();
            long timeDelta = currentTime - publishTime;
            this.recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
            if (timeDelta > TimeUnit.SECONDS.toMillis(2 * ResourceUsageTopicTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs())) {
                LOG.error("Stale resource usage msg from broker {} publish time {} current time{}", new Object[]{this.recdUsageInfo.getBroker(), publishTime, currentTime});
                ResourceUsageTopicTransportManager.this.staleMessageCount++;
                return;
            }
            try {
                this.recdUsageInfo.getUsageMapsList().forEach(ru -> {
                    ResourceUsageConsumer owner = (ResourceUsageConsumer)ResourceUsageTopicTransportManager.this.consumerMap.get(ru.getOwner());
                    if (owner != null) {
                        owner.acceptResourceUsage(this.recdUsageInfo.getBroker(), (ResourceUsage)ru);
                    }
                });
            }
            catch (IllegalStateException exception) {
                LOG.error("Resource usage reader: Error parsing incoming message", (Throwable)exception);
            }
            catch (Exception exception) {
                LOG.error("Resource usage reader: Unknown exception while parsing message", (Throwable)exception);
            }
        }
    }

    private class ResourceUsageWriterTask
    implements Runnable,
    AutoCloseable {
        private final Producer<ByteBuffer> producer = this.createProducer();
        private final ScheduledFuture<?> resourceUsagePublishTask;

        private Producer<ByteBuffer> createProducer() throws PulsarClientException {
            int publishDelayMilliSecs = 10;
            int sendTimeoutSecs = 10;
            return ResourceUsageTopicTransportManager.this.pulsarClient.newProducer(Schema.BYTEBUFFER).topic(ResourceUsageTopicTransportManager.RESOURCE_USAGE_TOPIC_NAME).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(false).compressionType(CompressionType.LZ4).create();
        }

        public ResourceUsageWriterTask() throws PulsarClientException {
            this.resourceUsagePublishTask = ResourceUsageTopicTransportManager.this.pulsarService.getExecutor().scheduleAtFixedRate(this, ResourceUsageTopicTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(), ResourceUsageTopicTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(), TimeUnit.SECONDS);
        }

        @Override
        public synchronized void run() {
            if (this.resourceUsagePublishTask.isCancelled()) {
                return;
            }
            if (!ResourceUsageTopicTransportManager.this.publisherMap.isEmpty()) {
                ResourceUsageInfo rUsageInfo = new ResourceUsageInfo();
                rUsageInfo.setBroker(ResourceUsageTopicTransportManager.this.pulsarService.getBrokerServiceUrl());
                ResourceUsageTopicTransportManager.this.publisherMap.forEach((key, item) -> item.fillResourceUsage(rUsageInfo.addUsageMap()));
                ByteBuf buf = PulsarByteBufAllocator.DEFAULT.heapBuffer(rUsageInfo.getSerializedSize());
                rUsageInfo.writeTo(buf);
                this.producer.sendAsync(buf.nioBuffer()).whenComplete((id, ex) -> {
                    if (null != ex) {
                        LOG.error("Resource usage publisher: error sending message ID {}", id, ex);
                    }
                    buf.release();
                });
            }
        }

        @Override
        public synchronized void close() throws Exception {
            this.resourceUsagePublishTask.cancel(true);
            this.producer.close();
        }
    }
}

