package org.apache.kylin.stream.core.query;

import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.I0Itec.zkclient.ZkServer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.stream.core.storage.Record;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/core/query/MultiThreadsResultCollector.class */
/**
 * A {@link ResultCollector} that reads every registered search result on a shared worker pool and
 * exposes the merged records through a single consumer-side {@link Iterator}.
 *
 * <p>Each call to {@link #iterator()} submits one {@code WorkSubmitter} task, which in turn submits
 * one {@code ResultIterateWorker} per search result. Workers copy records into a bounded blocking
 * queue; the returned iterator drains that queue in small batches. The whole iteration is bounded
 * by a timeout measured in milliseconds from the moment {@link #iterator()} was called.
 */
public class MultiThreadsResultCollector extends ResultCollector {
    private static final Logger logger = LoggerFactory.getLogger(MultiThreadsResultCollector.class);

    /** Number of records handed to the consumer per queue drain (one polled + up to 99 drained). */
    private static final int BATCH_SIZE = 100;

    /**
     * Capacity of the hand-off queue between workers and the consumer.
     *
     * <p>NOTE(review): binding the capacity to a ZooKeeper session-timeout constant looks like a
     * decompiler constant-substitution artifact (presumably a plain numeric literal in the original
     * source) - confirm and replace with the literal.
     */
    private static final int QUEUE_CAPACITY = ZkServer.DEFAULT_MIN_SESSION_TIMEOUT;

    /** Pool shared by all collectors; sized from the streaming receiver query thread settings. */
    private static final ExecutorService executor;

    static {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        executor = new ThreadPoolExecutor(
                config.getStreamingReceiverQueryCoreThreads(),
                config.getStreamingReceiverQueryMaxThreads(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new NamedThreadFactory("query-worker"));
    }

    /** Overall iteration budget in milliseconds (compared against wall-clock deltas). */
    private final int timeout;

    /** Throttles the submitter: one permit is taken per submitted worker, returned when it ends. */
    private final Semaphore workersSemaphore;

    /** Bounded hand-off queue; workers block in {@code put} when the consumer falls behind. */
    final BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>(QUEUE_CAPACITY);

    /** Count of workers that have not finished yet; (re)initialised by {@link #iterator()}. */
    private AtomicInteger notCompletedWorkers;

    /**
     * Reads one search result to the end and pushes a copy of every record into the shared queue.
     */
    private class ResultIterateWorker implements Runnable {
        private final IStreamingSearchResult result;

        public ResultIterateWorker(IStreamingSearchResult result) {
            this.result = result;
        }

        @Override
        public void run() {
            try {
                result.startRead();
                Iterator<Record> records = result.iterator();
                while (records.hasNext()) {
                    try {
                        queue.put(records.next().copy());
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever runs this task next.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Timeout when visiting streaming segmenent", e);
                    }
                }
                result.endRead();
            } catch (Exception e) {
                // Failures are logged, not propagated: the consumer simply sees fewer records.
                logger.error("error when iterate search result", e);
            } finally {
                // Must run on every exit path (including Errors), otherwise the consumer would
                // wait for this worker until its timeout and the submitter would lose a permit.
                notCompletedWorkers.decrementAndGet();
                workersSemaphore.release();
            }
        }
    }

    /**
     * Submits one {@link ResultIterateWorker} per search result, taking a semaphore permit after
     * each submission so that it blocks once the permits are exhausted until a worker finishes.
     */
    private class WorkSubmitter implements Runnable {
        @Override
        public void run() {
            boolean interrupted = false;
            try {
                Iterator<IStreamingSearchResult> results = searchResults.iterator();
                while (results.hasNext()) {
                    executor.submit(new ResultIterateWorker(results.next()));
                    try {
                        workersSemaphore.acquire();
                    } catch (InterruptedException e) {
                        logger.error("interrupted", e);
                        // Keep submitting (every result needs a worker so the pending-worker
                        // count can reach zero) and restore the flag once we are done.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * @param workerPermits initial number of permits of the semaphore that throttles worker
     *     submission
     * @param timeoutMillis maximum time, in milliseconds, that one iteration may take, measured
     *     from the call to {@link #iterator()}
     */
    public MultiThreadsResultCollector(int workerPermits, int timeoutMillis) {
        this.workersSemaphore = new Semaphore(workerPermits);
        this.timeout = timeoutMillis;
    }

    /**
     * Starts reading all search results in the background and returns an iterator over the
     * records they produce.
     *
     * <p>The returned iterator throws {@link RuntimeException} from {@code hasNext()} or
     * {@code next()} once the timeout has elapsed, and does not support {@code remove()}.
     *
     * @return an iterator over the records produced by the workers, in queue arrival order
     */
    @Override
    public Iterator<Record> iterator() {
        notCompletedWorkers = new AtomicInteger(searchResults.size());
        executor.submit(new WorkSubmitter());
        final long startTime = System.currentTimeMillis();

        return new Iterator<Record>() {
            private final List<Record> recordList = Lists.newArrayListWithExpectedSize(BATCH_SIZE);
            private Iterator<Record> internalIT = recordList.iterator();

            /** True when a record is available without waiting for a worker. */
            private boolean hasBufferedOrQueued() {
                return internalIT.hasNext() || !queue.isEmpty();
            }

            @Override
            public boolean hasNext() {
                if (hasBufferedOrQueued()) {
                    return true;
                }
                // Nothing available yet: spin until a record shows up, every worker has
                // finished, or the time budget is exhausted.
                while (notCompletedWorkers.get() > 0) {
                    Thread.yield();
                    long elapsed = System.currentTimeMillis() - startTime;
                    if (elapsed > timeout) {
                        throw new RuntimeException("Timeout when iterate search result");
                    }
                    if (hasBufferedOrQueued()) {
                        return true;
                    }
                }
                // Re-check after the counter hit zero: the last worker may have enqueued records
                // and finished between the availability check above and the counter read.
                return hasBufferedOrQueued();
            }

            @Override
            public Record next() {
                try {
                    long elapsed = System.currentTimeMillis() - startTime;
                    if (elapsed > timeout) {
                        throw new RuntimeException("Timeout when iterate search result");
                    }
                    if (!internalIT.hasNext()) {
                        // Local batch exhausted: block for one record (within the remaining
                        // budget), then opportunistically grab whatever else is already queued.
                        recordList.clear();
                        Record first = queue.poll(timeout - elapsed, TimeUnit.MILLISECONDS);
                        if (first == null) {
                            throw new RuntimeException("Timeout when iterate search result");
                        }
                        recordList.add(first);
                        queue.drainTo(recordList, BATCH_SIZE - 1);
                        internalIT = recordList.iterator();
                    }
                    return internalIT.next();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Error when waiting queue", e);
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("not support");
            }
        };
    }
}
