/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tika.server.client;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.pipesiterator.CallablePipesIterator;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.server.client.TikaClient;
import org.apache.tika.server.client.TikaServerClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

public class TikaClientCLI {
    private static final Logger LOGGER = LoggerFactory.getLogger(TikaClientCLI.class);
    private static final int QUEUE_SIZE = 10000;

    public static void main(String[] args) throws Exception {
        Path tikaConfigPath = Paths.get(args[0], new String[0]);
        TikaClientCLI cli = new TikaClientCLI();
        cli.execute(tikaConfigPath);
    }

    private void execute(Path tikaConfigPath) throws TikaException, IOException, SAXException {
        TikaClient client;
        int i;
        TikaServerClientConfig clientConfig = TikaServerClientConfig.build(tikaConfigPath);
        ExecutorService executorService = Executors.newFixedThreadPool(clientConfig.getNumThreads() + 1);
        ExecutorCompletionService<Long> completionService = new ExecutorCompletionService<Long>(executorService);
        PipesIterator pipesIterator = PipesIterator.build(tikaConfigPath);
        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<FetchEmitTuple>(10000);
        completionService.submit(new CallablePipesIterator(pipesIterator, queue));
        if (clientConfig.getTikaEndpoints().size() == clientConfig.getNumThreads()) {
            this.logDiffSizes(clientConfig.getTikaEndpoints().size(), clientConfig.getNumThreads());
            for (i = 0; i < clientConfig.getNumThreads(); ++i) {
                client = TikaClient.get(clientConfig.getHttpClientFactory(), Collections.singletonList(clientConfig.getTikaEndpoints().get(i)));
                completionService.submit(new FetchWorker(queue, client, clientConfig.getMaxWaitMillis()));
            }
        } else {
            for (i = 0; i < clientConfig.getNumThreads(); ++i) {
                client = TikaClient.get(clientConfig.getHttpClientFactory(), clientConfig.getTikaEndpoints());
                completionService.submit(new FetchWorker(queue, client, clientConfig.getMaxWaitMillis()));
            }
        }
        int finished = 0;
        while (finished < clientConfig.getNumThreads() + 1) {
            Future future = null;
            try {
                future = completionService.poll(30L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOGGER.error("", e);
                throw new RuntimeException(e);
            }
            if (future == null) continue;
            ++finished;
            try {
                future.get();
            }
            catch (InterruptedException | ExecutionException e) {
                LOGGER.error("critical main loop failure", e);
                throw new RuntimeException(e);
            }
        }
    }

    private void logDiffSizes(int servers, int numThreads) {
        LOGGER.info("tika server count ({}) != numThreads ({}). Each client will randomly select a server from this list", (Object)servers, (Object)numThreads);
    }

    private static class FetchWorker
    implements Callable<Long> {
        private final ArrayBlockingQueue<FetchEmitTuple> queue;
        private final TikaClient client;
        private final long maxWaitMs;

        public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client, long maxWaitMs) {
            this.queue = queue;
            this.client = client;
            this.maxWaitMs = maxWaitMs;
        }

        @Override
        public Long call() throws Exception {
            while (true) {
                FetchEmitTuple t;
                if ((t = this.queue.poll(this.maxWaitMs, TimeUnit.MILLISECONDS)) == null) {
                    throw new TimeoutException("exceeded maxWaitMs");
                }
                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                    this.queue.put(PipesIterator.COMPLETED_SEMAPHORE);
                    return 1L;
                }
                try {
                    LOGGER.debug("about to parse: {}", (Object)t.getFetchKey());
                    this.client.parse(t);
                    continue;
                }
                catch (IOException | TikaException e) {
                    LOGGER.warn(t.getFetchKey().toString(), e);
                    continue;
                }
                break;
            }
        }
    }
}

