package co.cask.gcp.publisher.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.gcp.common.GCPReferenceSourceConfig;
import com.google.cloud.Timestamp;
import java.lang.invoke.SerializedLambda;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import javax.annotation.Nullable;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.pubsub.PubsubUtils;
import org.apache.spark.streaming.pubsub.SparkGCPCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("GoogleSubscriber")
@Description("Streaming Source to read messages from Google PubSub.")
@Plugin(type = "streamingsource")
/* loaded from: input_file:co/cask/gcp/publisher/source/GoogleSubscriber.class */
public class GoogleSubscriber extends StreamingSource<StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(GoogleSubscriber.class);
    private static final Schema DEFAULT_SCHEMA = Schema.recordOf("event", new Schema.Field[]{Schema.Field.of("message", Schema.of(Schema.Type.BYTES)), Schema.Field.of("id", Schema.of(Schema.Type.STRING)), Schema.Field.of("timestamp", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))});
    private SubscriberConfig config;

    /* loaded from: input_file:co/cask/gcp/publisher/source/GoogleSubscriber$SubscriberConfig.class */
    public static class SubscriberConfig extends GCPReferenceSourceConfig {

        @Description("Cloud Pub/Sub subscription to read from. If the subscription does not exist it will be automatically created if a topic is specified. Messages published before the subscription was created will not be read.")
        @Macro
        private String subscription;

        @Description("Cloud Pub/Sub topic to subscribe messages from. If a topic is provided and the given subscription does not exists it will be created. Note: If a subscription does not exists and is created only the messages arrived after the creation of subscription will be received.")
        @Macro
        @Nullable
        private String topic;
    }

    public GoogleSubscriber(SubscriberConfig subscriberConfig) {
        this.config = subscriberConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
        super.configurePipeline(pipelineConfigurer);
        pipelineConfigurer.getStageConfigurer().setOutputSchema(DEFAULT_SCHEMA);
    }

    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) {
        SparkGCPCredentials build;
        String serviceAccountFilePath = this.config.getServiceAccountFilePath();
        if (serviceAccountFilePath != null) {
            LOG.debug("Building credentials from specified service account file.");
            build = new SparkGCPCredentials.Builder().jsonServiceAccount(serviceAccountFilePath).build();
        } else {
            LOG.debug("No service account file specified. Building credentials using Application Default Credentials.");
            build = new SparkGCPCredentials.Builder().build();
        }
        return PubsubUtils.createStream(streamingContext.getSparkStreamingContext(), this.config.getProject(), this.config.topic, this.config.subscription, build, StorageLevel.MEMORY_ONLY()).map(sparkPubsubMessage -> {
            return StructuredRecord.builder(DEFAULT_SCHEMA).set("message", sparkPubsubMessage.getData()).set("id", sparkPubsubMessage.getMessageId()).setTimestamp("timestamp", getTimestamp(sparkPubsubMessage.getPublishTime())).build();
        });
    }

    private ZonedDateTime getTimestamp(String str) {
        return ZonedDateTime.ofInstant(Instant.ofEpochSecond(Timestamp.parseTimestamp(str).getSeconds()).plusNanos(r0.getNanos()), ZoneId.ofOffset("UTC", ZoneOffset.UTC));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1420670417:
                if (implMethodName.equals("lambda$getStream$bfe81421$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("co/cask/gcp/publisher/source/GoogleSubscriber") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/spark/streaming/pubsub/SparkPubsubMessage;)Lco/cask/cdap/api/data/format/StructuredRecord;")) {
                    GoogleSubscriber googleSubscriber = (GoogleSubscriber) serializedLambda.getCapturedArg(0);
                    return sparkPubsubMessage -> {
                        return StructuredRecord.builder(DEFAULT_SCHEMA).set("message", sparkPubsubMessage.getData()).set("id", sparkPubsubMessage.getMessageId()).setTimestamp("timestamp", getTimestamp(sparkPubsubMessage.getPublishTime())).build();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
