package org.apache.james.blob.api;

import com.google.common.base.Optional;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.io.ByteProcessor;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.CharSource;
import com.google.common.io.FileBackedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.blob.api.BlobPartsId;
import org.apache.james.blob.api.BlobStore;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

/* loaded from: input_file:org/apache/james/blob/api/Store.class */
public interface Store<T, I> {
    /**
     * Threshold (102400 = 100 * 1024) handed to Guava's {@code FileBackedOutputStream} when
     * {@code Impl} buffers the content of a blob it has just read. Per Guava's documented
     * contract for that class, data written beyond this many bytes is spilled to a file
     * instead of being kept in memory.
     */
    public static final int FILE_THRESHOLD = 102400;

    /* loaded from: input_file:org/apache/james/blob/api/Store$CloseableByteSource.class */
    /**
     * A Guava {@link ByteSource} that additionally implements {@link Closeable}.
     *
     * <p>The class declares no members of its own; it only combines the two contracts so that
     * a byte source can be handed out together with a {@code close()} operation.
     */
    public static abstract class CloseableByteSource extends ByteSource implements Closeable {
    }

    /* loaded from: input_file:org/apache/james/blob/api/Store$DelegateCloseableByteSource.class */
    /**
     * {@link CloseableByteSource} that forwards every {@link ByteSource} operation to a wrapped
     * source and forwards {@link #close()} to a separately supplied {@link Closeable}.
     *
     * <p>All {@code ByteSource} methods are overridden explicitly so that any optimised
     * implementation provided by the wrapped source is used instead of the generic
     * {@code ByteSource} defaults.
     */
    public static class DelegateCloseableByteSource extends CloseableByteSource {
        private final ByteSource wrapped;
        private final Closeable closeable;

        /**
         * @param byteSource source that serves every read operation of this instance
         * @param closeable  action invoked by {@link #close()}
         */
        DelegateCloseableByteSource(ByteSource byteSource, Closeable closeable) {
            this.wrapped = byteSource;
            this.closeable = closeable;
        }

        @Override
        public InputStream openStream() throws IOException {
            return wrapped.openStream();
        }

        @Override
        public CharSource asCharSource(Charset charset) {
            return wrapped.asCharSource(charset);
        }

        @Override
        public InputStream openBufferedStream() throws IOException {
            return wrapped.openBufferedStream();
        }

        @Override
        public ByteSource slice(long offset, long length) {
            return wrapped.slice(offset, length);
        }

        @Override
        public boolean isEmpty() throws IOException {
            return wrapped.isEmpty();
        }

        @Override
        public Optional<Long> sizeIfKnown() {
            return wrapped.sizeIfKnown();
        }

        @Override
        public long size() throws IOException {
            return wrapped.size();
        }

        @Override
        public long copyTo(OutputStream outputStream) throws IOException {
            return wrapped.copyTo(outputStream);
        }

        @Override
        public long copyTo(ByteSink byteSink) throws IOException {
            return wrapped.copyTo(byteSink);
        }

        @Override
        public byte[] read() throws IOException {
            return wrapped.read();
        }

        @Override
        public <T> T read(ByteProcessor<T> byteProcessor) throws IOException {
            // The delegate already returns T; no cast is required.
            return wrapped.read(byteProcessor);
        }

        @Override
        public HashCode hash(HashFunction hashFunction) throws IOException {
            return wrapped.hash(hashFunction);
        }

        @Override
        public boolean contentEquals(ByteSource byteSource) throws IOException {
            return wrapped.contentEquals(byteSource);
        }

        /**
         * Invokes the {@link Closeable} supplied at construction time; the wrapped source
         * itself is not touched.
         *
         * @throws IOException if the supplied closeable fails
         */
        @Override
        public void close() throws IOException {
            closeable.close();
        }
    }

    /* loaded from: input_file:org/apache/james/blob/api/Store$Impl.class */
    /**
     * {@link Store} implementation backed by a {@link BlobStore}.
     *
     * <p>An object is turned into typed parts by an {@link Encoder}; each part is saved as a
     * blob in the blob store's default bucket and the resulting blob ids are combined into an
     * identifier by a {@link BlobPartsId.Factory}. Reading reverses the process through a
     * {@link Decoder}.
     *
     * @param <T> type of the stored object
     * @param <I> type of the identifier grouping the blob ids of one stored object
     */
    public static class Impl<T, I extends BlobPartsId> implements Store<T, I> {
        /** Maximum number of blob deletions {@link #delete} keeps in flight at the same time. */
        public static final int DEFAULT_CONCURRENCY = 16;
        private final BlobPartsId.Factory<I> idFactory;
        private final Encoder<T> encoder;
        private final Decoder<T> decoder;
        private final BlobStore blobStore;

        /** Rebuilds an object from its typed parts. */
        @FunctionalInterface
        public interface Decoder<T> {
            /**
             * @param stream the parts read back from the blob store, each paired with its type;
             *               every {@link CloseableByteSource} is handed over as-is, this class
             *               does not close it afterwards
             * @return the decoded object
             */
            T decode(Stream<Pair<BlobType, CloseableByteSource>> stream);
        }

        /** Splits an object into the typed parts that have to be saved. */
        @FunctionalInterface
        public interface Encoder<T> {
            /**
             * @param t object to split
             * @return one entry per part, pairing the part's type with the value to save
             */
            Stream<Pair<BlobType, ValueToSave>> encode(T t);
        }

        /** A single part that knows how to persist itself into a {@link BlobStore}. */
        @FunctionalInterface
        public interface ValueToSave {
            /**
             * @param bucketName bucket to save into
             * @param blobStore  store to save into
             * @return the id of the saved blob
             */
            Mono<BlobId> saveIn(BucketName bucketName, BlobStore blobStore);
        }

        /**
         * @param factory   builds the identifier out of the saved blob ids
         * @param encoder   splits an object into parts on {@link #save}
         * @param decoder   rebuilds an object from its parts on {@link #read}
         * @param blobStore store holding the parts
         */
        public Impl(BlobPartsId.Factory<I> factory, Encoder<T> encoder, Decoder<T> decoder, BlobStore blobStore) {
            this.idFactory = factory;
            this.encoder = encoder;
            this.decoder = decoder;
            this.blobStore = blobStore;
        }

        /**
         * Encodes {@code t}, saves every part in the default bucket and builds the identifier
         * from the resulting {@code BlobType -> BlobId} map.
         */
        @Override // org.apache.james.blob.api.Store
        public Mono<I> save(T t) {
            return Flux.fromStream(this.encoder.encode(t))
                .flatMapSequential(this::saveEntry)
                .collectMap(Tuple2::getT1, Tuple2::getT2)
                .map(this.idFactory::generate);
        }

        /** Saves one part and pairs the resulting blob id with the part's type. */
        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, ValueToSave> pair) {
            return Mono.just(pair.getLeft())
                .zipWith(pair.getRight().saveIn(this.blobStore.getDefaultBucketName(), this.blobStore));
        }

        /**
         * Reads every blob referenced by {@code i} from the default bucket and hands the parts
         * to the decoder.
         *
         * <p>The blocking blob reads are moved off the subscribing thread with
         * {@code publishOn}.
         */
        @Override // org.apache.james.blob.api.Store
        public Mono<T> read(I i) {
            // NOTE(review): Schedulers.elastic() is deprecated in recent Reactor releases in
            // favour of boundedElastic(); left untouched so the required Reactor version does
            // not change - confirm before migrating.
            return Flux.fromIterable(i.asMap().entrySet())
                .publishOn(Schedulers.elastic())
                .map(entry -> Pair.of(
                    entry.getKey(),
                    readByteSource(this.blobStore.getDefaultBucketName(), entry.getValue(), entry.getKey().getStoragePolicy())))
                .collectList()
                .map(parts -> parts.stream())
                .map(this.decoder::decode);
        }

        /**
         * Copies a blob into a {@link FileBackedOutputStream} and exposes the buffered content
         * as a {@link CloseableByteSource}; closing the returned source resets the buffer.
         *
         * <p>The stream obtained from the blob store is always closed, and the buffer is reset
         * when the copy fails so that nothing already buffered is left behind.
         *
         * @throws RuntimeException wrapping the {@link IOException} raised while copying
         */
        private CloseableByteSource readByteSource(BucketName bucketName, BlobId blobId, BlobStore.StoragePolicy storagePolicy) {
            FileBackedOutputStream buffer = new FileBackedOutputStream(Store.FILE_THRESHOLD);
            try (InputStream blobContent = this.blobStore.read(bucketName, blobId, storagePolicy)) {
                blobContent.transferTo(buffer);
                return new DelegateCloseableByteSource(buffer.asByteSource(), buffer::reset);
            } catch (IOException e) {
                discard(buffer, e);
                throw new RuntimeException(e);
            } catch (RuntimeException e) {
                discard(buffer, e);
                throw e;
            }
        }

        /** Resets {@code buffer} after a failed read, recording a reset failure as suppressed. */
        private static void discard(FileBackedOutputStream buffer, Throwable failure) {
            try {
                buffer.reset();
            } catch (IOException resetFailure) {
                failure.addSuppressed(resetFailure);
            }
        }

        /**
         * Deletes every blob referenced by {@code i} from the default bucket, running at most
         * {@link #DEFAULT_CONCURRENCY} deletions concurrently.
         */
        @Override // org.apache.james.blob.api.Store
        public Publisher<Void> delete(I i) {
            return Flux.fromIterable(i.asMap().values())
                .flatMap(blobId -> this.blobStore.delete(this.blobStore.getDefaultBucketName(), blobId), DEFAULT_CONCURRENCY)
                .then();
        }
    }

    /**
     * Stores {@code t}.
     *
     * @param t object to store
     * @return a {@link Mono} emitting the identifier produced for the stored object
     */
    Mono<I> save(T t);

    /**
     * Reads back the object designated by {@code i}.
     *
     * @param i identifier of the object to read
     * @return a {@link Mono} emitting the object
     */
    Mono<T> read(I i);

    /**
     * Deletes what is designated by {@code i}.
     *
     * @param i identifier of the object to delete
     * @return a publisher that carries no value ({@code Void})
     */
    Publisher<Void> delete(I i);
}
