/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import java.io.Serializable;
import java.util.Set;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class PubsubReadIT {
    private static final Logger LOG = LoggerFactory.getLogger(PubsubReadIT.class);
    @Rule
    public transient TestPubsubSignal signal = TestPubsubSignal.create();
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testReadPublicData() throws Exception {
        ((DirectOptions)this.pipeline.getOptions().as(DirectOptions.class)).setBlockOnRun(false);
        PCollection messages = (PCollection)this.pipeline.apply((PTransform)PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"));
        messages.apply("waitForAnyMessage", this.signal.signalSuccessWhen(messages.getCoder(), (SerializableFunction & Serializable)anyMessages -> true));
        Supplier start = this.signal.waitForStart(Duration.standardMinutes((long)5L));
        this.pipeline.apply(this.signal.signalStart());
        PipelineResult job = this.pipeline.run();
        start.get();
        this.signal.waitForSuccess(Duration.standardSeconds((long)30L));
        try {
            job.cancel();
        }
        catch (UnsupportedOperationException unsupportedOperationException) {
            // empty catch block
        }
    }

    @Test
    public void testReadPubsubMessageId() throws Exception {
        ((DirectOptions)this.pipeline.getOptions().as(DirectOptions.class)).setBlockOnRun(false);
        PCollection messages = (PCollection)this.pipeline.apply((PTransform)PubsubIO.readMessagesWithAttributesAndMessageId().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"));
        messages.apply("isMessageIdNonNull", this.signal.signalSuccessWhen(messages.getCoder(), (SerializableFunction)new NonEmptyMessageIdCheck()));
        Supplier start = this.signal.waitForStart(Duration.standardMinutes((long)5L));
        this.pipeline.apply(this.signal.signalStart());
        PipelineResult job = this.pipeline.run();
        start.get();
        this.signal.waitForSuccess(Duration.standardMinutes((long)1L));
        try {
            job.cancel();
        }
        catch (UnsupportedOperationException unsupportedOperationException) {
            // empty catch block
        }
    }

    private static class NonEmptyMessageIdCheck
    implements SerializableFunction<Set<PubsubMessage>, Boolean> {
        private NonEmptyMessageIdCheck() {
        }

        public Boolean apply(Set<PubsubMessage> input) {
            for (PubsubMessage message : input) {
                if (!Strings.isNullOrEmpty((String)message.getMessageId())) continue;
                return false;
            }
            return true;
        }
    }
}

