package org.apache.druid.client;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URL;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.QueryResource;
import org.apache.druid.utils.CloseableUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/client/DirectDruidClient.class */
public class DirectDruidClient<T> implements QueryRunner<T> {
    /**
     * Query-context key that {@link #run} reads with {@code context.getLong(...)}. The value is
     * compared against {@link System#currentTimeMillis()} to decide whether the query has timed out.
     */
    public static final String QUERY_FAIL_TIME = "queryFailTime";
    private static final Logger log = new Logger(DirectDruidClient.class);
    /**
     * Value -1. {@link #run} passes this same value to {@code ResponseContext.addRemainingResponse}
     * when the initial response from the queried server arrives.
     */
    private static final int VAL_TO_REDUCE_REMAINING_RESPONSES = -1;
    // Source of the per-query tool chest (result types, metrics, object-mapper decoration).
    private final QueryToolChestWarehouse warehouse;
    // Receives the in-flight request future of every query issued by run().
    private final QueryWatcher queryWatcher;
    // Serializes outgoing queries and deserializes the response-context header.
    private final ObjectMapper objectMapper;
    private final HttpClient httpClient;
    // scheme and host are concatenated as scheme + "://" + host + "/druid/v2/" to build the query URL.
    private final String scheme;
    private final String host;
    // Sink for the per-request QueryMetrics emitted by the response handler.
    private final ServiceEmitter emitter;
    // True when objectMapper's factory is a SmileFactory; selects the request Content-Type header.
    private final boolean isSmile;
    // Incremented after a request is issued in run(); decremented when its future succeeds or fails.
    private final AtomicInteger openConnections = new AtomicInteger();
    // Runs the DELETE requests and follow-up status checks issued by cancelQuery().
    // NOTE(review): presumably a single-threaded scheduler, going by the factory name - confirm.
    private final ScheduledExecutorService queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor");

    /**
     * Strips the bookkeeping entries {@code QUERY_TOTAL_BYTES_GATHERED} and
     * {@code REMAINING_RESPONSES_FROM_QUERY_SERVERS} from the supplied response context.
     *
     * @param responseContext context to remove the two keys from; it is modified in place
     */
    public static void removeMagicResponseContextFields(ResponseContext responseContext) {
        // Same two removals, in the same order, expressed as a single pass over the keys.
        java.util.Arrays.asList(
                ResponseContext.Keys.QUERY_TOTAL_BYTES_GATHERED,
                ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS
        ).forEach(responseContext::remove);
    }

    /**
     * Builds a fresh response context for a query.
     *
     * @return a new, empty {@link ConcurrentResponseContext} on which {@code initialize()} has been called
     */
    public static ConcurrentResponseContext makeResponseContextForQuery() {
        final ConcurrentResponseContext responseContext = ConcurrentResponseContext.createEmpty();
        responseContext.initialize();
        return responseContext;
    }

    /**
     * Creates a client that sends queries to a single server over HTTP.
     *
     * @param queryToolChestWarehouse source of the tool chest for each query
     * @param queryWatcher            registry that is handed the future of every issued request
     * @param objectMapper            mapper used for request/response serialization; if its factory is a
     *                                {@link SmileFactory}, requests are sent with the Smile content type
     * @param httpClient              client used to issue query and cancellation requests
     * @param str                     URL scheme placed before {@code "://"} in the query URL
     * @param str2                    host placed after {@code "://"} in the query URL
     * @param serviceEmitter          emitter that receives per-request metrics
     */
    public DirectDruidClient(QueryToolChestWarehouse queryToolChestWarehouse, QueryWatcher queryWatcher, ObjectMapper objectMapper, HttpClient httpClient, String str, String str2, ServiceEmitter serviceEmitter) {
        // Target server.
        this.scheme = str;
        this.host = str2;

        // Collaborators.
        this.httpClient = httpClient;
        this.warehouse = queryToolChestWarehouse;
        this.queryWatcher = queryWatcher;
        this.emitter = serviceEmitter;

        // Wire format is derived from the mapper's factory.
        this.objectMapper = objectMapper;
        this.isSmile = objectMapper.getFactory() instanceof SmileFactory;
    }

    /**
     * Reports how many requests issued by {@link #run} have not yet completed.
     *
     * @return the current value of the open-connection counter
     */
    public int getNumOpenConnections() {
        return openConnections.intValue();
    }

    /**
     * Sends the query to {@code scheme://host/druid/v2/} over HTTP and returns its results as a lazy sequence.
     *
     * <p>Response chunks are pushed onto an in-memory queue by the HTTP response handler and pulled off it by the
     * returned sequence's iterator. The handler enforces three limits taken from the query context:
     * <ul>
     *   <li>the absolute deadline stored under {@link #QUERY_FAIL_TIME} (milliseconds, compared with
     *       {@link System#currentTimeMillis()});</li>
     *   <li>the max scatter-gather byte count, accumulated in the response context's shared byte counter;</li>
     *   <li>the max queued byte count; when it is greater than zero the handler asks the HTTP client to stop
     *       reading while the queue is at or above the limit and resumes it as the queue drains.</li>
     * </ul>
     * If the request future is cancelled, a cancellation request is sent to the server.
     *
     * @param queryPlus       wrapper holding the query to execute
     * @param responseContext context that receives the server's response-context header and the byte counter
     * @return a sequence over the deserialized results; unless the query is by-segment, each row is additionally
     *         passed through the tool chest's deserializing pre-compute manipulator
     * @throws NullPointerException  if the query context has no {@link #QUERY_FAIL_TIME} entry
     * @throws QueryTimeoutException if the deadline has already passed before the request is sent
     * @throws RuntimeException      wrapping any {@link IOException} raised while building the request
     */
    @Override
    public Sequence<T> run(QueryPlus<T> queryPlus, final ResponseContext responseContext) {
        final Query<T> query = queryPlus.getQuery();
        final QueryToolChest<T, Query<T>> toolChest = this.warehouse.getToolChest(query);
        final boolean isBySegment = query.context().isBySegment();
        final JavaType queryResultType = isBySegment ? toolChest.getBySegmentResultType() : toolChest.getBaseResultType();
        final String url = this.scheme + "://" + this.host + "/druid/v2/";
        final String cancelUrl = url + query.getId();
        try {
            log.debug("Querying queryId [%s] url [%s]", query.getId(), url);
            final long requestStartTimeNs = System.nanoTime();
            final QueryContext queryContext = query.context();
            // Fail with a descriptive NPE instead of an anonymous unboxing NPE when the deadline is absent.
            final long timeoutAt = Preconditions.checkNotNull(
                    queryContext.getLong(QUERY_FAIL_TIME),
                    "Query context is missing required key [%s]",
                    QUERY_FAIL_TIME
            );
            final long maxScatterGatherBytes = queryContext.getMaxScatterGatherBytes();
            final AtomicLong totalBytesGathered = responseContext.getTotalBytes();
            final long maxQueuedBytes = queryContext.getMaxQueuedBytes(0L);
            final boolean usingBackpressure = maxQueuedBytes > 0;

            final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>() {
                private final AtomicLong totalByteCount = new AtomicLong(0);
                private final AtomicLong queuedByteCount = new AtomicLong(0);
                private final AtomicLong channelSuspendedTime = new AtomicLong(0);
                private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>();
                private final AtomicBoolean done = new AtomicBoolean(false);
                private final AtomicReference<String> fail = new AtomicReference<>();
                private final AtomicReference<HttpResponseHandler.TrafficCop> trafficCopRef = new AtomicReference<>();
                private QueryMetrics<? super Query<T>> queryMetrics;
                private long responseStartTimeNs;

                /** Lazily creates the metrics object for this request and tags it with the target host. */
                private QueryMetrics<? super Query<T>> acquireResponseMetrics() {
                    if (queryMetrics == null) {
                        queryMetrics = toolChest.makeMetrics(query);
                        queryMetrics.server(host);
                    }
                    return queryMetrics;
                }

                /**
                 * Queues a chunk for the reader.
                 *
                 * @return true if the HTTP client should keep reading, false if it should pause (back-pressure)
                 */
                private boolean enqueue(ChannelBuffer buffer, long chunkNum) throws InterruptedException {
                    // The counter is bumped before the put, so it never under-reports what is queued.
                    final InputStreamHolder holder = InputStreamHolder.fromChannelBuffer(buffer, chunkNum);
                    final long currentQueuedByteCount = queuedByteCount.addAndGet(holder.getLength());
                    queue.put(holder);
                    return !usingBackpressure || currentQueuedByteCount < maxQueuedBytes;
                }

                /** Takes the next chunk off the queue, waiting at most until the query deadline. */
                private InputStream dequeue() throws InterruptedException {
                    final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
                    if (holder == null) {
                        throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
                    }
                    final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength());
                    if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
                        // Queue dropped below the limit: let the client read again and record how long it was paused.
                        final long backPressureTime = Preconditions
                                .checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?")
                                .resume(holder.getChunkNum());
                        channelSuspendedTime.addAndGet(backPressureTime);
                    }
                    return holder.getStream();
                }

                @Override
                public ClientResponse<InputStream> handleResponse(HttpResponse httpResponse, HttpResponseHandler.TrafficCop trafficCop) {
                    trafficCopRef.set(trafficCop);
                    checkQueryTimeout();
                    checkTotalBytesLimit(httpResponse.getContent().readableBytes());
                    log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
                    responseStartTimeNs = System.nanoTime();
                    acquireResponseMetrics().reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);

                    final boolean continueReading;
                    try {
                        log.trace("Got a response from [%s] for query ID[%s], subquery ID[%s]", url, query.getId(), query.getSubQueryId());
                        final String responseContextHeader = httpResponse.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
                        responseContext.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES);
                        // The header is absent when the server sent no response context.
                        if (responseContextHeader != null) {
                            responseContext.merge(ResponseContext.deserialize(responseContextHeader, objectMapper));
                        }
                        continueReading = enqueue(httpResponse.getContent(), 0L);
                    } catch (final IOException e) {
                        log.error(e, "Error parsing response context from url [%s]", url);
                        // Surface the parse failure to whoever reads the response stream.
                        return ClientResponse.finished(new InputStream() {
                            @Override
                            public int read() throws IOException {
                                throw e;
                            }
                        });
                    } catch (InterruptedException e) {
                        log.error(e, "Queue appending interrupted");
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }

                    totalByteCount.addAndGet(httpResponse.getContent().readableBytes());
                    return ClientResponse.finished(new SequenceInputStream(new Enumeration<InputStream>() {
                        @Override
                        public boolean hasMoreElements() {
                            if (fail.get() != null) {
                                throw new RE(fail.get());
                            }
                            checkQueryTimeout();
                            // More input is expected until done() has run and the queue has been drained.
                            synchronized (done) {
                                return !done.get() || !queue.isEmpty();
                            }
                        }

                        @Override
                        public InputStream nextElement() {
                            if (fail.get() != null) {
                                throw new RE(fail.get());
                            }
                            try {
                                return dequeue();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        }
                    }), continueReading);
                }

                @Override
                public ClientResponse<InputStream> handleChunk(ClientResponse<InputStream> clientResponse, HttpChunk httpChunk, long chunkNum) {
                    checkQueryTimeout();
                    final ChannelBuffer channelBuffer = httpChunk.getContent();
                    final int bytes = channelBuffer.readableBytes();
                    checkTotalBytesLimit(bytes);
                    boolean continueReading = true;
                    if (bytes > 0) {
                        try {
                            continueReading = enqueue(channelBuffer, chunkNum);
                            totalByteCount.addAndGet(bytes);
                        } catch (InterruptedException e) {
                            log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    }
                    return ClientResponse.finished(clientResponse.getObj(), continueReading);
                }

                @Override
                public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse) {
                    final long nodeTimeNs = System.nanoTime() - requestStartTimeNs;
                    final long nodeTimeMs = TimeUnit.NANOSECONDS.toMillis(nodeTimeNs);
                    log.debug(
                            "Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
                            query.getId(),
                            url,
                            totalByteCount.get(),
                            nodeTimeMs,
                            totalByteCount.get() / (0.001d * nodeTimeMs)
                    );
                    final QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
                    responseMetrics.reportNodeTime(nodeTimeNs);
                    responseMetrics.reportNodeBytes(totalByteCount.get());
                    if (usingBackpressure) {
                        responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
                    }
                    responseMetrics.emit(emitter);
                    synchronized (done) {
                        try {
                            // A trailing empty stream gives a reader blocked in dequeue() something to consume.
                            queue.put(InputStreamHolder.fromChannelBuffer(ChannelBuffers.EMPTY_BUFFER, Long.MAX_VALUE));
                        } catch (InterruptedException e) {
                            log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        } finally {
                            // Mark completion even if the put failed, so hasMoreElements() can terminate.
                            done.set(true);
                        }
                    }
                    return ClientResponse.finished(clientResponse.getObj());
                }

                @Override
                public void exceptionCaught(ClientResponse<InputStream> clientResponse, Throwable th) {
                    final String msg = StringUtils.format(
                            "Query[%s] url[%s] failed with exception msg [%s]",
                            query.getId(),
                            url,
                            th.getMessage()
                    );
                    setupResponseReadFailure(msg, th);
                }

                /**
                 * Records the failure message, discards queued data, and queues a stream that throws on read
                 * so that a blocked reader wakes up and sees the error.
                 */
                private void setupResponseReadFailure(final String msg, final Throwable th) {
                    fail.set(msg);
                    queue.clear();
                    queue.offer(InputStreamHolder.fromStream(new InputStream() {
                        @Override
                        public int read() throws IOException {
                            if (th != null) {
                                throw new IOException(msg, th);
                            }
                            throw new IOException(msg);
                        }
                    }, -1L, 0L));
                }

                /**
                 * @return milliseconds left until the deadline (always positive)
                 * @throws QueryTimeoutException if the deadline has been reached
                 */
                private long checkQueryTimeout() {
                    final long timeLeft = timeoutAt - System.currentTimeMillis();
                    if (timeLeft > 0) {
                        return timeLeft;
                    }
                    final String msg = StringUtils.format("Query[%s] url[%s] timed out.", query.getId(), url);
                    setupResponseReadFailure(msg, null);
                    throw new QueryTimeoutException(msg);
                }

                /** Adds {@code bytes} to the shared counter and fails the query if the configured limit is exceeded. */
                private void checkTotalBytesLimit(long bytes) {
                    // Long.MAX_VALUE means "no limit"; in that case the shared counter is not touched at all.
                    if (maxScatterGatherBytes >= Long.MAX_VALUE || totalBytesGathered.addAndGet(bytes) <= maxScatterGatherBytes) {
                        return;
                    }
                    final String msg = StringUtils.format("Query[%s] url[%s] max scatter-gather bytes limit reached.", query.getId(), url);
                    setupResponseReadFailure(msg, null);
                    throw new ResourceLimitExceededException(msg);
                }
            };

            final long timeLeft = timeoutAt - System.currentTimeMillis();
            if (timeLeft <= 0) {
                throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
            }

            // The remaining time is forwarded both as the query's own timeout and as the HTTP read timeout.
            final ListenableFuture<InputStream> future = this.httpClient.go(
                    new Request(HttpMethod.POST, new URL(url))
                            .setContent(this.objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft)))
                            .setHeader("Content-Type", this.isSmile ? "application/x-jackson-smile" : "application/json"),
                    responseHandler,
                    Duration.millis(timeLeft)
            );
            this.queryWatcher.registerQueryFuture(query, future);
            this.openConnections.getAndIncrement();
            Futures.addCallback(future, new FutureCallback<InputStream>() {
                @Override
                public void onSuccess(InputStream inputStream) {
                    openConnections.getAndDecrement();
                }

                @Override
                public void onFailure(Throwable th) {
                    openConnections.getAndDecrement();
                    // Only a local cancellation needs to be propagated to the remote server.
                    if (future.isCancelled()) {
                        cancelQuery(query, cancelUrl);
                    }
                }
            }, Execs.directExecutor());

            Sequence<T> results = new BaseSequence<>(new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>() {
                @Override
                public JsonParserIterator<T> make() {
                    return new JsonParserIterator<>(
                            queryResultType,
                            future,
                            url,
                            query,
                            host,
                            toolChest.decorateObjectMapper(objectMapper, query)
                    );
                }

                @Override
                public void cleanup(JsonParserIterator<T> jsonParserIterator) {
                    CloseableUtils.closeAndWrapExceptions(jsonParserIterator);
                }
            });
            // By-segment results are returned as parsed; all others get the deserializing metric manipulation.
            if (!isBySegment) {
                results = Sequences.map(results, toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing()));
            }
            return results;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asynchronously asks the remote server to cancel a query.
     *
     * <p>A task on the cancellation executor sends an HTTP DELETE to {@code str} with a one-second timeout and
     * schedules a follow-up task five seconds later. The follow-up logs an error if the request has not
     * completed, if it failed, or if the server answered with a status code of 500 or above. Failures are
     * logged only; nothing is thrown to the caller.
     *
     * @param query the query to cancel; it is serialized as the request body
     * @param str   the cancellation URL
     */
    public void cancelQuery(Query<T> query, String str) {
        this.queryCancellationExecutor.submit(() -> {
            try {
                final ListenableFuture<StatusResponseHolder> responseFuture = this.httpClient.go(
                        new Request(HttpMethod.DELETE, new URL(str))
                                .setContent(this.objectMapper.writeValueAsBytes(query))
                                .setHeader("Content-Type", this.isSmile ? "application/x-jackson-smile" : "application/json"),
                        StatusResponseHandler.getInstance(),
                        Duration.standardSeconds(1L)
                );
                this.queryCancellationExecutor.schedule(() -> {
                    try {
                        if (!responseFuture.isDone()) {
                            log.error("Error cancelling query[%s]", query);
                        }
                        final StatusResponseHolder response = responseFuture.get();
                        if (response.getStatus().getCode() >= 500) {
                            log.error(
                                    "Error cancelling query[%s]: queriable node returned status[%d] [%s].",
                                    query,
                                    response.getStatus().getCode(),
                                    response.getStatus().getReasonPhrase()
                            );
                        }
                    } catch (InterruptedException interrupted) {
                        // Restore the flag so the executor thread can observe the interruption.
                        Thread.currentThread().interrupt();
                        log.error(interrupted, "Error cancelling query[%s]", query);
                    } catch (ExecutionException failed) {
                        log.error(failed, "Error cancelling query[%s]", query);
                    }
                }, 5L, TimeUnit.SECONDS);
            } catch (IOException ioe) {
                log.error(ioe, "Error cancelling query[%s]", query);
            }
        });
    }

    /**
     * Describes this client by its target host and wire format.
     *
     * @return a string of the form {@code DirectDruidClient{host='<host>', isSmile=<boolean>}}
     */
    @Override
    public String toString() {
        return new StringBuilder("DirectDruidClient{host='")
                .append(this.host)
                .append("', isSmile=")
                .append(this.isSmile)
                .append('}')
                .toString();
    }
}
