package org.apache.reef.wake.remote.impl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.Transport;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.class */
/**
 * Event handler that encodes {@link RemoteEvent}s and writes them to a {@link Link}.
 *
 * <p>Until a link has been installed via {@link #setLink(Link)}, incoming events are
 * buffered in an internal queue. The first event that arrives without a link either
 * picks up a link the transport already holds for the remote address, or submits a
 * connect task to the executor.
 *
 * @param <T> payload type carried by the remote events
 */
public class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
    private static final Logger LOG = Logger.getLogger(RemoteSenderEventHandler.class.getName());
    private final RemoteEventEncoder<T> encoder;
    private final Transport transport;
    private final ExecutorService executor;
    // Written only through compareAndSet(null, link), so once non-null it is never replaced.
    private final AtomicReference<Link<byte[]>> linkRef = new AtomicReference<>();
    // Events received before a link is available; drained by consumeQueue().
    private final BlockingQueue<RemoteEvent<T>> queue = new LinkedBlockingQueue<>();

    /**
     * Creates a sender that wraps the given payload encoder in a {@link RemoteEventEncoder}.
     *
     * @param encoder encoder for the event payload
     * @param transport transport used to look up links by remote address
     * @param executorService executor the connect task is submitted to when no link exists yet
     */
    public RemoteSenderEventHandler(Encoder<T> encoder, Transport transport, ExecutorService executorService) {
        this.encoder = new RemoteEventEncoder<>(encoder);
        this.transport = transport;
        this.executor = executorService;
    }

    /**
     * Installs the link if none has been set yet, then drains the queued events.
     *
     * <p>If a link is already installed, the argument is ignored and the queue is
     * drained to the existing link.
     *
     * @param link the link to write encoded events to
     */
    public void setLink(Link<byte[]> link) {
        LOG.log(Level.FINEST, "thread {0} link {1}", new Object[]{Thread.currentThread(), link});
        this.linkRef.compareAndSet(null, link);
        consumeQueue();
    }

    /**
     * Writes every currently queued event to the installed link, in queue order.
     *
     * <p>The queue is polled with a zero timeout, so this returns as soon as the
     * queue is empty.
     *
     * @throws RemoteRuntimeException if the calling thread is interrupted while polling;
     *     the thread's interrupt flag is restored before the exception is thrown
     */
    void consumeQueue() {
        try {
            RemoteEvent<T> event;
            while ((event = this.queue.poll(0L, TimeUnit.MICROSECONDS)) != null) {
                LOG.log(Level.FINEST, "{0}", event);
                this.linkRef.get().write(this.encoder.encode(event));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, "Interrupted while draining queued remote events", e);
            throw new RemoteRuntimeException(e);
        }
    }

    /**
     * Sends the event over the link, or queues it while no link is installed.
     *
     * <p>Without a link, the event is queued first. If the transport already has a
     * link for the event's remote address, that link is installed and the queue is
     * drained. Otherwise a connect task is submitted to the executor.
     *
     * @param remoteEvent the event to send
     * @throws RemoteRuntimeException rethrown after being logged
     */
    @Override // org.apache.reef.wake.EventHandler
    public void onNext(RemoteEvent<T> remoteEvent) {
        try {
            final Link<byte[]> currentLink = this.linkRef.get();
            if (currentLink == null) {
                // Queue before looking up the link so the event is included in the next drain.
                this.queue.add(remoteEvent);
                final Link<byte[]> link = this.transport.get(remoteEvent.remoteAddress());
                if (link != null) {
                    LOG.log(Level.FINEST, "transport get link: {0}", link);
                    setLink(link);
                } else {
                    // NOTE(review): ConnectEventHandler presumably calls setLink once the
                    // connection completes - confirm against its implementation.
                    this.executor.submit(new ConnectFutureTask(new ConnectCallable(this.transport, remoteEvent.localAddress(), remoteEvent.remoteAddress()), new ConnectEventHandler(this)));
                }
            } else {
                if (LOG.isLoggable(Level.FINEST)) {
                    LOG.log(Level.FINEST, "Send an event from " + currentLink.getLocalAddress() + " to " + currentLink.getRemoteAddress() + " value " + remoteEvent);
                }
                currentLink.write(this.encoder.encode(remoteEvent));
            }
        } catch (RemoteRuntimeException e) {
            LOG.log(Level.SEVERE, "Remote Exception", e);
            throw e;
        }
    }
}
