package io.camunda.zeebe.broker.jobstream;

import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.stream.api.scheduling.ScheduledCommandCache;
import io.camunda.zeebe.stream.api.scheduling.TaskResult;
import io.camunda.zeebe.stream.api.scheduling.TaskResultBuilder;
import io.camunda.zeebe.stream.impl.BufferedTaskResultBuilder;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.time.Duration;
import java.util.Objects;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/broker/jobstream/RemoteJobStreamErrorHandler.class */
final class RemoteJobStreamErrorHandler implements RemoteStreamErrorHandler<ActivatedJob> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStreamErrorHandler.class);
    private static final Logger NO_WRITER_LOGGER = new ThrottledLogger(LOGGER, Duration.ofSeconds(1));
    private static final Logger FAILED_WRITER_LOGGER = new ThrottledLogger(LOGGER, Duration.ofSeconds(1));
    private final JobStreamErrorHandler errorHandler;
    private final Int2ObjectHashMap<LogStreamWriter> partitionWriters = new Int2ObjectHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public RemoteJobStreamErrorHandler(JobStreamErrorHandler jobStreamErrorHandler) {
        this.errorHandler = jobStreamErrorHandler;
    }

    public void handleError(Throwable th, ActivatedJob activatedJob) {
        int decodePartitionId = Protocol.decodePartitionId(activatedJob.jobKey());
        LogStreamWriter logStreamWriter = (LogStreamWriter) this.partitionWriters.get(decodePartitionId);
        if (logStreamWriter == null) {
            NO_WRITER_LOGGER.warn("Cannot handle failed job push on partition {} there is no writer registered;\nthis can occur during an election", Integer.valueOf(decodePartitionId));
            return;
        }
        Objects.requireNonNull(logStreamWriter);
        TaskResultBuilder bufferedTaskResultBuilder = new BufferedTaskResultBuilder((v1, v2) -> {
            return r2.canWriteEvents(v1, v2);
        }, new ScheduledCommandCache.NoopScheduledCommandCache());
        this.errorHandler.handleError(activatedJob, th, bufferedTaskResultBuilder);
        writeEntries(decodePartitionId, activatedJob, logStreamWriter, bufferedTaskResultBuilder.build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addWriter(int i, LogStreamWriter logStreamWriter) {
        this.partitionWriters.put(i, logStreamWriter);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeWriter(int i) {
        this.partitionWriters.remove(i);
    }

    private void writeEntries(int i, ActivatedJob activatedJob, LogStreamWriter logStreamWriter, TaskResult taskResult) {
        Either tryWrite = logStreamWriter.tryWrite(WriteContext.processingResult(), taskResult.getRecordBatch().entries());
        if (tryWrite.isLeft()) {
            FAILED_WRITER_LOGGER.warn("Failed to handle failed job push {} on partition {}. Write to logstream failed with {};\njob will remain activated until it times out.", new Object[]{Long.valueOf(activatedJob.jobKey()), Integer.valueOf(i), tryWrite.getLeft()});
        }
    }
}
