package be.ugent.idlab.knows.dataio.flow.base;

import be.ugent.idlab.knows.dataio.access.Access;
import be.ugent.idlab.knows.dataio.record.Record;
import be.ugent.idlab.knows.dataio.streams.SourceStream;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import java.io.Serializable;
import java.util.concurrent.Callable;

/* loaded from: input_file:be/ugent/idlab/knows/dataio/flow/base/SourceObservable.class */
public abstract class SourceObservable<T extends Record> extends Observable<T> implements AutoCloseable, Serializable {
    private static final long serialVersionUID = 2571726576202799037L;
    protected Access access;
    protected SourceStream stream;

    public SourceObservable(Access access, Callable<SourceStream> callable) {
        this.access = access;
        try {
            this.stream = callable.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // io.reactivex.rxjava3.core.Observable
    protected void subscribeActual(@NonNull Observer<? super T> observer) {
        this.stream.getStream().forEach(record -> {
            observer.onNext(record);
        });
        observer.onComplete();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.stream.close();
    }
}
