/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.plugin.stream.kinesis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
import org.apache.pinot.plugin.stream.kinesis.KinesisConnectionHandler;
import org.apache.pinot.plugin.stream.kinesis.KinesisPartitionGroupOffset;
import org.apache.pinot.plugin.stream.kinesis.KinesisRecordsBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

public class KinesisConsumer
extends KinesisConnectionHandler
implements PartitionGroupConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConsumer.class);
    private final String _streamTopicName;
    private final int _numMaxRecordsToFetch;
    private final ExecutorService _executorService;
    private final ShardIteratorType _shardIteratorType;

    public KinesisConsumer(KinesisConfig kinesisConfig) {
        super(kinesisConfig);
        this._streamTopicName = kinesisConfig.getStreamTopicName();
        this._numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
        this._shardIteratorType = kinesisConfig.getShardIteratorType();
        this._executorService = Executors.newSingleThreadExecutor();
    }

    @VisibleForTesting
    public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) {
        super(kinesisConfig, kinesisClient);
        this._kinesisClient = kinesisClient;
        this._streamTopicName = kinesisConfig.getStreamTopicName();
        this._numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
        this._shardIteratorType = kinesisConfig.getShardIteratorType();
        this._executorService = Executors.newSingleThreadExecutor();
    }

    public KinesisRecordsBatch fetchMessages(StreamPartitionMsgOffset startCheckpoint, StreamPartitionMsgOffset endCheckpoint, int timeoutMs) {
        ArrayList<Record> recordList = new ArrayList<Record>();
        Future<KinesisRecordsBatch> kinesisFetchResultFuture = this._executorService.submit(() -> this.getResult(startCheckpoint, endCheckpoint, recordList));
        try {
            return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            kinesisFetchResultFuture.cancel(true);
            return this.handleException((KinesisPartitionGroupOffset)startCheckpoint, recordList);
        }
        catch (Exception e) {
            return this.handleException((KinesisPartitionGroupOffset)startCheckpoint, recordList);
        }
    }

    private KinesisRecordsBatch getResult(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, List<Record> recordList) {
        KinesisPartitionGroupOffset kinesisStartCheckpoint = (KinesisPartitionGroupOffset)startOffset;
        try {
            Map<String, String> startShardToSequenceMap;
            if (this._kinesisClient == null) {
                this.createConnection();
            }
            Preconditions.checkState(((startShardToSequenceMap = kinesisStartCheckpoint.getShardToStartSequenceMap()).size() == 1 ? 1 : 0) != 0, (String)"Only 1 shard per consumer supported. Found: %s, in startShardToSequenceMap", startShardToSequenceMap.keySet());
            Map.Entry<String, String> startShardToSequenceNum = startShardToSequenceMap.entrySet().iterator().next();
            String shardIterator = this.getShardIterator(startShardToSequenceNum.getKey(), startShardToSequenceNum.getValue());
            String kinesisEndSequenceNumber = null;
            if (endOffset != null) {
                KinesisPartitionGroupOffset kinesisEndCheckpoint = (KinesisPartitionGroupOffset)endOffset;
                Map<String, String> endShardToSequenceMap = kinesisEndCheckpoint.getShardToStartSequenceMap();
                Preconditions.checkState((endShardToSequenceMap.size() == 1 ? 1 : 0) != 0, (String)"Only 1 shard per consumer supported. Found: %s, in endShardToSequenceMap", endShardToSequenceMap.keySet());
                kinesisEndSequenceNumber = endShardToSequenceMap.values().iterator().next();
            }
            boolean isEndOfShard = false;
            while (shardIterator != null) {
                GetRecordsRequest getRecordsRequest = (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(shardIterator).build();
                GetRecordsResponse getRecordsResponse = this._kinesisClient.getRecords(getRecordsRequest);
                if (!getRecordsResponse.records().isEmpty()) {
                    recordList.addAll(getRecordsResponse.records());
                    String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
                    if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(nextStartSequenceNumber) <= 0 || recordList.size() >= this._numMaxRecordsToFetch) break;
                }
                if (getRecordsResponse.hasChildShards() && !getRecordsResponse.childShards().isEmpty()) {
                    isEndOfShard = true;
                    break;
                }
                shardIterator = getRecordsResponse.nextShardIterator();
                if (!Thread.interrupted()) continue;
                break;
            }
            return new KinesisRecordsBatch(recordList, startShardToSequenceNum.getKey(), isEndOfShard);
        }
        catch (IllegalStateException e) {
            this.debugOrLogWarning("Illegal state exception, connection is broken", e);
            return this.handleException(kinesisStartCheckpoint, recordList);
        }
        catch (ProvisionedThroughputExceededException e) {
            this.debugOrLogWarning("The request rate for the stream is too high", e);
            return this.handleException(kinesisStartCheckpoint, recordList);
        }
        catch (ExpiredIteratorException e) {
            this.debugOrLogWarning("ShardIterator expired while trying to fetch records", e);
            return this.handleException(kinesisStartCheckpoint, recordList);
        }
        catch (InvalidArgumentException | ResourceNotFoundException e) {
            LOGGER.error("Encountered AWS error while attempting to fetch records", e);
            return this.handleException(kinesisStartCheckpoint, recordList);
        }
        catch (KinesisException e) {
            this.debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e);
            throw new RuntimeException(e);
        }
        catch (AbortedException e) {
            this.debugOrLogWarning("Task aborted due to exception", e);
            return this.handleException(kinesisStartCheckpoint, recordList);
        }
        catch (Throwable e) {
            LOGGER.error("Unknown fetchRecords exception", e);
            throw new RuntimeException(e);
        }
    }

    private void debugOrLogWarning(String message, Throwable throwable) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(message, throwable);
        } else {
            LOGGER.warn(message + ": " + throwable.getMessage());
        }
    }

    private KinesisRecordsBatch handleException(KinesisPartitionGroupOffset start, List<Record> recordList) {
        String shardId = start.getShardToStartSequenceMap().entrySet().iterator().next().getKey();
        if (!recordList.isEmpty()) {
            String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
            HashMap<String, String> newCheckpoint = new HashMap<String, String>(start.getShardToStartSequenceMap());
            newCheckpoint.put((String)newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
        }
        return new KinesisRecordsBatch(recordList, shardId, false);
    }

    private String getShardIterator(String shardId, String sequenceNumber) {
        GetShardIteratorRequest.Builder requestBuilder = GetShardIteratorRequest.builder().streamName(this._streamTopicName).shardId(shardId);
        requestBuilder = sequenceNumber != null ? requestBuilder.startingSequenceNumber(sequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) : requestBuilder.shardIteratorType(this._shardIteratorType);
        return this._kinesisClient.getShardIterator((GetShardIteratorRequest)requestBuilder.build()).shardIterator();
    }

    @Override
    public void close() {
        super.close();
        this.shutdownAndAwaitTermination();
    }

    void shutdownAndAwaitTermination() {
        this._executorService.shutdown();
        try {
            if (!this._executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                this._executorService.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            this._executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

