package org.apache.nemo.runtime.common.message.grpc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import javax.inject.Inject;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageListener;
import org.apache.nemo.runtime.common.message.MessageParameters;
import org.apache.nemo.runtime.common.message.MessageSender;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/common/message/grpc/GrpcMessageEnvironment.class */
public final class GrpcMessageEnvironment implements MessageEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcMessageEnvironment.class);
    private final NameResolver nameResolver;
    private final IdentifierFactory idFactory;
    private final GrpcMessageServer grpcServer;
    private final String localSenderId;

    @Inject
    private GrpcMessageEnvironment(LocalAddressProvider localAddressProvider, NameResolver nameResolver, IdentifierFactory identifierFactory, @Parameter(MessageParameters.SenderId.class) String str) {
        this.nameResolver = nameResolver;
        this.idFactory = identifierFactory;
        this.grpcServer = new GrpcMessageServer(localAddressProvider, nameResolver, identifierFactory, str);
        this.localSenderId = str;
        try {
            this.grpcServer.start();
        } catch (Exception e) {
            LOG.warn("Failed to start grpc server", e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.nemo.runtime.common.message.MessageEnvironment
    public <T> void setupListener(String str, MessageListener<T> messageListener) {
        this.grpcServer.setupListener(str, messageListener);
    }

    @Override // org.apache.nemo.runtime.common.message.MessageEnvironment
    public void removeListener(String str) {
        this.grpcServer.removeListener(str);
    }

    @Override // org.apache.nemo.runtime.common.message.MessageEnvironment
    public <T> Future<MessageSender<T>> asyncConnect(String str, String str2) {
        CompletableFuture completableFuture = new CompletableFuture();
        GrpcMessageClient grpcMessageClient = new GrpcMessageClient(this.nameResolver, this.idFactory, str);
        try {
            grpcMessageClient.connect();
            completableFuture.complete(new GrpcMessageSender(str, str2, grpcMessageClient));
        } catch (Exception e) {
            LOG.warn("Failed to connect a receiver id=" + str + ", listenerId=" + str2, e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    @Override // org.apache.nemo.runtime.common.message.MessageEnvironment
    public void close() throws Exception {
        this.grpcServer.close();
    }

    @Override // org.apache.nemo.runtime.common.message.MessageEnvironment
    public String getId() {
        return this.localSenderId;
    }
}
