package org.apache.arrow.flight;

import com.google.common.base.Preconditions;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ArrowBuf;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.auth.AuthConstants;
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.auth.ServerAuthWrapper;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/arrow/flight/FlightService.class */
public class FlightService extends FlightServiceGrpc.FlightServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) FlightService.class);
    private static final int PENDING_REQUESTS = 5;
    private final BufferAllocator allocator;
    private final FlightProducer producer;
    private final ServerAuthHandler authHandler;
    private final ExecutorService executors = Executors.newCachedThreadPool();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/arrow/flight/FlightService$CallContext.class */
    public static class CallContext implements FlightProducer.CallContext {
        private final String peerIdentity;
        private final BooleanSupplier isCancelled;

        CallContext(String str, BooleanSupplier booleanSupplier) {
            this.peerIdentity = str;
            this.isCancelled = booleanSupplier;
        }

        @Override // org.apache.arrow.flight.FlightProducer.CallContext
        public String peerIdentity() {
            return this.peerIdentity;
        }

        @Override // org.apache.arrow.flight.FlightProducer.CallContext
        public boolean isCancelled() {
            return this.isCancelled.getAsBoolean();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/arrow/flight/FlightService$GetListener.class */
    public static class GetListener implements FlightProducer.ServerStreamListener {
        private ServerCallStreamObserver<ArrowMessage> responseObserver;
        private volatile VectorUnloader unloader;

        public GetListener(StreamObserver<ArrowMessage> streamObserver) {
            this.responseObserver = (ServerCallStreamObserver) streamObserver;
            this.responseObserver.setOnCancelHandler(() -> {
                onCancel();
            });
            this.responseObserver.disableAutoInboundFlowControl();
        }

        private void onCancel() {
            FlightService.logger.debug("Stream cancelled by client.");
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public boolean isReady() {
            return this.responseObserver.isReady();
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public boolean isCancelled() {
            return this.responseObserver.isCancelled();
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public void start(VectorSchemaRoot vectorSchemaRoot) {
            start(vectorSchemaRoot, new DictionaryProvider.MapDictionaryProvider(new Dictionary[0]));
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public void start(VectorSchemaRoot vectorSchemaRoot, DictionaryProvider dictionaryProvider) {
            this.unloader = new VectorUnloader(vectorSchemaRoot, true, true);
            Schema schema = vectorSchemaRoot.getSchema();
            ServerCallStreamObserver<ArrowMessage> serverCallStreamObserver = this.responseObserver;
            serverCallStreamObserver.getClass();
            DictionaryUtils.generateSchemaMessages(schema, null, dictionaryProvider, (v1) -> {
                r3.onNext(v1);
            });
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public void putNext() {
            putNext(null);
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public void putNext(ArrowBuf arrowBuf) {
            Preconditions.checkNotNull(this.unloader);
            this.responseObserver.onNext(new ArrowMessage(this.unloader.getRecordBatch(), arrowBuf));
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public void error(Throwable th) {
            this.responseObserver.onError(th);
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public void completed() {
            this.responseObserver.onCompleted();
        }
    }

    public FlightService(BufferAllocator bufferAllocator, FlightProducer flightProducer, ServerAuthHandler serverAuthHandler) {
        this.allocator = bufferAllocator;
        this.producer = flightProducer;
        this.authHandler = serverAuthHandler;
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase
    public StreamObserver<Flight.HandshakeRequest> handshake(StreamObserver<Flight.HandshakeResponse> streamObserver) {
        return ServerAuthWrapper.wrapHandshake(this.authHandler, streamObserver, this.executors);
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase
    public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightInfo> streamObserver) {
        try {
            this.producer.listFlights(makeContext((ServerCallStreamObserver) streamObserver), new Criteria(criteria), StreamPipe.wrap(streamObserver, (v0) -> {
                return v0.toProtocol();
            }));
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    private CallContext makeContext(ServerCallStreamObserver<?> serverCallStreamObserver) {
        String str = AuthConstants.PEER_IDENTITY_KEY.get();
        serverCallStreamObserver.getClass();
        return new CallContext(str, serverCallStreamObserver::isCancelled);
    }

    public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> streamObserver) {
        try {
            this.producer.getStream(makeContext((ServerCallStreamObserver) streamObserver), new Ticket(ticket), new GetListener(streamObserver));
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase
    public void doAction(Flight.Action action, StreamObserver<Flight.Result> streamObserver) {
        try {
            this.producer.doAction(makeContext((ServerCallStreamObserver) streamObserver), new Action(action), StreamPipe.wrap(streamObserver, (v0) -> {
                return v0.toProtocol();
            }));
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase
    public void listActions(Flight.Empty empty, StreamObserver<Flight.ActionType> streamObserver) {
        try {
            this.producer.listActions(makeContext((ServerCallStreamObserver) streamObserver), StreamPipe.wrap(streamObserver, actionType -> {
                return actionType.toProtocol();
            }));
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    public StreamObserver<ArrowMessage> doPutCustom(StreamObserver<Flight.PutResult> streamObserver) {
        ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver) streamObserver;
        serverCallStreamObserver.disableAutoInboundFlowControl();
        serverCallStreamObserver.request(1);
        BufferAllocator bufferAllocator = this.allocator;
        FlightStream.Cancellable cancellable = (str, th) -> {
            serverCallStreamObserver.onError(Status.CANCELLED.withCause(th).withDescription(str).asException());
        };
        serverCallStreamObserver.getClass();
        FlightStream flightStream = new FlightStream(bufferAllocator, 5, cancellable, serverCallStreamObserver::request);
        this.executors.submit(() -> {
            try {
                this.producer.acceptPut(makeContext(serverCallStreamObserver), flightStream, StreamPipe.wrap(serverCallStreamObserver, (v0) -> {
                    return v0.toProtocol();
                })).run();
                serverCallStreamObserver.onCompleted();
            } catch (Exception e) {
                serverCallStreamObserver.onError(e);
                logger.error("Exception handling DoPut", (Throwable) e);
            }
            try {
                flightStream.close();
            } catch (Exception e2) {
                logger.error("Exception closing Flight stream", (Throwable) e2);
                throw new RuntimeException(e2);
            }
        });
        return flightStream.asObserver();
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase
    public void getFlightInfo(Flight.FlightDescriptor flightDescriptor, StreamObserver<Flight.FlightInfo> streamObserver) {
        try {
            streamObserver.onNext(this.producer.getFlightInfo(makeContext((ServerCallStreamObserver) streamObserver), new FlightDescriptor(flightDescriptor)).toProtocol());
            streamObserver.onCompleted();
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }
}
