package org.apache.beam.sdk.io.kinesis;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/ShardReadersPool.class */
class ShardReadersPool {
    private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
    public static final int DEFAULT_CAPACITY_PER_SHARD = 10000;
    private static final int ATTEMPTS_TO_SHUTDOWN = 3;
    private static final int QUEUE_OFFER_TIMEOUT_MS = 500;
    private static final int QUEUE_POLL_TIMEOUT_MS = 1000;
    private BlockingQueue<KinesisRecord> recordsQueue;
    private final SimplifiedKinesisClient kinesis;
    private final WatermarkPolicyFactory watermarkPolicyFactory;
    private final RateLimitPolicyFactory rateLimitPolicyFactory;
    private final KinesisReaderCheckpoint initialCheckpoint;
    private final int queueCapacityPerShard;
    private final AtomicBoolean poolOpened = new AtomicBoolean(true);
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final ConcurrentMap<String, AtomicInteger> numberOfRecordsInAQueueByShard = new ConcurrentHashMap();
    private final AtomicReference<ImmutableMap<String, ShardRecordsIterator>> shardIteratorsMap = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShardReadersPool(SimplifiedKinesisClient simplifiedKinesisClient, KinesisReaderCheckpoint kinesisReaderCheckpoint, WatermarkPolicyFactory watermarkPolicyFactory, RateLimitPolicyFactory rateLimitPolicyFactory, int i) {
        this.kinesis = simplifiedKinesisClient;
        this.initialCheckpoint = kinesisReaderCheckpoint;
        this.watermarkPolicyFactory = watermarkPolicyFactory;
        this.rateLimitPolicyFactory = rateLimitPolicyFactory;
        this.queueCapacityPerShard = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() throws TransientKinesisException {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator<ShardCheckpoint> it = this.initialCheckpoint.iterator();
        while (it.hasNext()) {
            ShardCheckpoint next = it.next();
            builder.put(next.getShardId(), createShardIterator(this.kinesis, next));
        }
        this.shardIteratorsMap.set(builder.build());
        if (this.shardIteratorsMap.get().isEmpty()) {
            this.recordsQueue = new ArrayBlockingQueue(1);
            return;
        }
        this.recordsQueue = new ArrayBlockingQueue(this.queueCapacityPerShard * this.shardIteratorsMap.get().size());
        startReadingShards(this.shardIteratorsMap.get().values(), this.initialCheckpoint.getStreamName());
    }

    void startReadingShards(Iterable<ShardRecordsIterator> iterable, String str) {
        if (!iterable.iterator().hasNext()) {
            LOG.info("Stream {} will not be read, no shard records iterators available", str);
            return;
        }
        LOG.info("Starting to read {} stream from {} shards", str, getShardIdsFromRecordsIterators(iterable));
        for (ShardRecordsIterator shardRecordsIterator : iterable) {
            this.numberOfRecordsInAQueueByShard.put(shardRecordsIterator.getShardId(), new AtomicInteger());
            this.executorService.submit(() -> {
                readLoop(shardRecordsIterator, this.rateLimitPolicyFactory.getRateLimitPolicy());
            });
        }
    }

    /* JADX WARN: Finally extract failed */
    private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimitPolicy) {
        while (this.poolOpened.get()) {
            try {
                try {
                    List<KinesisRecord> readNextBatch = shardRecordsIterator.readNextBatch();
                    try {
                        for (KinesisRecord kinesisRecord : readNextBatch) {
                            while (this.poolOpened.get()) {
                                if (this.recordsQueue.offer(kinesisRecord, 500L, TimeUnit.MILLISECONDS)) {
                                    this.numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
                                }
                            }
                            if (Thread.currentThread().isInterrupted()) {
                                return;
                            }
                            rateLimitPolicy.onSuccess(readNextBatch);
                            return;
                        }
                        if (!Thread.currentThread().isInterrupted()) {
                            rateLimitPolicy.onSuccess(readNextBatch);
                        }
                    } catch (Throwable th) {
                        if (!Thread.currentThread().isInterrupted()) {
                            rateLimitPolicy.onSuccess(readNextBatch);
                        }
                        throw th;
                    }
                } catch (KinesisShardClosedException e) {
                    LOG.info("Shard iterator for {} shard is closed, finishing the read loop", shardRecordsIterator.getShardId(), e);
                    waitUntilAllShardRecordsRead(shardRecordsIterator);
                    readFromSuccessiveShards(shardRecordsIterator);
                }
            } catch (InterruptedException e2) {
                LOG.warn("Thread was interrupted, finishing the read loop", e2);
                Thread.currentThread().interrupt();
            } catch (KinesisClientThrottledException e3) {
                try {
                    rateLimitPolicy.onThrottle(e3);
                } catch (InterruptedException e4) {
                    LOG.warn("Thread was interrupted, finishing the read loop", e4);
                    Thread.currentThread().interrupt();
                }
            } catch (TransientKinesisException e5) {
                LOG.warn("Transient exception occurred.", e5);
            } catch (Throwable th2) {
                LOG.error("Unexpected exception occurred", th2);
            }
        }
        LOG.info("Kinesis Shard read loop has finished");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CustomOptional<KinesisRecord> nextRecord() {
        try {
            KinesisRecord poll = this.recordsQueue.poll(1000L, TimeUnit.MILLISECONDS);
            if (poll == null) {
                return CustomOptional.absent();
            }
            ((ShardRecordsIterator) this.shardIteratorsMap.get().get(poll.getShardId())).ackRecord(poll);
            this.numberOfRecordsInAQueueByShard.get(poll.getShardId()).decrementAndGet();
            return CustomOptional.of(poll);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for KinesisRecord from the buffer");
            return CustomOptional.absent();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        LOG.info("Closing shard iterators pool");
        this.poolOpened.set(false);
        this.executorService.shutdown();
        awaitTermination();
        if (this.executorService.isTerminated()) {
            return;
        }
        LOG.warn("Executor service was not completely terminated after {} attempts, trying to forcibly stop it.", Integer.valueOf(ATTEMPTS_TO_SHUTDOWN));
        this.executorService.shutdownNow();
        awaitTermination();
    }

    private void awaitTermination() {
        int i = ATTEMPTS_TO_SHUTDOWN;
        boolean isTerminated = this.executorService.isTerminated();
        while (!isTerminated) {
            int i2 = i;
            i--;
            if (i2 <= 0) {
                return;
            }
            try {
                isTerminated = this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
                if (!isTerminated && i > 0) {
                    LOG.warn("Executor service is taking long time to shutdown, will retry. {} attempts left", Integer.valueOf(i));
                }
            } catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for the executor service to shutdown");
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Instant getWatermark() {
        return getMinTimestamp((v0) -> {
            return v0.getShardWatermark();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Instant getLatestRecordTimestamp() {
        return getMinTimestamp((v0) -> {
            return v0.getLatestRecordTimestamp();
        });
    }

    private Instant getMinTimestamp(Function<ShardRecordsIterator, Instant> function) {
        return (Instant) this.shardIteratorsMap.get().values().stream().map(function).min(Comparator.naturalOrder()).orElse(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisReaderCheckpoint getCheckpointMark() {
        return new KinesisReaderCheckpoint((Iterable) this.shardIteratorsMap.get().values().stream().map(shardRecordsIterator -> {
            Preconditions.checkArgument(shardRecordsIterator != null, "shardRecordsIterator can not be null");
            return shardRecordsIterator.getCheckpoint();
        }).collect(Collectors.toList()));
    }

    ShardRecordsIterator createShardIterator(SimplifiedKinesisClient simplifiedKinesisClient, ShardCheckpoint shardCheckpoint) throws TransientKinesisException {
        return new ShardRecordsIterator(shardCheckpoint, simplifiedKinesisClient, this.watermarkPolicyFactory);
    }

    private void waitUntilAllShardRecordsRead(ShardRecordsIterator shardRecordsIterator) throws InterruptedException {
        AtomicInteger atomicInteger = this.numberOfRecordsInAQueueByShard.get(shardRecordsIterator.getShardId());
        while (atomicInteger.get() != 0) {
            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        }
    }

    private void readFromSuccessiveShards(ShardRecordsIterator shardRecordsIterator) throws TransientKinesisException {
        ImmutableMap<String, ShardRecordsIterator> immutableMap;
        List<ShardRecordsIterator> findSuccessiveShardRecordIterators = shardRecordsIterator.findSuccessiveShardRecordIterators();
        do {
            immutableMap = this.shardIteratorsMap.get();
        } while (!this.shardIteratorsMap.compareAndSet(immutableMap, createMapWithSuccessiveShards(immutableMap, shardRecordsIterator, findSuccessiveShardRecordIterators)));
        this.numberOfRecordsInAQueueByShard.remove(shardRecordsIterator.getShardId());
        logSuccessiveShardsFromRecordsIterators(shardRecordsIterator, findSuccessiveShardRecordIterators);
        startReadingShards(findSuccessiveShardRecordIterators, shardRecordsIterator.getStreamName());
    }

    private static void logSuccessiveShardsFromRecordsIterators(ShardRecordsIterator shardRecordsIterator, Collection<ShardRecordsIterator> collection) {
        if (collection.isEmpty()) {
            LOG.info("Shard {} for {} stream is closed. Found no successive shards to read from as it was merged with another shard and this one is considered adjacent by merge operation", shardRecordsIterator.getShardId(), shardRecordsIterator.getStreamName());
        } else {
            LOG.info("Shard {} for {} stream is closed, found successive shards to read from: {}", new Object[]{shardRecordsIterator.getShardId(), shardRecordsIterator.getStreamName(), getShardIdsFromRecordsIterators(collection)});
        }
    }

    private static List<String> getShardIdsFromRecordsIterators(Iterable<ShardRecordsIterator> iterable) {
        return (List) StreamSupport.stream(iterable.spliterator(), false).map((v0) -> {
            return v0.getShardId();
        }).collect(Collectors.toList());
    }

    private ImmutableMap<String, ShardRecordsIterator> createMapWithSuccessiveShards(ImmutableMap<String, ShardRecordsIterator> immutableMap, ShardRecordsIterator shardRecordsIterator, List<ShardRecordsIterator> list) throws TransientKinesisException {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (ShardRecordsIterator shardRecordsIterator2 : Iterables.concat(immutableMap.values(), list)) {
            if (!shardRecordsIterator.getShardId().equals(shardRecordsIterator2.getShardId())) {
                builder.put(shardRecordsIterator2.getShardId(), shardRecordsIterator2);
            }
        }
        return builder.build();
    }

    @VisibleForTesting
    BlockingQueue<KinesisRecord> getRecordsQueue() {
        return this.recordsQueue;
    }
}
