package org.apache.beam.it.gcp.pubsub;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.SchemaServiceSettings;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.utils.ExceptionUtils;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubsubResourceManager.class */
/**
 * Resource manager that creates Pub/Sub topics, subscriptions and schemas on behalf of a test and
 * deletes everything it created when {@link #cleanupAll()} is called.
 *
 * <p>Topic and subscription names produced by {@link #createTopic(String)} and {@link
 * #createSubscription(TopicName, String)} are prefixed with the test id (see {@link
 * #getTestId()}) followed by {@code "-"}.
 *
 * <p>Admin calls that create or delete resources are retried when the failure is classified as a
 * {@link DeadlineExceededException} by {@code ExceptionUtils.containsType}.
 *
 * <p>Once {@link #cleanupAll()} has shut down the admin clients, the create/publish methods throw
 * {@link IllegalStateException}.
 */
public final class PubsubResourceManager implements ResourceManager {
    private static final Logger LOG = LoggerFactory.getLogger(PubsubResourceManager.class);

    /** Ack deadline, in seconds, applied to every subscription created by this manager. */
    private static final int DEFAULT_ACK_DEADLINE_SECONDS = 600;

    /** Separator placed between the test id and the caller-supplied resource name. */
    private static final String RESOURCE_NAME_SEPARATOR = "-";

    // Retry settings used by retryOnDeadlineExceeded().
    private static final int FAILSAFE_MAX_RETRIES = 5;
    private static final double FAILSAFE_RETRY_JITTER = 0.1d;
    private static final Duration FAILSAFE_RETRY_DELAY = Duration.ofSeconds(10);
    private static final Duration FAILSAFE_RETRY_MAX_DELAY = Duration.ofSeconds(60);

    private final String testId;
    private final String projectId;
    private final PubsubPublisherFactory publisherFactory;
    private final TopicAdminClient topicAdminClient;
    private final SubscriptionAdminClient subscriptionAdminClient;
    private final SchemaServiceClient schemaServiceClient;

    // Resources created through this instance; these are what cleanupAll() deletes.
    private final Set<TopicName> createdTopics;
    private final Set<SubscriptionName> createdSubscriptions;
    private final Set<SchemaName> createdSchemas;

    /** Builder for {@link PubsubResourceManager}; obtain one via {@link #builder}. */
    public static final class Builder {
        private final String projectId;
        private final String testName;
        private CredentialsProvider credentialsProvider;

        private Builder(String testName, String projectId, CredentialsProvider credentialsProvider) {
            this.testName = testName;
            this.projectId = projectId;
            this.credentialsProvider = credentialsProvider;
        }

        /**
         * Replaces the credentials provider used to authenticate the Pub/Sub clients.
         *
         * @param credentialsProvider the provider to use
         * @return this builder
         */
        public Builder credentialsProvider(CredentialsProvider credentialsProvider) {
            this.credentialsProvider = credentialsProvider;
            return this;
        }

        /**
         * Creates the resource manager together with its topic, subscription and schema clients.
         *
         * @return a new {@link PubsubResourceManager}
         * @throws IllegalArgumentException if no credentials provider has been set
         * @throws IOException if one of the Pub/Sub clients can not be created
         */
        public PubsubResourceManager build() throws IOException {
            if (this.credentialsProvider == null) {
                throw new IllegalArgumentException("Unable to find credentials. Please provide credentials to authenticate to GCP");
            }
            return new PubsubResourceManager(this);
        }
    }

    // NOTE(review): the three clients are created inline; if a later create(...) call throws, the
    // clients created before it are not closed. Left as-is because this(...) must be the first
    // statement of the constructor.
    private PubsubResourceManager(Builder builder) throws IOException {
        this(
            builder.testName,
            builder.projectId,
            new PubsubPublisherFactory(builder.credentialsProvider),
            TopicAdminClient.create(
                TopicAdminSettings.newBuilder()
                    .setCredentialsProvider(builder.credentialsProvider)
                    .build()),
            SubscriptionAdminClient.create(
                SubscriptionAdminSettings.newBuilder()
                    .setCredentialsProvider(builder.credentialsProvider)
                    .build()),
            SchemaServiceClient.create(
                SchemaServiceSettings.newBuilder()
                    .setCredentialsProvider(builder.credentialsProvider)
                    .build()));
    }

    @VisibleForTesting
    PubsubResourceManager(
            String testName,
            String projectId,
            PubsubPublisherFactory publisherFactory,
            TopicAdminClient topicAdminClient,
            SubscriptionAdminClient subscriptionAdminClient,
            SchemaServiceClient schemaServiceClient) {
        this.projectId = projectId;
        this.testId = PubsubUtils.createTestId(testName);
        this.publisherFactory = publisherFactory;
        this.topicAdminClient = topicAdminClient;
        this.subscriptionAdminClient = subscriptionAdminClient;
        this.schemaServiceClient = schemaServiceClient;
        this.createdTopics = Collections.synchronizedSet(new HashSet<>());
        this.createdSubscriptions = Collections.synchronizedSet(new HashSet<>());
        this.createdSchemas = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * Starts building a {@link PubsubResourceManager}.
     *
     * @param testName name of the test; used to derive the test id that prefixes resource names
     * @param projectId GCP project in which resources are created
     * @param credentialsProvider credentials used by the Pub/Sub clients; may be supplied later
     *     through {@link Builder#credentialsProvider(CredentialsProvider)}
     * @return a new builder
     * @throws IllegalArgumentException if {@code testName} or {@code projectId} is null or empty
     */
    public static Builder builder(String testName, String projectId, CredentialsProvider credentialsProvider) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(testName), "testName can not be null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(projectId), "projectId can not be empty");
        return new Builder(testName, projectId, credentialsProvider);
    }

    /**
     * Returns the test id computed by {@code PubsubUtils.createTestId} from the test name.
     *
     * @return the test id used as prefix for resource names
     */
    public String getTestId() {
        return this.testId;
    }

    /**
     * Creates a topic named {@code <testId>-<topicName>} and registers it for cleanup.
     *
     * @param topicName suffix of the topic id
     * @return the name of the created topic
     * @throws IllegalArgumentException if {@code topicName} is null or empty
     * @throws IllegalStateException if this manager has already been cleaned up
     */
    public TopicName createTopic(String topicName) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName can not be empty");
        checkIsUsable();
        return createTopicInternal(getTopicName(topicName));
    }

    /**
     * Creates a topic whose id is exactly {@code topicName} (no test-id prefix) and registers it
     * for cleanup.
     *
     * @param topicName the full topic id
     * @return the name of the created topic
     * @throws IllegalArgumentException if {@code topicName} is null or empty
     * @throws IllegalStateException if this manager has already been cleaned up
     */
    public TopicName createTopicWithoutPrefix(String topicName) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName can not be empty");
        checkIsUsable();
        return createTopicInternal(TopicName.of(this.projectId, topicName));
    }

    /**
     * Creates a subscription named {@code <testId>-<subscriptionName>} on a topic previously
     * created by this manager and registers it for cleanup.
     *
     * @param topicName topic to subscribe to; must have been created through this instance
     * @param subscriptionName suffix of the subscription id
     * @return the name of the created subscription, as reported by the service
     * @throws IllegalArgumentException if {@code subscriptionName} is null or empty, or the topic
     *     is not managed by this instance
     * @throws IllegalStateException if this manager has already been cleaned up
     */
    public SubscriptionName createSubscription(TopicName topicName, String subscriptionName) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(subscriptionName), "subscriptionName can not be empty");
        checkIsUsable();
        if (!this.createdTopics.contains(topicName)) {
            throw new IllegalArgumentException("Can not create a subscription for a topic not managed by this instance.");
        }

        LOG.info("Creating subscription '{}' for topic '{}'", subscriptionName, topicName);
        SubscriptionName requestedName = getSubscriptionName(subscriptionName);
        Subscription subscription =
            Failsafe.with(retryOnDeadlineExceeded())
                .get(
                    () ->
                        this.subscriptionAdminClient.createSubscription(
                            requestedName,
                            topicName,
                            PushConfig.getDefaultInstance(),
                            DEFAULT_ACK_DEADLINE_SECONDS));

        SubscriptionName createdName = PubsubUtils.toSubscriptionName(subscription);
        this.createdSubscriptions.add(requestedName);
        LOG.info("Subscription '{}' for topic '{}' was created successfully!", subscription.getName(), topicName);
        return createdName;
    }

    /**
     * Publishes a single message to a topic created by this manager and waits for the message id.
     *
     * <p>A publisher is created for the call and is always shut down afterwards, whether or not
     * publishing succeeded.
     *
     * @param topicName topic to publish to; must have been created through this instance
     * @param map attributes to attach to the message
     * @param byteString message payload
     * @return the id of the published message
     * @throws IllegalArgumentException if the topic is not managed by this instance
     * @throws IllegalStateException if this manager has already been cleaned up
     * @throws PubsubResourceManagerException if creating the publisher, publishing, or shutting
     *     the publisher down fails
     */
    public String publish(TopicName topicName, Map<String, String> map, ByteString byteString) throws PubsubResourceManagerException {
        checkIsUsable();
        if (!this.createdTopics.contains(topicName)) {
            throw new IllegalArgumentException("Can not publish to a topic not managed by this instance.");
        }

        LOG.info("Publishing message with {} bytes to topic '{}'", byteString.size(), topicName);
        PubsubMessage message = PubsubMessage.newBuilder().putAllAttributes(map).setData(byteString).build();
        try {
            Publisher publisher = this.publisherFactory.createPublisher(topicName);
            try {
                String messageId = publisher.publish(message).get();
                LOG.info("Message published with id '{}'", messageId);
                return messageId;
            } finally {
                // Release the publisher even when publishing failed.
                publisher.shutdown();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new PubsubResourceManagerException("Error publishing message to Pubsub", e);
        }
    }

    /**
     * Pulls messages from the given subscription using the subscription admin client.
     *
     * @param subscriptionName subscription to pull from
     * @param i maximum number of messages requested from the service
     * @return the pull response returned by the service
     */
    public PullResponse pull(SubscriptionName subscriptionName, int i) {
        LOG.info("Pulling messages from subscription '{}'", subscriptionName);
        PullResponse response = this.subscriptionAdminClient.pull(subscriptionName, i);
        LOG.info("Received {} messages from subscription '{}'", response.getReceivedMessagesCount(), subscriptionName);
        return response;
    }

    /**
     * Creates a schema with id {@code schema-<testId>-<topic id>}, registers it for cleanup and
     * attaches it to the given topic by updating the topic's {@code schema_settings}.
     *
     * @param type schema type
     * @param schemaDefinition schema definition text
     * @param encoding message encoding to set in the topic's schema settings
     * @param topicName topic the schema is attached to
     * @return the resource name of the created schema
     */
    public String createSchema(Schema.Type type, String schemaDefinition, Encoding encoding, TopicName topicName) {
        String schemaId = "schema-" + this.testId + RESOURCE_NAME_SEPARATOR + topicName.getTopic();
        Schema schema =
            this.schemaServiceClient.createSchema(
                ProjectName.newBuilder().setProject(this.projectId).build(),
                Schema.newBuilder().setType(type).setDefinition(schemaDefinition).build(),
                schemaId);
        // Track the schema before touching the topic so it is cleaned up even if the update fails.
        this.createdSchemas.add(SchemaName.parse(schema.getName()));

        SchemaSettings schemaSettings =
            SchemaSettings.newBuilder().setSchema(schema.getName()).setEncoding(encoding).build();
        Topic topicUpdate =
            Topic.newBuilder().setName(topicName.toString()).setSchemaSettings(schemaSettings).build();
        this.topicAdminClient.updateTopic(
            UpdateTopicRequest.newBuilder()
                .setUpdateMask(FieldMask.newBuilder().addPaths("schema_settings"))
                .setTopic(topicUpdate)
                .build());
        return schema.getName();
    }

    /**
     * Deletes every subscription, topic and schema created through this instance and then closes
     * the three admin clients. Does nothing if the manager is already unusable.
     *
     * <p>Before a topic is deleted, every subscription the service lists for that topic is deleted
     * as well. The clients are closed even when a deletion fails, and a failure to close one
     * client does not prevent the remaining clients from being closed.
     */
    public synchronized void cleanupAll() {
        if (isNotUsable()) {
            return;
        }
        LOG.info("Attempting to cleanup Pub/Sub resource manager.");
        try {
            // Iterate over snapshots: a synchronized set must not be iterated without holding its
            // lock, and toArray(...) takes that lock for the duration of the copy only.
            for (SubscriptionName subscription : this.createdSubscriptions.toArray(new SubscriptionName[0])) {
                LOG.info("Deleting subscription '{}'", subscription);
                Failsafe.with(retryOnDeadlineExceeded())
                    .run(() -> this.subscriptionAdminClient.deleteSubscription(subscription));
            }
            for (TopicName topic : this.createdTopics.toArray(new TopicName[0])) {
                LOG.info("Deleting topic '{}'", topic);
                Failsafe.with(retryOnDeadlineExceeded())
                    .run(
                        () -> {
                            for (String attached : this.topicAdminClient.listTopicSubscriptions(topic).iterateAll()) {
                                LOG.info("Deleting subscription '{}'", attached);
                                this.subscriptionAdminClient.deleteSubscription(attached);
                            }
                            this.topicAdminClient.deleteTopic(topic);
                        });
            }
            for (SchemaName schema : this.createdSchemas.toArray(new SchemaName[0])) {
                LOG.info("Deleting schema '{}'", schema);
                Failsafe.with(retryOnDeadlineExceeded())
                    .run(() -> this.schemaServiceClient.deleteSchema(schema));
            }
            LOG.info("Manager successfully cleaned up.");
        } finally {
            // Nested finally blocks so that one failing close() does not leak the other clients.
            try {
                this.subscriptionAdminClient.close();
            } finally {
                try {
                    this.topicAdminClient.close();
                } finally {
                    this.schemaServiceClient.close();
                }
            }
        }
    }

    /** Builds the subscription name {@code <testId>-<subscriptionName>} in this manager's project. */
    SubscriptionName getSubscriptionName(String subscriptionName) {
        return SubscriptionName.of(this.projectId, this.testId + RESOURCE_NAME_SEPARATOR + subscriptionName);
    }

    /** Builds the topic name {@code <testId>-<topicName>} in this manager's project. */
    TopicName getTopicName(String topicName) {
        return TopicName.of(this.projectId, this.testId + RESOURCE_NAME_SEPARATOR + topicName);
    }

    /** Throws {@link IllegalStateException} if {@link #isNotUsable()} reports true. */
    private void checkIsUsable() throws IllegalStateException {
        if (isNotUsable()) {
            throw new IllegalStateException("Manager has cleaned up resources and is unusable.");
        }
    }

    /** Creates the topic (retrying on deadline-exceeded failures) and registers it for cleanup. */
    private TopicName createTopicInternal(TopicName topicName) {
        LOG.info("Creating topic '{}'...", topicName.toString());
        Topic topic =
            Failsafe.with(retryOnDeadlineExceeded())
                .get(() -> this.topicAdminClient.createTopic(topicName));
        TopicName createdName = PubsubUtils.toTopicName(topic);
        this.createdTopics.add(createdName);
        LOG.info("Topic '{}' was created successfully!", createdName);
        return createdName;
    }

    /** Returns true once the topic or the subscription admin client has been shut down. */
    private boolean isNotUsable() {
        return this.topicAdminClient.isShutdown() || this.subscriptionAdminClient.isShutdown();
    }

    /**
     * Builds the retry policy shared by all admin calls: failures for which {@code
     * ExceptionUtils.containsType(throwable, DeadlineExceededException.class)} is true are retried
     * up to {@link #FAILSAFE_MAX_RETRIES} times, with backoff between {@link
     * #FAILSAFE_RETRY_DELAY} and {@link #FAILSAFE_RETRY_MAX_DELAY} and a jitter factor of {@link
     * #FAILSAFE_RETRY_JITTER}.
     */
    private static <T> RetryPolicy<T> retryOnDeadlineExceeded() {
        return RetryPolicy.<T>builder()
            .handleIf(throwable -> ExceptionUtils.containsType(throwable, DeadlineExceededException.class))
            .withMaxRetries(FAILSAFE_MAX_RETRIES)
            .withBackoff(FAILSAFE_RETRY_DELAY, FAILSAFE_RETRY_MAX_DELAY)
            .withJitter(FAILSAFE_RETRY_JITTER)
            .build();
    }
}
