/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.stream;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class DirectStreamObserver<T>
implements StreamObserver<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DirectStreamObserver.class);
    private static final int DEFAULT_MAX_MESSAGES_BEFORE_CHECK = 100;
    private final Phaser phaser;
    private final CallStreamObserver<T> outboundObserver;
    private final int maxMessagesBeforeCheck;
    private int numberOfMessagesBeforeReadyCheck;

    public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserver) {
        this(phaser, outboundObserver, 100);
    }

    DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserver, int maxMessagesBeforeCheck) {
        this.phaser = phaser;
        this.outboundObserver = outboundObserver;
        this.maxMessagesBeforeCheck = maxMessagesBeforeCheck;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(T value) {
        ++this.numberOfMessagesBeforeReadyCheck;
        if (this.numberOfMessagesBeforeReadyCheck >= this.maxMessagesBeforeCheck) {
            this.numberOfMessagesBeforeReadyCheck = 0;
            int waitTime = 1;
            int totalTimeWaited = 0;
            int phase = this.phaser.getPhase();
            while (!this.outboundObserver.isReady()) {
                try {
                    this.phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
                }
                catch (TimeoutException e) {
                    totalTimeWaited += waitTime;
                    waitTime *= 2;
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            if (totalTimeWaited > 0) {
                if (phase == this.phaser.getPhase()) {
                    LOGGER.info("Output channel stalled for {}s, outbound thread {}. See: https://issues.apache.org/jira/browse/BEAM-4280 for the history for this issue.", (Object)totalTimeWaited, (Object)Thread.currentThread().getName());
                } else {
                    LOGGER.debug("Output channel stalled for {}s, outbound thread {}.", (Object)totalTimeWaited, (Object)Thread.currentThread().getName());
                }
            }
        }
        CallStreamObserver<T> callStreamObserver = this.outboundObserver;
        synchronized (callStreamObserver) {
            this.outboundObserver.onNext(value);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onError(Throwable t) {
        CallStreamObserver<T> callStreamObserver = this.outboundObserver;
        synchronized (callStreamObserver) {
            this.outboundObserver.onError(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onCompleted() {
        CallStreamObserver<T> callStreamObserver = this.outboundObserver;
        synchronized (callStreamObserver) {
            this.outboundObserver.onCompleted();
        }
    }
}

