package com.github.zella.rxprocess2;

import com.github.zella.rxprocess2.errors.ProcessException;
import com.github.zella.rxprocess2.errors.ProcessTimeoutException;
import com.zaxxer.nuprocess.NuProcess;
import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/github/zella/rxprocess2/AsStdOut.class */
class AsStdOut {
    private final RxNuProcessBuilder pb;

    /* loaded from: input_file:com/github/zella/rxprocess2/AsStdOut$ColdStdoutHandler.class */
    static class ColdStdoutHandler extends BaseRxHandler {
        final BlockingQueue<Notification<byte[]>> queue;

        ColdStdoutHandler(BlockingQueue<Notification<byte[]>> blockingQueue) {
            this.queue = blockingQueue;
        }

        @Override // com.github.zella.rxprocess2.BaseRxHandler
        void onNext(byte[] bArr) {
            this.queue.add(Notification.createOnNext(bArr));
        }

        @Override // com.github.zella.rxprocess2.BaseRxHandler
        void onError(int i) {
            this.queue.add(Notification.createOnError(error(i, getErr())));
        }

        @Override // com.github.zella.rxprocess2.BaseRxHandler
        void onSuccesfullComplete() {
            this.queue.add(Notification.createOnComplete());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AsStdOut(RxNuProcessBuilder rxNuProcessBuilder) {
        this.pb = rxNuProcessBuilder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Observable<byte[]> create(long j, TimeUnit timeUnit) {
        return Observable.create(observableEmitter -> {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            this.pb.builder.setProcessListener(new ColdStdoutHandler(linkedBlockingQueue));
            NuProcess start = this.pb.builder.start();
            observableEmitter.setCancellable(() -> {
                try {
                    start.destroy(false);
                    start.waitFor(BaseRxHandler.GRACEFULL_STOP_SECONDS, TimeUnit.SECONDS);
                    start.destroy(true);
                    Single.timer(1L, TimeUnit.SECONDS).map(l -> {
                        return Boolean.valueOf(linkedBlockingQueue.offer(Notification.createOnError(new ProcessException(-1, "Nu process callback missing, clean by rx"))));
                    }).subscribe();
                } catch (Throwable th) {
                    Single.timer(1L, TimeUnit.SECONDS).map(l2 -> {
                        return Boolean.valueOf(linkedBlockingQueue.offer(Notification.createOnError(new ProcessException(-1, "Nu process callback missing, clean by rx"))));
                    }).subscribe();
                    throw th;
                }
            });
            while (true) {
                Notification notification = (Notification) linkedBlockingQueue.take();
                if (notification.isOnNext()) {
                    observableEmitter.onNext(notification.getValue());
                } else if (notification.isOnComplete()) {
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onComplete();
                    return;
                } else if (notification.isOnError()) {
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onError(notification.getError());
                    return;
                }
            }
        }).compose(observable -> {
            return j == -1 ? observable : observable.takeUntil(Observable.timer(j, timeUnit).map(l -> {
                throw new ProcessTimeoutException(Integer.MIN_VALUE);
            }));
        });
    }
}
