package io.zeebe.broker.event.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.NoopSnapshotSupport;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.collection.LongRingBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionPushProcessor.class */
/**
 * Stream processor that forwards logged events of one partition to a single topic
 * subscriber through a {@link SubscribedEventWriter}.
 *
 * <p>When constructed with a positive capacity, the processor records the position of every
 * event it successfully wrote and the positions acknowledged via {@link #onAck(long)};
 * {@link #isSuspended()} then reports the processor as suspended while the pending-event buffer
 * is saturated. With a non-positive capacity no bookkeeping is done at all.
 *
 * <p>The processor starts out disabled and reports itself as suspended until
 * {@link #enable()} is called.
 */
public class TopicSubscriptionPushProcessor implements StreamProcessor, EventProcessor {
    /** Event handed over by the most recent {@link #onEvent(LoggedEvent)} call. */
    protected LoggedEvent event;
    /** Stream id passed to {@code tryWriteMessage} when pushing an event. */
    protected final int clientStreamId;
    protected final long subscriberKey;
    /** Requested start position; overwritten with the reader position in {@link #setToStartPosition}. */
    protected long startPosition;
    /** Private copy of the subscription name buffer. */
    protected final DirectBuffer name;
    /** Subscription name decoded as UTF-8 over the full capacity of the supplied buffer. */
    protected final String nameString;
    /** Partition id of the log stream; assigned in {@link #onOpen(StreamProcessorContext)}. */
    protected int logStreamPartitionId;
    protected final SubscribedEventWriter channelWriter;
    /** Positions of pushed events; {@code null} when pending-event tracking is off. */
    protected LongRingBuffer pendingEvents;
    /** Positions received through {@link #onAck(long)}; {@code null} when tracking is off. */
    protected LongRingBuffer pendingAcks;
    /** Reusable metadata instance, refilled for every pushed event. */
    protected final BrokerEventMetadata metadata = new BrokerEventMetadata();
    protected final SnapshotSupport snapshotSupport = new NoopSnapshotSupport();
    /** {@code false} until {@link #enable()} is called; a disabled processor is suspended. */
    protected AtomicBoolean enabled = new AtomicBoolean(false);

    /**
     * Creates a disabled push processor.
     *
     * @param clientStreamId stream id the events are written to
     * @param subscriberKey key reported to the client with every pushed event
     * @param startPosition position to seek to on open; a negative value makes the processor
     *     start from the end of the log (see {@link #setToStartPosition(LogStreamReader)})
     * @param name subscription name; the buffer is cloned, so later changes to it are not seen
     * @param capacity capacity of the pending-event and pending-ack ring buffers; when not
     *     positive, neither buffer is created and pending-event tracking is disabled
     * @param channelWriter writer used to push events to the subscriber
     */
    public TopicSubscriptionPushProcessor(
            int clientStreamId,
            long subscriberKey,
            long startPosition,
            DirectBuffer name,
            int capacity,
            SubscribedEventWriter channelWriter) {
        this.channelWriter = channelWriter;
        this.clientStreamId = clientStreamId;
        this.subscriberKey = subscriberKey;
        this.startPosition = startPosition;
        this.name = BufferUtil.cloneBuffer(name);
        this.nameString = name.getStringWithoutLengthUtf8(0, name.capacity());
        if (capacity > 0) {
            this.pendingEvents = new LongRingBuffer(capacity);
            this.pendingAcks = new LongRingBuffer(capacity);
        }
    }

    /**
     * Remembers the partition id of the context's log stream and positions the context's reader
     * at the subscription's start position.
     */
    @Override
    public void onOpen(StreamProcessorContext streamProcessorContext) {
        LogStreamReader reader = streamProcessorContext.getLogStreamReader();
        this.logStreamPartitionId = streamProcessorContext.getLogStream().getPartitionId();
        setToStartPosition(reader);
    }

    /**
     * @return the start position; after the processor was opened this is the position the reader
     *     reported once it had been positioned, not necessarily the constructor argument
     */
    public long getStartPosition() {
        return this.startPosition;
    }

    /**
     * Positions the reader and stores the reader's resulting position as the start position.
     *
     * <p>A non-negative start position is sought directly. Otherwise the reader is moved to the
     * last event and advanced once if it reports a next event.
     * NOTE(review): presumably this skips the last existing event so only newer events are
     * pushed - confirm against the LogStreamReader contract.
     *
     * @param logStreamReader reader to position
     */
    protected void setToStartPosition(LogStreamReader logStreamReader) {
        if (this.startPosition < 0) {
            logStreamReader.seekToLastEvent();
            if (logStreamReader.hasNext()) {
                logStreamReader.next();
            }
        } else {
            logStreamReader.seek(this.startPosition);
        }
        this.startPosition = logStreamReader.getPosition();
    }

    /** @return the {@link NoopSnapshotSupport} instance held by this processor */
    @Override
    public SnapshotSupport getStateResource() {
        return this.snapshotSupport;
    }

    /**
     * Stores the event for the subsequent {@link #executeSideEffects()} call.
     *
     * @return this instance, which acts as its own {@link EventProcessor}
     */
    @Override
    public EventProcessor onEvent(LoggedEvent loggedEvent) {
        this.event = loggedEvent;
        return this;
    }

    /** Intentionally empty: all work happens in {@link #executeSideEffects()}. */
    @Override
    public void processEvent() {
    }

    /**
     * Pushes the current event to the subscriber and, when tracking is enabled, records its
     * position as pending.
     *
     * @return the result of {@code tryWriteMessage}; when {@code false} nothing is recorded
     * @throws RuntimeException if the event was written but its position could not be added to
     *     the pending-event buffer
     */
    @Override
    public boolean executeSideEffects() {
        this.event.readMetadata(this.metadata);
        final long position = this.event.getPosition();

        final boolean written = this.channelWriter
                .partitionId(this.logStreamPartitionId)
                .eventType(this.metadata.getEventType())
                .key(this.event.getKey())
                .position(position)
                .subscriberKey(this.subscriberKey)
                .subscriptionType(SubscriptionType.TOPIC_SUBSCRIPTION)
                .event(this.event.getValueBuffer(), this.event.getValueOffset(), this.event.getValueLength())
                .tryWriteMessage(this.clientStreamId);

        if (written && recordsPendingEvents()) {
            final boolean recorded = this.pendingEvents.addElementToHead(position);
            if (!recorded) {
                // Report the position: the boolean result is always false on this path and
                // carries no diagnostic value.
                throw new RuntimeException("Cannot record pending event at position " + position);
            }
        }
        return written;
    }

    /**
     * Reports whether the processor must not be handed further events right now.
     *
     * <p>Returns {@code true} while the processor is disabled and {@code false} when it is
     * enabled without pending-event tracking. Otherwise all queued acknowledgements are consumed,
     * each one being passed to {@code pendingEvents.consumeAscendingUntilInclusive}, and the
     * result is whether the pending-event buffer is still saturated afterwards.
     *
     * <p>Note that this query has a side effect: it drains the acknowledgement buffer.
     */
    @Override
    public boolean isSuspended() {
        if (!this.enabled.get()) {
            return true;
        }
        if (!recordsPendingEvents()) {
            return false;
        }
        this.pendingAcks.consume(
                ackedPosition -> this.pendingEvents.consumeAscendingUntilInclusive(ackedPosition));
        return this.pendingEvents.isSaturated();
    }

    /** @return the client stream id events are written to */
    public int getChannelId() {
        return this.clientStreamId;
    }

    public SubscribedEventWriter getChannelWriter() {
        return this.channelWriter;
    }

    /** @return the subscription name as decoded in the constructor */
    public String getNameAsString() {
        return this.nameString;
    }

    /**
     * Queues an acknowledgement; it is applied on the next {@link #isSuspended()} call. Does
     * nothing when pending-event tracking is disabled.
     *
     * @param eventPosition acknowledged position
     * @throws RuntimeException if the acknowledgement buffer did not accept the position
     */
    public void onAck(long eventPosition) {
        if (!recordsPendingEvents()) {
            return;
        }
        if (!this.pendingAcks.addElementToHead(eventPosition)) {
            throw new RuntimeException(
                    "Could not acknowledge event at position " + eventPosition + "; ACK capacity saturated");
        }
    }

    /** @return {@code true} if the processor was created with a positive capacity */
    protected boolean recordsPendingEvents() {
        return this.pendingEvents != null;
    }

    /**
     * @return a filter that rejects events of type {@code SUBSCRIPTION_EVENT},
     *     {@code SUBSCRIBER_EVENT} and {@code NOOP_EVENT} and accepts every other type
     */
    public static MetadataFilter eventFilter() {
        return eventMetadata -> {
            EventType eventType = eventMetadata.getEventType();
            return eventType != EventType.SUBSCRIPTION_EVENT
                    && eventType != EventType.SUBSCRIBER_EVENT
                    && eventType != EventType.NOOP_EVENT;
        };
    }

    /** @return the cloned subscription name buffer (not a copy; callers share this instance) */
    public DirectBuffer getName() {
        return this.name;
    }

    /** @return the subscriber key supplied at construction */
    public long getSubscriptionId() {
        return this.subscriberKey;
    }

    /** Enables the processor so that {@link #isSuspended()} no longer returns {@code true} unconditionally. */
    public void enable() {
        this.enabled.set(true);
    }
}
