package org.apache.reef.wake.examples.join;

import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.reef.wake.rx.Observer;
import org.apache.reef.wake.rx.StaticObservable;

/* loaded from: input_file:org/apache/reef/wake/examples/join/NonBlockingJoin.class */
public class NonBlockingJoin implements StaticObservable {
    private final Observer<TupleEvent> out;
    private final AtomicBoolean leftDone = new AtomicBoolean(false);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicBoolean sentCompleted = new AtomicBoolean(false);
    private final ConcurrentSkipListSet<TupleEvent> leftTable = new ConcurrentSkipListSet<>();
    private final ConcurrentSkipListSet<TupleEvent> rightTable = new ConcurrentSkipListSet<>();

    public NonBlockingJoin(Observer<TupleEvent> observer) {
        this.out = observer;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void drainRight() {
        if (this.leftDone.get()) {
            while (true) {
                TupleEvent pollFirst = this.rightTable.pollFirst();
                if (pollFirst == null) {
                    break;
                } else if (this.leftTable.contains(pollFirst)) {
                    this.out.onNext(pollFirst);
                }
            }
            if (this.completed.get()) {
                while (true) {
                    TupleEvent pollFirst2 = this.rightTable.pollFirst();
                    if (pollFirst2 == null) {
                        break;
                    } else if (this.leftTable.contains(pollFirst2)) {
                        this.out.onNext(pollFirst2);
                    }
                }
                if (this.sentCompleted.getAndSet(true)) {
                    return;
                }
                this.out.onCompleted();
            }
        }
    }

    public Observer<TupleEvent> wireLeft() {
        return new Observer<TupleEvent>() { // from class: org.apache.reef.wake.examples.join.NonBlockingJoin.1
            @Override // org.apache.reef.wake.rx.Observer
            public void onNext(TupleEvent tupleEvent) {
                NonBlockingJoin.this.leftTable.add(tupleEvent);
            }

            @Override // org.apache.reef.wake.rx.Observer
            public void onError(Exception exc) {
                NonBlockingJoin.this.leftTable.clear();
                NonBlockingJoin.this.rightTable.clear();
                NonBlockingJoin.this.out.onError(exc);
            }

            @Override // org.apache.reef.wake.rx.Observer
            public void onCompleted() {
                NonBlockingJoin.this.leftDone.set(true);
                NonBlockingJoin.this.drainRight();
            }
        };
    }

    public Observer<TupleEvent> wireRight() {
        return new Observer<TupleEvent>() { // from class: org.apache.reef.wake.examples.join.NonBlockingJoin.2
            @Override // org.apache.reef.wake.rx.Observer
            public void onNext(TupleEvent tupleEvent) {
                if (NonBlockingJoin.this.leftTable.contains(tupleEvent)) {
                    NonBlockingJoin.this.out.onNext(tupleEvent);
                } else if (!NonBlockingJoin.this.leftDone.get()) {
                    NonBlockingJoin.this.rightTable.add(tupleEvent);
                }
                NonBlockingJoin.this.drainRight();
            }

            @Override // org.apache.reef.wake.rx.Observer
            public void onError(Exception exc) {
                NonBlockingJoin.this.leftTable.clear();
                NonBlockingJoin.this.rightTable.clear();
                NonBlockingJoin.this.out.onError(exc);
            }

            @Override // org.apache.reef.wake.rx.Observer
            public void onCompleted() {
                NonBlockingJoin.this.completed.set(true);
                NonBlockingJoin.this.drainRight();
            }
        };
    }
}
