package org.apache.beam.sdk.io.gcp.pubsub;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.class */
public class TestPubsubSignal implements TestRule {
    /** Name passed to {@code TestPubsub.createTopicName} when building the result topic path. */
    private static final String RESULT_TOPIC_NAME = "result";
    /** Payload emitted by {@code StatefulPredicateCheck} on success and expected by {@code waitForSuccess}. */
    private static final String RESULT_SUCCESS_MESSAGE = "SUCCESS";
    /** Name passed to {@code TestPubsub.createTopicName} when building the start topic path. */
    private static final String START_TOPIC_NAME = "start";
    /** Payload published by {@code PublishStart} and expected by the supplier from {@code waitForStart}. */
    private static final String START_SIGNAL_MESSAGE = "START SIGNAL";
    /**
     * Client created in {@code initializePubsub} and closed and reset to {@code null} in
     * {@code tearDown}; a non-null value when a test starts is treated as an error by {@code apply}.
     */
    PubsubClient pubsub;
    /** Options used to build the client and to read the project for topic and subscription paths. */
    private TestPubsubOptions pipelineOptions;

    /** Path of the result topic; assigned in {@code initializePubsub}, cleared in {@code tearDown}. */
    @Nullable
    private PubsubClient.TopicPath resultTopicPath = null;

    /** Path of the start topic; assigned in {@code initializePubsub}. */
    @Nullable
    private PubsubClient.TopicPath startTopicPath = null;
    private static final Logger LOG = LoggerFactory.getLogger(TestPubsubSignal.class);
    /** Passed as the second argument to {@code PubsubGrpcClient.FACTORY.newClient}. */
    private static final String NO_ID_ATTRIBUTE = null;
    /** Passed as the first argument to {@code PubsubGrpcClient.FACTORY.newClient}. */
    private static final String NO_TIMESTAMP_ATTRIBUTE = null;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal$PublishStart.class */
    static class PublishStart extends PTransform<PBegin, PDone> {
        private final PubsubClient.TopicPath startTopicPath;

        PublishStart(PubsubClient.TopicPath topicPath) {
            this.startTopicPath = topicPath;
        }

        public PDone expand(PBegin pBegin) {
            return pBegin.apply("Start signal", Create.of(TestPubsubSignal.START_SIGNAL_MESSAGE, new String[0])).apply(PubsubIO.writeStrings().to(this.startTopicPath.getPath()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal$PublishSuccessWhen.class */
    public static class PublishSuccessWhen<T> extends PTransform<PCollection<? extends T>, POutput> {
        private final Coder<T> coder;
        private final SerializableFunction<T, String> formatter;
        private final SerializableFunction<Set<T>, Boolean> successPredicate;
        private final PubsubClient.TopicPath resultTopicPath;

        PublishSuccessWhen(Coder<T> coder, SerializableFunction<T, String> serializableFunction, SerializableFunction<Set<T>, Boolean> serializableFunction2, PubsubClient.TopicPath topicPath) {
            this.coder = coder;
            this.formatter = serializableFunction;
            this.successPredicate = serializableFunction2;
            this.resultTopicPath = topicPath;
        }

        public POutput expand(PCollection<? extends T> pCollection) {
            return pCollection.apply(Window.into(new GlobalWindows())).apply(WithKeys.of("dummyKey")).apply("checkAllEventsForSuccess", ParDo.of(new StatefulPredicateCheck(this.coder, this.formatter, this.successPredicate))).apply("publishSuccess", PubsubIO.writeStrings().to(this.resultTopicPath.getPath()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal$StatefulPredicateCheck.class */
    /**
     * Stateful {@link DoFn} that accumulates every value it sees in a bag state and, after each
     * element, evaluates the success predicate over the set of values seen so far.
     *
     * <p>Emits {@code "SUCCESS"} whenever the predicate returns true, and
     * {@code "FAILURE: <message>"} when evaluating the predicate or emitting the result throws.
     *
     * @param <T> type of the values being observed
     */
    public static class StatefulPredicateCheck<T> extends DoFn<KV<String, ? extends T>, String> {
        private final SerializableFunction<T, String> formatter;
        private SerializableFunction<Set<T>, Boolean> successPredicate;
        private static final String SEEN_EVENTS = "seenEvents";

        @DoFn.StateId(SEEN_EVENTS)
        private final StateSpec<BagState<T>> seenEvents;

        /**
         * @param coder coder used for the bag state holding the seen values
         * @param serializableFunction formatter; stored but not read by this class
         * @param serializableFunction2 predicate applied to the set of values seen so far
         */
        StatefulPredicateCheck(
                Coder<T> coder,
                SerializableFunction<T, String> serializableFunction,
                SerializableFunction<Set<T>, Boolean> serializableFunction2) {
            this.seenEvents = StateSpecs.bag(coder);
            this.formatter = serializableFunction;
            this.successPredicate = serializableFunction2;
        }

        /**
         * Records the element's value, then re-evaluates the predicate over everything seen.
         *
         * @param processContext context supplying the element and receiving the output
         * @param bagState bag state holding the values seen so far
         */
        @DoFn.ProcessElement
        public void processElement(
                DoFn<KV<String, ? extends T>, String>.ProcessContext processContext,
                @DoFn.StateId(SEEN_EVENTS) BagState<T> bagState) {
            T event = processContext.element().getValue();
            bagState.add(event);
            try {
                Set<T> seenSoFar = ImmutableSet.copyOf(bagState.read());
                boolean succeeded = this.successPredicate.apply(seenSoFar);
                if (succeeded) {
                    processContext.output(TestPubsubSignal.RESULT_SUCCESS_MESSAGE);
                }
            } catch (Throwable th) {
                // Any failure, including one thrown by the predicate, is reported as a message.
                processContext.output("FAILURE: " + th.getMessage());
            }
        }
    }

    /**
     * Creates a rule configured from {@code TestPipeline.testingPipelineOptions()} viewed as
     * {@link TestPubsubOptions}.
     *
     * @return a new, not yet initialized rule instance
     */
    public static TestPubsubSignal create() {
        TestPubsubOptions options =
            TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
        return new TestPubsubSignal(options);
    }

    /**
     * @param testPubsubOptions options later used to build the client and resolve the project
     */
    private TestPubsubSignal(TestPubsubOptions testPubsubOptions) {
        this.pipelineOptions = testPubsubOptions;
    }

    /**
     * Wraps the given statement so that the Pubsub client and topics are set up before it runs
     * and torn down afterwards, whether or not it succeeds.
     *
     * @param statement the statement to run between set-up and tear-down
     * @param description description of the current test, used for topic naming and messages
     * @return the wrapping statement
     */
    @Override
    public Statement apply(final Statement statement, final Description description) {
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                if (TestPubsubSignal.this.pubsub == null) {
                    try {
                        TestPubsubSignal.this.initializePubsub(description);
                        statement.evaluate();
                    } finally {
                        TestPubsubSignal.this.tearDown();
                    }
                    return;
                }
                // A leftover client means tear-down did not complete for an earlier test.
                String message =
                    "Pubsub client was not shutdown in previous test. Topic path is'"
                        + TestPubsubSignal.this.resultTopicPath
                        + "'. Current test: "
                        + description.getDisplayName();
                throw new AssertionError(message);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the Pubsub client and the result and start topics for the given test.
     *
     * <p>Each topic path is stored in its field as soon as that topic has been created, so a
     * failure while creating the second topic still leaves the first one recorded for
     * {@code tearDown} to delete.
     *
     * @param description description of the current test, used to derive the topic names
     * @throws IOException if creating the client or a topic fails
     */
    public void initializePubsub(Description description) throws IOException {
        this.pubsub = PubsubGrpcClient.FACTORY.newClient(NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, this.pipelineOptions);
        PubsubClient.TopicPath resultTopic = PubsubClient.topicPathFromName(this.pipelineOptions.getProject(), TestPubsub.createTopicName(description, RESULT_TOPIC_NAME));
        PubsubClient.TopicPath startTopic = PubsubClient.topicPathFromName(this.pipelineOptions.getProject(), TestPubsub.createTopicName(description, START_TOPIC_NAME));

        this.pubsub.createTopic(resultTopic);
        // Record immediately: if the next call throws, tearDown can still clean this topic up.
        this.resultTopicPath = resultTopic;

        this.pubsub.createTopic(startTopic);
        this.startTopicPath = startTopic;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deletes the topics created for the test and closes the Pubsub client.
     *
     * <p>Both the result topic and the start topic are deleted; the start topic deletion is
     * attempted even if deleting the result topic throws. The client is always closed, and the
     * client and topic path fields are reset afterwards. Does nothing when no client exists.
     *
     * @throws IOException if deleting a topic or closing the client fails
     */
    public void tearDown() throws IOException {
        if (this.pubsub == null) {
            return;
        }
        try {
            try {
                if (this.resultTopicPath != null) {
                    this.pubsub.deleteTopic(this.resultTopicPath);
                }
            } finally {
                // The start topic is created alongside the result topic and must not be leaked.
                if (this.startTopicPath != null) {
                    this.pubsub.deleteTopic(this.startTopicPath);
                }
            }
        } finally {
            this.pubsub.close();
            this.pubsub = null;
            this.resultTopicPath = null;
            this.startTopicPath = null;
        }
    }

    /**
     * Builds a transform that publishes the start signal to the start topic.
     *
     * @return a {@code PublishStart} bound to the current start topic path
     */
    public PTransform<PBegin, PDone> signalStart() {
        PubsubClient.TopicPath topic = this.startTopicPath;
        return new PublishStart(topic);
    }

    /**
     * Builds a transform that publishes to the result topic based on a predicate over the
     * elements seen so far.
     *
     * @param coder coder for the observed elements
     * @param serializableFunction formatter for the observed elements
     * @param serializableFunction2 predicate over the set of elements seen so far
     * @param <T> element type of the observed collection
     * @return a {@code PublishSuccessWhen} bound to the current result topic path
     */
    public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(
            Coder<T> coder,
            SerializableFunction<T, String> serializableFunction,
            SerializableFunction<Set<T>, Boolean> serializableFunction2) {
        PubsubClient.TopicPath topic = this.resultTopicPath;
        return new PublishSuccessWhen<>(coder, serializableFunction, serializableFunction2, topic);
    }

    /**
     * Convenience overload of the three-argument {@code signalSuccessWhen} that formats elements
     * with their {@code toString()}.
     *
     * @param coder coder for the observed elements
     * @param serializableFunction predicate over the set of elements seen so far
     * @param <T> element type of the observed collection
     * @return the transform produced by the three-argument overload
     */
    public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(
            Coder<T> coder, SerializableFunction<Set<T>, Boolean> serializableFunction) {
        return signalSuccessWhen(coder, Object::toString, serializableFunction);
    }

    /**
     * Creates a randomly named subscription on the start topic and returns a memoized supplier
     * that, when invoked, polls that subscription for the start signal.
     *
     * <p>The supplier fails with an {@code IllegalStateException} (via
     * {@code Preconditions.checkState}) when the received payload is not the start signal, and
     * wraps any {@link IOException} from polling in a {@link RuntimeException}.
     *
     * @param duration how long to poll; its whole-second value is also passed as the third
     *     argument when creating the subscription
     * @return supplier that blocks until the start signal has been received
     * @throws IOException if creating the subscription fails
     */
    public Supplier<Void> waitForStart(Duration duration) throws IOException {
        String project = this.pipelineOptions.getProject();
        String subscriptionName = "start-subscription-" + ThreadLocalRandom.current().nextLong();
        PubsubClient.SubscriptionPath startSubscription =
            PubsubClient.subscriptionPathFromName(project, subscriptionName);
        int durationSeconds = (int) duration.getStandardSeconds();
        this.pubsub.createSubscription(this.startTopicPath, startSubscription, durationSeconds);

        return Suppliers.memoize(() -> {
            String received;
            try {
                received = pollForResultForDuration(startSubscription, duration);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            Preconditions.checkState(START_SIGNAL_MESSAGE.equals(received));
            return null;
        });
    }

    /**
     * Creates a randomly named subscription on the result topic, polls it for a message and
     * verifies that the message is the success marker.
     *
     * @param duration how long to poll; its whole-second value is also passed as the third
     *     argument when creating the subscription
     * @throws IOException if creating the subscription or polling fails
     * @throws AssertionError carrying the received payload when it is not {@code "SUCCESS"}
     */
    public void waitForSuccess(Duration duration) throws IOException {
        String project = this.pipelineOptions.getProject();
        String subscriptionName = "result-subscription-" + ThreadLocalRandom.current().nextLong();
        PubsubClient.SubscriptionPath resultSubscription =
            PubsubClient.subscriptionPathFromName(project, subscriptionName);
        int durationSeconds = (int) duration.getStandardSeconds();
        this.pubsub.createSubscription(this.resultTopicPath, resultSubscription, durationSeconds);

        String outcome = pollForResultForDuration(resultSubscription, duration);
        if (RESULT_SUCCESS_MESSAGE.equals(outcome)) {
            return;
        }
        throw new AssertionError(outcome);
    }

    /**
     * Polls the subscription until one message arrives or the duration elapses, acknowledges the
     * message and returns its payload decoded as UTF-8.
     *
     * <p>A pull that returns no messages is treated as "nothing yet" and polling continues,
     * instead of failing later on an empty list. A {@link StatusRuntimeException} from the client
     * is retried after a short pause; it is logged unless its status code is
     * {@code DEADLINE_EXCEEDED}.
     *
     * @param subscriptionPath subscription to pull from
     * @param duration maximum time to keep polling
     * @return payload of the first received message as a UTF-8 string
     * @throws IOException if the client reports an I/O failure
     * @throws AssertionError if no message was received before the duration elapsed
     */
    private String pollForResultForDuration(PubsubClient.SubscriptionPath subscriptionPath, Duration duration) throws IOException {
        List<PubsubClient.IncomingMessage> received = null;
        DateTime deadline = DateTime.now().plus(duration.getMillis());
        do {
            try {
                List<PubsubClient.IncomingMessage> pulled = this.pubsub.pull(DateTime.now().getMillis(), subscriptionPath, 1, false);
                if (pulled != null && !pulled.isEmpty()) {
                    // Remember the message before acknowledging, so a failed ack does not lose it.
                    received = pulled;
                    List<String> ackIds = pulled.stream().map(message -> message.ackId).collect(Collectors.toList());
                    this.pubsub.acknowledge(subscriptionPath, ackIds);
                    break;
                }
            } catch (StatusRuntimeException e) {
                // Compare status codes: Status instances carrying a description or cause are not
                // equal to the bare Status.DEADLINE_EXCEEDED constant.
                if (e.getStatus().getCode() != Status.Code.DEADLINE_EXCEEDED) {
                    LOG.warn("(Will retry) Error while polling {} for signal: {}", subscriptionPath, e.getStatus());
                }
            }
            sleep(500L);
        } while (DateTime.now().isBefore(deadline));
        if (received == null) {
            throw new AssertionError(String.format("Did not receive signal on %s in %ss", subscriptionPath, Long.valueOf(duration.getStandardSeconds())));
        }
        return new String(received.get(0).elementBytes, StandardCharsets.UTF_8);
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored before the
     * interruption is rethrown as an unchecked exception, so callers further up the stack can
     * still observe it.
     *
     * @param j time to sleep, in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted
     */
    private void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /*
     * Rebuilds a serialized lambda from its SerializedLambda description.
     *
     * Only one shape is recognised: an implementation method named "toString" on
     * java/lang/Object, exposed through SerializableFunction.apply. Anything else is rejected
     * with IllegalArgumentException.
     *
     * NOTE(review): this looks like decompiler output for the compiler-generated lambda
     * deserialization hook (it is marked synthetic). `boolean z = -1` and a `switch` over a
     * boolean are not valid Java source, so this method presumably should not exist in
     * hand-maintained source -- confirm before relying on it.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: map the implementation method name to a case selector.
        switch (implMethodName.hashCode()) {
            case -1776922004:
                if (implMethodName.equals("toString")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full lambda shape before recreating the function.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Object") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.toString();
                    };
                }
                break;
        }
        // No recognised lambda shape matched.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
