package io.zeebe.broker.logstreams.processor;

import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamBatchWriter;
import io.zeebe.logstreams.log.LogStreamBatchWriterImpl;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/broker/logstreams/processor/TypedStreamWriterImpl.class */
public class TypedStreamWriterImpl implements TypedStreamWriter, TypedBatchWriter {
    protected final Consumer<BrokerEventMetadata> noop = brokerEventMetadata -> {
    };
    protected BrokerEventMetadata metadata = new BrokerEventMetadata();
    protected final Map<Class<? extends UnpackedObject>, EventType> typeRegistry;
    protected final LogStream stream;
    protected LogStreamWriter writer;
    protected LogStreamBatchWriter batchWriter;
    protected int producerId;
    protected int sourcePartitionId;
    protected long sourcePosition;

    public TypedStreamWriterImpl(LogStream logStream, Map<EventType, Class<? extends UnpackedObject>> map) {
        this.stream = logStream;
        this.metadata.protocolVersion(1);
        this.writer = new LogStreamWriterImpl(logStream);
        this.batchWriter = new LogStreamBatchWriterImpl(logStream);
        this.typeRegistry = new HashMap();
        map.forEach((eventType, cls) -> {
            this.typeRegistry.put(cls, eventType);
        });
    }

    public void configureSourceContext(int i, int i2, long j) {
        this.producerId = i;
        this.sourcePartitionId = i2;
        this.sourcePosition = j;
    }

    protected void initMetadata(UnpackedObject unpackedObject) {
        this.metadata.reset();
        this.metadata.eventType(this.typeRegistry.get(unpackedObject.getClass()));
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeFollowupEvent(long j, UnpackedObject unpackedObject, Consumer<BrokerEventMetadata> consumer) {
        this.writer.reset();
        this.writer.raftTermId(this.stream.getTerm());
        this.writer.producerId(this.producerId);
        if (this.sourcePartitionId >= 0) {
            this.writer.sourceEvent(this.sourcePartitionId, this.sourcePosition);
        }
        initMetadata(unpackedObject);
        consumer.accept(this.metadata);
        if (j >= 0) {
            this.writer.key(j);
        } else {
            this.writer.positionAsKey();
        }
        return this.writer.metadataWriter(this.metadata).valueWriter(unpackedObject).tryWrite();
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeFollowupEvent(long j, UnpackedObject unpackedObject) {
        return writeFollowupEvent(j, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeNewEvent(UnpackedObject unpackedObject, Consumer<BrokerEventMetadata> consumer) {
        return writeFollowupEvent(-1L, unpackedObject, consumer);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public long writeNewEvent(UnpackedObject unpackedObject) {
        return writeFollowupEvent(-1L, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addFollowUpEvent(long j, UnpackedObject unpackedObject, Consumer<BrokerEventMetadata> consumer) {
        initMetadata(unpackedObject);
        consumer.accept(this.metadata);
        LogStreamBatchWriter.LogEntryBuilder event = this.batchWriter.event();
        if (j >= 0) {
            event.key(j);
        } else {
            event.positionAsKey();
        }
        event.metadataWriter(this.metadata).valueWriter(unpackedObject).done();
        return this;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addFollowUpEvent(long j, UnpackedObject unpackedObject) {
        return addFollowUpEvent(j, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addNewEvent(UnpackedObject unpackedObject) {
        return addFollowUpEvent(-1L, unpackedObject, this.noop);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public TypedBatchWriter addNewEvent(UnpackedObject unpackedObject, Consumer<BrokerEventMetadata> consumer) {
        return addFollowUpEvent(-1L, unpackedObject, consumer);
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedBatchWriter
    public long write() {
        return this.batchWriter.tryWrite();
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedStreamWriter
    public TypedBatchWriter newBatch() {
        this.batchWriter.reset();
        this.batchWriter.producerId(this.producerId);
        this.batchWriter.raftTermId(this.stream.getTerm());
        if (this.sourcePartitionId >= 0) {
            this.batchWriter.sourceEvent(this.sourcePartitionId, this.sourcePosition);
        }
        return this;
    }
}
