package com.google.cloud.spring.pubsub.integration.inbound;

import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberOperations;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.PubSubHeaderMapper;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import com.google.cloud.spring.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.util.Assert;

/* loaded from: input_file:com/google/cloud/spring/pubsub/integration/inbound/PubSubMessageSource.class */
public class PubSubMessageSource extends AbstractFetchLimitingMessageSource<Object> {
    private final String subscriptionName;
    private final PubSubSubscriberOperations pubSubSubscriberOperations;
    private boolean blockOnPull;
    private AckMode ackMode = AckMode.AUTO;
    private HeaderMapper<Map<String, String>> headerMapper = new PubSubHeaderMapper();
    private Class<?> payloadType = byte[].class;
    private ArrayDeque<ConvertedAcknowledgeablePubsubMessage> cachedMessages = new ArrayDeque<>();

    public PubSubMessageSource(PubSubSubscriberOperations pubSubSubscriberOperations, String str) {
        Assert.notNull(pubSubSubscriberOperations, "Pub/Sub subscriber template can't be null.");
        Assert.notNull(str, "Pub/Sub subscription name can't be null.");
        this.pubSubSubscriberOperations = pubSubSubscriberOperations;
        this.subscriptionName = str;
    }

    public void setAckMode(AckMode ackMode) {
        Assert.notNull(ackMode, "The acknowledgement mode can't be null.");
        this.ackMode = ackMode;
    }

    public void setPayloadType(Class<?> cls) {
        Assert.notNull(cls, "The payload type cannot be null.");
        this.payloadType = cls;
    }

    public void setHeaderMapper(HeaderMapper<Map<String, String>> headerMapper) {
        Assert.notNull(headerMapper, "The header mapper can't be null.");
        this.headerMapper = headerMapper;
    }

    public void setBlockOnPull(boolean z) {
        this.blockOnPull = z;
    }

    protected Object doReceive(int i) {
        if (this.cachedMessages.isEmpty()) {
            List pullAndConvert = this.pubSubSubscriberOperations.pullAndConvert(this.subscriptionName, Integer.valueOf(i > 0 ? i : 1), Boolean.valueOf(!this.blockOnPull), this.payloadType);
            if (pullAndConvert.isEmpty()) {
                return null;
            }
            if (pullAndConvert.size() == 1) {
                return processMessage((ConvertedAcknowledgeablePubsubMessage) pullAndConvert.get(0));
            }
            this.cachedMessages.addAll(pullAndConvert);
        }
        return processMessage(this.cachedMessages.pollFirst());
    }

    public String getComponentType() {
        return "gcp-pubsub:message-source";
    }

    private AbstractIntegrationMessageBuilder<?> processMessage(ConvertedAcknowledgeablePubsubMessage<?> convertedAcknowledgeablePubsubMessage) {
        if (convertedAcknowledgeablePubsubMessage == null) {
            return null;
        }
        Map headers = this.headerMapper.toHeaders(convertedAcknowledgeablePubsubMessage.getPubsubMessage().getAttributesMap());
        headers.put(GcpPubSubHeaders.ORIGINAL_MESSAGE, convertedAcknowledgeablePubsubMessage);
        headers.put("acknowledgmentCallback", new PubSubAcknowledgmentCallback(convertedAcknowledgeablePubsubMessage, this.ackMode));
        return getMessageBuilderFactory().withPayload(convertedAcknowledgeablePubsubMessage.getPayload()).copyHeaders(headers);
    }
}
