package io.zeebe.broker.logstreams.processor;

import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventFilter;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.logstreams.snapshot.TimeBasedSnapshotPolicy;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.util.actor.ActorScheduler;
import java.time.Duration;

/**
 * Service that builds a {@link StreamProcessorController} around a {@link StreamProcessor} and
 * opens/closes it as part of the service lifecycle.
 *
 * <p>The log stream, snapshot storage and actor scheduler are obtained through the injectors
 * exposed by this class. Optional event filters and the read-only flag can be configured through
 * the fluent setters before the service is started.
 */
public class StreamProcessorService implements Service<StreamProcessorController> {
    /**
     * Highest protocol version accepted by {@link #versionFilter}.
     * NOTE(review): presumably mirrors a protocol-level version constant that was inlined at
     * compile time - confirm against the protocol module.
     */
    private static final int SUPPORTED_PROTOCOL_VERSION = 1;

    /** Period handed to the {@link TimeBasedSnapshotPolicy} of the created controller. */
    private static final Duration SNAPSHOT_PERIOD = Duration.ofMinutes(15L);

    private final String name;
    private final int id;
    private final StreamProcessor streamProcessor;

    /** Optional metadata filter combined with {@link #versionFilter} for regular processing. */
    protected MetadataFilter customEventFilter;

    /** Optional event filter combined with the version check for reprocessing. */
    protected EventFilter customReprocessingEventFilter;

    /** Passed through to the controller builder's {@code readOnly} setting. */
    protected boolean readOnly;

    /** Created in {@link #start(ServiceStartContext)}; {@code null} before that. */
    private StreamProcessorController streamProcessorController;

    private final Injector<LogStream> logStreamInjector = new Injector<>();
    private final Injector<SnapshotStorage> snapshotStorageInjector = new Injector<>();
    private final Injector<ActorScheduler> actorSchedulerInjector = new Injector<>();

    /**
     * Accepts every event whose protocol version is not newer than
     * {@link #SUPPORTED_PROTOCOL_VERSION}; throws a {@link RuntimeException} otherwise.
     */
    protected final MetadataFilter versionFilter = eventMetadata -> {
        final int protocolVersion = eventMetadata.getProtocolVersion();
        if (protocolVersion > SUPPORTED_PROTOCOL_VERSION) {
            throw new RuntimeException(String.format("Cannot handle event with version newer than what is implemented by broker (%d > %d)", protocolVersion, SUPPORTED_PROTOCOL_VERSION));
        }
        return true;
    };

    /**
     * Adapts a {@link MetadataFilter} to the {@link EventFilter} interface by reading the
     * event's metadata into a reusable {@link BrokerEventMetadata} instance and delegating
     * the decision to the wrapped filter.
     */
    protected static class MetadataEventFilter implements EventFilter {
        /** Reused for every event; overwritten by each call to {@link #applies(LoggedEvent)}. */
        protected final BrokerEventMetadata metadata = new BrokerEventMetadata();
        protected final MetadataFilter metadataFilter;

        /**
         * @param metadataFilter filter that is evaluated against each event's metadata
         */
        public MetadataEventFilter(MetadataFilter metadataFilter) {
            this.metadataFilter = metadataFilter;
        }

        /**
         * Reads the metadata of the given event and evaluates the wrapped filter on it.
         *
         * @param loggedEvent the event to test
         * @return the result of the wrapped {@link MetadataFilter}
         */
        @Override
        public boolean applies(LoggedEvent loggedEvent) {
            loggedEvent.readMetadata(this.metadata);
            return this.metadataFilter.applies(this.metadata);
        }
    }

    /**
     * @param name name passed to the stream processor builder
     * @param id id passed to the stream processor builder
     * @param streamProcessor the processor the created controller will drive
     */
    public StreamProcessorService(String name, int id, StreamProcessor streamProcessor) {
        this.name = name;
        this.id = id;
        this.streamProcessor = streamProcessor;
    }

    /**
     * Sets an additional metadata filter that is AND-combined with the version filter.
     *
     * @param metadataFilter the custom filter, or {@code null} for none
     * @return this service, for chaining
     */
    public StreamProcessorService eventFilter(MetadataFilter metadataFilter) {
        this.customEventFilter = metadataFilter;
        return this;
    }

    /**
     * Sets an additional event filter that is AND-combined with the version check during
     * reprocessing.
     *
     * @param eventFilter the custom reprocessing filter, or {@code null} for none
     * @return this service, for chaining
     */
    public StreamProcessorService reprocessingEventFilter(EventFilter eventFilter) {
        this.customReprocessingEventFilter = eventFilter;
        return this;
    }

    /**
     * Sets the read-only flag forwarded to the controller builder.
     *
     * @param readOnly value forwarded to the builder
     * @return this service, for chaining
     */
    public StreamProcessorService readOnly(boolean readOnly) {
        this.readOnly = readOnly;
        return this;
    }

    /**
     * Builds the {@link StreamProcessorController} from the injected dependencies and the
     * configured filters, then opens it asynchronously through the given start context.
     *
     * @param serviceStartContext context used to register the asynchronous open
     */
    @Override
    public void start(ServiceStartContext serviceStartContext) {
        final LogStream logStream = this.logStreamInjector.getValue();
        final SnapshotStorage snapshotStorage = this.snapshotStorageInjector.getValue();
        final ActorScheduler actorScheduler = this.actorSchedulerInjector.getValue();

        MetadataFilter metadataFilter = this.versionFilter;
        if (this.customEventFilter != null) {
            metadataFilter = metadataFilter.and(this.customEventFilter);
        }
        final EventFilter eventFilter = new MetadataEventFilter(metadataFilter);

        // Declared as EventFilter (not MetadataEventFilter): and(...) is inherited from
        // EventFilter, so its result cannot be assigned back to the concrete adapter type.
        EventFilter reprocessingEventFilter = new MetadataEventFilter(this.versionFilter);
        if (this.customReprocessingEventFilter != null) {
            reprocessingEventFilter = reprocessingEventFilter.and(this.customReprocessingEventFilter);
        }

        this.streamProcessorController = LogStreams.createStreamProcessor(this.name, this.id, this.streamProcessor)
                .logStream(logStream)
                .snapshotStorage(snapshotStorage)
                .snapshotPolicy(new TimeBasedSnapshotPolicy(SNAPSHOT_PERIOD))
                .actorScheduler(actorScheduler)
                .eventFilter(eventFilter)
                .reprocessingEventFilter(reprocessingEventFilter)
                .readOnly(this.readOnly)
                .build();

        serviceStartContext.async(this.streamProcessorController.openAsync());
    }

    /**
     * @return the controller created by {@link #start(ServiceStartContext)}, or {@code null}
     *     if the service has not been started yet
     */
    @Override
    public StreamProcessorController get() {
        return this.streamProcessorController;
    }

    /**
     * Closes the controller asynchronously through the given stop context.
     *
     * @param serviceStopContext context used to register the asynchronous close
     */
    @Override
    public void stop(ServiceStopContext serviceStopContext) {
        serviceStopContext.async(this.streamProcessorController.closeAsync());
    }

    /** @return injector through which the {@link SnapshotStorage} is supplied */
    public Injector<SnapshotStorage> getSnapshotStorageInjector() {
        return this.snapshotStorageInjector;
    }

    /** @return injector through which the {@link ActorScheduler} is supplied */
    public Injector<ActorScheduler> getActorSchedulerInjector() {
        return this.actorSchedulerInjector;
    }

    /** @return injector through which the {@link LogStream} is supplied */
    public Injector<LogStream> getLogStreamInjector() {
        return this.logStreamInjector;
    }

    /** @return the same controller as {@link #get()} */
    public StreamProcessorController getStreamProcessorController() {
        return this.streamProcessorController;
    }

    /** @return the name given at construction */
    public String getName() {
        return this.name;
    }
}
