/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.CallOptions;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ClientCall;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ClientInterceptor;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingClientCall;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Metadata;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.MethodDescriptor;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

/**
 * Tests that {@link GrpcContextHeaderAccessorProvider} exposes the value of the
 * {@code worker_id} request header to a gRPC service while it is handling a call.
 */
@RunWith(JUnit4.class)
public class GrpcContextHeaderAccessorProviderTest {
    /** Upper bound on how long the test waits for the service to observe the call. */
    private static final long WORKER_ID_TIMEOUT_SECONDS = 30L;

    /**
     * Starts an in-process data service, opens a data stream through a channel whose
     * interceptor adds a {@code worker_id} header, and asserts that the service-side
     * header accessor reports that same id.
     *
     * @throws Exception if the server cannot be created, or if the service does not
     *     report a worker id within {@link #WORKER_ID_TIMEOUT_SECONDS} seconds
     */
    @Test
    public void testWorkerIdOnConnect() throws Exception {
        final String worker1 = "worker1";
        CompletableFuture<String> workerId = new CompletableFuture<>();
        // Runs inside the service's data() handler, so the accessor is queried while the call is being served.
        Consumer<StreamObserver<BeamFnApi.Elements>> consumer =
            elementsStreamObserver ->
                workerId.complete(
                    GrpcContextHeaderAccessorProvider.getHeaderAccessor().getSdkWorkerId());

        @SuppressWarnings("unchecked") // Mockito.mock cannot express the generic type argument.
        StreamObserver<BeamFnApi.Elements> serviceInbound = Mockito.mock(StreamObserver.class);
        TestDataService testService = new TestDataService(serviceInbound, consumer);

        Endpoints.ApiServiceDescriptor serviceDescriptor =
            Endpoints.ApiServiceDescriptor.newBuilder().setUrl("testServer").build();
        Server server =
            InProcessServerFactory.create().create(ImmutableList.of(testService), serviceDescriptor);
        try {
            Metadata.Key<String> workerIdKey =
                Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
            ManagedChannel channel =
                InProcessChannelBuilder.forName(serviceDescriptor.getUrl())
                    .intercept(headerAttachingInterceptor(workerIdKey, worker1))
                    .build();
            try {
                @SuppressWarnings("unchecked") // Mockito.mock cannot express the generic type argument.
                StreamObserver<BeamFnApi.Elements> clientInbound = Mockito.mock(StreamObserver.class);
                BeamFnDataGrpc.newStub(channel).data(clientInbound);

                // Wait for the result before tearing anything down: the call is started
                // asynchronously, and a bounded wait turns a lost call into a test failure
                // instead of an indefinite hang.
                Assert.assertEquals(
                    worker1, workerId.get(WORKER_ID_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            } finally {
                // The data stream is intentionally left open by the test, so cancel it forcibly.
                channel.shutdownNow();
            }
        } finally {
            server.shutdown();
        }
    }

    /**
     * Builds a client interceptor that puts {@code value} under {@code key} into the
     * request headers of every call it intercepts, then forwards the call unchanged.
     *
     * @param key metadata key to set on outgoing calls
     * @param value header value stored under {@code key}
     * @return the interceptor
     */
    private static ClientInterceptor headerAttachingInterceptor(
            final Metadata.Key<String> key, final String value) {
        return new ClientInterceptor() {
            @Override
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
                ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
                return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
                    @Override
                    public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
                        headers.put(key, value);
                        super.start(responseListener, headers);
                    }
                };
            }
        };
    }

    /**
     * Minimal data service: each {@code data} call hands the outbound observer to a
     * callback and returns a fixed inbound observer.
     */
    private static class TestDataService
    extends BeamFnDataGrpc.BeamFnDataImplBase {
        /** Observer returned to gRPC for every incoming stream. */
        private final StreamObserver<BeamFnApi.Elements> inboundObserver;
        /** Callback invoked with the outbound observer of every incoming stream. */
        private final Consumer<StreamObserver<BeamFnApi.Elements>> consumer;

        /**
         * @param inboundObserver observer returned from every {@link #data} call
         * @param consumer callback invoked with the outbound observer on every {@link #data} call
         */
        private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver, Consumer<StreamObserver<BeamFnApi.Elements>> consumer) {
            this.inboundObserver = inboundObserver;
            this.consumer = consumer;
        }

        /**
         * Passes {@code outboundObserver} to the configured callback, then returns the
         * configured inbound observer.
         *
         * @param outboundObserver observer for elements sent back to the caller
         * @return the inbound observer supplied at construction
         */
        @Override
        public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> outboundObserver) {
            this.consumer.accept(outboundObserver);
            return this.inboundObserver;
        }
    }
}

