package org.apache.beam.sdk.io.kinesis;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.UnmodifiableIterator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/ShardReadersPool.class */
public class ShardReadersPool {
    private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
    private static final int DEFAULT_CAPACITY_PER_SHARD = 10000;
    private final ExecutorService executorService;
    private BlockingQueue<KinesisRecord> recordsQueue;
    private final AtomicReference<ImmutableMap<String, ShardRecordsIterator>> shardIteratorsMap;
    private final ConcurrentMap<String, AtomicInteger> numberOfRecordsInAQueueByShard;
    private final SimplifiedKinesisClient kinesis;
    private final KinesisReaderCheckpoint initialCheckpoint;
    private final int queueCapacityPerShard;
    private final AtomicBoolean poolOpened;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShardReadersPool(SimplifiedKinesisClient simplifiedKinesisClient, KinesisReaderCheckpoint kinesisReaderCheckpoint) {
        this(simplifiedKinesisClient, kinesisReaderCheckpoint, DEFAULT_CAPACITY_PER_SHARD);
    }

    ShardReadersPool(SimplifiedKinesisClient simplifiedKinesisClient, KinesisReaderCheckpoint kinesisReaderCheckpoint, int i) {
        this.poolOpened = new AtomicBoolean(true);
        this.kinesis = simplifiedKinesisClient;
        this.initialCheckpoint = kinesisReaderCheckpoint;
        this.queueCapacityPerShard = i;
        this.executorService = Executors.newCachedThreadPool();
        this.numberOfRecordsInAQueueByShard = new ConcurrentHashMap();
        this.shardIteratorsMap = new AtomicReference<>();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() throws TransientKinesisException {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator<ShardCheckpoint> it = this.initialCheckpoint.iterator();
        while (it.hasNext()) {
            ShardCheckpoint next = it.next();
            builder.put(next.getShardId(), createShardIterator(this.kinesis, next));
        }
        this.shardIteratorsMap.set(builder.build());
        if (this.shardIteratorsMap.get().isEmpty()) {
            this.recordsQueue = new ArrayBlockingQueue(1);
        } else {
            this.recordsQueue = new ArrayBlockingQueue(this.queueCapacityPerShard * this.shardIteratorsMap.get().size());
            startReadingShards(this.shardIteratorsMap.get().values());
        }
    }

    void startReadingShards(Iterable<ShardRecordsIterator> iterable) {
        for (ShardRecordsIterator shardRecordsIterator : iterable) {
            this.numberOfRecordsInAQueueByShard.put(shardRecordsIterator.getShardId(), new AtomicInteger());
            this.executorService.submit(() -> {
                readLoop(shardRecordsIterator);
            });
        }
    }

    private void readLoop(ShardRecordsIterator shardRecordsIterator) {
        while (this.poolOpened.get()) {
            try {
                try {
                    for (KinesisRecord kinesisRecord : shardRecordsIterator.readNextBatch()) {
                        this.recordsQueue.put(kinesisRecord);
                        this.numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
                    }
                } catch (KinesisShardClosedException e) {
                    LOG.info("Shard iterator for {} shard is closed, finishing the read loop", shardRecordsIterator.getShardId(), e);
                    waitUntilAllShardRecordsRead(shardRecordsIterator);
                    readFromSuccessiveShards(shardRecordsIterator);
                    break;
                }
            } catch (InterruptedException e2) {
                LOG.warn("Thread was interrupted, finishing the read loop", e2);
            } catch (TransientKinesisException e3) {
                LOG.warn("Transient exception occurred.", e3);
            } catch (Throwable th) {
                LOG.error("Unexpected exception occurred", th);
            }
        }
        LOG.info("Kinesis Shard read loop has finished");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CustomOptional<KinesisRecord> nextRecord() {
        try {
            KinesisRecord poll = this.recordsQueue.poll(1L, TimeUnit.SECONDS);
            if (poll == null) {
                return CustomOptional.absent();
            }
            ((ShardRecordsIterator) this.shardIteratorsMap.get().get(poll.getShardId())).ackRecord(poll);
            this.numberOfRecordsInAQueueByShard.get(poll.getShardId()).decrementAndGet();
            return CustomOptional.of(poll);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for KinesisRecord from the buffer");
            return CustomOptional.absent();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        LOG.info("Closing shard iterators pool");
        this.poolOpened.set(false);
        this.executorService.shutdownNow();
        boolean z = false;
        int i = 3;
        while (!z) {
            int i2 = i;
            i--;
            if (i2 <= 0) {
                return;
            }
            try {
                z = this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
                if (!z && i > 0) {
                    LOG.warn("Executor service is taking long time to shutdown, will retry. {} attempts left", Integer.valueOf(i));
                }
            } catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for the executor service to shutdown");
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean allShardsUpToDate() {
        boolean z = true;
        UnmodifiableIterator it = this.shardIteratorsMap.get().values().iterator();
        while (it.hasNext()) {
            z &= ((ShardRecordsIterator) it.next()).isUpToDate();
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisReaderCheckpoint getCheckpointMark() {
        return new KinesisReaderCheckpoint((Iterable) this.shardIteratorsMap.get().values().stream().map(shardRecordsIterator -> {
            Preconditions.checkArgument(shardRecordsIterator != null, "shardRecordsIterator can not be null");
            return shardRecordsIterator.getCheckpoint();
        }).collect(Collectors.toList()));
    }

    ShardRecordsIterator createShardIterator(SimplifiedKinesisClient simplifiedKinesisClient, ShardCheckpoint shardCheckpoint) throws TransientKinesisException {
        return new ShardRecordsIterator(shardCheckpoint, simplifiedKinesisClient);
    }

    private void waitUntilAllShardRecordsRead(ShardRecordsIterator shardRecordsIterator) throws InterruptedException {
        AtomicInteger atomicInteger = this.numberOfRecordsInAQueueByShard.get(shardRecordsIterator.getShardId());
        while (atomicInteger.get() != 0) {
            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        }
    }

    private void readFromSuccessiveShards(ShardRecordsIterator shardRecordsIterator) throws TransientKinesisException {
        ImmutableMap<String, ShardRecordsIterator> immutableMap;
        List<ShardRecordsIterator> findSuccessiveShardRecordIterators = shardRecordsIterator.findSuccessiveShardRecordIterators();
        do {
            immutableMap = this.shardIteratorsMap.get();
        } while (!this.shardIteratorsMap.compareAndSet(immutableMap, createMapWithSuccessiveShards(immutableMap, shardRecordsIterator, findSuccessiveShardRecordIterators)));
        this.numberOfRecordsInAQueueByShard.remove(shardRecordsIterator.getShardId());
        startReadingShards(findSuccessiveShardRecordIterators);
    }

    private ImmutableMap<String, ShardRecordsIterator> createMapWithSuccessiveShards(ImmutableMap<String, ShardRecordsIterator> immutableMap, ShardRecordsIterator shardRecordsIterator, List<ShardRecordsIterator> list) throws TransientKinesisException {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (ShardRecordsIterator shardRecordsIterator2 : Iterables.concat(immutableMap.values(), list)) {
            if (!shardRecordsIterator.getShardId().equals(shardRecordsIterator2.getShardId())) {
                builder.put(shardRecordsIterator2.getShardId(), shardRecordsIterator2);
            }
        }
        return builder.build();
    }
}
