package org.apache.reef.vortex.evaluator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.task.HeartBeatTriggerManager;
import org.apache.reef.vortex.common.KryoUtils;
import org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregationRequest;
import org.apache.reef.vortex.protocol.workertomaster.TaskletAggregationFailureReport;
import org.apache.reef.vortex.protocol.workertomaster.TaskletAggregationResultReport;
import org.apache.reef.vortex.protocol.workertomaster.TaskletFailureReport;
import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReport;
import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * Buffers the outcomes of Tasklets that belong to one {@link TaskletAggregationRequest}
 * and flushes them to the master in batches.
 *
 * <p>A flush drains the buffered results and failures, runs the request's aggregation
 * over the drained results, serializes the resulting {@link WorkerToMasterReports},
 * appends the bytes to the worker report queue and triggers a heartbeat. A flush is
 * attempted either by a periodic alarm (period taken from the request's policy) or as
 * soon as the number of buffered outcomes reaches the policy's count, if one is set.
 *
 * <p>All mutable state is guarded by {@code stateLock}. The aggregation itself, the
 * serialization and the heartbeat run outside the lock, so threads reporting Tasklet
 * outcomes are not blocked while a batch is being aggregated.
 */
@DriverSide
@Unstable
@Private
public final class AggregateContainer {
    private final TaskletAggregationRequest taskletAggregationRequest;
    private final HeartBeatTriggerManager heartBeatTriggerManager;
    private final KryoUtils kryoUtils;
    private final BlockingDeque<byte[]> workerReportsQueue;
    private final Object stateLock = new Object();

    // NOTE(review): nothing in this class shuts this executor down - confirm who owns its lifecycle.
    private final ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);

    /** Tasklet id -> number of scheduled-but-not-yet-reported executions. */
    @GuardedBy("stateLock")
    private final HashMap<Integer, Integer> pendingTasklets = new HashMap<>();

    /** (Tasklet id, result) pairs waiting for the next flush. */
    @GuardedBy("stateLock")
    private final List<Pair<Integer, Object>> completedTasklets = new ArrayList<>();

    /** (Tasklet id, exception) pairs waiting for the next flush. */
    @GuardedBy("stateLock")
    private final List<Pair<Integer, Exception>> failedTasklets = new ArrayList<>();

    /**
     * True while an alarm is queued on (or running from) {@code timer}. Keeps the
     * container to a single alarm chain: without it, a count-triggered flush that empties
     * the buffers while an alarm is still queued would let the next scheduled Tasklet
     * start a second, independently re-arming alarm.
     */
    @GuardedBy("stateLock")
    private boolean alarmScheduled;

    /** What caused a flush attempt. */
    private enum AggregateTriggerType {
        /** The periodic alarm fired; flush whatever is buffered. */
        ALARM,
        /** An outcome was reported; flush only if the policy's count has been reached. */
        COUNT
    }

    /**
     * @param heartBeatTriggerManager   used to trigger a heartbeat after a report is queued
     * @param kryoUtils                 serializer for the outgoing reports
     * @param workerReportsQueue        queue the serialized reports are appended to
     * @param taskletAggregationRequest the aggregation request this container serves
     */
    public AggregateContainer(HeartBeatTriggerManager heartBeatTriggerManager, KryoUtils kryoUtils, BlockingDeque<byte[]> workerReportsQueue, TaskletAggregationRequest taskletAggregationRequest) {
        this.heartBeatTriggerManager = heartBeatTriggerManager;
        this.kryoUtils = kryoUtils;
        this.workerReportsQueue = workerReportsQueue;
        this.taskletAggregationRequest = taskletAggregationRequest;
    }

    /**
     * @return the aggregation request this container was created for
     */
    public TaskletAggregationRequest getTaskletAggregationRequest() {
        return this.taskletAggregationRequest;
    }

    /**
     * Moves every buffered outcome into the supplied lists and empties the buffers.
     * Must be called with {@code stateLock} held.
     *
     * @param reports              receives one {@link TaskletFailureReport} per failed Tasklet
     * @param results              receives the results of the completed Tasklets
     * @param aggregatedTaskletIds receives the ids of the completed Tasklets, in the same order as {@code results}
     */
    @GuardedBy("stateLock")
    private void drainBufferedTasklets(List<WorkerToMasterReport> reports, List<Object> results, List<Integer> aggregatedTaskletIds) {
        for (Pair<Integer, Object> completed : this.completedTasklets) {
            aggregatedTaskletIds.add(completed.getLeft());
            results.add(completed.getRight());
        }
        for (Pair<Integer, Exception> failed : this.failedTasklets) {
            reports.add(new TaskletFailureReport(failed.getLeft().intValue(), failed.getRight()));
        }
        this.completedTasklets.clear();
        this.failedTasklets.clear();
    }

    /**
     * Attempts a flush of the buffered outcomes.
     *
     * <p>For {@code ALARM} the buffers are always drained; for {@code COUNT} they are
     * drained only if the policy's count is (still) reached. If anything was drained, a
     * single {@link WorkerToMasterReports} is serialized, queued and a heartbeat is
     * triggered. An exception thrown by the aggregation is reported to the master as a
     * {@link TaskletAggregationFailureReport} rather than propagated.
     *
     * @param aggregateTriggerType the reason for this flush attempt
     * @throws RuntimeException if the trigger type is not handled
     */
    public void aggregateTasklets(AggregateTriggerType aggregateTriggerType) {
        final List<WorkerToMasterReport> reports = new ArrayList<>();
        final List<Object> results = new ArrayList<>();
        final List<Integer> aggregatedTaskletIds = new ArrayList<>();

        // Only the hand-over of buffered state needs the lock. Once drained, the lists
        // above are private to this call, so no other flush can pick up the same outcomes.
        synchronized (this.stateLock) {
            switch (aggregateTriggerType) {
                case ALARM:
                    break;
                case COUNT:
                    // Re-check under the lock: another flush may have drained the buffers
                    // since the caller observed the count being reached.
                    if (!aggregateOnCount()) {
                        return;
                    }
                    break;
                default:
                    throw new RuntimeException("Unexpected aggregate type.");
            }
            drainBufferedTasklets(reports, results, aggregatedTaskletIds);
        }

        if (!results.isEmpty()) {
            try {
                reports.add(new TaskletAggregationResultReport(aggregatedTaskletIds, this.taskletAggregationRequest.executeAggregation(results)));
            } catch (Exception e) {
                reports.add(new TaskletAggregationFailureReport(aggregatedTaskletIds, e));
            }
        }

        // Nothing completed and nothing failed: do not queue an empty report.
        if (reports.isEmpty()) {
            return;
        }
        this.workerReportsQueue.addLast(this.kryoUtils.serialize(new WorkerToMasterReports(reports)));
        this.heartBeatTriggerManager.triggerHeartBeat();
    }

    /**
     * Registers one more pending execution of the given Tasklet and makes sure the
     * periodic alarm is running.
     *
     * @param taskletId id of the Tasklet that is about to run
     */
    public void scheduleTasklet(int taskletId) {
        synchronized (this.stateLock) {
            if (!this.alarmScheduled) {
                scheduleAlarm(newAlarm());
            }
            final Integer key = Integer.valueOf(taskletId);
            final Integer current = this.pendingTasklets.get(key);
            this.pendingTasklets.put(key, Integer.valueOf(current == null ? 1 : current.intValue() + 1));
        }
    }

    /**
     * Buffers the result of a finished Tasklet and flushes if the policy's count is reached.
     *
     * @param taskletId id of the Tasklet; expected to have been passed to {@link #scheduleTasklet(int)} before
     * @param result    the Tasklet's result
     */
    public void taskletComplete(int taskletId, Object result) {
        final boolean countReached;
        synchronized (this.stateLock) {
            this.completedTasklets.add(new ImmutablePair<>(Integer.valueOf(taskletId), result));
            removePendingTaskletReferenceCount(taskletId);
            countReached = aggregateOnCount();
        }
        if (countReached) {
            aggregateTasklets(AggregateTriggerType.COUNT);
        }
    }

    /**
     * Buffers the failure of a Tasklet and flushes if the policy's count is reached.
     *
     * @param taskletId id of the Tasklet; expected to have been passed to {@link #scheduleTasklet(int)} before
     * @param exception the exception the Tasklet failed with
     */
    public void taskletFailed(int taskletId, Exception exception) {
        final boolean countReached;
        synchronized (this.stateLock) {
            this.failedTasklets.add(new ImmutablePair<>(Integer.valueOf(taskletId), exception));
            removePendingTaskletReferenceCount(taskletId);
            countReached = aggregateOnCount();
        }
        if (countReached) {
            aggregateTasklets(AggregateTriggerType.COUNT);
        }
    }

    /**
     * Creates the alarm task. Each run flushes with {@code ALARM} and then either re-arms
     * itself (while anything is pending or buffered) or ends the alarm chain.
     */
    private Runnable newAlarm() {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    aggregateTasklets(AggregateTriggerType.ALARM);
                } finally {
                    // Runs even if the flush threw, so a single failing flush cannot
                    // leave outstanding Tasklets without an alarm.
                    synchronized (stateLock) {
                        alarmScheduled = false;
                        if (outstandingTasklets()) {
                            scheduleAlarm(this);
                        }
                    }
                }
            }
        };
    }

    /**
     * Queues {@code alarm} to run once after the policy's period and records that an
     * alarm is in flight. Must be called with {@code stateLock} held.
     */
    @GuardedBy("stateLock")
    private void scheduleAlarm(Runnable alarm) {
        this.timer.schedule(alarm, this.taskletAggregationRequest.getPolicy().getPeriodMilliseconds(), TimeUnit.MILLISECONDS);
        // Set only after schedule() returned, so a rejected submission does not leave the flag stuck.
        this.alarmScheduled = true;
    }

    /**
     * Decrements the pending count of the given Tasklet and forgets the Tasklet once the
     * count is no longer positive. Must be called with {@code stateLock} held.
     *
     * @throws NullPointerException if the Tasklet has no pending entry
     */
    @GuardedBy("stateLock")
    private void removePendingTaskletReferenceCount(int taskletId) {
        final Integer key = Integer.valueOf(taskletId);
        final int remaining = this.pendingTasklets.get(key).intValue() - 1;
        if (remaining <= 0) {
            this.pendingTasklets.remove(key);
        } else {
            this.pendingTasklets.put(key, Integer.valueOf(remaining));
        }
    }

    /**
     * Must be called with {@code stateLock} held.
     *
     * @return true if any Tasklet is still pending or any outcome is still buffered
     */
    @GuardedBy("stateLock")
    public boolean outstandingTasklets() {
        return !(this.pendingTasklets.isEmpty() && this.completedTasklets.isEmpty() && this.failedTasklets.isEmpty());
    }

    /**
     * Must be called with {@code stateLock} held.
     *
     * @return true if the policy defines a count and at least that many outcomes are buffered
     */
    @GuardedBy("stateLock")
    private boolean aggregateOnCount() {
        return this.taskletAggregationRequest.getPolicy().getCount().isPresent()
            && this.completedTasklets.size() + this.failedTasklets.size() >= this.taskletAggregationRequest.getPolicy().getCount().get().intValue();
    }
}
