/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.azure.cosmos;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.IRoutingMapProvider;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.azure.cosmos.AutoValue_CosmosIO_Read;
import org.apache.beam.sdk.io.azure.cosmos.CosmosOptions;
import org.apache.beam.sdk.io.azure.cosmos.NormalizedRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import reactor.core.publisher.Mono;

public class CosmosIO {
    // Query issued by the reader when the Read spec carries no query (getQuery() returns null).
    private static final @UnknownKeyFor @NonNull @Initialized String DEFAULT_QUERY = "SELECT * FROM root";

    /** Private and intentionally empty; the only public entry point of this class is the static {@code read} factory. */
    private CosmosIO() {
    }

    /**
     * Creates a {@link Read} transform whose only configured property is the item class.
     *
     * <p>Database, container and coder still have to be supplied through the {@code with*}
     * methods before the transform is expanded.
     *
     * @param classType class handed to the AutoValue builder as the transform's class type
     * @param <T> element type produced by the transform
     * @return a new {@link Read} built from a fresh builder
     */
    public static <T> @UnknownKeyFor @NonNull @Initialized Read<T> read(@UnknownKeyFor @NonNull @Initialized Class<T> classType) {
        Read.Builder<T> configured = new AutoValue_CosmosIO_Read.Builder<T>().setClassType(classType);
        return configured.build();
    }

    /**
     * Reader that runs the source's query against the source's feed range and hands out the
     * resulting items one at a time.
     *
     * <p>A dedicated {@link CosmosAsyncClient} is built in the constructor and released in
     * {@link #close()}.
     */
    private static class BoundedCosmosReader<@UnknownKeyFor T>
    extends BoundedSource.BoundedReader<T> {
        private final @UnknownKeyFor @NonNull @Initialized BoundedCosmosBDSource<T> source;
        private final @UnknownKeyFor @NonNull @Initialized CosmosAsyncClient client;
        // Element exposed by getCurrent(); null before start() and after the results are exhausted.
        private T current;
        // Assigned in start(); advance() must not be called before start().
        private @UnknownKeyFor @NonNull @Initialized Iterator<T> iterator;

        /**
         * @param source the source this reader reads from (provides spec and range)
         * @param options pipeline options from which the Cosmos client builder is obtained
         */
        private BoundedCosmosReader(@UnknownKeyFor @NonNull @Initialized BoundedCosmosBDSource<T> source, @UnknownKeyFor @NonNull @Initialized CosmosOptions options) {
            this.source = source;
            this.client = options.as(CosmosOptions.class).getCosmosClientBuilder().buildAsyncClient();
        }

        /**
         * Issues the query (or {@code DEFAULT_QUERY} when the spec has none), restricted to the
         * source's feed range, and positions the reader on the first result.
         *
         * @return {@code true} if a first element is available
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean start() throws @UnknownKeyFor @NonNull @Initialized IOException {
            Read<T> spec = this.source.spec;
            String query = spec.getQuery();
            CosmosAsyncContainer container = this.client.getDatabase(spec.getDatabase()).getContainer(spec.getContainer());
            CosmosQueryRequestOptions queryOptions = ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.getCosmosQueryRequestOptionsAccessor().disallowQueryPlanRetrieval(new CosmosQueryRequestOptions()).setFeedRange(this.source.range.toFeedRange());
            this.iterator = container.queryItems(query == null ? CosmosIO.DEFAULT_QUERY : query, queryOptions, spec.getClassType()).toIterable().iterator();
            return this.readNext();
        }

        /**
         * Moves to the next query result.
         *
         * @return {@code true} if another element is available
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean advance() throws @UnknownKeyFor @NonNull @Initialized IOException {
            return this.readNext();
        }

        /**
         * Pulls the next element from the iterator into {@code current}.
         *
         * <p>When the iterator is exhausted {@code current} is cleared, so that
         * {@link #getCurrent()} throws instead of returning the previous element again.
         */
        private @UnknownKeyFor @NonNull @Initialized boolean readNext() {
            if (this.iterator.hasNext()) {
                this.current = this.iterator.next();
                return true;
            }
            this.current = null;
            return false;
        }

        /**
         * @return the element read by the last successful {@link #start()} or {@link #advance()}
         * @throws NoSuchElementException if no element is currently available
         */
        @Override
        public T getCurrent() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        /** Closes the Cosmos client built in the constructor. */
        @Override
        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.client.close();
        }

        /** @return the source this reader was created from */
        @Override
        public @UnknownKeyFor @NonNull @Initialized BoundedSource<T> getCurrentSource() {
            return this.source;
        }
    }

    /**
     * A {@link BoundedSource} over one {@link NormalizedRange} of the container named by a
     * {@link Read} spec.
     *
     * <p>The single-argument constructor covers {@code NormalizedRange.FULL_RANGE};
     * {@link #split} produces sources over sub-ranges of this source's range.
     */
    @VisibleForTesting
    public static class BoundedCosmosBDSource<@UnknownKeyFor T>
    extends BoundedSource<T> {
        private final @UnknownKeyFor @NonNull @Initialized Read<T> spec;
        private final @UnknownKeyFor @NonNull @Initialized NormalizedRange range;
        // Cached size estimate; null until computed or supplied by the parent source in split().
        private @Nullable @UnknownKeyFor @Initialized Long estimatedByteSize;

        /** Creates a source over the full range with no size estimate yet. */
        BoundedCosmosBDSource(@UnknownKeyFor @NonNull @Initialized Read<T> spec) {
            this(spec, NormalizedRange.FULL_RANGE, null);
        }

        /**
         * @param spec the read configuration (database, container, query, coder, class)
         * @param range the range this source is restricted to
         * @param estimatedSize pre-computed size estimate in bytes, or {@code null} if unknown
         */
        BoundedCosmosBDSource(@UnknownKeyFor @NonNull @Initialized Read<T> spec, @UnknownKeyFor @NonNull @Initialized NormalizedRange range, @Nullable @UnknownKeyFor @Initialized Long estimatedSize) {
            this.spec = spec;
            this.range = range;
            this.estimatedByteSize = estimatedSize;
        }

        /**
         * Splits this source's range into sub-ranges and returns one source per sub-range.
         *
         * <p>The requested number of sub-ranges is the estimated size divided by
         * {@code desiredBundleSizeBytes}, rounded up and never less than one. Each returned source
         * is given an equal share of this source's estimated size.
         *
         * @param desiredBundleSizeBytes target size of each returned source, in bytes
         * @param options pipeline options from which the Cosmos client builder is obtained
         * @return one source per sub-range produced by the feed-range split
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized BoundedSource<T>> split(@UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized Exception {
            // Estimate first: getEstimatedSizeBytes may open its own client, so do it before
            // this method opens one.
            long rangeSize = this.getEstimatedSizeBytes(options);
            int splits = BoundedCosmosBDSource.desiredSplitCount(rangeSize, desiredBundleSizeBytes);
            CosmosClientBuilder builder = options.as(CosmosOptions.class).getCosmosClientBuilder();
            try (CosmosAsyncClient client = builder.buildAsyncClient()) {
                CosmosAsyncDatabase database = client.getDatabase(this.spec.getDatabase());
                CosmosAsyncContainer container = database.getContainer(this.spec.getContainer());
                AsyncDocumentClient document = CosmosBridgeInternal.getAsyncDocumentClient(client);
                String databaseLink = ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor().getLink(database);
                String containerLink = databaseLink + "/" + "colls" + "/" + container.getId();
                // Fully qualified because DocumentCollection is not among this file's imports.
                Mono<Utils.ValueHolder<com.azure.cosmos.implementation.DocumentCollection>> getCollectionObservable = document.getCollectionCache().resolveByNameAsync(null, containerLink, null).map(Utils.ValueHolder::initialize);
                List<NormalizedRange> subRanges = FeedRangeInternal.convert((FeedRange)this.range.toFeedRange()).trySplit((IRoutingMapProvider)document.getPartitionKeyRangeCache(), null, getCollectionObservable, splits).block().stream().map(NormalizedRange::fromFeedRange).collect(Collectors.toList());
                long estimatedSubRangeSize = rangeSize / (long)subRanges.size();
                List<BoundedCosmosBDSource<T>> sources = new ArrayList<BoundedCosmosBDSource<T>>(subRanges.size());
                for (NormalizedRange subrange : subRanges) {
                    sources.add(new BoundedCosmosBDSource<T>(this.spec, subrange, estimatedSubRangeSize));
                }
                return sources;
            }
        }

        /**
         * Computes {@code ceil(rangeSize / desiredBundleSizeBytes)} in exact integer arithmetic,
         * clamped to {@code [1, Integer.MAX_VALUE]}.
         *
         * <p>A non-positive range size or bundle size yields 1, i.e. "do not split".
         */
        private static int desiredSplitCount(long rangeSize, long desiredBundleSizeBytes) {
            if (rangeSize <= 0L || desiredBundleSizeBytes <= 0L) {
                return 1;
            }
            long count = rangeSize / desiredBundleSizeBytes;
            if (rangeSize % desiredBundleSizeBytes != 0L) {
                ++count;
            }
            return (int)Math.min(count, (long)Integer.MAX_VALUE);
        }

        /**
         * Returns the estimated size of this source's range in bytes.
         *
         * <p>If no estimate is cached, the first change-feed page for the range is requested with
         * quota info enabled and its document usage is multiplied by 1024 (i.e. the reported
         * usage is treated as kilobytes). The result is cached for later calls.
         *
         * @param options pipeline options from which the Cosmos client builder is obtained
         * @return the cached or freshly computed estimate, or 0 if none could be obtained
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized long getEstimatedSizeBytes(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized Exception {
            if (this.estimatedByteSize != null) {
                return this.estimatedByteSize;
            }
            CosmosClientBuilder builder = options.as(CosmosOptions.class).getCosmosClientBuilder();
            try (CosmosAsyncClient client = builder.buildAsyncClient()) {
                CosmosAsyncContainer container = client.getDatabase(this.spec.getDatabase()).getContainer(this.spec.getContainer());
                CosmosChangeFeedRequestOptions requestOptions = CosmosChangeFeedRequestOptions.createForProcessingFromNow((FeedRange)this.range.toFeedRange());
                // Only the first page's usage header is of interest, so keep the request minimal.
                requestOptions.setMaxItemCount(1);
                requestOptions.setMaxPrefetchPageCount(1);
                requestOptions.setQuotaInfoEnabled(true);
                this.estimatedByteSize = container.queryChangeFeed(requestOptions, ObjectNode.class).byPage().take(1L).map(FeedResponse::getDocumentUsage).map(kiloByteSize -> kiloByteSize * 1024L).single().block();
                return this.estimatedByteSize == null ? 0L : this.estimatedByteSize;
            }
        }

        /** @return the coder configured on the read spec */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Coder<T> getOutputCoder() {
            return this.spec.getCoder();
        }

        /**
         * Creates a reader over this source.
         *
         * @param options pipeline options, viewed as {@link CosmosOptions} for the reader
         */
        @Override
        public BoundedSource.@UnknownKeyFor @NonNull @Initialized BoundedReader<T> createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized IOException {
            return new BoundedCosmosReader<T>(this, options.as(CosmosOptions.class));
        }
    }

    /**
     * A {@link PTransform} that reads a Cosmos container into a {@link PCollection}.
     *
     * <p>Instances are immutable AutoValue objects; every {@code with*} method returns a new
     * instance built from {@link #builder()}. Database, container and coder must all be set
     * before {@link #expand} is called.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class Read<@UnknownKeyFor T>
    extends PTransform<PBegin, PCollection<T>> {
        /** Class passed to the item query as the result type. */
        abstract @Nullable @UnknownKeyFor @Initialized Class<T> getClassType();

        /** Name of the database to read from; {@code null} until {@link #withDatabase} is used. */
        abstract @Nullable @UnknownKeyFor @Initialized String getDatabase();

        /** Name of the container to read from; {@code null} until {@link #withContainer} is used. */
        abstract @Nullable @UnknownKeyFor @Initialized String getContainer();

        /** Query to run; when {@code null} the reader falls back to {@code DEFAULT_QUERY}. */
        abstract @Nullable @UnknownKeyFor @Initialized String getQuery();

        /** Coder reported as the source's output coder. */
        abstract @Nullable @UnknownKeyFor @Initialized Coder<T> getCoder();

        /** Returns a builder used by the {@code with*} methods to derive a modified instance. */
        abstract @UnknownKeyFor @NonNull @Initialized Builder<T> builder();

        /**
         * Sets the database name.
         *
         * @throws IllegalArgumentException if {@code database} is null or empty
         */
        public @UnknownKeyFor @NonNull @Initialized Read<T> withDatabase(@UnknownKeyFor @NonNull @Initialized String database) {
            Preconditions.checkArgument(database != null, "database can not be null");
            Preconditions.checkArgument(!database.isEmpty(), "database can not be empty");
            Builder<T> updated = this.builder().setDatabase(database);
            return updated.build();
        }

        /**
         * Sets the container name.
         *
         * @throws IllegalArgumentException if {@code container} is null or empty
         */
        public @UnknownKeyFor @NonNull @Initialized Read<T> withContainer(@UnknownKeyFor @NonNull @Initialized String container) {
            Preconditions.checkArgument(container != null, "container can not be null");
            Preconditions.checkArgument(!container.isEmpty(), "container can not be empty");
            Builder<T> updated = this.builder().setContainer(container);
            return updated.build();
        }

        /** Sets the query; no validation is performed here. */
        public @UnknownKeyFor @NonNull @Initialized Read<T> withQuery(@UnknownKeyFor @NonNull @Initialized String query) {
            Builder<T> updated = this.builder().setQuery(query);
            return updated.build();
        }

        /**
         * Sets the output coder.
         *
         * @throws IllegalArgumentException if {@code coder} is null
         */
        public @UnknownKeyFor @NonNull @Initialized Read<T> withCoder(@UnknownKeyFor @NonNull @Initialized Coder<T> coder) {
            Preconditions.checkArgument(coder != null, "coder can not be null");
            Builder<T> updated = this.builder().setCoder(coder);
            return updated.build();
        }

        /**
         * Verifies that database, container and coder are set, then applies a bounded read over a
         * {@link BoundedCosmosBDSource} built from this spec.
         *
         * @throws IllegalStateException if database, container or coder is missing
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PCollection<T> expand(@UnknownKeyFor @NonNull @Initialized PBegin input) {
            boolean hasDatabase = this.getDatabase() != null;
            Preconditions.checkState(hasDatabase, "withDatabase() is required");
            boolean hasContainer = this.getContainer() != null;
            Preconditions.checkState(hasContainer, "withContainer() is required");
            boolean hasCoder = this.getCoder() != null;
            Preconditions.checkState(hasCoder, "withCoder() is required");
            BoundedCosmosBDSource<T> fullRangeSource = new BoundedCosmosBDSource<T>(this);
            return input.apply(org.apache.beam.sdk.io.Read.from(fullRangeSource));
        }

        /** AutoValue builder for {@link Read}; one setter per property. */
        @AutoValue.Builder
        static abstract class Builder<@UnknownKeyFor T> {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setClassType(@UnknownKeyFor @NonNull @Initialized Class<T> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setDatabase(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setContainer(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setQuery(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setCoder(@UnknownKeyFor @NonNull @Initialized Coder<T> var1);

            /** Builds the immutable {@link Read} from the values set so far. */
            abstract @UnknownKeyFor @NonNull @Initialized Read<T> build();
        }
    }
}

