/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

/**
 * Stub of {@link KinesisAsyncClient} that answers {@code subscribeToShard} calls from event
 * sequences registered up front with {@link #stubSubscribeToShard}.
 *
 * <p>Each call to {@link #subscribeToShard} consumes the next publisher stubbed for the requested
 * shard, in registration order. Once the stubs of a shard are exhausted, a publisher that never
 * emits anything is handed out instead. All events are emitted from one scheduler thread, with
 * {@code publisherRateMs} milliseconds between consecutive publish attempts.
 */
class EFOStubbedKinesisAsyncClient
implements KinesisAsyncClient {
    /** Single thread on which every stubbed publisher emits its signals. */
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    /** Delay in milliseconds before each publish attempt. */
    private final int publisherRateMs;
    /** Stubbed publishers per shard id; each deque is polled in registration order. */
    private final Map<String, Deque<StubbedSdkPublisher>> stubbedPublishers = new HashMap<>();
    /** Every request handed to {@link #subscribeToShard}, in arrival order. */
    private final ConcurrentLinkedQueue<SubscribeToShardRequest> subscribeRequestsSeen = new ConcurrentLinkedQueue<>();

    /**
     * @param publisherRateMs delay in milliseconds between two publish attempts of a publisher
     */
    EFOStubbedKinesisAsyncClient(int publisherRateMs) {
        this.publisherRateMs = publisherRateMs;
    }

    /**
     * Registers one more stubbed subscription for {@code shardId}.
     *
     * @param shardId shard the stub is registered for
     * @param events events the stubbed publisher emits, in order, before terminating
     * @return handle that lets the caller make the stubbed publisher terminate with an error
     */
    CanFail stubSubscribeToShard(String shardId, SubscribeToShardEventStream ... events) {
        StubbedSdkPublisher publisher = new StubbedSdkPublisher(events);
        this.stubbedPublishers.computeIfAbsent(shardId, id -> new ArrayDeque<>()).add(publisher);
        return publisher;
    }

    /**
     * Records the request, then hands the next stubbed publisher of the requested shard to
     * {@code resp}.
     *
     * @param req request whose shard id selects the stubbed publishers
     * @param resp handler that receives the publisher through {@code onEventStream}
     * @return the result future of the publisher that was handed out
     * @throws NullPointerException with message {@code "Not stubbed"} if nothing was ever stubbed
     *     for the shard; the request is still recorded in that case
     */
    public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest req, SubscribeToShardResponseHandler resp) {
        this.subscribeRequestsSeen.add(req);
        Deque<StubbedSdkPublisher> publishers = Preconditions.checkNotNull(this.stubbedPublishers.get(req.shardId()), "Not stubbed");
        StubbedSdkPublisher publisher = publishers.poll();
        if (publisher == null) {
            // All stubs of this shard are used up: fall back to a publisher that stays silent.
            publisher = new NoopSdkPublisher();
        }
        resp.onEventStream(publisher);
        return publisher.result;
    }

    /** Shuts down the scheduler that drives the stubbed publishers. */
    public void close() {
        this.scheduler.shutdown();
    }

    /** @return the fixed service name {@code "kinesis"} */
    public String serviceName() {
        return "kinesis";
    }

    /** @return a snapshot copy of all requests seen by {@link #subscribeToShard} so far */
    List<SubscribeToShardRequest> subscribeRequestsSeen() {
        return new ArrayList<SubscribeToShardRequest>(this.subscribeRequestsSeen);
    }

    /**
     * Publisher that emits a fixed array of events on the scheduler thread, then terminates with
     * {@code onError} if an error was set through {@link #failWith}, or with {@code onComplete}
     * otherwise.
     */
    private class StubbedSdkPublisher
    implements SdkPublisher<SubscribeToShardEventStream>,
    CanFail {
        /** Completed when the subscription terminates or is cancelled. */
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final SubscribeToShardEventStream[] events;
        /** Written by the caller of {@link #failWith}, read on the scheduler thread. */
        @Nullable
        volatile Throwable error = null;

        StubbedSdkPublisher(SubscribeToShardEventStream[] events) {
            this.events = events;
        }

        @Override
        public void failWith(Throwable error) {
            this.error = error;
        }

        /**
         * Hands a new subscription to {@code subscriber} and then starts the publish loop.
         *
         * <p>The first publish attempt is scheduled only after {@code onSubscribe} returned, so
         * the subscription is fully constructed and no other signal can precede
         * {@code onSubscribe}.
         */
        public void subscribe(final Subscriber<? super SubscribeToShardEventStream> subscriber) {
            StubbedSubscription subscription = new StubbedSubscription(subscriber);
            subscriber.onSubscribe(subscription);
            subscription.scheduleNextPublish();
        }

        /** Subscription that tracks outstanding demand and drives the periodic publish loop. */
        private class StubbedSubscription
        implements Subscription {
            private final Subscriber<? super SubscribeToShardEventStream> subscriber;
            /** Outstanding demand; saturates at {@link Integer#MAX_VALUE}. */
            private final AtomicInteger requested = new AtomicInteger();
            /** Index of the next event to emit; only touched by {@link #publish()}. */
            private int idx = 0;

            StubbedSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) {
                this.subscriber = subscriber;
            }

            /**
             * Adds {@code n} to the outstanding demand. Non-positive values add no demand
             * (simplification: no error is signalled for them).
             */
            public void request(long n) {
                if (n <= 0L) {
                    return;
                }
                int demand = (int)Math.min(n, (long)Integer.MAX_VALUE);
                this.requested.accumulateAndGet(demand, (current, added) -> (int)Math.min((long)current + (long)added, (long)Integer.MAX_VALUE));
            }

            /** Completes the publisher's result; the publish loop notices this on its next run. */
            public void cancel() {
                StubbedSdkPublisher.this.result.complete(null);
            }

            /** Schedules one run of {@link #publish()} after {@code publisherRateMs} milliseconds. */
            void scheduleNextPublish() {
                EFOStubbedKinesisAsyncClient.this.scheduler.schedule(this::publish, (long)EFOStubbedKinesisAsyncClient.this.publisherRateMs, TimeUnit.MILLISECONDS);
            }

            /**
             * Emits at most one event if there is demand and reschedules itself while events
             * remain and the result is not done; otherwise sends the terminal signal and
             * completes the result accordingly.
             */
            void publish() {
                if (!StubbedSdkPublisher.this.result.isDone() && this.idx < StubbedSdkPublisher.this.events.length) {
                    // Consume one unit of demand, never dropping below zero.
                    if (this.requested.getAndUpdate(i -> Math.max(0, i - 1)) > 0) {
                        this.subscriber.onNext(StubbedSdkPublisher.this.events[this.idx++]);
                    }
                    this.scheduleNextPublish();
                    return;
                }
                // Read the error once so the null check and the signal use the same value.
                Throwable failure = StubbedSdkPublisher.this.error;
                if (failure != null) {
                    this.subscriber.onError(failure);
                    StubbedSdkPublisher.this.result.completeExceptionally(failure);
                } else {
                    this.subscriber.onComplete();
                    StubbedSdkPublisher.this.result.complete(null);
                }
            }
        }
    }

    /**
     * Publisher without events that never signals anything to its subscriber; its result
     * completes only when the subscription is cancelled.
     */
    private class NoopSdkPublisher
    extends StubbedSdkPublisher {
        NoopSdkPublisher() {
            super(new SubscribeToShardEventStream[0]);
        }

        @Override
        public void subscribe(Subscriber<? super SubscribeToShardEventStream> subscriber) {
            subscriber.onSubscribe(new Subscription(){

                public void request(long n) {
                    // Demand is ignored on purpose: there is nothing to emit.
                }

                public void cancel() {
                    NoopSdkPublisher.this.result.complete(null);
                }
            });
        }
    }

    /** Handle returned when stubbing a subscription, used to make it terminate with an error. */
    static interface CanFail {
        /**
         * @param error error the stubbed publisher signals instead of completing normally
         */
        public void failWith(Throwable error);
    }
}

