package org.apache.pekko.kafka.internal;

import java.util.UUID;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.CommitterSettings;
import org.apache.pekko.kafka.ConsumerMessage;
import org.apache.pekko.kafka.ConsumerMessage$CommittableOffsetBatch$;
import org.apache.pekko.kafka.internal.CommitTrigger;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.impl.ContextPropagation;
import org.apache.pekko.stream.impl.ContextPropagation$;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import scala.Predef$;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.runtime.Statics;

/* compiled from: CommitCollectorStage.scala */
/* loaded from: input_file:org/apache/pekko/kafka/internal/CommitCollectorStageLogic.class */
/**
 * Timer-driven stage logic for {@code CommitCollectorStage}, shown here as decompiled output of
 * a Scala class.
 *
 * <p>Each element grabbed from the stage inlet is handed to {@code updateBatch}; the current
 * {@code offsetBatch} is sent downstream when one of the following happens:
 * <ul>
 *   <li>{@code updateBatch} returns {@code true} and the outlet is available
 *       (trigger {@code CommitTrigger$BatchSize$});</li>
 *   <li>the {@code CommitNow} timer fires while the batch is non-empty
 *       (trigger {@code CommitTrigger$Interval$}), immediately if the outlet is available,
 *       otherwise on the next pull;</li>
 *   <li>upstream finishes while the batch is non-empty (the batch is emitted before the stage
 *       completes).</li>
 * </ul>
 *
 * <p>NOTE(review): this text is decompiler output, not original source. Several constructs
 * (self-calling bridge methods, {@code this == null} checks, {@code new InHandler(this)}) look
 * like decompilation artifacts and should be confirmed against the Scala source or bytecode
 * before relying on them.
 */
public final class CommitCollectorStageLogic extends TimerGraphStageLogic implements CommitObservationLogic, StageLogging, InstanceId, StageIdLogging {
    // Batch currently being collected; reset to the empty batch after a push and on upstream failure.
    private ConsumerMessage.CommittableOffsetBatch offsetBatch;
    // Backing field for the CommitObservationLogic trait's deferredOffsets; not touched directly in this class.
    private Map org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets;
    // Backing field for the StageLogging trait's logger.
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    // Backing field for the InstanceId trait; assigned once in the constructor.
    private String org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    // Backing field for the StageIdLogging trait's logger.
    private LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    // Owning stage; supplies in(), out() and committerSettings(). Public so the anonymous handlers can read it.
    public final CommitCollectorStage org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage;
    // Snapshot of the stage's committerSettings(), exposed through settings().
    private final CommitterSettings settings;
    // Context-propagation handle used by suspendContext()/resumeContext().
    private final ContextPropagation contextPropagation;
    // True after suspendContext() ran and before the matching resumeContext(); guards against repeated calls.
    private boolean contextSuspended;
    // Set by onTimer when a non-empty batch is due but the outlet is not available; consumed by onPull.
    public boolean org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushOnNextPull;

    /**
     * Initializes trait state, captures the committer settings and installs the inlet and
     * outlet handlers.
     *
     * @param commitCollectorStage the stage this logic belongs to; its shape, ports and
     *     committer settings are read here
     * @param attributes not referenced anywhere in this constructor body
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public CommitCollectorStageLogic(CommitCollectorStage commitCollectorStage, Attributes attributes) {
        super(commitCollectorStage.m96shape());
        this.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage = commitCollectorStage;
        // Trait initializers; presumably these set up offsetBatch/deferredOffsets and the loggers — confirm in the traits.
        CommitObservationLogic.$init$(this);
        StageLogging.$init$(this);
        // Instance id is the first 5 characters of a random UUID string.
        org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(UUID.randomUUID().toString()), 5));
        StageIdLogging.$init$((StageIdLogging) this);
        this.settings = commitCollectorStage.committerSettings();
        this.contextPropagation = ContextPropagation$.MODULE$.apply();
        this.contextSuspended = false;
        this.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushOnNextPull = false;
        // Inlet handler: collects incoming committables and reacts to upstream termination.
        setHandler(commitCollectorStage.in(), new InHandler(this) { // from class: org.apache.pekko.kafka.internal.CommitCollectorStageLogic$$anon$1
            private final /* synthetic */ CommitCollectorStageLogic $outer;

            // NOTE(review): decompiler artifact — presumably the compiled code null-checks and stores the
            // enclosing CommitCollectorStageLogic passed as the constructor argument; confirm against bytecode.
            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            /**
             * Grabs the next committable and feeds it to updateBatch. If updateBatch returns false the
             * inlet is pulled again; if it returns true and the outlet is available the batch is pushed
             * with the BatchSize trigger. Whenever no push happened, the context is suspended.
             */
            public void onPush() {
                // z records whether a batch was pushed downstream during this call.
                boolean z = false;
                ConsumerMessage.Committable committable = (ConsumerMessage.Committable) this.$outer.protected$grab(this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.in());
                this.$outer.log().debug("Consuming offset {}", committable);
                if (!this.$outer.updateBatch(committable)) {
                    this.$outer.protected$tryPull(this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.in());
                } else if (this.$outer.isAvailable(this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.out())) {
                    this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushDownStream(CommitTrigger$BatchSize$.MODULE$);
                    z = true;
                }
                // When updateBatch returned true but the outlet was not available, nothing is pulled or
                // pushed here; execution falls through to the suspend below.
                if (z) {
                    return;
                }
                this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$suspendContext();
            }

            /**
             * Emits any non-empty outstanding batch (after resuming the context) and then completes
             * the stage.
             */
            public void onUpstreamFinish() {
                if (this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$activeBatchInProgress()) {
                    this.$outer.log().debug("pushDownStream triggered by {}, outstanding batch {}", CommitTrigger$UpstreamFinish$.MODULE$, this.$outer.offsetBatch());
                    this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$resumeContext();
                    this.$outer.protected$emit(this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.out(), this.$outer.offsetBatch());
                }
                this.$outer.completeStage();
            }

            /**
             * Logs the failure; if a batch is outstanding, calls tellCommitEmergency() on it and
             * resets the batch to empty. The stage is then failed with the upstream error.
             *
             * @param th the upstream failure, passed on to failStage
             */
            public void onUpstreamFailure(Throwable th) {
                this.$outer.log().debug("onUpstreamFailure with exception {} with {}", th, this.$outer.offsetBatch());
                if (this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$activeBatchInProgress()) {
                    this.$outer.offsetBatch().tellCommitEmergency();
                    this.$outer.offsetBatch_$eq(ConsumerMessage$CommittableOffsetBatch$.MODULE$.empty());
                }
                this.$outer.failStage(th);
            }
        });
        // Outlet handler: either delivers a batch deferred by the timer or pulls more input.
        setHandler(commitCollectorStage.out(), new OutHandler(this) { // from class: org.apache.pekko.kafka.internal.CommitCollectorStageLogic$$anon$2
            private final /* synthetic */ CommitCollectorStageLogic $outer;

            // NOTE(review): decompiler artifact — presumably a null check on, and capture of, the enclosing
            // instance; confirm against bytecode.
            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            // Forwards to the OutHandler trait's static onDownstreamFinish$ helper.
            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            // Forwards to the OutHandler trait's static onDownstreamFinish$ helper (with cause).
            public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            /**
             * If the timer flagged a pending push, pushes the batch with the Interval trigger and
             * clears the flag; otherwise pulls the inlet unless it has already been pulled.
             */
            public void onPull() {
                if (this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushOnNextPull) {
                    this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushDownStream(CommitTrigger$Interval$.MODULE$);
                    this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushOnNextPull = false;
                } else {
                    if (this.$outer.protected$hasBeenPulled(this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.in())) {
                        return;
                    }
                    this.$outer.protected$tryPull(this.$outer.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.in());
                }
            }
        });
        Statics.releaseFence();
    }

    /** Returns the batch currently being collected. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public ConsumerMessage.CommittableOffsetBatch offsetBatch() {
        return this.offsetBatch;
    }

    /** Getter for the CommitObservationLogic trait's deferredOffsets backing field. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public Map org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets() {
        return this.org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets;
    }

    /** Replaces the batch currently being collected. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public void offsetBatch_$eq(ConsumerMessage.CommittableOffsetBatch committableOffsetBatch) {
        this.offsetBatch = committableOffsetBatch;
    }

    /** Setter for the CommitObservationLogic trait's deferredOffsets backing field. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public void org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets_$eq(Map map) {
        this.org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets = map;
    }

    // NOTE(review): as decompiled, this method calls itself. Presumably the compiled class forwards to the
    // CommitObservationLogic trait's implementation — confirm against bytecode before using this text.
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public /* bridge */ /* synthetic */ boolean updateBatch(ConsumerMessage.Committable committable) {
        boolean updateBatch;
        updateBatch = updateBatch(committable);
        return updateBatch;
    }

    // NOTE(review): self-call as decompiled; presumably a forwarder to the CommitObservationLogic trait
    // implementation — confirm against bytecode.
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public /* bridge */ /* synthetic */ int clearDeferredOffsets() {
        int clearDeferredOffsets;
        clearDeferredOffsets = clearDeferredOffsets();
        return clearDeferredOffsets;
    }

    /** Getter for the StageLogging trait's logger backing field. */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    /** Setter for the StageLogging trait's logger backing field. */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Getter for the InstanceId trait's instanceId backing field. */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String org$apache$pekko$kafka$internal$InstanceId$$instanceId() {
        return this.org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    }

    /** Setter for the InstanceId trait's instanceId backing field; invoked once from the constructor. */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public void org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(String str) {
        this.org$apache$pekko$kafka$internal$InstanceId$$instanceId = str;
    }

    // NOTE(review): self-call as decompiled; presumably a forwarder to the InstanceId trait
    // implementation — confirm against bytecode.
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public /* bridge */ /* synthetic */ String id() {
        String id;
        id = id();
        return id;
    }

    /** Getter for the StageIdLogging trait's logger backing field. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log() {
        return this.org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    }

    /** Setter for the StageIdLogging trait's logger backing field. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public void org$apache$pekko$kafka$internal$StageIdLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$kafka$internal$StageIdLogging$$_log = loggingAdapter;
    }

    // NOTE(review): self-call as decompiled; presumably a forwarder to the StageIdLogging trait
    // implementation — confirm against bytecode.
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ String idLogPrefix() {
        String idLogPrefix;
        idLogPrefix = idLogPrefix();
        return idLogPrefix;
    }

    // NOTE(review): self-call as decompiled; presumably a forwarder to the StageIdLogging trait
    // implementation — confirm against bytecode.
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    /** Accessor giving StageIdLogging the StageLogging trait's log() via its static log$ helper. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* synthetic */ LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$super$log() {
        return StageLogging.log$(this);
    }

    /** Returns the committer settings captured from the stage at construction time. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public CommitterSettings settings() {
        return this.settings;
    }

    /** Returns this class as the log source. */
    public Class<?> logSource() {
        return CommitCollectorStageLogic.class;
    }

    /** Runs the superclass preStart, schedules the first commit timer and logs initialization. */
    public void preStart() {
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.preStart();
        scheduleCommit();
        log().debug("CommitCollectorStage initialized");
    }

    /** Schedules a one-shot timer keyed by CommitNow, delayed by the stage's committerSettings().maxInterval(). */
    private void scheduleCommit() {
        scheduleOnce(CommitCollectorStage$.MODULE$.CommitNow(), this.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.committerSettings().maxInterval());
    }

    /**
     * Handles timer events.
     *
     * <p>A key other than CommitNow is logged as a warning. For CommitNow: with an empty batch the
     * timer is simply rescheduled; with a non-empty batch the batch is pushed (Interval trigger)
     * if the outlet is available, otherwise the push is deferred to the next pull. In every case
     * where no push happened — including the unexpected-key case — the context is suspended.
     *
     * @param obj the timer key
     */
    public void onTimer(Object obj) {
        // z records whether a batch was pushed downstream during this call.
        boolean z = false;
        String CommitNow = CommitCollectorStage$.MODULE$.CommitNow();
        // Null-safe inequality test between CommitNow and the received key.
        if (CommitNow != null ? !CommitNow.equals(obj) : obj != null) {
            log().warning("unexpected timer [{}]", obj);
        } else if (!org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$activeBatchInProgress()) {
            scheduleCommit();
        } else if (isAvailable(this.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.out())) {
            org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushDownStream(CommitTrigger$Interval$.MODULE$);
            z = true;
        } else {
            // No timer is rescheduled on this path; pushDownStream reschedules once onPull delivers the batch.
            this.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushOnNextPull = true;
        }
        if (z) {
            return;
        }
        org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$suspendContext();
    }

    /**
     * Logs the trigger, resumes the context, pushes the current batch to the outlet, resets the
     * batch to empty and schedules the next commit timer.
     *
     * @param triggerdBy the reason for the push; used only in the debug log message here
     */
    public void org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$pushDownStream(CommitTrigger.TriggerdBy triggerdBy) {
        log().debug("pushDownStream triggered by {}, outstanding batch {}", triggerdBy, offsetBatch());
        org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$resumeContext();
        push(this.org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$stage.out(), offsetBatch());
        offsetBatch_$eq(ConsumerMessage$CommittableOffsetBatch$.MODULE$.empty());
        scheduleCommit();
    }

    /** Logs that the stage stopped, then runs the superclass postStop. */
    public void postStop() {
        log().debug("CommitCollectorStage stopped");
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.postStop();
    }

    /** Returns true when the current batch is not empty. */
    public boolean org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$activeBatchInProgress() {
        return !offsetBatch().isEmpty();
    }

    /** Suspends the propagated context unless it is already marked as suspended. */
    public void org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$suspendContext() {
        if (this.contextSuspended) {
            return;
        }
        this.contextPropagation.suspendContext();
        this.contextSuspended = true;
    }

    /** Resumes the propagated context if it is currently marked as suspended. */
    public void org$apache$pekko$kafka$internal$CommitCollectorStageLogic$$resumeContext() {
        if (this.contextSuspended) {
            this.contextPropagation.resumeContext();
            this.contextSuspended = false;
        }
    }

    /** Public forwarder to grab(inlet); called by the anonymous inlet handler. */
    public <T> T protected$grab(Inlet<T> inlet) {
        return (T) grab(inlet);
    }

    /** Public forwarder to tryPull(inlet); called by the anonymous handlers. */
    public <T> void protected$tryPull(Inlet<T> inlet) {
        tryPull(inlet);
    }

    /** Public forwarder to emit(outlet, element); called by the anonymous inlet handler. */
    public <T> void protected$emit(Outlet<T> outlet, T t) {
        emit(outlet, t);
    }

    /** Public forwarder to hasBeenPulled(inlet); called by the anonymous outlet handler. */
    public <T> boolean protected$hasBeenPulled(Inlet<T> inlet) {
        return hasBeenPulled(inlet);
    }
}
