/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class ReadWriteIT {
    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
    private static final CloudZone ZONE = CloudZone.parse((String)"us-central1-b");
    private static final int MESSAGE_COUNT = 90;
    @Rule
    public transient TestPubsubSignal signal = TestPubsubSignal.create();
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    private final Deque<Runnable> cleanupActions = new ArrayDeque<Runnable>();

    private static ProjectId getProject(PipelineOptions options) {
        return ProjectId.of((String)((String)Preconditions.checkArgumentNotNull((Object)((GcpOptions)options.as(GcpOptions.class)).getProject())));
    }

    private static String randomName() {
        return "beam_it_resource_" + ThreadLocalRandom.current().nextLong();
    }

    private static AdminClient newAdminClient() {
        return AdminClient.create((AdminClientSettings)AdminClientSettings.newBuilder().setRegion(ZONE.region()).build());
    }

    private TopicPath createTopic(ProjectId id) throws Exception {
        TopicPath toReturn = ((TopicPath.Builder)((TopicPath.Builder)TopicPath.newBuilder().setProject(id)).setLocation(ZONE)).setName(TopicName.of((String)ReadWriteIT.randomName())).build();
        Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString());
        topic.getPartitionConfigBuilder().setCount(2L).setCapacity(Topic.PartitionConfig.Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
        topic.getRetentionConfigBuilder().setPerPartitionBytes(0x780000000L);
        this.cleanupActions.addLast(() -> {
            try {
                AdminClient client = ReadWriteIT.newAdminClient();
                Throwable throwable = null;
                try {
                    client.deleteTopic(toReturn).get();
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (client != null) {
                        ReadWriteIT.$closeResource(throwable, (AutoCloseable)client);
                    }
                }
            }
            catch (Throwable t) {
                LOG.error("Failed to clean up topic.", t);
            }
        });
        try (AdminClient client = ReadWriteIT.newAdminClient();){
            client.createTopic(topic.build()).get();
        }
        return toReturn;
    }

    private SubscriptionPath createSubscription(TopicPath topic) throws Exception {
        SubscriptionPath toReturn = ((SubscriptionPath.Builder)((SubscriptionPath.Builder)SubscriptionPath.newBuilder().setProject(topic.project())).setLocation(ZONE)).setName(SubscriptionName.of((String)ReadWriteIT.randomName())).build();
        Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString());
        subscription.getDeliveryConfigBuilder().setDeliveryRequirement(Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY);
        subscription.setTopic(topic.toString());
        this.cleanupActions.addLast(() -> {
            try {
                AdminClient client = ReadWriteIT.newAdminClient();
                Throwable throwable = null;
                try {
                    client.deleteSubscription(toReturn).get();
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (client != null) {
                        ReadWriteIT.$closeResource(throwable, (AutoCloseable)client);
                    }
                }
            }
            catch (Throwable t) {
                LOG.error("Failed to clean up subscription.", t);
            }
        });
        try (AdminClient client = ReadWriteIT.newAdminClient();){
            client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get();
        }
        return toReturn;
    }

    @After
    public void tearDown() {
        while (!this.cleanupActions.isEmpty()) {
            this.cleanupActions.removeLast().run();
        }
    }

    public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
        PCollection trigger = (PCollection)pipeline.apply((PTransform)Create.of((Object)null, (Object[])new Void[0]));
        PCollection indexes = (PCollection)trigger.apply("createIndexes", (PTransform)new CustomCreate());
        PCollection messages = (PCollection)indexes.apply("createMessages", (PTransform)MapElements.via((SimpleFunction)new SimpleFunction<Integer, PubSubMessage>((SerializableFunction & Serializable)index -> Message.builder().setData(ByteString.copyFromUtf8((String)index.toString())).build().toProto()){}));
        messages = (PCollection)messages.apply("addUuids", PubsubLiteIO.addUuids());
        messages.apply("writeMessages", PubsubLiteIO.write((PublisherOptions)PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
    }

    public static PCollection<SequencedMessage> readMessages(SubscriptionPath subscriptionPath, Pipeline pipeline) {
        PCollection messages = (PCollection)pipeline.apply("readMessages", PubsubLiteIO.read((SubscriberOptions)SubscriberOptions.newBuilder().setSubscriptionPath(subscriptionPath).build()));
        return messages;
    }

    public static SimpleFunction<SequencedMessage, Integer> extractIds() {
        return new SimpleFunction<SequencedMessage, Integer>(){

            public Integer apply(SequencedMessage input) {
                return Integer.parseInt(input.getMessage().getData().toStringUtf8());
            }
        };
    }

    public static SerializableFunction<Set<Integer>, Boolean> testIds() {
        return (SerializableFunction & Serializable)ids -> {
            LOG.info("Ids are: {}", ids);
            Set target = IntStream.range(0, 90).boxed().collect(Collectors.toSet());
            return target.equals(ids);
        };
    }

    @Test
    public void testReadWrite() throws Exception {
        ((StreamingOptions)this.pipeline.getOptions().as(StreamingOptions.class)).setStreaming(true);
        ((TestPipelineOptions)this.pipeline.getOptions().as(TestPipelineOptions.class)).setBlockOnRun(false);
        TopicPath topic = this.createTopic(ReadWriteIT.getProject(this.pipeline.getOptions()));
        SubscriptionPath subscription = null;
        Exception lastException = null;
        for (int i = 0; i < 30; ++i) {
            Thread.sleep(1000L);
            try {
                subscription = this.createSubscription(topic);
                break;
            }
            catch (Exception e) {
                lastException = e;
                LOG.info("Retrying exception on subscription creation.", (Throwable)e);
                continue;
            }
        }
        if (subscription == null) {
            throw lastException;
        }
        ReadWriteIT.writeMessages(topic, (Pipeline)this.pipeline);
        PCollection<SequencedMessage> messages = ReadWriteIT.readMessages(subscription, (Pipeline)this.pipeline);
        PCollection ids = (PCollection)messages.apply((PTransform)MapElements.via(ReadWriteIT.extractIds()));
        ids.apply("PubsubSignalTest", this.signal.signalSuccessWhen((Coder)BigEndianIntegerCoder.of(), ReadWriteIT.testIds()));
        Supplier start = this.signal.waitForStart(Duration.standardMinutes((long)5L));
        this.pipeline.apply(this.signal.signalStart());
        PipelineResult job = this.pipeline.run();
        start.get();
        LOG.info("Running!");
        this.signal.waitForSuccess(Duration.standardMinutes((long)5L));
        try {
            job.cancel();
        }
        catch (UnsupportedOperationException unsupportedOperationException) {
            // empty catch block
        }
    }

    private static class CustomCreate
    extends PTransform<PCollection<Void>, PCollection<Integer>> {
        private CustomCreate() {
        }

        public PCollection<Integer> expand(PCollection<Void> input) {
            return (PCollection)input.apply("createIndexes", (PTransform)FlatMapElements.via((SimpleFunction)new SimpleFunction<Void, Iterable<Integer>>(){

                public Iterable<Integer> apply(Void input) {
                    return IntStream.range(0, 90).boxed().collect(Collectors.toList());
                }
            }));
        }
    }
}

