package org.apache.reef.wake.remote.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.WakeParameters;
import org.apache.reef.wake.impl.DefaultThreadFactory;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;

/* loaded from: input_file:org/apache/reef/wake/remote/impl/RemoteReceiverStage.class */
/**
 * An {@link EStage} for {@link TransportEvent}s that hands every incoming event to an internal
 * {@link ThreadPoolStage} running on a dedicated fixed-size thread pool.
 *
 * <p>The stage owns its executor: {@link #close()} shuts the pool down and waits a bounded
 * amount of time for in-flight work to finish.
 */
public class RemoteReceiverStage implements EStage<TransportEvent> {
    private static final Logger LOG = Logger.getLogger(RemoteReceiverStage.class.getName());

    /** Handler executed by the thread pool stage; wraps the handler passed to the constructor. */
    private final EventHandler<TransportEvent> handler;

    /** Stage that dispatches events to {@link #handler} on {@link #executor}. */
    private final ThreadPoolStage<TransportEvent> stage;

    /** Fixed-size pool created and owned by this stage; released in {@link #close()}. */
    private final ExecutorService executor;

    /** Maximum time, in milliseconds, that {@link #close()} waits for the executor to terminate. */
    private final long shutdownTimeout = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT;

    /**
     * Creates the stage together with its backing thread pool.
     *
     * @param eventHandler  handler for remote events; it is wrapped in a
     *                      {@code RemoteReceiverEventHandler} before being given to the stage
     * @param eventHandler2 handler passed through to the {@link ThreadPoolStage}
     *                      (presumably its error handler - confirm against ThreadPoolStage)
     * @param i             number of threads in the fixed-size pool
     */
    public RemoteReceiverStage(EventHandler<RemoteEvent<byte[]>> eventHandler, EventHandler<Throwable> eventHandler2, int i) {
        this.handler = new RemoteReceiverEventHandler(eventHandler);
        this.executor = Executors.newFixedThreadPool(i, new DefaultThreadFactory(RemoteReceiverStage.class.getName()));
        this.stage = new ThreadPoolStage<>(this.handler, this.executor, eventHandler2);
    }

    /**
     * Logs the event at {@code FINEST} and forwards it to the internal thread pool stage.
     *
     * @param transportEvent the event to forward
     */
    @Override // org.apache.reef.wake.EventHandler
    public void onNext(TransportEvent transportEvent) {
        LOG.log(Level.FINEST, "{0}", transportEvent);
        this.stage.onNext(transportEvent);
    }

    /**
     * Shuts down the executor and waits up to {@link #shutdownTimeout} milliseconds for it to
     * terminate. If the wait times out, the executor is stopped with {@code shutdownNow()} and
     * the number of tasks that never started is logged.
     *
     * @throws RemoteRuntimeException if the calling thread is interrupted while waiting; the
     *                                executor is cancelled and the thread's interrupt status is
     *                                restored before the exception is thrown
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.log(Level.FINE, "close");
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
                LOG.log(Level.WARNING, "Executor did not terminate in {0} ms.", Long.valueOf(this.shutdownTimeout));
                LOG.log(Level.WARNING, "Executor dropped {0} tasks.", Integer.valueOf(this.executor.shutdownNow().size()));
            }
        } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Close interrupted", e);
            // We will not get another chance to wait, so cancel outstanding work now
            // rather than leaving worker threads running after close() has returned.
            this.executor.shutdownNow();
            // Catching InterruptedException clears the flag; restore it so code further
            // up the stack can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RemoteRuntimeException(e);
        }
    }
}
