package com.linkedin.r2.message.stream.entitystream;

import com.linkedin.data.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/r2-core-11.0.0.jar:com/linkedin/r2/message/stream/entitystream/EntityStreams.class */
public final class EntityStreams {
    /** Logger shared by the stream implementation and its read/write handles. */
    private static final Logger LOG = LoggerFactory.getLogger(EntityStreams.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/r2-core-11.0.0.jar:com/linkedin/r2/message/stream/entitystream/EntityStreams$EntityStreamImpl.class */
    public static class EntityStreamImpl implements EntityStream {
        /** Source of the stream's data; receives onInit / onWritePossible / onAbort callbacks. */
        private final Writer _writer;
        /** The single consumer of the stream; assigned in {@code setReader}. */
        private Reader _reader;
        /** Monitor guarding {@code _state}, {@code _remaining}, {@code _notifyWritePossible} and {@code _observers}. */
        private final Object _lock = new Object();
        /** Observers registered before initialization; replaced by an unmodifiable view in {@code setReader}. */
        private List<Observer> _observers = new ArrayList<Observer>();
        /** Number of chunks the reader has requested and the writer has not yet written. */
        private int _remaining = 0;
        /**
         * {@code true} when the next reader request should trigger {@code Writer.onWritePossible()}.
         * Cleared when that notification is dispatched and set again when the writer observes
         * zero remaining capacity.
         */
        private boolean _notifyWritePossible = true;
        /** Current lifecycle state; starts as {@link State#UNINITIALIZED}. */
        private State _state = State.UNINITIALIZED;

        /* loaded from: input_file:WEB-INF/lib/r2-core-11.0.0.jar:com/linkedin/r2/message/stream/entitystream/EntityStreams$EntityStreamImpl$ReadHandleImpl.class */
        /**
         * Handle given to the {@link Reader} for requesting more data chunks or cancelling the stream.
         *
         * <p>Only state transitions are performed while holding {@code _lock}; every callback into
         * the writer, the observers or the reader is made after the lock has been released, so that
         * user code never runs while the stream's monitor is held.
         */
        private class ReadHandleImpl implements ReadHandle {
            private ReadHandleImpl() {
            }

            /**
             * Adds {@code i} to the number of chunks the writer is allowed to write and, if the
             * writer is waiting for capacity, notifies it through {@code onWritePossible()}.
             * Requests made while the stream is not {@code ACTIVE} are ignored.
             *
             * @param i number of additional chunks requested; must be positive
             * @throws IllegalArgumentException if {@code i} is zero or negative
             */
            @Override // com.linkedin.r2.message.stream.entitystream.ReadHandle
            public void request(int i) {
                if (i <= 0) {
                    throw new IllegalArgumentException("cannot request non-positive number of data chunks: " + i);
                }
                boolean needNotify = false;
                synchronized (_lock) {
                    if (_state != State.ACTIVE) {
                        return;
                    }
                    _remaining += i;
                    // A negative value after the addition means the int counter wrapped around.
                    if (_remaining < 0) {
                        LOG.warn("chunkNum overflow, setting to Integer.MAX_VALUE");
                        _remaining = Integer.MAX_VALUE;
                    }
                    if (_notifyWritePossible) {
                        needNotify = true;
                        _notifyWritePossible = false;
                    }
                }
                if (!needNotify) {
                    return;
                }
                // Invoke the writer outside the lock.
                try {
                    _writer.onWritePossible();
                } catch (Throwable th) {
                    LOG.warn("Writer throws at onWritePossible", th);
                    // Only the state flip needs the lock; the abort notifications are callbacks too.
                    synchronized (_lock) {
                        _state = State.ABORTED;
                    }
                    doCancel(th, true);
                }
            }

            /**
             * Cancels the stream on behalf of the reader.
             *
             * <p>If the stream is {@code ACTIVE} and the writer is waiting for a write-possible
             * notification, the stream is aborted immediately: the writer and the observers are
             * notified with an {@code AbortedException}; the reader is not notified. Otherwise an
             * {@code ACTIVE} stream is only marked {@code ABORT_REQUESTED}. Calls in any other
             * state have no effect.
             */
            @Override // com.linkedin.r2.message.stream.entitystream.ReadHandle
            public void cancel() {
                boolean cancelNow;
                synchronized (_lock) {
                    cancelNow = _notifyWritePossible && _state == State.ACTIVE;
                    if (cancelNow) {
                        _state = State.ABORTED;
                    } else if (_state == State.ACTIVE) {
                        _state = State.ABORT_REQUESTED;
                    }
                }
                if (cancelNow) {
                    doCancel(getAbortedException(), false);
                }
            }
        }

        /* loaded from: input_file:WEB-INF/lib/r2-core-11.0.0.jar:com/linkedin/r2/message/stream/entitystream/EntityStreams$EntityStreamImpl$WriteHandleImpl.class */
        private class WriteHandleImpl implements WriteHandle {
            private WriteHandleImpl() {
            }

            @Override // com.linkedin.r2.message.stream.entitystream.WriteHandle
            public void write(ByteString byteString) {
                boolean z = false;
                synchronized (EntityStreamImpl.this._lock) {
                    if (EntityStreamImpl.this._state == State.FINISHED) {
                        throw new IllegalStateException("Attempting to write after done or error of WriteHandle is invoked");
                    }
                    if (EntityStreamImpl.this._state == State.ABORTED) {
                        return;
                    }
                    EntityStreamImpl.access$710(EntityStreamImpl.this);
                    if (EntityStreamImpl.this._remaining < 0) {
                        throw new IllegalStateException("Attempt to write when remaining is 0");
                    }
                    if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                        z = true;
                        EntityStreamImpl.this._state = State.ABORTED;
                    }
                    if (z) {
                        EntityStreamImpl.this.doCancel(EntityStreamImpl.access$800(), false);
                        return;
                    }
                    Iterator it = EntityStreamImpl.this._observers.iterator();
                    while (it.hasNext()) {
                        try {
                            ((Observer) it.next()).onDataAvailable(byteString);
                        } catch (Throwable th) {
                            EntityStreams.LOG.warn("Observer throws exception at onDataAvailable", th);
                        }
                    }
                    try {
                        EntityStreamImpl.this._reader.onDataAvailable(byteString);
                    } catch (Throwable th2) {
                        EntityStreams.LOG.warn("Reader throws exception at onDataAvailable", th2);
                        synchronized (EntityStreamImpl.this._lock) {
                            EntityStreamImpl.this._state = State.ABORTED;
                            EntityStreamImpl.this.doCancel(th2, true);
                        }
                    }
                }
            }

            @Override // com.linkedin.r2.message.stream.entitystream.WriteHandle
            public void done() {
                boolean z = false;
                synchronized (EntityStreamImpl.this._lock) {
                    if (EntityStreamImpl.this._state == State.ACTIVE || EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                        if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                            z = true;
                            EntityStreamImpl.this._state = State.ABORTED;
                        } else {
                            EntityStreamImpl.this._state = State.FINISHED;
                        }
                        if (z) {
                            EntityStreamImpl.this.doCancel(EntityStreamImpl.access$800(), false);
                            return;
                        }
                        Iterator it = EntityStreamImpl.this._observers.iterator();
                        while (it.hasNext()) {
                            try {
                                ((Observer) it.next()).onDone();
                            } catch (Throwable th) {
                                EntityStreams.LOG.warn("Observer throws exception at onDone, ignored.", th);
                            }
                        }
                        try {
                            EntityStreamImpl.this._reader.onDone();
                        } catch (Throwable th2) {
                            EntityStreams.LOG.warn("Reader throws exception at onDone; notifying writer", th2);
                            EntityStreamImpl.this.safeAbortWriter(th2);
                        }
                    }
                }
            }

            @Override // com.linkedin.r2.message.stream.entitystream.WriteHandle
            public void error(Throwable th) {
                boolean z = false;
                synchronized (EntityStreamImpl.this._lock) {
                    if (EntityStreamImpl.this._state == State.ACTIVE || EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                        if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                            z = true;
                            EntityStreamImpl.this._state = State.ABORTED;
                        } else {
                            EntityStreamImpl.this._state = State.FINISHED;
                        }
                        if (z) {
                            EntityStreamImpl.this.doCancel(EntityStreamImpl.access$800(), false);
                            return;
                        }
                        EntityStreamImpl.this.safeNotifyErrorToObservers(th);
                        try {
                            EntityStreamImpl.this._reader.onError(th);
                        } catch (Throwable th2) {
                            EntityStreams.LOG.warn("Reader throws exception at onError; notifying writer", th2);
                            EntityStreamImpl.this.safeAbortWriter(th2);
                        }
                    }
                }
            }

            @Override // com.linkedin.r2.message.stream.entitystream.WriteHandle
            public int remaining() {
                int i;
                boolean z = false;
                synchronized (EntityStreamImpl.this._lock) {
                    if (EntityStreamImpl.this._state != State.ACTIVE && EntityStreamImpl.this._state != State.ABORT_REQUESTED) {
                        return 0;
                    }
                    if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                        z = true;
                        EntityStreamImpl.this._state = State.ABORTED;
                        i = 0;
                    } else {
                        if (EntityStreamImpl.this._remaining == 0) {
                            EntityStreamImpl.this._notifyWritePossible = true;
                        }
                        i = EntityStreamImpl.this._remaining;
                    }
                    if (z) {
                        EntityStreamImpl.this.doCancel(EntityStreamImpl.access$800(), false);
                    }
                    return i;
                }
            }
        }

        /**
         * Creates a stream backed by the given writer. The stream stays in its initial
         * (uninitialized) state until {@code setReader} is called.
         *
         * @param writer the producer of this stream's data
         */
        EntityStreamImpl(Writer writer) {
            this._writer = writer;
        }

        /**
         * Registers an observer that is notified of the stream's data, completion and errors.
         * Observers can only be added before a reader has been set.
         *
         * @param observer the observer to register
         * @throws IllegalStateException if the stream has already been initialized
         */
        @Override // com.linkedin.r2.message.stream.entitystream.EntityStream
        public void addObserver(Observer observer) {
            synchronized (_lock) {
                // Registration is only allowed while the stream is still uninitialized.
                checkInit();
                _observers.add(observer);
            }
        }

        /**
         * Attaches the reader and starts the stream: the stream becomes {@code ACTIVE}, the
         * observer list is frozen, then the writer and the reader are initialized in that order.
         *
         * <p>If the writer throws from {@code onInit}, the stream is aborted, the writer is told
         * through {@code onAbort}, and the reader is handed a read handle whose first
         * {@code request}/{@code cancel} call reports that failure to observers and reader.
         * If the reader throws a {@link RuntimeException} from {@code onInit}, the stream is
         * aborted and that exception is propagated as an error.
         *
         * <p>State changes are made under {@code _lock}; callbacks into writer, observers and
         * reader are made after the lock has been released.
         *
         * @param reader the consumer of this stream
         * @throws IllegalStateException if the stream has already been initialized
         */
        @Override // com.linkedin.r2.message.stream.entitystream.EntityStream
        public void setReader(Reader reader) {
            synchronized (this._lock) {
                checkInit();
                this._state = State.ACTIVE;
                this._reader = reader;
                this._observers = Collections.unmodifiableList(this._observers);
            }

            Throwable writerInitEx = null;
            try {
                this._writer.onInit(new WriteHandleImpl());
            } catch (Throwable ex) {
                EntityStreams.LOG.warn("Writer throws exception at onInit", ex);
                synchronized (this._lock) {
                    this._state = State.ABORTED;
                }
                // Notify the writer outside the lock.
                safeAbortWriter(ex);
                writerInitEx = ex;
            }

            // Guards against reporting the writer-init failure more than once.
            final AtomicBoolean notified = new AtomicBoolean(false);
            final ReadHandle readHandle;
            if (writerInitEx == null) {
                readHandle = new ReadHandleImpl();
            } else {
                final Throwable cause = writerInitEx;
                readHandle = new ReadHandle() { // from class: com.linkedin.r2.message.stream.entitystream.EntityStreams.EntityStreamImpl.1
                    @Override // com.linkedin.r2.message.stream.entitystream.ReadHandle
                    public void request(int i) {
                        notifyError();
                    }

                    @Override // com.linkedin.r2.message.stream.entitystream.ReadHandle
                    public void cancel() {
                        notifyError();
                    }

                    void notifyError() {
                        if (notified.compareAndSet(false, true)) {
                            EntityStreamImpl.this.safeNotifyErrorToObservers(cause);
                            EntityStreamImpl.this.safeNotifyErrorToReader(cause);
                        }
                    }
                };
            }

            try {
                this._reader.onInit(readHandle);
            } catch (RuntimeException e) {
                EntityStreams.LOG.warn("Reader throws exception at onInit", e);
                synchronized (this._lock) {
                    // Nothing to do if the stream already reached a terminal state on its own.
                    if (this._state != State.ACTIVE && this._state != State.ABORT_REQUESTED && writerInitEx == null) {
                        return;
                    }
                    this._state = State.ABORTED;
                }
                if (writerInitEx == null) {
                    doCancel(e, true);
                } else if (notified.compareAndSet(false, true)) {
                    // The writer was already aborted above; only observers and reader remain.
                    safeNotifyErrorToObservers(e);
                    safeNotifyErrorToReader(e);
                }
            }
        }

        /**
         * Verifies that the stream has not been initialized yet.
         *
         * @throws IllegalStateException if the state is anything other than {@code UNINITIALIZED}
         */
        private void checkInit() {
            if (_state == State.UNINITIALIZED) {
                return;
            }
            throw new IllegalStateException("EntityStream had already been initialized and can no longer accept Observers or Reader");
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Tells the writer that the stream was aborted. Anything the writer throws from
         * {@code onAbort} is logged and not propagated.
         *
         * @param th the reason for the abort
         */
        public void safeAbortWriter(Throwable th) {
            try {
                _writer.onAbort(th);
            } catch (Throwable writerFailure) {
                LOG.warn("Writer throws exception at onAbort", writerFailure);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Reports an error to every registered observer, in registration order. An exception
         * thrown by one observer is logged and does not prevent the remaining observers from
         * being notified.
         *
         * @param th the error to report
         */
        public void safeNotifyErrorToObservers(Throwable th) {
            for (Observer observer : _observers) {
                try {
                    observer.onError(th);
                } catch (Throwable observerFailure) {
                    LOG.warn("Observer throws exception at onError, ignored.", observerFailure);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Reports an error to the reader. Anything the reader throws from {@code onError} is
         * logged at error level and not propagated.
         *
         * @param th the error to report
         */
        public void safeNotifyErrorToReader(Throwable th) {
            try {
                _reader.onError(th);
            } catch (Throwable readerFailure) {
                LOG.error("Reader throws exception at onError", readerFailure);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Carries out an abort: notifies the writer first, then the observers, and finally the
         * reader when requested.
         *
         * @param th the reason for the abort
         * @param z  whether the reader should also be notified of {@code th}
         */
        public void doCancel(Throwable th, boolean z) {
            safeAbortWriter(th);
            safeNotifyErrorToObservers(th);
            if (!z) {
                return;
            }
            safeNotifyErrorToReader(th);
        }

        /**
         * Creates the exception used to signal that the reader cancelled the stream.
         *
         * @return a new {@code AbortedException} with the message "Reader aborted"
         */
        private static Exception getAbortedException() {
            return new AbortedException("Reader aborted");
        }

        /**
         * Compiler-generated accessor: post-decrements {@code _remaining} of the given stream.
         *
         * @param entityStreamImpl the stream whose counter is decremented
         * @return the value of {@code _remaining} before the decrement
         */
        static /* synthetic */ int access$710(EntityStreamImpl entityStreamImpl) {
            return entityStreamImpl._remaining--;
        }

        /**
         * Compiler-generated accessor that forwards to {@code getAbortedException()}.
         *
         * @return a new exception signalling that the reader aborted the stream
         */
        static /* synthetic */ Exception access$800() {
            return getAbortedException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/r2-core-11.0.0.jar:com/linkedin/r2/message/stream/entitystream/EntityStreams$State.class */
    /**
     * Lifecycle states of an entity stream.
     */
    public enum State {
        /** Initial state; observers and a reader may still be attached. */
        UNINITIALIZED,
        /** A reader has been set and data may flow. */
        ACTIVE,
        /** The writer signalled completion through done() or error(). */
        FINISHED,
        /** The stream was cancelled or failed; no further data is delivered. */
        ABORTED,
        /**
         * The reader cancelled an active stream while the writer was not waiting for a
         * write-possible notification; the abort is completed on the writer's next
         * write-handle call.
         */
        ABORT_REQUESTED
    }

    /** Not instantiable: this class only exposes static factory methods. */
    private EntityStreams() {
    }

    /**
     * Creates a stream that carries no data: its writer calls {@code done()} on the write
     * handle as soon as it is told that writing is possible.
     *
     * @return a new, empty entity stream
     */
    public static EntityStream emptyStream() {
        final Writer emptyWriter = new Writer() { // from class: com.linkedin.r2.message.stream.entitystream.EntityStreams.1
            private WriteHandle _handle;

            @Override // com.linkedin.r2.message.stream.entitystream.Writer
            public void onInit(WriteHandle writeHandle) {
                _handle = writeHandle;
            }

            @Override // com.linkedin.r2.message.stream.entitystream.Writer
            public void onWritePossible() {
                // Nothing to write: finish the stream right away.
                _handle.done();
            }

            @Override // com.linkedin.r2.message.stream.entitystream.Writer
            public void onAbort(Throwable th) {
                // No resources are held, so there is nothing to clean up.
            }
        };
        return newEntityStream(emptyWriter);
    }

    /**
     * Creates a new entity stream whose data is produced by the given writer.
     *
     * @param writer the producer of the stream's data
     * @return a new, uninitialized entity stream
     */
    public static EntityStream newEntityStream(Writer writer) {
        return new EntityStreamImpl(writer);
    }
}
