/*
 * Decompiled with CFR 0.152.
 */
package org.opentsdb.client.sender.consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.opentsdb.client.OpenTSDBConfig;
import org.opentsdb.client.bean.request.Point;
import org.opentsdb.client.http.HttpClient;
import org.opentsdb.client.sender.consumer.Consumer;
import org.opentsdb.client.sender.consumer.ConsumerRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Consumer} implementation that runs a fixed number of
 * {@link ConsumerRunnable} workers on a dedicated thread pool.
 *
 * <p>The worker count comes from {@link OpenTSDBConfig#getPutConsumerThreadCount()}.
 * Every worker is handed the same queue, HTTP client, configuration and a shared
 * {@link CountDownLatch} sized to the worker count. What a worker does with them
 * is defined by {@code ConsumerRunnable}; this class only manages the lifecycle.
 */
public class ConsumerImpl implements Consumer {
    private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

    /** Seconds to wait for pool termination before interrupting the workers again. */
    private static final long TERMINATION_POLL_SECONDS = 1L;

    private final BlockingQueue<Point> queue;
    private final HttpClient httpClient;
    private final ExecutorService threadPool;
    private final int threadCount;
    private final OpenTSDBConfig config;
    private final CountDownLatch countDownLatch;

    /**
     * Set once {@link #start()} has submitted every worker. The latch can only
     * reach zero through a submitted worker, so a graceful stop must not wait
     * on it before this is {@code true}.
     */
    private volatile boolean started;

    /**
     * Creates the consumer and its thread pool; no worker runs until {@link #start()}.
     *
     * @param queue      queue passed to every worker
     * @param httpClient HTTP client passed to every worker
     * @param config     configuration; supplies the worker count and is passed to every worker
     */
    public ConsumerImpl(BlockingQueue<Point> queue, HttpClient httpClient, OpenTSDBConfig config) {
        this.queue = queue;
        this.httpClient = httpClient;
        this.config = config;
        this.threadCount = config.getPutConsumerThreadCount();
        // The pool may invoke the factory from more than one thread (e.g. when it
        // replaces a worker that died), so the name counter must be atomic.
        // Names start at "batch-put-thread-1".
        AtomicInteger threadIndex = new AtomicInteger();
        this.threadPool = Executors.newFixedThreadPool(
                this.threadCount,
                runnable -> new Thread(runnable, "batch-put-thread-" + threadIndex.incrementAndGet()));
        this.countDownLatch = new CountDownLatch(this.threadCount);
        log.debug("the consumer has started");
    }

    /**
     * Submits one {@link ConsumerRunnable} per configured worker thread.
     */
    @Override
    public void start() {
        for (int i = 0; i < this.threadCount; ++i) {
            this.threadPool.execute(new ConsumerRunnable(this.queue, this.httpClient, this.config, this.countDownLatch));
        }
        // Only flagged after every submission succeeded; if execute() rejects a
        // task the flag stays false and a later graceful stop will not wait on
        // a latch that can never be released.
        this.started = true;
    }

    /**
     * Interrupts the workers and blocks until the pool has terminated and the
     * shared latch has been released.
     */
    @Override
    public void gracefulStop() {
        this.stop(false);
    }

    /**
     * Interrupts the workers and returns immediately without waiting for them.
     */
    @Override
    public void forceStop() {
        this.stop(true);
    }

    /**
     * Shuts the pool down via {@link ExecutorService#shutdownNow()}.
     *
     * <p>When {@code force} is {@code false} the call additionally waits for the
     * pool to terminate, re-issuing the interrupt every
     * {@value #TERMINATION_POLL_SECONDS} second(s) while workers are still alive,
     * and then waits on the latch if the workers were started. If the calling
     * thread is interrupted while waiting, the error is logged, the interrupt
     * status is restored and the method returns without waiting further.
     *
     * @param force {@code true} to return right after requesting shutdown,
     *              {@code false} to wait for the workers to finish
     */
    private void stop(boolean force) {
        this.threadPool.shutdownNow();
        if (force) {
            return;
        }
        try {
            // Block instead of spinning; interrupt the workers again only when a
            // full poll interval elapsed without the pool terminating.
            while (!this.threadPool.awaitTermination(TERMINATION_POLL_SECONDS, TimeUnit.SECONDS)) {
                this.threadPool.shutdownNow();
            }
            if (this.started) {
                this.countDownLatch.await();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            log.error("An error occurred waiting for the consumer thread to close", e);
        }
    }
}

