package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.protobuf.ByteString;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.class */
public class ReadWriteIT {
    /** Number of integer payloads (0 inclusive to this value exclusive) written and expected back. */
    private static final int MESSAGE_COUNT = 90;

    /** Signal rule used by the tests to wait for pipeline start and for the success condition. */
    @Rule
    public transient TestPubsubSignal signal = TestPubsubSignal.create();

    /** Pipeline under test; each test configures it for non-blocking streaming execution. */
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    /**
     * Cleanup callbacks registered while creating resources. {@code tearDown} runs them from the
     * tail, i.e. in reverse registration order.
     */
    private final Deque<Runnable> cleanupActions = new ArrayDeque<>();
    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);

    /** Cloud zone in which every topic and subscription of this test is created. */
    private static final CloudZone ZONE = CloudZone.parse("us-central1-b");

    /** Row schema used by the JSON write path: the index as a string and as a 32-bit int. */
    private static final Schema SAMPLE_BEAM_SCHEMA = Schema.builder().addStringField("numberInString").addInt32Field("numberInInt").build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT$CustomCreate.class */
    public static class CustomCreate extends PTransform<PCollection<Void>, PCollection<Integer>> {
        private CustomCreate() {
        }

        public PCollection<Integer> expand(PCollection<Void> pCollection) {
            return pCollection.apply("createIndexes", FlatMapElements.via(new SimpleFunction<Void, Iterable<Integer>>() { // from class: org.apache.beam.sdk.io.gcp.pubsublite.ReadWriteIT.CustomCreate.1
                public Iterable<Integer> apply(Void r4) {
                    return (Iterable) IntStream.range(0, ReadWriteIT.MESSAGE_COUNT).boxed().collect(Collectors.toList());
                }
            }));
        }
    }

    /**
     * Reads the GCP project configured on the given options.
     *
     * @param pipelineOptions options that are viewed as {@link GcpOptions}
     * @return the configured project wrapped as a {@link ProjectId}
     */
    private static ProjectId getProject(PipelineOptions pipelineOptions) {
        GcpOptions gcpOptions = pipelineOptions.as(GcpOptions.class);
        // checkArgumentNotNull rejects an unset project before it reaches ProjectId.of.
        String project = Preconditions.checkArgumentNotNull(gcpOptions.getProject());
        return ProjectId.of(project);
    }

    /**
     * Builds a resource name made of the fixed prefix {@code beam_it_resource_} followed by a random
     * {@code long} rendered in decimal.
     *
     * @return the generated name
     */
    private static String randomName() {
        long suffix = ThreadLocalRandom.current().nextLong();
        return "beam_it_resource_" + suffix;
    }

    /**
     * Creates an admin client bound to the region of {@code ZONE}.
     *
     * @return a new {@link AdminClient}; the caller is responsible for closing it
     */
    private static AdminClient newAdminClient() {
        AdminClientSettings settings = AdminClientSettings.newBuilder().setRegion(ZONE.region()).build();
        return AdminClient.create(settings);
    }

    /**
     * Creates a randomly named topic in {@code ZONE} under the given project and registers a cleanup
     * action that deletes it again.
     *
     * <p>The topic is configured with 2 partitions, 4 MiB/s publish and subscribe capacity, and
     * 32212254720 bytes of per-partition retention.
     *
     * @param projectId project that owns the new topic
     * @return the path of the created topic
     * @throws Exception if the admin call fails or the admin client cannot be closed
     */
    private TopicPath createTopic(ProjectId projectId) throws Exception {
        TopicPath topicPath = TopicPath.newBuilder()
                .setProject(projectId)
                .setLocation(ZONE)
                .setName(TopicName.of(randomName()))
                .build();
        Topic.Builder topic = Topic.newBuilder().setName(topicPath.toString());
        topic.getPartitionConfigBuilder()
                .setCount(2L)
                .setCapacity(Topic.PartitionConfig.Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
        topic.getRetentionConfigBuilder().setPerPartitionBytes(32212254720L);

        // Registered before the create call so a deletion is still attempted if creation fails midway.
        this.cleanupActions.addLast(() -> {
            // try-with-resources closes the client exactly once and records close failures as suppressed.
            try (AdminClient client = newAdminClient()) {
                client.deleteTopic(topicPath).get();
            } catch (Throwable t) {
                // Best effort: a failed cleanup must not abort the remaining cleanup actions.
                LOG.error("Failed to clean up topic.", t);
            }
        });

        LOG.info("Creating topic named {}", topicPath);
        try (AdminClient client = newAdminClient()) {
            client.createTopic(topic.build()).get();
        }
        return topicPath;
    }

    /**
     * Creates a randomly named subscription on the given topic, in the topic's project and in
     * {@code ZONE}, and registers a cleanup action that deletes it again.
     *
     * <p>The subscription uses {@code DELIVER_IMMEDIATELY} and is created with
     * {@link BacklogLocation#BEGINNING}.
     *
     * @param topicPath topic the subscription attaches to
     * @return the path of the created subscription
     * @throws Exception if the admin call fails or the admin client cannot be closed
     */
    private SubscriptionPath createSubscription(TopicPath topicPath) throws Exception {
        SubscriptionPath subscriptionPath = SubscriptionPath.newBuilder()
                .setProject(topicPath.project())
                .setLocation(ZONE)
                .setName(SubscriptionName.of(randomName()))
                .build();
        Subscription.Builder subscription = Subscription.newBuilder().setName(subscriptionPath.toString());
        subscription.getDeliveryConfigBuilder().setDeliveryRequirement(Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY);
        subscription.setTopic(topicPath.toString());

        // Registered before the create call so a deletion is still attempted if creation fails midway.
        this.cleanupActions.addLast(() -> {
            // try-with-resources closes the client exactly once and records close failures as suppressed.
            try (AdminClient client = newAdminClient()) {
                client.deleteSubscription(subscriptionPath).get();
            } catch (Throwable t) {
                // Best effort: a failed cleanup must not abort the remaining cleanup actions.
                LOG.error("Failed to clean up subscription.", t);
            }
        });

        LOG.info("Creating subscription named {} from topic {}", subscriptionPath, topicPath);
        try (AdminClient client = newAdminClient()) {
            client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get();
        }
        return subscriptionPath;
    }

    /** Runs every registered cleanup action, most recently registered first, emptying the queue. */
    @After
    public void tearDown() {
        // pollLast returns null only once the deque is empty (ArrayDeque holds no null elements).
        for (Runnable action = this.cleanupActions.pollLast(); action != null; action = this.cleanupActions.pollLast()) {
            action.run();
        }
    }

    /**
     * Adds a branch to the pipeline that generates the index range, converts each index to a
     * {@link Row} of {@code SAMPLE_BEAM_SCHEMA}, and writes the rows as JSON to the topic through
     * the Pub/Sub Lite write schema transform.
     *
     * @param topicPath destination topic; its name and project populate the write configuration
     * @param pipeline pipeline to attach the write branch to
     */
    public static void writeJsonMessages(TopicPath topicPath, Pipeline pipeline) {
        // Anonymous subclass wrapping a lambda, as in the other write path of this class.
        SimpleFunction<Integer, Row> toRow = new SimpleFunction<Integer, Row>(
                index -> Row.withSchema(SAMPLE_BEAM_SCHEMA)
                        .addValue(Objects.requireNonNull(index).toString())
                        .addValue(index)
                        .build()) {
        };

        PCollection<Row> rows = pipeline
                .apply(Create.of((Void) null))
                .apply("createIndexes", new CustomCreate())
                .apply("format to rows", MapElements.via(toRow))
                .setRowSchema(SAMPLE_BEAM_SCHEMA);

        PubsubLiteWriteSchemaTransformProvider.PubsubLiteWriteSchemaTransformConfiguration writeConfig =
                PubsubLiteWriteSchemaTransformProvider.PubsubLiteWriteSchemaTransformConfiguration.builder()
                        .setFormat("JSON")
                        .setLocation(ZONE.toString())
                        .setTopicName(topicPath.name().value())
                        .setProject(topicPath.project().name().value())
                        .build();

        PCollectionRowTuple.of("input", rows)
                .apply("write to pslite", new PubsubLiteWriteSchemaTransformProvider().from(writeConfig).buildTransform());
    }

    /**
     * Adds a branch to the pipeline that generates the index range, wraps each index's decimal
     * string in a Pub/Sub Lite message, adds UUIDs, and publishes to the topic.
     *
     * @param topicPath destination topic
     * @param pipeline pipeline to attach the write branch to
     */
    public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
        SimpleFunction<Integer, PubSubMessage> toMessage = new SimpleFunction<Integer, PubSubMessage>(
                index -> Message.builder()
                        .setData(ByteString.copyFromUtf8(index.toString()))
                        .build()
                        .toProto()) {
        };

        PCollection<Integer> indexes = pipeline
                .apply(Create.of((Void) null))
                .apply("createIndexes", new CustomCreate());
        PCollection<PubSubMessage> messages = indexes.apply("createMessages", MapElements.via(toMessage));

        PublisherOptions publisherOptions = PublisherOptions.newBuilder().setTopicPath(topicPath).build();
        messages
                .apply("addUuids", PubsubLiteIO.addUuids())
                .apply("writeMessages", PubsubLiteIO.write(publisherOptions));
    }

    /**
     * Adds a Pub/Sub Lite read named "readMessages" for the given subscription to the pipeline.
     *
     * @param subscriptionPath subscription to read from
     * @param pipeline pipeline to attach the read to
     * @return the collection of messages produced by the read
     */
    public static PCollection<SequencedMessage> readMessages(SubscriptionPath subscriptionPath, Pipeline pipeline) {
        SubscriberOptions subscriberOptions = SubscriberOptions.newBuilder().setSubscriptionPath(subscriptionPath).build();
        return pipeline.apply("readMessages", PubsubLiteIO.read(subscriberOptions));
    }

    /**
     * Returns a function that decodes a message's data as UTF-8 and parses it as a decimal integer.
     *
     * @return the extraction function; it throws {@link NumberFormatException} for non-numeric data
     */
    public static SimpleFunction<SequencedMessage, Integer> extractIds() {
        return new SimpleFunction<SequencedMessage, Integer>() {
            @Override
            public Integer apply(SequencedMessage sequencedMessage) {
                String payload = sequencedMessage.getMessage().getData().toStringUtf8();
                return Integer.parseInt(payload);
            }
        };
    }

    /**
     * Returns a predicate that is true exactly when the given set equals the full index range
     * {@code 0} (inclusive) to {@code MESSAGE_COUNT} (exclusive). The observed set is logged at debug
     * level on every evaluation.
     *
     * @return the set-completeness predicate
     */
    public static SerializableFunction<Set<Integer>, Boolean> testIds() {
        return observed -> {
            LOG.debug("Ids are: {}", observed);
            Set<Integer> expected = IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toSet());
            return expected.equals(observed);
        };
    }

    /**
     * Writes the index range as JSON rows through the write schema transform, reads them back
     * through the read schema transform, and succeeds once the set of received {@code numberInInt}
     * values equals the full index range.
     *
     * @throws Exception if resource creation keeps failing, the thread is interrupted, or the
     *     pipeline fails
     */
    @Test
    public void testPubsubLiteWriteReadWithSchemaTransform() throws Exception {
        this.pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
        this.pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);

        TopicPath topic = createTopic(getProject(this.pipeline.getOptions()));
        SubscriptionPath subscription = null;
        Exception lastFailure = null;
        // Up to 30 attempts, one second apart (the pause also precedes the first attempt).
        for (int attempt = 0; attempt < 30; attempt++) {
            Thread.sleep(1000L);
            try {
                subscription = createSubscription(topic);
                break;
            } catch (InterruptedException e) {
                // Do not keep retrying once interrupted; restore the flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                lastFailure = e;
                LOG.info("Retrying exception on subscription creation.", e);
            }
        }
        if (subscription == null) {
            throw lastFailure;
        }

        writeJsonMessages(topic, this.pipeline);

        String jsonSchema = "{\n"
                + "  \"properties\": {\n"
                + "    \"numberInString\": {\n"
                + "      \"type\": \"string\"\n"
                + "    },\n"
                + "    \"numberInInt\": {\n"
                + "      \"type\": \"integer\"\n"
                + "    }\n"
                + "  }\n"
                + "}";
        PubsubLiteReadSchemaTransformProvider.PubsubLiteReadSchemaTransformConfiguration readConfig =
                PubsubLiteReadSchemaTransformProvider.PubsubLiteReadSchemaTransformConfiguration.builder()
                        .setDataFormat("JSON")
                        .setSchema(jsonSchema)
                        .setSubscriptionName(subscription.name().value())
                        .setLocation(subscription.location().toString())
                        .build();

        PCollectionRowTuple.empty(this.pipeline)
                .apply("read from pslite", new PubsubLiteReadSchemaTransformProvider().from(readConfig).buildTransform())
                .get("output")
                .apply("get ints", MapElements.into(TypeDescriptors.integers()).via(row -> {
                    return Integer.valueOf(Objects.requireNonNull(row.getInt64("numberInInt")).intValue());
                }))
                .apply("PubsubSignalTest", this.signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));

        Supplier<Void> waitForStart = this.signal.waitForStart(Duration.standardMinutes(5L));
        this.pipeline.apply("start signal", this.signal.signalStart());
        PipelineResult run = this.pipeline.run();
        waitForStart.get();
        LOG.info("Running!");
        this.signal.waitForSuccess(Duration.standardMinutes(5L));
        try {
            run.cancel();
        } catch (UnsupportedOperationException e) {
            // Cancellation is optional for the test outcome; record it instead of hiding it.
            LOG.info("Pipeline cancellation is not supported; skipping cancel.", e);
        }
    }

    /**
     * Publishes the index range with {@code writeMessages}, reads it back with
     * {@code readMessages}, and succeeds once the set of parsed ids equals the full index range.
     *
     * @throws Exception if resource creation keeps failing, the thread is interrupted, or the
     *     pipeline fails
     */
    @Test
    public void testReadWrite() throws Exception {
        this.pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
        this.pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);

        TopicPath topic = createTopic(getProject(this.pipeline.getOptions()));
        SubscriptionPath subscription = null;
        Exception lastFailure = null;
        // Up to 30 attempts, one second apart (the pause also precedes the first attempt).
        for (int attempt = 0; attempt < 30; attempt++) {
            Thread.sleep(1000L);
            try {
                subscription = createSubscription(topic);
                break;
            } catch (InterruptedException e) {
                // Do not keep retrying once interrupted; restore the flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                lastFailure = e;
                LOG.info("Retrying exception on subscription creation.", e);
            }
        }
        if (subscription == null) {
            throw lastFailure;
        }

        writeMessages(topic, this.pipeline);
        readMessages(subscription, this.pipeline)
                .apply(MapElements.via(extractIds()))
                .apply("PubsubSignalTest", this.signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));

        Supplier<Void> waitForStart = this.signal.waitForStart(Duration.standardMinutes(5L));
        this.pipeline.apply(this.signal.signalStart());
        PipelineResult run = this.pipeline.run();
        waitForStart.get();
        LOG.info("Running!");
        this.signal.waitForSuccess(Duration.standardMinutes(5L));
        try {
            run.cancel();
        } catch (UnsupportedOperationException e) {
            // Cancellation is optional for the test outcome; record it instead of hiding it.
            LOG.info("Pipeline cancellation is not supported; skipping cancel.", e);
        }
    }

    /**
     * Lambda deserialization hook: selects a lambda by the serialized implementation method name,
     * verifies the recorded functional-interface and implementation metadata, and returns a new
     * instance of the matching lambda.
     *
     * <p>NOTE(review): this method is marked synthetic and looks like decompiler output of a
     * compiler-generated method rather than hand-written code. As written it is not valid Java
     * ({@code boolean z} is assigned {@code -1}, {@code 2} and {@code 3}; the {@code switch} repeats
     * {@code case true}; lambdas are returned as {@code Object}). Confirm whether it should be
     * removed from the source and left to the compiler.
     *
     * @param serializedLambda serialized form of the lambda being restored
     * @return the reconstructed lambda
     * @throws IllegalArgumentException if no known lambda matches the serialized metadata
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at -1 when the method name is unknown.
        boolean z = -1;
        // First switch: narrow by hash code, then confirm by exact name.
        switch (implMethodName.hashCode()) {
            case -458218820:
                if (implMethodName.equals("lambda$testIds$97052e64$1")) {
                    z = true;
                    break;
                }
                break;
            case -302364203:
                if (implMethodName.equals("lambda$writeMessages$eb58043a$1")) {
                    z = 3;
                    break;
                }
                break;
            case 852042904:
                if (implMethodName.equals("lambda$testPubsubLiteWriteReadWithSchemaTransform$1dedbd69$1")) {
                    z = false;
                    break;
                }
                break;
            case 875966459:
                if (implMethodName.equals("lambda$writeJsonMessages$b51c0e17$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Second switch: validate the remaining metadata and rebuild the selected lambda.
        switch (z) {
            case false:
                // Row -> Integer lambda of testPubsubLiteWriteReadWithSchemaTransform.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Ljava/lang/Integer;")) {
                    return row -> {
                        return Integer.valueOf(((Long) Objects.requireNonNull(row.getInt64("numberInInt"))).intValue());
                    };
                }
                break;
            case true:
                // Set -> Boolean lambda of testIds.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;)Ljava/lang/Boolean;")) {
                    return set -> {
                        LOG.debug("Ids are: {}", set);
                        return Boolean.valueOf(((Set) IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toSet())).equals(set));
                    };
                }
                break;
            case true:
                // Integer -> Row lambda of writeJsonMessages.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Lorg/apache/beam/sdk/values/Row;")) {
                    return num -> {
                        return Row.withSchema(SAMPLE_BEAM_SCHEMA).addValue(((Integer) Objects.requireNonNull(num)).toString()).addValue(num).build();
                    };
                }
                break;
            case true:
                // Integer -> PubSubMessage lambda of writeMessages.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Lcom/google/cloud/pubsublite/proto/PubSubMessage;")) {
                    return num2 -> {
                        return Message.builder().setData(ByteString.copyFromUtf8(num2.toString())).build().toProto();
                    };
                }
                break;
        }
        // Reached when the name is unknown or the metadata check of the selected case failed.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Closes a resource with try-with-resources semantics.
     *
     * <p>With no primary failure ({@code th == null}) any exception from {@code close()} propagates
     * to the caller. With a primary failure, an exception from {@code close()} is attached to it as
     * a suppressed exception and does not propagate.
     *
     * <p>NOTE(review): marked synthetic; this looks like decompiler output of a compiler-generated
     * helper. Prefer a real try-with-resources statement at call sites.
     *
     * @param th the primary failure already in flight, or {@code null} if there is none
     * @param autoCloseable the resource to close
     * @throws Exception if {@code th} is {@code null} and closing the resource fails
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th == null) {
            // AutoCloseable.close() is declared to throw Exception, hence the throws clause above.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable suppressed) {
            th.addSuppressed(suppressed);
        }
    }
}
