package org.apache.pekko.stream.connectors.kinesis.impl;

import java.io.Serializable;
import java.util.concurrent.Semaphore;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.ActorAttributes$;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.Outlet$;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.connectors.kinesis.CommittableRecord;
import org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerErrors$SchedulerUnexpectedShutdown$;
import org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerSourceSettings;
import org.apache.pekko.stream.connectors.kinesis.impl.KinesisSchedulerSourceStage;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Product;
import scala.Some$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

/* compiled from: KinesisSchedulerSourceStage.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.class */
public class KinesisSchedulerSourceStage extends GraphStageWithMaterializedValue<SourceShape<CommittableRecord>, Future<Scheduler>> {
    public final KinesisSchedulerSourceSettings org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$settings;
    public final Function1<ShardRecordProcessorFactory, Scheduler> org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$schedulerBuilder;
    public final ExecutionContext org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$ec;
    public final Outlet<CommittableRecord> org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$out = Outlet$.MODULE$.apply("Records");

    /* compiled from: KinesisSchedulerSourceStage.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage$Command.class */
    /**
     * Marker type for the messages processed by {@code Logic.awaitingRecords}.
     *
     * <p>Within this file it is implemented by {@link NewRecord} and {@link SchedulerShutdown};
     * the {@code Pump} and {@code Complete} singletons (defined in separate companion classes)
     * are also passed where a {@code Command} is expected.
     */
    public interface Command {
    }

    /* compiled from: KinesisSchedulerSourceStage.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage$Logic.class */
    public final class Logic extends GraphStageLogic implements StageLogging, OutHandler {
        private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
        private final Promise<Scheduler> matValue;
        private final Semaphore backpressureSemaphore;
        private final Queue<CommittableRecord> buffer;
        private Option<Scheduler> schedulerOpt;
        private final AsyncCallback<Command> callback;
        private final /* synthetic */ KinesisSchedulerSourceStage $outer;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        /**
         * Creates the stage logic for one materialization of the enclosing stage.
         *
         * @param kinesisSchedulerSourceStage the enclosing stage; its shape, outlet and settings are read here
         * @param promise promise stored as {@code matValue}; {@code preStart} completes it with the scheduler
         * @throws NullPointerException if {@code kinesisSchedulerSourceStage} is {@code null}
         */
        public Logic(KinesisSchedulerSourceStage kinesisSchedulerSourceStage, Promise<Scheduler> promise) {
            super(kinesisSchedulerSourceStage.m25shape());
            this.matValue = promise;
            // Compiler-generated outer-instance check (the super call above already dereferences it).
            if (kinesisSchedulerSourceStage == null) {
                throw new NullPointerException();
            }
            this.$outer = kinesisSchedulerSourceStage;
            StageLogging.$init$(this);
            // This logic object is its own OutHandler for the single outlet.
            setHandler(kinesisSchedulerSourceStage.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$out, this);
            // One permit per buffer slot: acquired in newRecordCallback, released after each push.
            this.backpressureSemaphore = new Semaphore(kinesisSchedulerSourceStage.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$settings.bufferSize());
            this.buffer = Queue$.MODULE$.empty();
            // No scheduler exists until preStart has run.
            this.schedulerOpt = None$.MODULE$;
            // Commands submitted through this callback are handled by awaitingRecords.
            this.callback = getAsyncCallback(command -> {
                awaitingRecords(command);
            });
        }

        /** Returns the current value of the backing field for the {@code StageLogging} mixin's {@code _log} member. */
        public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
            return this.org$apache$pekko$stream$stage$StageLogging$$_log;
        }

        /**
         * Stores the logging adapter in the backing field for the {@code StageLogging} mixin's {@code _log} member.
         *
         * @param loggingAdapter the adapter to store
         */
        public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
            this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
        }

        /** Forwards to the {@code StageLogging} trait implementation ({@code StageLogging.logSource$}). */
        public /* bridge */ /* synthetic */ Class logSource() {
            return StageLogging.logSource$(this);
        }

        /** Forwards to the {@code StageLogging} trait implementation ({@code StageLogging.log$}). */
        public /* bridge */ /* synthetic */ LoggingAdapter log() {
            return StageLogging.log$(this);
        }

        /**
         * No-argument variant; forwards to the {@code OutHandler} trait implementation
         * ({@code OutHandler.onDownstreamFinish$}).
         *
         * @throws Exception whatever the trait implementation throws
         */
        public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
            OutHandler.onDownstreamFinish$(this);
        }

        /**
         * Builds the KCL {@link Scheduler}, starts it on the stage's execution context and completes
         * the materialized promise with it.
         *
         * <p>The factory handed to the scheduler builder creates {@code ShardProcessor}s whose record
         * callback is this logic's {@code newRecordCallback}. The scheduler's {@code run()} is executed
         * inside a {@code Future}; when that future completes, its result is delivered to the stage as
         * a {@code SchedulerShutdown} command through the async callback.
         *
         * <p>If anything in this start-up sequence throws, the materialized promise is failed with the
         * same error before it is rethrown, so the materialized future cannot stay pending forever.
         */
        public void preStart() {
            final ExecutionContext ec = this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$ec;
            final ShardRecordProcessorFactory factory = new ShardRecordProcessorFactory() {
                @Override
                public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
                    // Keep the interface's own implementation of the stream-aware overload.
                    return ShardRecordProcessorFactory.super.shardRecordProcessor(streamIdentifier);
                }

                @Override
                public ShardRecordProcessor shardRecordProcessor() {
                    // Every record the processor receives is routed back into this stage logic.
                    return new ShardProcessor(committableRecord -> {
                        Logic.this.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$Logic$$newRecordCallback(committableRecord);
                    });
                }
            };
            try {
                final Scheduler scheduler = this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$schedulerBuilder.apply(factory);
                // Remember the scheduler first so postStop can shut it down.
                this.schedulerOpt = Some$.MODULE$.apply(scheduler);
                Future$.MODULE$.apply(() -> {
                    return KinesisSchedulerSourceStage.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$Logic$$_$preStart$$anonfun$adapted$1(scheduler);
                }, ec).onComplete(runResult -> {
                    // Report the outcome of scheduler.run() back to the stage.
                    this.callback.invoke(KinesisSchedulerSourceStage$SchedulerShutdown$.MODULE$.apply(runResult));
                }, ec);
                this.matValue.success(scheduler);
            } catch (Throwable t) {
                // tryFailure is a no-op if the promise was already completed above.
                this.matValue.tryFailure(t);
                throw t;
            }
        }

        /**
         * Record callback handed to every {@code ShardProcessor} created in {@code preStart}.
         *
         * <p>Waits for a backpressure permit for at most the configured {@code backpressureTimeout},
         * then submits the record to the stage as a {@code NewRecord} command via the async callback.
         *
         * <p>NOTE(review): the boolean result of {@code tryAcquire} is discarded, so the record is
         * forwarded whether or not a permit was obtained. Since {@code awaitingRecords} releases one
         * permit for every pushed record, a timed-out acquire leaves the semaphore with one more permit
         * than before — confirm this is the intended best-effort behaviour.
         * Presumably invoked from scheduler worker threads rather than the stage thread — TODO confirm.
         *
         * @param committableRecord the record to hand to the stage
         */
        public void org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$Logic$$newRecordCallback(CommittableRecord committableRecord) {
            this.backpressureSemaphore.tryAcquire(this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$settings.backpressureTimeout().length(), this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$settings.backpressureTimeout().unit());
            this.callback.invoke(KinesisSchedulerSourceStage$NewRecord$.MODULE$.apply(committableRecord));
        }

        /** Downstream demand: runs the command handler with {@code Pump} so a buffered record, if any, is pushed. */
        public void onPull() {
            awaitingRecords(KinesisSchedulerSourceStage$Pump$.MODULE$);
        }

        /**
         * Downstream cancelled: runs the command handler with {@code Complete}, which clears the buffer
         * and completes the stage.
         *
         * @param th the cancellation cause; not used by this implementation
         */
        public void onDownstreamFinish(Throwable th) {
            awaitingRecords(KinesisSchedulerSourceStage$Complete$.MODULE$);
        }

        /**
         * Single entry point for every {@link Command} the stage reacts to.
         *
         * <ul>
         *   <li>{@code NewRecord}: the record is appended to the buffer, then the buffer is drained.</li>
         *   <li>{@code Pump}: the buffer is drained.</li>
         *   <li>{@code SchedulerShutdown} carrying a {@code Success}, or {@code Complete}: the buffer is
         *       cleared and the stage completes.</li>
         *   <li>{@code SchedulerShutdown} carrying a {@code Failure}: the buffer is cleared and the stage
         *       fails with {@code SchedulerUnexpectedShutdown} wrapping the failure's exception.</li>
         * </ul>
         *
         * <p>Draining pushes buffered records for as long as the outlet is available and the buffer is
         * non-empty, releasing one backpressure permit per pushed record.
         *
         * @param command the command to process
         * @throws MatchError if the command is none of the cases above
         */
        private void awaitingRecords(Command command) {
            if (command instanceof NewRecord) {
                this.buffer.enqueue(KinesisSchedulerSourceStage$NewRecord$.MODULE$.unapply((NewRecord) command)._1());
            } else if (!KinesisSchedulerSourceStage$Pump$.MODULE$.equals(command)) {
                // Everything that is neither a record nor a pump request ends the stage (or is unknown).
                final Try<?> outcome = command instanceof SchedulerShutdown
                        ? KinesisSchedulerSourceStage$SchedulerShutdown$.MODULE$.unapply((SchedulerShutdown) command)._1()
                        : null;
                if (outcome instanceof Success || KinesisSchedulerSourceStage$Complete$.MODULE$.equals(command)) {
                    this.buffer.clear();
                    completeStage();
                    return;
                }
                if (outcome instanceof Failure) {
                    final Throwable cause = ((Failure<?>) outcome).exception();
                    this.buffer.clear();
                    failStage(KinesisSchedulerErrors$SchedulerUnexpectedShutdown$.MODULE$.apply(cause));
                    return;
                }
                throw new MatchError(command);
            }

            // Drain: hand over as many buffered records as downstream currently accepts.
            final Outlet<CommittableRecord> outlet = this.$outer.m25shape().out();
            while (isAvailable(outlet) && this.buffer.nonEmpty()) {
                push(outlet, this.buffer.dequeue());
                this.backpressureSemaphore.release();
            }
        }

        /**
         * Stage teardown: if a scheduler was created in {@code preStart}, asks it to shut down on the
         * stage's execution context.
         *
         * <p>The shutdown itself is performed by the adapted helper on the enclosing class, which calls
         * {@code shutdown()} only when {@code shutdownComplete()} is not yet true. The resulting future
         * is not awaited here.
         */
        public void postStop() {
            this.schedulerOpt.foreach(scheduler -> {
                return Future$.MODULE$.apply(() -> {
                    // Pass the scheduler taken from the Option; the decompiled code referenced an undefined `r1`.
                    return KinesisSchedulerSourceStage.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$Logic$$_$postStop$$anonfun$1$$anonfun$adapted$1(scheduler);
                }, this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$ec);
            });
        }

        /** Compiler-generated accessor returning the enclosing {@link KinesisSchedulerSourceStage} instance. */
        public final /* synthetic */ KinesisSchedulerSourceStage org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$Logic$$$outer() {
            return this.$outer;
        }
    }

    /* compiled from: KinesisSchedulerSourceStage.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage$NewRecord.class */
    /**
     * Command carrying a single {@link CommittableRecord} to the stage logic.
     *
     * <p>Compiled form of a one-field Scala case class: besides the {@code cr} component it exposes
     * the usual {@link Product} members, structural {@code equals}, and static forwarders to the
     * companion object.
     */
    public static final class NewRecord implements Command, Product, Serializable {
        private final CommittableRecord cr;

        /**
         * @param committableRecord the record carried by this command
         */
        public NewRecord(CommittableRecord committableRecord) {
            this.cr = committableRecord;
        }

        // ---- static forwarders to the companion object ----

        /** Forwards to the companion's {@code apply}. */
        public static NewRecord apply(CommittableRecord committableRecord) {
            return KinesisSchedulerSourceStage$NewRecord$.MODULE$.apply(committableRecord);
        }

        /** Forwards to the companion's {@code fromProduct}. */
        public static NewRecord fromProduct(Product product) {
            return KinesisSchedulerSourceStage$NewRecord$.MODULE$.m30fromProduct(product);
        }

        /** Forwards to the companion's {@code unapply}. */
        public static NewRecord unapply(NewRecord newRecord) {
            return KinesisSchedulerSourceStage$NewRecord$.MODULE$.unapply(newRecord);
        }

        // ---- component access and copying ----

        /** Returns the carried record. */
        public CommittableRecord cr() {
            return this.cr;
        }

        /** Positional alias for {@link #cr()}. */
        public CommittableRecord _1() {
            return cr();
        }

        /** Creates a new command carrying the given record. */
        public NewRecord copy(CommittableRecord committableRecord) {
            return new NewRecord(committableRecord);
        }

        /** Default for the first {@code copy} argument: the current record. */
        public CommittableRecord copy$default$1() {
            return cr();
        }

        // ---- Product members ----

        public int productArity() {
            return 1;
        }

        public String productPrefix() {
            return "NewRecord";
        }

        /**
         * @throws IndexOutOfBoundsException if {@code i} is not {@code 0}
         */
        public Object productElement(int i) {
            if (i != 0) {
                throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
            return _1();
        }

        /**
         * @throws IndexOutOfBoundsException if {@code i} is not {@code 0}
         */
        public String productElementName(int i) {
            if (i != 0) {
                throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
            return "cr";
        }

        public /* bridge */ /* synthetic */ Iterator productIterator() {
            return Product.productIterator$(this);
        }

        public /* bridge */ /* synthetic */ Iterator productElementNames() {
            return Product.productElementNames$(this);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof NewRecord;
        }

        // ---- Object members ----

        /** Two commands are equal when they are the same instance or carry equal records. */
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof NewRecord)) {
                return false;
            }
            CommittableRecord mine = cr();
            CommittableRecord theirs = ((NewRecord) obj).cr();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        public int hashCode() {
            return ScalaRunTime$.MODULE$._hashCode(this);
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }
    }

    /* compiled from: KinesisSchedulerSourceStage.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage$SchedulerShutdown.class */
    /**
     * Command reporting how the scheduler's run ended, as a {@link Try}.
     *
     * <p>Compiled form of a one-field Scala case class: besides the {@code result} component it
     * exposes the usual {@link Product} members, structural {@code equals}, and static forwarders to
     * the companion object.
     */
    public static final class SchedulerShutdown implements Command, Product, Serializable {
        private final Try<?> result;

        /**
         * @param r4 the outcome carried by this command
         */
        public SchedulerShutdown(Try<?> r4) {
            this.result = r4;
        }

        // ---- static forwarders to the companion object ----

        /** Forwards to the companion's {@code apply}. */
        public static SchedulerShutdown apply(Try<?> r3) {
            return KinesisSchedulerSourceStage$SchedulerShutdown$.MODULE$.apply(r3);
        }

        /** Forwards to the companion's {@code fromProduct}. */
        public static SchedulerShutdown fromProduct(Product product) {
            return KinesisSchedulerSourceStage$SchedulerShutdown$.MODULE$.m34fromProduct(product);
        }

        /** Forwards to the companion's {@code unapply}. */
        public static SchedulerShutdown unapply(SchedulerShutdown schedulerShutdown) {
            return KinesisSchedulerSourceStage$SchedulerShutdown$.MODULE$.unapply(schedulerShutdown);
        }

        // ---- component access and copying ----

        /** Returns the carried outcome. */
        public Try<?> result() {
            return this.result;
        }

        /** Positional alias for {@link #result()}. */
        public Try<?> _1() {
            return result();
        }

        /** Creates a new command carrying the given outcome. */
        public SchedulerShutdown copy(Try<?> r5) {
            return new SchedulerShutdown(r5);
        }

        /** Default for the first {@code copy} argument: the current outcome. */
        public Try<?> copy$default$1() {
            return result();
        }

        // ---- Product members ----

        public int productArity() {
            return 1;
        }

        public String productPrefix() {
            return "SchedulerShutdown";
        }

        /**
         * @throws IndexOutOfBoundsException if {@code i} is not {@code 0}
         */
        public Object productElement(int i) {
            if (i != 0) {
                throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
            return _1();
        }

        /**
         * @throws IndexOutOfBoundsException if {@code i} is not {@code 0}
         */
        public String productElementName(int i) {
            if (i != 0) {
                throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
            return "result";
        }

        public /* bridge */ /* synthetic */ Iterator productIterator() {
            return Product.productIterator$(this);
        }

        public /* bridge */ /* synthetic */ Iterator productElementNames() {
            return Product.productElementNames$(this);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof SchedulerShutdown;
        }

        // ---- Object members ----

        /** Two commands are equal when they are the same instance or carry equal outcomes. */
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof SchedulerShutdown)) {
                return false;
            }
            Try<?> mine = result();
            Try<?> theirs = ((SchedulerShutdown) obj).result();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        public int hashCode() {
            return ScalaRunTime$.MODULE$._hashCode(this);
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }
    }

    /**
     * Creates the source stage.
     *
     * @param kinesisSchedulerSourceSettings settings read by the stage logic (buffer size and backpressure timeout)
     * @param function1 builds the {@link Scheduler} from the {@link ShardRecordProcessorFactory} the stage logic supplies in {@code preStart}
     * @param executionContext execution context on which the scheduler's run and shutdown are dispatched
     */
    public KinesisSchedulerSourceStage(KinesisSchedulerSourceSettings kinesisSchedulerSourceSettings, Function1<ShardRecordProcessorFactory, Scheduler> function1, ExecutionContext executionContext) {
        this.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$settings = kinesisSchedulerSourceSettings;
        this.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$schedulerBuilder = function1;
        this.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$ec = executionContext;
    }

    /* renamed from: shape, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the shape of this stage: a source with the single {@code "Records"} outlet.
     *
     * <p>A new {@link SourceShape} wrapping the same outlet is created on every call.
     */
    public SourceShape<CommittableRecord> m25shape() {
        final Outlet<CommittableRecord> recordsOutlet = this.org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$$out;
        final SourceShape<CommittableRecord> sourceShape = new SourceShape<>(recordsOutlet);
        return sourceShape;
    }

    /** Returns the superclass's initial attributes with {@code ActorAttributes.IODispatcher} appended. */
    public Attributes initialAttributes() {
        final Attributes inherited = super.initialAttributes();
        return inherited.and(ActorAttributes$.MODULE$.IODispatcher());
    }

    /**
     * Creates the stage logic together with the materialized value.
     *
     * @param attributes the attributes passed in for this materialization; not read here
     * @return a pair of the new {@link Logic} and the future of the promise that the logic
     *         completes with its {@link Scheduler}
     */
    public Tuple2<GraphStageLogic, Future<Scheduler>> createLogicAndMaterializedValue(Attributes attributes) {
        final Promise<Scheduler> schedulerPromise = Promise$.MODULE$.apply();
        final GraphStageLogic logic = new Logic(this, schedulerPromise);
        return new Tuple2<>(logic, schedulerPromise.future());
    }

    /**
     * Compiler-generated adapter for the body of the future started in {@code Logic.preStart}:
     * calls {@code scheduler.run()} and returns the boxed unit value.
     *
     * @param scheduler the scheduler to run
     * @return {@link BoxedUnit#UNIT}
     */
    public static /* bridge */ /* synthetic */ Object org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$Logic$$_$preStart$$anonfun$adapted$1(Scheduler scheduler) {
        scheduler.run();
        return BoxedUnit.UNIT;
    }

    /**
     * Requests shutdown of the scheduler unless it already reports its shutdown as complete.
     *
     * @param scheduler the scheduler to shut down
     */
    private static final void postStop$$anonfun$1$$anonfun$1(Scheduler scheduler) {
        if (!scheduler.shutdownComplete()) {
            scheduler.shutdown();
        }
    }

    /**
     * Compiler-generated adapter for the body of the future started in {@code Logic.postStop}:
     * runs the conditional shutdown helper and returns the boxed unit value.
     *
     * @param scheduler the scheduler to shut down
     * @return {@link BoxedUnit#UNIT}
     */
    public static /* bridge */ /* synthetic */ Object org$apache$pekko$stream$connectors$kinesis$impl$KinesisSchedulerSourceStage$Logic$$_$postStop$$anonfun$1$$anonfun$adapted$1(Scheduler scheduler) {
        postStop$$anonfun$1$$anonfun$1(scheduler);
        return BoxedUnit.UNIT;
    }
}
