package co.cask.microservice.channel.sqs;

import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.microservice.api.Channel;
import co.cask.microservice.api.EventContext;
import co.cask.microservice.channel.ChannelEvent;
import co.cask.microservice.channel.ChannelInitializationException;
import co.cask.microservice.channel.InboundChannelManager;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.QueueNameExistsException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.util.Base64;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/microservice/channel/sqs/SQSInboundChannelManager.class */
/**
 * Inbound channel manager that reads events from an Amazon SQS queue.
 *
 * <p>{@link #initialize()} resolves the URL of the queue named by {@code getQueueName()},
 * creating the queue first when it does not exist. {@link #poll(String, int)} performs a single
 * receive call against that URL and exposes the returned messages as {@link ChannelEvent}s, and
 * {@link #acknowledge(String)} deletes a message by its receipt handle.
 */
public class SQSInboundChannelManager extends SQSChannelManager implements InboundChannelManager {
    private static final Logger LOG = LoggerFactory.getLogger(SQSInboundChannelManager.class);

    /** Upper bound applied to the batch size requested from SQS in a single receive call. */
    private static final int MAX_MESSAGES_PER_RECEIVE = 10;

    /** URL of the queue; set by {@link #initialize()} and used by poll/acknowledge. */
    private String queueUrl;

    public SQSInboundChannelManager(Channel channel) {
        super(channel);
    }

    /**
     * Runs the superclass initialization and then resolves the queue URL, creating the queue
     * when the lookup reports that it does not exist.
     *
     * @throws ChannelInitializationException propagated from the superclass initialization
     */
    @Override
    public void initialize() throws ChannelInitializationException {
        super.initialize();
        String queueName = getQueueName();
        try {
            this.queueUrl = getSQSClient().getQueueUrl(queueName).getQueueUrl();
            return;
        } catch (QueueDoesNotExistException e) {
            LOG.info("Amazon SQS queue '{}' doesn't exist. Creating it.", queueName);
        }

        try {
            getSQSClient().createQueue(queueName);
        } catch (QueueNameExistsException e) {
            // Best effort: the queue name is already taken (presumably created concurrently by
            // another instance - TODO confirm). The lookup below still resolves its URL.
            LOG.debug("Amazon SQS queue '{}' already exists; skipping creation.", queueName, e);
        }
        this.queueUrl = getSQSClient().getQueueUrl(queueName).getQueueUrl();
    }

    /**
     * Issues one receive request against the queue and returns an iterator over the messages
     * that came back.
     *
     * @param str not used by this implementation
     * @param i   requested number of messages; values above {@value #MAX_MESSAGES_PER_RECEIVE}
     *            are capped to that limit
     * @return iterator of events; each event carries the Base64-decoded message body and an
     *         {@link EventContext} built from this manager's channel and the message's receipt
     *         handle. Closing the iterator is a no-op.
     * @throws Exception if the receive call fails
     */
    @Override
    public CloseableIterator<ChannelEvent> poll(String str, int i) throws Exception {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
        receiveMessageRequest.setQueueUrl(this.queueUrl);
        receiveMessageRequest.setMaxNumberOfMessages(Integer.valueOf(Math.min(i, MAX_MESSAGES_PER_RECEIVE)));
        final Iterator<Message> messages =
            getSQSClient().receiveMessage(receiveMessageRequest).getMessages().iterator();

        return new CloseableIterator<ChannelEvent>() {
            @Override
            public void close() {
                // Nothing to release: the messages were already fetched into memory.
            }

            @Override
            public boolean hasNext() {
                return messages.hasNext();
            }

            @Override
            public ChannelEvent next() {
                Message message = messages.next();
                // The receipt handle travels in the context so the event can later be passed
                // to acknowledge(String).
                EventContext context =
                    new EventContext(SQSInboundChannelManager.this.getChannel(), message.getReceiptHandle());
                return new ChannelEvent(Base64.decode(message.getBody()), context);
            }

            @Override
            public void remove() {
                // Delegates to the underlying in-memory iterator; this does not call SQS.
                messages.remove();
            }
        };
    }

    /**
     * Deletes a message from the queue.
     *
     * @param str receipt handle of the message to delete
     */
    @Override
    public void acknowledge(String str) {
        getSQSClient().deleteMessage(this.queueUrl, str);
    }
}
