package org.apache.camel.component.aws2.kinesis;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/* loaded from: input_file:org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.class */
public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
    private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
    private String currentShardIterator;
    private boolean isShardClosed;
    private ResumeStrategy resumeStrategy;

    public Kinesis2Consumer(Kinesis2Endpoint kinesis2Endpoint, Processor processor) {
        super(kinesis2Endpoint, processor);
    }

    protected int poll() throws Exception {
        String shardIterator = getShardIterator();
        if (shardIterator == null) {
            return 0;
        }
        GetRecordsResponse records = getClient().getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(shardIterator).limit(Integer.valueOf(m8getEndpoint().getConfiguration().getMaxResultsPerRequest())).build());
        int processBatch = processBatch(CastUtils.cast(createExchanges(records.records())));
        this.currentShardIterator = records.nextShardIterator();
        if (this.isShardClosed) {
            switch (m8getEndpoint().getConfiguration().getShardClosed()) {
                case ignore:
                    LOG.warn("The shard {} is in closed state", this.currentShardIterator);
                    break;
                case silent:
                    break;
                case fail:
                    LOG.info("Shard Iterator reaches CLOSE status:{} {}", m8getEndpoint().getConfiguration().getStreamName(), m8getEndpoint().getConfiguration().getShardId());
                    throw new ReachedClosedStatusException(m8getEndpoint().getConfiguration().getStreamName(), m8getEndpoint().getConfiguration().getShardId());
                default:
                    throw new IllegalArgumentException("Unsupported shard closed strategy");
            }
        }
        return processBatch;
    }

    public int processBatch(Queue<Object> queue) throws Exception {
        int i = 0;
        while (!queue.isEmpty()) {
            Exchange exchange = (Exchange) ObjectHelper.cast(Exchange.class, queue.poll());
            getAsyncProcessor().process(exchange, defaultConsumerCallback(exchange, true));
            i++;
        }
        return i;
    }

    private KinesisClient getClient() {
        return m8getEndpoint().getClient();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public Kinesis2Endpoint m8getEndpoint() {
        return super.getEndpoint();
    }

    private String getShardIterator() {
        String shardId;
        if (this.currentShardIterator == null) {
            if (m8getEndpoint().getConfiguration().getShardId().isEmpty()) {
                List shards = getClient().describeStream((DescribeStreamRequest) DescribeStreamRequest.builder().streamName(m8getEndpoint().getConfiguration().getStreamName()).build()).streamDescription().shards();
                if (shards.isEmpty()) {
                    LOG.warn("There are no shards in the stream");
                    return null;
                }
                shardId = ((Shard) shards.get(0)).shardId();
                this.isShardClosed = ((Shard) shards.get(0)).sequenceNumberRange().endingSequenceNumber() != null;
            } else {
                shardId = m8getEndpoint().getConfiguration().getShardId();
                for (Shard shard : getClient().describeStream((DescribeStreamRequest) DescribeStreamRequest.builder().streamName(m8getEndpoint().getConfiguration().getStreamName()).build()).streamDescription().shards()) {
                    if (shard.shardId().equalsIgnoreCase(m8getEndpoint().getConfiguration().getShardId())) {
                        this.isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
                    }
                }
            }
            LOG.debug("ShardId is: {}", shardId);
            GetShardIteratorRequest.Builder shardIteratorType = GetShardIteratorRequest.builder().streamName(m8getEndpoint().getConfiguration().getStreamName()).shardId(shardId).shardIteratorType(m8getEndpoint().getConfiguration().getIteratorType());
            if (hasSequenceNumber()) {
                shardIteratorType.startingSequenceNumber(m8getEndpoint().getConfiguration().getSequenceNumber());
            }
            resume(shardIteratorType);
            this.currentShardIterator = getClient().getShardIterator((GetShardIteratorRequest) shardIteratorType.build()).shardIterator();
        }
        LOG.debug("Shard Iterator is: {}", this.currentShardIterator);
        return this.currentShardIterator;
    }

    private void resume(GetShardIteratorRequest.Builder builder) {
        if (this.resumeStrategy == null) {
            return;
        }
        KinesisResumeAdapter kinesisResumeAdapter = (KinesisResumeAdapter) this.resumeStrategy.getAdapter(KinesisResumeAdapter.class);
        if (kinesisResumeAdapter == null) {
            LOG.warn("There is a resume strategy setup, but no adapter configured or the type is incorrect");
            return;
        }
        kinesisResumeAdapter.setRequestBuilder(builder);
        kinesisResumeAdapter.setStreamName(m8getEndpoint().getConfiguration().getStreamName());
        kinesisResumeAdapter.resume();
    }

    private Queue<Exchange> createExchanges(List<Record> list) {
        ArrayDeque arrayDeque = new ArrayDeque();
        Iterator<Record> it = list.iterator();
        while (it.hasNext()) {
            arrayDeque.add(createExchange(it.next()));
        }
        return arrayDeque;
    }

    protected Exchange createExchange(Record record) {
        Exchange createExchange = createExchange(true);
        createExchange.getIn().setBody(record.data().asInputStream());
        createExchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, record.approximateArrivalTimestamp());
        createExchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, record.partitionKey());
        createExchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, record.sequenceNumber());
        if (record.approximateArrivalTimestamp() != null) {
            createExchange.getIn().setHeader(Kinesis2Constants.MESSAGE_TIMESTAMP, Long.valueOf(record.approximateArrivalTimestamp().getEpochSecond() * 1000));
        }
        return createExchange;
    }

    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
        this.resumeStrategy = resumeStrategy;
    }

    public ResumeStrategy getResumeStrategy() {
        return this.resumeStrategy;
    }

    private boolean hasSequenceNumber() {
        return !m8getEndpoint().getConfiguration().getSequenceNumber().isEmpty() && (m8getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || m8getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
    }

    protected void doStart() throws Exception {
        super.doStart();
        if (this.resumeStrategy != null) {
            this.resumeStrategy.loadCache();
        }
    }
}
