package org.apache.beam.runners.fnexecution.status;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ClientInterceptor;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.testing.GrpcCleanupRule;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link BeamWorkerStatusGrpcService}.
 *
 * <p>Each test starts the service on an in-process gRPC server and connects to it through an
 * in-process channel. A client identifies itself by attaching a harness id to its calls via
 * {@link AddHarnessIdInterceptor}.
 */
@RunWith(JUnit4.class)
public class BeamWorkerStatusGrpcServiceTest {

    /** Shuts down every server and channel registered in {@link #setUp()} after each test. */
    @Rule
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();

    /** Harness id used by the single-client tests. */
    private static final String ID = "id";

    private BeamWorkerStatusGrpcService service;
    private GrpcFnServer<BeamWorkerStatusGrpcService> server;
    private ManagedChannel channel;
    private BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub stub;

    /** Receives the status requests the service sends to the client registered as {@link #ID}. */
    @Mock
    private StreamObserver<BeamFnApi.WorkerStatusRequest> mockObserver;

    /**
     * Creates the service under test, serves it in-process under a random URL, and builds a client
     * stub that reports {@link #ID} as its harness id.
     */
    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
        Endpoints.ApiServiceDescriptor descriptor =
                Endpoints.ApiServiceDescriptor.newBuilder()
                        .setUrl(UUID.randomUUID().toString())
                        .build();
        service = BeamWorkerStatusGrpcService.create(
                descriptor, GrpcContextHeaderAccessorProvider.getHeaderAccessor());
        server = GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create());
        channel = InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl()).build();
        stub = stubFor(ID);
        grpcCleanup.register(server.getServer());
        grpcCleanup.register(channel);
    }

    /** Closes the service if {@link #setUp()} got far enough to create it. */
    @After
    public void tearDown() throws Exception {
        if (service != null) {
            service.close();
        }
    }

    /** A client that opens the status stream becomes retrievable by its harness id. */
    @Test
    public void testClientConnected() throws Exception {
        stub.workerStatus(mockObserver);
        Assert.assertNotNull(waitAndGetStatusClient(ID));
    }

    /**
     * The mock observer is left unstubbed, so the client never answers; with a 1 ms timeout the
     * service is expected to report the timeout as an error string.
     */
    @Test
    public void testGetWorkerStatusNoResponse() throws Exception {
        stub.workerStatus(mockObserver);
        waitAndGetStatusClient(ID);
        Assert.assertEquals(
                "Error: exception encountered getting status from SDK harness: java.util.concurrent.TimeoutException",
                service.getSingleWorkerStatus(ID, 1L, TimeUnit.MILLISECONDS));
    }

    /** A response carrying status info completes the status future with that info. */
    @Test
    public void testGetWorkerStatusSuccess() throws Exception {
        StreamObserver<BeamFnApi.WorkerStatusResponse> responseObserver =
                stub.workerStatus(mockObserver);
        waitAndGetStatusClient(ID);
        replyToRequestsWith(
                mockObserver,
                responseObserver,
                BeamFnApi.WorkerStatusResponse.newBuilder().setStatusInfo("status").build());
        Assert.assertEquals("status", service.getWorkerStatus(ID).get(5L, TimeUnit.SECONDS));
    }

    /** A response carrying an error completes the status future with the error text. */
    @Test
    public void testGetWorkerStatusReturnError() throws Exception {
        StreamObserver<BeamFnApi.WorkerStatusResponse> responseObserver =
                stub.workerStatus(mockObserver);
        waitAndGetStatusClient(ID);
        replyToRequestsWith(
                mockObserver,
                responseObserver,
                BeamFnApi.WorkerStatusResponse.newBuilder().setError("error").build());
        Assert.assertEquals("error", service.getWorkerStatus(ID).get(5L, TimeUnit.SECONDS));
    }

    /** Several connected clients are all queried, and the result is keyed by harness id. */
    @Test
    public void testGetAllWorkerStatuses() throws Exception {
        HashSet<String> workerIds = Sets.newHashSet("id0", "id3", "id11", "id12", "id21");
        for (String workerId : workerIds) {
            StreamObserver<BeamFnApi.WorkerStatusRequest> requestObserver = newMockRequestObserver();
            StreamObserver<BeamFnApi.WorkerStatusResponse> responseObserver =
                    stubFor(workerId).workerStatus(requestObserver);
            waitAndGetStatusClient(workerId);
            replyToRequestsWith(
                    requestObserver,
                    responseObserver,
                    BeamFnApi.WorkerStatusResponse.newBuilder().setStatusInfo("status").build());
        }

        Map<String, String> statuses = service.getAllWorkerStatuses(5L, TimeUnit.SECONDS);

        Assert.assertEquals(workerIds, statuses.keySet());
        for (String workerId : workerIds) {
            Assert.assertEquals("status", statuses.get(workerId));
        }
    }

    /** Builds a stub on the shared channel whose calls carry {@code workerId} as harness id. */
    private BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub stubFor(String workerId) {
        ClientInterceptor harnessIdInterceptor = AddHarnessIdInterceptor.create(workerId);
        return BeamFnWorkerStatusGrpc.newStub(channel).withInterceptors(harnessIdInterceptor);
    }

    /** Creates a fresh mock observer for requests coming from the service. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) can only yield the raw StreamObserver type.
    private static StreamObserver<BeamFnApi.WorkerStatusRequest> newMockRequestObserver() {
        return Mockito.mock(StreamObserver.class);
    }

    /**
     * Stubs {@code requestObserver} so that every request it receives is answered on
     * {@code responseObserver} with a copy of {@code template} whose id is set to the request's id.
     *
     * @param requestObserver mock that receives requests from the service
     * @param responseObserver client-side stream used to send the reply
     * @param template response payload (status info or error) to echo back; its id is overwritten
     */
    private static void replyToRequestsWith(
            StreamObserver<BeamFnApi.WorkerStatusRequest> requestObserver,
            StreamObserver<BeamFnApi.WorkerStatusResponse> responseObserver,
            BeamFnApi.WorkerStatusResponse template) {
        Mockito.doAnswer(invocation -> {
            BeamFnApi.WorkerStatusRequest request = invocation.getArgument(0);
            responseObserver.onNext(template.toBuilder().setId(request.getId()).build());
            return null;
        }).when(requestObserver).onNext(ArgumentMatchers.any());
    }

    /**
     * Waits up to one second for the client registered under {@code str} and returns it.
     *
     * @throws TimeoutException if no such client is available within one second
     */
    private WorkerStatusClient waitAndGetStatusClient(String str) throws InterruptedException, ExecutionException, TimeoutException {
        return service.getStatusClient(str).get(1L, TimeUnit.SECONDS);
    }
}
