package com.datatorrent.contrib.kinesis;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.datatorrent.contrib.memcache.MemcachePOJOOperatorTest;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kinesis/KinesisTestConsumer.class */
/**
 * Test helper that polls records from the first shard of a Kinesis stream.
 *
 * <p>Intended to be run on its own thread (it implements {@link Runnable}): {@link #run()}
 * obtains a {@code TRIM_HORIZON} shard iterator and then fetches records roughly once per
 * second until {@link #close()} or {@link #setIsAlive(boolean) setIsAlive(false)} is called.
 * Every fetched record is added to {@link #holdingBuffer}, optionally handed to
 * {@link #processRecord(Record)}, and counted down on the optional done-latch.
 */
public class KinesisTestConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KinesisTestConsumer.class);

    /** Name of the Kinesis stream to read from. */
    protected String streamName;

    /** Capacity of {@link #holdingBuffer}. */
    protected static final int BUFFER_SIZE_DEFAULT = 1024;

    /** Optional latch counted down once per received record; may be {@code null}. */
    private CountDownLatch doneLatch;

    /** Maximum number of describe-stream attempts made while waiting for shards to appear. */
    protected static final int MAX_TRY_TIMES = 30;

    /** Kinesis client, created in the constructor. */
    protected transient AmazonKinesisClient client = null;

    private final int bufferSize = BUFFER_SIZE_DEFAULT;

    /** Bounded buffer holding every record received so far (cleared by {@link #close()}). */
    public transient ArrayBlockingQueue<Record> holdingBuffer = new ArrayBlockingQueue<>(bufferSize);

    /** Loop flag for {@link #run()}; volatile so a stop request from another thread is seen. */
    private volatile boolean isAlive = true;

    /** Total number of records received. */
    private int receiveCount = 0;

    /** When {@code true}, each received record is passed to {@link #processRecord(Record)}. */
    protected boolean shouldProcessRecord = true;

    /** Creates the Kinesis client using the default AWS credentials provider chain. */
    private void createClient() {
        this.client = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
    }

    /**
     * @return the number of records received so far
     */
    public int getReceiveCount() {
        return this.receiveCount;
    }

    /**
     * Overwrites the received-record counter.
     *
     * @param receiveCount the new counter value
     */
    public void setReceiveCount(int receiveCount) {
        this.receiveCount = receiveCount;
    }

    /**
     * Sets the flag that keeps the polling loop in {@link #run()} going.
     *
     * @param isAlive {@code false} to make the loop exit after its current iteration
     */
    public void setIsAlive(boolean isAlive) {
        this.isAlive = isAlive;
    }

    /**
     * Creates a consumer for the given stream and eagerly builds the Kinesis client.
     *
     * @param streamName name of the Kinesis stream to consume
     */
    public KinesisTestConsumer(String streamName) {
        createClient();
        this.streamName = streamName;
    }

    /**
     * Decodes the payload of a record into a string.
     *
     * <p>The payload is read through a read-only view so the record's own buffer position is
     * left untouched and the method can be called repeatedly on the same record.
     *
     * @param record the record whose data should be decoded
     * @return the record payload as a string
     */
    public String getData(Record record) {
        ByteBuffer payload = record.getData().asReadOnlyBuffer();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        // NOTE(review): decodes with the platform default charset; confirm against the
        // producer's encoding before switching to an explicit charset.
        return new String(bytes);
    }

    /**
     * Polls the stream until the consumer is stopped or the shard iterator is exhausted.
     *
     * @throws RuntimeException if the thread is interrupted while pausing between polls
     *         (the interrupt flag is restored before throwing)
     */
    @Override // java.lang.Runnable
    public void run() {
        String shardIterator = prepareIterator();
        while (this.isAlive) {
            if (shardIterator == null) {
                // Kinesis returns a null next-iterator once the shard is closed; calling
                // getRecords with it would only fail, so stop polling instead.
                logger.warn("No further shard iterator for stream {}; stopping consumer", this.streamName);
                break;
            }
            shardIterator = processNextIterator(shardIterator);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        logger.debug("DONE consuming");
    }

    /**
     * Looks up the stream's shards, retrying up to {@link #MAX_TRY_TIMES} times with a one
     * second pause while none are reported, and requests a {@code TRIM_HORIZON} iterator for
     * the first shard.
     *
     * @return the shard iterator for the first shard of the stream
     * @throws RuntimeException if describing the stream fails
     * @throws IllegalStateException if no shard is available after the retries, or the wait
     *         was interrupted before one appeared
     */
    public String prepareIterator() {
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(this.streamName);

        List<Shard> shards = null;
        for (int attempt = 0; attempt < MAX_TRY_TIMES; attempt++) {
            try {
                shards = this.client.describeStream(describeStreamRequest).getStreamDescription().getShards();
            } catch (Exception e) {
                logger.error("get Stream description exception: ", e);
                throw new RuntimeException(e);
            }
            if (shards != null && !shards.isEmpty()) {
                break;
            }
            logger.warn("shards is empty");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop retrying.
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (shards == null || shards.isEmpty()) {
            throw new IllegalStateException("No shards available for stream " + this.streamName);
        }

        Shard firstShard = shards.get(0);
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(this.streamName);
        getShardIteratorRequest.setShardId(firstShard.getShardId());
        getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
        return this.client.getShardIterator(getShardIteratorRequest).getShardIterator();
    }

    /**
     * Fetches one batch of records for the given iterator and hands them to
     * {@link #processResponseRecords(List)}.
     *
     * @param shardIterator the iterator to read from
     * @return the iterator to use for the next fetch, as reported by the service
     */
    public String processNextIterator(String shardIterator) {
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        // NOTE(review): the batch limit is borrowed from an unrelated test class; consider a
        // local constant once the intended value is confirmed.
        getRecordsRequest.setLimit(MemcachePOJOOperatorTest.TUPLE_SIZE);
        getRecordsRequest.setShardIterator(shardIterator);
        GetRecordsResult result = this.client.getRecords(getRecordsRequest);
        String nextShardIterator = result.getNextShardIterator();
        processResponseRecords(result.getRecords());
        return nextShardIterator;
    }

    /**
     * Accounts for a batch of records: bumps the receive counter, buffers each record,
     * optionally processes it, and counts down the done-latch once per record.
     *
     * @param list the fetched records; {@code null} or empty batches are ignored
     * @throws IllegalStateException if {@link #holdingBuffer} is full
     */
    protected void processResponseRecords(List<Record> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        this.receiveCount += list.size();
        logger.debug("ReceiveCount= {}", this.receiveCount);
        for (Record record : list) {
            this.holdingBuffer.add(record);
            if (this.shouldProcessRecord) {
                processRecord(record);
            }
            if (this.doneLatch != null) {
                this.doneLatch.countDown();
            }
        }
    }

    /**
     * Hook invoked for each received record when {@link #shouldProcessRecord} is set.
     * The default implementation does nothing.
     *
     * @param record the received record
     */
    protected void processRecord(Record record) {
    }

    /** Requests the polling loop to stop and discards all buffered records. */
    public void close() {
        this.isAlive = false;
        this.holdingBuffer.clear();
    }

    /**
     * Registers a latch that is counted down once for every received record.
     *
     * @param countDownLatch the latch to count down, or {@code null} for none
     */
    public void setDoneLatch(CountDownLatch countDownLatch) {
        this.doneLatch = countDownLatch;
    }
}
