package org.apache.flink.streaming.connectors.gcp.pubsub;

import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CancellationException;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.Acknowledger;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.class */
public class PubSubSource<OUT> extends RichSourceFunction<OUT> implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {
    /** Schema used to deserialize pulled messages, detect end-of-stream records and report the produced type. */
    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
    /** Factory that creates the {@link PubSubSubscriber} from {@link #credentials}. */
    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
    /** Credentials handed to {@link #pubSubSubscriberFactory} when the subscriber is created. */
    protected final Credentials credentials;
    /** Factory that wraps the subscriber into an {@link AcknowledgeOnCheckpoint}. */
    protected final AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory;
    /** Rate limiter consulted once per pulled batch, with the batch size as the number of permits. */
    protected final FlinkConnectorRateLimiter rateLimiter;
    /** Configured rate limit; multiplied by the number of parallel subtasks before it is given to the rate limiter. */
    protected final int messagePerSecondRateLimit;
    /** Collects acknowledge ids of emitted messages; transient, created lazily in open()/restoreState(). */
    protected transient AcknowledgeOnCheckpoint<String> acknowledgeOnCheckpoint;
    /** Subscriber that messages are pulled from; transient, created lazily in open()/restoreState(). */
    protected transient PubSubSubscriber subscriber;
    /** Loop flag of run(); set in open(), cleared by cancel() or when the pull is interrupted/cancelled. */
    protected volatile transient boolean isRunning;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource$AcknowledgeOnCheckpointFactory.class */
    /**
     * Serializable factory for {@link AcknowledgeOnCheckpoint} instances, so the source can be
     * handed a factory instead of constructing the helper itself.
     */
    public static class AcknowledgeOnCheckpointFactory implements Serializable {
        /** Package-private on purpose: instances are created by the enclosing source's builder. */
        AcknowledgeOnCheckpointFactory() {
        }

        /**
         * Wraps the given acknowledger.
         *
         * @param acknowledger receiver of the acknowledge ids
         * @return a new {@link AcknowledgeOnCheckpoint} bound to {@code acknowledger}
         */
        AcknowledgeOnCheckpoint<String> create(Acknowledger<String> acknowledger) {
            final AcknowledgeOnCheckpoint<String> created = new AcknowledgeOnCheckpoint<>(acknowledger);
            return created;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource$DeserializationSchemaBuilder.class */
    /**
     * Entry stage of the {@link PubSubSource} builder: selects the schema used to deserialize
     * messages and hands over to the project-name stage.
     */
    public static class DeserializationSchemaBuilder {
        /**
         * Starts a builder from a plain Flink {@link DeserializationSchema}.
         *
         * @param deserializationSchema schema used to deserialize messages; must not be null
         * @param <OUT> type of the records produced by the schema
         * @return the next builder stage, which accepts the project name
         */
        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
            // Diamond instead of a raw type keeps the OUT type argument checked by the compiler.
            return new PubSubSourceBuilder<>(deserializationSchema);
        }

        /**
         * Starts a builder from a {@link PubSubDeserializationSchema}.
         *
         * @param pubSubDeserializationSchema schema used to deserialize messages; must not be null
         * @param <OUT> type of the records produced by the schema
         * @return the next builder stage, which accepts the project name
         */
        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> pubSubDeserializationSchema) {
            return new PubSubSourceBuilder<>(pubSubDeserializationSchema);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource$ProjectNameBuilder.class */
    /**
     * Builder stage that requires the project name before any further configuration is possible.
     *
     * @param <OUT> type of the records the source being built will emit
     */
    public interface ProjectNameBuilder<OUT> {
        /**
         * Sets the project name.
         *
         * @param str the project name
         * @return the next builder stage, which accepts the subscription name
         */
        SubscriptionNameBuilder<OUT> withProjectName(String str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource$PubSubCollector.class */
    public class PubSubCollector implements Collector<OUT> {
        private final SourceFunction.SourceContext<OUT> ctx;
        private boolean endOfStreamSignalled;

        private PubSubCollector(SourceFunction.SourceContext<OUT> sourceContext) {
            this.endOfStreamSignalled = false;
            this.ctx = sourceContext;
        }

        public void collect(OUT out) {
            if (this.endOfStreamSignalled || PubSubSource.this.deserializationSchema.isEndOfStream(out)) {
                this.endOfStreamSignalled = true;
            } else {
                this.ctx.collect(out);
            }
        }

        public boolean isEndOfStreamSignalled() {
            return this.endOfStreamSignalled;
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource$PubSubSourceBuilder.class */
    /**
     * Builder for {@link PubSubSource}. It implements the project-name and subscription-name
     * stages itself, so after {@link #withSubscriptionName(String)} all optional settings are
     * reachable and {@link #build()} can be called.
     *
     * @param <OUT> type of the records the built source emits
     */
    public static class PubSubSourceBuilder<OUT> implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
        /** Rate limit used when {@link #withMessageRateLimit(int)} is never called. */
        private static final int DEFAULT_MESSAGE_PER_SECOND_RATE_LIMIT = 100000;

        // Defaults passed to DefaultPubSubSubscriberFactory by build().
        // NOTE(review): the names assume that constructor's argument order is
        // (subscription, retries, per-request timeout, max messages per pull); it is not
        // visible in this file, so confirm against DefaultPubSubSubscriberFactory.
        private static final int DEFAULT_RETRIES = 3;
        private static final Duration DEFAULT_PER_REQUEST_TIMEOUT = Duration.ofSeconds(15L);
        private static final int DEFAULT_MAX_MESSAGES_PER_PULL = 100;

        private final PubSubDeserializationSchema<OUT> deserializationSchema;
        private String projectName;
        private String subscriptionName;
        private PubSubSubscriberFactory pubSubSubscriberFactory;
        private Credentials credentials;
        private int messagePerSecondRateLimit = DEFAULT_MESSAGE_PER_SECOND_RATE_LIMIT;

        /**
         * Creates a builder from a plain Flink schema by wrapping it.
         *
         * @param deserializationSchema schema to wrap; must not be null
         */
        private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
            Preconditions.checkNotNull(deserializationSchema);
            // Diamond instead of a raw type so the wrapper is checked against OUT.
            this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema);
        }

        /**
         * Creates a builder from a Pub/Sub specific schema.
         *
         * @param pubSubDeserializationSchema schema to use; must not be null
         */
        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> pubSubDeserializationSchema) {
            Preconditions.checkNotNull(pubSubDeserializationSchema);
            this.deserializationSchema = pubSubDeserializationSchema;
        }

        /**
         * Sets the project name.
         *
         * @param projectName the project name; must not be null
         * @return this builder, as the subscription-name stage
         */
        @Override // org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.ProjectNameBuilder
        public SubscriptionNameBuilder<OUT> withProjectName(String projectName) {
            Preconditions.checkNotNull(projectName);
            this.projectName = projectName;
            return this;
        }

        /**
         * Sets the subscription name.
         *
         * @param subscriptionName the subscription name; must not be null
         * @return this builder
         */
        @Override // org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.SubscriptionNameBuilder
        public PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName) {
            Preconditions.checkNotNull(subscriptionName);
            this.subscriptionName = subscriptionName;
            return this;
        }

        /**
         * Sets the credentials handed to the subscriber factory. When never called (or called
         * with null), {@link #build()} falls back to the default credentials provider.
         *
         * @param credentials credentials to use
         * @return this builder
         */
        public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials) {
            this.credentials = credentials;
            return this;
        }

        /**
         * Uses a custom subscriber factory instead of the default one.
         *
         * @param pubSubSubscriberFactory factory that creates the subscriber
         * @return this builder
         */
        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory pubSubSubscriberFactory) {
            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
            return this;
        }

        /**
         * Configures a {@code DefaultPubSubSubscriberFactory} for the project and subscription
         * names set so far. This replaces any factory set earlier.
         *
         * @param maxMessagesPerPull passed as the last factory argument (presumably the number of
         *     messages per pull request; confirm against DefaultPubSubSubscriberFactory)
         * @param perRequestTimeout passed as the factory's duration argument
         * @param retries passed as the factory's first numeric argument (presumably the retry count)
         * @return this builder
         */
        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries) {
            this.pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(ProjectSubscriptionName.format(this.projectName, this.subscriptionName), retries, perRequestTimeout, maxMessagesPerPull);
            return this;
        }

        /**
         * Overrides the default message rate limit.
         *
         * @param messagePerSecondRateLimit the rate limit to hand to the source
         * @return this builder
         */
        public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit) {
            this.messagePerSecondRateLimit = messagePerSecondRateLimit;
            return this;
        }

        /**
         * Creates the source. Missing credentials are resolved through the default credentials
         * provider of {@link SubscriptionAdminSettings}; a missing subscriber factory is replaced
         * by a {@code DefaultPubSubSubscriberFactory} with the default settings.
         *
         * @return the configured source
         * @throws IOException if the default credentials cannot be obtained
         */
        public PubSubSource<OUT> build() throws IOException {
            if (this.credentials == null) {
                this.credentials = SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build().getCredentials();
            }
            if (this.pubSubSubscriberFactory == null) {
                this.pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(
                        ProjectSubscriptionName.format(this.projectName, this.subscriptionName),
                        DEFAULT_RETRIES,
                        DEFAULT_PER_REQUEST_TIMEOUT,
                        DEFAULT_MAX_MESSAGES_PER_PULL);
            }
            return new PubSubSource<>(this.deserializationSchema, this.pubSubSubscriberFactory, this.credentials, new AcknowledgeOnCheckpointFactory(), new GuavaFlinkConnectorRateLimiter(), this.messagePerSecondRateLimit);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource$SubscriptionNameBuilder.class */
    /**
     * Builder stage that requires the subscription name; afterwards the full builder is available.
     *
     * @param <OUT> type of the records the source being built will emit
     */
    public interface SubscriptionNameBuilder<OUT> {
        /**
         * Sets the subscription name.
         *
         * @param str the subscription name
         * @return the builder on which optional settings and {@code build()} are available
         */
        PubSubSourceBuilder<OUT> withSubscriptionName(String str);
    }

    /**
     * Package-private constructor; instances are normally obtained through {@link #newBuilder()}.
     * All arguments are stored as-is, no validation is performed here.
     *
     * @param pubSubDeserializationSchema schema used to deserialize pulled messages
     * @param pubSubSubscriberFactory factory that creates the subscriber
     * @param credentials credentials passed to the subscriber factory
     * @param acknowledgeOnCheckpointFactory factory for the acknowledge-on-checkpoint helper
     * @param flinkConnectorRateLimiter rate limiter applied to pulled batches
     * @param i the configured message rate limit
     */
    PubSubSource(PubSubDeserializationSchema<OUT> pubSubDeserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory, FlinkConnectorRateLimiter flinkConnectorRateLimiter, int i) {
        // Message handling.
        this.deserializationSchema = pubSubDeserializationSchema;
        this.acknowledgeOnCheckpointFactory = acknowledgeOnCheckpointFactory;

        // Connection to Pub/Sub.
        this.credentials = credentials;
        this.pubSubSubscriberFactory = pubSubSubscriberFactory;

        // Throughput control.
        this.messagePerSecondRateLimit = i;
        this.rateLimiter = flinkConnectorRateLimiter;
    }

    /**
     * Prepares the source for running: verifies that checkpointing is enabled, configures and
     * opens the rate limiter, opens the deserialization schema, creates the subscriber and
     * registers the outstanding-acknowledgements gauge.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws IllegalArgumentException if checkpointing is not enabled
     * @throws Exception if any of the opened components fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        final RuntimeContext runtimeContext = getRuntimeContext();
        if (hasNoCheckpointingEnabled(runtimeContext)) {
            throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message.");
        }

        // Widen before multiplying: int * int would silently overflow for a large rate limit
        // combined with a high parallelism.
        final long totalRate = (long) this.messagePerSecondRateLimit * runtimeContext.getNumberOfParallelSubtasks();
        this.rateLimiter.setRate(totalRate);
        this.rateLimiter.open(runtimeContext);

        this.deserializationSchema.open(() -> {
            return getRuntimeContext().getMetricGroup().addGroup("user");
        });

        createAndSetPubSubSubscriber();

        // Register the gauge only after acknowledgeOnCheckpoint exists; the gauge dereferences
        // it, so registering earlier could fail with a NullPointerException if it were polled
        // before the subscriber was created.
        runtimeContext.getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);

        this.isRunning = true;
    }

    /**
     * Tells whether checkpointing is unavailable for this source.
     *
     * @param runtimeContext the runtime context of this function
     * @return true if the context is not a {@link StreamingRuntimeContext} or if that context
     *     reports checkpointing as disabled
     */
    private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) {
        if (!(runtimeContext instanceof StreamingRuntimeContext)) {
            return true;
        }
        final StreamingRuntimeContext streamingContext = (StreamingRuntimeContext) runtimeContext;
        return !streamingContext.isCheckpointingEnabled();
    }

    /**
     * Main loop of the source: pulls batches from the subscriber and processes them until the
     * source is cancelled, end of stream is signalled, or the pull is interrupted/cancelled.
     * The subscriber is closed on every exit path, including failures.
     *
     * @param sourceContext context used to emit records and to obtain the checkpoint lock
     * @throws Exception if pulling or processing fails, or if closing the subscriber fails
     */
    @Override
    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        final PubSubSource<OUT>.PubSubCollector collector = new PubSubCollector(sourceContext);
        Throwable failure = null;
        try {
            while (this.isRunning) {
                try {
                    processMessage(sourceContext, this.subscriber.pull(), collector);
                } catch (InterruptedException | CancellationException stopSignal) {
                    // Treated as a request to stop, not as a failure: leave the loop normally.
                    this.isRunning = false;
                }
            }
        } catch (Throwable t) {
            failure = t;
            throw t;
        } finally {
            // Close in finally so a failing pull/processMessage no longer leaks the subscriber.
            try {
                this.subscriber.close();
            } catch (Exception closeFailure) {
                if (failure == null) {
                    throw closeFailure;
                }
                // Keep the original failure as the primary exception.
                failure.addSuppressed(closeFailure);
            }
        }
    }

    /**
     * Handles one pulled batch: acquires rate-limiter permits for the whole batch, then, while
     * holding the checkpoint lock, records each message's acknowledge id and deserializes it
     * into the collector. Stops early and cancels the source once end of stream is signalled.
     *
     * @param sourceContext context providing the checkpoint lock
     * @param list the pulled messages
     * @param pubSubCollector collector receiving the deserialized records
     * @throws Exception if deserialization fails
     */
    private void processMessage(SourceFunction.SourceContext<OUT> sourceContext, List<ReceivedMessage> list, PubSubSource<OUT>.PubSubCollector pubSubCollector) throws Exception {
        final int batchSize = list.size();
        rateLimiter.acquire(batchSize);

        synchronized (sourceContext.getCheckpointLock()) {
            for (final ReceivedMessage message : list) {
                // The ack id is registered before deserialization, within the same lock.
                final String ackId = message.getAckId();
                acknowledgeOnCheckpoint.addAcknowledgeId(ackId);
                deserializationSchema.deserialize(message.getMessage(), pubSubCollector);

                if (!pubSubCollector.isEndOfStreamSignalled()) {
                    continue;
                }
                // End of stream: stop the source and skip the remaining messages of the batch.
                cancel();
                return;
            }
        }
    }

    /**
     * Gauge value supplier.
     *
     * @return the number of outstanding acknowledgements reported by {@code acknowledgeOnCheckpoint}
     */
    private Integer getOutstandingMessagesToAck() {
        final int outstanding = acknowledgeOnCheckpoint.numberOfOutstandingAcknowledgements();
        return outstanding;
    }

    /** Requests the run loop to stop by clearing the volatile running flag. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * @return the type information reported by the deserialization schema
     */
    @Override
    public TypeInformation<OUT> getProducedType() {
        final TypeInformation<OUT> producedType = deserializationSchema.getProducedType();
        return producedType;
    }

    /**
     * Entry point for building a {@link PubSubSource}.
     *
     * @return the first builder stage, which selects the deserialization schema
     */
    public static DeserializationSchemaBuilder newBuilder() {
        final DeserializationSchemaBuilder firstStage = new DeserializationSchemaBuilder();
        return firstStage;
    }

    /**
     * Forwards the completed-checkpoint notification to {@code acknowledgeOnCheckpoint}.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if the delegate fails
     */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
        final AcknowledgeOnCheckpoint<String> delegate = acknowledgeOnCheckpoint;
        delegate.notifyCheckpointComplete(j);
    }

    /**
     * Aborted checkpoints are ignored by this source.
     *
     * @param j id of the aborted checkpoint (unused)
     */
    @Override
    public void notifyCheckpointAborted(long j) {
        // Intentionally empty.
    }

    /**
     * Delegates the state snapshot to {@code acknowledgeOnCheckpoint}.
     *
     * @param j first argument forwarded unchanged to the delegate
     * @param j2 second argument forwarded unchanged to the delegate
     * @return the state returned by the delegate
     * @throws Exception if the delegate fails
     */
    @Override
    public List<AcknowledgeIdsForCheckpoint<String>> snapshotState(long j, long j2) throws Exception {
        final List<AcknowledgeIdsForCheckpoint<String>> snapshot = acknowledgeOnCheckpoint.snapshotState(j, j2);
        return snapshot;
    }

    /**
     * Restores previously snapshotted acknowledge ids.
     *
     * @param list the state to restore
     * @throws Exception if the subscriber cannot be created or the delegate fails
     */
    @Override
    public void restoreState(List<AcknowledgeIdsForCheckpoint<String>> list) throws Exception {
        // The acknowledge helper is created lazily, so make sure it exists before using it.
        createAndSetPubSubSubscriber();
        final AcknowledgeOnCheckpoint<String> delegate = acknowledgeOnCheckpoint;
        delegate.restoreState(list);
    }

    /**
     * Lazily initializes the subscriber and the acknowledge-on-checkpoint helper. Each of the
     * two is only created while its field is still null, so repeated calls do not replace
     * existing instances.
     *
     * @throws Exception if the subscriber factory fails
     */
    private void createAndSetPubSubSubscriber() throws Exception {
        final boolean subscriberMissing = subscriber == null;
        if (subscriberMissing) {
            subscriber = pubSubSubscriberFactory.getSubscriber(credentials);
        }

        final boolean acknowledgerMissing = acknowledgeOnCheckpoint == null;
        if (acknowledgerMissing) {
            // The subscriber itself is the acknowledger that receives the ack ids.
            acknowledgeOnCheckpoint = acknowledgeOnCheckpointFactory.create(subscriber);
        }
    }
}
