package org.apache.reef.wake.rx.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.StageConfiguration;
import org.apache.reef.wake.exception.WakeRuntimeException;
import org.apache.reef.wake.impl.DefaultThreadFactory;
import org.apache.reef.wake.impl.StageManager;
import org.apache.reef.wake.rx.AbstractRxStage;
import org.apache.reef.wake.rx.Observer;

/* loaded from: input_file:org/apache/reef/wake/rx/impl/RxThreadPoolStage.class */
/**
 * Rx stage that hands every {@code onNext} event to a fixed-size thread pool and
 * delivers the terminal {@code onError}/{@code onCompleted} notification from a
 * separate single-threaded executor once the pool has terminated.
 *
 * @param <T> type of the events forwarded to the wrapped observer
 */
public final class RxThreadPoolStage<T> extends AbstractRxStage<T> {
    private static final Logger LOG = Logger.getLogger(RxThreadPoolStage.class.getName());

    /**
     * Upper bound, in seconds, for the wait that precedes a terminal notification
     * (100 * 365 days, i.e. effectively unbounded).
     */
    private static final long COMPLETION_WAIT_SECONDS = 3153600000L;

    private final Observer<T> observer;
    private final ExecutorService executor;
    /** Time, in milliseconds, that {@link #close()} waits for each executor to terminate. */
    private final long shutdownTimeout = 1000;
    private final ExecutorService completionExecutor;
    private final DefaultThreadFactory tf;

    /**
     * Creates a stage named after the observer's class.
     *
     * @param observer   observer that receives the events
     * @param numThreads number of threads of the event pool; must be positive
     * @throws WakeRuntimeException if {@code numThreads} is less than or equal to 0
     */
    @Inject
    public RxThreadPoolStage(@Parameter(StageConfiguration.StageObserver.class) final Observer<T> observer,
                             @Parameter(StageConfiguration.NumberOfThreads.class) final int numThreads) {
        this(observer.getClass().getName(), observer, numThreads);
    }

    /**
     * Creates a named stage, its two executors, and registers it with the {@link StageManager}.
     *
     * @param name       stage name, also passed to the thread factory
     * @param observer   observer that receives the events
     * @param numThreads number of threads of the event pool; must be positive
     * @throws WakeRuntimeException if {@code numThreads} is less than or equal to 0
     */
    @Inject
    public RxThreadPoolStage(@Parameter(StageConfiguration.StageName.class) final String name,
                             @Parameter(StageConfiguration.StageObserver.class) final Observer<T> observer,
                             @Parameter(StageConfiguration.NumberOfThreads.class) final int numThreads) {
        super(name);
        this.observer = observer;
        if (numThreads <= 0) {
            throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0");
        }
        this.tf = new DefaultThreadFactory(name);
        this.executor = Executors.newFixedThreadPool(numThreads, this.tf);
        this.completionExecutor = Executors.newSingleThreadExecutor(this.tf);
        StageManager.instance().register(this);
    }

    /**
     * Submits the event to the thread pool; the wrapped observer is invoked on a pool thread.
     *
     * @param value the event to deliver
     */
    @Override
    public void onNext(final T value) {
        beforeOnNext();
        executor.submit(new Runnable() {
            @Override
            public void run() {
                observer.onNext(value);
                afterOnNext();
            }
        });
    }

    /**
     * Shuts the event pool down and forwards the error to the observer once the pool has terminated.
     *
     * @param error the error to deliver
     */
    @Override
    public void onError(final Exception error) {
        submitCompletion(new Runnable() {
            @Override
            public void run() {
                observer.onError(error);
            }
        });
    }

    /**
     * Shuts the event pool down and signals completion to the observer once the pool has terminated.
     */
    @Override
    public void onCompleted() {
        submitCompletion(new Runnable() {
            @Override
            public void run() {
                observer.onCompleted();
            }
        });
    }

    /**
     * Stops the event pool from accepting new work and schedules {@code completion} on the
     * completion executor, where it runs after the pool has terminated.
     *
     * @param completion the terminal notification to deliver
     */
    private void submitCompletion(final Runnable completion) {
        executor.shutdown();
        completionExecutor.submit(new Runnable() {
            @Override
            public void run() {
                boolean interrupted = false;
                try {
                    try {
                        if (!executor.awaitTermination(COMPLETION_WAIT_SECONDS, TimeUnit.SECONDS)) {
                            final TimeoutException timeout =
                                new TimeoutException("Executor terminated due to unrequired timeout");
                            LOG.log(Level.SEVERE, timeout.getMessage());
                            observer.onError(timeout);
                        }
                    } catch (final InterruptedException e) {
                        // Remember the interrupt and restore it only after the callbacks have run,
                        // so that the observer is not invoked with the interrupt flag already set.
                        interrupted = true;
                        LOG.log(Level.WARNING, "Interrupted while waiting for the executor to terminate", e);
                        observer.onError(e);
                    }
                    // NOTE(review): the requested notification is delivered even when an error was
                    // reported above, so the observer may see two terminal calls - confirm intended.
                    completion.run();
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
    }

    /**
     * Shuts both executors down, waiting up to {@code shutdownTimeout} milliseconds for each;
     * an executor that does not terminate in time is stopped with {@code shutdownNow()}.
     * Only the first call has an effect.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    @Override
    public void close() throws Exception {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        executor.shutdown();
        completionExecutor.shutdown();
        if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
            LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
            LOG.log(Level.WARNING, "Executor dropped " + executor.shutdownNow().size() + " tasks.");
        }
        if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
            LOG.log(Level.WARNING, "Completion executor did not terminate in " + shutdownTimeout + "ms.");
            LOG.log(Level.WARNING,
                "Completion executor dropped " + completionExecutor.shutdownNow().size() + " tasks.");
        }
    }

    /**
     * Returns the number of tasks currently waiting in the event pool's queue.
     *
     * @return the queue length
     */
    public int getQueueLength() {
        // The cast relies on Executors.newFixedThreadPool handing back a ThreadPoolExecutor.
        return ((ThreadPoolExecutor) executor).getQueue().size();
    }
}
