package org.apache.reef.tests.messaging.task;

import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.task.Task;
import org.apache.reef.task.TaskMessage;
import org.apache.reef.task.TaskMessageSource;
import org.apache.reef.task.events.DriverMessage;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;

/**
 * Test task that exchanges a message with the driver.
 *
 * <p>{@link #call(byte[])} blocks until a driver message has been stored by
 * {@link DriverMessageHandler} and then observed by a subsequent
 * {@link #getMessage()} invocation. {@code getMessage()} hands the stored
 * message back and releases the waiting {@code call()}.
 *
 * <p>Thread-safety: {@code isRunning} and {@code message} are only read or
 * written while holding this instance's monitor.
 */
@Unit
/* loaded from: input_file:org/apache/reef/tests/messaging/task/TaskMessagingTask.class */
public final class TaskMessagingTask implements Task, TaskMessageSource {
    private static final Logger LOG = Logger.getLogger(TaskMessagingTask.class.getName());
    private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();

    /** Placeholder used for logging/returning while no driver message has been stored. */
    private static final TaskMessage INIT_MESSAGE = TaskMessage.from("", CODEC.encode("MESSAGE::INIT"));

    /** True until getMessage() has seen a stored driver message. Guarded by {@code this}. */
    private transient boolean isRunning = true;

    /** Most recent message built from a driver message; empty until one arrives. Guarded by {@code this}. */
    private transient Optional<TaskMessage> message = Optional.empty();

    /**
     * Handler that stores the bytes of an incoming driver message so that
     * {@link TaskMessagingTask#getMessage()} can return them.
     */
    /* loaded from: input_file:org/apache/reef/tests/messaging/task/TaskMessagingTask$DriverMessageHandler.class */
    public class DriverMessageHandler implements EventHandler<DriverMessage> {

        /**
         * Logs the decoded driver message and stores it, tagged with this
         * handler's {@code toString()}, as the task's current message.
         *
         * @param driverMessage the message received from the driver
         */
        @Override // org.apache.reef.wake.EventHandler
        public void onNext(DriverMessage driverMessage) {
            // NOTE(review): an empty driver message is not handled specially here;
            // confirm whether a guard on driverMessage.get().isPresent() is wanted.
            final byte[] payload = driverMessage.get().get();
            LOG.log(Level.INFO, "TaskMsg.send() invoked: {0}", CODEC.decode(payload));
            final Optional<TaskMessage> received = Optional.of(TaskMessage.from(this.toString(), payload));
            // Publish under the same monitor that call() and getMessage() use;
            // an unsynchronized write would race with those readers.
            synchronized (TaskMessagingTask.this) {
                TaskMessagingTask.this.message = received;
            }
        }
    }

    /** Creates the task and logs its creation. */
    @Inject
    public TaskMessagingTask() {
        LOG.info("TaskMsg created.");
    }

    /**
     * Blocks until {@link #getMessage()} has observed a stored driver message.
     *
     * <p>Interruptions while waiting are logged and do not end the wait; the
     * thread's interrupt status is restored before returning.
     *
     * @param bArr unused by this implementation
     * @return the bytes of the stored message, or of the initial placeholder
     *         message if none is stored
     */
    @Override // org.apache.reef.task.Task
    public synchronized byte[] call(byte[] bArr) {
        LOG.info("TaskMsg.call() invoked. Waiting for the message.");
        boolean interrupted = false;
        try {
            while (this.isRunning) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt instead of re-asserting it right away:
                    // with the flag set, the next wait() would throw immediately and spin.
                    interrupted = true;
                    LOG.log(Level.WARNING, "wait() interrupted.", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return this.message.orElse(INIT_MESSAGE).get();
    }

    /**
     * Returns the currently stored message, if any. Once a message is present,
     * this also clears the running flag and wakes the thread blocked in
     * {@link #call(byte[])}.
     *
     * @return the stored message, or an empty Optional if none has arrived yet
     */
    @Override // org.apache.reef.task.TaskMessageSource
    public synchronized Optional<TaskMessage> getMessage() {
        LOG.log(Level.INFO, "TaskMsg.getMessage() invoked: {0}", CODEC.decode(this.message.orElse(INIT_MESSAGE).get()));
        if (this.message.isPresent()) {
            this.isRunning = false;
            // notifyAll so that release does not depend on there being exactly one waiter.
            notifyAll();
        }
        return this.message;
    }
}
