package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.HashKeyRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.SequenceNumberRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.class */
public class FakeKinesisBehavioursFactory {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$BlockingQueueKinesis.class */
    /**
     * Fake Kinesis proxy whose shards are backed by caller-supplied blocking queues.
     *
     * <p>Every queue handed to the constructor becomes one shard of its stream. Each call to
     * {@link #getRecords(String, int)} blocks until the shard's queue yields an element and
     * returns it wrapped in exactly one record.
     */
    private static class BlockingQueueKinesis implements KinesisProxyInterface {
        /** Shard handles of every stream that was given at least one queue, keyed by stream name. */
        private final Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();

        /** Backing queue of each shard, keyed by the string built by {@link #getShardIterator(StreamShardHandle)}. */
        private final Map<String, BlockingQueue<String>> shardIteratorToQueueMap = new HashMap<>();

        /** Builds the iterator string of a shard: {@code <streamName>-<shardId>}. */
        private static String getShardIterator(StreamShardHandle streamShardHandle) {
            return streamShardHandle.getStreamName() + "-" + streamShardHandle.getShard().getShardId();
        }

        /**
         * Creates one shard per queue for every stream in the given map.
         *
         * @param map stream name to the queues backing that stream's shards; the position of a
         *     queue in the list is the shard order passed to the shard id generator
         */
        public BlockingQueueKinesis(Map<String, List<BlockingQueue<String>>> map) {
            for (Map.Entry<String, List<BlockingQueue<String>>> entry : map.entrySet()) {
                String streamName = entry.getKey();
                List<BlockingQueue<String>> queues = entry.getValue();
                if (queues.isEmpty()) {
                    // A stream without queues has no shards and is not registered at all.
                    continue;
                }
                List<StreamShardHandle> shards = new ArrayList<>(queues.size());
                for (int shardOrder = 0; shardOrder < queues.size(); shardOrder++) {
                    Shard shard = new Shard()
                            .withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder))
                            .withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("0"))
                            .withHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey("0"));
                    StreamShardHandle handle = new StreamShardHandle(streamName, shard);
                    shards.add(handle);
                    this.shardIteratorToQueueMap.put(getShardIterator(handle), queues.get(shardOrder));
                }
                this.streamsWithListOfShards.put(streamName, shards);
            }
        }

        /**
         * Lists the shards of all registered streams.
         *
         * @param map stream name to the last shard id already seen; a stream without an entry
         *     gets all of its shards, otherwise only shards with a greater id are returned
         * @return the shards that are new relative to {@code map}
         */
        public GetShardListResult getShardList(Map<String, String> map) {
            GetShardListResult result = new GetShardListResult();
            for (Map.Entry<String, List<StreamShardHandle>> entry : this.streamsWithListOfShards.entrySet()) {
                String streamName = entry.getKey();
                String lastSeenShardId = map.get(streamName);
                for (StreamShardHandle shard : entry.getValue()) {
                    boolean isNewShard = lastSeenShardId == null
                            || StreamShardHandle.compareShardIds(shard.getShard().getShardId(), lastSeenShardId) > 0;
                    if (isNewShard) {
                        result.addRetrievedShardToStream(streamName, shard);
                    }
                }
            }
            return result;
        }

        /** Returns the fixed iterator string of the shard; {@code str} and {@code obj} are ignored. */
        public String getShardIterator(StreamShardHandle streamShardHandle, String str, Object obj) {
            return getShardIterator(streamShardHandle);
        }

        /**
         * Blocks until the queue behind the given iterator yields an element and returns it as a
         * single record.
         *
         * <p>If the thread is interrupted while waiting, an empty result with a {@code null} next
         * iterator is returned and the thread's interrupt flag is restored so that callers can
         * still observe the interruption.
         *
         * @param str shard iterator, as produced by {@code getShardIterator}
         * @param i maximum number of records to fetch; ignored, at most one record is returned
         * @return a result whose next iterator equals {@code str}, or {@code null} on interruption
         * @throws NullPointerException if no queue is registered for {@code str}
         */
        public GetRecordsResult getRecords(String str, int i) {
            BlockingQueue<String> queue =
                    Preconditions.checkNotNull(this.shardIteratorToQueueMap.get(str), "no queue for iterator %s", str);
            List<Record> records = Collections.emptyList();
            String nextShardIterator = str;
            try {
                String data = queue.take();
                Record record = new Record()
                        .withData(ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
                        .withPartitionKey(UUID.randomUUID().toString())
                        .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                        .withSequenceNumber(String.valueOf(0));
                records = Collections.singletonList(record);
            } catch (InterruptedException e) {
                // Do not swallow the interruption: re-set the flag and signal the end of the shard.
                Thread.currentThread().interrupt();
                nextShardIterator = null;
            }
            return new GetRecordsResult()
                    .withRecords(records)
                    .withMillisBehindLatest(0L)
                    .withNextShardIterator(nextShardIterator);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$NonReshardedStreamsKinesis.class */
    /**
     * Fake Kinesis proxy that only answers shard listing requests for a fixed set of streams.
     *
     * <p>Shard iterators and records are not supported; both methods return {@code null}.
     */
    private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
        /** Shard handles of every stream with a non-zero shard count, keyed by stream name. */
        private final Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();

        /**
         * Creates dummy shards for every stream.
         *
         * @param map stream name to number of shards; streams mapped to zero are not registered
         */
        public NonReshardedStreamsKinesis(Map<String, Integer> map) {
            for (Map.Entry<String, Integer> streamEntry : map.entrySet()) {
                final String streamName = streamEntry.getKey();
                final int shardCount = streamEntry.getValue().intValue();
                if (shardCount == 0) {
                    continue;
                }
                final List<StreamShardHandle> shards = new ArrayList<>(shardCount);
                int shardOrder = 0;
                while (shardOrder < shardCount) {
                    final String shardId = KinesisShardIdGenerator.generateFromShardOrder(shardOrder);
                    shards.add(TestUtils.createDummyStreamShardHandle(streamName, shardId));
                    shardOrder++;
                }
                this.streamsWithListOfShards.put(streamName, shards);
            }
        }

        /**
         * Lists the shards of all registered streams.
         *
         * @param map stream name to the last shard id already seen; a stream without an entry
         *     gets all of its shards, otherwise only shards with a greater id are returned
         * @return the shards that are new relative to {@code map}
         */
        public GetShardListResult getShardList(Map<String, String> map) {
            final GetShardListResult result = new GetShardListResult();
            for (Map.Entry<String, List<StreamShardHandle>> streamEntry : this.streamsWithListOfShards.entrySet()) {
                final String streamName = streamEntry.getKey();
                final String lastSeenShardId = map.get(streamName);
                for (StreamShardHandle shard : streamEntry.getValue()) {
                    if (lastSeenShardId == null
                            || compareShardIds(shard.getShard().getShardId(), lastSeenShardId) > 0) {
                        result.addRetrievedShardToStream(streamName, shard);
                    }
                }
            }
            return result;
        }

        /** Not supported by this fake; always returns {@code null}. */
        public String getShardIterator(StreamShardHandle streamShardHandle, String str, Object obj) {
            return null;
        }

        /** Not supported by this fake; always returns {@code null}. */
        public GetRecordsResult getRecords(String str, int i) {
            return null;
        }

        /**
         * Compares two shard ids by the numeric part that follows the 8-character
         * {@code shardId-} prefix.
         *
         * @return negative, zero or positive, as {@link Long#compare(long, long)}
         * @throws IllegalArgumentException if either id is not a valid shard id
         */
        private static int compareShardIds(String firstShardId, String secondShardId) {
            if (!isValidShardId(firstShardId)) {
                throw new IllegalArgumentException("The first shard id has invalid format.");
            }
            if (!isValidShardId(secondShardId)) {
                throw new IllegalArgumentException("The second shard id has invalid format.");
            }
            final long firstOrder = Long.parseLong(firstShardId.substring(8));
            final long secondOrder = Long.parseLong(secondShardId.substring(8));
            return Long.compare(firstOrder, secondOrder);
        }

        /** Checks that the id is non-null and is {@code shardId-} followed by exactly 12 digits. */
        private static boolean isValidShardId(String shardId) {
            return shardId != null && shardId.matches("^shardId-\\d{12}");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$SingleShardEmittingAdaptiveNumOfRecordsKinesis.class */
    /**
     * Single-shard fake whose batch sizes after the first one are derived from the shard byte
     * limit and the average size of the records generated so far.
     */
    private static class SingleShardEmittingAdaptiveNumOfRecordsKinesis extends SingleShardEmittingKinesis {
        /** Average payload size of the most recently generated non-empty batch; shared by all instances. */
        protected static long averageRecordSizeBytes = 0;

        /** 2 MiB. */
        private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;

        /** Number of characters in the payload of every generated record (10 * 1024). */
        private static final long RECORD_DATA_LENGTH = 10 * 1024L;

        /** Divisor used to scale the per-second record size when sizing the next batch. */
        private static final long FETCH_INTERVAL_MILLIS = 200L;

        /**
         * @param i number of records in the first batch
         * @param i2 number of batches, i.e. number of {@code getRecords} calls that return data
         * @param j value reported as millis-behind-latest on every result
         */
        public SingleShardEmittingAdaptiveNumOfRecordsKinesis(int i, int i2, long j) {
            super(initShardItrToRecordBatch(i, i2), j);
        }

        /**
         * Builds the iterator-to-batch map. The first batch holds {@code initialNumOfRecords}
         * records; each later batch size is recomputed from {@link #averageRecordSizeBytes}.
         *
         * <p>NOTE: if {@code averageRecordSizeBytes} is still zero after a batch was generated
         * (e.g. the first batch is empty and no earlier instance set it), the size computation
         * divides by zero.
         */
        private static Map<String, List<Record>> initShardItrToRecordBatch(int initialNumOfRecords, int numOfGetRecordsCalls) {
            Map<String, List<Record>> shardItrToRecordBatch = new HashMap<>();
            int sequenceNumber = 0;
            int numOfRecords = initialNumOfRecords;
            for (int call = 0; call < numOfGetRecordsCalls; call++) {
                shardItrToRecordBatch.put(
                        String.valueOf(call),
                        createRecordBatchWithRange(sequenceNumber, sequenceNumber + numOfRecords));
                sequenceNumber += numOfRecords;
                numOfRecords = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT
                        / ((averageRecordSizeBytes * 1000L) / FETCH_INTERVAL_MILLIS));
            }
            return shardItrToRecordBatch;
        }

        /**
         * Creates records with sequence numbers in {@code [min, max)} and, when the batch is
         * non-empty, updates {@link #averageRecordSizeBytes} with the batch's average payload size.
         */
        private static List<Record> createRecordBatchWithRange(int min, int max) {
            List<Record> batch = new LinkedList<>();
            long sumRecordBatchBytes = 0;
            String data = createDataSize(RECORD_DATA_LENGTH);
            for (int sequenceNumber = min; sequenceNumber < max; sequenceNumber++) {
                // Keep the record in a local so its payload size can be read after it is added.
                Record record = new Record()
                        .withData(ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
                        .withPartitionKey(UUID.randomUUID().toString())
                        .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                        .withSequenceNumber(String.valueOf(sequenceNumber));
                batch.add(record);
                sumRecordBatchBytes += record.getData().remaining();
            }
            if (!batch.isEmpty()) {
                averageRecordSizeBytes = sumRecordBatchBytes / batch.size();
            }
            return batch;
        }

        /** Returns a string of {@code length} NUL characters. */
        private static String createDataSize(long length) {
            return new String(new char[(int) length]);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$SingleShardEmittingAggregatedRecordsKinesis.class */
    /**
     * Single-shard fake whose batches are produced by
     * {@code TestUtils.createAggregatedRecordBatch}.
     */
    private static class SingleShardEmittingAggregatedRecordsKinesis extends SingleShardEmittingKinesis {
        /**
         * @param i first argument forwarded to {@code TestUtils.createAggregatedRecordBatch}
         * @param i2 second argument forwarded to {@code TestUtils.createAggregatedRecordBatch}
         * @param i3 number of batches, keyed by the iterators {@code "0"} to {@code i3 - 1}
         */
        public SingleShardEmittingAggregatedRecordsKinesis(int i, int i2, int i3) {
            super(initShardItrToRecordBatch(i, i2, i3));
        }

        /** Builds the iterator-to-batch map; one counter is shared across all batches. */
        private static Map<String, List<Record>> initShardItrToRecordBatch(int firstBatchArg, int secondBatchArg, int numOfGetRecordsCalls) {
            final Map<String, List<Record>> batchesByIterator = new HashMap<>();
            final AtomicInteger sharedCounter = new AtomicInteger();
            int call = 0;
            while (call < numOfGetRecordsCalls) {
                final String iterator = String.valueOf(call);
                batchesByIterator.put(iterator, TestUtils.createAggregatedRecordBatch(firstBatchArg, secondBatchArg, sharedCounter));
                call++;
            }
            return batchesByIterator;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$SingleShardEmittingFixNumOfRecordsKinesis.class */
    /**
     * Single-shard fake that spreads a fixed number of records over a fixed number of
     * {@code getRecords} calls. Shard iterators are the call indices {@code "0"},
     * {@code "1"}, ... and the last call returns a {@code null} next iterator.
     */
    private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface {
        protected final int totalNumOfGetRecordsCalls;
        protected final int totalNumOfRecords;
        private final long millisBehindLatest;

        /** Record batch returned for each shard iterator (the call index as a string). */
        protected final Map<String, List<Record>> shardItrToRecordBatch = new HashMap<>();

        /**
         * @param i total number of records emitted across all calls
         * @param i2 number of {@code getRecords} calls the records are spread over
         * @param j value reported as millis-behind-latest on every result
         */
        public SingleShardEmittingFixNumOfRecordsKinesis(int i, int i2, long j) {
            this.totalNumOfRecords = i;
            this.totalNumOfGetRecordsCalls = i2;
            this.millisBehindLatest = j;

            int numOfAlreadyPartitionedRecords = 0;
            int numOfRecordsPerBatch = (i / i2) + 1;
            for (int batch = 0; batch < this.totalNumOfGetRecordsCalls; batch++) {
                boolean isLastBatch = batch == this.totalNumOfGetRecordsCalls - 1;
                // Clamp every batch to the total: because of the "+ 1" above, unclamped batches
                // can run past totalNumOfRecords when there are many calls for few records,
                // which would emit more records than requested. The last batch takes the rest.
                int batchEnd = isLastBatch
                        ? this.totalNumOfRecords
                        : Math.min(numOfAlreadyPartitionedRecords + numOfRecordsPerBatch, this.totalNumOfRecords);
                this.shardItrToRecordBatch.put(
                        String.valueOf(batch),
                        createRecordBatchWithRange(numOfAlreadyPartitionedRecords, batchEnd));
                if (!isLastBatch) {
                    numOfAlreadyPartitionedRecords = batchEnd;
                }
            }
        }

        /**
         * Returns the batch registered for the given iterator.
         *
         * @param str shard iterator; must be the decimal call index
         * @param i maximum number of records to fetch; ignored
         * @return the batch, with the next iterator set to {@code str + 1} or to {@code null}
         *     for the last call
         * @throws NumberFormatException if {@code str} is not a decimal integer
         */
        public GetRecordsResult getRecords(String str, int i) {
            int callIndex = Integer.parseInt(str);
            String nextShardIterator =
                    callIndex == this.totalNumOfGetRecordsCalls - 1 ? null : String.valueOf(callIndex + 1);
            return new GetRecordsResult()
                    .withRecords(this.shardItrToRecordBatch.get(str))
                    .withMillisBehindLatest(Long.valueOf(this.millisBehindLatest))
                    .withNextShardIterator(nextShardIterator);
        }

        /** Always returns the iterator of the first call, {@code "0"}. */
        public String getShardIterator(StreamShardHandle streamShardHandle, String str, Object obj) {
            return "0";
        }

        /** Not supported by this fake; always returns {@code null}. */
        public GetShardListResult getShardList(Map<String, String> map) {
            return null;
        }

        /**
         * Creates one record per sequence number in {@code [i, i2)}; both the payload and the
         * sequence number of a record are its index as a decimal string.
         *
         * @param i first sequence number, inclusive
         * @param i2 last sequence number, exclusive
         * @return the records, empty if {@code i >= i2}
         */
        public static List<Record> createRecordBatchWithRange(int i, int i2) {
            List<Record> batch = new LinkedList<>();
            for (int sequenceNumber = i; sequenceNumber < i2; sequenceNumber++) {
                String value = String.valueOf(sequenceNumber);
                batch.add(new Record()
                        .withData(ByteBuffer.wrap(value.getBytes(ConfigConstants.DEFAULT_CHARSET)))
                        .withPartitionKey(UUID.randomUUID().toString())
                        .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                        .withSequenceNumber(value));
            }
            return batch;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis.class */
    /**
     * Variant of the fixed-number fake that throws an {@link ExpiredIteratorException} exactly
     * once, on a chosen {@code getRecords} call, and then insists that a fresh iterator is
     * requested before records are served again.
     */
    private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
        private final long millisBehindLatest;

        /** 1-based order of the {@code getRecords} call that fails with an expired iterator. */
        private final int orderOfCallToExpire;

        /** Set once the artificial expiry has been thrown. */
        private boolean expiredOnceAlready;

        /** Set once {@code getShardIterator} has been called after the expiry. */
        private boolean expiredIteratorRefreshed;

        /**
         * @param i total number of records emitted across all calls
         * @param i2 number of {@code getRecords} calls the records are spread over
         * @param i3 1-based order of the call that expires; must not exceed {@code i2}
         * @param j value reported as millis-behind-latest on every result
         * @throws IllegalArgumentException if {@code i3} is larger than {@code i2}
         */
        public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(int i, int i2, int i3, long j) {
            super(i, i2, j);
            Preconditions.checkArgument(i3 <= i2, "can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
            this.expiredOnceAlready = false;
            this.expiredIteratorRefreshed = false;
            this.orderOfCallToExpire = i3;
            this.millisBehindLatest = j;
        }

        /**
         * Serves the batch for the given iterator, except that the configured call fails once.
         *
         * @throws ExpiredIteratorException the first time the configured call is reached
         * @throws RuntimeException if records are requested after the expiry without the
         *     iterator having been refreshed through {@code getShardIterator}
         */
        @Override
        public GetRecordsResult getRecords(String str, int i) {
            final int callIndex = Integer.parseInt(str);

            final boolean isCallToExpire = callIndex == this.orderOfCallToExpire - 1;
            if (isCallToExpire && !this.expiredOnceAlready) {
                this.expiredOnceAlready = true;
                throw new ExpiredIteratorException("Artificial expired shard iterator");
            }

            if (this.expiredOnceAlready && !this.expiredIteratorRefreshed) {
                throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
            }

            String nextShardIterator = null;
            if (callIndex != this.totalNumOfGetRecordsCalls - 1) {
                nextShardIterator = String.valueOf(callIndex + 1);
            }
            return new GetRecordsResult()
                    .withRecords(this.shardItrToRecordBatch.get(str))
                    .withMillisBehindLatest(Long.valueOf(this.millisBehindLatest))
                    .withNextShardIterator(nextShardIterator);
        }

        /**
         * Returns {@code "0"} before the expiry. After it, marks the iterator as refreshed and
         * returns the iterator of the call that expired.
         */
        @Override
        public String getShardIterator(StreamShardHandle streamShardHandle, String str, Object obj) {
            if (this.expiredOnceAlready) {
                this.expiredIteratorRefreshed = true;
                return String.valueOf(this.orderOfCallToExpire - 1);
            }
            return "0";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$SingleShardEmittingKinesis.class */
    /**
     * Base class for single-shard fakes that serve pre-built record batches. Shard iterators
     * are the batch indices {@code "0"}, {@code "1"}, ... as strings.
     */
    private static abstract class SingleShardEmittingKinesis implements KinesisProxyInterface {
        private final long millisBehindLatest;
        private final Map<String, List<Record>> shardItrToRecordBatch;

        /**
         * Creates a fake that reports zero millis-behind-latest.
         *
         * @param map batch to return for each shard iterator
         */
        protected SingleShardEmittingKinesis(Map<String, List<Record>> map) {
            this(map, 0L);
        }

        /**
         * @param map batch to return for each shard iterator
         * @param j value reported as millis-behind-latest on every result
         */
        protected SingleShardEmittingKinesis(Map<String, List<Record>> map, long j) {
            this.shardItrToRecordBatch = map;
            this.millisBehindLatest = j;
        }

        /**
         * Returns the batch registered for the given iterator.
         *
         * @param str shard iterator; must be a decimal batch index
         * @param i maximum number of records to fetch; ignored
         * @return the batch, with the next iterator set to {@code str + 1} or to {@code null}
         *     when {@code str} is the last index of the batch map
         * @throws NumberFormatException if {@code str} is not a decimal integer
         */
        public GetRecordsResult getRecords(String str, int i) {
            final int batchIndex = Integer.parseInt(str);
            final int lastBatchIndex = this.shardItrToRecordBatch.size() - 1;

            String nextShardIterator = null;
            if (batchIndex != lastBatchIndex) {
                nextShardIterator = String.valueOf(batchIndex + 1);
            }

            GetRecordsResult result = new GetRecordsResult();
            result = result.withRecords(this.shardItrToRecordBatch.get(str));
            result = result.withNextShardIterator(nextShardIterator);
            return result.withMillisBehindLatest(Long.valueOf(this.millisBehindLatest));
        }

        /** Always returns the iterator of the first batch, {@code "0"}. */
        public String getShardIterator(StreamShardHandle streamShardHandle, String str, Object obj) {
            return "0";
        }

        /** Not supported by this fake; always returns {@code null}. */
        public GetShardListResult getShardList(Map<String, String> map) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory$SingleShardEmittingZeroRecords.class */
    /**
     * Single-shard fake that never sets any records on its results. It hands out a limited
     * number of shard iterators, counting down, and then ends the shard with a {@code null}
     * iterator.
     */
    private static class SingleShardEmittingZeroRecords implements KinesisProxyInterface {
        /** Iterators still to be handed out; decremented by both iterator and record requests. */
        private int remainingIterators;

        private SingleShardEmittingZeroRecords(int i) {
            this.remainingIterators = i;
        }

        /** Returns the current counter value as the iterator, then decrements the counter. */
        public String getShardIterator(StreamShardHandle streamShardHandle, String str, Object obj) throws InterruptedException {
            return String.valueOf(this.remainingIterators--);
        }

        /**
         * Returns a result with zero millis-behind-latest and no records set. The next iterator
         * is {@code null} once the counter is zero; otherwise it is the current counter value,
         * which is then decremented.
         */
        public GetRecordsResult getRecords(String str, int i) throws InterruptedException {
            String nextShardIterator = null;
            if (this.remainingIterators != 0) {
                nextShardIterator = String.valueOf(this.remainingIterators--);
            }
            return new GetRecordsResult()
                    .withMillisBehindLatest(0L)
                    .withNextShardIterator(nextShardIterator);
        }

        /** Not supported by this fake; always returns {@code null}. */
        public GetShardListResult getShardList(Map<String, String> map) throws InterruptedException {
            return null;
        }
    }

    /**
     * Creates a fake Kinesis proxy that finds no shards for any stream.
     *
     * @return a proxy whose shard list is always empty and whose iterator and record calls
     *     return {@code null}
     */
    public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
        final KinesisProxyInterface noShardsKinesis = new KinesisProxyInterface() {
            /** Not supported; always returns {@code null}. */
            public GetRecordsResult getRecords(String str, int i) {
                return null;
            }

            /** Not supported; always returns {@code null}. */
            public String getShardIterator(StreamShardHandle streamShardHandle, String str, Object obj) {
                return null;
            }

            /** Returns a fresh, empty result regardless of the requested streams. */
            public GetShardListResult getShardList(Map<String, String> map) {
                final GetShardListResult emptyResult = new GetShardListResult();
                return emptyResult;
            }
        };
        return noShardsKinesis;
    }

    /**
     * Creates a fake Kinesis proxy that lists a fixed number of shards per stream.
     *
     * @param map stream name to number of shards
     * @return a proxy that only supports shard listing
     */
    public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String, Integer> map) {
        final KinesisProxyInterface fakeKinesis = new NonReshardedStreamsKinesis(map);
        return fakeKinesis;
    }

    /**
     * Creates a fake Kinesis proxy for a single shard that never sets any records on its results.
     *
     * @param i number of shard iterators the proxy hands out before returning a {@code null}
     *     next iterator
     * @return the record-less single-shard proxy
     */
    public static KinesisProxyInterface emptyShard(int i) {
        final KinesisProxyInterface fakeKinesis = new SingleShardEmittingZeroRecords(i);
        return fakeKinesis;
    }

    /**
     * Creates a fake single-shard proxy that spreads a fixed number of records over a fixed
     * number of {@code getRecords} calls.
     *
     * @param i total number of records
     * @param i2 number of {@code getRecords} calls
     * @param j value reported as millis-behind-latest on every result
     * @return the fixed-number single-shard proxy
     */
    public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(int i, int i2, long j) {
        final KinesisProxyInterface fakeKinesis = new SingleShardEmittingFixNumOfRecordsKinesis(i, i2, j);
        return fakeKinesis;
    }

    /**
     * Creates a fake single-shard proxy like
     * {@link #totalNumOfRecordsAfterNumOfGetRecordsCalls(int, int, long)} that additionally
     * throws an expired-iterator exception once, on a chosen {@code getRecords} call.
     *
     * @param i total number of records
     * @param i2 number of {@code getRecords} calls
     * @param i3 1-based order of the call that expires; must not exceed {@code i2}
     * @param j value reported as millis-behind-latest on every result
     * @return the expiring single-shard proxy
     */
    public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(int i, int i2, int i3, long j) {
        final KinesisProxyInterface fakeKinesis =
                new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(i, i2, i3, j);
        return fakeKinesis;
    }

    /**
     * Creates a fake single-shard proxy whose first batch has a given size and whose later
     * batch sizes are derived from the average size of the generated records.
     *
     * @param i number of records in the first batch
     * @param i2 number of {@code getRecords} calls that return data
     * @param j value reported as millis-behind-latest on every result
     * @return the adaptive single-shard proxy
     */
    public static KinesisProxyInterface initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(int i, int i2, long j) {
        final KinesisProxyInterface fakeKinesis = new SingleShardEmittingAdaptiveNumOfRecordsKinesis(i, i2, j);
        return fakeKinesis;
    }

    /**
     * Creates a fake single-shard proxy whose batches come from
     * {@code TestUtils.createAggregatedRecordBatch}.
     *
     * @param i first argument forwarded to the batch helper (presumably the number of
     *     aggregated records per batch; confirm against {@code TestUtils})
     * @param i2 second argument forwarded to the batch helper (presumably the number of child
     *     records per aggregated record; confirm against {@code TestUtils})
     * @param i3 number of {@code getRecords} calls that return a batch
     * @return the aggregated-records single-shard proxy
     */
    public static KinesisProxyInterface aggregatedRecords(int i, int i2, int i3) {
        final KinesisProxyInterface fakeKinesis = new SingleShardEmittingAggregatedRecordsKinesis(i, i2, i3);
        return fakeKinesis;
    }

    /**
     * Creates a fake Kinesis proxy whose shards are fed from blocking queues.
     *
     * @param map stream name to the queues backing that stream's shards, one queue per shard
     * @return a proxy whose {@code getRecords} blocks on the queue of the requested shard
     */
    public static KinesisProxyInterface blockingQueueGetRecords(Map<String, List<BlockingQueue<String>>> map) {
        final KinesisProxyInterface fakeKinesis = new BlockingQueueKinesis(map);
        return fakeKinesis;
    }
}
