package akka.stream.alpakka.kinesis.impl;

import akka.actor.ActorRef;
import akka.actor.ScalaActorRef;
import akka.actor.package$;
import akka.event.LoggingAdapter;
import akka.stream.alpakka.kinesis.KinesisErrors;
import akka.stream.alpakka.kinesis.impl.KinesisSourceStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import akka.stream.stage.TimerGraphStageLogic;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.Record;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Date;
import scala.Function$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Symbol;
import scala.Symbol$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Seq$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.SymbolLiteral;

/* compiled from: KinesisSourceStage.scala */
/* loaded from: input_file:akka/stream/alpakka/kinesis/impl/KinesisSourceStage$$anon$1.class */
public final class KinesisSourceStage$$anon$1 extends TimerGraphStageLogic implements StageLogging {
    private String currentShardIterator;
    private final Queue<Record> buffer;
    public GraphStageLogic.StageActor akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self;
    private final AsyncHandler<GetRecordsRequest, GetRecordsResult> handleGetRecords;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    private final /* synthetic */ KinesisSourceStage $outer;

    /**
     * Returns the class used as the source of log statements for this stage logic.
     *
     * <p>Forwards to the {@code StageLogging} trait's static default implementation.
     *
     * @return the log source class as computed by {@code StageLogging.logSource$}
     */
    @Override
    public Class<?> logSource() {
        final Class<?> source = StageLogging.logSource$(this);
        return source;
    }

    /**
     * Returns the logging adapter of this stage logic.
     *
     * <p>Forwards to the {@code StageLogging} trait's static default implementation.
     *
     * @return the adapter produced by {@code StageLogging.log$}
     */
    @Override
    public LoggingAdapter log() {
        final LoggingAdapter adapter = StageLogging.log$(this);
        return adapter;
    }

    /**
     * Getter for the backing field of the {@code StageLogging} trait's private {@code _log} variable.
     *
     * @return the currently stored logging adapter (whatever was last written by the matching setter)
     */
    @Override
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return akka$stream$stage$StageLogging$$_log;
    }

    /**
     * Setter for the backing field of the {@code StageLogging} trait's private {@code _log} variable.
     *
     * @param loggingAdapter the adapter to store
     */
    @Override
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /**
     * Start-up hook of the stage logic.
     *
     * <p>Creates the stage actor with {@code awaitingShardIterator} as its initial behaviour, stores it in
     * the {@code self} field (the asynchronous AWS handlers send their results to it), and then requests
     * the initial shard iterator.
     */
    @Override
    public void preStart() {
        // The stage actor must be stored before the request is issued: the request's
        // completion handler replies through this field.
        final GraphStageLogic.StageActor stageActor = getStageActor(envelope -> {
            awaitingShardIterator(envelope);
            return BoxedUnit.UNIT;
        });
        this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self = stageActor;
        requestShardIterator();
    }

    /**
     * Stage-actor behaviour that is active while the initial shard iterator request is outstanding.
     *
     * <ul>
     *   <li>{@code GetShardIteratorSuccess}: stores the returned iterator, switches the stage actor to
     *       {@code awaitingRecords} and issues the first records request.</li>
     *   <li>{@code GetShardIteratorFailure}: logs the error and fails the stage with a
     *       {@code KinesisErrors.GetShardIteratorError}.</li>
     *   <li>{@code Pump}: ignored, there is nothing to push yet.</li>
     * </ul>
     *
     * @param tuple2 the (sender, message) pair delivered to the stage actor; only the message is inspected
     * @throws MatchError if {@code tuple2} is {@code null}
     * @throws IllegalArgumentException if the message is none of the above
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void awaitingShardIterator(Tuple2<ActorRef, Object> tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Object message = tuple2._2();

        if (message instanceof KinesisSourceStage.GetShardIteratorSuccess) {
            this.currentShardIterator = ((KinesisSourceStage.GetShardIteratorSuccess) message).result().getShardIterator();
            this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.become(next -> {
                awaitingRecords(next);
                return BoxedUnit.UNIT;
            });
            requestRecords();
            return;
        }

        if (message instanceof KinesisSourceStage.GetShardIteratorFailure) {
            Exception ex = ((KinesisSourceStage.GetShardIteratorFailure) message).ex();
            log().error(ex, "Failed to get a shard iterator for shard {}", this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.shardId());
            failStage(new KinesisErrors.GetShardIteratorError(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.shardId(), ex));
            return;
        }

        if (KinesisSourceStage$Pump$.MODULE$.equals(message)) {
            return;
        }

        // The message names this state; it previously claimed `ready`, which made
        // failures raised here point at the wrong behaviour.
        throw new IllegalArgumentException("unexpected message " + message + " in state `awaitingShardIterator`");
    }

    /**
     * Stage-actor behaviour that is active while a {@code GetRecords} request is outstanding
     * (or while the refresh timer for the next request is pending).
     *
     * <ul>
     *   <li>{@code GetRecordsSuccess}: handled by {@code onRecordsFetched}.</li>
     *   <li>{@code GetRecordsFailure}: logs the error and fails the stage with a
     *       {@code KinesisErrors.GetRecordsError}.</li>
     *   <li>{@code Pump}: ignored, there is nothing buffered to push.</li>
     * </ul>
     *
     * @param tuple2 the (sender, message) pair delivered to the stage actor; only the message is inspected
     * @throws MatchError if {@code tuple2} is {@code null}
     * @throws IllegalArgumentException if the message is none of the above
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void awaitingRecords(Tuple2<ActorRef, Object> tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Object message = tuple2._2();

        if (message instanceof KinesisSourceStage.GetRecordsSuccess) {
            onRecordsFetched(((KinesisSourceStage.GetRecordsSuccess) message).records());
            return;
        }

        if (message instanceof KinesisSourceStage.GetRecordsFailure) {
            Exception ex = ((KinesisSourceStage.GetRecordsFailure) message).ex();
            log().error(ex, "Failed to fetch records from Kinesis for shard {}", this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.shardId());
            failStage(new KinesisErrors.GetRecordsError(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.shardId(), ex));
            return;
        }

        if (KinesisSourceStage$Pump$.MODULE$.equals(message)) {
            return;
        }

        // The message names this state; it previously claimed `ready`.
        throw new IllegalArgumentException("unexpected message " + message + " in state `awaitingRecords`");
    }

    /**
     * Processes a successful {@code GetRecords} response.
     *
     * <p>A {@code null} next iterator ends the stream. Any records carried by that last response are
     * emitted downstream before the stage completes; previously the stage was completed first, so those
     * records were enqueued but could never be pushed, and a timer or {@code Pump} was still issued on the
     * completed stage. (NOTE(review): presumably a null iterator means the shard was closed - confirm
     * against the Kinesis GetRecords documentation.)
     *
     * <p>Otherwise the next iterator is remembered and either the fetched records are buffered and the
     * stage actor switches to {@code ready}, or - when the response was empty - the next request is
     * scheduled after the configured refresh interval.
     *
     * @param result the response passed along with {@code GetRecordsSuccess}
     */
    private void onRecordsFetched(GetRecordsResult result) {
        java.util.List<Record> fetched = result.getRecords();
        String nextIterator = result.getNextShardIterator();

        if (nextIterator == null) {
            log().info("Shard {} returned a null iterator and will now complete.", this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.shardId());
            if (fetched.isEmpty()) {
                completeStage();
            } else {
                // emitMultiple respects downstream demand and runs the completion only
                // after the last element has been pushed.
                emitMultiple(this.$outer.m12shape().out(), fetched.iterator(), () -> completeStage());
            }
            return;
        }

        this.currentShardIterator = nextIterator;

        if (fetched.isEmpty()) {
            // Nothing new in the shard: poll again after the refresh interval (see onTimer).
            scheduleOnce((Symbol) Symbol$.MODULE$.apply("GET_RECORDS"), this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.refreshInterval());
            return;
        }

        for (Record record : fetched) {
            this.buffer.enqueue(record);
        }
        this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.become(next -> {
            ready(next);
            return BoxedUnit.UNIT;
        });
        // Kick the `ready` behaviour so that already-signalled demand is served immediately.
        ScalaActorRef selfRef = package$.MODULE$.actorRef2Scala(this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.ref());
        KinesisSourceStage$Pump$ pump = KinesisSourceStage$Pump$.MODULE$;
        selfRef.$bang(pump, selfRef.$bang$default$2(pump));
    }

    /**
     * Stage-actor behaviour that is active while fetched records are waiting in the buffer.
     *
     * <p>On {@code Pump}: if the outlet is available, one buffered record is pushed and another
     * {@code Pump} is sent to the stage actor. If the buffer is empty afterwards, the stage actor
     * switches back to {@code awaitingRecords} and the next batch is requested.
     *
     * @param tuple2 the (sender, message) pair delivered to the stage actor; only the message is inspected
     * @throws MatchError if {@code tuple2} is {@code null}
     * @throws IllegalArgumentException if the message is anything other than {@code Pump}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void ready(Tuple2<ActorRef, Object> tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Object message = tuple2._2();
        if (!KinesisSourceStage$Pump$.MODULE$.equals(message)) {
            throw new IllegalArgumentException("unexpected message " + message + " in state `ready`");
        }

        if (isAvailable(this.$outer.m12shape().out())) {
            push(this.$outer.m12shape().out(), this.buffer.dequeue());
            ScalaActorRef selfRef = package$.MODULE$.actorRef2Scala(this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.ref());
            KinesisSourceStage$Pump$ pump = KinesisSourceStage$Pump$.MODULE$;
            selfRef.$bang(pump, selfRef.$bang$default$2(pump));
        }

        if (!this.buffer.isEmpty()) {
            // Still records left: stay in `ready` and wait for the next Pump.
            return;
        }

        this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.become(next -> {
            awaitingRecords(next);
            return BoxedUnit.UNIT;
        });
        requestRecords();
    }

    /**
     * Timer callback: the {@code GET_RECORDS} symbol triggers a new records request.
     *
     * @param obj the key of the timer that fired
     * @throws MatchError if {@code obj} is not the symbol named {@code GET_RECORDS}
     */
    @Override
    public void onTimer(Object obj) {
        if (!(obj instanceof Symbol)) {
            throw new MatchError(obj);
        }
        Option<?> symbolName = Symbol$.MODULE$.unapply((Symbol) obj);
        boolean isGetRecords = symbolName.isDefined() && "GET_RECORDS".equals(symbolName.get());
        if (!isGetRecords) {
            throw new MatchError(obj);
        }
        requestRecords();
    }

    /**
     * Issues an asynchronous {@code GetRecords} call for the current shard iterator, limited to the
     * configured number of records. The outcome is delivered through {@code handleGetRecords}.
     */
    private void requestRecords() {
        AmazonKinesisAsync client = (AmazonKinesisAsync) this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$amazonKinesisAsync.apply();
        Integer limit = Predef$.MODULE$.int2Integer(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.limit());
        GetRecordsRequest request = new GetRecordsRequest()
                .withLimit(limit)
                .withShardIterator(this.currentShardIterator);
        client.getRecordsAsync(request, this.handleGetRecords);
    }

    /**
     * Issues the asynchronous {@code GetShardIterator} call for the configured stream and shard.
     *
     * <p>The request always carries stream name, shard id and iterator type; the starting sequence
     * number and the timestamp are applied, in that order, only when the corresponding setting is
     * defined. The outcome is not handled here: it is forwarded to the stage actor as a
     * {@code GetShardIteratorSuccess} or {@code GetShardIteratorFailure} message.
     */
    private void requestShardIterator() {
        AmazonKinesisAsync client = (AmazonKinesisAsync) this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$amazonKinesisAsync.apply();

        final GetShardIteratorRequest base = new GetShardIteratorRequest()
                .withStreamName(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.streamName())
                .withShardId(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.shardId())
                .withShardIteratorType(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.shardIteratorType());

        // Optional refinement 1: starting sequence number.
        final GetShardIteratorRequest withSequenceNumber = (GetShardIteratorRequest) this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.startingSequenceNumber().fold(() -> {
            return base;
        }, sequenceNumber -> {
            return base.withStartingSequenceNumber(sequenceNumber);
        });

        // Optional refinement 2: timestamp, converted from Instant to the Date the SDK expects.
        final GetShardIteratorRequest request = (GetShardIteratorRequest) this.$outer.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$shardSettings.atTimestamp().fold(() -> {
            return withSequenceNumber;
        }, instant -> {
            return withSequenceNumber.withTimestamp(Date.from(instant));
        });

        client.getShardIteratorAsync(request, new AsyncHandler<GetShardIteratorRequest, GetShardIteratorResult>() {
            @Override
            public void onSuccess(GetShardIteratorRequest originalRequest, GetShardIteratorResult getShardIteratorResult) {
                deliver(new KinesisSourceStage.GetShardIteratorSuccess(getShardIteratorResult));
            }

            @Override
            public void onError(Exception exc) {
                deliver(new KinesisSourceStage.GetShardIteratorFailure(exc));
            }

            /** Sends {@code message} to the enclosing stage logic's stage actor with the default sender. */
            private void deliver(Object message) {
                ScalaActorRef selfRef = package$.MODULE$.actorRef2Scala(KinesisSourceStage$$anon$1.this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.ref());
                selfRef.$bang(message, selfRef.$bang$default$2(message));
            }
        });
    }

    /**
     * Creates the stage logic for the given source stage.
     *
     * <p>Initialises the {@code StageLogging} trait, creates the empty record buffer, installs the
     * outlet handler (every pull is turned into a {@code Pump} message for the stage actor) and creates
     * the reusable {@code GetRecords} completion handler, which forwards results to the stage actor as
     * {@code GetRecordsSuccess} / {@code GetRecordsFailure} messages.
     *
     * @param kinesisSourceStage the enclosing stage; supplies the shape, shard settings and client factory
     * @throws NullPointerException if {@code kinesisSourceStage} is {@code null}
     */
    public KinesisSourceStage$$anon$1(KinesisSourceStage kinesisSourceStage) {
        // The null check has to live inside the super(...) argument: that is the first
        // place the stage is dereferenced.
        super(java.util.Objects.requireNonNull(kinesisSourceStage, "kinesisSourceStage").m12shape());
        this.$outer = kinesisSourceStage;
        StageLogging.$init$(this);
        this.buffer = Queue$.MODULE$.empty();

        setHandler(kinesisSourceStage.m12shape().out(), new OutHandler() {
            @Override
            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            @Override
            public void onPull() {
                ScalaActorRef selfRef = package$.MODULE$.actorRef2Scala(KinesisSourceStage$$anon$1.this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.ref());
                KinesisSourceStage$Pump$ pump = KinesisSourceStage$Pump$.MODULE$;
                selfRef.$bang(pump, selfRef.$bang$default$2(pump));
            }

            {
                OutHandler.$init$(this);
            }
        });

        this.handleGetRecords = new AsyncHandler<GetRecordsRequest, GetRecordsResult>() {
            @Override
            public void onSuccess(GetRecordsRequest getRecordsRequest, GetRecordsResult getRecordsResult) {
                deliver(new KinesisSourceStage.GetRecordsSuccess(getRecordsResult));
            }

            @Override
            public void onError(Exception exc) {
                deliver(new KinesisSourceStage.GetRecordsFailure(exc));
            }

            /** Sends {@code message} to the enclosing stage logic's stage actor with the default sender. */
            private void deliver(Object message) {
                ScalaActorRef selfRef = package$.MODULE$.actorRef2Scala(KinesisSourceStage$$anon$1.this.akka$stream$alpakka$kinesis$impl$KinesisSourceStage$$anon$$self.ref());
                selfRef.$bang(message, selfRef.$bang$default$2(message));
            }
        };
    }
}
