/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer.topic.register;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicExtractor;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister;
import org.apache.flink.connector.pulsar.sink.writer.topic.metadata.CachedTopicMetadataProvider;
import org.apache.flink.connector.pulsar.sink.writer.topic.metadata.NotExistedTopicMetadataProvider;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;

/**
 * A {@link TopicRegister} that resolves the target topics for every incoming record by
 * delegating to a user supplied {@link TopicExtractor}.
 *
 * <p>When the extractor yields a concrete partition, that partition name is returned as-is.
 * Otherwise the topic metadata is queried and the resulting list of partition names is kept in
 * a bounded, time-expiring cache so that repeated records for the same topic do not trigger a
 * new metadata query each time.
 *
 * <p>All runtime state (admin client, metadata providers, cache) is transient and is created in
 * {@link #open(SinkConfiguration, ProcessingTimeService)}; it must be called before
 * {@link #topics(Object)}.
 *
 * @param <IN> the type of the records handed to the topic extractor.
 */
@Internal
public class DynamicTopicRegister<IN> implements TopicRegister<IN> {
    private static final long serialVersionUID = 4374769306761301456L;

    /** Upper bound on the number of topics whose partition lists are cached at once. */
    private static final long MAX_CACHED_TOPICS = 1000L;

    private final TopicExtractor<IN> topicExtractor;

    // Runtime-only state, (re)created in open().
    private transient PulsarAdmin pulsarAdmin;
    // Handed to the extractor on every record. NOTE(review): presumably caches metadata
    // lookups, judging by its name only - confirm against its implementation.
    private transient CachedTopicMetadataProvider cachedMetadataProvider;
    // Used to look up metadata for topics missing from partitionsCache. NOTE(review): the
    // name suggests special handling of topics that do not exist yet - confirm.
    private transient NotExistedTopicMetadataProvider notExistedMetadataProvider;
    // Full topic name -> names of the topics to write to for it.
    private transient Cache<String, List<String>> partitionsCache;

    /**
     * Creates a register backed by the given extractor.
     *
     * @param topicExtractor extracts the target topic from a record; must not be null.
     * @throws NullPointerException if {@code topicExtractor} is null.
     */
    public DynamicTopicRegister(TopicExtractor<IN> topicExtractor) {
        this.topicExtractor = Preconditions.checkNotNull(topicExtractor);
    }

    /**
     * Resolves the topics the given record should be written to.
     *
     * @param in the record to extract the topic from.
     * @return a single-element list holding the partition name when the extractor returned a
     *     concrete partition; otherwise the (cached) list produced by the metadata query.
     * @throws FlinkRuntimeException if querying the topic metadata fails; the original
     *     {@link PulsarAdminException} is attached as the cause.
     */
    @Override
    public List<String> topics(IN in) {
        TopicPartition partition = this.topicExtractor.extract(in, this.cachedMetadataProvider);
        String topicName = partition.getFullTopicName();

        // The extractor already picked a concrete partition, no metadata lookup is required.
        if (partition.isPartition()) {
            return Collections.singletonList(topicName);
        }

        try {
            List<String> topics = this.partitionsCache.getIfPresent(topicName);
            if (topics == null) {
                topics = this.queryTopics(topicName);
                this.partitionsCache.put(topicName, topics);
            }
            return topics;
        } catch (PulsarAdminException e) {
            // Chain the admin exception itself: using e.getCause() would drop its message and
            // stack trace, and would leave no cause at all when the admin exception has none.
            throw new FlinkRuntimeException("Failed to query Pulsar topic partitions.", e);
        }
    }

    /**
     * Queries the metadata of the given topic and expands it into the list of writable topic
     * names.
     *
     * @param topic the full topic name to query.
     * @return one name per partition (indices {@code 0} to {@code partitionSize - 1}) when the
     *     metadata reports a partitioned topic, otherwise a single-element list with
     *     {@code topic} itself.
     * @throws PulsarAdminException if the metadata query fails.
     */
    private List<String> queryTopics(String topic) throws PulsarAdminException {
        TopicMetadata metadata = this.notExistedMetadataProvider.query(topic);
        if (!metadata.isPartitioned()) {
            return Collections.singletonList(topic);
        }

        int partitionSize = metadata.getPartitionSize();
        List<String> partitions = new ArrayList<>(partitionSize);
        for (int i = 0; i < partitionSize; ++i) {
            partitions.add(TopicNameUtils.topicNameWithPartition(topic, i));
        }
        return partitions;
    }

    /**
     * Creates the admin client, the metadata providers and the partition cache, then opens the
     * topic extractor.
     *
     * @param sinkConfiguration the sink configuration used to build the admin client and to
     *     read the topic metadata refresh interval.
     * @param timeService not used by this implementation.
     */
    @Override
    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) {
        this.pulsarAdmin = PulsarClientFactory.createAdmin(sinkConfiguration);
        this.cachedMetadataProvider =
                new CachedTopicMetadataProvider(this.pulsarAdmin, sinkConfiguration);
        this.notExistedMetadataProvider =
                new NotExistedTopicMetadataProvider(this.pulsarAdmin, sinkConfiguration);

        // A non-positive interval is replaced by the largest possible expiry, so cached
        // entries are effectively only dropped by the size bound.
        long refreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();
        if (refreshInterval <= 0L) {
            refreshInterval = Long.MAX_VALUE;
        }
        this.partitionsCache =
                CacheBuilder.newBuilder()
                        .expireAfterWrite(refreshInterval, TimeUnit.MILLISECONDS)
                        .maximumSize(MAX_CACHED_TOPICS)
                        .build();

        this.topicExtractor.open(sinkConfiguration);
    }

    /**
     * Closes the admin client if {@link #open(SinkConfiguration, ProcessingTimeService)} created
     * one.
     *
     * @throws IOException declared by the interface contract.
     */
    @Override
    public void close() throws IOException {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
    }
}

