package org.apache.reef.examples.suspend;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.client.RunningJob;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;

/* loaded from: input_file:org/apache/reef/examples/suspend/SuspendClientControl.class */
public class SuspendClientControl implements AutoCloseable {
    private static final Logger LOG = Logger.getLogger(Control.class.getName());
    private static final ObjectSerializableCodec<byte[]> CODEC = new ObjectSerializableCodec<>();
    private final transient Transport transport;
    private transient RunningJob runningJob;

    /* loaded from: input_file:org/apache/reef/examples/suspend/SuspendClientControl$ControlMessageHandler.class */
    private class ControlMessageHandler implements EventHandler<TransportEvent> {
        private ControlMessageHandler() {
        }

        public synchronized void onNext(TransportEvent transportEvent) {
            SuspendClientControl.LOG.log(Level.INFO, "Control message: {0} destination: {1}", new Object[]{SuspendClientControl.CODEC.decode(transportEvent.getData()), SuspendClientControl.this.runningJob});
            if (SuspendClientControl.this.runningJob != null) {
                SuspendClientControl.this.runningJob.send(transportEvent.getData());
            }
        }
    }

    @NamedParameter(doc = "Port for suspend/resume control commands", short_name = "port", default_value = "7008")
    /* loaded from: input_file:org/apache/reef/examples/suspend/SuspendClientControl$Port.class */
    public static final class Port implements Name<Integer> {
    }

    @Inject
    public SuspendClientControl(@Parameter(Port.class) int i) throws IOException {
        LOG.log(Level.INFO, "Listen to control port {0}", Integer.valueOf(i));
        ThreadPoolStage threadPoolStage = new ThreadPoolStage("suspend-control-server", new ControlMessageHandler(), 1, new EventHandler<Throwable>() { // from class: org.apache.reef.examples.suspend.SuspendClientControl.1
            public void onNext(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        this.transport = new NettyMessagingTransport("localhost", i, threadPoolStage, threadPoolStage, 1, 10000);
    }

    public synchronized void setRunningJob(RunningJob runningJob) {
        this.runningJob = runningJob;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.transport.close();
    }
}
