/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer.topic.register;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicExtractor;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

/**
 * A {@link TopicRegister} that decides the target topics for every incoming record.
 *
 * <p>The topic is obtained from the user-provided {@link TopicExtractor}. If the extracted
 * {@link TopicPartition} already addresses a single partition, its full topic name is returned
 * as-is. Otherwise the topic is expanded into a list of names through a cache whose entries
 * expire after the refresh interval read from the {@link SinkConfiguration}.
 *
 * @param <IN> the type of the records handed to the topic extractor.
 */
@Internal
public class DynamicTopicRegister<IN> implements TopicRegister<IN> {
    private static final long serialVersionUID = 4374769306761301456L;

    /** User logic that maps a record to a topic; the only serialized state of this class. */
    private final TopicExtractor<IN> topicExtractor;

    // The fields below are transient and are only assigned in open().
    private transient PulsarAdmin pulsarAdmin;
    private transient TopicExtractor.TopicMetadataProvider metadataProvider;
    private transient LoadingCache<String, List<String>> partitionsCache;

    /**
     * Creates a register backed by the given extractor.
     *
     * @param topicExtractor the extractor used for every record; must not be {@code null}.
     */
    public DynamicTopicRegister(TopicExtractor<IN> topicExtractor) {
        this.topicExtractor = Preconditions.checkNotNull(topicExtractor);
    }

    /**
     * Returns the topic names the given record should be written to.
     *
     * @param in the record passed to the topic extractor.
     * @return a singleton list with the full topic name when the extracted result is already a
     *     partition, otherwise the cached list produced by {@link #listPartitions(String)}.
     * @throws FlinkRuntimeException if loading the cache entry fails with an
     *     {@link ExecutionException}.
     */
    @Override
    public List<String> topics(IN in) {
        TopicPartition partition = topicExtractor.extract(in, metadataProvider);
        String topicName = partition.getFullTopicName();

        // The extractor already picked a concrete partition, nothing to expand.
        if (partition.isPartition()) {
            return Collections.singletonList(topicName);
        }

        try {
            return partitionsCache.get(topicName);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException("Failed to query Pulsar topic partitions.", e);
        }
    }

    /**
     * Creates the Pulsar admin client, the metadata provider and the partitions cache, then opens
     * the topic extractor.
     *
     * @param sinkConfiguration supplies the admin client settings and the metadata refresh
     *     interval, which is applied to both caches in milliseconds.
     * @param timeService not used by this implementation.
     */
    @Override
    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) {
        long refreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();

        this.pulsarAdmin = PulsarClientFactory.createAdmin(sinkConfiguration);
        this.metadataProvider = new DefaultTopicMetadataProvider(pulsarAdmin, refreshInterval);
        this.partitionsCache =
                CacheBuilder.newBuilder()
                        .expireAfterWrite(refreshInterval, TimeUnit.MILLISECONDS)
                        .build(
                                new CacheLoader<String, List<String>>() {
                                    @Override
                                    public List<String> load(String topic) throws Exception {
                                        return listPartitions(topic);
                                    }
                                });

        this.topicExtractor.open(sinkConfiguration);
    }

    /**
     * Closes the Pulsar admin client if {@link #open} created one.
     *
     * @throws IOException declared by the interface contract.
     */
    @Override
    public void close() throws IOException {
        if (pulsarAdmin != null) {
            pulsarAdmin.close();
        }
    }

    /**
     * Expands a topic into the names used for writing.
     *
     * @param topic the topic name used as the cache key.
     * @return one name per partition index in {@code [0, partitionSize)} when the metadata
     *     reports the topic as partitioned, otherwise a singleton list holding {@code topic}.
     * @throws Exception if the metadata provider fails to answer the query.
     */
    private List<String> listPartitions(String topic) throws Exception {
        TopicMetadata metadata = metadataProvider.query(topic);
        if (!metadata.isPartitioned()) {
            return Collections.singletonList(topic);
        }

        int partitionSize = metadata.getPartitionSize();
        List<String> partitions = new ArrayList<>(partitionSize);
        for (int i = 0; i < partitionSize; ++i) {
            partitions.add(TopicNameUtils.topicNameWithPartition(topic, i));
        }
        return partitions;
    }

    /**
     * Metadata provider that asks the Pulsar admin client for the partitioned-topic metadata and
     * keeps each answer in a cache until the refresh interval has elapsed since it was written.
     */
    private static class DefaultTopicMetadataProvider
            implements TopicExtractor.TopicMetadataProvider {
        private final LoadingCache<String, TopicMetadata> metadataCache;

        /**
         * @param pulsarAdmin the admin client used by the cache loader.
         * @param refreshInterval the expire-after-write duration in milliseconds.
         */
        private DefaultTopicMetadataProvider(final PulsarAdmin pulsarAdmin, long refreshInterval) {
            this.metadataCache =
                    CacheBuilder.newBuilder()
                            .expireAfterWrite(refreshInterval, TimeUnit.MILLISECONDS)
                            .build(
                                    new CacheLoader<String, TopicMetadata>() {
                                        @Override
                                        public TopicMetadata load(String topic) throws Exception {
                                            PartitionedTopicMetadata metadata =
                                                    pulsarAdmin
                                                            .topics()
                                                            .getPartitionedTopicMetadata(topic);
                                            return new TopicMetadata(topic, metadata.partitions);
                                        }
                                    });
        }

        /**
         * Returns the cached metadata for the topic, loading it through the admin client on a
         * cache miss.
         *
         * @throws ExecutionException if the cache loader threw a checked exception.
         */
        @Override
        public TopicMetadata query(String topic) throws ExecutionException {
            return metadataCache.get(topic);
        }
    }
}

