package io.camunda.zeebe.broker.transport.commandapi;

import io.camunda.zeebe.broker.system.configuration.BrokerCfgTest;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerPublishMessageRequest;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.protocol.impl.encoding.ErrorResponse;
import io.camunda.zeebe.protocol.impl.encoding.ExecuteCommandRequest;
import io.camunda.zeebe.protocol.impl.encoding.ExecuteCommandResponse;
import io.camunda.zeebe.protocol.impl.encoding.ExecuteQueryRequest;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerRule;
import io.camunda.zeebe.transport.ServerOutput;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/broker/transport/commandapi/CommandApiRequestHandlerTest.class */
public class CommandApiRequestHandlerTest {

    @Rule
    public final ControlledActorSchedulerRule scheduler = new ControlledActorSchedulerRule();
    final CommandApiRequestHandler handler = new CommandApiRequestHandler();
    private LogStreamWriter logStreamWriter;

    @Before
    public void setup() {
        this.scheduler.submitActor(this.handler);
        this.logStreamWriter = (LogStreamWriter) Mockito.mock(LogStreamWriter.class);
        this.handler.addPartition(0, this.logStreamWriter);
        this.scheduler.workUntilDone();
    }

    @Test
    public void shouldRejectCommandWithInvalidTemplate() {
        Assertions.assertThat(handleRequest(new ExecuteQueryRequest())).succeedsWithin(Duration.ofMinutes(1L)).matches((v0) -> {
            return v0.isLeft();
        }).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting((v0) -> {
            return v0.getErrorCode();
        }).isEqualTo(ErrorCode.INVALID_MESSAGE_TEMPLATE);
    }

    @Test
    public void shouldRejectCommandIfNotLeader() {
        ExecuteCommandRequest executeCommandRequest = new ExecuteCommandRequest();
        this.handler.removePartition(0);
        this.scheduler.workUntilDone();
        Assertions.assertThat(handleRequest(executeCommandRequest)).succeedsWithin(Duration.ofMinutes(1L)).matches((v0) -> {
            return v0.isLeft();
        }).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting((v0) -> {
            return v0.getErrorCode();
        }).isEqualTo(ErrorCode.PARTITION_LEADER_MISMATCH);
    }

    @Test
    public void shouldRejectCommandWithoutEvent() {
        Assertions.assertThat(handleRequest(new ExecuteCommandRequest())).succeedsWithin(Duration.ofMinutes(1L)).matches((v0) -> {
            return v0.isLeft();
        }).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting((v0) -> {
            return v0.getErrorCode();
        }).isEqualTo(ErrorCode.UNSUPPORTED_MESSAGE);
    }

    @Test
    public void shouldRejectCommandWithUnknownEvent() {
        ExecuteCommandRequest executeCommandRequest = new ExecuteCommandRequest();
        executeCommandRequest.setValueType(ValueType.ERROR);
        Assertions.assertThat(handleRequest(executeCommandRequest)).succeedsWithin(Duration.ofMinutes(1L)).matches((v0) -> {
            return v0.isLeft();
        }).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting((v0) -> {
            return v0.getErrorCode();
        }).isEqualTo(ErrorCode.UNSUPPORTED_MESSAGE);
    }

    @Test
    public void shouldWriteToLog() {
        LogStreamWriter logStreamWriter = (LogStreamWriter) Mockito.mock(LogStreamWriter.class);
        Mockito.when(Boolean.valueOf(logStreamWriter.canWriteEvents(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()))).thenReturn(true);
        this.handler.addPartition(0, logStreamWriter);
        this.scheduler.workUntilDone();
        BrokerPublishMessageRequest timeToLive = new BrokerPublishMessageRequest(BrokerCfgTest.BROKER_BASE, "1").setMessageId("1").setTimeToLive(0L);
        timeToLive.serializeValue();
        handleRequest(timeToLive);
        ((LogStreamWriter) Mockito.verify(logStreamWriter)).tryWrite((WriteContext) ArgumentMatchers.any(WriteContext.class), (LogAppendEntry) ArgumentMatchers.any(LogAppendEntry.class));
    }

    @Test
    public void shouldReturnNormalErrorMessageIfProcessingPausedOnDifferentPartition() {
        LogStreamWriter logStreamWriter = (LogStreamWriter) Mockito.mock(LogStreamWriter.class);
        Mockito.when(Boolean.valueOf(logStreamWriter.canWriteEvents(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()))).thenReturn(true);
        Mockito.when(logStreamWriter.tryWrite((WriteContext) ArgumentMatchers.any(WriteContext.class), (LogAppendEntry) ArgumentMatchers.any(LogAppendEntry.class))).thenReturn(Either.left(LogStreamWriter.WriteFailure.FULL));
        this.handler.addPartition(0, logStreamWriter);
        this.handler.onRecovered(0);
        this.handler.onPaused(1);
        this.scheduler.workUntilDone();
        BrokerPublishMessageRequest timeToLive = new BrokerPublishMessageRequest(BrokerCfgTest.BROKER_BASE, "1").setMessageId("1").setTimeToLive(0L);
        timeToLive.serializeValue();
        Assertions.assertThat(handleRequest(timeToLive)).succeedsWithin(Duration.ofSeconds(5L)).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting((v0) -> {
            return v0.getErrorData();
        }).extracting(directBuffer -> {
            return BufferUtil.bufferAsString(directBuffer);
        }).isEqualTo("Failed to write client request to partition '0', because the writer buffer is full.");
    }

    @Test
    public void shouldReturnProcessingPausedIfPartitionPaused() {
        LogStreamWriter logStreamWriter = (LogStreamWriter) Mockito.mock(LogStreamWriter.class);
        Mockito.when(Boolean.valueOf(logStreamWriter.canWriteEvents(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()))).thenReturn(true);
        Mockito.when(logStreamWriter.tryWrite((WriteContext) ArgumentMatchers.any(WriteContext.class), (LogAppendEntry) ArgumentMatchers.any(LogAppendEntry.class))).thenReturn(Either.left(LogStreamWriter.WriteFailure.FULL));
        this.handler.addPartition(0, logStreamWriter);
        this.handler.onRecovered(0);
        this.handler.onPaused(0);
        this.scheduler.workUntilDone();
        BrokerPublishMessageRequest timeToLive = new BrokerPublishMessageRequest(BrokerCfgTest.BROKER_BASE, "1").setMessageId("1").setTimeToLive(0L);
        timeToLive.serializeValue();
        Assertions.assertThat(handleRequest(timeToLive)).succeedsWithin(Duration.ofSeconds(5L)).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting((v0) -> {
            return v0.getErrorData();
        }).extracting(directBuffer -> {
            return BufferUtil.bufferAsString(directBuffer);
        }).isEqualTo("Processing paused for partition '0'");
    }

    @Test
    public void shouldReturnPartitionLeaderMismatchWhenWriterClosed() {
        LogStreamWriter logStreamWriter = (LogStreamWriter) Mockito.mock(LogStreamWriter.class);
        Mockito.when(Boolean.valueOf(logStreamWriter.canWriteEvents(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()))).thenReturn(true);
        Mockito.when(logStreamWriter.tryWrite((WriteContext) ArgumentMatchers.any(WriteContext.class), (LogAppendEntry) ArgumentMatchers.any(LogAppendEntry.class))).thenReturn(Either.left(LogStreamWriter.WriteFailure.CLOSED));
        this.handler.addPartition(0, logStreamWriter);
        this.scheduler.workUntilDone();
        BrokerPublishMessageRequest timeToLive = new BrokerPublishMessageRequest(BrokerCfgTest.BROKER_BASE, "1").setMessageId("1").setTimeToLive(0L);
        timeToLive.serializeValue();
        Assertions.assertThat(handleRequest(timeToLive)).succeedsWithin(Duration.ofMinutes(1L)).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting((v0) -> {
            return v0.getErrorCode();
        }).isEqualTo(ErrorCode.PARTITION_LEADER_MISMATCH);
    }

    @Test
    public void shouldRejectRequestIfTooLarge() {
        Mockito.when(Boolean.valueOf(this.logStreamWriter.canWriteEvents(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()))).thenReturn(false);
        BrokerPublishMessageRequest timeToLive = new BrokerPublishMessageRequest(BrokerCfgTest.BROKER_BASE, "1").setMessageId("1").setTimeToLive(0L);
        timeToLive.serializeValue();
        Assertions.assertThat(handleRequest(timeToLive)).succeedsWithin(Duration.ofMinutes(1L)).matches((v0) -> {
            return v0.isLeft();
        }).extracting((v0) -> {
            return v0.getLeft();
        }).extracting(new Function[]{(v0) -> {
            return v0.getErrorCode();
        }, errorResponse -> {
            return BufferUtil.bufferAsString(errorResponse.getErrorData());
        }}).containsExactly(new Object[]{ErrorCode.MALFORMED_REQUEST, "Request size is above configured maxMessageSize."});
    }

    private CompletableFuture<Either<ErrorResponse, ExecuteCommandResponse>> handleRequest(BufferWriter bufferWriter) {
        CompletableFuture<Either<ErrorResponse, ExecuteCommandResponse>> completableFuture = new CompletableFuture<>();
        ServerOutput createServerOutput = createServerOutput(completableFuture);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(new byte[bufferWriter.getLength()]);
        bufferWriter.write(unsafeBuffer, 0);
        this.handler.onRequest(createServerOutput, 0, 0L, unsafeBuffer, 0, bufferWriter.getLength());
        this.scheduler.workUntilDone();
        return completableFuture;
    }

    private ServerOutput createServerOutput(CompletableFuture<Either<ErrorResponse, ExecuteCommandResponse>> completableFuture) {
        return serverResponse -> {
            ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer();
            serverResponse.write(expandableArrayBuffer, 0);
            ErrorResponse errorResponse = new ErrorResponse();
            if (errorResponse.tryWrap(expandableArrayBuffer)) {
                errorResponse.wrap(expandableArrayBuffer, 0, serverResponse.getLength());
                completableFuture.complete(Either.left(errorResponse));
                return;
            }
            ExecuteCommandResponse executeCommandResponse = new ExecuteCommandResponse();
            try {
                executeCommandResponse.wrap(expandableArrayBuffer, 0, serverResponse.getLength());
                completableFuture.complete(Either.right(executeCommandResponse));
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        };
    }
}
