package org.apache.kafka.raft;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/kafka/raft/KafkaNetworkChannelTest.class */
public class KafkaNetworkChannelTest {
    private static final List<ApiKeys> RAFT_APIS = Arrays.asList(ApiKeys.VOTE, ApiKeys.BEGIN_QUORUM_EPOCH, ApiKeys.END_QUORUM_EPOCH, ApiKeys.FETCH, ApiKeys.FETCH_SNAPSHOT);
    private final int requestTimeoutMs = 30000;
    private final Time time = new MockTime();
    private final MockClient client = new MockClient(this.time, new StubMetadataUpdater(null));
    private final TopicPartition topicPartition = new TopicPartition("topic", 0);
    private final Uuid topicId = Uuid.randomUuid();
    private final KafkaNetworkChannel channel = new KafkaNetworkChannel(this.time, ListenerName.normalised("NAME"), this.client, 30000, "test-raft");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.raft.KafkaNetworkChannelTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaNetworkChannelTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$ApiKeys = new int[ApiKeys.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.BEGIN_QUORUM_EPOCH.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.END_QUORUM_EPOCH.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.VOTE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FETCH.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FETCH_SNAPSHOT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/raft/KafkaNetworkChannelTest$StubMetadataUpdater.class */
    private static class StubMetadataUpdater implements MockClient.MockMetadataUpdater {
        private StubMetadataUpdater() {
        }

        public List<Node> fetchNodes() {
            return Collections.emptyList();
        }

        public boolean isUpdateNeeded() {
            return false;
        }

        public void update(Time time, MockClient.MetadataUpdate metadataUpdate) {
        }

        /* synthetic */ StubMetadataUpdater(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    private Node nodeWithId(boolean z) {
        return new Node(z ? 2 : -2, "127.0.0.1", 9092);
    }

    @BeforeEach
    public void setupSupportedApis() {
        this.client.setNodeApiVersions(NodeApiVersions.create((List) RAFT_APIS.stream().map(ApiVersionsResponse::toApiVersion).collect(Collectors.toList())));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSendToBlackedOutDestination(boolean z) throws ExecutionException, InterruptedException {
        Node nodeWithId = nodeWithId(z);
        this.client.backoff(nodeWithId, 500L);
        assertBrokerNotAvailable(nodeWithId);
    }

    @Test
    public void testWakeupClientOnSend() throws InterruptedException, ExecutionException {
        Node node = new Node(2, "127.0.0.1", 9092);
        this.client.enableBlockingUntilWakeup(1);
        Thread thread = new Thread(() -> {
            this.channel.pollOnce();
            this.channel.pollOnce();
        });
        this.client.prepareResponseFrom(buildResponse(buildTestErrorResponse(ApiKeys.FETCH, Errors.INVALID_REQUEST)), node, false);
        thread.start();
        RaftRequest.Outbound sendTestRequest = sendTestRequest(ApiKeys.FETCH, node);
        thread.join();
        assertResponseCompleted(sendTestRequest, Errors.INVALID_REQUEST);
    }

    @Test
    public void testSendAndDisconnect() throws ExecutionException, InterruptedException {
        Node node = new Node(2, "127.0.0.1", 9092);
        for (ApiKeys apiKeys : RAFT_APIS) {
            this.client.prepareResponseFrom(buildResponse(buildTestErrorResponse(apiKeys, Errors.INVALID_REQUEST)), node, true);
            sendAndAssertErrorResponse(apiKeys, node, Errors.BROKER_NOT_AVAILABLE);
        }
    }

    @Test
    public void testSendAndFailAuthentication() throws ExecutionException, InterruptedException {
        Node node = new Node(2, "127.0.0.1", 9092);
        for (ApiKeys apiKeys : RAFT_APIS) {
            this.client.createPendingAuthenticationError(node, 100L);
            sendAndAssertErrorResponse(apiKeys, node, Errors.NETWORK_EXCEPTION);
            this.client.reset();
        }
    }

    private void assertBrokerNotAvailable(Node node) throws ExecutionException, InterruptedException {
        Iterator<ApiKeys> it = RAFT_APIS.iterator();
        while (it.hasNext()) {
            sendAndAssertErrorResponse(it.next(), node, Errors.BROKER_NOT_AVAILABLE);
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSendAndReceiveOutboundRequest(boolean z) throws ExecutionException, InterruptedException {
        Node nodeWithId = nodeWithId(z);
        for (ApiKeys apiKeys : RAFT_APIS) {
            Errors errors = Errors.INVALID_REQUEST;
            AbstractResponse buildResponse = buildResponse(buildTestErrorResponse(apiKeys, errors));
            this.client.prepareResponseFrom(buildResponse, nodeWithId);
            System.out.println("api key " + apiKeys + ", response " + buildResponse);
            sendAndAssertErrorResponse(apiKeys, nodeWithId, errors);
        }
    }

    @Test
    public void testUnsupportedVersionError() throws ExecutionException, InterruptedException {
        Node node = new Node(2, "127.0.0.1", 9092);
        for (ApiKeys apiKeys : RAFT_APIS) {
            this.client.prepareUnsupportedVersionResponse(abstractRequest -> {
                return abstractRequest.apiKey() == apiKeys;
            });
            sendAndAssertErrorResponse(apiKeys, node, Errors.UNSUPPORTED_VERSION);
        }
    }

    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
    @ParameterizedTest
    public void testFetchRequestDowngrade(short s) {
        sendTestRequest(ApiKeys.FETCH, new Node(2, "127.0.0.1", 9092));
        this.channel.pollOnce();
        Assertions.assertEquals(1, this.client.requests().size());
        FetchRequest build = ((ClientRequest) this.client.requests().peek()).requestBuilder().build(s);
        if (s < 15) {
            Assertions.assertEquals(1, build.data().replicaId());
            Assertions.assertEquals(-1, build.data().replicaState().replicaId());
        } else {
            Assertions.assertEquals(-1, build.data().replicaId());
            Assertions.assertEquals(1, build.data().replicaState().replicaId());
        }
    }

    private RaftRequest.Outbound sendTestRequest(ApiKeys apiKeys, Node node) {
        RaftRequest.Outbound outbound = new RaftRequest.Outbound(this.channel.newCorrelationId(), buildTestRequest(apiKeys), node, this.time.milliseconds());
        this.channel.send(outbound);
        return outbound;
    }

    private void assertResponseCompleted(RaftRequest.Outbound outbound, Errors errors) throws ExecutionException, InterruptedException {
        Assertions.assertTrue(outbound.completion.isDone());
        RaftResponse.Inbound inbound = (RaftResponse.Inbound) outbound.completion.get();
        Assertions.assertEquals(outbound.destination(), inbound.source());
        Assertions.assertEquals(outbound.correlationId(), inbound.correlationId());
        Assertions.assertEquals(outbound.data().apiKey(), inbound.data().apiKey());
        Assertions.assertEquals(errors, extractError(inbound.data()));
    }

    private void sendAndAssertErrorResponse(ApiKeys apiKeys, Node node, Errors errors) throws ExecutionException, InterruptedException {
        RaftRequest.Outbound sendTestRequest = sendTestRequest(apiKeys, node);
        this.channel.pollOnce();
        assertResponseCompleted(sendTestRequest, errors);
    }

    private ApiMessage buildTestRequest(ApiKeys apiKeys) {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$ApiKeys[apiKeys.ordinal()]) {
            case 1:
                return BeginQuorumEpochRequest.singletonRequest(this.topicPartition, "clusterId", 5, 1);
            case 2:
                return EndQuorumEpochRequest.singletonRequest(this.topicPartition, "clusterId", 1, 5, Collections.singletonList(2));
            case 3:
                return VoteRequest.singletonRequest(this.topicPartition, "clusterId", 5, 1, 4, 329L);
            case 4:
                FetchRequestData singletonFetchRequest = RaftUtil.singletonFetchRequest(this.topicPartition, this.topicId, fetchPartition -> {
                    fetchPartition.setCurrentLeaderEpoch(5).setFetchOffset(333L).setLastFetchedEpoch(5);
                });
                singletonFetchRequest.setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(1));
                return singletonFetchRequest;
            case 5:
                return FetchSnapshotRequest.singleton("clusterId", 1, this.topicPartition, partitionSnapshot -> {
                    return partitionSnapshot.setCurrentLeaderEpoch(5).setSnapshotId(new FetchSnapshotRequestData.SnapshotId().setEpoch(4).setEndOffset(323L)).setPosition(10L);
                });
            default:
                throw new AssertionError("Unexpected api " + apiKeys);
        }
    }

    private ApiMessage buildTestErrorResponse(ApiKeys apiKeys, Errors errors) {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$ApiKeys[apiKeys.ordinal()]) {
            case 1:
                return new BeginQuorumEpochResponseData().setErrorCode(errors.code());
            case 2:
                return new EndQuorumEpochResponseData().setErrorCode(errors.code());
            case 3:
                return VoteResponse.singletonResponse(errors, this.topicPartition, Errors.NONE, 1, 5, false);
            case 4:
                return new FetchResponseData().setErrorCode(errors.code());
            case 5:
                return new FetchSnapshotResponseData().setErrorCode(errors.code());
            default:
                throw new AssertionError("Unexpected api " + apiKeys);
        }
    }

    private Errors extractError(ApiMessage apiMessage) {
        short errorCode;
        if (apiMessage instanceof BeginQuorumEpochResponseData) {
            errorCode = ((BeginQuorumEpochResponseData) apiMessage).errorCode();
        } else if (apiMessage instanceof EndQuorumEpochResponseData) {
            errorCode = ((EndQuorumEpochResponseData) apiMessage).errorCode();
        } else if (apiMessage instanceof FetchResponseData) {
            errorCode = ((FetchResponseData) apiMessage).errorCode();
        } else if (apiMessage instanceof VoteResponseData) {
            errorCode = ((VoteResponseData) apiMessage).errorCode();
        } else {
            if (!(apiMessage instanceof FetchSnapshotResponseData)) {
                throw new IllegalArgumentException("Unexpected type for responseData: " + apiMessage);
            }
            errorCode = ((FetchSnapshotResponseData) apiMessage).errorCode();
        }
        return Errors.forCode(errorCode);
    }

    private AbstractResponse buildResponse(ApiMessage apiMessage) {
        if (apiMessage instanceof VoteResponseData) {
            return new VoteResponse((VoteResponseData) apiMessage);
        }
        if (apiMessage instanceof BeginQuorumEpochResponseData) {
            return new BeginQuorumEpochResponse((BeginQuorumEpochResponseData) apiMessage);
        }
        if (apiMessage instanceof EndQuorumEpochResponseData) {
            return new EndQuorumEpochResponse((EndQuorumEpochResponseData) apiMessage);
        }
        if (apiMessage instanceof FetchResponseData) {
            return new FetchResponse((FetchResponseData) apiMessage);
        }
        if (apiMessage instanceof FetchSnapshotResponseData) {
            return new FetchSnapshotResponse((FetchSnapshotResponseData) apiMessage);
        }
        throw new IllegalArgumentException("Unexpected type for responseData: " + apiMessage);
    }
}
