package org.apache.samza.system.kinesis.consumer;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.Record;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointListener;
import org.apache.samza.config.JobConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.kinesis.KinesisConfig;
import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.class */
public class KinesisSystemConsumer extends BlockingEnvelopeMap implements CheckpointListener, KinesisRecordProcessorListener {
    private static final int MAX_BLOCKING_QUEUE_SIZE = 100;
    private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemConsumer.class.getName());
    private final String system;
    private final KinesisConfig kConfig;
    private final KinesisSystemConsumerMetrics metrics;
    private final SSPAllocator sspAllocator;
    private final Set<String> streams;
    private final Map<SystemStreamPartition, KinesisRecordProcessor> processors;
    private final List<Worker> workers;
    private ExecutorService executorService;
    private volatile Exception callbackException;

    public KinesisSystemConsumer(String str, KinesisConfig kinesisConfig, MetricsRegistry metricsRegistry) {
        super(metricsRegistry, System::currentTimeMillis, (String) null);
        this.streams = new HashSet();
        this.processors = new ConcurrentHashMap();
        this.workers = new LinkedList();
        this.system = str;
        this.kConfig = kinesisConfig;
        this.metrics = new KinesisSystemConsumerMetrics(metricsRegistry);
        this.sspAllocator = new SSPAllocator();
    }

    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue(MAX_BLOCKING_QUEUE_SIZE);
    }

    protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope incomingMessageEnvelope) {
        try {
            super.put(systemStreamPartition, incomingMessageEnvelope);
        } catch (Exception e) {
            LOG.error("Exception while putting record. Shutting down SystemStream {}", systemStreamPartition.getSystemStream(), e);
            Thread.currentThread().interrupt();
        }
    }

    public void register(SystemStreamPartition systemStreamPartition, String str) {
        LOG.info("Register called with ssp {} and offset {}. Offset will be ignored.", systemStreamPartition, str);
        this.streams.add(systemStreamPartition.getStream());
        this.sspAllocator.free(systemStreamPartition);
        super.register(systemStreamPartition, str);
    }

    public void start() {
        LOG.info("Start samza consumer for system {}.", this.system);
        this.metrics.initializeMetrics(this.streams);
        this.executorService = Executors.newFixedThreadPool(this.streams.size(), new ThreadFactoryBuilder().setNameFormat("kinesis-worker-thread-" + this.system + "-%d").build());
        for (String str : this.streams) {
            Runnable build = new Worker.Builder().recordProcessorFactory(createRecordProcessorFactory(str)).config(this.kConfig.getKinesisClientLibConfig(this.system, str, this.kConfig.get(JobConfig.JOB_NAME()) + "-" + this.kConfig.get(JobConfig.JOB_ID()) + "-" + str)).build();
            this.workers.add(build);
            this.executorService.execute(build);
            LOG.info("Started worker for system {} stream {}.", this.system, str);
        }
    }

    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long j) throws InterruptedException {
        if (this.callbackException != null) {
            throw new SamzaException(this.callbackException);
        }
        return super.poll(set, j);
    }

    public void stop() {
        LOG.info("Stop samza consumer for system {}.", this.system);
        this.workers.forEach((v0) -> {
            v0.shutdown();
        });
        this.workers.clear();
        this.executorService.shutdownNow();
        LOG.info("Kinesis system consumer executor service for system {} is shutdown.", this.system);
    }

    IRecordProcessorFactory createRecordProcessorFactory(String str) {
        return () -> {
            try {
                SystemStreamPartition allocate = this.sspAllocator.allocate(str);
                KinesisRecordProcessor kinesisRecordProcessor = new KinesisRecordProcessor(allocate, this);
                KinesisRecordProcessor put = this.processors.put(allocate, kinesisRecordProcessor);
                Validate.isTrue(put == null, String.format("Adding new kinesis record processor %s while the previous processor %s for the same ssp %s is still active.", kinesisRecordProcessor, put, allocate));
                return kinesisRecordProcessor;
            } catch (Exception e) {
                this.callbackException = e;
                throw new SamzaException(e);
            }
        };
    }

    public void onCheckpoint(Map<SystemStreamPartition, String> map) {
        LOG.info("onCheckpoint called with sspOffsets {}", map);
        map.forEach((systemStreamPartition, str) -> {
            KinesisRecordProcessor kinesisRecordProcessor = this.processors.get(systemStreamPartition);
            KinesisSystemConsumerOffset parse = KinesisSystemConsumerOffset.parse(str);
            if (kinesisRecordProcessor == null) {
                LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the checkpoint {}.", systemStreamPartition, str);
            } else if (parse.getShardId().equals(kinesisRecordProcessor.getShardId())) {
                kinesisRecordProcessor.checkpoint(parse.getSeqNumber());
            } else {
                LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could be the result of rebalance. Hence dropping the checkpoint {}.", new Object[]{systemStreamPartition, kinesisRecordProcessor.getShardId(), parse.getShardId(), str});
            }
        });
    }

    @Override // org.apache.samza.system.kinesis.consumer.KinesisRecordProcessorListener
    public void onReceiveRecords(SystemStreamPartition systemStreamPartition, List<Record> list, long j) {
        this.metrics.updateMillisBehindLatest(systemStreamPartition.getStream(), Long.valueOf(j));
        list.forEach(record -> {
            put(systemStreamPartition, translate(systemStreamPartition, record));
        });
    }

    @Override // org.apache.samza.system.kinesis.consumer.KinesisRecordProcessorListener
    public void onShutdown(SystemStreamPartition systemStreamPartition) {
        this.processors.remove(systemStreamPartition);
        this.sspAllocator.free(systemStreamPartition);
    }

    private IncomingMessageEnvelope translate(SystemStreamPartition systemStreamPartition, Record record) {
        String shardId = this.processors.get(systemStreamPartition).getShardId();
        byte[] bArr = new byte[record.getData().remaining()];
        this.metrics.updateMetrics(systemStreamPartition.getStream(), record);
        record.getData().get(bArr);
        return new KinesisIncomingMessageEnvelope(systemStreamPartition, new KinesisSystemConsumerOffset(shardId, record.getSequenceNumber()).toString(), record.getPartitionKey(), bArr, shardId, record.getSequenceNumber(), record.getApproximateArrivalTimestamp());
    }
}
