package org.apache.nemo.runtime.common.message;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.SyncStage;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;

/* loaded from: input_file:org/apache/nemo/runtime/common/message/ClientRPC.class */
public final class ClientRPC {
    private static final DriverToClientMessageEncoder ENCODER = new DriverToClientMessageEncoder();
    private static final ClientRPCLinkListener LINK_LISTENER = new ClientRPCLinkListener();
    private static final int RETRY_COUNT = 10;
    private static final int RETRY_TIMEOUT = 100;
    private final Transport transport;
    private final Link<ControlMessage.DriverToClientMessage> link;
    private final Map<ControlMessage.ClientToDriverMessageType, EventHandler<ControlMessage.ClientToDriverMessage>> handlers = new ConcurrentHashMap();
    private volatile boolean isClosed = false;

    /* loaded from: input_file:org/apache/nemo/runtime/common/message/ClientRPC$ClientRPCLinkListener.class */
    private static final class ClientRPCLinkListener implements LinkListener<ControlMessage.DriverToClientMessage> {
        private ClientRPCLinkListener() {
        }

        public void onSuccess(ControlMessage.DriverToClientMessage driverToClientMessage) {
        }

        public void onException(Throwable th, SocketAddress socketAddress, ControlMessage.DriverToClientMessage driverToClientMessage) {
            throw new RuntimeException(th);
        }
    }

    /* loaded from: input_file:org/apache/nemo/runtime/common/message/ClientRPC$DriverToClientMessageEncoder.class */
    private static final class DriverToClientMessageEncoder implements Encoder<ControlMessage.DriverToClientMessage> {
        private DriverToClientMessageEncoder() {
        }

        public byte[] encode(ControlMessage.DriverToClientMessage driverToClientMessage) {
            return driverToClientMessage.toByteArray();
        }
    }

    /* loaded from: input_file:org/apache/nemo/runtime/common/message/ClientRPC$RPCEventHandler.class */
    private final class RPCEventHandler implements EventHandler<TransportEvent> {
        private RPCEventHandler() {
        }

        public void onNext(TransportEvent transportEvent) {
            try {
                ClientRPC.this.handleMessage(ControlMessage.ClientToDriverMessage.parseFrom(transportEvent.getData()));
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException((Throwable) e);
            }
        }
    }

    @Inject
    private ClientRPC(TransportFactory transportFactory, LocalAddressProvider localAddressProvider, @Parameter(JobConf.ClientSideRPCServerHost.class) String str, @Parameter(JobConf.ClientSideRPCServerPort.class) int i) {
        this.transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(), 0, new SyncStage(new RPCEventHandler()), (EStage) null, 10, RETRY_TIMEOUT);
        try {
            this.link = this.transport.open(new InetSocketAddress(str, i), ENCODER, LINK_LISTENER);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to setup an RPC connection to the Client. A failure at the client-side is suspected.");
        }
    }

    public ClientRPC registerHandler(ControlMessage.ClientToDriverMessageType clientToDriverMessageType, EventHandler<ControlMessage.ClientToDriverMessage> eventHandler) {
        if (this.handlers.putIfAbsent(clientToDriverMessageType, eventHandler) != null) {
            throw new RuntimeException(String.format("A handler for %s already registered", clientToDriverMessageType));
        }
        return this;
    }

    public void shutdown() {
        ensureRunning();
        try {
            try {
                this.transport.close();
                this.isClosed = true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.isClosed = true;
            throw th;
        }
    }

    public void send(ControlMessage.DriverToClientMessage driverToClientMessage) {
        ensureRunning();
        this.link.write(driverToClientMessage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleMessage(ControlMessage.ClientToDriverMessage clientToDriverMessage) {
        ControlMessage.ClientToDriverMessageType type = clientToDriverMessage.getType();
        EventHandler<ControlMessage.ClientToDriverMessage> eventHandler = this.handlers.get(type);
        if (eventHandler == null) {
            throw new RuntimeException(String.format("Handler for message type %s not registered", type));
        }
        eventHandler.onNext(clientToDriverMessage);
    }

    private void ensureRunning() {
        if (this.isClosed) {
            throw new RuntimeException("The ClientRPC is already closed");
        }
    }
}
