package org.apache.hugegraph.computer.core.output.hg.task;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.output.hg.exceptions.WriteBackException;
import org.apache.hugegraph.computer.core.output.hg.metrics.LoadSummary;
import org.apache.hugegraph.computer.core.output.hg.metrics.Printer;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/output/hg/task/TaskManager.class */
public final class TaskManager {
    private static final Logger LOG = Log.logger((Class<?>) TaskManager.class);
    public static final String BATCH_WORKER = "batch-worker-%d";
    public static final String SINGLE_WORKER = "single-worker-%d";
    private HugeClient client;
    private Config config;
    private final ExecutorService batchService;
    private final ExecutorService singleService;
    private final Semaphore batchSemaphore = new Semaphore(batchSemaphoreNum());
    private final Semaphore singleSemaphore = new Semaphore(singleSemaphoreNum());
    private LoadSummary loadSummary = new LoadSummary();

    public TaskManager(Config config) {
        this.config = config;
        this.client = new HugeClientBuilder((String) config.get(ComputerOptions.HUGEGRAPH_URL), (String) config.get(ComputerOptions.HUGEGRAPH_GRAPH_NAME)).build();
        this.batchService = ExecutorUtil.newFixedThreadPool(((Integer) config.get(ComputerOptions.OUTPUT_BATCH_THREADS)).intValue(), BATCH_WORKER);
        this.singleService = ExecutorUtil.newFixedThreadPool(((Integer) config.get(ComputerOptions.OUTPUT_SINGLE_THREADS)).intValue(), SINGLE_WORKER);
        this.loadSummary.startTimer();
    }

    public HugeClient client() {
        return this.client;
    }

    private int batchSemaphoreNum() {
        return 1 + ((Integer) this.config.get(ComputerOptions.OUTPUT_BATCH_THREADS)).intValue();
    }

    private int singleSemaphoreNum() {
        return 2 * ((Integer) this.config.get(ComputerOptions.OUTPUT_SINGLE_THREADS)).intValue();
    }

    public void waitFinished() {
        LOG.info("Waiting for the insert tasks finished");
        try {
            this.batchSemaphore.acquire(batchSemaphoreNum());
            LOG.info("The batch-mode tasks stopped");
        } catch (InterruptedException e) {
            LOG.error("Interrupted while waiting batch-mode tasks");
        } finally {
            this.batchSemaphore.release(batchSemaphoreNum());
        }
        try {
            this.singleSemaphore.acquire(singleSemaphoreNum());
            LOG.info("The single-mode tasks stopped");
        } catch (InterruptedException e2) {
            LOG.error("Interrupted while waiting single-mode tasks");
        } finally {
            this.singleSemaphore.release(singleSemaphoreNum());
        }
    }

    public void shutdown() {
        long intValue = ((Integer) this.config.get(ComputerOptions.OUTPUT_THREAD_POOL_SHUTDOWN_TIMEOUT)).intValue();
        try {
            try {
                this.batchService.shutdown();
                this.batchService.awaitTermination(intValue, TimeUnit.SECONDS);
                LOG.info("The batch-mode tasks service executor shutdown");
                if (!this.batchService.isTerminated()) {
                    LOG.error("The unfinished batch-mode tasks will be cancelled");
                }
                this.batchService.shutdownNow();
            } catch (Throwable th) {
                if (!this.batchService.isTerminated()) {
                    LOG.error("The unfinished batch-mode tasks will be cancelled");
                }
                this.batchService.shutdownNow();
                throw th;
            }
        } catch (InterruptedException e) {
            LOG.error("The batch-mode tasks are interrupted");
            if (!this.batchService.isTerminated()) {
                LOG.error("The unfinished batch-mode tasks will be cancelled");
            }
            this.batchService.shutdownNow();
        }
        try {
            try {
                this.singleService.shutdown();
                this.singleService.awaitTermination(intValue, TimeUnit.SECONDS);
                LOG.info("The single-mode tasks service executor shutdown");
                if (!this.singleService.isTerminated()) {
                    LOG.error("The unfinished single-mode tasks will be cancelled");
                }
                this.singleService.shutdownNow();
            } catch (InterruptedException e2) {
                LOG.error("The single-mode tasks are interrupted");
                if (!this.singleService.isTerminated()) {
                    LOG.error("The unfinished single-mode tasks will be cancelled");
                }
                this.singleService.shutdownNow();
            }
            this.loadSummary.stopTimer();
            Printer.printSummary(this.loadSummary);
            this.client.close();
        } catch (Throwable th2) {
            if (!this.singleService.isTerminated()) {
                LOG.error("The unfinished single-mode tasks will be cancelled");
            }
            this.singleService.shutdownNow();
            throw th2;
        }
    }

    public void submitBatch(List<Vertex> list) {
        try {
            this.batchSemaphore.acquire();
            CompletableFuture.runAsync(new BatchInsertTask(this.config, this.client, list, this.loadSummary), this.batchService).exceptionally(th -> {
                LOG.warn("Batch insert error, try single insert", th);
                submitInSingle(list);
                return null;
            }).whenComplete((r3, th2) -> {
                this.batchSemaphore.release();
            });
        } catch (InterruptedException e) {
            throw new WriteBackException("Interrupted while waiting to submit batch", e);
        }
    }

    private void submitInSingle(List<Vertex> list) {
        try {
            this.singleSemaphore.acquire();
            CompletableFuture.runAsync(new SingleInsertTask(this.config, this.client, list, this.loadSummary), this.singleService).whenComplete((r3, th) -> {
                this.singleSemaphore.release();
            });
        } catch (InterruptedException e) {
            throw new WriteBackException("Interrupted while waiting to submit single", e);
        }
    }
}
