package org.apache.pekko.kafka.internal;

import java.io.Serializable;
import java.util.UUID;
import org.apache.kafka.common.TopicPartition;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.Status$Failure$;
import org.apache.pekko.actor.Terminated;
import org.apache.pekko.actor.Terminated$;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.AutoSubscription;
import org.apache.pekko.kafka.ConsumerFailed;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.ManualSubscription;
import org.apache.pekko.kafka.RestrictedConsumer;
import org.apache.pekko.kafka.internal.PartitionAssignmentHelpers;
import org.apache.pekko.kafka.scaladsl.Consumer;
import org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler;
import org.apache.pekko.pattern.AskableActorRef$;
import org.apache.pekko.pattern.package$;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import org.apache.pekko.util.Timeout$;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Iterator;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.package;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: SubSourceLogic.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/SubSourceLogic.class */
public class SubSourceLogic<K, V, Msg> extends TimerGraphStageLogic implements PromiseControl, MetricsControl, SourceLogicSubscription, StageIdLogging, StageLogging, InstanceId, StageIdLogging {
    private Promise org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise;
    private Promise org$apache$pekko$kafka$internal$PromiseControl$$stopPromise;
    private AsyncCallback org$apache$pekko$kafka$internal$PromiseControl$$controlCallback;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private String org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    private LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    private final SourceShape shape;
    private final ConsumerSettings<K, V> settings;
    private final AutoSubscription subscription;
    private final Option<Function1<Set<TopicPartition>, Future<Map<TopicPartition, Object>>>> getOffsetsOnAssign;
    private final Function1<Set<TopicPartition>, BoxedUnit> onRevoke;
    private final SubSourceStageLogicFactory<K, V, Msg> subSourceStageLogicFactory;
    private final Promise<ActorRef> consumerPromise;
    private final int actorNumber;
    private ActorRef consumerActor;
    private GraphStageLogic.StageActor sourceActor;
    private Set<TopicPartition> pendingPartitions;
    private Set<TopicPartition> partitionsInStartup;
    private Map subSources;
    private Set<TopicPartition> partitionsToRevoke;
    private final AsyncCallback<Set<TopicPartition>> updatePendingPartitionsAndEmitSubSourcesCb;
    public final AsyncCallback<ConsumerFailed> org$apache$pekko$kafka$internal$SubSourceLogic$$stageFailCB;
    private final AsyncCallback<Tuple2<Set<TopicPartition>, Map<TopicPartition, Object>>> onOffsetsFromExternalResponseCB;
    private final AsyncCallback<Set<TopicPartition>> partitionAssignedCB;
    private final AsyncCallback<Set<TopicPartition>> partitionRevokedCB;
    private final AsyncCallback<Tuple2<TopicPartition, SubSourceCancellationStrategy>> subsourceCancelledCB;
    private final AsyncCallback<SubSourceStageLogicControl> subsourceStartedCB;

    /* compiled from: SubSourceLogic.scala */
    @InternalApi
    /* loaded from: input_file:org/apache/pekko/kafka/internal/SubSourceLogic$ControlAndStageActor.class */
    public static final class ControlAndStageActor implements Product, Serializable {
        private final Consumer.Control control;
        private final ActorRef stageActor;

        public static ControlAndStageActor apply(Consumer.Control control, ActorRef actorRef) {
            return SubSourceLogic$ControlAndStageActor$.MODULE$.apply(control, actorRef);
        }

        public static ControlAndStageActor fromProduct(Product product) {
            return SubSourceLogic$ControlAndStageActor$.MODULE$.m205fromProduct(product);
        }

        public static ControlAndStageActor unapply(ControlAndStageActor controlAndStageActor) {
            return SubSourceLogic$ControlAndStageActor$.MODULE$.unapply(controlAndStageActor);
        }

        public ControlAndStageActor(Consumer.Control control, ActorRef actorRef) {
            this.control = control;
            this.stageActor = actorRef;
        }

        public /* bridge */ /* synthetic */ Iterator productIterator() {
            return Product.productIterator$(this);
        }

        public /* bridge */ /* synthetic */ Iterator productElementNames() {
            return Product.productElementNames$(this);
        }

        public int hashCode() {
            return ScalaRunTime$.MODULE$._hashCode(this);
        }

        public boolean equals(Object obj) {
            boolean z;
            if (this != obj) {
                if (obj instanceof ControlAndStageActor) {
                    ControlAndStageActor controlAndStageActor = (ControlAndStageActor) obj;
                    Consumer.Control control = control();
                    Consumer.Control control2 = controlAndStageActor.control();
                    if (control != null ? control.equals(control2) : control2 == null) {
                        ActorRef stageActor = stageActor();
                        ActorRef stageActor2 = controlAndStageActor.stageActor();
                        if (stageActor != null ? stageActor.equals(stageActor2) : stageActor2 == null) {
                            z = true;
                        }
                    }
                    z = false;
                } else {
                    z = false;
                }
                if (!z) {
                    return false;
                }
            }
            return true;
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof ControlAndStageActor;
        }

        public int productArity() {
            return 2;
        }

        public String productPrefix() {
            return "ControlAndStageActor";
        }

        public Object productElement(int i) {
            if (0 == i) {
                return _1();
            }
            if (1 == i) {
                return _2();
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
        }

        public String productElementName(int i) {
            if (0 == i) {
                return "control";
            }
            if (1 == i) {
                return "stageActor";
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
        }

        public Consumer.Control control() {
            return this.control;
        }

        public ActorRef stageActor() {
            return this.stageActor;
        }

        public ControlAndStageActor copy(Consumer.Control control, ActorRef actorRef) {
            return new ControlAndStageActor(control, actorRef);
        }

        public Consumer.Control copy$default$1() {
            return control();
        }

        public ActorRef copy$default$2() {
            return stageActor();
        }

        public Consumer.Control _1() {
            return control();
        }

        public ActorRef _2() {
            return stageActor();
        }
    }

    /* compiled from: SubSourceLogic.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/SubSourceLogic$SeekToOffsetAndReEmit.class */
    public static final class SeekToOffsetAndReEmit implements SubSourceCancellationStrategy, Product, Serializable {
        private final long offset;

        public static SeekToOffsetAndReEmit apply(long j) {
            return SubSourceLogic$SeekToOffsetAndReEmit$.MODULE$.apply(j);
        }

        public static SeekToOffsetAndReEmit fromProduct(Product product) {
            return SubSourceLogic$SeekToOffsetAndReEmit$.MODULE$.m211fromProduct(product);
        }

        public static SeekToOffsetAndReEmit unapply(SeekToOffsetAndReEmit seekToOffsetAndReEmit) {
            return SubSourceLogic$SeekToOffsetAndReEmit$.MODULE$.unapply(seekToOffsetAndReEmit);
        }

        public SeekToOffsetAndReEmit(long j) {
            this.offset = j;
        }

        public /* bridge */ /* synthetic */ Iterator productIterator() {
            return Product.productIterator$(this);
        }

        public /* bridge */ /* synthetic */ Iterator productElementNames() {
            return Product.productElementNames$(this);
        }

        public int hashCode() {
            return Statics.finalizeHash(Statics.mix(Statics.mix(-889275714, productPrefix().hashCode()), Statics.longHash(offset())), 1);
        }

        public boolean equals(Object obj) {
            if (this != obj) {
                if (!(obj instanceof SeekToOffsetAndReEmit ? offset() == ((SeekToOffsetAndReEmit) obj).offset() : false)) {
                    return false;
                }
            }
            return true;
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof SeekToOffsetAndReEmit;
        }

        public int productArity() {
            return 1;
        }

        public String productPrefix() {
            return "SeekToOffsetAndReEmit";
        }

        public Object productElement(int i) {
            if (0 == i) {
                return BoxesRunTime.boxToLong(_1());
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
        }

        public String productElementName(int i) {
            if (0 == i) {
                return "offset";
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
        }

        public long offset() {
            return this.offset;
        }

        public SeekToOffsetAndReEmit copy(long j) {
            return new SeekToOffsetAndReEmit(j);
        }

        public long copy$default$1() {
            return offset();
        }

        public long _1() {
            return offset();
        }
    }

    /* compiled from: SubSourceLogic.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/SubSourceLogic$SubSourceCancellationStrategy.class */
    public interface SubSourceCancellationStrategy {
    }

    /* compiled from: SubSourceLogic.scala */
    @InternalApi
    /* loaded from: input_file:org/apache/pekko/kafka/internal/SubSourceLogic$SubSourceStageLogicControl.class */
    public static final class SubSourceStageLogicControl implements Product, Serializable {
        private final TopicPartition tp;
        private final ControlAndStageActor controlAndStageActor;
        private final AsyncCallback filterRevokedPartitionsCB;

        public static SubSourceStageLogicControl apply(TopicPartition topicPartition, ControlAndStageActor controlAndStageActor, AsyncCallback<Set<TopicPartition>> asyncCallback) {
            return SubSourceLogic$SubSourceStageLogicControl$.MODULE$.apply(topicPartition, controlAndStageActor, asyncCallback);
        }

        public static SubSourceStageLogicControl fromProduct(Product product) {
            return SubSourceLogic$SubSourceStageLogicControl$.MODULE$.m213fromProduct(product);
        }

        public static SubSourceStageLogicControl unapply(SubSourceStageLogicControl subSourceStageLogicControl) {
            return SubSourceLogic$SubSourceStageLogicControl$.MODULE$.unapply(subSourceStageLogicControl);
        }

        public SubSourceStageLogicControl(TopicPartition topicPartition, ControlAndStageActor controlAndStageActor, AsyncCallback<Set<TopicPartition>> asyncCallback) {
            this.tp = topicPartition;
            this.controlAndStageActor = controlAndStageActor;
            this.filterRevokedPartitionsCB = asyncCallback;
        }

        public /* bridge */ /* synthetic */ Iterator productIterator() {
            return Product.productIterator$(this);
        }

        public /* bridge */ /* synthetic */ Iterator productElementNames() {
            return Product.productElementNames$(this);
        }

        public int hashCode() {
            return ScalaRunTime$.MODULE$._hashCode(this);
        }

        public boolean equals(Object obj) {
            boolean z;
            if (this != obj) {
                if (obj instanceof SubSourceStageLogicControl) {
                    SubSourceStageLogicControl subSourceStageLogicControl = (SubSourceStageLogicControl) obj;
                    TopicPartition tp = tp();
                    TopicPartition tp2 = subSourceStageLogicControl.tp();
                    if (tp != null ? tp.equals(tp2) : tp2 == null) {
                        ControlAndStageActor controlAndStageActor = controlAndStageActor();
                        ControlAndStageActor controlAndStageActor2 = subSourceStageLogicControl.controlAndStageActor();
                        if (controlAndStageActor != null ? controlAndStageActor.equals(controlAndStageActor2) : controlAndStageActor2 == null) {
                            AsyncCallback<Set<TopicPartition>> filterRevokedPartitionsCB = filterRevokedPartitionsCB();
                            AsyncCallback<Set<TopicPartition>> filterRevokedPartitionsCB2 = subSourceStageLogicControl.filterRevokedPartitionsCB();
                            if (filterRevokedPartitionsCB != null ? filterRevokedPartitionsCB.equals(filterRevokedPartitionsCB2) : filterRevokedPartitionsCB2 == null) {
                                z = true;
                            }
                        }
                    }
                    z = false;
                } else {
                    z = false;
                }
                if (!z) {
                    return false;
                }
            }
            return true;
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof SubSourceStageLogicControl;
        }

        public int productArity() {
            return 3;
        }

        public String productPrefix() {
            return "SubSourceStageLogicControl";
        }

        public Object productElement(int i) {
            switch (i) {
                case 0:
                    return _1();
                case 1:
                    return _2();
                case 2:
                    return _3();
                default:
                    throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
        }

        public String productElementName(int i) {
            switch (i) {
                case 0:
                    return "tp";
                case 1:
                    return "controlAndStageActor";
                case 2:
                    return "filterRevokedPartitionsCB";
                default:
                    throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
        }

        public TopicPartition tp() {
            return this.tp;
        }

        public ControlAndStageActor controlAndStageActor() {
            return this.controlAndStageActor;
        }

        public AsyncCallback<Set<TopicPartition>> filterRevokedPartitionsCB() {
            return this.filterRevokedPartitionsCB;
        }

        public Consumer.Control control() {
            return controlAndStageActor().control();
        }

        public ActorRef stageActor() {
            return controlAndStageActor().stageActor();
        }

        public SubSourceStageLogicControl copy(TopicPartition topicPartition, ControlAndStageActor controlAndStageActor, AsyncCallback<Set<TopicPartition>> asyncCallback) {
            return new SubSourceStageLogicControl(topicPartition, controlAndStageActor, asyncCallback);
        }

        public TopicPartition copy$default$1() {
            return tp();
        }

        public ControlAndStageActor copy$default$2() {
            return controlAndStageActor();
        }

        public AsyncCallback<Set<TopicPartition>> copy$default$3() {
            return filterRevokedPartitionsCB();
        }

        public TopicPartition _1() {
            return tp();
        }

        public ControlAndStageActor _2() {
            return controlAndStageActor();
        }

        public AsyncCallback<Set<TopicPartition>> _3() {
            return filterRevokedPartitionsCB();
        }
    }

    /* compiled from: SubSourceLogic.scala */
    @InternalApi
    /* loaded from: input_file:org/apache/pekko/kafka/internal/SubSourceLogic$SubSourceStageLogicFactory.class */
    public interface SubSourceStageLogicFactory<K, V, Msg> {
        SubSourceStageLogic<K, V, Msg> create(SourceShape<Msg> sourceShape, TopicPartition topicPartition, ActorRef actorRef, AsyncCallback<SubSourceStageLogicControl> asyncCallback, AsyncCallback<Tuple2<TopicPartition, SubSourceCancellationStrategy>> asyncCallback2, int i);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SubSourceLogic(SourceShape<Tuple2<TopicPartition, Source<Msg, NotUsed>>> sourceShape, ConsumerSettings<K, V> consumerSettings, AutoSubscription autoSubscription, Option<Function1<Set<TopicPartition>, Future<Map<TopicPartition, Object>>>> option, Function1<Set<TopicPartition>, BoxedUnit> function1, SubSourceStageLogicFactory<K, V, Msg> subSourceStageLogicFactory) {
        super(sourceShape);
        this.shape = sourceShape;
        this.settings = consumerSettings;
        this.subscription = autoSubscription;
        this.getOffsetsOnAssign = option;
        this.onRevoke = function1;
        this.subSourceStageLogicFactory = subSourceStageLogicFactory;
        PromiseControl.$init$(this);
        StageLogging.$init$(this);
        org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(UUID.randomUUID().toString()), 5));
        StageIdLogging.$init$((StageIdLogging) this);
        this.consumerPromise = Promise$.MODULE$.apply();
        this.actorNumber = KafkaConsumerActor$Internal$.MODULE$.nextNumber();
        this.pendingPartitions = Set$.MODULE$.empty();
        this.partitionsInStartup = Set$.MODULE$.empty();
        this.subSources = Map$.MODULE$.empty();
        this.partitionsToRevoke = Predef$.MODULE$.Set().empty();
        this.updatePendingPartitionsAndEmitSubSourcesCb = getAsyncCallback(set -> {
            updatePendingPartitionsAndEmitSubSources(set);
        });
        this.org$apache$pekko$kafka$internal$SubSourceLogic$$stageFailCB = getAsyncCallback(consumerFailed -> {
            failStage(consumerFailed);
        });
        this.onOffsetsFromExternalResponseCB = getAsyncCallback(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            seekAndEmitSubSources((Set) ((Set) tuple2._1()).$minus$minus(this.partitionsToRevoke.$plus$plus(this.partitionsInStartup).$plus$plus(this.pendingPartitions)), ((Map) tuple2._2()).view().filterKeys(topicPartition -> {
                return !this.partitionsToRevoke.contains(topicPartition);
            }).toMap($less$colon$less$.MODULE$.refl()));
        });
        this.partitionAssignedCB = getAsyncCallback(set2 -> {
            Set<TopicPartition> set2 = (Set) set2.$minus$minus(this.partitionsToRevoke);
            if (log().isDebugEnabled() && set2.nonEmpty()) {
                log().debug("Assigning new partitions: {}", set2.mkString(", "));
            }
            this.partitionsToRevoke = this.partitionsToRevoke.$minus$minus(set2);
            if (None$.MODULE$.equals(option)) {
                updatePendingPartitionsAndEmitSubSources(set2);
            } else {
                if (!(option instanceof Some)) {
                    throw new MatchError(option);
                }
                Function1 function12 = (Function1) ((Some) option).value();
                ((Future) function12.apply(set2)).onComplete(r9 -> {
                    if (r9 instanceof Failure) {
                        this.org$apache$pekko$kafka$internal$SubSourceLogic$$stageFailCB.invoke(new ConsumerFailed(new StringBuilder(41).append(idLogPrefix()).append(" Failed to fetch offset for partitions: ").append(set2.mkString(", ")).append(".").toString(), ((Failure) r9).exception()));
                    } else {
                        if (!(r9 instanceof Success)) {
                            throw new MatchError(r9);
                        }
                        this.onOffsetsFromExternalResponseCB.invoke(Tuple2$.MODULE$.apply(set2, (Map) ((Success) r9).value()));
                    }
                }, materializer().executionContext());
            }
        });
        this.partitionRevokedCB = getAsyncCallback(set3 -> {
            this.partitionsToRevoke = this.partitionsToRevoke.$plus$plus(set3);
            scheduleOnce(SubSourceLogic$CloseRevokedPartitions$.MODULE$, consumerSettings.waitClosePartition());
        });
        this.subsourceCancelledCB = getAsyncCallback(tuple22 -> {
            if (tuple22 != null) {
                SubSourceCancellationStrategy subSourceCancellationStrategy = (SubSourceCancellationStrategy) tuple22._2();
                TopicPartition topicPartition = (TopicPartition) tuple22._1();
                if (subSourceCancellationStrategy != null) {
                    subSources_$eq((Map) subSources().$minus(topicPartition));
                    this.partitionsInStartup = this.partitionsInStartup.$minus(topicPartition);
                    if (subSourceCancellationStrategy instanceof SeekToOffsetAndReEmit) {
                        long _1 = SubSourceLogic$SeekToOffsetAndReEmit$.MODULE$.unapply((SeekToOffsetAndReEmit) subSourceCancellationStrategy)._1();
                        this.pendingPartitions = this.pendingPartitions.$plus(topicPartition);
                        if (log().isDebugEnabled()) {
                            log().debug("Seeking {} to {} after partition SubSource cancelled", topicPartition, BoxesRunTime.boxToLong(_1));
                        }
                        seekAndEmitSubSources(Predef$.MODULE$.Set().empty(), (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(_1))})));
                        return;
                    }
                    if (SubSourceLogic$ReEmit$.MODULE$.equals(subSourceCancellationStrategy)) {
                        this.pendingPartitions = this.pendingPartitions.$plus(topicPartition);
                        org$apache$pekko$kafka$internal$SubSourceLogic$$emitSubSourcesForPendingPartitions();
                        return;
                    } else {
                        if (!SubSourceLogic$DoNothing$.MODULE$.equals(subSourceCancellationStrategy)) {
                            throw new MatchError(subSourceCancellationStrategy);
                        }
                        return;
                    }
                }
            }
            throw new MatchError(tuple22);
        });
        this.subsourceStartedCB = getAsyncCallback(subSourceStageLogicControl -> {
            if (subSourceStageLogicControl != null) {
                SubSourceStageLogicControl unapply = SubSourceLogic$SubSourceStageLogicControl$.MODULE$.unapply(subSourceStageLogicControl);
                TopicPartition _1 = unapply._1();
                ControlAndStageActor _2 = unapply._2();
                unapply._3();
                if (_2 != null) {
                    ControlAndStageActor unapply2 = SubSourceLogic$ControlAndStageActor$.MODULE$.unapply(_2);
                    Consumer.Control _12 = unapply2._1();
                    unapply2._2();
                    if (!this.partitionsInStartup.contains(_1)) {
                        _12.shutdown();
                        return;
                    }
                    subSources_$eq((Map) subSources().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(_1), subSourceStageLogicControl)));
                    this.partitionsInStartup = this.partitionsInStartup.$minus(_1);
                    return;
                }
            }
            throw new MatchError(subSourceStageLogicControl);
        });
        setHandler(sourceShape.out(), new OutHandler(this) { // from class: org.apache.pekko.kafka.internal.SubSourceLogic$$anon$1
            private final /* synthetic */ SubSourceLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onPull() {
                this.$outer.org$apache$pekko$kafka$internal$SubSourceLogic$$emitSubSourcesForPendingPartitions();
            }

            public void onDownstreamFinish(Throwable th) {
                this.$outer.performShutdown();
            }
        });
        Statics.releaseFence();
    }

    @Override // org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future drainAndShutdown(Future future, ExecutionContext executionContext) {
        Future drainAndShutdown;
        drainAndShutdown = drainAndShutdown(future, executionContext);
        return drainAndShutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public Promise org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public Promise org$apache$pekko$kafka$internal$PromiseControl$$stopPromise() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$stopPromise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public AsyncCallback org$apache$pekko$kafka$internal$PromiseControl$$controlCallback() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$controlCallback;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise promise) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$stopPromise_$eq(Promise promise) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback asyncCallback) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ boolean onStop() {
        boolean onStop;
        onStop = onStop();
        return onStop;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ boolean onShutdown() {
        boolean onShutdown;
        onShutdown = onShutdown();
        return onShutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future stop() {
        Future stop;
        stop = stop();
        return stop;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future shutdown() {
        Future shutdown;
        shutdown = shutdown();
        return shutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future isShutdown() {
        Future isShutdown;
        isShutdown = isShutdown();
        return isShutdown;
    }

    @Override // org.apache.pekko.kafka.scaladsl.Consumer.Control, org.apache.pekko.kafka.internal.MetricsControl
    public /* bridge */ /* synthetic */ Future metrics() {
        Future metrics;
        metrics = metrics();
        return metrics;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public /* bridge */ /* synthetic */ void configureSubscription(AsyncCallback asyncCallback, AsyncCallback asyncCallback2) {
        SourceLogicSubscription.configureSubscription$(this, asyncCallback, asyncCallback2);
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public /* bridge */ /* synthetic */ void configureManualSubscription(ManualSubscription manualSubscription) {
        SourceLogicSubscription.configureManualSubscription$(this, manualSubscription);
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String org$apache$pekko$kafka$internal$InstanceId$$instanceId() {
        return this.org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public void org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(String str) {
        this.org$apache$pekko$kafka$internal$InstanceId$$instanceId = str;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log() {
        return this.org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public void org$apache$pekko$kafka$internal$StageIdLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$kafka$internal$StageIdLogging$$_log = loggingAdapter;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ String idLogPrefix() {
        String idLogPrefix;
        idLogPrefix = idLogPrefix();
        return idLogPrefix;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* synthetic */ void org$apache$pekko$kafka$internal$PromiseControl$$super$setKeepGoing(boolean z) {
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.setKeepGoing(z);
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* synthetic */ void org$apache$pekko$kafka$internal$PromiseControl$$super$complete(Outlet outlet) {
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.complete(outlet);
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* synthetic */ LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$super$log() {
        return StageLogging.log$(this);
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public SourceShape<Tuple2<TopicPartition, Source<Msg, NotUsed>>> shape() {
        return this.shape;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public AutoSubscription subscription() {
        return this.subscription;
    }

    public final int actorNumber() {
        return this.actorNumber;
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String id() {
        String id;
        StringBuilder sb = new StringBuilder(1);
        id = id();
        return sb.append(id).append("#").append(actorNumber()).toString();
    }

    @Override // org.apache.pekko.kafka.internal.MetricsControl
    public ExecutionContext executionContext() {
        return materializer().executionContext();
    }

    @Override // org.apache.pekko.kafka.internal.MetricsControl
    public Future<ActorRef> consumerFuture() {
        return this.consumerPromise.future();
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public ActorRef consumerActor() {
        return this.consumerActor;
    }

    public void consumerActor_$eq(ActorRef actorRef) {
        this.consumerActor = actorRef;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public GraphStageLogic.StageActor sourceActor() {
        return this.sourceActor;
    }

    public void sourceActor_$eq(GraphStageLogic.StageActor stageActor) {
        this.sourceActor = stageActor;
    }

    public Map<TopicPartition, SubSourceStageLogicControl> subSources() {
        return this.subSources;
    }

    public void subSources_$eq(Map<TopicPartition, SubSourceStageLogicControl> map) {
        this.subSources = map;
    }

    public void preStart() {
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.preStart();
        log().info("Starting");
        sourceActor_$eq(getStageActor(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Object _2 = tuple2._2();
            if (_2 instanceof Status.Failure) {
                failStage(Status$Failure$.MODULE$.unapply((Status.Failure) _2)._1());
                return;
            }
            if (_2 instanceof Terminated) {
                ActorRef _1 = Terminated$.MODULE$.unapply((Terminated) _2)._1();
                ActorRef consumerActor = consumerActor();
                if (_1 != null ? _1.equals(consumerActor) : consumerActor == null) {
                    failStage(new ConsumerFailed());
                    return;
                }
            }
            log().warning("ignoring message [{}]", _2);
        }));
        consumerActor_$eq(materializer().system().systemActorOf(org.apache.pekko.kafka.KafkaConsumerActor$.MODULE$.props(sourceActor().ref(), this.settings), new StringBuilder(15).append("kafka-consumer-").append(actorNumber()).toString()));
        this.consumerPromise.success(consumerActor());
        sourceActor().watch(consumerActor());
        configureSubscription(this.partitionAssignedCB, this.partitionRevokedCB);
    }

    private void seekAndEmitSubSources(Set<TopicPartition> set, Map<TopicPartition, Object> map) {
        ExecutionContextExecutor executionContext = materializer().executionContext();
        AskableActorRef$.MODULE$.ask$extension(package$.MODULE$.ask(consumerActor()), KafkaConsumerActor$Internal$Seek$.MODULE$.apply(map), Timeout$.MODULE$.apply(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).seconds()), sourceActor().ref()).map(obj -> {
            this.updatePendingPartitionsAndEmitSubSourcesCb.invoke(set);
        }, executionContext).recover(new SubSourceLogic$$anon$2(map, this), executionContext);
    }

    public void onTimer(Object obj) {
        if (!SubSourceLogic$CloseRevokedPartitions$.MODULE$.equals(obj)) {
            log().warning("unexpected timer [{}]", obj);
            return;
        }
        if (log().isDebugEnabled()) {
            log().debug("Closing SubSources for revoked partitions: {}", this.partitionsToRevoke.mkString(", "));
        }
        this.onRevoke.apply(this.partitionsToRevoke);
        this.pendingPartitions = this.pendingPartitions.$minus$minus(this.partitionsToRevoke);
        this.partitionsInStartup = this.partitionsInStartup.$minus$minus(this.partitionsToRevoke);
        Set<TopicPartition> set = this.partitionsToRevoke;
        Map<TopicPartition, SubSourceStageLogicControl> subSources = subSources();
        ((IterableOnceOps) ((IterableOps) set.flatMap(topicPartition -> {
            return subSources.get(topicPartition);
        })).map(subSourceStageLogicControl -> {
            return subSourceStageLogicControl.control();
        })).foreach(control -> {
            return control.shutdown();
        });
        subSources_$eq((Map) subSources().$minus$minus(this.partitionsToRevoke));
        this.partitionsToRevoke = Predef$.MODULE$.Set().empty();
    }

    private void updatePendingPartitionsAndEmitSubSources(Set<TopicPartition> set) {
        this.pendingPartitions = this.pendingPartitions.$plus$plus((IterableOnce) set.filter(topicPartition -> {
            return !this.partitionsInStartup.contains(topicPartition);
        }));
        org$apache$pekko$kafka$internal$SubSourceLogic$$emitSubSourcesForPendingPartitions();
    }

    public void org$apache$pekko$kafka$internal$SubSourceLogic$$emitSubSourcesForPendingPartitions() {
        while (this.pendingPartitions.nonEmpty() && isAvailable(shape().out())) {
            TopicPartition topicPartition = (TopicPartition) this.pendingPartitions.head();
            this.pendingPartitions = (Set) this.pendingPartitions.tail();
            this.partitionsInStartup = this.partitionsInStartup.$plus(topicPartition);
            push(shape().out(), Tuple2$.MODULE$.apply(topicPartition, Source$.MODULE$.fromGraph(new SubSourceStage(topicPartition, consumerActor(), this.subsourceStartedCB, this.subsourceCancelledCB, actorNumber(), this.subSourceStageLogicFactory))));
        }
    }

    public void postStop() {
        consumerActor().tell(KafkaConsumerActor$Internal$StopFromStage$.MODULE$.apply(id()), sourceActor().ref());
        onShutdown();
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.postStop();
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void performStop() {
        setKeepGoing(true);
        subSources().values().foreach(subSourceStageLogicControl -> {
            return subSourceStageLogicControl.control().stop();
        });
        complete(shape().out());
        onStop();
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void performShutdown() {
        log().info("Completing. Partitions [{}], StageActor {}", subSources().keys().mkString(","), sourceActor().ref());
        setKeepGoing(true);
        subSources().values().foreach(subSourceStageLogicControl -> {
            return subSourceStageLogicControl.control().shutdown();
        });
        if (!isClosed(shape().out())) {
            complete(shape().out());
        }
        sourceActor().become(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Object _2 = tuple2._2();
            if (_2 instanceof Terminated) {
                ActorRef _1 = Terminated$.MODULE$.unapply((Terminated) _2)._1();
                ActorRef consumerActor = consumerActor();
                if (_1 != null ? _1.equals(consumerActor) : consumerActor == null) {
                    onShutdown();
                    completeStage();
                    return;
                }
            }
            log().warning("ignoring message [{}]", _2);
        });
        materializer().scheduleOnce(this.settings.stopTimeout(), new Runnable(this) { // from class: org.apache.pekko.kafka.internal.SubSourceLogic$$anon$3
            private final /* synthetic */ SubSourceLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            @Override // java.lang.Runnable
            public void run() {
                this.$outer.consumerActor().tell(KafkaConsumerActor$Internal$StopFromStage$.MODULE$.apply(this.$outer.id()), this.$outer.sourceActor().ref());
            }
        });
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public PartitionAssignmentHandler addToPartitionAssignmentHandler(PartitionAssignmentHandler partitionAssignmentHandler) {
        return new PartitionAssignmentHelpers.Chain(partitionAssignmentHandler, new PartitionAssignmentHandler(this) { // from class: org.apache.pekko.kafka.internal.SubSourceLogic$$anon$4
            private Set lastRevoked;
            private final /* synthetic */ SubSourceLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
                this.lastRevoked = Predef$.MODULE$.Set().empty();
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onRevoke(Set set, RestrictedConsumer restrictedConsumer) {
                this.lastRevoked = set;
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onAssign(Set set, RestrictedConsumer restrictedConsumer) {
                this.lastRevoked.$minus$minus(set).foreach(topicPartition -> {
                    this.$outer.subSources().get(topicPartition).foreach((v1) -> {
                        SubSourceLogic.org$apache$pekko$kafka$internal$SubSourceLogic$$anon$4$$_$onAssign$$anonfun$1$$anonfun$1(r1, v1);
                    });
                });
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onLost(Set set, RestrictedConsumer restrictedConsumer) {
                set.foreach(topicPartition -> {
                    this.$outer.subSources().get(topicPartition).foreach((v1) -> {
                        SubSourceLogic.org$apache$pekko$kafka$internal$SubSourceLogic$$anon$4$$_$onLost$$anonfun$1$$anonfun$1(r1, v1);
                    });
                });
            }

            @Override // org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler
            public void onStop(Set set, RestrictedConsumer restrictedConsumer) {
            }
        });
    }

    public static final /* synthetic */ void org$apache$pekko$kafka$internal$SubSourceLogic$$anon$4$$_$onAssign$$anonfun$1$$anonfun$1(TopicPartition topicPartition, SubSourceStageLogicControl subSourceStageLogicControl) {
        subSourceStageLogicControl.filterRevokedPartitionsCB().invoke(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
    }

    public static final /* synthetic */ void org$apache$pekko$kafka$internal$SubSourceLogic$$anon$4$$_$onLost$$anonfun$1$$anonfun$1(TopicPartition topicPartition, SubSourceStageLogicControl subSourceStageLogicControl) {
        subSourceStageLogicControl.filterRevokedPartitionsCB().invoke(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
    }
}
