/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.client;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geaflow.cluster.client.AbstractPipelineClient;
import org.apache.geaflow.cluster.rpc.RpcClient;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.utils.ExecutorUtil;
import org.apache.geaflow.common.utils.ThreadUtil;
import org.apache.geaflow.pipeline.IPipelineResult;
import org.apache.geaflow.pipeline.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncPipelineClient
extends AbstractPipelineClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncPipelineClient.class);
    private static final String PREFIX_DRIVER_EXECUTE_PIPELINE = "driver-submit-pipeline-";
    private ExecutorService executorService;

    @Override
    public IPipelineResult submit(Pipeline pipeline) {
        int driverNum = this.driverAddresses.size();
        this.executorService = new ThreadPoolExecutor(driverNum, driverNum, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(driverNum), ThreadUtil.namedThreadFactory((boolean)true, (String)PREFIX_DRIVER_EXECUTE_PIPELINE));
        ArrayList<Future<IPipelineResult>> list = new ArrayList<Future<IPipelineResult>>(driverNum);
        int pipelineIndex = 0;
        for (Map.Entry entry : this.driverAddresses.entrySet()) {
            list.add(this.executorService.submit(new ExecutePipelineTask(driverNum, pipelineIndex, pipeline, (String)entry.getKey())));
            ++pipelineIndex;
        }
        try {
            return (IPipelineResult)((Future)list.get(0)).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("submit pipeline failed", (Throwable)e);
            throw new GeaflowRuntimeException((Throwable)e);
        }
    }

    @Override
    public boolean isSync() {
        return false;
    }

    @Override
    public void close() {
        if (this.executorService != null) {
            ExecutorUtil.shutdown((ExecutorService)this.executorService);
        }
    }

    private class ExecutePipelineTask
    implements Callable<IPipelineResult> {
        private final String driverId;
        private final Pipeline pipeline;
        private final int total;
        private final int index;

        private ExecutePipelineTask(int total, int index, Pipeline pipeline, String driverId) {
            this.driverId = driverId;
            this.pipeline = pipeline;
            this.total = total;
            this.index = index;
        }

        @Override
        public IPipelineResult call() throws Exception {
            int num = this.index + 1;
            LOGGER.info("execute pipeline [{}/{}]", (Object)num, (Object)this.total);
            long start = System.currentTimeMillis();
            IPipelineResult future = RpcClient.getInstance().executePipeline(this.driverId, this.pipeline);
            LOGGER.info("execute pipeline [{}/{}] costs {}ms, driver: {}", new Object[]{num, this.total, System.currentTimeMillis() - start, this.driverId});
            return future;
        }
    }
}

