package org.apache.pulsar.broker.resourcegroup;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.Runnables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.class */
public class ResourceUsageTopicTransportManager implements ResourceUsageTransportManager {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTopicTransportManager.class);

    /** Topic on which resource-usage reports are published and from which they are read. */
    public static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage";

    private final PulsarService pulsarService;
    private final PulsarClient pulsarClient;

    /** Periodic task that publishes the usage of all registered publishers. */
    private final ResourceUsageWriterTask pTask;

    /** Listener-driven reader that dispatches incoming usage reports to registered consumers. */
    private final ResourceUsageReader consumer;

    /** Registered publishers, keyed by {@code ResourceUsagePublisher.getID()}. */
    private final Map<String, ResourceUsagePublisher> publisherMap = new ConcurrentHashMap<>();

    /** Registered consumers, keyed by {@code ResourceUsageConsumer.getID()}. */
    private final Map<String, ResourceUsageConsumer> consumerMap = new ConcurrentHashMap<>();

    /** Number of received messages that were discarded as stale. */
    private long staleMessageCount = 0;

    /* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager$ResourceUsageReader.class */
    private class ResourceUsageReader implements ReaderListener<byte[]>, AutoCloseable {
        private final ResourceUsageInfo recdUsageInfo = new ResourceUsageInfo();
        private final Reader<byte[]> consumer;

        public ResourceUsageReader() throws PulsarClientException {
            this.consumer = ResourceUsageTopicTransportManager.this.pulsarClient.newReader().topic(ResourceUsageTopicTransportManager.RESOURCE_USAGE_TOPIC_NAME).startMessageId(MessageId.latest).readerListener(this).create();
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.consumer.close();
        }

        public void received(Reader<byte[]> reader, Message<byte[]> message) {
            long publishTime = message.getPublishTime();
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis - publishTime;
            this.recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(message.getData()), message.getData().length);
            if (j > TimeUnit.SECONDS.toMillis(2 * ResourceUsageTopicTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs())) {
                ResourceUsageTopicTransportManager.LOG.error("Stale resource usage msg from broker {} publish time {} current time{}", new Object[]{this.recdUsageInfo.getBroker(), Long.valueOf(publishTime), Long.valueOf(currentTimeMillis)});
                ResourceUsageTopicTransportManager.access$408(ResourceUsageTopicTransportManager.this);
                return;
            }
            try {
                this.recdUsageInfo.getUsageMapsList().forEach(resourceUsage -> {
                    ResourceUsageConsumer resourceUsageConsumer = (ResourceUsageConsumer) ResourceUsageTopicTransportManager.this.consumerMap.get(resourceUsage.getOwner());
                    if (resourceUsageConsumer != null) {
                        resourceUsageConsumer.acceptResourceUsage(this.recdUsageInfo.getBroker(), resourceUsage);
                    }
                });
            } catch (IllegalStateException e) {
                ResourceUsageTopicTransportManager.LOG.error("Resource usage reader: Error parsing incoming message", e);
            } catch (Exception e2) {
                ResourceUsageTopicTransportManager.LOG.error("Resource usage reader: Unknown exception while parsing message", e2);
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager$ResourceUsageWriterTask.class */
    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
        private final Producer<ByteBuffer> producer = createProducer();
        private final ScheduledFuture<?> resourceUsagePublishTask;

        private Producer<ByteBuffer> createProducer() throws PulsarClientException {
            return ResourceUsageTopicTransportManager.this.pulsarClient.newProducer(Schema.BYTEBUFFER).topic(ResourceUsageTopicTransportManager.RESOURCE_USAGE_TOPIC_NAME).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(false).compressionType(CompressionType.LZ4).create();
        }

        public ResourceUsageWriterTask() throws PulsarClientException {
            this.resourceUsagePublishTask = ResourceUsageTopicTransportManager.this.pulsarService.getExecutor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this), ResourceUsageTopicTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(), ResourceUsageTopicTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(), TimeUnit.SECONDS);
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            if (this.resourceUsagePublishTask.isCancelled() || ResourceUsageTopicTransportManager.this.publisherMap.isEmpty()) {
                return;
            }
            ResourceUsageInfo resourceUsageInfo = new ResourceUsageInfo();
            resourceUsageInfo.setBroker(ResourceUsageTopicTransportManager.this.pulsarService.getBrokerServiceUrl());
            ResourceUsageTopicTransportManager.this.publisherMap.forEach((str, resourceUsagePublisher) -> {
                resourceUsagePublisher.fillResourceUsage(resourceUsageInfo.addUsageMap());
            });
            ByteBuf heapBuffer = PulsarByteBufAllocator.DEFAULT.heapBuffer(resourceUsageInfo.getSerializedSize());
            resourceUsageInfo.writeTo(heapBuffer);
            this.producer.sendAsync(heapBuffer.nioBuffer()).whenComplete((messageId, th) -> {
                if (null != th) {
                    ResourceUsageTopicTransportManager.LOG.error("Resource usage publisher: error sending message ID {}", messageId, th);
                }
                heapBuffer.release();
            });
        }

        @Override // java.lang.AutoCloseable
        public synchronized void close() throws Exception {
            this.resourceUsagePublishTask.cancel(true);
            this.producer.close();
        }
    }

    /**
     * Makes sure the tenant and the namespace of {@link #RESOURCE_USAGE_TOPIC_NAME} exist,
     * creating whichever one is not listed by the admin client.
     *
     * <p>A {@code ConflictException} raised by either creation call is ignored; any other
     * {@code PulsarAdminException} is logged and rethrown.
     *
     * @throws PulsarServerException if the admin client cannot be obtained
     * @throws PulsarAdminException  if listing or creating the tenant/namespace fails
     */
    private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
        final TopicName usageTopic = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
        final PulsarAdmin admin = pulsarService.getAdminClient();
        final ServiceConfiguration conf = pulsarService.getConfig();
        final String cluster = conf.getClusterName();
        final String tenant = usageTopic.getTenant();
        final String namespace = usageTopic.getNamespace();

        final boolean tenantKnown = admin.tenants().getTenants().contains(tenant);
        if (!tenantKnown) {
            try {
                TenantInfoImpl tenantInfo = new TenantInfoImpl(
                        Sets.newHashSet(conf.getSuperUserRoles()),
                        Sets.newHashSet(new String[] {cluster}));
                admin.tenants().createTenant(tenant, tenantInfo);
            } catch (PulsarAdminException.ConflictException ignored) {
                // Creation conflicted with an existing tenant; nothing left to do for it.
            } catch (PulsarAdminException e) {
                LOG.error("Unexpected exception {} when creating tenant {}", e, tenant);
                throw e;
            }
        }

        final boolean namespaceKnown = admin.namespaces().getNamespaces(tenant).contains(namespace);
        if (!namespaceKnown) {
            try {
                admin.namespaces().createNamespace(namespace);
            } catch (PulsarAdminException.ConflictException ignored) {
                // Creation conflicted with an existing namespace; treat as success.
            } catch (PulsarAdminException e) {
                LOG.error("Unexpected exception {} when creating namespace {}", e, namespace);
                throw e;
            }
        }
    }

    public ResourceUsageTopicTransportManager(PulsarService pulsarService) throws PulsarServerException, PulsarAdminException, PulsarClientException {
        this.pulsarService = pulsarService;
        this.pulsarClient = pulsarService.getClient();
        try {
            createTenantAndNamespace();
            this.consumer = new ResourceUsageReader();
            this.pTask = new ResourceUsageWriterTask();
        } catch (Exception e) {
            LOG.error("Error initializing resource usage transport manager", e);
            throw e;
        }
    }

    /**
     * Adds the publisher to the publisher map under its ID, replacing any entry already stored
     * under that ID.
     *
     * @param resourceUsagePublisher publisher to register
     */
    @Override
    public void registerResourceUsagePublisher(ResourceUsagePublisher resourceUsagePublisher) {
        final String publisherId = resourceUsagePublisher.getID();
        publisherMap.put(publisherId, resourceUsagePublisher);
    }

    /**
     * Removes the publisher map entry stored under the given publisher's ID, if there is one.
     *
     * @param resourceUsagePublisher publisher to unregister
     */
    @Override
    public void unregisterResourceUsagePublisher(ResourceUsagePublisher resourceUsagePublisher) {
        final String publisherId = resourceUsagePublisher.getID();
        publisherMap.remove(publisherId);
    }

    /**
     * Adds the consumer to the consumer map under its ID, replacing any entry already stored
     * under that ID.
     *
     * @param resourceUsageConsumer consumer to register
     */
    @Override
    public void registerResourceUsageConsumer(ResourceUsageConsumer resourceUsageConsumer) {
        final String consumerId = resourceUsageConsumer.getID();
        consumerMap.put(consumerId, resourceUsageConsumer);
    }

    /**
     * Removes the consumer map entry stored under the given consumer's ID, if there is one.
     *
     * @param resourceUsageConsumer consumer to unregister
     */
    @Override
    public void unregisterResourceUsageConsumer(ResourceUsageConsumer resourceUsageConsumer) {
        final String consumerId = resourceUsageConsumer.getID();
        consumerMap.remove(consumerId);
    }

    /**
     * Closes the writer task and then the reader.
     *
     * <p>Each is closed in its own try block, so a failure closing the writer task does not
     * prevent the reader from being closed. Failures are logged and not rethrown.
     */
    @Override
    public void close() throws Exception {
        try {
            this.pTask.close();
        } catch (Exception e) {
            LOG.error("Error closing producer/consumer for resource-usage topic", e);
        }
        try {
            this.consumer.close();
        } catch (Exception e) {
            LOG.error("Error closing producer/consumer for resource-usage topic", e);
        }
    }

    /**
     * Synthetic accessor that post-increments {@code staleMessageCount} on the given instance.
     *
     * <p>Reconstructed by hand: the decompiler could not decode this method and had emitted a
     * body that always threw {@code UnsupportedOperationException}. Its register dump showed the
     * field being loaded, incremented by one and stored back. Returning the value from before
     * the increment is inferred from the {@code dup2_x1} instruction named in the decode
     * failure; TODO confirm against the original bytecode.
     *
     * @param r8 the instance whose stale-message counter is incremented
     * @return the counter value before the increment
     */
    static /* synthetic */ long access$408(ResourceUsageTopicTransportManager r8) {
        final long previous = r8.staleMessageCount;
        r8.staleMessageCount = previous + 1;
        return previous;
    }

    // Empty static initializer: it executes no statements. Presumably residue left by the
    // decompiler; the static fields of this class are initialized at their declarations.
    static {
    }
}
