package org.apache.hama.bsp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.bsp.JobStatus;
import org.apache.hama.ipc.JobSubmissionProtocol;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner.class */
public class LocalBSPRunner implements JobSubmissionProtocol {
    private static final String IDENTIFIER = "localrunner";
    protected HashMap<String, BSPPeerProtocol> localGrooms = new HashMap<>();
    protected String jobFile;
    protected String jobName;
    protected JobStatus currentJobStatus;
    protected Configuration conf;
    protected FileSystem fs;
    public static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
    private static String WORKING_DIR = "/user/hama/bsp/";
    protected static final LinkedList<Future<BSP>> futureList = new LinkedList<>();
    protected static int threadPoolSize = Runtime.getRuntime().availableProcessors();
    protected static CyclicBarrier barrier = new CyclicBarrier(threadPoolSize);
    protected static volatile ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadPoolSize);

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$BSPRunner.class */
    class BSPRunner implements Callable<BSP> {
        Configuration conf;
        BSPJob job;
        BSP bsp;
        BSPPeerProtocol groom;

        public BSPRunner(Configuration configuration, BSPJob bSPJob, BSP bsp, BSPPeerProtocol bSPPeerProtocol) {
            this.conf = configuration;
            this.job = bSPJob;
            this.bsp = bsp;
            this.groom = bSPPeerProtocol;
        }

        public void run() {
            this.bsp.setConf(this.conf);
            try {
                this.bsp.bsp(this.groom);
            } catch (Exception e) {
                LocalBSPRunner.LOG.error("Exception during BSP execution!", e);
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public BSP call() throws Exception {
            run();
            return this.bsp;
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$LocalGroom.class */
    class LocalGroom implements BSPPeerProtocol {
        private long superStepCount = 0;
        private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<>();
        private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap();
        private final String peerName;

        public LocalGroom(String str) {
            this.peerName = str;
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public void send(String str, BSPMessage bSPMessage) throws IOException {
            if (this.peerName.equals(str)) {
                put(bSPMessage);
                return;
            }
            if (this.outgoingQueues.get(str) == null) {
                this.outgoingQueues.put(str, new ConcurrentLinkedQueue<>());
            }
            this.outgoingQueues.get(str).add(bSPMessage);
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public void put(BSPMessage bSPMessage) throws IOException {
            this.localMessageQueue.add(bSPMessage);
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public BSPMessage getCurrentMessage() throws IOException {
            return this.localMessageQueue.poll();
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public int getNumCurrentMessages() {
            return this.localMessageQueue.size();
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public void sync() throws IOException, KeeperException, InterruptedException {
            barrierSync();
            for (Map.Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : this.outgoingQueues.entrySet()) {
                String key = entry.getKey();
                Iterator<BSPMessage> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    LocalBSPRunner.this.localGrooms.get(key).put(it.next());
                }
            }
            this.outgoingQueues.clear();
            barrierSync();
            incrementSuperSteps();
        }

        private void barrierSync() throws InterruptedException {
            try {
                LocalBSPRunner.barrier.await();
            } catch (BrokenBarrierException e) {
                throw new InterruptedException("Barrier has been broken!" + e);
            }
        }

        private void incrementSuperSteps() {
            JobStatus jobStatus = LocalBSPRunner.this.currentJobStatus;
            long j = this.superStepCount;
            this.superStepCount = j + 1;
            jobStatus.setprogress(j);
            LocalBSPRunner.this.currentJobStatus.setSuperstepCount(LocalBSPRunner.this.currentJobStatus.progress());
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public long getSuperstepCount() {
            return this.superStepCount;
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public String getPeerName() {
            return this.peerName;
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public String[] getAllPeerNames() {
            return (String[]) LocalBSPRunner.this.localGrooms.keySet().toArray(new String[LocalBSPRunner.this.localGrooms.keySet().size()]);
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public void clear() {
            this.localMessageQueue.clear();
        }

        @Override // org.apache.hadoop.ipc.VersionedProtocol
        public long getProtocolVersion(String str, long j) throws IOException {
            return 3L;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        @Override // org.apache.hama.bsp.BSPPeerProtocol
        public Task getTask(TaskAttemptID taskAttemptID) throws IOException {
            return null;
        }

        @Override // org.apache.hama.bsp.BSPPeerProtocol
        public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
            return true;
        }

        @Override // org.apache.hama.bsp.BSPPeerProtocol
        public void done(TaskAttemptID taskAttemptID, boolean z) throws IOException {
        }

        @Override // org.apache.hama.bsp.BSPPeerProtocol
        public void fsError(TaskAttemptID taskAttemptID, String str) throws IOException {
        }

        @Override // org.apache.hama.bsp.BSPPeerInterface
        public void put(BSPMessageBundle bSPMessageBundle) throws IOException {
            throw new UnsupportedOperationException("Messagebundle is not supported by local testing");
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$ThreadObserver.class */
    class ThreadObserver implements Runnable {
        JobStatus status;

        public ThreadObserver(JobStatus jobStatus) {
            this.status = jobStatus;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = true;
            Iterator<Future<BSP>> it = LocalBSPRunner.futureList.iterator();
            while (it.hasNext()) {
                try {
                    it.next().get();
                } catch (InterruptedException e) {
                    LocalBSPRunner.LOG.error("Exception during BSP execution!", e);
                    z = false;
                } catch (ExecutionException e2) {
                    LocalBSPRunner.LOG.error("Exception during BSP execution!", e2);
                    z = false;
                }
            }
            if (z) {
                LocalBSPRunner.this.currentJobStatus.setState(JobStatus.State.SUCCEEDED);
                LocalBSPRunner.this.currentJobStatus.setRunState(2);
            } else {
                LocalBSPRunner.this.currentJobStatus.setState(JobStatus.State.FAILED);
                LocalBSPRunner.this.currentJobStatus.setRunState(3);
            }
            LocalBSPRunner.threadPool.shutdownNow();
        }
    }

    public LocalBSPRunner(Configuration configuration) throws IOException {
        this.conf = configuration;
        this.fs = FileSystem.get(configuration);
        String str = configuration.get("bsp.local.dir");
        if (str != null && !str.isEmpty()) {
            WORKING_DIR = str;
        }
        threadPoolSize = configuration.getInt("bsp.local.tasks.maximum", 20);
        threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadPoolSize);
        barrier = new CyclicBarrier(threadPoolSize);
        for (int i = 0; i < threadPoolSize; i++) {
            String str2 = "localrunner " + i;
            this.localGrooms.put(str2, new LocalGroom(str2));
        }
    }

    @Override // org.apache.hadoop.ipc.VersionedProtocol
    public long getProtocolVersion(String str, long j) throws IOException {
        return 3L;
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public BSPJobID getNewJobId() throws IOException {
        return new BSPJobID(IDENTIFIER, 1);
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus submitJob(BSPJobID bSPJobID, String str) throws IOException {
        this.jobFile = str;
        BSPJob bSPJob = new BSPJob(bSPJobID, str);
        bSPJob.setNumBspTask(threadPoolSize);
        this.jobName = bSPJob.getJobName();
        this.currentJobStatus = new JobStatus(bSPJobID, System.getProperty("user.name"), 0L, 1);
        for (int i = 0; i < threadPoolSize; i++) {
            String str2 = "localrunner " + i;
            LocalGroom localGroom = new LocalGroom(str2);
            this.localGrooms.put(str2, localGroom);
            futureList.add(threadPool.submit(new BSPRunner(this.conf, bSPJob, (BSP) ReflectionUtils.newInstance(bSPJob.getBspClass(), this.conf), localGroom)));
        }
        new Thread(new ThreadObserver(this.currentJobStatus)).start();
        return this.currentJobStatus;
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol, org.apache.hama.bsp.GroomServerManager
    public ClusterStatus getClusterStatus(boolean z) throws IOException {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, BSPPeerProtocol> entry : this.localGrooms.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue().getPeerName());
        }
        return new ClusterStatus(hashMap, threadPoolSize, threadPoolSize, BSPMaster.State.RUNNING);
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobProfile getJobProfile(BSPJobID bSPJobID) throws IOException {
        return new JobProfile(System.getProperty("user.name"), bSPJobID, this.jobFile, this.jobName);
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus getJobStatus(BSPJobID bSPJobID) throws IOException {
        if (this.currentJobStatus == null) {
            this.currentJobStatus = new JobStatus(bSPJobID, System.getProperty("user.name"), 0L, 1);
        }
        return this.currentJobStatus;
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public String getFilesystemName() throws IOException {
        return this.fs.getUri().toString();
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus[] jobsToComplete() throws IOException {
        return null;
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus[] getAllJobs() throws IOException {
        return null;
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol, org.apache.hama.ipc.MasterProtocol
    public String getSystemDir() {
        return WORKING_DIR;
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public void killJob(BSPJobID bSPJobID) throws IOException {
    }

    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public boolean killTask(TaskAttemptID taskAttemptID, boolean z) throws IOException {
        return false;
    }
}
