package org.apache.beam.runners.fnexecution.control;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.class */
public class FnApiControlClientTest {

    @Rule
    public transient Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Mock
    public StreamObserver<BeamFnApi.InstructionRequest> mockObserver;
    private FnApiControlClient client;
    private ConcurrentMap<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors;

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        this.processBundleDescriptors = new ConcurrentHashMap();
        this.client = FnApiControlClient.forRequestObserver("DUMMY", this.mockObserver, this.processBundleDescriptors);
    }

    @Test
    public void testRequestSent() {
        this.client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("instructionId").build());
        ((StreamObserver) Mockito.verify(this.mockObserver)).onNext((BeamFnApi.InstructionRequest) Matchers.any(BeamFnApi.InstructionRequest.class));
    }

    @Test
    public void testRequestSuccess() throws Exception {
        CompletionStage handle = this.client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("successfulInstruction").build());
        this.client.asResponseObserver().onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId("successfulInstruction").build());
        MatcherAssert.assertThat(((BeamFnApi.InstructionResponse) MoreFutures.get(handle)).getInstructionId(), org.hamcrest.Matchers.equalTo("successfulInstruction"));
    }

    @Test
    public void testRequestError() throws Exception {
        CompletionStage handle = this.client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("instructionId").build());
        this.client.asResponseObserver().onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId("instructionId").setError("Oh no an error!").build());
        this.thrown.expectCause(org.hamcrest.Matchers.isA(RuntimeException.class));
        this.thrown.expectMessage("Oh no an error!");
        MoreFutures.get(handle);
    }

    @Test
    public void testUnknownResponseIgnored() throws Exception {
        CompletionStage handle = this.client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("actualInstruction").build());
        this.client.asResponseObserver().onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId("unknownInstruction").build());
        MatcherAssert.assertThat(Boolean.valueOf(MoreFutures.isDone(handle)), org.hamcrest.Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(MoreFutures.isCancelled(handle)), org.hamcrest.Matchers.is(false));
    }

    @Test
    public void testOnCompletedCancelsOutstanding() throws Exception {
        CompletionStage handle = this.client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("clientHangUpInstruction").build());
        this.client.asResponseObserver().onCompleted();
        this.thrown.expect(ExecutionException.class);
        this.thrown.expectCause(org.hamcrest.Matchers.isA(IllegalStateException.class));
        this.thrown.expectMessage("closed");
        MoreFutures.get(handle);
    }

    @Test
    public void testOnErrorCancelsOutstanding() throws Exception {
        CompletionStage handle = this.client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("errorInstruction").build());
        this.client.asResponseObserver().onError(new Exception() { // from class: org.apache.beam.runners.fnexecution.control.FnApiControlClientTest.1FrazzleException
        });
        this.thrown.expect(ExecutionException.class);
        this.thrown.expectCause(org.hamcrest.Matchers.isA(C1FrazzleException.class));
        MoreFutures.get(handle);
    }

    @Test
    public void testCloseCancelsOutstanding() throws Exception {
        CompletionStage handle = this.client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("serverCloseInstruction").build());
        this.client.close();
        this.thrown.expect(ExecutionException.class);
        this.thrown.expectCause(org.hamcrest.Matchers.isA(IllegalStateException.class));
        this.thrown.expectMessage("closed");
        MoreFutures.get(handle);
    }

    @Test
    public void testOnCloseMultipleListener() throws Exception {
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        Consumer consumer2 = (Consumer) Mockito.mock(Consumer.class);
        this.client.onClose(consumer);
        this.client.onClose(consumer2);
        this.client.close();
        ((Consumer) Mockito.verify(consumer)).accept(this.client);
        ((Consumer) Mockito.verify(consumer2)).accept(this.client);
    }
}
