package akka.stream.alpakka.kinesis;

import akka.event.LoggingAdapter;
import akka.stream.alpakka.kinesis.KinesisErrors;
import akka.stream.alpakka.kinesis.KinesisFlowSettings;
import akka.stream.alpakka.kinesis.KinesisFlowStage;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import akka.stream.stage.TimerGraphStageLogic;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashMap$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: KinesisFlowStage.scala */
/* loaded from: input_file:akka/stream/alpakka/kinesis/KinesisFlowStage$$anon$1.class */
public final class KinesisFlowStage$$anon$1 extends TimerGraphStageLogic implements StageLogging, InHandler, OutHandler {
    private final long retryBaseInMillis;
    private Option<Try<BoxedUnit>> completionState;
    private final Queue<KinesisFlowStage.Job> pendingRequests;
    private AsyncCallback<KinesisFlowStage.Result> resultCallback;
    private int inFlight;
    private final HashMap<Object, KinesisFlowStage.Job> waitingRetries;
    private int retryToken;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    private final /* synthetic */ KinesisFlowStage $outer;

    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    public Class<?> logSource() {
        return StageLogging.logSource$(this);
    }

    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    private long retryBaseInMillis() {
        return this.retryBaseInMillis;
    }

    private Option<Try<BoxedUnit>> completionState() {
        return this.completionState;
    }

    private void completionState_$eq(Option<Try<BoxedUnit>> option) {
        this.completionState = option;
    }

    private Queue<KinesisFlowStage.Job> pendingRequests() {
        return this.pendingRequests;
    }

    private AsyncCallback<KinesisFlowStage.Result> resultCallback() {
        return this.resultCallback;
    }

    private void resultCallback_$eq(AsyncCallback<KinesisFlowStage.Result> asyncCallback) {
        this.resultCallback = asyncCallback;
    }

    private int inFlight() {
        return this.inFlight;
    }

    private void inFlight_$eq(int i) {
        this.inFlight = i;
    }

    private HashMap<Object, KinesisFlowStage.Job> waitingRetries() {
        return this.waitingRetries;
    }

    private int retryToken() {
        return this.retryToken;
    }

    private void retryToken_$eq(int i) {
        this.retryToken = i;
    }

    private void tryToExecute() {
        if (pendingRequests().nonEmpty() && isAvailable(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$out())) {
            log().debug("Executing PutRecords call");
            inFlight_$eq(inFlight() + 1);
            KinesisFlowStage.Job job = (KinesisFlowStage.Job) pendingRequests().dequeue();
            push(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$out(), KinesisFlowStage$.MODULE$.akka$stream$alpakka$kinesis$KinesisFlowStage$$putRecords(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$streamName, job.records(), seq -> {
                $anonfun$tryToExecute$1(this, job, seq);
                return BoxedUnit.UNIT;
            }, this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$kinesisClient));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleResult(KinesisFlowStage.Result result) {
        FiniteDuration $times;
        if (result != null) {
            if (Nil$.MODULE$.equals(result.recordsToRetry())) {
                log().debug("PutRecords call finished successfully");
                inFlight_$eq(inFlight() - 1);
                tryToExecute();
                if (!hasBeenPulled(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$in())) {
                    tryPull(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$in());
                }
                checkForCompletion();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (result != null) {
            int attempt = result.attempt();
            Seq<Tuple2<PutRecordsResultEntry, PutRecordsRequestEntry>> recordsToRetry = result.recordsToRetry();
            if (attempt > this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$maxRetries) {
                log().debug("PutRecords call finished with partial errors after {} attempts", BoxesRunTime.boxToInteger(attempt));
                failStage(new KinesisErrors.ErrorPublishingRecords(attempt, (Seq) recordsToRetry.map(tuple2 -> {
                    return (PutRecordsResultEntry) tuple2._1();
                }, Seq$.MODULE$.canBuildFrom())));
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        if (result == null) {
            throw new MatchError(result);
        }
        int attempt2 = result.attempt();
        Seq<Tuple2<PutRecordsResultEntry, PutRecordsRequestEntry>> recordsToRetry2 = result.recordsToRetry();
        log().debug("PutRecords call finished with partial errors; scheduling retry");
        inFlight_$eq(inFlight() - 1);
        waitingRetries().put(BoxesRunTime.boxToInteger(retryToken()), new KinesisFlowStage.Job(attempt2 + 1, (Seq) recordsToRetry2.map(tuple22 -> {
            return (PutRecordsRequestEntry) tuple22._2();
        }, Seq$.MODULE$.canBuildFrom())));
        Integer boxToInteger = BoxesRunTime.boxToInteger(retryToken());
        KinesisFlowSettings.RetryBackoffStrategy retryBackoffStrategy = this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$backoffStrategy;
        if (KinesisFlowSettings$Exponential$.MODULE$.equals(retryBackoffStrategy)) {
            $times = new package.DurationInt(package$.MODULE$.DurationInt((int) scala.math.package$.MODULE$.pow(retryBaseInMillis(), attempt2))).millis();
        } else {
            if (!KinesisFlowSettings$Lineal$.MODULE$.equals(retryBackoffStrategy)) {
                throw new MatchError(retryBackoffStrategy);
            }
            $times = this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$retryInitialTimeout.$times(attempt2);
        }
        scheduleOnce(boxToInteger, $times);
        retryToken_$eq(retryToken() + 1);
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    private void checkForCompletion() {
        if (inFlight() == 0 && pendingRequests().isEmpty() && waitingRetries().isEmpty() && isClosed(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$in())) {
            boolean z = false;
            Some some = null;
            Option<Try<BoxedUnit>> completionState = completionState();
            if (completionState instanceof Some) {
                z = true;
                some = (Some) completionState;
                if (((Try) some.value()) instanceof Success) {
                    completeStage();
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
            }
            if (z) {
                Failure failure = (Try) some.value();
                if (failure instanceof Failure) {
                    failStage(failure.exception());
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    return;
                }
            }
            if (!None$.MODULE$.equals(completionState)) {
                throw new MatchError(completionState);
            }
            failStage(new IllegalStateException("Stage completed, but there is no info about status"));
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public void onTimer(Object obj) {
        waitingRetries().remove(BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(obj))).foreach(job -> {
            $anonfun$onTimer$1(this, job);
            return BoxedUnit.UNIT;
        });
    }

    public void preStart() {
        completionState_$eq(None$.MODULE$);
        inFlight_$eq(0);
        retryToken_$eq(0);
        resultCallback_$eq(getAsyncCallback(result -> {
            this.handleResult(result);
            return BoxedUnit.UNIT;
        }));
        pull(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$in());
    }

    public void postStop() {
        pendingRequests().clear();
        waitingRetries().clear();
    }

    public void onUpstreamFinish() {
        completionState_$eq(new Some(new Success(BoxedUnit.UNIT)));
        checkForCompletion();
    }

    public void onUpstreamFailure(Throwable th) {
        completionState_$eq(new Some(new Failure(th)));
        checkForCompletion();
    }

    public void onPull() {
        tryToExecute();
        if (!waitingRetries().isEmpty() || hasBeenPulled(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$in())) {
            return;
        }
        tryPull(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$in());
    }

    public void onPush() {
        log().debug("New PutRecords request available");
        pendingRequests().enqueue(Predef$.MODULE$.wrapRefArray(new KinesisFlowStage.Job[]{new KinesisFlowStage.Job(1, (Seq) grab(this.$outer.akka$stream$alpakka$kinesis$KinesisFlowStage$$in()))}));
        tryToExecute();
    }

    public static final /* synthetic */ void $anonfun$tryToExecute$1(KinesisFlowStage$$anon$1 kinesisFlowStage$$anon$1, KinesisFlowStage.Job job, Seq seq) {
        kinesisFlowStage$$anon$1.resultCallback().invoke(new KinesisFlowStage.Result(job.attempt(), seq));
    }

    public static final /* synthetic */ void $anonfun$onTimer$1(KinesisFlowStage$$anon$1 kinesisFlowStage$$anon$1, KinesisFlowStage.Job job) {
        kinesisFlowStage$$anon$1.log().debug("New PutRecords retry attempt available");
        kinesisFlowStage$$anon$1.pendingRequests().enqueue(Predef$.MODULE$.wrapRefArray(new KinesisFlowStage.Job[]{job}));
        kinesisFlowStage$$anon$1.tryToExecute();
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KinesisFlowStage$$anon$1(KinesisFlowStage kinesisFlowStage) {
        super(kinesisFlowStage.m9shape());
        if (kinesisFlowStage == null) {
            throw null;
        }
        this.$outer = kinesisFlowStage;
        StageLogging.$init$(this);
        InHandler.$init$(this);
        OutHandler.$init$(this);
        this.retryBaseInMillis = kinesisFlowStage.akka$stream$alpakka$kinesis$KinesisFlowStage$$retryInitialTimeout.toMillis();
        this.pendingRequests = Queue$.MODULE$.empty();
        this.waitingRetries = HashMap$.MODULE$.empty();
        setHandlers(kinesisFlowStage.akka$stream$alpakka$kinesis$KinesisFlowStage$$in(), kinesisFlowStage.akka$stream$alpakka$kinesis$KinesisFlowStage$$out(), this);
    }
}
