package org.apache.kylin.storage.stream.rpc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.DataFormatException;
import org.apache.commons.codec.binary.Base64;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.filter.StringCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilterSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.shaded.com.google.common.base.Stopwatch;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.core.model.DataRequest;
import org.apache.kylin.stream.core.model.DataResponse;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.query.ResponseResultSchema;
import org.apache.kylin.stream.core.query.StreamingTupleConverter;
import org.apache.kylin.stream.core.query.StreamingTupleIterator;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.RecordsSerializer;
import org.apache.kylin.stream.core.util.RestService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.SystemPropertyUtils;

/* loaded from: input_file:org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.class */
public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
    /** Logger shared by this client and its nested iterator class. */
    public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class);

    /**
     * Quarantine window in milliseconds (two minutes): a receiver whose last
     * recorded failure is no older than this is passed over when choosing
     * the node that should serve a query.
     */
    public static final long WAIT_DURATION = 2 * 60 * 1000L;

    /**
     * Pool that runs one fetch task per replica set.
     * NOTE(review): per the ThreadPoolExecutor documentation an unbounded work
     * queue means no more than the core 20 threads are ever created, so the
     * maximum of 100 and the keep-alive value appear to have no effect.
     */
    private static ExecutorService executorService = new ThreadPoolExecutor(20, 100, 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("stream-rpc-pool-t"));

    /** HTTP helper used to post data requests; created in the constructor. */
    private RestService restService;

    /** Receivers that recently threw an IOException, mapped to the failure timestamp (epoch millis). */
    private Map<Node, Long> failedReceivers = Maps.newConcurrentMap();

    /** Source of the cube-to-replica-set assignments. */
    private AssignmentsCache assignmentsCache = AssignmentsCache.getInstance();

    /* loaded from: input_file:org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient$QueuedStreamingTupleIterator.class */
    /**
     * Tuple iterator that concatenates result blocks handed over through a
     * blocking queue.
     *
     * <p>Producers call {@link #addBlock(Iterator)} once per block, or
     * {@link #setEndpointException(Exception)} when they fail. The consumer
     * drains the blocks in arrival order via {@link #hasNext()} and
     * {@link #next()}. Waiting for a block is bounded by an absolute deadline
     * fixed at construction time.
     *
     * <p>Only {@code endpointException} and the queue are safe to touch from
     * several threads; the remaining state is plain fields, so the iterator
     * is presumably meant to be consumed by a single thread.
     */
    public static class QueuedStreamingTupleIterator implements ITupleIterator {
        /** Hand-off queue between the producers and the consuming thread. */
        private BlockingQueue<Iterator<ITuple>> queue;
        /** Number of blocks that must be consumed before the iterator is exhausted. */
        private int totalBlockNum;
        /** Absolute deadline (epoch millis) after which waiting for a block fails. */
        private long timeoutTS;
        /** Failure reported by a producer; volatile because it is written by worker threads. */
        private volatile Exception endpointException;
        /** Block currently being iterated; starts out empty. */
        private Iterator<ITuple> currentBlock = Collections.emptyIterator();
        /** Number of blocks already taken from the queue. */
        private int numConsumeBlocks = 0;

        /**
         * @param i  number of blocks to expect; also used as the queue capacity,
         *           so it must be greater than zero
         *           ({@link LinkedBlockingQueue} rejects other values)
         * @param i2 maximum time in milliseconds, counted from now, to wait for blocks
         */
        public QueuedStreamingTupleIterator(int i, int i2) {
            this.queue = new LinkedBlockingQueue<>(i);
            this.totalBlockNum = i;
            this.timeoutTS = System.currentTimeMillis() + i2;
        }

        /**
         * Hands one result block to the consumer, blocking while the queue is full.
         *
         * @param it the block to enqueue
         * @throws RuntimeException if the calling thread is interrupted while waiting
         */
        public void addBlock(Iterator<ITuple> it) {
            try {
                this.queue.put(it);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                HttpStreamDataSearchClient.logger.error("interrupted", (Throwable) e);
                throw new RuntimeException("interrupted", e);
            }
        }

        /**
         * Records a producer failure; the next {@link #hasNext()} that needs a
         * new block rethrows it.
         */
        public void setEndpointException(Exception exc) {
            this.endpointException = exc;
        }

        private boolean hasEndpointFail() {
            return this.endpointException != null;
        }

        /** No-op. */
        @Override // org.apache.kylin.metadata.tuple.ITupleIterator
        public void close() {
        }

        /**
         * Reports whether another tuple is available, waiting for further
         * blocks when the current one is exhausted.
         *
         * @return {@code true} if {@link #next()} can be called, {@code false}
         *         once all expected blocks have been consumed
         * @throws RuntimeException "endpoint fail" (with the producer's exception
         *         as cause) when a producer reported a failure, "timeout when
         *         call stream rpc" when the deadline passes without a block,
         *         or "interrupted" when the waiting thread is interrupted
         */
        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.currentBlock.hasNext()) {
                return true;
            }
            while (this.numConsumeBlocks < this.totalBlockNum) {
                if (hasEndpointFail()) {
                    throw new RuntimeException("endpoint fail", this.endpointException);
                }
                Iterator<ITuple> block = pollNextBlock();
                if (block == null) {
                    // The wait also ends when a producer fails; report that
                    // failure with its cause instead of a misleading timeout.
                    if (hasEndpointFail()) {
                        throw new RuntimeException("endpoint fail", this.endpointException);
                    }
                    throw new RuntimeException("timeout when call stream rpc");
                }
                // currentBlock is only replaced by a real block, so it is never null.
                this.currentBlock = block;
                this.numConsumeBlocks++;
                if (block.hasNext()) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Waits in one-second slices for the next block until one arrives, a
         * producer reports a failure, or the deadline passes.
         *
         * @return the next block, or {@code null} if the wait ended without one
         */
        private Iterator<ITuple> pollNextBlock() {
            try {
                Iterator<ITuple> block = null;
                while (block == null && !hasEndpointFail() && this.timeoutTS > System.currentTimeMillis()) {
                    block = this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                }
                return block;
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                HttpStreamDataSearchClient.logger.error("interrupted", (Throwable) e);
                throw new RuntimeException("interrupted", e);
            }
        }

        /**
         * Returns the next tuple of the current block. Callers are expected to
         * call {@link #hasNext()} first; this method does not advance to the
         * next block on its own.
         */
        @Override // java.util.Iterator
        public ITuple next() {
            return this.currentBlock.next();
        }

        /** Removal is not supported. */
        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    public HttpStreamDataSearchClient() {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        this.restService = new RestService(instanceFromEnv.getStreamingRPCHttpConnTimeout(), instanceFromEnv.getStreamingRPCHttpReadTimeout());
    }

    /**
     * Fans a query out to every replica set assigned to the cube and returns
     * an iterator over the results as they arrive.
     *
     * <p>One task per replica set is submitted to the shared pool. Each task
     * either adds its result block to the returned iterator or records the
     * exception it hit. The iterator waits at most twice the cube's streaming
     * RPC read timeout.
     *
     * @param j            minimum segment time placed into the request
     * @param cubeInstance cube being queried
     * @param tupleInfo    tuple layout used when converting records
     * @param tupleFilter  filter serialized into the request
     * @param set          dimension columns
     * @param set2         group-by columns
     * @param set3         metrics
     * @param i            storage push-down limit
     * @param z            whether storage-side aggregation is allowed
     * @return iterator that yields the tuples of all replica sets
     */
    @Override // org.apache.kylin.storage.stream.rpc.IStreamDataSearchClient
    public ITupleIterator search(long j, final CubeInstance cubeInstance, final TupleInfo tupleInfo, TupleFilter tupleFilter, Set<TblColRef> set, Set<TblColRef> set2, Set<FunctionDesc> set3, int i, boolean z) {
        List<ReplicaSet> targets = this.assignmentsCache.getReplicaSetsByCube(cubeInstance.getName());
        int expectedBlocks = targets.size();
        int waitMillis = cubeInstance.getConfig().getStreamingRPCHttpReadTimeout() * 2;
        final QueuedStreamingTupleIterator merged = new QueuedStreamingTupleIterator(expectedBlocks, waitMillis);

        QueryContext queryContext = QueryContextFacade.current();
        ResponseResultSchema schema = new ResponseResultSchema(cubeInstance.getDescriptor(), set, set3);
        final StreamingTupleConverter converter = new StreamingTupleConverter(schema, tupleInfo);
        final RecordsSerializer serializer = new RecordsSerializer(schema);
        final DataRequest request = createDataRequest(queryContext.getQueryId(), cubeInstance.getName(), j, tupleInfo, tupleFilter, set, set2, set3, i, z);

        logger.info("Query-{}:send request to stream receivers", queryContext.getQueryId());
        for (final ReplicaSet target : targets) {
            Runnable fetchTask = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Iterator<ITuple> block = HttpStreamDataSearchClient.this.search(request, cubeInstance, converter, serializer, target, tupleInfo);
                        merged.addBlock(block);
                    } catch (Exception e) {
                        // Surface the failure to the consumer instead of letting it wait for the timeout.
                        merged.setEndpointException(e);
                    }
                }
            };
            executorService.submit(fetchTask);
        }
        return merged;
    }

    /**
     * Queries one replica set, falling back to its other nodes on I/O failure.
     *
     * <p>The preferred node is tried first. If it throws an
     * {@link IOException}, it is recorded as failed and every other node of
     * the replica set is tried in list order until one answers. Exceptions
     * other than {@link IOException} are not retried.
     *
     * @param dataRequest             request to send
     * @param cubeInstance            cube being queried
     * @param streamingTupleConverter converter applied to the returned records
     * @param recordsSerializer       serializer used to decode the response
     * @param replicaSet              replica set whose nodes may serve the query
     * @param tupleInfo               tuple layout used when converting records
     * @return iterator over the tuples returned by the first node that answered
     * @throws IOException the last I/O failure when every node failed
     * @throws Exception   any other failure raised while querying a node
     */
    public Iterator<ITuple> search(DataRequest dataRequest, CubeInstance cubeInstance, StreamingTupleConverter streamingTupleConverter, RecordsSerializer recordsSerializer, ReplicaSet replicaSet, TupleInfo tupleInfo) throws Exception {
        // Typed list: element access must yield Node, which a raw list cannot provide.
        List<Node> candidates = Lists.newArrayList(replicaSet.getNodes());
        Node preferred = findBestReceiverServeQuery(candidates, cubeInstance.getName());
        try {
            return doSearch(dataRequest, cubeInstance, streamingTupleConverter, recordsSerializer, preferred, tupleInfo);
        } catch (IOException e) {
            IOException lastFailure = e;
            this.failedReceivers.put(preferred, Long.valueOf(System.currentTimeMillis()));
            logger.error("exception throws for receiver:{} retry another receiver", preferred);
            for (Node candidate : candidates) {
                if (candidate.equals(preferred)) {
                    continue;
                }
                try {
                    return doSearch(dataRequest, cubeInstance, streamingTupleConverter, recordsSerializer, candidate, tupleInfo);
                } catch (IOException retryFailure) {
                    lastFailure = retryFailure;
                    this.failedReceivers.put(candidate, Long.valueOf(System.currentTimeMillis()));
                    logger.error("exception throws for receiver:{} retry another receiver", candidate);
                }
            }
            // Every node failed: report the most recent I/O failure.
            throw lastFailure;
        }
    }

    /**
     * Chooses the receiver that should serve a query for the given cube.
     *
     * <p>The cube name's hash selects a preferred node, so the same cube maps
     * to the same node while the list is unchanged. Starting from that node,
     * the first one without a failure recorded in the last
     * {@link #WAIT_DURATION} milliseconds is returned. If every node failed
     * recently, the node following the preferred one is returned.
     *
     * @param list candidate nodes; must not be empty
     * @param str  cube name used to derive the preferred node
     * @return the node to query first
     * @throws IllegalStateException if {@code list} is empty
     */
    private Node findBestReceiverServeQuery(List<Node> list, String str) {
        int size = list.size();
        if (size == 0) {
            throw new IllegalStateException("no receiver available to serve query for cube:" + str);
        }
        // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still
        // negative and would produce a negative index. Other hashes map as before.
        int preferredIdx = Math.abs(str.hashCode() % size);
        long now = System.currentTimeMillis();
        for (int offset = 0; offset < size; offset++) {
            Node candidate = list.get((preferredIdx + offset) % size);
            Long failedAt = this.failedReceivers.get(candidate);
            boolean recentlyFailed = failedAt != null && now - failedAt.longValue() <= WAIT_DURATION;
            if (!recentlyFailed) {
                return candidate;
            }
        }
        // All receivers failed recently; fall back to the neighbour of the preferred node.
        return list.get((preferredIdx + 1) % size);
    }

    /**
     * Posts the data request to a single receiver node and decodes its answer.
     *
     * <p>Before sending, the request's deadline is set to now plus 1.5 times
     * the cube's streaming RPC read timeout. A successful HTTP round trip
     * clears any failure previously recorded for the node.
     *
     * @param dataRequest             request to send; its deadline is updated in place
     * @param cubeInstance            cube being queried, also the source of the timeouts
     * @param streamingTupleConverter converter applied to the returned records
     * @param recordsSerializer       serializer used to decode the response
     * @param node                    receiver to contact
     * @param tupleInfo               tuple layout used when converting records
     * @return iterator over the tuples contained in the response
     * @throws Exception any failure while sending, parsing or decoding; it is
     *                   logged here and rethrown unchanged
     */
    public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cubeInstance, StreamingTupleConverter streamingTupleConverter, RecordsSerializer recordsSerializer, Node node, TupleInfo tupleInfo) throws Exception {
        final String queryId = dataRequest.getQueryId();
        final String endpoint = new StringBuilder()
                .append(RestClient.SCHEME_HTTP)
                .append(node.getHost())
                .append(SystemPropertyUtils.VALUE_SEPARATOR)
                .append(node.getPort())
                .append("/kylin/api/data/query")
                .toString();
        try {
            final int connectTimeout = cubeInstance.getConfig().getStreamingRPCHttpConnTimeout();
            final int readTimeout = cubeInstance.getConfig().getStreamingRPCHttpReadTimeout();
            final int deadlineBudget = (int) (readTimeout * 1.5d);
            dataRequest.setDeadline(System.currentTimeMillis() + deadlineBudget);

            final String payload = JsonUtil.writeValueAsString(dataRequest);
            final Stopwatch timer = Stopwatch.createUnstarted();
            timer.start();
            final String rawResponse = this.restService.postRequest(endpoint, payload, connectTimeout, readTimeout);
            final long elapsedMillis = timer.elapsed(TimeUnit.MILLISECONDS);
            logger.info("query-{}: receive response from {} take time:{}", queryId, node, elapsedMillis);

            // The node answered, so it is no longer considered failed.
            this.failedReceivers.remove(node);

            final DataResponse response = (DataResponse) JsonUtil.readValue(rawResponse, DataResponse.class);
            logger.info("query-{}: receiver {} profile info:{}", queryId, node, response.getProfile());
            return deserializeResponse(streamingTupleConverter, recordsSerializer, cubeInstance.getName(), tupleInfo, response);
        } catch (Exception e) {
            logger.error("error when search data from receiver:" + endpoint, (Throwable) e);
            throw e;
        }
    }

    /**
     * Turns a receiver response into a tuple iterator: the response data is
     * Base64-decoded, deserialized into records and wrapped in a
     * {@link StreamingTupleIterator}.
     *
     * @param streamingTupleConverter converter handed to the resulting iterator
     * @param recordsSerializer       serializer that decodes the record bytes
     * @param str                     cube name; not used by this method
     * @param tupleInfo               tuple layout handed to the resulting iterator
     * @param dataResponse            response whose data payload is decoded
     * @return iterator over the tuples carried by the response
     * @throws IOException         declared for failures while deserializing
     * @throws DataFormatException declared for failures while deserializing
     */
    public Iterator<ITuple> deserializeResponse(StreamingTupleConverter streamingTupleConverter, RecordsSerializer recordsSerializer, String str, TupleInfo tupleInfo, DataResponse dataResponse) throws IOException, DataFormatException {
        final byte[] recordBytes = Base64.decodeBase64(dataResponse.getData());
        return new StreamingTupleIterator(recordsSerializer.deserialize(recordBytes), streamingTupleConverter, tupleInfo);
    }

    /**
     * Builds the request sent to the stream receivers.
     *
     * @param str         query id
     * @param str2        cube name
     * @param j           minimum segment time
     * @param tupleInfo   not used by this method
     * @param tupleFilter filter; serialized and Base64-encoded into the request
     * @param set         dimension columns, sent by canonical name
     * @param set2        group-by columns, sent by canonical name
     * @param set3        metrics, copied into a list
     * @param i           storage push-down limit
     * @param z           whether storage-side aggregation is allowed
     * @return the populated request, stamped with the current time as send time
     */
    private DataRequest createDataRequest(String str, String str2, long j, TupleInfo tupleInfo, TupleFilter tupleFilter, Set<TblColRef> set, Set<TblColRef> set2, Set<FunctionDesc> set3, int i, boolean z) {
        final DataRequest request = new DataRequest();
        request.setCubeName(str2);
        request.setQueryId(str);
        request.setMinSegmentTime(j);

        final byte[] filterBytes = TupleFilterSerializer.serialize(tupleFilter, StringCodeSystem.INSTANCE);
        request.setTupleFilter(Base64.encodeBase64String(filterBytes));

        request.setStoragePushDownLimit(i);
        request.setAllowStorageAggregation(z);
        request.setRequestSendTime(System.currentTimeMillis());
        // Profiling and storage behaviour come from the per-query backdoor toggles.
        request.setEnableDetailProfile(BackdoorToggles.isStreamingProfileEnable());
        request.setStorageBehavior(BackdoorToggles.getCoprocessorBehavior());

        final HashSet<String> dimensionNames = Sets.newHashSet();
        for (TblColRef dimension : set) {
            dimensionNames.add(dimension.getCanonicalName());
        }
        request.setDimensions(dimensionNames);

        final HashSet<String> groupNames = Sets.newHashSet();
        for (TblColRef group : set2) {
            groupNames.add(group.getCanonicalName());
        }
        request.setGroups(groupNames);

        request.setMetrics(Lists.newArrayList(set3));
        return request;
    }
}
