package io.zeebe.client.event.impl;

import io.zeebe.client.event.PollableTopicSubscription;
import io.zeebe.client.event.TopicSubscription;
import io.zeebe.client.event.UniversalEventHandler;
import io.zeebe.client.task.impl.subscription.EventAcquisition;
import io.zeebe.client.task.impl.subscription.EventSubscription;
import io.zeebe.client.task.impl.subscription.EventSubscriptionCreationResult;
import io.zeebe.msgpack.mapping.MsgPackTreeNodeIdConstructor;
import io.zeebe.util.CheckedConsumer;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/zeebe/client/event/impl/TopicSubscriptionImpl.class */
public class TopicSubscriptionImpl extends EventSubscription<TopicSubscriptionImpl> implements TopicSubscription, PollableTopicSubscription {
    protected static final int MAX_HANDLING_RETRIES = 2;
    protected CheckedConsumer<GeneralEventImpl> handler;
    protected final TopicClientImpl client;
    protected AtomicBoolean processingFlag;
    protected volatile long lastProcessedEventPosition;
    protected long lastAcknowledgedPosition;
    protected final long startPosition;
    protected final boolean forceStart;
    protected final String name;
    protected final int prefetchCapacity;

    public TopicSubscriptionImpl(TopicClientImpl topicClientImpl, String str, int i, CheckedConsumer<GeneralEventImpl> checkedConsumer, int i2, long j, boolean z, String str2, EventAcquisition<TopicSubscriptionImpl> eventAcquisition) {
        super(str, i, i2, eventAcquisition);
        this.processingFlag = new AtomicBoolean(false);
        this.prefetchCapacity = i2;
        this.client = topicClientImpl;
        if (checkedConsumer != null) {
            this.handler = checkedConsumer.andThen(this::recordProcessedEvent).andOnExceptionRetry(2, this::logRetry).andOnException(this::logExceptionAndClose);
        }
        this.startPosition = j;
        this.forceStart = z;
        this.name = str2;
        this.lastProcessedEventPosition = j;
        this.lastAcknowledgedPosition = j;
    }

    @Override // io.zeebe.client.task.impl.subscription.EventSubscription
    public boolean isManagedSubscription() {
        return this.handler != null;
    }

    @Override // io.zeebe.client.task.impl.subscription.EventSubscription
    public int poll() {
        return pollEvents(this.handler);
    }

    @Override // io.zeebe.client.event.PollableTopicSubscription
    public int poll(UniversalEventHandler universalEventHandler) {
        CheckedConsumer checkedConsumer = generalEventImpl -> {
            universalEventHandler.handle(generalEventImpl);
        };
        return pollEvents(checkedConsumer.andThen(this::recordProcessedEvent).andOnException(this::logExceptionAndPropagate));
    }

    @Override // io.zeebe.client.task.impl.subscription.EventSubscription
    public int pollEvents(CheckedConsumer<GeneralEventImpl> checkedConsumer) {
        if (!this.processingFlag.compareAndSet(false, true)) {
            return 0;
        }
        try {
            return super.pollEvents(checkedConsumer);
        } finally {
            this.processingFlag.set(false);
        }
    }

    protected void logExceptionAndClose(GeneralEventImpl generalEventImpl, Exception exc) {
        logEventHandlingError(exc, generalEventImpl, "Closing subscription.");
        closeAsync();
    }

    protected void logExceptionAndPropagate(GeneralEventImpl generalEventImpl, Exception exc) {
        logEventHandlingError(exc, generalEventImpl, "Propagating exception to caller.");
        throw new RuntimeException(exc);
    }

    protected void logRetry(GeneralEventImpl generalEventImpl, Exception exc) {
        logEventHandlingError(exc, generalEventImpl, "Retrying.");
    }

    public CheckedConsumer<GeneralEventImpl> getHandler() {
        return this.handler;
    }

    @Override // io.zeebe.client.task.impl.subscription.EventSubscription
    protected EventSubscriptionCreationResult requestNewSubscription() {
        return this.client.createTopicSubscription(this.topic, this.partitionId).startPosition(this.startPosition).prefetchCapacity(this.prefetchCapacity).name(this.name).forceStart(this.forceStart).execute();
    }

    @Override // io.zeebe.client.task.impl.subscription.EventSubscription
    protected void requestSubscriptionClose() {
        acknowledgeLastProcessedEvent();
        this.client.closeTopicSubscription(this.partitionId, this.subscriberKey).execute();
    }

    @Override // io.zeebe.client.task.impl.subscription.EventSubscription
    protected void requestEventSourceReplenishment(int i) {
        acknowledgeLastProcessedEvent();
    }

    protected void acknowledgeLastProcessedEvent() {
        long j = this.lastProcessedEventPosition;
        if (j > this.lastAcknowledgedPosition) {
            this.client.acknowledgeEvent(this.topic, this.partitionId).subscriptionName(this.name).ackPosition(j).execute();
            this.lastAcknowledgedPosition = j;
        }
    }

    protected void recordProcessedEvent(GeneralEventImpl generalEventImpl) {
        this.lastProcessedEventPosition = generalEventImpl.getMetadata().getPosition();
    }

    protected void logEventHandlingError(Exception exc, GeneralEventImpl generalEventImpl, String str) {
        LOGGER.error("Topic subscription " + this.name + ": Unhandled exception during handling of event " + generalEventImpl + ". " + str, (Throwable) exc);
    }

    public String toString() {
        return "TopicSubscriptionImpl [name=" + this.name + ", subscriberKey=" + this.subscriberKey + MsgPackTreeNodeIdConstructor.JSON_PATH_SEPARATOR_END;
    }
}
