package akka.kafka.internal;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.event.LoggingAdapter;
import akka.kafka.ConsumerFailed;
import akka.kafka.internal.PromiseControl;
import akka.kafka.scaladsl.Consumer;
import akka.stream.SourceShape;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SubSourceLogic.scala */
/* loaded from: input_file:BOOT-INF/lib/akka-stream-kafka_2.12-1.0.5.jar:akka/kafka/internal/SubSourceStage$$anon$3.class */
public final class SubSourceStage$$anon$3 extends GraphStageLogic implements PromiseControl, MetricsControl, StageLogging {
    private final SourceShape<Msg> shape;
    private final KafkaConsumerActor$Internal$RequestMessages requestMessages;
    private boolean requested;
    private GraphStageLogic.StageActor subSourceActor;
    private Iterator<ConsumerRecord<K, V>> akka$kafka$internal$SubSourceStage$$anon$$buffer;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    private final Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise;
    private final Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise;
    private final AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback;
    private final /* synthetic */ SubSourceStage $outer;

    @Override // akka.stream.stage.StageLogging
    public Class<?> logSource() {
        Class<?> logSource;
        logSource = logSource();
        return logSource;
    }

    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // akka.kafka.scaladsl.Consumer.Control, akka.kafka.internal.MetricsControl
    public Future<Map<MetricName, Metric>> metrics() {
        Future<Map<MetricName, Metric>> metrics;
        metrics = metrics();
        return metrics;
    }

    @Override // akka.kafka.internal.PromiseControl
    public /* synthetic */ void akka$kafka$internal$PromiseControl$$super$setKeepGoing(boolean z) {
        super.setKeepGoing(z);
    }

    @Override // akka.kafka.internal.PromiseControl
    public void performStop() {
        performStop();
    }

    @Override // akka.kafka.internal.PromiseControl
    public boolean onStop() {
        boolean onStop;
        onStop = onStop();
        return onStop;
    }

    @Override // akka.kafka.internal.PromiseControl
    public boolean onShutdown() {
        boolean onShutdown;
        onShutdown = onShutdown();
        return onShutdown;
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> stop() {
        Future<Done> stop;
        stop = stop();
        return stop;
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> shutdown() {
        Future<Done> shutdown;
        shutdown = shutdown();
        return shutdown;
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> isShutdown() {
        Future<Done> isShutdown;
        isShutdown = isShutdown();
        return isShutdown;
    }

    @Override // akka.kafka.scaladsl.Consumer.Control
    public <S> Future<S> drainAndShutdown(Future<S> future, ExecutionContext executionContext) {
        Future<S> drainAndShutdown;
        drainAndShutdown = drainAndShutdown(future, executionContext);
        return drainAndShutdown;
    }

    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    @Override // akka.stream.stage.StageLogging
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise() {
        return this.akka$kafka$internal$PromiseControl$$shutdownPromise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise() {
        return this.akka$kafka$internal$PromiseControl$$stopPromise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback() {
        return this.akka$kafka$internal$PromiseControl$$controlCallback;
    }

    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise<Done> promise) {
        this.akka$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$stopPromise_$eq(Promise<Done> promise) {
        this.akka$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback<PromiseControl.ControlOperation> asyncCallback) {
        this.akka$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    @Override // akka.kafka.internal.MetricsControl
    public ExecutionContext executionContext() {
        return materializer().executionContext();
    }

    @Override // akka.kafka.internal.MetricsControl
    public Future<ActorRef> consumerFuture() {
        return Future$.MODULE$.successful(this.$outer.akka$kafka$internal$SubSourceStage$$consumerActor);
    }

    @Override // akka.kafka.internal.PromiseControl
    public SourceShape<Msg> shape() {
        return this.shape;
    }

    private KafkaConsumerActor$Internal$RequestMessages requestMessages() {
        return this.requestMessages;
    }

    private boolean requested() {
        return this.requested;
    }

    private void requested_$eq(boolean z) {
        this.requested = z;
    }

    private GraphStageLogic.StageActor subSourceActor() {
        return this.subSourceActor;
    }

    private void subSourceActor_$eq(GraphStageLogic.StageActor stageActor) {
        this.subSourceActor = stageActor;
    }

    public Iterator<ConsumerRecord<K, V>> akka$kafka$internal$SubSourceStage$$anon$$buffer() {
        return this.akka$kafka$internal$SubSourceStage$$anon$$buffer;
    }

    private void akka$kafka$internal$SubSourceStage$$anon$$buffer_$eq(Iterator<ConsumerRecord<K, V>> iterator) {
        this.akka$kafka$internal$SubSourceStage$$anon$$buffer = iterator;
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        log().debug("#{} Starting SubSource for partition {}", BoxesRunTime.boxToInteger(this.$outer.akka$kafka$internal$SubSourceStage$$actorNumber), this.$outer.akka$kafka$internal$SubSourceStage$$tp);
        super.preStart();
        this.$outer.akka$kafka$internal$SubSourceStage$$subSourceStartedCb.invoke(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.$outer.akka$kafka$internal$SubSourceStage$$tp), this));
        subSourceActor_$eq(getStageActor(tuple2 -> {
            $anonfun$preStart$2(this, tuple2);
            return BoxedUnit.UNIT;
        }));
        subSourceActor().watch(this.$outer.akka$kafka$internal$SubSourceStage$$consumerActor);
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void postStop() {
        onShutdown();
        super.postStop();
    }

    @Override // akka.kafka.internal.PromiseControl
    public void performShutdown() {
        log().debug("#{} Completing SubSource for partition {}", BoxesRunTime.boxToInteger(this.$outer.akka$kafka$internal$SubSourceStage$$actorNumber), this.$outer.akka$kafka$internal$SubSourceStage$$tp);
        completeStage();
    }

    public void akka$kafka$internal$SubSourceStage$$anon$$pump() {
        while (isAvailable(this.$outer.out())) {
            if (!akka$kafka$internal$SubSourceStage$$anon$$buffer().hasNext()) {
                if (requested()) {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
                requested_$eq(true);
                this.$outer.akka$kafka$internal$SubSourceStage$$consumerActor.tell(requestMessages(), subSourceActor().ref());
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
            push(this.$outer.out(), this.$outer.akka$kafka$internal$SubSourceStage$$messageBuilder.createMessage((ConsumerRecord) akka$kafka$internal$SubSourceStage$$anon$$buffer().mo1929next()));
        }
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    public /* synthetic */ SubSourceStage akka$kafka$internal$SubSourceStage$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ void $anonfun$preStart$2(SubSourceStage$$anon$3 subSourceStage$$anon$3, Tuple2 tuple2) {
        if (tuple2 != null) {
            Object mo1304_2 = tuple2.mo1304_2();
            if (mo1304_2 instanceof KafkaConsumerActor$Internal$Messages) {
                KafkaConsumerActor$Internal$Messages kafkaConsumerActor$Internal$Messages = (KafkaConsumerActor$Internal$Messages) mo1304_2;
                subSourceStage$$anon$3.requested_$eq(false);
                if (subSourceStage$$anon$3.akka$kafka$internal$SubSourceStage$$anon$$buffer().hasNext()) {
                    subSourceStage$$anon$3.akka$kafka$internal$SubSourceStage$$anon$$buffer_$eq(subSourceStage$$anon$3.akka$kafka$internal$SubSourceStage$$anon$$buffer().$plus$plus(() -> {
                        return kafkaConsumerActor$Internal$Messages.messages();
                    }));
                } else {
                    subSourceStage$$anon$3.akka$kafka$internal$SubSourceStage$$anon$$buffer_$eq(kafkaConsumerActor$Internal$Messages.messages());
                }
                subSourceStage$$anon$3.akka$kafka$internal$SubSourceStage$$anon$$pump();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (tuple2 != null) {
            Object mo1304_22 = tuple2.mo1304_2();
            if (mo1304_22 instanceof Status.Failure) {
                subSourceStage$$anon$3.failStage(((Status.Failure) mo1304_22).cause());
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        if (tuple2 != null) {
            Object mo1304_23 = tuple2.mo1304_2();
            if (mo1304_23 instanceof Terminated) {
                ActorRef actor = ((Terminated) mo1304_23).actor();
                ActorRef actorRef = subSourceStage$$anon$3.$outer.akka$kafka$internal$SubSourceStage$$consumerActor;
                if (actor != null ? actor.equals(actorRef) : actorRef == null) {
                    subSourceStage$$anon$3.failStage(new ConsumerFailed());
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        throw new MatchError(tuple2);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SubSourceStage$$anon$3(SubSourceStage<K, V, Msg> subSourceStage) {
        super(subSourceStage.shape());
        if (subSourceStage == 0) {
            throw null;
        }
        this.$outer = subSourceStage;
        Consumer.Control.$init$(this);
        PromiseControl.$init$((PromiseControl) this);
        MetricsControl.$init$((MetricsControl) this);
        StageLogging.$init$(this);
        this.shape = subSourceStage.shape();
        this.requestMessages = new KafkaConsumerActor$Internal$RequestMessages(0, (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{subSourceStage.akka$kafka$internal$SubSourceStage$$tp})));
        this.requested = false;
        this.akka$kafka$internal$SubSourceStage$$anon$$buffer = package$.MODULE$.Iterator().empty();
        setHandler(subSourceStage.out(), new OutHandler(this) { // from class: akka.kafka.internal.SubSourceStage$$anon$3$$anon$4
            private final /* synthetic */ SubSourceStage$$anon$3 $outer;

            @Override // akka.stream.stage.OutHandler
            public void onPull() {
                this.$outer.akka$kafka$internal$SubSourceStage$$anon$$pump();
            }

            @Override // akka.stream.stage.OutHandler
            public void onDownstreamFinish() {
                this.$outer.akka$kafka$internal$SubSourceStage$$anon$$$outer().akka$kafka$internal$SubSourceStage$$subSourceCancelledCb.invoke(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.$outer.akka$kafka$internal$SubSourceStage$$anon$$$outer().akka$kafka$internal$SubSourceStage$$tp), this.$outer.akka$kafka$internal$SubSourceStage$$anon$$buffer().hasNext() ? new Some(this.$outer.akka$kafka$internal$SubSourceStage$$anon$$buffer().mo1929next()) : None$.MODULE$));
                onDownstreamFinish();
            }

            /* JADX WARN: Incorrect inner types in method signature: (Lakka/kafka/internal/SubSourceStage<TK;TV;TMsg;>.$anon$3;)V */
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
