package org.apache.flink.streaming.connectors.gcp.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.class */
public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class);
    private final AtomicReference<Exception> exceptionAtomicReference;
    private final ApiFutureCallback<String> failureHandler;
    private final AtomicInteger numPendingFutures;
    private final Credentials credentials;
    private final SerializationSchema<IN> serializationSchema;
    private final String projectName;
    private final String topicName;
    private final String hostAndPortForEmulator;
    private transient Publisher publisher;
    private volatile boolean isRunning;
    private transient ManagedChannel managedChannel;
    private transient TransportChannel channel;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink$FailureHandler.class */
    private class FailureHandler implements ApiFutureCallback<String>, Serializable {
        private FailureHandler() {
        }

        public void onFailure(Throwable th) {
            ackAndMaybeNotifyNoPendingFutures();
            PubSubSink.this.exceptionAtomicReference.set(new RuntimeException("Failed trying to publish message", th));
        }

        public void onSuccess(String str) {
            ackAndMaybeNotifyNoPendingFutures();
            PubSubSink.LOG.debug("Successfully published message with id: {}", str);
        }

        private void ackAndMaybeNotifyNoPendingFutures() {
            if (PubSubSink.this.numPendingFutures.decrementAndGet() == 0) {
                synchronized (PubSubSink.this.numPendingFutures) {
                    PubSubSink.this.numPendingFutures.notify();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink$ProjectNameBuilder.class */
    public interface ProjectNameBuilder<IN> {
        TopicNameBuilder<IN> withProjectName(String str);
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink$PubSubSinkBuilder.class */
    public static class PubSubSinkBuilder<IN> implements ProjectNameBuilder<IN>, TopicNameBuilder<IN> {
        private SerializationSchema<IN> serializationSchema;
        private String projectName;
        private String topicName;
        private Credentials credentials;
        private String hostAndPort;

        private PubSubSinkBuilder(SerializationSchema<IN> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        public PubSubSinkBuilder<IN> withCredentials(Credentials credentials) {
            this.credentials = credentials;
            return this;
        }

        @Override // org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink.ProjectNameBuilder
        public TopicNameBuilder<IN> withProjectName(String str) {
            Preconditions.checkNotNull(str);
            this.projectName = str;
            return this;
        }

        @Override // org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink.TopicNameBuilder
        public PubSubSinkBuilder<IN> withTopicName(String str) {
            Preconditions.checkNotNull(str);
            this.topicName = str;
            return this;
        }

        public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String str) {
            this.hostAndPort = str;
            return this;
        }

        public PubSubSink<IN> build() throws IOException {
            if (this.credentials == null) {
                this.credentials = SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build().getCredentials();
            }
            return new PubSubSink<>(this.credentials, this.serializationSchema, this.projectName, this.topicName, this.hostAndPort);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink$SerializationSchemaBuilder.class */
    public static class SerializationSchemaBuilder {
        public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
            return new PubSubSinkBuilder(serializationSchema);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink$TopicNameBuilder.class */
    public interface TopicNameBuilder<IN> {
        PubSubSinkBuilder<IN> withTopicName(String str);
    }

    private PubSubSink(Credentials credentials, SerializationSchema<IN> serializationSchema, String str, String str2, String str3) {
        this.managedChannel = null;
        this.channel = null;
        this.exceptionAtomicReference = new AtomicReference<>();
        this.failureHandler = new FailureHandler();
        this.numPendingFutures = new AtomicInteger(0);
        this.credentials = credentials;
        this.serializationSchema = serializationSchema;
        this.projectName = str;
        this.topicName = str2;
        this.hostAndPortForEmulator = str3;
    }

    public void open(Configuration configuration) throws Exception {
        Publisher.Builder credentialsProvider = Publisher.newBuilder(ProjectTopicName.of(this.projectName, this.topicName)).setCredentialsProvider(FixedCredentialsProvider.create(this.credentials));
        if (this.hostAndPortForEmulator != null) {
            this.managedChannel = ManagedChannelBuilder.forTarget(this.hostAndPortForEmulator).usePlaintext(true).build();
            this.channel = GrpcTransportChannel.newBuilder().setManagedChannel(this.managedChannel).build();
            credentialsProvider.setChannelProvider(FixedTransportChannelProvider.create(this.channel)).setCredentialsProvider(NoCredentialsProvider.create());
        }
        this.publisher = credentialsProvider.build();
        this.isRunning = true;
    }

    public void close() throws Exception {
        super.close();
        shutdownPublisher();
        shutdownTransportChannel();
        shutdownManagedChannel();
        this.isRunning = false;
    }

    private void shutdownPublisher() {
        try {
            if (this.publisher != null) {
                this.publisher.shutdown();
            }
        } catch (Exception e) {
            LOG.info("Shutting down Publisher failed.", e);
        }
    }

    private void shutdownTransportChannel() {
        if (this.channel == null) {
            return;
        }
        try {
            this.channel.close();
        } catch (Exception e) {
            LOG.info("Shutting down TransportChannel failed.", e);
        }
    }

    private void shutdownManagedChannel() {
        if (this.managedChannel == null) {
            return;
        }
        try {
            this.managedChannel.shutdownNow();
            this.managedChannel.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.info("Shutting down ManagedChannel failed.", e);
        }
    }

    public void invoke(IN in, SinkFunction.Context context) throws Exception {
        ApiFuture publish = this.publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom(this.serializationSchema.serialize(in))).build());
        this.numPendingFutures.incrementAndGet();
        ApiFutures.addCallback(publish, this.failureHandler, Executors.directExecutor());
    }

    public static SerializationSchemaBuilder newBuilder() {
        return new SerializationSchemaBuilder();
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.publisher.publishAllOutstanding();
        waitForFuturesToComplete();
        if (this.exceptionAtomicReference.get() != null) {
            throw this.exceptionAtomicReference.get();
        }
    }

    private void waitForFuturesToComplete() {
        synchronized (this.numPendingFutures) {
            while (this.isRunning && this.numPendingFutures.get() > 0) {
                try {
                    this.numPendingFutures.wait();
                } catch (InterruptedException e) {
                    LOG.info("Interrupted when waiting for futures to complete");
                }
            }
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }
}
