package net.quasardb.kinesis;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Tag;
import java.io.IOException;
import java.time.Duration;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import net.quasardb.kinesis.Relay;
import net.quasardb.qdb.Session;
import net.quasardb.qdb.SessionFactory;
import net.quasardb.qdb.ts.Timespec;
import net.quasardb.qdb.ts.WritableRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/quasardb/kinesis/Consumer.class */
public class Consumer extends Thread {
    private static final Logger logger;
    private SessionFactory sessionFactory;
    private RelayPool relayPool;
    private int batchSize;
    private Class<? extends Parser> parser;
    private List<String> shardIds;
    private String prefix;
    private String streamName;
    private KinesisClient client;
    private CompletableFuture<Void> subscriptionFuture;
    private Set<String> blacklist;
    private long rejectionAgeMs;
    private Counter recordsCounter;
    static final /* synthetic */ boolean $assertionsDisabled;
    private Session session = null;
    private Map<String, String> shardIteratorsById = new HashMap();
    private Map<String, AtomicLong> lastLagByShardId = new HashMap();
    private Map<String, DistributionSummary> batchSizeDistributionByShardId = new HashMap();
    private Map<String, DistributionSummary> kinesisResponseSizeDistributionByShardId = new HashMap();
    private Map<String, Counter> kinesisRequestsCounterByShardId = new HashMap();
    private Map<String, DistributionSummary> recordAgeDistributionByShardId = new HashMap();
    private Map<String, Counter> recordsCounterByShardId = new HashMap();
    private Map<String, Counter> emptyKinesisResponseCounterByShardId = new HashMap();
    private Map<String, Counter> rejectedRecordsCounterByShardId = new HashMap();
    private AbstractCollection<String> tableNames = this.tableNames;
    private AbstractCollection<String> tableNames = this.tableNames;
    private CompletableFuture<Boolean> isStopped = new CompletableFuture<>();
    private AtomicReference<Boolean> shouldStop = new AtomicReference<>(false);

    public Consumer(KinesisClient kinesisClient, SessionFactory sessionFactory, RelayPool relayPool, int i, long j, Set<String> set, Class<? extends Parser> cls, List<String> list, String str, Counter counter, String str2) throws Exception {
        this.client = kinesisClient;
        this.sessionFactory = sessionFactory;
        this.batchSize = i;
        this.rejectionAgeMs = j;
        this.blacklist = set;
        this.parser = cls;
        this.shardIds = list;
        this.prefix = str;
        this.recordsCounter = counter;
        this.streamName = str2;
        this.relayPool = relayPool;
        for (String str3 : list) {
            List asList = Arrays.asList(Tag.of("prefix", str), Tag.of("shardId", str3));
            this.recordsCounterByShardId.put(str3, Metrics.registry.counter("records", asList));
            this.emptyKinesisResponseCounterByShardId.put(str3, Metrics.registry.counter("empty_responses", asList));
            this.rejectedRecordsCounterByShardId.put(str3, Metrics.registry.counter("rejected_records", asList));
            this.kinesisRequestsCounterByShardId.put(str3, Metrics.registry.counter("requests", asList));
            AtomicLong atomicLong = new AtomicLong();
            this.lastLagByShardId.put(str3, atomicLong);
            Metrics.registry.more().timeGauge("lag", asList, atomicLong, TimeUnit.MILLISECONDS, atomicLong2 -> {
                return atomicLong2.doubleValue();
            });
            this.batchSizeDistributionByShardId.put(str3, Metrics.registry.summary("batch.size", asList));
            this.kinesisResponseSizeDistributionByShardId.put(str3, Metrics.registry.summary("response.size", asList));
            this.recordAgeDistributionByShardId.put(str3, DistributionSummary.builder("record.age").baseUnit("ms").minimumExpectedValue(1L).maximumExpectedValue(500654080L).publishPercentiles(new double[]{0.99999d, 0.9999d, 0.999d, 0.99d, 0.9d}).tags(asList).distributionStatisticExpiry(Duration.ofMinutes(15L)).register(Metrics.registry));
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        long j = 0;
        try {
            Parser parser = (Parser) PluginLoader.instantiate(this.parser);
            while (!this.shouldStop.get().booleanValue()) {
                if (this.session == null) {
                    logger.info("No session initialized yet, initializing new session");
                    this.session = this.sessionFactory.newSession();
                }
                try {
                    boolean z = true;
                    boolean z2 = false;
                    for (String str : this.shardIds) {
                        if (this.shouldStop.get().booleanValue()) {
                            break;
                        }
                        try {
                            if (doPoll(parser, str)) {
                                z = false;
                            }
                        } catch (ExpiredIteratorException e) {
                            logger.warn("Iterator expired for shard {}, doing nothing", str);
                        } catch (ProvisionedThroughputExceededException e2) {
                            logger.warn("rate limit exceeded, waiting 5s");
                            z2 = true;
                        }
                    }
                    if (z) {
                        logger.info("no data for any shards, waiting 5s");
                        Thread.sleep(5000L);
                    }
                    if (z2) {
                        logger.info("one of the shards was rate limited, waiting for 5s");
                        Thread.sleep(5000L);
                    }
                    j = 0;
                } catch (Exception e3) {
                    logger.error("Unexpected exception while supervising consumers: ", e3);
                    j++;
                    long j2 = j * 15000;
                    if (j >= 30) {
                        logger.error("too many errors!");
                        this.isStopped.completeExceptionally(e3);
                        System.exit(-1);
                    }
                    logger.info("error count = {}, sleeping for {} ms", Long.valueOf(j), Long.valueOf(j2));
                    Thread.sleep(j2);
                }
            }
            this.isStopped.complete(true);
        } catch (Exception e4) {
            logger.error("Unexpected exception", e4);
            logger.error("Consumer will stop");
            try {
                this.isStopped.completeExceptionally(e4);
            } catch (Exception e5) {
                logger.error("Error while completing isStopped!", e5);
            }
        }
        logger.info("Consumer is completed");
    }

    public void gracefulStop() {
        this.shouldStop.set(true);
    }

    public CompletableFuture<Boolean> isStopped() {
        return this.isStopped;
    }

    private synchronized StartingPosition getStartingPosition(String str) {
        return Checkpointer.startingPosition(this.session, this.prefix, str);
    }

    private synchronized void storeCheckpoint(String str, String str2) {
        Checkpointer.store(this.session, this.prefix, str, str2);
    }

    private String getShardIterator(String str) {
        StartingPosition startingPosition = getStartingPosition(str);
        GetShardIteratorRequest.Builder shardId = GetShardIteratorRequest.builder().streamName(this.streamName).shardIteratorType(startingPosition.type()).shardId(str);
        if (startingPosition.type() == ShardIteratorType.AFTER_SEQUENCE_NUMBER) {
            shardId = shardId.startingSequenceNumber(startingPosition.sequenceNumber());
        }
        String shardIterator = this.client.getShardIterator((GetShardIteratorRequest) shardId.build()).shardIterator();
        logger.debug("shard id {} has iterator iterator = {}", str, shardIterator);
        return shardIterator;
    }

    private void recordResultMetrics(String str, GetRecordsResponse getRecordsResponse) {
        this.kinesisRequestsCounterByShardId.get(str).increment();
        this.kinesisResponseSizeDistributionByShardId.get(str).record(getRecordsResponse.records().size());
        this.lastLagByShardId.get(str).set(getRecordsResponse.millisBehindLatest().longValue());
        this.recordsCounterByShardId.get(str).increment(getRecordsResponse.records().size());
        this.recordsCounter.increment(getRecordsResponse.records().size());
        if (getRecordsResponse.records().size() == 0) {
            this.emptyKinesisResponseCounterByShardId.get(str).increment();
        }
    }

    private boolean doPoll(Parser parser, String str) throws Exception {
        try {
            String shardIterator = this.shardIteratorsById.containsKey(str) ? this.shardIteratorsById.get(str) : getShardIterator(str);
            if (shardIterator != null) {
                return doPoll(parser, str, shardIterator);
            }
            logger.debug("shard {} has ended", str);
            return false;
        } catch (ExpiredIteratorException e) {
            logger.warn("Iterator expired for shard {}, removing entry", str);
            this.shardIteratorsById.remove(str);
            throw e;
        }
    }

    private boolean doPoll(Parser parser, String str, String str2) throws Exception {
        GetRecordsResponse records = this.client.getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(str2).limit(10000).build());
        logger.debug("has result, hasRecords = {}, records size = {}", Boolean.valueOf(records.hasRecords()), Integer.valueOf(records.records().size()));
        if (records.hasRecords() && records.records().size() > 0) {
            handleResult(parser, str, records);
        }
        logger.debug("putting next shard iterator for shard {}", str);
        this.shardIteratorsById.put(str, records.nextShardIterator());
        recordResultMetrics(str, records);
        return records.hasRecords();
    }

    private static long rowAgeMs(WritableRow writableRow) {
        return Timespec.now().toEpochMillis() - writableRow.getTimestamp().toEpochMillis();
    }

    private void measureRecordAge(String str, WritableRow writableRow) throws IOException {
        this.recordAgeDistributionByShardId.get(str).record(rowAgeMs(writableRow));
    }

    private void handleBatch(List<Relay.WorkItem> list) throws Exception {
        if (this.shouldStop.get().booleanValue()) {
            return;
        }
        Relay acquire = this.relayPool.acquire();
        try {
            acquire.enqueue(list);
            acquire.flush();
        } finally {
            this.relayPool.release(acquire);
        }
    }

    private boolean isRowRejected(String str, WritableRow writableRow) {
        if (this.blacklist.contains(str)) {
            return true;
        }
        return this.rejectionAgeMs != 0 && rowAgeMs(writableRow) > this.rejectionAgeMs;
    }

    private void handleResult(Parser parser, String str, GetRecordsResponse getRecordsResponse) throws Exception {
        logger.debug("handling result for shard {} with {} records", str, Integer.valueOf(getRecordsResponse.records().size()));
        ArrayList arrayList = new ArrayList(getRecordsResponse.records().size() * 2000);
        String str2 = null;
        for (Record record : getRecordsResponse.records()) {
            str2 = record.sequenceNumber();
            String recordToSensorId = parser.recordToSensorId(record);
            String sensorIdToTableName = parser.sensorIdToTableName(recordToSensorId);
            Iterator<WritableRow> it = parser.kinesisToQdb(record).iterator();
            while (it.hasNext()) {
                WritableRow next = it.next();
                measureRecordAge(str, next);
                rowAgeMs(next);
                if (isRowRejected(recordToSensorId, next)) {
                    this.rejectedRecordsCounterByShardId.get(str).increment();
                } else {
                    arrayList.add(new Relay.WorkItem(sensorIdToTableName, next));
                }
            }
        }
        if (!$assertionsDisabled && str2 == null) {
            throw new AssertionError();
        }
        logger.debug("have total of {} work items", Integer.valueOf(arrayList.size()), str);
        for (List<Relay.WorkItem> list : Util.partitionList(arrayList, this.batchSize)) {
            logger.debug("about to handle batch with size {} for shard {}", Integer.valueOf(list.size()), str);
            this.batchSizeDistributionByShardId.get(str).record(list.size());
            handleBatch(list);
        }
        storeCheckpoint(str, str2);
    }

    static {
        $assertionsDisabled = !Consumer.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(Consumer.class);
    }
}
