package org.apache.bookkeeper.stream.storage.impl.routing;

import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.class */
public class RoutingHeaderProxyInterceptorTest extends GrpcClientTestBase {
    private static final Logger log = LoggerFactory.getLogger(RoutingHeaderProxyInterceptorTest.class);
    private final long streamId = 1234;
    private final long rangeId = 2345;
    private final byte[] routingKey = ("routing-key-" + System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8);
    private final AtomicReference<Object> receivedRequest = new AtomicReference<>();
    private StorageServerChannel channel;

    protected void doSetup() {
        this.serviceRegistry.addService(new TableServiceGrpc.TableServiceImplBase() { // from class: org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptorTest.1
            public void range(RangeRequest rangeRequest, StreamObserver<RangeResponse> streamObserver) {
                RoutingHeaderProxyInterceptorTest.log.info("Received range request : {}", rangeRequest);
                RoutingHeaderProxyInterceptorTest.this.receivedRequest.set(rangeRequest);
                streamObserver.onNext(RangeResponse.newBuilder().setHeader(ResponseHeader.newBuilder().setCode(StatusCode.SUCCESS).setRoutingHeader(rangeRequest.getHeader()).build()).build());
                streamObserver.onCompleted();
            }

            public void delete(DeleteRangeRequest deleteRangeRequest, StreamObserver<DeleteRangeResponse> streamObserver) {
                RoutingHeaderProxyInterceptorTest.log.info("Received delete range request : {}", deleteRangeRequest);
                RoutingHeaderProxyInterceptorTest.this.receivedRequest.set(deleteRangeRequest);
                streamObserver.onNext(DeleteRangeResponse.newBuilder().setHeader(ResponseHeader.newBuilder().setCode(StatusCode.SUCCESS).setRoutingHeader(deleteRangeRequest.getHeader()).build()).build());
                streamObserver.onCompleted();
            }

            public void txn(TxnRequest txnRequest, StreamObserver<TxnResponse> streamObserver) {
                RoutingHeaderProxyInterceptorTest.log.info("Received txn request : {}", txnRequest);
                RoutingHeaderProxyInterceptorTest.this.receivedRequest.set(txnRequest);
                streamObserver.onNext(TxnResponse.newBuilder().setHeader(ResponseHeader.newBuilder().setCode(StatusCode.SUCCESS).setRoutingHeader(txnRequest.getHeader()).build()).build());
                streamObserver.onCompleted();
            }

            public void increment(IncrementRequest incrementRequest, StreamObserver<IncrementResponse> streamObserver) {
                RoutingHeaderProxyInterceptorTest.log.info("Received incr request : {}", incrementRequest);
                RoutingHeaderProxyInterceptorTest.this.receivedRequest.set(incrementRequest);
                streamObserver.onNext(IncrementResponse.newBuilder().setHeader(ResponseHeader.newBuilder().setCode(StatusCode.SUCCESS).setRoutingHeader(incrementRequest.getHeader()).build()).build());
                streamObserver.onCompleted();
            }

            public void put(PutRequest putRequest, StreamObserver<PutResponse> streamObserver) {
                RoutingHeaderProxyInterceptorTest.log.info("Received put request : {}", putRequest);
                RoutingHeaderProxyInterceptorTest.this.receivedRequest.set(putRequest);
                streamObserver.onNext(PutResponse.newBuilder().setHeader(ResponseHeader.newBuilder().setCode(StatusCode.SUCCESS).setRoutingHeader(putRequest.getHeader()).build()).build());
                streamObserver.onCompleted();
            }
        }.bindService());
        this.channel = new StorageServerChannel(InProcessChannelBuilder.forName(this.serverName).directExecutor().build(), Optional.empty()).intercept(new ClientInterceptor[]{new RoutingHeaderProxyInterceptor(), new ClientInterceptor() { // from class: org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptorTest.2
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
                return new ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) { // from class: org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptorTest.2.1
                    protected void checkedStart(ClientCall.Listener<RespT> listener, Metadata metadata) {
                        RoutingHeaderProxyInterceptorTest.log.info("Intercept the request with routing information : sid = {}, rid = {}, rk = {}", new Object[]{1234L, 2345L, new String(RoutingHeaderProxyInterceptorTest.this.routingKey, StandardCharsets.UTF_8)});
                        metadata.put(ProtocolConstants.RID_METADATA_KEY, 2345L);
                        metadata.put(ProtocolConstants.SID_METADATA_KEY, 1234L);
                        metadata.put(ProtocolConstants.RK_METADATA_KEY, RoutingHeaderProxyInterceptorTest.this.routingKey);
                        delegate().start(listener, metadata);
                    }
                };
            }
        }});
    }

    protected void doTeardown() {
        this.channel.close();
    }

    @Test
    public void testPutRequest() throws Exception {
        PutRequest build = PutRequest.newBuilder().setKey(ByteString.copyFromUtf8("test-key")).build();
        PutRequest build2 = PutRequest.newBuilder(build).setHeader(RoutingHeader.newBuilder(build.getHeader()).setStreamId(1234L).setRangeId(2345L).setRKey(ByteString.copyFrom(this.routingKey)).build()).build();
        PutResponse putResponse = (PutResponse) this.channel.getTableService().put(build).get();
        Assert.assertEquals(build2, this.receivedRequest.get());
        Assert.assertEquals(build2.getHeader(), putResponse.getHeader().getRoutingHeader());
    }

    @Test
    public void testRangeRequest() throws Exception {
        RangeRequest build = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8("test-key")).build();
        RangeRequest build2 = RangeRequest.newBuilder(build).setHeader(RoutingHeader.newBuilder(build.getHeader()).setStreamId(1234L).setRangeId(2345L).setRKey(ByteString.copyFrom(this.routingKey)).build()).build();
        RangeResponse rangeResponse = (RangeResponse) this.channel.getTableService().range(build).get();
        Assert.assertEquals(build2, this.receivedRequest.get());
        Assert.assertEquals(build2.getHeader(), rangeResponse.getHeader().getRoutingHeader());
    }

    @Test
    public void testDeleteRangeRequest() throws Exception {
        DeleteRangeRequest build = DeleteRangeRequest.newBuilder().setKey(ByteString.copyFromUtf8("test-key")).build();
        DeleteRangeRequest build2 = DeleteRangeRequest.newBuilder(build).setHeader(RoutingHeader.newBuilder(build.getHeader()).setStreamId(1234L).setRangeId(2345L).setRKey(ByteString.copyFrom(this.routingKey)).build()).build();
        DeleteRangeResponse deleteRangeResponse = (DeleteRangeResponse) this.channel.getTableService().delete(build).get();
        Assert.assertEquals(build2, this.receivedRequest.get());
        Assert.assertEquals(build2.getHeader(), deleteRangeResponse.getHeader().getRoutingHeader());
    }

    @Test
    public void testIncrementRequest() throws Exception {
        IncrementRequest build = IncrementRequest.newBuilder().setKey(ByteString.copyFromUtf8("test-key")).build();
        IncrementRequest build2 = IncrementRequest.newBuilder(build).setHeader(RoutingHeader.newBuilder(build.getHeader()).setStreamId(1234L).setRangeId(2345L).setRKey(ByteString.copyFrom(this.routingKey)).build()).build();
        IncrementResponse incrementResponse = (IncrementResponse) this.channel.getTableService().increment(build).get();
        Assert.assertEquals(build2, this.receivedRequest.get());
        Assert.assertEquals(build2.getHeader(), incrementResponse.getHeader().getRoutingHeader());
    }

    @Test
    public void testTxnRequest() throws Exception {
        TxnRequest build = TxnRequest.newBuilder().build();
        TxnRequest build2 = TxnRequest.newBuilder(build).setHeader(RoutingHeader.newBuilder(build.getHeader()).setStreamId(1234L).setRangeId(2345L).setRKey(ByteString.copyFrom(this.routingKey)).build()).build();
        TxnResponse txnResponse = (TxnResponse) this.channel.getTableService().txn(build).get();
        Assert.assertEquals(build2, this.receivedRequest.get());
        Assert.assertEquals(build2.getHeader(), txnResponse.getHeader().getRoutingHeader());
    }
}
