package org.apache.flink.connector.pulsar.testutils.runtime;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

/* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.class */
public class PulsarRuntimeOperator implements Closeable {
    public static final int DEFAULT_PARTITIONS = 10;
    public static final int NUM_RECORDS_PER_PARTITION = 20;
    public static final String SUBSCRIPTION_NAME = "PulsarRuntimeOperator";
    private final String serviceUrl;
    private final String adminUrl;
    private final PulsarClient client;
    private final PulsarAdmin admin;

    public PulsarRuntimeOperator(String str, String str2) {
        this(str, str, str2, str2);
    }

    public PulsarRuntimeOperator(String str, String str2, String str3, String str4) {
        this.serviceUrl = str2;
        this.adminUrl = str4;
        this.client = (PulsarClient) PulsarExceptionUtils.sneakyClient(() -> {
            return PulsarClient.builder().serviceUrl(str).enableTransaction(true).build();
        });
        this.admin = (PulsarAdmin) PulsarExceptionUtils.sneakyClient(() -> {
            return PulsarAdmin.builder().serviceHttpUrl(str3).build();
        });
    }

    public void setupTopic(String str) {
        Random random = new Random(System.currentTimeMillis());
        setupTopic(str, Schema.STRING, () -> {
            return RandomStringUtils.randomAlphanumeric(10 + random.nextInt(20));
        });
    }

    public <T> void setupTopic(String str, Schema<T> schema, Supplier<T> supplier) {
        setupTopic(str, schema, supplier, 20);
    }

    public <T> void setupTopic(String str, Schema<T> schema, Supplier<T> supplier, int i) {
        createTopic(TopicNameUtils.topicName(str), 10);
        for (int i2 = 0; i2 < 10; i2++) {
            sendMessages(TopicNameUtils.topicNameWithPartition(str, i2), schema, (List) Stream.generate(supplier).limit(i).collect(Collectors.toList()));
        }
    }

    public void createTopic(String str, int i) {
        Preconditions.checkArgument(i >= 0);
        if (i == 0) {
            createNonPartitionedTopic(str);
        } else {
            createPartitionedTopic(str, i);
        }
    }

    public void createSchema(String str, Schema<?> schema) {
        PulsarExceptionUtils.sneakyAdmin(() -> {
            admin().schemas().createSchema(str, schema.getSchemaInfo());
        });
    }

    public void increaseTopicPartitions(String str, int i) {
        Preconditions.checkArgument(((PartitionedTopicMetadata) PulsarExceptionUtils.sneakyAdmin(() -> {
            return admin().topics().getPartitionedTopicMetadata(str);
        })).partitions < i, "The new partition size which should greater than previous size.");
        PulsarExceptionUtils.sneakyAdmin(() -> {
            admin().topics().updatePartitionedTopic(str, i);
        });
    }

    public void deleteTopic(String str) {
        String str2 = TopicNameUtils.topicName(str);
        try {
            if (admin().topics().getPartitionedTopicMetadata(str2).partitions == 0) {
                PulsarExceptionUtils.sneakyAdmin(() -> {
                    admin().topics().delete(str2);
                });
            } else {
                PulsarExceptionUtils.sneakyAdmin(() -> {
                    admin().topics().deletePartitionedTopic(str2);
                });
            }
        } catch (PulsarAdminException.NotFoundException e) {
        } catch (PulsarAdminException e2) {
            PulsarExceptionUtils.sneakyThrow(e2);
        }
    }

    public List<TopicPartition> topicInfo(String str) {
        try {
            return (List) ((List) client().getPartitionsForTopic(str).get()).stream().map(str2 -> {
                return new TopicPartition(str, TopicName.getPartitionIndex(str2));
            }).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public <T> MessageId sendMessage(String str, Schema<T> schema, T t) {
        List<MessageId> sendMessages = sendMessages(str, schema, Collections.singletonList(t));
        Preconditions.checkArgument(sendMessages.size() == 1);
        return sendMessages.get(0);
    }

    public <T> MessageId sendMessage(String str, Schema<T> schema, String str2, T t) {
        List<MessageId> sendMessages = sendMessages(str, schema, str2, Collections.singletonList(t));
        Preconditions.checkArgument(sendMessages.size() == 1);
        return sendMessages.get(0);
    }

    public <T> List<MessageId> sendMessages(String str, Schema<T> schema, Collection<T> collection) {
        return sendMessages(str, schema, null, collection);
    }

    public <T> List<MessageId> sendMessages(String str, Schema<T> schema, String str2, Collection<T> collection) {
        try {
            Producer<T> createProducer = createProducer(str, schema);
            Throwable th = null;
            try {
                try {
                    ArrayList arrayList = new ArrayList(collection.size());
                    Iterator<T> it = collection.iterator();
                    while (it.hasNext()) {
                        TypedMessageBuilder value = createProducer.newMessage().value(it.next());
                        if (!Strings.isNullOrEmpty(str2)) {
                            value.key(str2);
                        }
                        arrayList.add(value.send());
                    }
                    createProducer.flush();
                    if (createProducer != null) {
                        if (0 != 0) {
                            try {
                                createProducer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createProducer.close();
                        }
                    }
                    return arrayList;
                } finally {
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            PulsarExceptionUtils.sneakyThrow(e);
            return Collections.emptyList();
        }
    }

    public <T> Message<T> receiveMessage(String str, Schema<T> schema) {
        try {
            Consumer<T> createConsumer = createConsumer(str, schema);
            Throwable th = null;
            try {
                try {
                    Message<T> receive = createConsumer.receive();
                    createConsumer.acknowledge(receive.getMessageId());
                    if (createConsumer != null) {
                        if (0 != 0) {
                            try {
                                createConsumer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createConsumer.close();
                        }
                    }
                    return receive;
                } finally {
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            PulsarExceptionUtils.sneakyThrow(e);
            return null;
        }
    }

    public <T> Message<T> receiveMessage(String str, Schema<T> schema, Duration duration) {
        try {
            Consumer<T> createConsumer = createConsumer(str, schema);
            Throwable th = null;
            try {
                try {
                    Message<T> receive = createConsumer.receive(Math.toIntExact(duration.toMillis()), TimeUnit.MILLISECONDS);
                    createConsumer.acknowledge(receive.getMessageId());
                    if (createConsumer != null) {
                        if (0 != 0) {
                            try {
                                createConsumer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createConsumer.close();
                        }
                    }
                    return receive;
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            return null;
        }
    }

    public <T> List<Message<T>> receiveMessages(String str, Schema<T> schema, int i) {
        if (i == 0) {
            return Collections.emptyList();
        }
        if (i < 0) {
            return receiveAllMessages(str, schema, Duration.ofMinutes(1L));
        }
        if (i == 1) {
            return Collections.singletonList(receiveMessage(str, schema));
        }
        try {
            Consumer<T> createConsumer = createConsumer(str, schema);
            Throwable th = null;
            try {
                try {
                    ArrayList arrayList = new ArrayList(i);
                    for (int i2 = 0; i2 < i; i2++) {
                        Message receive = createConsumer.receive();
                        arrayList.add(receive);
                        createConsumer.acknowledge(receive.getMessageId());
                    }
                    if (createConsumer != null) {
                        if (0 != 0) {
                            try {
                                createConsumer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createConsumer.close();
                        }
                    }
                    return arrayList;
                } finally {
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            PulsarExceptionUtils.sneakyThrow(e);
            return Collections.emptyList();
        }
    }

    public <T> List<Message<T>> receiveAllMessages(String str, Schema<T> schema, Duration duration) {
        ArrayList arrayList = new ArrayList();
        Message<T> receiveMessage = receiveMessage(str, schema, duration);
        while (true) {
            Message<T> message = receiveMessage;
            if (message == null) {
                return arrayList;
            }
            arrayList.add(message);
            receiveMessage = receiveMessage(str, schema, duration);
        }
    }

    public TransactionCoordinatorClient coordinatorClient() {
        return client().getTcClient();
    }

    public String serviceUrl() {
        return this.serviceUrl;
    }

    public String adminUrl() {
        return this.adminUrl;
    }

    public PulsarClient client() {
        return this.client;
    }

    public PulsarAdmin admin() {
        return this.admin;
    }

    public Configuration config() {
        Configuration configuration = new Configuration();
        configuration.set(PulsarOptions.PULSAR_SERVICE_URL, serviceUrl());
        configuration.set(PulsarOptions.PULSAR_ADMIN_URL, adminUrl());
        return configuration;
    }

    public Configuration sinkConfig(DeliveryGuarantee deliveryGuarantee) {
        Configuration config = config();
        config.set(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, deliveryGuarantee);
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            config.set(PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT, Long.valueOf(Duration.ofMinutes(5L).toMillis()));
            config.set(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true);
            config.set(PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS, 0L);
        }
        return config;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws PulsarClientException {
        if (this.admin != null) {
            this.admin.close();
        }
        if (this.client != null) {
            this.client.close();
        }
    }

    private void createNonPartitionedTopic(String str) {
        try {
            admin().topics().createNonPartitionedTopic(str);
        } catch (PulsarAdminException e) {
            if ((e instanceof PulsarAdminException.ConflictException) && e.getMessage().equals("This topic already exists")) {
                return;
            }
            PulsarExceptionUtils.sneakyThrow(e);
        }
    }

    private void createPartitionedTopic(String str, int i) {
        try {
            admin().topics().createPartitionedTopic(str, i);
        } catch (PulsarAdminException e) {
            if ((e instanceof PulsarAdminException.ConflictException) && e.getMessage().equals("This topic already exists")) {
                return;
            }
            PulsarExceptionUtils.sneakyThrow(e);
        }
    }

    private <T> Producer<T> createProducer(String str, Schema<T> schema) {
        ProducerBuilder accessMode = client().newProducer(schema).topic(str).enableBatching(false).enableMultiSchema(true).accessMode(ProducerAccessMode.Shared);
        accessMode.getClass();
        return (Producer) PulsarExceptionUtils.sneakyClient(accessMode::create);
    }

    private <T> Consumer<T> createConsumer(String str, Schema<T> schema) {
        if (!((List) PulsarExceptionUtils.sneakyAdmin(() -> {
            return admin().topics().getSubscriptions(str);
        })).contains(SUBSCRIPTION_NAME)) {
            PulsarExceptionUtils.sneakyAdmin(() -> {
                admin().topics().createSubscription(str, SUBSCRIPTION_NAME, MessageId.earliest);
            });
        }
        ConsumerBuilder subscriptionType = client().newConsumer(schema).topic(new String[]{str}).subscriptionName(SUBSCRIPTION_NAME).subscriptionMode(SubscriptionMode.Durable).subscriptionType(SubscriptionType.Exclusive);
        subscriptionType.getClass();
        return (Consumer) PulsarExceptionUtils.sneakyClient(subscriptionType::subscribe);
    }
}
