/*
 * Decompiled with CFR 0.152.
 */
package org.apache.reef.tests.messaging.task;

import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.driver.task.TaskMessage;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.formats.Impl;
import org.apache.reef.tang.formats.Param;
import org.apache.reef.tests.library.exceptions.DriverSideFailure;
import org.apache.reef.tests.messaging.task.TaskMessagingTask;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;

@Unit
public final class TaskMessagingDriver {
    private static final Logger LOG = Logger.getLogger(TaskMessagingDriver.class.getName());
    private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec();
    private static final byte[] HELLO_STR = CODEC.encode((Object)"MESSAGE::HELLO");
    private static final int DELAY = 1000;
    private final transient JobMessageObserver client;
    private final transient Clock clock;

    @Inject
    public TaskMessagingDriver(JobMessageObserver client, Clock clock) {
        this.client = client;
        this.clock = clock;
    }

    public final class TaskMessageHandler
    implements EventHandler<TaskMessage> {
        public void onNext(TaskMessage msg) {
            LOG.log(Level.FINE, "TaskMessage: from {0}: {1}", new Object[]{msg.getId(), CODEC.decode(msg.get())});
            if (!Arrays.equals(msg.get(), HELLO_STR)) {
                DriverSideFailure ex = new DriverSideFailure("Unexpected message: " + (String)CODEC.decode(msg.get()));
                LOG.log(Level.SEVERE, "Bad message from " + msg.getId(), ex);
                throw ex;
            }
        }
    }

    public final class TaskRunningHandler
    implements EventHandler<RunningTask> {
        public void onNext(final RunningTask task) {
            LOG.log(Level.FINE, "TaskRuntime: {0}", task.getId());
            TaskMessagingDriver.this.clock.scheduleAlarm(1000, (EventHandler)new EventHandler<Alarm>(){

                public void onNext(Alarm alarm) {
                    task.send(HELLO_STR);
                }
            });
        }
    }

    public final class EvaluatorAllocatedHandler
    implements EventHandler<AllocatedEvaluator> {
        public void onNext(AllocatedEvaluator eval) {
            String taskId = "Task_" + eval.getId();
            LOG.log(Level.INFO, "Submit task: {0}", taskId);
            Configuration taskConfig = TaskConfiguration.CONF.set((Param)TaskConfiguration.IDENTIFIER, taskId).set((Impl)TaskConfiguration.TASK, TaskMessagingTask.class).set((Impl)TaskConfiguration.ON_MESSAGE, TaskMessagingTask.DriverMessageHandler.class).set((Impl)TaskConfiguration.ON_SEND_MESSAGE, TaskMessagingTask.class).build();
            eval.submitTask(taskConfig);
        }
    }
}

