package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PushConfig;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.class */
public class TestPubsub implements TestRule {
    private static final String EVENTS_TOPIC_NAME = "events";
    private static final String TOPIC_PREFIX = "integ-test-";
    private final TestPubsubOptions pipelineOptions;
    private final String pubsubEndpoint;
    private final boolean isLocalhost;
    private TopicAdminClient topicAdmin = null;
    private SubscriptionAdminClient subscriptionAdmin = null;
    private PubsubClient.TopicPath eventsTopicPath = null;
    private PubsubClient.SubscriptionPath subscriptionPath = null;
    private ManagedChannel channel = null;
    private TransportChannelProvider channelProvider = null;
    private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
    private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsub$PollingAssertion.class */
    public interface PollingAssertion {
        void waitForUpTo(Duration duration) throws IOException, InterruptedException;
    }

    public static TestPubsub create() {
        return fromOptions(TestPipeline.testingPipelineOptions());
    }

    public static TestPubsub fromOptions(PipelineOptions pipelineOptions) {
        return new TestPubsub((TestPubsubOptions) pipelineOptions.as(TestPubsubOptions.class));
    }

    private TestPubsub(TestPubsubOptions testPubsubOptions) {
        this.pipelineOptions = testPubsubOptions;
        this.pubsubEndpoint = PubsubOptions.targetForRootUrl(this.pipelineOptions.getPubsubRootUrl());
        this.isLocalhost = this.pubsubEndpoint.startsWith("localhost");
    }

    public Statement apply(final Statement statement, final Description description) {
        return new Statement() { // from class: org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.1
            public void evaluate() throws Throwable {
                if (TestPubsub.this.topicAdmin != null || TestPubsub.this.subscriptionAdmin != null) {
                    throw new AssertionError("Pubsub client was not shutdown after previous test. Topic path is'" + TestPubsub.this.eventsTopicPath + "'. Current test: " + description.getDisplayName());
                }
                try {
                    TestPubsub.this.initializePubsub(description);
                    statement.evaluate();
                } finally {
                    TestPubsub.this.tearDown();
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializePubsub(Description description) throws IOException {
        if (this.isLocalhost) {
            this.channel = ManagedChannelBuilder.forTarget(this.pubsubEndpoint).usePlaintext().build();
        } else {
            this.channel = ManagedChannelBuilder.forTarget(this.pubsubEndpoint).useTransportSecurity().build();
        }
        this.channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(this.channel));
        TopicAdminSettings.Builder newBuilder = TopicAdminSettings.newBuilder();
        TestPubsubOptions testPubsubOptions = this.pipelineOptions;
        Objects.requireNonNull(testPubsubOptions);
        this.topicAdmin = TopicAdminClient.create(newBuilder.setCredentialsProvider(testPubsubOptions::getGcpCredential).setTransportChannelProvider(this.channelProvider).setEndpoint(this.pubsubEndpoint).build());
        SubscriptionAdminSettings.Builder newBuilder2 = SubscriptionAdminSettings.newBuilder();
        TestPubsubOptions testPubsubOptions2 = this.pipelineOptions;
        Objects.requireNonNull(testPubsubOptions2);
        this.subscriptionAdmin = SubscriptionAdminClient.create(newBuilder2.setCredentialsProvider(testPubsubOptions2::getGcpCredential).setTransportChannelProvider(this.channelProvider).setEndpoint(this.pubsubEndpoint).build());
        PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromName(this.pipelineOptions.getProject(), createTopicName(description, EVENTS_TOPIC_NAME));
        this.topicAdmin.createTopic(topicPath.getPath());
        this.eventsTopicPath = topicPath;
        PubsubClient.SubscriptionPath subscriptionPath = new PubsubClient.SubscriptionPath(String.format("projects/%s/subscriptions/%s", this.pipelineOptions.getProject(), topicPath().getName() + "_beam_" + ThreadLocalRandom.current().nextLong()));
        this.subscriptionAdmin.createSubscription(subscriptionPath.getPath(), topicPath().getPath(), PushConfig.getDefaultInstance(), DEFAULT_ACK_DEADLINE_SECONDS.intValue());
        this.subscriptionPath = subscriptionPath;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void tearDown() {
        if (this.subscriptionAdmin == null || this.topicAdmin == null || this.channel == null) {
            return;
        }
        try {
            if (this.subscriptionPath != null) {
                this.subscriptionAdmin.deleteSubscription(this.subscriptionPath.getPath());
            }
            if (this.eventsTopicPath != null) {
                Iterator it = this.topicAdmin.listTopicSubscriptions(this.eventsTopicPath.getPath()).iterateAll().iterator();
                while (it.hasNext()) {
                    this.subscriptionAdmin.deleteSubscription((String) it.next());
                }
                this.topicAdmin.deleteTopic(this.eventsTopicPath.getPath());
            }
        } finally {
            this.subscriptionAdmin.close();
            this.topicAdmin.close();
            this.channel.shutdown();
            this.subscriptionAdmin = null;
            this.topicAdmin = null;
            this.channelProvider = null;
            this.channel = null;
            this.eventsTopicPath = null;
            this.subscriptionPath = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String createTopicName(Description description, String str) throws IOException {
        StringBuilder sb = new StringBuilder(TOPIC_PREFIX);
        if (description.getClassName() != null) {
            try {
                sb.append(Class.forName(description.getClassName()).getSimpleName()).append("-");
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        if (description.getMethodName() != null) {
            sb.append(description.getMethodName().replaceAll("[\\[\\]]", ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME)).append("-");
        }
        DATETIME_FORMAT.printTo(sb, Instant.now());
        return sb.toString() + "-" + str + "-" + String.valueOf(ThreadLocalRandom.current().nextLong());
    }

    public PubsubClient.TopicPath topicPath() {
        return this.eventsTopicPath;
    }

    public PubsubClient.SubscriptionPath subscriptionPath() {
        return this.subscriptionPath;
    }

    private List<String> listSubscriptions(PubsubClient.TopicPath topicPath) {
        Preconditions.checkNotNull(this.topicAdmin);
        return (List) Streams.stream(this.topicAdmin.listTopicSubscriptions(topicPath.getPath()).iterateAll()).filter(str -> {
            return !str.equals(this.subscriptionPath.getPath());
        }).collect(Collectors.toList());
    }

    public void publish(List<PubsubMessage> list) {
        Preconditions.checkNotNull(this.eventsTopicPath);
        try {
            Publisher.Builder newBuilder = Publisher.newBuilder(this.eventsTopicPath.getPath());
            TestPubsubOptions testPubsubOptions = this.pipelineOptions;
            Objects.requireNonNull(testPubsubOptions);
            Publisher build = newBuilder.setCredentialsProvider(testPubsubOptions::getGcpCredential).setChannelProvider(this.channelProvider).setEndpoint(this.pubsubEndpoint).build();
            try {
                ApiFutures.allAsList((List) list.stream().map(pubsubMessage -> {
                    return build.publish(com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom(pubsubMessage.getPayload())).putAllAttributes(pubsubMessage.getAttributeMap()).build());
                }).collect(Collectors.toList())).get();
                build.shutdown();
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for messages to publish", e);
            } catch (ExecutionException e2) {
                throw new RuntimeException("Error publishing a test message", e2);
            }
        } catch (IOException e3) {
            throw new RuntimeException("Error creating event publisher", e3);
        }
    }

    public List<PubsubMessage> waitForNMessages(int i, Duration duration) throws IOException, InterruptedException {
        Preconditions.checkNotNull(this.subscriptionPath);
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(i);
        Subscriber.Builder newBuilder = Subscriber.newBuilder(this.subscriptionPath.getPath(), (pubsubMessage, ackReplyConsumer) -> {
            if (linkedBlockingDeque.offer(pubsubMessage)) {
                ackReplyConsumer.ack();
            } else {
                ackReplyConsumer.nack();
            }
        });
        TestPubsubOptions testPubsubOptions = this.pipelineOptions;
        Objects.requireNonNull(testPubsubOptions);
        Subscriber build = newBuilder.setCredentialsProvider(testPubsubOptions::getGcpCredential).setChannelProvider(this.channelProvider).setEndpoint(this.pubsubEndpoint).build();
        build.startAsync();
        DateTime dateTime = new DateTime();
        int seconds = duration.toStandardSeconds().getSeconds();
        while (linkedBlockingDeque.size() < i && Seconds.secondsBetween(dateTime, new DateTime()).getSeconds() < seconds) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
        build.stopAsync();
        build.awaitTerminated();
        return (List) linkedBlockingDeque.stream().map(pubsubMessage2 -> {
            return new PubsubMessage(pubsubMessage2.getData().toByteArray(), pubsubMessage2.getAttributesMap(), pubsubMessage2.getMessageId());
        }).collect(Collectors.toList());
    }

    public PollingAssertion assertThatTopicEventuallyReceives(Matcher<PubsubMessage>... matcherArr) {
        return duration -> {
            MatcherAssert.assertThat(waitForNMessages(matcherArr.length, duration), Matchers.containsInAnyOrder(matcherArr));
        };
    }

    @Deprecated
    public void checkIfAnySubscriptionExists(String str, Duration duration) throws InterruptedException, IllegalArgumentException, IOException, TimeoutException {
        try {
            assertSubscriptionEventuallyCreated(str, duration);
        } catch (AssertionError e) {
            throw new TimeoutException(e.getMessage());
        }
    }

    public void assertSubscriptionEventuallyCreated(String str, Duration duration) throws InterruptedException, IllegalArgumentException, IOException {
        int i;
        if (duration.getMillis() <= 0) {
            throw new IllegalArgumentException(String.format("timeoutDuration should be greater than 0", new Object[0]));
        }
        DateTime dateTime = new DateTime();
        int i2 = 0;
        while (true) {
            i = i2;
            if (i != 0 || Seconds.secondsBetween(dateTime, new DateTime()).getSeconds() >= duration.toStandardSeconds().getSeconds()) {
                break;
            }
            Thread.sleep(1000L);
            i2 = Iterables.size(listSubscriptions(topicPath()));
        }
        if (i <= 0) {
            throw new AssertionError("Timed out before subscription created for " + topicPath());
        }
    }
}
