/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.testutils.runtime;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public class PulsarRuntimeOperator
implements Closeable {
    public static final int DEFAULT_PARTITIONS = 10;
    public static final int NUM_RECORDS_PER_PARTITION = 20;
    public static final String SUBSCRIPTION_NAME = "PulsarRuntimeOperator";
    private final String serviceUrl;
    private final String adminUrl;
    private final PulsarClient client;
    private final PulsarAdmin admin;

    public PulsarRuntimeOperator(String serviceUrl, String adminUrl) {
        this(serviceUrl, serviceUrl, adminUrl, adminUrl);
    }

    public PulsarRuntimeOperator(String serviceUrl, String containerServiceUrl, String adminUrl, String containerAdminUrl) {
        this.serviceUrl = containerServiceUrl;
        this.adminUrl = containerAdminUrl;
        this.client = (PulsarClient)PulsarExceptionUtils.sneakyClient(() -> PulsarClient.builder().serviceUrl(serviceUrl).enableTransaction(true).build());
        this.admin = (PulsarAdmin)PulsarExceptionUtils.sneakyClient(() -> PulsarAdmin.builder().serviceHttpUrl(adminUrl).build());
    }

    public boolean topicExists(String topic) {
        TopicName topicName = TopicName.get((String)topic);
        String namespace = topicName.getNamespace();
        String parsedTopic = topicName.toString();
        return ((List)PulsarExceptionUtils.sneakyAdmin(() -> this.admin().topics().getList(namespace, TopicDomain.persistent))).stream().map(TopicNameUtils::topicName).anyMatch(name -> name.equals(parsedTopic));
    }

    public void setupTopic(String topic) {
        Random random = new Random(System.currentTimeMillis());
        this.setupTopic(topic, Schema.STRING, () -> RandomStringUtils.randomAlphanumeric((int)(10 + random.nextInt(20))));
    }

    public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier) {
        this.setupTopic(topic, schema, supplier, 20);
    }

    public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit) {
        String topicName = TopicNameUtils.topicName((String)topic);
        this.createTopic(topicName, 10);
        for (int i = 0; i < 10; ++i) {
            String partitionName = TopicNameUtils.topicNameWithPartition((String)topic, (int)i);
            List messages = Stream.generate(supplier).limit(numRecordsPerSplit).collect(Collectors.toList());
            this.sendMessages(partitionName, schema, messages);
        }
    }

    public void createTopic(String topic, int numberOfPartitions) {
        Preconditions.checkArgument((numberOfPartitions >= 0 ? 1 : 0) != 0);
        if (numberOfPartitions == 0) {
            this.createNonPartitionedTopic(topic);
        } else {
            this.createPartitionedTopic(topic, numberOfPartitions);
        }
    }

    public void increaseTopicPartitions(String topic, int newPartitionsNum) {
        PartitionedTopicMetadata metadata = (PartitionedTopicMetadata)PulsarExceptionUtils.sneakyAdmin(() -> this.admin().topics().getPartitionedTopicMetadata(topic));
        Preconditions.checkArgument((metadata.partitions < newPartitionsNum ? 1 : 0) != 0, (Object)"The new partition size which should greater than previous size.");
        PulsarExceptionUtils.sneakyAdmin(() -> this.admin().topics().updatePartitionedTopic(topic, newPartitionsNum));
    }

    public void deleteTopic(String topic) {
        PartitionedTopicMetadata metadata;
        String topicName = TopicNameUtils.topicName((String)topic);
        try {
            metadata = this.admin().topics().getPartitionedTopicMetadata(topicName);
        }
        catch (PulsarAdminException.NotFoundException e) {
            return;
        }
        catch (PulsarAdminException e) {
            PulsarExceptionUtils.sneakyThrow((Exception)((Object)e));
            return;
        }
        if (metadata.partitions == 0) {
            PulsarExceptionUtils.sneakyAdmin(() -> this.admin().topics().delete(topicName));
        } else {
            PulsarExceptionUtils.sneakyAdmin(() -> this.admin().topics().deletePartitionedTopic(topicName));
        }
    }

    public List<TopicPartition> topicInfo(String topic) {
        try {
            return ((List)this.client().getPartitionsForTopic(topic).get()).stream().map(p -> new TopicPartition(topic, TopicName.getPartitionIndex((String)p), Collections.singletonList(TopicRange.createFullRange()))).collect(Collectors.toList());
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) {
        List<MessageId> messageIds = this.sendMessages(topic, schema, Collections.singletonList(message));
        Preconditions.checkArgument((messageIds.size() == 1 ? 1 : 0) != 0);
        return messageIds.get(0);
    }

    public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message) {
        List<MessageId> messageIds = this.sendMessages(topic, schema, key, Collections.singletonList(message));
        Preconditions.checkArgument((messageIds.size() == 1 ? 1 : 0) != 0);
        return messageIds.get(0);
    }

    public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collection<T> messages) {
        return this.sendMessages(topic, schema, null, messages);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, String key, Collection<T> messages) {
        try (Producer<T> producer = this.createProducer(topic, schema);){
            ArrayList<MessageId> messageIds = new ArrayList<MessageId>(messages.size());
            for (T message : messages) {
                TypedMessageBuilder builder = producer.newMessage().value(message);
                if (!Strings.isNullOrEmpty((String)key)) {
                    builder.key(key);
                }
                MessageId messageId = builder.send();
                messageIds.add(messageId);
            }
            producer.flush();
            ArrayList<MessageId> arrayList = messageIds;
            return arrayList;
        }
        catch (PulsarClientException e) {
            PulsarExceptionUtils.sneakyThrow((Exception)((Object)e));
            return Collections.emptyList();
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public <T> Message<T> receiveMessage(String topic, Schema<T> schema) {
        try (Consumer<T> consumer = this.createConsumer(topic, schema);){
            Message message2 = consumer.receive();
            consumer.acknowledge(message2.getMessageId());
            Message message = message2;
            return message;
        }
        catch (PulsarClientException e) {
            PulsarExceptionUtils.sneakyThrow((Exception)((Object)e));
            return null;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public <T> Message<T> receiveMessage(String topic, Schema<T> schema, Duration timeout) {
        try (Consumer<T> consumer = this.createConsumer(topic, schema);){
            Message message2 = consumer.receive(Math.toIntExact(timeout.toMillis()), TimeUnit.MILLISECONDS);
            consumer.acknowledge(message2.getMessageId());
            Message message = message2;
            return message;
        }
        catch (Exception e) {
            return null;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public <T> List<Message<T>> receiveMessages(String topic, Schema<T> schema, int counts) {
        if (counts == 0) {
            return Collections.emptyList();
        }
        if (counts < 0) {
            return this.receiveAllMessages(topic, schema, Duration.ofMinutes(1L));
        }
        if (counts == 1) {
            Message<T> message = this.receiveMessage(topic, schema);
            return Collections.singletonList(message);
        }
        try (Consumer<T> consumer = this.createConsumer(topic, schema);){
            ArrayList<Message<T>> messages = new ArrayList<Message<T>>(counts);
            for (int i = 0; i < counts; ++i) {
                Message message = consumer.receive();
                messages.add(message);
                consumer.acknowledge(message.getMessageId());
            }
            ArrayList<Message<T>> arrayList = messages;
            return arrayList;
        }
        catch (PulsarClientException e) {
            PulsarExceptionUtils.sneakyThrow((Exception)((Object)e));
            return Collections.emptyList();
        }
    }

    public <T> List<Message<T>> receiveAllMessages(String topic, Schema<T> schema, Duration timeout) {
        ArrayList<Message<T>> messages = new ArrayList<Message<T>>();
        Message<T> message = this.receiveMessage(topic, schema, timeout);
        while (message != null) {
            messages.add(message);
            message = this.receiveMessage(topic, schema, timeout);
        }
        return messages;
    }

    public TransactionCoordinatorClient coordinatorClient() {
        return ((PulsarClientImpl)this.client()).getTcClient();
    }

    public String serviceUrl() {
        return this.serviceUrl;
    }

    public String adminUrl() {
        return this.adminUrl;
    }

    public PulsarClient client() {
        return this.client;
    }

    public PulsarAdmin admin() {
        return this.admin;
    }

    public Configuration config() {
        Configuration configuration = new Configuration();
        configuration.set(PulsarOptions.PULSAR_SERVICE_URL, (Object)this.serviceUrl());
        configuration.set(PulsarOptions.PULSAR_ADMIN_URL, (Object)this.adminUrl());
        return configuration;
    }

    public Configuration sinkConfig(DeliveryGuarantee deliveryGuarantee) {
        Configuration configuration = this.config();
        configuration.set(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, (Object)deliveryGuarantee);
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            configuration.set(PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT, (Object)Duration.ofMinutes(5L).toMillis());
            configuration.set(PulsarOptions.PULSAR_ENABLE_TRANSACTION, (Object)true);
            configuration.set(PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS, (Object)0L);
        }
        return configuration;
    }

    @Override
    public void close() throws IOException {
        if (this.admin != null) {
            this.admin.close();
        }
        if (this.client != null) {
            this.client.close();
        }
    }

    private void createNonPartitionedTopic(String topic) {
        block2: {
            try {
                this.admin().topics().createNonPartitionedTopic(topic);
            }
            catch (PulsarAdminException e) {
                if (e instanceof PulsarAdminException.ConflictException && e.getMessage().equals("This topic already exists")) break block2;
                PulsarExceptionUtils.sneakyThrow((Exception)((Object)e));
            }
        }
    }

    private void createPartitionedTopic(String topic, int numberOfPartitions) {
        block2: {
            try {
                this.admin().topics().createPartitionedTopic(topic, numberOfPartitions);
            }
            catch (PulsarAdminException e) {
                if (e instanceof PulsarAdminException.ConflictException && e.getMessage().equals("This topic already exists")) break block2;
                PulsarExceptionUtils.sneakyThrow((Exception)((Object)e));
            }
        }
    }

    public <T> Producer<T> createProducer(String topic, Schema<T> schema) {
        ProducerBuilder builder = this.client().newProducer(schema).topic(topic).enableBatching(false).enableMultiSchema(true).accessMode(ProducerAccessMode.Shared);
        return (Producer)PulsarExceptionUtils.sneakyClient(() -> ((ProducerBuilder)builder).create());
    }

    private <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
        List subscriptions = (List)PulsarExceptionUtils.sneakyAdmin(() -> this.admin().topics().getSubscriptions(topic));
        if (!subscriptions.contains(SUBSCRIPTION_NAME)) {
            PulsarExceptionUtils.sneakyAdmin(() -> this.admin().topics().createSubscription(topic, SUBSCRIPTION_NAME, MessageId.earliest));
        }
        ConsumerBuilder builder = this.client().newConsumer(schema).topic(new String[]{topic}).subscriptionName(SUBSCRIPTION_NAME).subscriptionMode(SubscriptionMode.Durable).subscriptionType(SubscriptionType.Exclusive);
        return (Consumer)PulsarExceptionUtils.sneakyClient(() -> ((ConsumerBuilder)builder).subscribe());
    }
}

