package org.apache.pekko.stream.connectors.kinesis.impl;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.dispatch.ExecutionContexts$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.connectors.kinesis.KinesisErrors;
import org.apache.pekko.stream.connectors.kinesis.impl.KinesisSourceStage;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import org.apache.pekko.util.FutureConverters$;
import org.apache.pekko.util.FutureConverters$CompletionStageOps$;
import org.apache.pekko.util.ccompat.package$JavaConverters$;
import scala.Function$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/* compiled from: KinesisSourceStage.scala */
/* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/KinesisSourceStage$$anon$1.class */
public final class KinesisSourceStage$$anon$1 extends TimerGraphStageLogic implements StageLogging, OutHandler {
    private String currentShardIterator;
    private final Queue<Record> buffer;
    private GraphStageLogic.StageActor self;
    private final Function1<Try<GetRecordsResponse>, BoxedUnit> handleGetRecords;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private final /* synthetic */ KinesisSourceStage $outer;

    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    public void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.onDownstreamFinish$(this, th);
    }

    public Class<?> logSource() {
        return StageLogging.logSource$(this);
    }

    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public void preStart() {
        this.self = getStageActor(tuple2 -> {
            this.awaitingShardIterator(tuple2);
            return BoxedUnit.UNIT;
        });
        requestShardIterator();
    }

    public void onPull() {
        ActorRef ref = this.self.ref();
        KinesisSourceStage$Pump$ kinesisSourceStage$Pump$ = KinesisSourceStage$Pump$.MODULE$;
        ref.$bang(kinesisSourceStage$Pump$, ref.$bang$default$2(kinesisSourceStage$Pump$));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void awaitingShardIterator(Tuple2<ActorRef, Object> tuple2) {
        if (tuple2 != null) {
            Object _2 = tuple2._2();
            if (_2 instanceof KinesisSourceStage.GetShardIteratorSuccess) {
                this.currentShardIterator = ((KinesisSourceStage.GetShardIteratorSuccess) _2).result().shardIterator();
                this.self.become(tuple22 -> {
                    this.awaitingRecords(tuple22);
                    return BoxedUnit.UNIT;
                });
                requestRecords();
                return;
            }
        }
        if (tuple2 != null) {
            Object _22 = tuple2._2();
            if (_22 instanceof KinesisSourceStage.GetShardIteratorFailure) {
                Throwable ex = ((KinesisSourceStage.GetShardIteratorFailure) _22).ex();
                KinesisErrors.GetShardIteratorError getShardIteratorError = new KinesisErrors.GetShardIteratorError(this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.shardId(), ex);
                StageLogging.log$(this).error(ex, getShardIteratorError.getMessage());
                failStage(getShardIteratorError);
                return;
            }
        }
        if (tuple2 != null) {
            if (KinesisSourceStage$Pump$.MODULE$.equals(tuple2._2())) {
                return;
            }
        }
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        throw new IllegalArgumentException(new StringBuilder(36).append("unexpected message ").append(tuple2._2()).append(" in state `ready`").toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void awaitingRecords(Tuple2<ActorRef, Object> tuple2) {
        if (tuple2 != null) {
            Object _2 = tuple2._2();
            if (_2 instanceof KinesisSourceStage.GetRecordsSuccess) {
                GetRecordsResponse records = ((KinesisSourceStage.GetRecordsSuccess) _2).records();
                Buffer asScala = package$JavaConverters$.MODULE$.ListHasAsScala(records.records()).asScala();
                if (records.nextShardIterator() == null) {
                    StageLogging.log$(this).info("Shard {} returned a null iterator and will now complete.", this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.shardId());
                    completeStage();
                } else {
                    this.currentShardIterator = records.nextShardIterator();
                }
                if (!asScala.nonEmpty()) {
                    scheduleOnce(KinesisSourceStage$GetRecords$.MODULE$, this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.refreshInterval());
                    return;
                }
                asScala.foreach(record -> {
                    return this.buffer.enqueue(record);
                });
                this.self.become(tuple22 -> {
                    this.ready(tuple22);
                    return BoxedUnit.UNIT;
                });
                ActorRef ref = this.self.ref();
                KinesisSourceStage$Pump$ kinesisSourceStage$Pump$ = KinesisSourceStage$Pump$.MODULE$;
                ref.$bang(kinesisSourceStage$Pump$, ref.$bang$default$2(kinesisSourceStage$Pump$));
                return;
            }
        }
        if (tuple2 != null) {
            Object _22 = tuple2._2();
            if (_22 instanceof KinesisSourceStage.GetRecordsFailure) {
                Throwable ex = ((KinesisSourceStage.GetRecordsFailure) _22).ex();
                KinesisErrors.GetRecordsError getRecordsError = new KinesisErrors.GetRecordsError(this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.shardId(), ex);
                StageLogging.log$(this).error(ex, getRecordsError.getMessage());
                failStage(getRecordsError);
                return;
            }
        }
        if (tuple2 != null) {
            if (KinesisSourceStage$Pump$.MODULE$.equals(tuple2._2())) {
                return;
            }
        }
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        throw new IllegalArgumentException(new StringBuilder(36).append("unexpected message ").append(tuple2._2()).append(" in state `ready`").toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ready(Tuple2<ActorRef, Object> tuple2) {
        if (tuple2 != null) {
            if (KinesisSourceStage$Pump$.MODULE$.equals(tuple2._2())) {
                if (isAvailable(this.$outer.m23shape().out())) {
                    push(this.$outer.m23shape().out(), this.buffer.dequeue());
                    ActorRef ref = this.self.ref();
                    KinesisSourceStage$Pump$ kinesisSourceStage$Pump$ = KinesisSourceStage$Pump$.MODULE$;
                    ref.$bang(kinesisSourceStage$Pump$, ref.$bang$default$2(kinesisSourceStage$Pump$));
                }
                if (this.buffer.isEmpty()) {
                    this.self.become(tuple22 -> {
                        this.awaitingRecords(tuple22);
                        return BoxedUnit.UNIT;
                    });
                    requestRecords();
                    return;
                }
                return;
            }
        }
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        throw new IllegalArgumentException(new StringBuilder(36).append("unexpected message ").append(tuple2._2()).append(" in state `ready`").toString());
    }

    public void onTimer(Object obj) {
        if (KinesisSourceStage$GetRecords$.MODULE$.equals(obj)) {
            requestRecords();
        } else {
            StageLogging.log$(this).warning("unexpected timer [{}]", obj);
        }
    }

    private void requestRecords() {
        FutureConverters$CompletionStageOps$ futureConverters$CompletionStageOps$ = FutureConverters$CompletionStageOps$.MODULE$;
        FutureConverters$ futureConverters$ = FutureConverters$.MODULE$;
        scala.jdk.javaapi.FutureConverters$.MODULE$.asScala(((KinesisAsyncClient) this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$amazonKinesisAsync.apply()).getRecords((GetRecordsRequest) GetRecordsRequest.builder().limit(Predef$.MODULE$.int2Integer(this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.limit())).shardIterator(this.currentShardIterator).build())).onComplete(this.handleGetRecords, ExecutionContexts$.MODULE$.parasitic());
    }

    private void requestShardIterator() {
        GetShardIteratorRequest getShardIteratorRequest = (GetShardIteratorRequest) ((SdkBuilder) Function$.MODULE$.chain(new $colon.colon(builder -> {
            return (GetShardIteratorRequest.Builder) this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.startingSequenceNumber().fold(() -> {
                return builder;
            }, str -> {
                return builder.startingSequenceNumber(str);
            });
        }, new $colon.colon(builder2 -> {
            return (GetShardIteratorRequest.Builder) this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.atTimestamp().fold(() -> {
                return builder2;
            }, instant -> {
                return builder2.timestamp(instant);
            });
        }, Nil$.MODULE$))).apply(GetShardIteratorRequest.builder().streamName(this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.streamName()).shardId(this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.shardId()).shardIteratorType(this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$shardSettings.shardIteratorType()))).build();
        Function1 function1 = r4 -> {
            $anonfun$requestShardIterator$7(this, r4);
            return BoxedUnit.UNIT;
        };
        FutureConverters$CompletionStageOps$ futureConverters$CompletionStageOps$ = FutureConverters$CompletionStageOps$.MODULE$;
        FutureConverters$ futureConverters$ = FutureConverters$.MODULE$;
        scala.jdk.javaapi.FutureConverters$.MODULE$.asScala(((KinesisAsyncClient) this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$amazonKinesisAsync.apply()).getShardIterator(getShardIteratorRequest)).onComplete(function1, ExecutionContexts$.MODULE$.parasitic());
    }

    public static final /* synthetic */ void $anonfun$handleGetRecords$1(KinesisSourceStage$$anon$1 kinesisSourceStage$$anon$1, Try r5) {
        if (r5 instanceof Failure) {
            Throwable exception = ((Failure) r5).exception();
            ActorRef ref = kinesisSourceStage$$anon$1.self.ref();
            KinesisSourceStage.GetRecordsFailure getRecordsFailure = new KinesisSourceStage.GetRecordsFailure(exception);
            ref.$bang(getRecordsFailure, ref.$bang$default$2(getRecordsFailure));
            return;
        }
        if (!(r5 instanceof Success)) {
            throw new MatchError(r5);
        }
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse) ((Success) r5).value();
        ActorRef ref2 = kinesisSourceStage$$anon$1.self.ref();
        KinesisSourceStage.GetRecordsSuccess getRecordsSuccess = new KinesisSourceStage.GetRecordsSuccess(getRecordsResponse);
        ref2.$bang(getRecordsSuccess, ref2.$bang$default$2(getRecordsSuccess));
    }

    public static final /* synthetic */ void $anonfun$requestShardIterator$7(KinesisSourceStage$$anon$1 kinesisSourceStage$$anon$1, Try r5) {
        if (r5 instanceof Success) {
            GetShardIteratorResponse getShardIteratorResponse = (GetShardIteratorResponse) ((Success) r5).value();
            ActorRef ref = kinesisSourceStage$$anon$1.self.ref();
            KinesisSourceStage.GetShardIteratorSuccess getShardIteratorSuccess = new KinesisSourceStage.GetShardIteratorSuccess(getShardIteratorResponse);
            ref.$bang(getShardIteratorSuccess, ref.$bang$default$2(getShardIteratorSuccess));
            return;
        }
        if (!(r5 instanceof Failure)) {
            throw new MatchError(r5);
        }
        Throwable exception = ((Failure) r5).exception();
        ActorRef ref2 = kinesisSourceStage$$anon$1.self.ref();
        KinesisSourceStage.GetShardIteratorFailure getShardIteratorFailure = new KinesisSourceStage.GetShardIteratorFailure(exception);
        ref2.$bang(getShardIteratorFailure, ref2.$bang$default$2(getShardIteratorFailure));
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KinesisSourceStage$$anon$1(KinesisSourceStage kinesisSourceStage) {
        super(kinesisSourceStage.m23shape());
        if (kinesisSourceStage == null) {
            throw null;
        }
        this.$outer = kinesisSourceStage;
        StageLogging.$init$(this);
        OutHandler.$init$(this);
        setHandler(kinesisSourceStage.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSourceStage$$out(), this);
        this.buffer = Queue$.MODULE$.empty();
        this.handleGetRecords = r4 -> {
            $anonfun$handleGetRecords$1(this, r4);
            return BoxedUnit.UNIT;
        };
    }
}
