package org.apache.pekko.kafka.internal;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.RestrictedConsumer;
import org.apache.pekko.kafka.Subscription;
import org.apache.pekko.kafka.internal.PartitionAssignmentHelpers;
import org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler;
import org.apache.pekko.stream.SourceShape;
import scala.PartialFunction;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.Set;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.runtime.BoxedUnit;

/* compiled from: SingleSourceLogic.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/SingleSourceLogic.class */
public abstract class SingleSourceLogic<K, V, Msg> extends BaseSingleSourceLogic<K, V, Msg> {
    private final ConsumerSettings<K, V> settings;
    private final Subscription subscription;
    private final Promise<ActorRef> consumerPromise;
    private final int actorNumber;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SingleSourceLogic(SourceShape<Msg> sourceShape, ConsumerSettings<K, V> consumerSettings, Subscription subscription) {
        super(sourceShape);
        this.settings = consumerSettings;
        this.subscription = subscription;
        this.consumerPromise = Promise$.MODULE$.apply();
        this.actorNumber = KafkaConsumerActor$Internal$.MODULE$.nextNumber();
    }

    private SourceShape<Msg> shape$accessor() {
        return super.shape();
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public Subscription subscription() {
        return this.subscription;
    }

    @Override // org.apache.pekko.kafka.internal.BaseSingleSourceLogic
    public Class<?> logSource() {
        return SingleSourceLogic.class;
    }

    public final int actorNumber() {
        return this.actorNumber;
    }

    @Override // org.apache.pekko.kafka.internal.BaseSingleSourceLogic, org.apache.pekko.kafka.internal.MetricsControl
    public final Future<ActorRef> consumerFuture() {
        return this.consumerPromise.future();
    }

    @Override // org.apache.pekko.kafka.internal.BaseSingleSourceLogic
    public final ActorRef createConsumerActor() {
        ActorRef systemActorOf = materializer().system().systemActorOf(org.apache.pekko.kafka.KafkaConsumerActor$.MODULE$.props(sourceActor().ref(), this.settings), new StringBuilder(15).append("kafka-consumer-").append(actorNumber()).toString());
        this.consumerPromise.success(systemActorOf);
        return systemActorOf;
    }

    @Override // org.apache.pekko.kafka.internal.BaseSingleSourceLogic
    public final void postStop() {
        consumerActor().tell(KafkaConsumerActor$Internal$StopFromStage$.MODULE$.apply(id()), sourceActor().ref());
        super.postStop();
    }

    @Override // org.apache.pekko.kafka.internal.BaseSingleSourceLogic, org.apache.pekko.kafka.internal.PromiseControl
    public final void performShutdown() {
        super.performShutdown();
        setKeepGoing(true);
        if (!isClosed(shape$accessor().out())) {
            complete(shape$accessor().out());
        }
        sourceActor().become(shuttingDownReceive().orElse(new SingleSourceLogic$$anon$1(this)));
        stopConsumerActor();
    }

    public PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> shuttingDownReceive() {
        return new SingleSourceLogic$$anon$2(this);
    }

    public void stopConsumerActor() {
        materializer().scheduleOnce(this.settings.stopTimeout(), new Runnable(this) { // from class: org.apache.pekko.kafka.internal.SingleSourceLogic$$anon$3
            private final /* synthetic */ SingleSourceLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            @Override // java.lang.Runnable
            public void run() {
                this.$outer.consumerActor().tell(KafkaConsumerActor$Internal$StopFromStage$.MODULE$.apply(this.$outer.id()), this.$outer.sourceActor().ref());
            }
        });
    }

    @Override // org.apache.pekko.kafka.internal.BaseSingleSourceLogic, org.apache.pekko.kafka.internal.SourceLogicSubscription
    public PartitionAssignmentHandler addToPartitionAssignmentHandler(PartitionAssignmentHandler partitionAssignmentHandler) {
        return new PartitionAssignmentHelpers.Chain(partitionAssignmentHandler, new PartitionAssignmentHandler(this) { // from class: org.apache.pekko.kafka.internal.SingleSourceLogic$$anon$4
            private Set lastRevoked;
            private final /* synthetic */ SingleSourceLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
                this.lastRevoked = Predef$.MODULE$.Set().empty();
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onRevoke(Set set, RestrictedConsumer restrictedConsumer) {
                this.lastRevoked = set;
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onAssign(Set set, RestrictedConsumer restrictedConsumer) {
                this.$outer.filterRevokedPartitionsCB().invoke(this.lastRevoked.$minus$minus(set));
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onLost(Set set, RestrictedConsumer restrictedConsumer) {
                this.$outer.filterRevokedPartitionsCB().invoke(set);
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onStop(Set set, RestrictedConsumer restrictedConsumer) {
            }
        });
    }
}
