package akka.kafka.internal;

import akka.actor.ActorRef;
import akka.actor.ExtendedActorSystem;
import akka.annotation.InternalApi;
import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerSettings;
import akka.kafka.ManualSubscription;
import akka.kafka.Subscription;
import akka.kafka.Subscriptions;
import akka.kafka.internal.PartitionAssignmentHelpers;
import akka.kafka.scaladsl.PartitionAssignmentHandler;
import akka.stream.ActorMaterializerHelper$;
import akka.stream.SourceShape;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.PartialFunction;
import scala.Tuple2;
import scala.collection.immutable.Set;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: SingleSourceLogic.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\rd!\u0002\u000b\u0016\u0003\u0013a\u0002\"\u0003\u001c\u0001\u0005\u0003\u0005\u000b\u0011B\u001c>\u0011!q\u0004A!A!\u0002\u0013y\u0004\u0002C\"\u0001\u0005\u0003\u0005\u000b\u0011\u0002#\t\u000b\u001d\u0003A\u0011\u0001%\t\u000b5\u0003A\u0011\u000b(\t\u000fy\u0003!\u0019!C\u0005?\"1A\u000e\u0001Q\u0001\n\u0001Dq!\u001c\u0001C\u0002\u0013\u0015a\u000e\u0003\u0004s\u0001\u0001\u0006ia\u001c\u0005\u0006g\u0002!)\u0001\u001e\u0005\u0006q\u0002!)!\u001f\u0005\u0006{\u0002!)A \u0005\u0006\u007f\u0002!)%\u001f\u0005\u0007\u0003\u0003\u0001AQA=\t\u000f\u0005\r\u0001\u0001\"\u0005\u0002\u0006!1\u00111\u0003\u0001\u0005\u0012eDq!!\u0006\u0001\t#\t9\u0002C\u0004\u0002:\u0001!\t\"a\u000f\t\u000f\u0005\u0005\u0003\u0001\"\u0005\u0002D\t\t2+\u001b8hY\u0016\u001cv.\u001e:dK2{w-[2\u000b\u0005Y9\u0012\u0001C5oi\u0016\u0014h.\u00197\u000b\u0005aI\u0012!B6bM.\f'\"\u0001\u000e\u0002\t\u0005\\7.Y\u0002\u0001+\u0011iB%\r\u001b\u0014\u0005\u0001q\u0002#B\u0010!EA\u001aT\"A\u000b\n\u0005\u0005*\"!\u0006\"bg\u0016\u001c\u0016N\\4mKN{WO]2f\u0019><\u0017n\u0019\t\u0003G\u0011b\u0001\u0001B\u0003&\u0001\t\u0007aEA\u0001L#\t9S\u0006\u0005\u0002)W5\t\u0011FC\u0001+\u0003\u0015\u00198-\u00197b\u0013\ta\u0013FA\u0004O_RD\u0017N\\4\u0011\u0005!r\u0013BA\u0018*\u0005\r\te.\u001f\t\u0003GE\"QA\r\u0001C\u0002\u0019\u0012\u0011A\u0016\t\u0003GQ\"Q!\u000e\u0001C\u0002\u0019\u00121!T:h\u0003\u0015\u0019\b.\u00199f!\rA4hM\u0007\u0002s)\u0011!(G\u0001\u0007gR\u0014X-Y7\n\u0005qJ$aC*pkJ\u001cWm\u00155ba\u0016L!A\u000e\u0011\u0002\u0011M,G\u000f^5oON\u0004B\u0001Q!#a5\tq#\u0003\u0002C/\t\u00012i\u001c8tk6,'oU3ui&twm]\u0001\rgV\u00147o\u0019:jaRLwN\u001c\t\u0003\u0001\u0016K!AR\f\u0003\u0019M+(m]2sSB$\u0018n\u001c8\u0002\rqJg.\u001b;?)\u0011I%j\u0013'\u0011\u000b}\u0001!\u0005M\u001a\t\u000bY\"\u0001\u0019A\u001c\t\u000by\"\u0001\u0019A \t\u000b\r#\u0001\u0019\u0001#\u0002\u00131|wmU8ve\u000e,W#A(1\u0005Ac\u0006cA)Y7:\u0011!K\u0016\t\u0003'&j\u0011\u0001\u0016\u0006\u0003+n\ta\u0001\u0010:p_Rt\u0014BA,*\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011L\u0017\u0002\u0006\u00072\f7o\u001d\u0006\u0003/&\u0002\"a\t/\u0005\u0013u+\u0011\u0011!A\u0001\u0006\u00031#aA0%c\u0005y1m\u001c8tk6,'\u000f\u0015:p[&\u001cX-F\u0001a!\r\tGMZ\u0007\u0002E*\u00111-K\u0001\u000bG>t7-\u001e:sK:$\u0018BA3c\u0005\u001d\u0001&o\\7jg\u0016\u0004\"a\u001a6\u000e\u0003!T!![\r\u0002\u000b\u0005\u001cGo\u001c:\n\u0005-D'\u0001C!di>\u0014(+\u001a4\u0002!\r|gn];nKJ\u0004&o\\7jg\u0016\u0004\u0013aC1di>\u0014h*^7cKJ,\u0012a\u001c\t\u0003QAL!!]\u0015\u0003\u0007%sG/\u0001\u0007bGR|'OT;nE\u0016\u0014\b%\u0001\bd_:\u001cX/\\3s\rV$XO]3\u0016\u0003U\u00042!\u0019<g\u0013\t9(M\u0001\u0004GkR,(/Z\u0001\u0016G>tg-[4ve\u0016\u001cVOY:de&\u0004H/[8o)\u0005Q\bC\u0001\u0015|\u0013\ta\u0018F\u0001\u0003V]&$\u0018aE2sK\u0006$XmQ8ogVlWM]!di>\u0014H#\u00014\u0002\u0011A|7\u000f^*u_B\fq\u0002]3sM>\u0014Xn\u00155vi\u0012|wO\\\u0001\u0014g\",H\u000f^5oO\u0012{wO\u001c*fG\u0016Lg/Z\u000b\u0003\u0003\u000f\u0001b\u0001KA\u0005\u0003\u001bQ\u0018bAA\u0006S\ty\u0001+\u0019:uS\u0006dg)\u001e8di&|g\u000eE\u0003)\u0003\u001f1W&C\u0002\u0002\u0012%\u0012a\u0001V;qY\u0016\u0014\u0014!E:u_B\u001cuN\\:v[\u0016\u0014\u0018i\u0019;pe\u0006A\u0002/\u0019:uSRLwN\\!tg&<g.\u001a3IC:$G.\u001a:\u0015\u0007i\fI\u0002C\u0004\u0002\u001cE\u0001\r!!\b\u0002\u0017\u0005\u001c8/[4oK\u0012$\u0006o\u001d\t\u0006#\u0006}\u00111E\u0005\u0004\u0003CQ&aA*fiB!\u0011QEA\u001b\u001b\t\t9C\u0003\u0003\u0002*\u0005-\u0012AB2p[6|gNC\u0002\u0019\u0003[QA!a\f\u00022\u00051\u0011\r]1dQ\u0016T!!a\r\u0002\u0007=\u0014x-\u0003\u0003\u00028\u0005\u001d\"A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\\\u0001\u0018a\u0006\u0014H/\u001b;j_:\u0014VM^8lK\u0012D\u0015M\u001c3mKJ$2A_A\u001f\u0011\u001d\tyD\u0005a\u0001\u0003;\t!B]3w_.,G\r\u00169t\u0003}\tG\r\u001a+p!\u0006\u0014H/\u001b;j_:\f5o]5h]6,g\u000e\u001e%b]\u0012dWM\u001d\u000b\u0005\u0003\u000b\n\t\u0006\u0005\u0003\u0002H\u00055SBAA%\u0015\r\tYeF\u0001\tg\u000e\fG.\u00193tY&!\u0011qJA%\u0005i\u0001\u0016M\u001d;ji&|g.Q:tS\u001etW.\u001a8u\u0011\u0006tG\r\\3s\u0011\u001d\t\u0019f\u0005a\u0001\u0003\u000b\nq\u0001[1oI2,'\u000fK\u0002\u0001\u0003/\u0002B!!\u0017\u0002`5\u0011\u00111\f\u0006\u0004\u0003;J\u0012AC1o]>$\u0018\r^5p]&!\u0011\u0011MA.\u0005-Ie\u000e^3s]\u0006d\u0017\t]5")
@InternalApi
/* loaded from: input_file:BOOT-INF/lib/akka-stream-kafka_2.12-1.0.5.jar:akka/kafka/internal/SingleSourceLogic.class */
public abstract class SingleSourceLogic<K, V, Msg> extends BaseSingleSourceLogic<K, V, Msg> {
    private final ConsumerSettings<K, V> settings;
    private final Subscription subscription;
    private final Promise<ActorRef> consumerPromise;
    private final int actorNumber;

    @Override // akka.kafka.internal.BaseSingleSourceLogic, akka.stream.stage.StageLogging
    public Class<?> logSource() {
        return SingleSourceLogic.class;
    }

    private Promise<ActorRef> consumerPromise() {
        return this.consumerPromise;
    }

    public final int actorNumber() {
        return this.actorNumber;
    }

    @Override // akka.kafka.internal.BaseSingleSourceLogic, akka.kafka.internal.MetricsControl
    public final Future<ActorRef> consumerFuture() {
        return consumerPromise().future();
    }

    @Override // akka.kafka.internal.BaseSingleSourceLogic
    public final void configureSubscription() {
        Subscription subscription = this.subscription;
        if (subscription instanceof Subscriptions.TopicSubscription) {
            Subscriptions.TopicSubscription topicSubscription = (Subscriptions.TopicSubscription) subscription;
            consumerActor().tell(new KafkaConsumerActor$Internal$Subscribe(topicSubscription.tps(), addToPartitionAssignmentHandler(rebalanceListener$1(topicSubscription))), sourceActor().ref());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (subscription instanceof Subscriptions.TopicSubscriptionPattern) {
            Subscriptions.TopicSubscriptionPattern topicSubscriptionPattern = (Subscriptions.TopicSubscriptionPattern) subscription;
            consumerActor().tell(new KafkaConsumerActor$Internal$SubscribePattern(topicSubscriptionPattern.pattern(), addToPartitionAssignmentHandler(rebalanceListener$1(topicSubscriptionPattern))), sourceActor().ref());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        if (!(subscription instanceof ManualSubscription)) {
            throw new MatchError(subscription);
        }
        configureManualSubscription((ManualSubscription) subscription);
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    @Override // akka.kafka.internal.BaseSingleSourceLogic
    public final ActorRef createConsumerActor() {
        ActorRef systemActorOf = ((ExtendedActorSystem) ActorMaterializerHelper$.MODULE$.downcast(materializer()).system()).systemActorOf(akka.kafka.KafkaConsumerActor$.MODULE$.props(sourceActor().ref(), this.settings), new StringBuilder(15).append("kafka-consumer-").append(actorNumber()).toString());
        consumerPromise().success(systemActorOf);
        return systemActorOf;
    }

    @Override // akka.kafka.internal.BaseSingleSourceLogic, akka.stream.stage.GraphStageLogic
    public final void postStop() {
        consumerActor().tell(KafkaConsumerActor$Internal$.MODULE$.Stop(), sourceActor().ref());
        super.postStop();
    }

    @Override // akka.kafka.internal.BaseSingleSourceLogic, akka.kafka.internal.PromiseControl
    public final void performShutdown() {
        setKeepGoing(true);
        if (!isClosed(super.shape().out())) {
            complete(super.shape().out());
        }
        sourceActor().become(shuttingDownReceive());
        stopConsumerActor();
    }

    public PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> shuttingDownReceive() {
        return new SingleSourceLogic$$anonfun$shuttingDownReceive$1(this);
    }

    public void stopConsumerActor() {
        materializer().scheduleOnce(this.settings.stopTimeout(), new Runnable(this) { // from class: akka.kafka.internal.SingleSourceLogic$$anon$1
            private final /* synthetic */ SingleSourceLogic $outer;

            @Override // java.lang.Runnable
            public void run() {
                this.$outer.consumerActor().tell(KafkaConsumerActor$Internal$.MODULE$.Stop(), this.$outer.sourceActor().ref());
            }

            {
                if (this == 0) {
                    throw null;
                }
                this.$outer = this;
            }
        });
    }

    public void partitionAssignedHandler(Set<TopicPartition> set) {
        tps_$eq((Set) tps().$plus$plus(set));
        log().debug("Assigned partitions: {}. All partitions: {}", set, tps());
        requestMessages();
    }

    public void partitionRevokedHandler(Set<TopicPartition> set) {
        tps_$eq((Set) tps().$minus$minus(set));
        log().debug("Revoked partitions: {}. All partitions: {}", set, tps());
    }

    public PartitionAssignmentHandler addToPartitionAssignmentHandler(PartitionAssignmentHandler partitionAssignmentHandler) {
        return partitionAssignmentHandler;
    }

    private final PartitionAssignmentHandler rebalanceListener$1(AutoSubscription autoSubscription) {
        return new PartitionAssignmentHelpers.AsyncCallbacks(autoSubscription, sourceActor().ref(), getAsyncCallback(set -> {
            this.partitionAssignedHandler(set);
            return BoxedUnit.UNIT;
        }), getAsyncCallback(set2 -> {
            this.partitionRevokedHandler(set2);
            return BoxedUnit.UNIT;
        }));
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SingleSourceLogic(SourceShape<Msg> sourceShape, ConsumerSettings<K, V> consumerSettings, Subscription subscription) {
        super(sourceShape);
        this.settings = consumerSettings;
        this.subscription = subscription;
        this.consumerPromise = Promise$.MODULE$.apply();
        this.actorNumber = KafkaConsumerActor$Internal$.MODULE$.nextNumber();
    }
}
