package org.apache.ignite.stream.pubsub;

import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.stream.StreamAdapter;

/* loaded from: input_file:org/apache/ignite/stream/pubsub/PubSubStreamer.class */
public class PubSubStreamer<K, V> extends StreamAdapter<PubsubMessage, K, V> {
    private static final int DFLT_MAX_MESSAGES = 10;
    private IgniteLogger log;
    private ExecutorService executor;
    private List<ProjectTopicName> topics;
    private int threads;
    private String subscriptionName;
    private SubscriberStubSettings subscriberStubSettings;
    private boolean returnImmediately = false;
    private int maxMessages = DFLT_MAX_MESSAGES;
    private final List<PubSubStreamer<K, V>.ConsumerTask> consumerTasks = new ArrayList();

    /* loaded from: input_file:org/apache/ignite/stream/pubsub/PubSubStreamer$ConsumerTask.class */
    class ConsumerTask implements Callable<Void> {
        private final SubscriberStub subscriberStub;
        private final String subscriptionName;
        private final boolean returnImmediately;
        private final int maxMessages;
        private volatile boolean stopped;

        public ConsumerTask(SubscriberStubSettings subscriberStubSettings, String str, boolean z, int i) throws IgniteException {
            try {
                this.subscriberStub = GrpcSubscriberStub.create(subscriberStubSettings);
                this.subscriptionName = str;
                this.returnImmediately = z;
                this.maxMessages = i;
            } catch (IOException e) {
                throw new IgniteException(e);
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            while (!this.stopped) {
                try {
                    PullResponse pullResponse = (PullResponse) this.subscriberStub.pullCallable().call(PullRequest.newBuilder().setMaxMessages(this.maxMessages).setReturnImmediately(this.returnImmediately).setSubscription(this.subscriptionName).build());
                    ArrayList arrayList = new ArrayList();
                    for (ReceivedMessage receivedMessage : pullResponse.getReceivedMessagesList()) {
                        PubSubStreamer.this.addMessage(receivedMessage.getMessage());
                        arrayList.add(receivedMessage.getAckId());
                    }
                    this.subscriberStub.acknowledgeCallable().call(AcknowledgeRequest.newBuilder().setSubscription(this.subscriptionName).addAllAckIds(arrayList).build());
                } finally {
                    this.subscriberStub.close();
                }
            }
            return null;
        }

        public void stop() {
            this.stopped = true;
            if (this.subscriberStub != null) {
                this.subscriberStub.shutdown();
            }
        }
    }

    public void setTopic(List<ProjectTopicName> list) {
        this.topics = list;
    }

    public void setThreads(int i) {
        this.threads = i;
    }

    public void setSubscriptionName(String str) {
        this.subscriptionName = str;
    }

    public void setSubscriberStubSettings(SubscriberStubSettings subscriberStubSettings) {
        this.subscriberStubSettings = subscriberStubSettings;
    }

    public void setReturnImmediately(boolean z) {
        this.returnImmediately = z;
    }

    public void setMaxMessages(int i) {
        this.maxMessages = i;
    }

    public void start() {
        A.notNull(getStreamer(), "streamer");
        A.notNull(getIgnite(), "ignite");
        A.ensure((getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null) ? false : true, "tuple extractor missing");
        A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide both single and multiple tuple extractor");
        A.notNull(this.topics, "topics");
        A.notNull(this.subscriptionName, "Pub/Sub consumer config");
        A.ensure(this.threads > 0, "threads > 0");
        this.log = getIgnite().log();
        this.executor = Executors.newFixedThreadPool(this.threads);
        IntStream.range(0, this.threads).forEach(i -> {
            this.consumerTasks.add(new ConsumerTask(this.subscriberStubSettings, this.subscriptionName, this.returnImmediately, this.maxMessages));
        });
        Iterator<PubSubStreamer<K, V>.ConsumerTask> it = this.consumerTasks.iterator();
        while (it.hasNext()) {
            this.executor.submit(it.next());
        }
    }

    public void stop() {
        Iterator<PubSubStreamer<K, V>.ConsumerTask> it = this.consumerTasks.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS) && this.log.isDebugEnabled()) {
                    this.log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
                }
            } catch (InterruptedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Interrupted during shutdown, exiting uncleanly.");
                }
            }
        }
    }
}
