package io.camunda.zeebe.broker.transport.queryapi;

import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.impl.encoding.ErrorResponse;
import io.camunda.zeebe.protocol.impl.encoding.ExecuteQueryRequest;
import io.camunda.zeebe.protocol.impl.encoding.ExecuteQueryResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.test.broker.protocol.commandapi.CommandApiRule;
import io.camunda.zeebe.test.broker.protocol.commandapi.PartitionTestClient;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.transport.ClientRequest;
import io.camunda.zeebe.transport.ClientTransport;
import io.camunda.zeebe.transport.RequestType;
import io.camunda.zeebe.transport.impl.AtomixClientTransportAdapter;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.netty.util.NetUtil;
import java.time.Duration;
import java.util.Objects;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/camunda/zeebe/broker/transport/queryapi/QueryApiIT.class */
public final class QueryApiIT {
    public final ActorSchedulerRule actor = new ActorSchedulerRule();
    public final EmbeddedBrokerRule broker = new EmbeddedBrokerRule(brokerCfg -> {
        brokerCfg.getExperimental().getQueryApi().setEnabled(true);
    });
    public final CommandApiRule command;

    @Rule
    public final RuleChain ruleChain;
    private ClientTransport clientTransport;
    private String serverAddress;

    /* loaded from: input_file:io/camunda/zeebe/broker/transport/queryapi/QueryApiIT$Request.class */
    private static final class Request implements ClientRequest {
        private final ExecuteQueryRequest request = new ExecuteQueryRequest();

        private Request() {
        }

        public Request partitionId(int i) {
            this.request.setPartitionId(i);
            return this;
        }

        public Request key(long j) {
            this.request.setKey(j);
            return this;
        }

        public Request valueType(ValueType valueType) {
            this.request.setValueType(valueType);
            return this;
        }

        public int getPartitionId() {
            return this.request.getPartitionId();
        }

        public RequestType getRequestType() {
            return RequestType.QUERY;
        }

        public int getLength() {
            return this.request.getLength();
        }

        public void write(MutableDirectBuffer mutableDirectBuffer, int i) {
            this.request.write(mutableDirectBuffer, i);
        }
    }

    public QueryApiIT() {
        EmbeddedBrokerRule embeddedBrokerRule = this.broker;
        Objects.requireNonNull(embeddedBrokerRule);
        this.command = new CommandApiRule(embeddedBrokerRule::getAtomixCluster);
        this.ruleChain = RuleChain.outerRule(this.broker).around(this.command).around(this.actor);
    }

    @Before
    public void setup() {
        this.serverAddress = NetUtil.toSocketAddressString(this.broker.getBrokerCfg().getNetwork().getCommandApi().getAddress());
        NettyMessagingService nettyMessagingService = new NettyMessagingService(this.broker.getBrokerCfg().getCluster().getClusterName(), Address.from(SocketUtil.getNextAddress().getPort()), new MessagingConfig(), this.broker.getMeterRegistry());
        this.clientTransport = new AtomixClientTransportAdapter(nettyMessagingService);
        this.actor.submitActor(this.clientTransport).join();
        nettyMessagingService.start().join();
    }

    @Test
    public void shouldRespondWithErrorWhenDisabled() {
        this.broker.getBrokerCfg().getExperimental().getQueryApi().setEnabled(false);
        DirectBuffer directBuffer = (DirectBuffer) this.clientTransport.sendRequest(() -> {
            return this.serverAddress;
        }, new Request().partitionId(1).key(123L).valueType(ValueType.PROCESS), Duration.ofSeconds(10L)).join();
        Assertions.assertThat(directBuffer).isNotNull();
        ErrorResponse errorResponse = new ErrorResponse();
        errorResponse.wrap(directBuffer, 0, directBuffer.capacity());
        Assertions.assertThat(errorResponse.getErrorCode()).isEqualTo(ErrorCode.UNSUPPORTED_MESSAGE);
        Assertions.assertThat(BufferUtil.bufferAsString(errorResponse.getErrorData())).isEqualTo("Failed to handle query as the query API is disabled; did you configure zeebe.broker.experimental.queryapi.enabled?");
    }

    @Test
    public void shouldRespondWithBpmnProcessIdWhenProcessFound() {
        long processDefinitionKey = this.command.partitionClient(1).deployProcess(Bpmn.createExecutableProcess("OneProcessToRuleThemAll").startEvent().endEvent().done()).getProcessDefinitionKey();
        ((AbstractBooleanAssert) Assertions.assertThat(RecordingExporter.processRecords().withIntent(ProcessIntent.CREATED).withRecordKey(processDefinitionKey).limit(1L).exists()).as("wait until the process definition actually exists in the state", new Object[0])).isTrue();
        DirectBuffer directBuffer = (DirectBuffer) this.clientTransport.sendRequest(() -> {
            return this.serverAddress;
        }, new Request().partitionId(1).key(processDefinitionKey).valueType(ValueType.PROCESS), Duration.ofSeconds(10L)).join();
        ExecuteQueryResponse executeQueryResponse = new ExecuteQueryResponse();
        executeQueryResponse.wrap(directBuffer, 0, directBuffer.capacity());
        Assertions.assertThat(executeQueryResponse).extracting((v0) -> {
            return v0.getBpmnProcessId();
        }).isEqualTo("OneProcessToRuleThemAll");
    }

    @Test
    public void shouldRespondWithBpmnProcessIdWhenProcessInstanceFound() {
        PartitionTestClient partitionClient = this.command.partitionClient(1);
        partitionClient.deploy(Bpmn.createExecutableProcess("OneProcessToRuleThemAll").startEvent().serviceTask("task", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType("type");
        }).endEvent().done());
        long processInstanceKey = partitionClient.createProcessInstance(processInstanceCreationRecord -> {
            return processInstanceCreationRecord.setBpmnProcessId("OneProcessToRuleThemAll");
        }).getProcessInstanceKey();
        ((AbstractBooleanAssert) Assertions.assertThat(RecordingExporter.processInstanceRecords().withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATING).withProcessInstanceKey(processInstanceKey).filterRootScope().limit(1L).exists()).as("wait until the element instance actually exists in the state", new Object[0])).isTrue();
        DirectBuffer directBuffer = (DirectBuffer) this.clientTransport.sendRequest(() -> {
            return this.serverAddress;
        }, new Request().partitionId(1).key(processInstanceKey).valueType(ValueType.PROCESS_INSTANCE), Duration.ofSeconds(10L)).join();
        ExecuteQueryResponse executeQueryResponse = new ExecuteQueryResponse();
        executeQueryResponse.wrap(directBuffer, 0, directBuffer.capacity());
        Assertions.assertThat(executeQueryResponse).extracting((v0) -> {
            return v0.getBpmnProcessId();
        }).isEqualTo("OneProcessToRuleThemAll");
    }

    @Test
    public void shouldRespondWithBpmnProcessIdWhenJobFound() {
        DirectBuffer directBuffer = (DirectBuffer) this.clientTransport.sendRequest(() -> {
            return this.serverAddress;
        }, new Request().partitionId(1).key(this.command.partitionClient(1).createJob("type")).valueType(ValueType.JOB), Duration.ofSeconds(10L)).join();
        ExecuteQueryResponse executeQueryResponse = new ExecuteQueryResponse();
        executeQueryResponse.wrap(directBuffer, 0, directBuffer.capacity());
        Assertions.assertThat(executeQueryResponse).extracting((v0) -> {
            return v0.getBpmnProcessId();
        }).isEqualTo("process");
    }
}
