package io.zeebe.broker.system.log;

import io.zeebe.broker.clustering.management.Partition;
import io.zeebe.broker.clustering.management.PartitionManager;
import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamBatchWriter;
import io.zeebe.logstreams.log.LogStreamBatchWriterImpl;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.snapshot.ComposedZbMapSnapshot;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.Protocol;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.DeferredCommandContext;
import java.util.Iterator;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/system/log/CreateTopicStreamProcessor.class */
public class CreateTopicStreamProcessor implements StreamProcessor {
    protected final PartitionManager partitionManager;
    protected LogStreamReader sourceLogReader;
    protected ResolvePendingPartitionsCommand resolvePartitionsCommand;
    protected LogStreamBatchWriter logStreamBatchWriter;
    protected LoggedEvent currentEvent;
    protected LogStream sourceStream;
    protected LogStream targetStream;
    protected final CommandResponseWriter responseWriter;
    protected int streamProcessorId;
    protected DeferredCommandContext commandQueue;
    protected final BrokerEventMetadata sourceMetadata = new BrokerEventMetadata();
    protected final TopicEvent topicEvent = new TopicEvent();
    protected final PartitionEvent partitionEvent = new PartitionEvent();
    protected final BrokerEventMetadata targetMetadata = new BrokerEventMetadata().protocolVersion(1);
    protected CreateTopicProcessor createTopicProcessor = new CreateTopicProcessor();
    protected CreatePartitionProcessor createPartitionProcessor = new CreatePartitionProcessor();
    protected CompletePartitionProcessor completePartitionProcessor = new CompletePartitionProcessor();
    protected PartitionCreatedProcessor partitionCreatedProcessor = new PartitionCreatedProcessor();
    protected final TopicsIndex topics = new TopicsIndex();
    protected final PartitionsIndex partitions = new PartitionsIndex();
    protected final ComposedZbMapSnapshot snapshotSupport = new ComposedZbMapSnapshot(this.topics.getStateResource(), this.partitions.getStateResource());

    /* loaded from: input_file:io/zeebe/broker/system/log/CreateTopicStreamProcessor$CompletePartitionProcessor.class */
    protected class CompletePartitionProcessor implements EventProcessor {
        protected CompletePartitionProcessor() {
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public void processEvent() {
            if (CreateTopicStreamProcessor.this.partitions.getPartitionKey(CreateTopicStreamProcessor.this.partitionEvent.getTopicName(), CreateTopicStreamProcessor.this.partitionEvent.getId()) >= 0) {
                CreateTopicStreamProcessor.this.partitionEvent.setState(PartitionState.CREATED);
            } else {
                CreateTopicStreamProcessor.this.partitionEvent.setState(PartitionState.CREATE_COMPLETE_REJECTED);
            }
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public long writeEvent(LogStreamWriter logStreamWriter) {
            CreateTopicStreamProcessor.this.targetMetadata.raftTermId(CreateTopicStreamProcessor.this.targetStream.getTerm());
            CreateTopicStreamProcessor.this.targetMetadata.eventType(EventType.PARTITION_EVENT);
            return logStreamWriter.key(CreateTopicStreamProcessor.this.currentEvent.getKey()).metadataWriter(CreateTopicStreamProcessor.this.targetMetadata).valueWriter(CreateTopicStreamProcessor.this.partitionEvent).tryWrite();
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public void updateState() {
            DirectBuffer topicName = CreateTopicStreamProcessor.this.partitionEvent.getTopicName();
            if (CreateTopicStreamProcessor.this.partitionEvent.getState() == PartitionState.CREATED) {
                CreateTopicStreamProcessor.this.topics.moveTo(topicName);
                int remainingPartitions = CreateTopicStreamProcessor.this.topics.getRemainingPartitions();
                if (remainingPartitions > 0) {
                    CreateTopicStreamProcessor.this.topics.putRemainingPartitions(topicName, remainingPartitions - 1);
                }
                CreateTopicStreamProcessor.this.partitions.removePartitionKey(topicName, CreateTopicStreamProcessor.this.partitionEvent.getId());
            }
        }
    }

    /* loaded from: input_file:io/zeebe/broker/system/log/CreateTopicStreamProcessor$CreatePartitionProcessor.class */
    protected class CreatePartitionProcessor implements EventProcessor {
        protected CreatePartitionProcessor() {
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public void processEvent() {
            CreateTopicStreamProcessor.this.partitionEvent.setState(PartitionState.CREATING);
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public long writeEvent(LogStreamWriter logStreamWriter) {
            CreateTopicStreamProcessor.this.targetMetadata.raftTermId(CreateTopicStreamProcessor.this.targetStream.getTerm());
            CreateTopicStreamProcessor.this.targetMetadata.eventType(EventType.PARTITION_EVENT);
            return logStreamWriter.key(CreateTopicStreamProcessor.this.currentEvent.getKey()).metadataWriter(CreateTopicStreamProcessor.this.targetMetadata).valueWriter(CreateTopicStreamProcessor.this.partitionEvent).tryWrite();
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public boolean executeSideEffects() {
            CreateTopicStreamProcessor.this.partitionManager.createPartitionAsync(CreateTopicStreamProcessor.this.partitionEvent.getTopicName(), CreateTopicStreamProcessor.this.partitionEvent.getId());
            return true;
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public void updateState() {
            CreateTopicStreamProcessor.this.partitions.putPartitionKey(CreateTopicStreamProcessor.this.partitionEvent.getTopicName(), CreateTopicStreamProcessor.this.partitionEvent.getId(), CreateTopicStreamProcessor.this.currentEvent.getKey());
        }
    }

    /* loaded from: input_file:io/zeebe/broker/system/log/CreateTopicStreamProcessor$CreateTopicProcessor.class */
    protected class CreateTopicProcessor implements EventProcessor {
        protected CreateTopicProcessor() {
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public void processEvent() {
            if (CreateTopicStreamProcessor.this.topics.moveTo(CreateTopicStreamProcessor.this.topicEvent.getName()) || CreateTopicStreamProcessor.this.topicEvent.getPartitions() <= 0) {
                CreateTopicStreamProcessor.this.topicEvent.setState(TopicState.CREATE_REJECTED);
            }
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public boolean executeSideEffects() {
            if (CreateTopicStreamProcessor.this.topicEvent.getState() == TopicState.CREATE_REJECTED) {
                return CreateTopicStreamProcessor.this.responseWriter.topicName(CreateTopicStreamProcessor.this.sourceStream.getTopicName()).partitionId(CreateTopicStreamProcessor.this.sourceStream.getPartitionId()).position(CreateTopicStreamProcessor.this.currentEvent.getPosition()).key(CreateTopicStreamProcessor.this.currentEvent.getKey()).eventWriter(CreateTopicStreamProcessor.this.topicEvent).tryWriteResponse(CreateTopicStreamProcessor.this.sourceMetadata.getRequestStreamId(), CreateTopicStreamProcessor.this.sourceMetadata.getRequestId());
            }
            return true;
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public long writeEvent(LogStreamWriter logStreamWriter) {
            CreateTopicStreamProcessor.this.targetMetadata.raftTermId(CreateTopicStreamProcessor.this.targetStream.getTerm());
            if (CreateTopicStreamProcessor.this.topicEvent.getState() == TopicState.CREATE_REJECTED) {
                CreateTopicStreamProcessor.this.targetMetadata.eventType(EventType.TOPIC_EVENT);
                return logStreamWriter.key(CreateTopicStreamProcessor.this.currentEvent.getKey()).metadataWriter(CreateTopicStreamProcessor.this.targetMetadata).valueWriter(CreateTopicStreamProcessor.this.topicEvent).tryWrite();
            }
            CreateTopicStreamProcessor.this.targetMetadata.eventType(EventType.PARTITION_EVENT);
            CreateTopicStreamProcessor.this.logStreamBatchWriter.reset();
            CreateTopicStreamProcessor.this.logStreamBatchWriter.producerId(CreateTopicStreamProcessor.this.streamProcessorId).sourceEvent(CreateTopicStreamProcessor.this.sourceStream.getTopicName(), CreateTopicStreamProcessor.this.sourceStream.getPartitionId(), CreateTopicStreamProcessor.this.currentEvent.getPosition());
            for (int i = 0; i < CreateTopicStreamProcessor.this.topicEvent.getPartitions(); i++) {
                CreateTopicStreamProcessor.this.partitionEvent.reset();
                CreateTopicStreamProcessor.this.partitionEvent.setState(PartitionState.CREATE);
                CreateTopicStreamProcessor.this.partitionEvent.setTopicName(CreateTopicStreamProcessor.this.topicEvent.getName());
                CreateTopicStreamProcessor.this.partitionEvent.setId(i);
                CreateTopicStreamProcessor.this.logStreamBatchWriter.event().positionAsKey().metadataWriter(CreateTopicStreamProcessor.this.targetMetadata).valueWriter(CreateTopicStreamProcessor.this.partitionEvent).done();
            }
            return CreateTopicStreamProcessor.this.logStreamBatchWriter.tryWrite();
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public void updateState() {
            if (CreateTopicStreamProcessor.this.topicEvent.getState() != TopicState.CREATE_REJECTED) {
                CreateTopicStreamProcessor.this.topics.put(CreateTopicStreamProcessor.this.topicEvent.getName(), CreateTopicStreamProcessor.this.topicEvent.getPartitions(), CreateTopicStreamProcessor.this.currentEvent.getPosition());
            }
        }
    }

    /* loaded from: input_file:io/zeebe/broker/system/log/CreateTopicStreamProcessor$PartitionCreatedProcessor.class */
    public class PartitionCreatedProcessor implements EventProcessor {
        protected LoggedEvent request;
        protected boolean topicCreationComplete = false;
        protected final BrokerEventMetadata topicEventMetadata = new BrokerEventMetadata();

        public PartitionCreatedProcessor() {
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public void processEvent() {
            CreateTopicStreamProcessor.this.topics.moveTo(CreateTopicStreamProcessor.this.partitionEvent.getTopicName());
            this.topicCreationComplete = CreateTopicStreamProcessor.this.topics.getRemainingPartitions() == 0;
            if (this.topicCreationComplete) {
                if (!CreateTopicStreamProcessor.this.sourceLogReader.seek(CreateTopicStreamProcessor.this.topics.getRequestPosition())) {
                    throw new RuntimeException("Could not find request for topic creation");
                }
                this.request = CreateTopicStreamProcessor.this.sourceLogReader.next();
                this.request.readMetadata(this.topicEventMetadata);
                this.request.readValue(CreateTopicStreamProcessor.this.topicEvent);
                CreateTopicStreamProcessor.this.topicEvent.setState(TopicState.CREATED);
            }
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public long writeEvent(LogStreamWriter logStreamWriter) {
            if (!this.topicCreationComplete) {
                return 0L;
            }
            CreateTopicStreamProcessor.this.targetMetadata.raftTermId(CreateTopicStreamProcessor.this.targetStream.getTerm());
            CreateTopicStreamProcessor.this.targetMetadata.eventType(EventType.TOPIC_EVENT);
            return logStreamWriter.metadataWriter(CreateTopicStreamProcessor.this.targetMetadata).valueWriter(CreateTopicStreamProcessor.this.topicEvent).key(this.request.getKey()).tryWrite();
        }

        @Override // io.zeebe.logstreams.processor.EventProcessor
        public boolean executeSideEffects() {
            if (this.topicCreationComplete) {
                return CreateTopicStreamProcessor.this.responseWriter.topicName(CreateTopicStreamProcessor.this.sourceStream.getTopicName()).partitionId(CreateTopicStreamProcessor.this.sourceStream.getPartitionId()).position(CreateTopicStreamProcessor.this.currentEvent.getPosition()).key(this.request.getKey()).eventWriter(CreateTopicStreamProcessor.this.topicEvent).tryWriteResponse(this.topicEventMetadata.getRequestStreamId(), this.topicEventMetadata.getRequestId());
            }
            return true;
        }
    }

    /* loaded from: input_file:io/zeebe/broker/system/log/CreateTopicStreamProcessor$ResolvePendingPartitionsCommand.class */
    public class ResolvePendingPartitionsCommand implements Runnable {
        protected final LogStreamWriter writer;

        public ResolvePendingPartitionsCommand() {
            this.writer = new LogStreamWriterImpl(CreateTopicStreamProcessor.this.targetStream);
        }

        @Override // java.lang.Runnable
        public void run() {
            if (CreateTopicStreamProcessor.this.partitions.isEmpty()) {
                return;
            }
            Iterator<Partition> knownPartitions = CreateTopicStreamProcessor.this.partitionManager.getKnownPartitions();
            while (knownPartitions.hasNext()) {
                Partition next = knownPartitions.next();
                DirectBuffer topicName = next.getTopicName();
                int partitionId = next.getPartitionId();
                long partitionKey = CreateTopicStreamProcessor.this.partitions.getPartitionKey(topicName, partitionId);
                if (partitionKey >= 0) {
                    CreateTopicStreamProcessor.this.targetMetadata.raftTermId(CreateTopicStreamProcessor.this.targetStream.getTerm());
                    CreateTopicStreamProcessor.this.targetMetadata.eventType(EventType.PARTITION_EVENT);
                    CreateTopicStreamProcessor.this.partitionEvent.reset();
                    CreateTopicStreamProcessor.this.partitionEvent.setTopicName(topicName);
                    CreateTopicStreamProcessor.this.partitionEvent.setId(partitionId);
                    CreateTopicStreamProcessor.this.partitionEvent.setState(PartitionState.CREATE_COMPLETE);
                    this.writer.key(partitionKey).metadataWriter(CreateTopicStreamProcessor.this.targetMetadata).valueWriter(CreateTopicStreamProcessor.this.partitionEvent).tryWrite();
                }
            }
        }
    }

    public CreateTopicStreamProcessor(CommandResponseWriter commandResponseWriter, PartitionManager partitionManager) {
        this.responseWriter = commandResponseWriter;
        this.partitionManager = partitionManager;
    }

    @Override // io.zeebe.logstreams.processor.StreamProcessor
    public void onOpen(StreamProcessorContext streamProcessorContext) {
        this.sourceStream = streamProcessorContext.getSourceStream();
        this.targetStream = streamProcessorContext.getTargetStream();
        this.logStreamBatchWriter = new LogStreamBatchWriterImpl(this.targetStream);
        this.streamProcessorId = streamProcessorContext.getId();
        this.sourceLogReader = new BufferedLogStreamReader(this.sourceStream);
        this.commandQueue = streamProcessorContext.getStreamProcessorCmdQueue();
        this.resolvePartitionsCommand = new ResolvePendingPartitionsCommand();
        this.topics.put(Protocol.SYSTEM_TOPIC_BUF, 0, -1L);
    }

    @Override // io.zeebe.logstreams.processor.StreamProcessor
    public SnapshotSupport getStateResource() {
        return this.snapshotSupport;
    }

    @Override // io.zeebe.logstreams.processor.StreamProcessor
    public EventProcessor onEvent(LoggedEvent loggedEvent) {
        this.currentEvent = loggedEvent;
        this.sourceMetadata.reset();
        this.partitionEvent.reset();
        loggedEvent.readMetadata(this.sourceMetadata);
        if (EventType.TOPIC_EVENT == this.sourceMetadata.getEventType()) {
            this.topicEvent.reset();
            loggedEvent.readValue(this.topicEvent);
            if (TopicState.CREATE == this.topicEvent.getState()) {
                return this.createTopicProcessor;
            }
            return null;
        }
        if (EventType.PARTITION_EVENT != this.sourceMetadata.getEventType()) {
            return null;
        }
        this.partitionEvent.reset();
        loggedEvent.readValue(this.partitionEvent);
        if (PartitionState.CREATE == this.partitionEvent.getState()) {
            return this.createPartitionProcessor;
        }
        if (PartitionState.CREATE_COMPLETE == this.partitionEvent.getState()) {
            return this.completePartitionProcessor;
        }
        if (PartitionState.CREATED == this.partitionEvent.getState()) {
            return this.partitionCreatedProcessor;
        }
        return null;
    }

    public static MetadataFilter eventFilter() {
        return brokerEventMetadata -> {
            return brokerEventMetadata.getEventType() == EventType.TOPIC_EVENT || brokerEventMetadata.getEventType() == EventType.PARTITION_EVENT;
        };
    }

    public void checkPendingPartitionsAsync() {
        if (this.resolvePartitionsCommand != null) {
            this.commandQueue.runAsync(this.resolvePartitionsCommand);
        }
    }
}
