/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.status;

import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.apache.beam.runners.fnexecution.status.WorkerStatusClient;
import org.apache.beam.sdk.fn.channel.AddHarnessIdInterceptor;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.HeaderAccessor;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ClientInterceptor;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.testing.GrpcCleanupRule;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class BeamWorkerStatusGrpcServiceTest {
    @Rule
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
    private static final String ID = "id";
    private BeamWorkerStatusGrpcService service;
    private GrpcFnServer<BeamWorkerStatusGrpcService> server;
    private ManagedChannel channel;
    private BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub stub;
    @Mock
    private StreamObserver<BeamFnApi.WorkerStatusRequest> mockObserver;

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks((Object)this);
        this.service = BeamWorkerStatusGrpcService.create((Endpoints.ApiServiceDescriptor)Endpoints.ApiServiceDescriptor.newBuilder().setUrl(UUID.randomUUID().toString()).build(), (HeaderAccessor)GrpcContextHeaderAccessorProvider.getHeaderAccessor());
        this.server = GrpcFnServer.allocatePortAndCreateFor((FnService)this.service, (ServerFactory)InProcessServerFactory.create());
        this.channel = InProcessChannelBuilder.forName((String)this.server.getApiServiceDescriptor().getUrl()).build();
        this.stub = (BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub)BeamFnWorkerStatusGrpc.newStub((Channel)this.channel).withInterceptors(new ClientInterceptor[]{AddHarnessIdInterceptor.create((String)ID)});
        this.grpcCleanup.register(this.server.getServer());
        this.grpcCleanup.register(this.channel);
    }

    @After
    public void tearDown() throws Exception {
        if (this.service != null) {
            this.service.close();
        }
    }

    @Test
    public void testClientConnected() throws Exception {
        this.stub.workerStatus(this.mockObserver);
        WorkerStatusClient client = this.waitAndGetStatusClient(ID);
        Assert.assertNotNull((Object)client);
    }

    @Test
    public void testGetWorkerStatusNoResponse() throws Exception {
        StreamObserver unused = this.stub.workerStatus(this.mockObserver);
        this.waitAndGetStatusClient(ID);
        String response = this.service.getSingleWorkerStatus(ID, 1L, TimeUnit.MILLISECONDS);
        Assert.assertEquals((Object)"Error: exception encountered getting status from SDK harness: java.util.concurrent.TimeoutException", (Object)response);
    }

    @Test
    public void testGetWorkerStatusSuccess() throws Exception {
        StreamObserver observer = this.stub.workerStatus(this.mockObserver);
        this.waitAndGetStatusClient(ID);
        ((StreamObserver)Mockito.doAnswer(invocation -> {
            BeamFnApi.WorkerStatusRequest request = (BeamFnApi.WorkerStatusRequest)invocation.getArguments()[0];
            observer.onNext((Object)BeamFnApi.WorkerStatusResponse.newBuilder().setId(request.getId()).setStatusInfo("status").build());
            return null;
        }).when(this.mockObserver)).onNext((Object)((BeamFnApi.WorkerStatusRequest)ArgumentMatchers.any()));
        CompletableFuture future = this.service.getWorkerStatus(ID);
        String response = (String)future.get(5L, TimeUnit.SECONDS);
        Assert.assertEquals((Object)"status", (Object)response);
    }

    @Test
    public void testGetWorkerStatusReturnError() throws Exception {
        StreamObserver observer = this.stub.workerStatus(this.mockObserver);
        this.waitAndGetStatusClient(ID);
        ((StreamObserver)Mockito.doAnswer(invocation -> {
            BeamFnApi.WorkerStatusRequest request = (BeamFnApi.WorkerStatusRequest)invocation.getArguments()[0];
            observer.onNext((Object)BeamFnApi.WorkerStatusResponse.newBuilder().setId(request.getId()).setError("error").build());
            return null;
        }).when(this.mockObserver)).onNext((Object)((BeamFnApi.WorkerStatusRequest)ArgumentMatchers.any()));
        CompletableFuture future = this.service.getWorkerStatus(ID);
        String response = (String)future.get(5L, TimeUnit.SECONDS);
        Assert.assertEquals((Object)"error", (Object)response);
    }

    @Test
    public void testGetAllWorkerStatuses() throws Exception {
        HashSet ids = Sets.newHashSet((Object[])new String[]{"id0", "id3", "id11", "id12", "id21"});
        for (String id : ids) {
            StreamObserver requestObserverMock = (StreamObserver)Mockito.mock(StreamObserver.class);
            BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub workerStatusStub = (BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub)BeamFnWorkerStatusGrpc.newStub((Channel)this.channel).withInterceptors(new ClientInterceptor[]{AddHarnessIdInterceptor.create((String)id)});
            StreamObserver observer = workerStatusStub.workerStatus(requestObserverMock);
            this.waitAndGetStatusClient(id);
            ((StreamObserver)Mockito.doAnswer(invocation -> {
                BeamFnApi.WorkerStatusRequest request = (BeamFnApi.WorkerStatusRequest)invocation.getArguments()[0];
                observer.onNext((Object)BeamFnApi.WorkerStatusResponse.newBuilder().setId(request.getId()).setStatusInfo("status").build());
                return null;
            }).when((Object)requestObserverMock)).onNext((Object)((BeamFnApi.WorkerStatusRequest)ArgumentMatchers.any()));
        }
        Map allWorkerStatuses = this.service.getAllWorkerStatuses(5L, TimeUnit.SECONDS);
        Assert.assertEquals((Object)ids, allWorkerStatuses.keySet());
        for (String id : ids) {
            Assert.assertEquals((Object)"status", allWorkerStatuses.get(id));
        }
    }

    private WorkerStatusClient waitAndGetStatusClient(String id) throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture clientFuture = this.service.getStatusClient(id);
        return (WorkerStatusClient)clientFuture.get(1L, TimeUnit.SECONDS);
    }
}

