package org.apache.streams.amazon.kinesis;

import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.class */
public class KinesisPersistReaderTask implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReaderTask.class);
    private KinesisPersistReader reader;
    private String streamName;
    private String shardId;
    private String shardIteratorId;
    private Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();

    public KinesisPersistReaderTask(KinesisPersistReader kinesisPersistReader, String str, String str2) {
        this.reader = kinesisPersistReader;
        this.streamName = str;
        this.shardId = str2;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.shardIteratorId = this.reader.client.getShardIterator(new GetShardIteratorRequest().withStreamName(this.streamName).withShardId(this.shardId).withShardIteratorType("TRIM_HORIZON")).getShardIterator();
        HashMap hashMap = new HashMap();
        hashMap.put("streamName", this.streamName);
        hashMap.put("shardId", this.shardId);
        while (true) {
            GetRecordsResult records = this.reader.client.getRecords(new GetRecordsRequest().withShardIterator(this.shardIteratorId));
            LOGGER.info("{} records {} millis behind {}:{}:{} ", new Object[]{Integer.valueOf(records.getRecords().size()), records.getMillisBehindLatest(), this.streamName, this.shardId, this.shardIteratorId});
            this.shardIteratorId = records.getNextShardIterator();
            for (Record record : records.getRecords()) {
                try {
                    this.reader.persistQueue.add(new StreamsDatum(new String(record.getData().array(), Charset.forName("UTF-8")), record.getPartitionKey(), new DateTime(), new BigInteger(record.getSequenceNumber()), hashMap));
                } catch (Exception e) {
                    LOGGER.warn("Exception processing record {}: {}", record, e);
                }
            }
            try {
                Thread.sleep(this.reader.pollInterval.longValue());
            } catch (InterruptedException e2) {
                LOGGER.trace("InterruptedException", e2);
            }
        }
    }
}
