/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.aws2.kinesis.CustomOptional;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisClientThrottledException;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisShardClosedException;
import org.apache.beam.sdk.io.aws2.kinesis.RateLimitPolicy;
import org.apache.beam.sdk.io.aws2.kinesis.RateLimitPolicyFactory;
import org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.ShardRecordsIterator;
import org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicyFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ShardReadersPool {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
    public static final @UnknownKeyFor @NonNull @Initialized int DEFAULT_CAPACITY_PER_SHARD = 10000;
    private static final @UnknownKeyFor @NonNull @Initialized int ATTEMPTS_TO_SHUTDOWN = 3;
    private final @UnknownKeyFor @NonNull @Initialized ExecutorService executorService;
    private @UnknownKeyFor @NonNull @Initialized BlockingQueue<@UnknownKeyFor @NonNull @Initialized KinesisRecord> recordsQueue;
    private final @UnknownKeyFor @NonNull @Initialized AtomicReference<@UnknownKeyFor @NonNull @Initialized ImmutableMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ShardRecordsIterator>> shardIteratorsMap;
    private final @UnknownKeyFor @NonNull @Initialized ConcurrentMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized AtomicInteger> numberOfRecordsInAQueueByShard;
    private final @UnknownKeyFor @NonNull @Initialized SimplifiedKinesisClient kinesis;
    private final @UnknownKeyFor @NonNull @Initialized WatermarkPolicyFactory watermarkPolicyFactory;
    private final @UnknownKeyFor @NonNull @Initialized RateLimitPolicyFactory rateLimitPolicyFactory;
    private final @UnknownKeyFor @NonNull @Initialized KinesisReaderCheckpoint initialCheckpoint;
    private final @UnknownKeyFor @NonNull @Initialized int queueCapacityPerShard;
    private final @UnknownKeyFor @NonNull @Initialized AtomicBoolean poolOpened = new AtomicBoolean(true);

    ShardReadersPool(@UnknownKeyFor @NonNull @Initialized SimplifiedKinesisClient kinesis, @UnknownKeyFor @NonNull @Initialized KinesisReaderCheckpoint initialCheckpoint, @UnknownKeyFor @NonNull @Initialized WatermarkPolicyFactory watermarkPolicyFactory, @UnknownKeyFor @NonNull @Initialized RateLimitPolicyFactory rateLimitPolicyFactory, @UnknownKeyFor @NonNull @Initialized int queueCapacityPerShard) {
        this.kinesis = kinesis;
        this.initialCheckpoint = initialCheckpoint;
        this.watermarkPolicyFactory = watermarkPolicyFactory;
        this.rateLimitPolicyFactory = rateLimitPolicyFactory;
        this.queueCapacityPerShard = queueCapacityPerShard;
        this.executorService = Executors.newCachedThreadPool();
        this.numberOfRecordsInAQueueByShard = new ConcurrentHashMap<String, AtomicInteger>();
        this.shardIteratorsMap = new AtomicReference();
    }

    void start() throws @UnknownKeyFor @NonNull @Initialized TransientKinesisException {
        ImmutableMap.Builder shardsMap = ImmutableMap.builder();
        for (ShardCheckpoint checkpoint : this.initialCheckpoint) {
            shardsMap.put((Object)checkpoint.getShardId(), (Object)this.createShardIterator(this.kinesis, checkpoint));
        }
        this.shardIteratorsMap.set((ImmutableMap<String, ShardRecordsIterator>)shardsMap.build());
        if (!this.shardIteratorsMap.get().isEmpty()) {
            this.recordsQueue = new ArrayBlockingQueue<KinesisRecord>(this.queueCapacityPerShard * this.shardIteratorsMap.get().size());
            this.startReadingShards((Iterable<ShardRecordsIterator>)this.shardIteratorsMap.get().values());
        } else {
            this.recordsQueue = new ArrayBlockingQueue<KinesisRecord>(1);
        }
    }

    void startReadingShards(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized ShardRecordsIterator> shardRecordsIterators) {
        for (ShardRecordsIterator recordsIterator : shardRecordsIterators) {
            this.numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new AtomicInteger());
            this.executorService.submit(() -> this.readLoop(recordsIterator, this.rateLimitPolicyFactory.getRateLimitPolicy()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readLoop(@UnknownKeyFor @NonNull @Initialized ShardRecordsIterator shardRecordsIterator, @UnknownKeyFor @NonNull @Initialized RateLimitPolicy rateLimiter) {
        while (this.poolOpened.get()) {
            try {
                try {
                    List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
                    try {
                        for (KinesisRecord kinesisRecord : kinesisRecords) {
                            this.recordsQueue.put(kinesisRecord);
                            ((AtomicInteger)this.numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId())).incrementAndGet();
                        }
                    }
                    finally {
                        if (Thread.currentThread().isInterrupted()) continue;
                        rateLimiter.onSuccess(kinesisRecords);
                    }
                }
                catch (KinesisShardClosedException e) {
                    LOG.info("Shard iterator for {} shard is closed, finishing the read loop", (Object)shardRecordsIterator.getShardId(), (Object)e);
                    this.waitUntilAllShardRecordsRead(shardRecordsIterator);
                    this.readFromSuccessiveShards(shardRecordsIterator);
                    break;
                }
            }
            catch (KinesisClientThrottledException e) {
                try {
                    rateLimiter.onThrottle(e);
                }
                catch (InterruptedException ex) {
                    LOG.warn("Thread was interrupted, finishing the read loop", (Throwable)ex);
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            catch (TransientKinesisException e) {
                LOG.warn("Transient exception occurred.", (Throwable)e);
            }
            catch (InterruptedException e) {
                LOG.warn("Thread was interrupted, finishing the read loop", (Throwable)e);
                Thread.currentThread().interrupt();
                break;
            }
            catch (Throwable e) {
                LOG.error("Unexpected exception occurred", e);
            }
        }
        LOG.info("Kinesis Shard read loop has finished");
    }

    @UnknownKeyFor @NonNull @Initialized CustomOptional<@UnknownKeyFor @NonNull @Initialized KinesisRecord> nextRecord() {
        try {
            KinesisRecord record = this.recordsQueue.poll(1L, TimeUnit.SECONDS);
            if (record == null) {
                return CustomOptional.absent();
            }
            ((ShardRecordsIterator)this.shardIteratorsMap.get().get((Object)record.getShardId())).ackRecord(record);
            ((AtomicInteger)this.numberOfRecordsInAQueueByShard.get(record.getShardId())).decrementAndGet();
            return CustomOptional.of(record);
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for KinesisRecord from the buffer");
            return CustomOptional.absent();
        }
    }

    void stop() {
        LOG.info("Closing shard iterators pool");
        this.poolOpened.set(false);
        this.executorService.shutdown();
        this.awaitTermination();
        if (!this.executorService.isTerminated()) {
            LOG.warn("Executor service was not completely terminated after {} attempts, trying to forcibly stop it.", (Object)3);
            this.executorService.shutdownNow();
            this.awaitTermination();
        }
    }

    private void awaitTermination() {
        int attemptsLeft = 3;
        boolean isTerminated = this.executorService.isTerminated();
        while (!isTerminated && attemptsLeft-- > 0) {
            try {
                isTerminated = this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for the executor service to shutdown");
                throw new RuntimeException(e);
            }
            if (isTerminated || attemptsLeft <= 0) continue;
            LOG.warn("Executor service is taking long time to shutdown, will retry. {} attempts left", (Object)attemptsLeft);
        }
    }

    @UnknownKeyFor @NonNull @Initialized Instant getWatermark() {
        return this.getMinTimestamp(ShardRecordsIterator::getShardWatermark);
    }

    @UnknownKeyFor @NonNull @Initialized Instant getLatestRecordTimestamp() {
        return this.getMinTimestamp(ShardRecordsIterator::getLatestRecordTimestamp);
    }

    private @UnknownKeyFor @NonNull @Initialized Instant getMinTimestamp(@UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized ShardRecordsIterator, @UnknownKeyFor @NonNull @Initialized Instant> timestampExtractor) {
        return this.shardIteratorsMap.get().values().stream().map(timestampExtractor).min(Comparator.naturalOrder()).orElse(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    @UnknownKeyFor @NonNull @Initialized KinesisReaderCheckpoint getCheckpointMark() {
        ImmutableMap<String, ShardRecordsIterator> currentShardIterators = this.shardIteratorsMap.get();
        return new KinesisReaderCheckpoint(currentShardIterators.values().stream().map(shardRecordsIterator -> {
            Preconditions.checkArgument((shardRecordsIterator != null ? 1 : 0) != 0, (Object)"shardRecordsIterator can not be null");
            return shardRecordsIterator.getCheckpoint();
        }).collect(Collectors.toList()));
    }

    @UnknownKeyFor @NonNull @Initialized ShardRecordsIterator createShardIterator(@UnknownKeyFor @NonNull @Initialized SimplifiedKinesisClient kinesis, @UnknownKeyFor @NonNull @Initialized ShardCheckpoint checkpoint) throws @UnknownKeyFor @NonNull @Initialized TransientKinesisException {
        return new ShardRecordsIterator(checkpoint, kinesis, this.watermarkPolicyFactory);
    }

    private void waitUntilAllShardRecordsRead(@UnknownKeyFor @NonNull @Initialized ShardRecordsIterator shardRecordsIterator) throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
        AtomicInteger numberOfShardRecordsInAQueue = (AtomicInteger)this.numberOfRecordsInAQueueByShard.get(shardRecordsIterator.getShardId());
        while (numberOfShardRecordsInAQueue.get() != 0) {
            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        }
    }

    private void readFromSuccessiveShards(@UnknownKeyFor @NonNull @Initialized ShardRecordsIterator closedShardIterator) throws @UnknownKeyFor @NonNull @Initialized TransientKinesisException {
        ImmutableMap<String, ShardRecordsIterator> updated;
        ImmutableMap<String, ShardRecordsIterator> current;
        List<ShardRecordsIterator> successiveShardRecordIterators = closedShardIterator.findSuccessiveShardRecordIterators();
        while (!this.shardIteratorsMap.compareAndSet(current = this.shardIteratorsMap.get(), updated = this.createMapWithSuccessiveShards(current, closedShardIterator, successiveShardRecordIterators))) {
        }
        this.numberOfRecordsInAQueueByShard.remove(closedShardIterator.getShardId());
        this.startReadingShards(successiveShardRecordIterators);
    }

    private @UnknownKeyFor @NonNull @Initialized ImmutableMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ShardRecordsIterator> createMapWithSuccessiveShards(@UnknownKeyFor @NonNull @Initialized ImmutableMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ShardRecordsIterator> current, @UnknownKeyFor @NonNull @Initialized ShardRecordsIterator closedShardIterator, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized ShardRecordsIterator> successiveShardRecordIterators) throws @UnknownKeyFor @NonNull @Initialized TransientKinesisException {
        ImmutableMap.Builder shardsMap = ImmutableMap.builder();
        Iterable allShards = Iterables.concat((Iterable)current.values(), successiveShardRecordIterators);
        for (ShardRecordsIterator iterator : allShards) {
            if (closedShardIterator.getShardId().equals(iterator.getShardId())) continue;
            shardsMap.put((Object)iterator.getShardId(), (Object)iterator);
        }
        return shardsMap.build();
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized BlockingQueue<@UnknownKeyFor @NonNull @Initialized KinesisRecord> getRecordsQueue() {
        return this.recordsQueue;
    }
}

