package org.apache.beam.runners.fnexecution.state;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link GrpcStateService}.
 *
 * <p>Each test registers a {@link StateRequestHandler} for a process bundle instruction id, opens
 * a state stream through {@link GrpcStateService#state}, pushes a request carrying that
 * instruction id, and checks how the request/response is routed.
 */
@RunWith(JUnit4.class)
public class GrpcStateServiceTest {
    /** Upper bound, in milliseconds, on how long a test waits for a response to be delivered. */
    private static final long TIMEOUT_MS = 30000;

    private GrpcStateService stateService;

    @Mock
    private StreamObserver<BeamFnApi.StateResponse> responseObserver;

    @Mock
    private StateRequestHandler handler;

    /** Initializes the {@code @Mock} fields and creates a fresh service for every test. */
    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
        this.stateService = GrpcStateService.create();
    }

    /**
     * Verifies that a request sent on the state stream is passed to the handler registered for the
     * request's instruction id.
     */
    @Test
    public void testStateRequestsHandledByRegisteredHandlers() throws Exception {
        String bundleInstructionId = "bundle_instruction";
        this.stateService.registerForProcessBundleInstructionId(bundleInstructionId, this.handler);

        StreamObserver<BeamFnApi.StateRequest> requestObserver =
                this.stateService.state(this.responseObserver);
        BeamFnApi.StateRequest request =
                BeamFnApi.StateRequest.newBuilder().setInstructionId(bundleInstructionId).build();
        requestObserver.onNext(request);

        Mockito.verify(this.handler).handle(request);
    }

    /**
     * Verifies that the response produced by a registered handler is emitted exactly once on the
     * response stream, and that the stream is neither completed nor failed as a result.
     */
    @Test
    public void testHandlerResponseSentToStateStream() throws Exception {
        String bundleInstructionId = "EXPECTED_BUNDLE_INSTRUCTION_ID";
        ByteString expectedResponseData =
                ByteString.copyFrom("EXPECTED_RESPONSE_DATA", StandardCharsets.UTF_8);
        BeamFnApi.StateResponse.Builder expectedResponse =
                BeamFnApi.StateResponse.newBuilder()
                        .setGet(BeamFnApi.StateGetResponse.newBuilder().setData(expectedResponseData));

        // Handler that answers every request with the already-completed expected response.
        StateRequestHandler cannedHandler =
                stateRequest -> CompletableFuture.completedFuture(expectedResponse);

        // Record every response pushed to the observer so the test can block until one arrives.
        LinkedBlockingDeque<BeamFnApi.StateResponse> responses = new LinkedBlockingDeque<>();
        StreamObserver<BeamFnApi.StateResponse> recordingResponseObserver =
                Mockito.spy(
                        TestStreams.<BeamFnApi.StateResponse>withOnNext(responses::add).build());

        this.stateService.registerForProcessBundleInstructionId(bundleInstructionId, cannedHandler);
        StreamObserver<BeamFnApi.StateRequest> requestObserver =
                this.stateService.state(recordingResponseObserver);
        requestObserver.onNext(
                BeamFnApi.StateRequest.newBuilder().setInstructionId(bundleInstructionId).build());

        // poll() returns null on timeout; fail with a clear message rather than an NPE further down.
        BeamFnApi.StateResponse response = responses.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(
                "no state response received within " + TIMEOUT_MS + " ms",
                response,
                CoreMatchers.notNullValue());

        Mockito.verify(recordingResponseObserver, Mockito.times(1)).onNext(Matchers.any());
        Mockito.verify(recordingResponseObserver, Mockito.never()).onCompleted();
        Mockito.verify(recordingResponseObserver, Mockito.never()).onError(Matchers.any());
        MatcherAssert.assertThat(
                response.getGet().getData(), CoreMatchers.equalTo(expectedResponseData));
    }
}
