package akka.stream.alpakka.kinesis.impl;

import akka.NotUsed;
import akka.NotUsed$;
import akka.event.LoggingAdapter;
import akka.stream.alpakka.kinesis.KinesisErrors;
import akka.stream.alpakka.kinesis.KinesisFlowSettings;
import akka.stream.alpakka.kinesis.KinesisFlowSettings$Exponential$;
import akka.stream.alpakka.kinesis.KinesisFlowSettings$Linear$;
import akka.stream.alpakka.kinesis.impl.KinesisFlowStage;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import akka.stream.stage.TimerGraphStageLogic;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Seq$;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashMap$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.math.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: KinesisFlowStage.scala */
/* loaded from: input_file:akka/stream/alpakka/kinesis/impl/KinesisFlowStage$$anon$1.class */
public final class KinesisFlowStage$$anon$1 extends TimerGraphStageLogic implements StageLogging, InHandler, OutHandler {
    // Outcome reported by upstream: None until onUpstreamFinish/onUpstreamFailure stores a Success/Failure.
    private Option<Try<BoxedUnit>> completionState;
    // Jobs waiting to be pushed downstream; filled by onPush and by fired retry timers, drained by tryToExecute.
    private final Queue<KinesisFlowStage.Job<T>> pendingRequests;
    // Incremented when a job is pushed (tryToExecute); decremented in putRecordsSuccessful and resend.
    private int inFlight;
    // Async callback that runs putRecordsSuccessful(); invoked by the PutRecords handler when no record failed.
    private final AsyncCallback<NotUsed> akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$putRecordsSuccessfulCallback;
    // Async callback that runs resend(result); invoked by the PutRecords handler on partial failure within the retry budget.
    private final AsyncCallback<KinesisFlowStage.Result<T>> akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$resendCallback;
    // Async callback that runs failAfterResends(result); invoked once the attempt number exceeds maxRetries.
    private final AsyncCallback<KinesisFlowStage.Result<T>> akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$failAfterResendsCallback;
    // Retry jobs parked until their timer fires, keyed by the boxed retry token that is also the timer key.
    private final HashMap<Object, KinesisFlowStage.Job<T>> waitingRetries;
    // Next retry token; incremented after every scheduled retry in resend.
    private int retryToken;
    // Backing field for the StageLogging accessor/mutator pair below.
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    // Enclosing stage; source of the inlet/outlet, Kinesis client, stream name and retry settings.
    private final /* synthetic */ KinesisFlowStage $outer;

    /**
     * Forwards the downstream-finish signal to the default implementation
     * supplied by {@link OutHandler}.
     */
    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    /**
     * Returns the log source class as computed by the default
     * {@link StageLogging} implementation.
     */
    public Class<?> logSource() {
        return StageLogging.logSource$(this);
    }

    /**
     * Returns the logging adapter for this stage, obtained through the
     * default {@link StageLogging} implementation.
     */
    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /** Trait-generated accessor for the logging adapter backing field. */
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    /** Trait-generated mutator that stores the logging adapter in its backing field. */
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Returns the recorded upstream outcome; {@code None} while upstream has neither finished nor failed. */
    private Option<Try<BoxedUnit>> completionState() {
        return this.completionState;
    }

    /** Replaces the recorded upstream outcome. */
    private void completionState_$eq(Option<Try<BoxedUnit>> option) {
        this.completionState = option;
    }

    /** Returns the queue of jobs waiting to be pushed downstream. */
    private Queue<KinesisFlowStage.Job<T>> pendingRequests() {
        return this.pendingRequests;
    }

    /** Returns the current value of the in-flight job counter. */
    private int inFlight() {
        return this.inFlight;
    }

    /** Overwrites the in-flight job counter. */
    private void inFlight_$eq(int i) {
        this.inFlight = i;
    }

    /** Returns the async callback that runs {@code putRecordsSuccessful()}; used by the PutRecords handler. */
    public AsyncCallback<NotUsed> akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$putRecordsSuccessfulCallback() {
        return this.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$putRecordsSuccessfulCallback;
    }

    /** Returns the async callback that runs {@code resend(result)}; used by the PutRecords handler. */
    public AsyncCallback<KinesisFlowStage.Result<T>> akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$resendCallback() {
        return this.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$resendCallback;
    }

    /** Returns the async callback that runs {@code failAfterResends(result)}; used by the PutRecords handler. */
    public AsyncCallback<KinesisFlowStage.Result<T>> akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$failAfterResendsCallback() {
        return this.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$failAfterResendsCallback;
    }

    /** Returns the map of retry jobs parked until their timer fires, keyed by retry token. */
    private HashMap<Object, KinesisFlowStage.Job<T>> waitingRetries() {
        return this.waitingRetries;
    }

    /** Returns the token that the next scheduled retry will use. */
    private int retryToken() {
        return this.retryToken;
    }

    /** Overwrites the next retry token. */
    private void retryToken_$eq(int i) {
        this.retryToken = i;
    }

    /**
     * Starts the next queued job if there is one and the outlet can accept an element.
     * The in-flight counter is bumped, the job is dequeued, and the future returned by
     * {@code putRecords} is pushed downstream.
     */
    private void tryToExecute() {
        if (pendingRequests().isEmpty()) {
            return;
        }
        if (!isAvailable(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$out())) {
            return;
        }
        final int startedJobs = inFlight() + 1;
        inFlight_$eq(startedJobs);
        final KinesisFlowStage.Job nextJob = (KinesisFlowStage.Job) pendingRequests().dequeue();
        push(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$out(), putRecords(nextJob));
    }

    /**
     * Issues one asynchronous PutRecords call for the records of {@code job}.
     *
     * <p>The request carries the first element of every record tuple. When the call
     * returns, each result entry is paired positionally with the record it was sent for.
     * If any record failed, the entries with an error code are handed to the resend
     * callback, or to the fail-after-resends callback once {@code job.attempt()} exceeds
     * {@code maxRetries}. Otherwise the success callback is invoked. The stage callback is
     * always invoked before the promise is completed.
     *
     * @param job the batch to send, together with its attempt number
     * @return a future completed with the (result entry, second element of the request
     *     tuple) pairs whose result entry has no error code, or failed with
     *     {@code KinesisErrors.FailurePublishingRecords} when the call itself errors
     */
    private Future<Seq<Tuple2<PutRecordsResultEntry, T>>> putRecords(final KinesisFlowStage.Job<T> job) {
        final Promise promise = Promise$.MODULE$.apply();
        // The handler runs outside the stage; it reaches the stage only through this reference.
        final KinesisFlowStage$$anon$1 logic = this;
        this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$kinesisClient.putRecordsAsync(new PutRecordsRequest().withStreamName(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$streamName).withRecords(JavaConverters$.MODULE$.asJavaCollectionConverter((Iterable) job.records().map(tuple2 -> {
            return (PutRecordsRequestEntry) tuple2._1();
        }, Seq$.MODULE$.canBuildFrom())).asJavaCollection()), new AsyncHandler<PutRecordsRequest, PutRecordsResult>() {

            public void onError(Exception exc) {
                promise.failure(new KinesisErrors.FailurePublishingRecords(exc));
            }

            public void onSuccess(PutRecordsRequest putRecordsRequest, PutRecordsResult putRecordsResult) {
                // Pair each result entry with the record at the same position in the job.
                List list = ((TraversableOnce) ((IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(putRecordsResult.getRecords()).asScala()).zip(job.records(), Buffer$.MODULE$.canBuildFrom())).toList();
                if (Predef$.MODULE$.Integer2int(putRecordsResult.getFailedRecordCount()) > 0) {
                    KinesisFlowStage.Result result = new KinesisFlowStage.Result(job.attempt(), (Seq) list.filter(pair -> {
                        return BoxesRunTime.boxToBoolean(hasErrorCode((Tuple2) pair));
                    }));
                    if (job.attempt() > logic.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$$outer().akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$maxRetries) {
                        logic.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$failAfterResendsCallback().invoke(result);
                    } else {
                        logic.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$resendCallback().invoke(result);
                    }
                } else {
                    logic.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$putRecordsSuccessfulCallback().invoke(NotUsed$.MODULE$);
                }
                // Only entries without an error code are emitted to downstream.
                promise.success(((List) list.filter(pair -> {
                    return BoxesRunTime.boxToBoolean(hasNoErrorCode((Tuple2) pair));
                })).map(pair -> {
                    if (pair != null) {
                        PutRecordsResultEntry putRecordsResultEntry = (PutRecordsResultEntry) pair._1();
                        Tuple2 requestPair = (Tuple2) pair._2();
                        if (requestPair != null) {
                            return new Tuple2(putRecordsResultEntry, requestPair._2());
                        }
                    }
                    throw new MatchError(pair);
                }, List$.MODULE$.canBuildFrom()));
            }

            /** True when the result entry of the pair carries an error code. */
            private boolean hasErrorCode(Tuple2 pair) {
                if (pair != null) {
                    return ((PutRecordsResultEntry) pair._1()).getErrorCode() != null;
                }
                throw new MatchError(pair);
            }

            /** True when the result entry of the pair carries no error code. */
            private boolean hasNoErrorCode(Tuple2 pair) {
                if (pair != null) {
                    return ((PutRecordsResultEntry) pair._1()).getErrorCode() == null;
                }
                throw new MatchError(pair);
            }
        });
        return promise.future();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs after a PutRecords call finished without failed records: releases the
     * in-flight slot, tries to start the next queued job, requests more input when the
     * inlet has not been pulled yet, and finally checks whether the stage can terminate.
     */
    public void putRecordsSuccessful() {
        final int remaining = inFlight() - 1;
        inFlight_$eq(remaining);
        tryToExecute();
        final boolean alreadyPulled = hasBeenPulled(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$in());
        if (!alreadyPulled) {
            tryPull(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$in());
        }
        checkForCompletion();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parks the failed records of {@code result} as a new job (attempt + 1) and schedules
     * a one-shot timer, keyed by the current retry token, that will re-queue it.
     *
     * <p>The delay is {@code retryInitialTimeout * 2^(attempt - 1)} for the exponential
     * strategy and {@code retryInitialTimeout * attempt} for the linear one.
     *
     * @param result attempt number plus the (result entry, original record) pairs to retry
     * @throws MatchError if the configured backoff strategy is neither exponential nor linear
     */
    public void resend(KinesisFlowStage.Result<T> result) {
        log().debug("PutRecords call finished with partial errors; scheduling retry");
        inFlight_$eq(inFlight() - 1);

        final int token = retryToken();
        // Keep only the original record of each failed pair for the next attempt.
        final Seq retryRecords = (Seq) result.recordsToRetry().map(failedPair -> {
            if (failedPair == null) {
                throw new MatchError(failedPair);
            }
            return (Tuple2) failedPair._2();
        }, Seq$.MODULE$.canBuildFrom());
        waitingRetries().put(BoxesRunTime.boxToInteger(token), new KinesisFlowStage.Job(result.attempt() + 1, retryRecords));

        final Integer timerKey = BoxesRunTime.boxToInteger(token);
        final KinesisFlowSettings.RetryBackoffStrategy strategy = this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$backoffStrategy;
        final FiniteDuration delay;
        if (KinesisFlowSettings$Exponential$.MODULE$.equals(strategy)) {
            delay = this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$retryInitialTimeout.$times((int) package$.MODULE$.pow(2.0d, result.attempt() - 1));
        } else if (KinesisFlowSettings$Linear$.MODULE$.equals(strategy)) {
            delay = this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$retryInitialTimeout.$times(result.attempt());
        } else {
            throw new MatchError(strategy);
        }
        scheduleOnce(timerKey, delay);
        retryToken_$eq(token + 1);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the stage once the retry budget is exhausted.
     *
     * <p>The failure is a {@code KinesisErrors.ErrorPublishingRecords} carrying the attempt
     * number and, for every record that still failed, the result entry paired with the
     * second element of the original request tuple.
     *
     * @param result attempt number plus the (result entry, original record) pairs that failed
     */
    public void failAfterResends(KinesisFlowStage.Result<T> result) {
        log().debug("PutRecords call finished with partial errors after {} attempts", BoxesRunTime.boxToInteger(result.attempt()));
        failStage(new KinesisErrors.ErrorPublishingRecords(result.attempt(), (scala.collection.Seq) result.recordsToRetry().map(tuple2 -> {
            if (tuple2 != null) {
                PutRecordsResultEntry putRecordsResultEntry = (PutRecordsResultEntry) tuple2._1();
                // Distinct name: the decompiled code redeclared the lambda parameter here.
                Tuple2 requestPair = (Tuple2) tuple2._2();
                if (requestPair != null) {
                    return new Tuple2(putRecordsResultEntry, requestPair._2());
                }
            }
            throw new MatchError(tuple2);
        }, Seq$.MODULE$.canBuildFrom())));
    }

    /**
     * Terminates the stage once all work is drained: nothing in flight, nothing queued,
     * no retry waiting, and the inlet closed.
     *
     * <p>The stage completes when upstream finished successfully and fails with the
     * recorded exception when upstream failed. If no outcome was recorded, the stage
     * fails with an {@link IllegalStateException}.
     *
     * @throws MatchError if the recorded state matches none of the cases above
     */
    private void checkForCompletion() {
        final boolean drained = inFlight() == 0
                && pendingRequests().isEmpty()
                && waitingRetries().isEmpty()
                && isClosed(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$in());
        if (!drained) {
            return;
        }
        final Option<Try<BoxedUnit>> state = completionState();
        if (state instanceof Some) {
            final Try<BoxedUnit> outcome = (Try<BoxedUnit>) ((Some) state).value();
            if (outcome instanceof Success) {
                completeStage();
                return;
            }
            if (outcome instanceof Failure) {
                failStage(((Failure<BoxedUnit>) outcome).exception());
                return;
            }
        }
        if (!None$.MODULE$.equals(state)) {
            throw new MatchError(state);
        }
        failStage(new IllegalStateException("Stage completed, but there is no info about status"));
    }

    /**
     * Timer hook: removes the retry job parked under the fired key, if any, and
     * re-queues it through {@code $anonfun$onTimer$1}.
     *
     * @param obj the timer key, a boxed retry token
     */
    public void onTimer(Object obj) {
        final int token = BoxesRunTime.unboxToInt(obj);
        final Option<KinesisFlowStage.Job<T>> parked = waitingRetries().remove(BoxesRunTime.boxToInteger(token));
        if (parked.isDefined()) {
            $anonfun$onTimer$1(this, parked.get());
        }
    }

    /** Stop hook: drops every queued job and every parked retry. */
    public void postStop() {
        pendingRequests().clear();
        waitingRetries().clear();
    }

    /**
     * Records that upstream finished normally, then checks whether the stage can
     * terminate right away or must wait for outstanding work.
     */
    public void onUpstreamFinish() {
        final Try<BoxedUnit> finished = new Success<>(BoxedUnit.UNIT);
        completionState_$eq(new Some<>(finished));
        checkForCompletion();
    }

    /**
     * Records the upstream failure, then checks whether the stage can terminate right
     * away or must wait for outstanding work.
     *
     * @param th the exception upstream failed with
     */
    public void onUpstreamFailure(Throwable th) {
        final Try<BoxedUnit> failed = new Failure<>(th);
        completionState_$eq(new Some<>(failed));
        checkForCompletion();
    }

    /**
     * Downstream demand: tries to start a queued job, then requests more input, but only
     * while no retry is parked and the inlet has not already been pulled.
     */
    public void onPull() {
        tryToExecute();
        if (waitingRetries().isEmpty() && !hasBeenPulled(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$in())) {
            tryPull(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$in());
        }
    }

    /**
     * New input: wraps the grabbed batch in a first-attempt job, queues it, and tries to
     * start it.
     */
    public void onPush() {
        final Seq batch = (Seq) grab(this.$outer.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$in());
        final KinesisFlowStage.Job[] incoming = {new KinesisFlowStage.Job(1, batch)};
        pendingRequests().enqueue(Predef$.MODULE$.wrapRefArray(incoming));
        tryToExecute();
    }

    /** Synthetic accessor exposing the enclosing stage; the PutRecords handler uses it to read {@code maxRetries}. */
    public /* synthetic */ KinesisFlowStage akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$$outer() {
        return this.$outer;
    }

    /**
     * Body of the retry-timer handler: logs, puts the retry job back on the pending
     * queue of the given stage logic, and tries to start it.
     *
     * @param kinesisFlowStage$$anon$1 the stage logic whose timer fired
     * @param job the retry job that became due
     */
    public static final /* synthetic */ void $anonfun$onTimer$1(KinesisFlowStage$$anon$1 kinesisFlowStage$$anon$1, KinesisFlowStage.Job job) {
        kinesisFlowStage$$anon$1.log().debug("New PutRecords retry attempt available");
        final KinesisFlowStage.Job[] dueJobs = {job};
        kinesisFlowStage$$anon$1.pendingRequests().enqueue(Predef$.MODULE$.wrapRefArray(dueJobs));
        kinesisFlowStage$$anon$1.tryToExecute();
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the stage logic for {@code kinesisFlowStage}.
     *
     * <p>Starts with no recorded upstream outcome, an empty pending queue, no in-flight
     * jobs, no parked retries and retry token 0. Creates the three async callbacks used by
     * the PutRecords handler and registers this object as handler for both the inlet and
     * the outlet of the stage.
     *
     * @param kinesisFlowStage the enclosing stage; must not be null
     */
    public KinesisFlowStage$$anon$1(KinesisFlowStage<T> kinesisFlowStage) {
        super(kinesisFlowStage.m8shape());
        // Outer-instance null check (the decompiled code compared the reference with 0).
        // NOTE(review): the super(...) call above already dereferences the argument, so in
        // this Java form the check looks unreachable for null - confirm before relying on it.
        if (kinesisFlowStage == null) {
            throw null;
        }
        this.$outer = kinesisFlowStage;
        StageLogging.$init$(this);
        InHandler.$init$(this);
        OutHandler.$init$(this);
        this.completionState = None$.MODULE$;
        this.pendingRequests = (Queue) Queue$.MODULE$.empty();
        this.inFlight = 0;
        // Each callback re-enters the stage through getAsyncCallback and runs the matching method.
        this.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$putRecordsSuccessfulCallback = getAsyncCallback(notUsed -> {
            this.putRecordsSuccessful();
            return BoxedUnit.UNIT;
        });
        this.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$resendCallback = getAsyncCallback(result -> {
            this.resend(result);
            return BoxedUnit.UNIT;
        });
        this.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$anon$$failAfterResendsCallback = getAsyncCallback(result2 -> {
            this.failAfterResends(result2);
            return BoxedUnit.UNIT;
        });
        this.waitingRetries = HashMap$.MODULE$.empty();
        this.retryToken = 0;
        setHandlers(kinesisFlowStage.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$in(), kinesisFlowStage.akka$stream$alpakka$kinesis$impl$KinesisFlowStage$$out(), this);
    }
}
