/*
 * Decompiled with CFR 0.152.
 */
package org.rostore.v2.container.async;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.rostore.entity.Record;
import org.rostore.entity.RoStoreException;
import org.rostore.entity.StreamProcessingException;
import org.rostore.v2.container.async.AsyncException;
import org.rostore.v2.container.async.AsyncFunction;
import org.rostore.v2.container.async.AsyncListener;
import org.rostore.v2.container.async.AsyncStatus;

public class AsyncStream<S extends AutoCloseable>
implements AutoCloseable,
Future<S> {
    private final S stream;
    private Exception exception;
    private AsyncStatus status;
    private CountDownLatch countDownLatch;
    private AsyncListener asyncListener;

    public static <S extends AutoCloseable> AsyncStream<S> wrap(S s) {
        return AsyncStream.wrap(s, null);
    }

    public static <S extends AutoCloseable> AsyncStream<S> wrap(S s, AsyncListener asyncListener) {
        return new AsyncStream<S>(s, false, asyncListener);
    }

    public static <S extends AutoCloseable> AsyncStream<S> wrapBlocking(S s) {
        return AsyncStream.wrapBlocking(s, null);
    }

    public static <S extends AutoCloseable> AsyncStream<S> wrapBlocking(S s, AsyncListener asyncListener) {
        return new AsyncStream<S>(s, true, asyncListener);
    }

    public Exception getException() {
        return this.exception;
    }

    public void notifyRecord(Record record) {
        if (this.asyncListener != null) {
            this.asyncListener.record(record);
        }
    }

    public final void processFunction(AsyncFunction<S> runnable) {
        this.start();
        try {
            runnable.process(this.stream);
            this.status = AsyncStatus.SUCCESS;
            if (this.asyncListener != null) {
                this.asyncListener.status(this.status);
            }
        }
        catch (Exception e) {
            this.status = AsyncStatus.ERROR;
            this.exception = e;
            StreamProcessingException streamProcessingException = new StreamProcessingException(e);
            if (this.asyncListener != null) {
                this.asyncListener.error(e);
                this.asyncListener.status(this.status);
            }
            throw streamProcessingException;
        }
        finally {
            if (this.countDownLatch != null) {
                this.countDownLatch.countDown();
            }
        }
    }

    public void fail(Exception e) {
        this.status = AsyncStatus.ERROR;
        if (this.asyncListener != null) {
            this.asyncListener.error(e);
            this.asyncListener.status(this.status);
        }
    }

    private AsyncStream(S s, boolean blocking, AsyncListener asyncListener) {
        this.stream = s;
        this.status = AsyncStatus.OPENED;
        this.asyncListener = asyncListener;
        if (blocking) {
            this.countDownLatch = new CountDownLatch(1);
        }
        if (asyncListener != null) {
            asyncListener.status(this.status);
        }
    }

    @Override
    public boolean cancel(boolean b) {
        this.status = AsyncStatus.CANCELED;
        if (this.asyncListener != null) {
            this.asyncListener.status(this.status);
        }
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
        return true;
    }

    @Override
    public boolean isCancelled() {
        return this.status == AsyncStatus.CANCELED;
    }

    @Override
    public boolean isDone() {
        return this.status.isFinished();
    }

    private void start() {
        if (this.status == AsyncStatus.OPENED) {
            this.status = AsyncStatus.STARTED;
            if (this.asyncListener != null) {
                this.asyncListener.status(this.status);
            }
            return;
        }
        throw new RoStoreException("The stream is not in opened state.");
    }

    @Override
    public S get(long l, TimeUnit timeUnit) {
        if (this.countDownLatch != null) {
            try {
                this.countDownLatch.await(l, timeUnit);
                if (this.exception != null) {
                    throw new AsyncException(this.exception);
                }
                return this.stream;
            }
            catch (InterruptedException e) {
                throw new RoStoreException("Interrupted while waiting for stream", (Throwable)e);
            }
        }
        throw new RoStoreException("Can't wait for a non-blocking stream");
    }

    @Override
    public S get() {
        if (this.countDownLatch != null) {
            try {
                this.countDownLatch.await();
                if (this.exception != null) {
                    throw new AsyncException(this.exception);
                }
                return this.stream;
            }
            catch (InterruptedException e) {
                throw new RoStoreException("Interrupted while waiting for stream", (Throwable)e);
            }
        }
        throw new RoStoreException("Can't wait for a non-blocking stream");
    }

    @Override
    public void close() throws Exception {
        this.stream.close();
    }

    public void empty() {
        if (this.status != AsyncStatus.OPENED) {
            throw new RoStoreException("Try to mark the stream as empty after it has been started.");
        }
        this.status = AsyncStatus.SUCCESS;
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
        if (this.asyncListener != null) {
            this.asyncListener.status(this.status);
        }
    }
}

