package com.github.j5ik2o.pekko.persistence.dynamodb.client.v1;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.QueryRequest;
import com.amazonaws.services.dynamodbv2.model.QueryResult;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.github.j5ik2o.pekko.persistence.dynamodb.client.StreamSupport;
import com.github.j5ik2o.pekko.persistence.dynamodb.config.BackoffConfig;
import com.github.j5ik2o.pekko.persistence.dynamodb.context.PluginContext;
import com.github.j5ik2o.pekko.persistence.dynamodb.utils.CompletableFutureUtils$;
import com.github.j5ik2o.pekko.persistence.dynamodb.utils.CompletableFutureUtils$CompletableFutureOps$;
import com.github.j5ik2o.pekko.persistence.dynamodb.utils.DispatcherUtils$;
import com.github.j5ik2o.pekko.persistence.dynamodb.utils.DispatcherUtils$ApplyV1DispatcherOps$;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pekko.NotUsed;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.Graph;
import org.apache.pekko.stream.javadsl.Flow$;
import org.apache.pekko.stream.scaladsl.Concat$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple2$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: StreamReadClient.scala */
/* loaded from: input_file:com/github/j5ik2o/pekko/persistence/dynamodb/client/v1/StreamReadClient.class */
public final class StreamReadClient implements StreamSupport {
    /** Plugin context; used here to obtain the actor system logger, the plugin config and the v1 executor. */
    private final PluginContext pluginContext;
    /** Optional asynchronous AWS SDK v1 client (raw {@code Option} is a decompiler artifact; see the constructor signature). */
    private final Option asyncClient;
    /** Optional synchronous AWS SDK v1 client (raw {@code Option} is a decompiler artifact; see the constructor signature). */
    private final Option syncClient;
    /** Backoff settings passed to {@code flowWithBackoffSettings} for every read flow built by this class. */
    private final BackoffConfig readBackoffConfig;
    /** Logger obtained from {@code pluginContext.system().log()} in the constructor. */
    private final LoggingAdapter log;

    /**
     * Creates a read client on top of an AWS SDK v1 DynamoDB client.
     *
     * <p>The flow builders of this class require exactly one of the two client options to be
     * defined; that is checked when a flow is built, not here.
     *
     * @param pluginContext plugin context; also the source of the logger
     * @param option        optional asynchronous client
     * @param option2       optional synchronous client
     * @param backoffConfig backoff settings applied to every read flow
     */
    public StreamReadClient(PluginContext pluginContext, Option<AmazonDynamoDBAsync> option, Option<AmazonDynamoDB> option2, BackoffConfig backoffConfig) {
        this.readBackoffConfig = backoffConfig;
        this.syncClient = option2;
        this.asyncClient = option;
        this.pluginContext = pluginContext;
        // The logger comes from the actor system exposed by the plugin context.
        this.log = pluginContext.system().log();
    }

    /**
     * Compiler-generated forwarder to the {@link StreamSupport} trait implementation.
     *
     * @param backoffConfig backoff settings to apply
     * @param flow          the flow to wrap
     * @return whatever {@code StreamSupport.flowWithBackoffSettings$} returns for this instance
     */
    public /* bridge */ /* synthetic */ Flow flowWithBackoffSettings(BackoffConfig backoffConfig, Flow flow) {
        final Flow wrapped = StreamSupport.flowWithBackoffSettings$(this, backoffConfig, flow);
        return wrapped;
    }

    /** @return the plugin context supplied at construction time */
    public PluginContext pluginContext() {
        return pluginContext;
    }

    /** @return the optional asynchronous AWS SDK v1 client supplied at construction time */
    public Option<AmazonDynamoDBAsync> asyncClient() {
        return asyncClient;
    }

    /** @return the optional synchronous AWS SDK v1 client supplied at construction time */
    public Option<AmazonDynamoDB> syncClient() {
        return syncClient;
    }

    /** @return the backoff settings applied to every read flow built by this client */
    public BackoffConfig readBackoffConfig() {
        return readBackoffConfig;
    }

    /**
     * Builds the flow that performs one DynamoDB GetItem call per incoming request.
     *
     * <p>Exactly one of {@link #asyncClient()} and {@link #syncClient()} must be defined:
     * <ul>
     *   <li>async client only: each request is sent with {@code getItemAsync} inside a
     *       {@code mapAsync} stage of parallelism 1;</li>
     *   <li>sync client only: each request is sent with {@code getItem} inside a {@code map}
     *       stage to which {@code withV1Dispatcher} is applied.</li>
     * </ul>
     * In both cases a {@code log} stage named "getFlow" is appended and the result is passed
     * through {@link #flowWithBackoffSettings} with {@link #readBackoffConfig()}.
     *
     * @return the GetItem flow
     * @throws IllegalStateException if both clients are defined or both are empty
     */
    public Flow<GetItemRequest, GetItemResult, NotUsed> getFlow() {
        Flow withV1Dispatcher$extension;
        // Decompiled form of a Scala pattern match on (asyncClient, syncClient).
        Tuple2 apply = Tuple2$.MODULE$.apply(asyncClient(), syncClient());
        if (apply != null) {
            Some some = (Option) apply._1();
            Some some2 = (Option) apply._2();
            // Case (Some(async), None): use the asynchronous client.
            if (some instanceof Some) {
                final AmazonDynamoDBAsync amazonDynamoDBAsync = (AmazonDynamoDBAsync) some.value();
                if (None$.MODULE$.equals(some2)) {
                    // NOTE(review): an executor is requested on every call of this method; whether
                    // newV1Executor creates a fresh one or reuses one is not visible here - confirm it is not leaked.
                    final ExecutionContextExecutorService newV1Executor = DispatcherUtils$.MODULE$.newV1Executor(pluginContext());
                    withV1Dispatcher$extension = Flow$.MODULE$.create().mapAsync(1, new Function<GetItemRequest, CompletableFuture<GetItemResult>>(amazonDynamoDBAsync, newV1Executor) { // from class: com.github.j5ik2o.pekko.persistence.dynamodb.client.v1.StreamReadClient$$anon$1
                        private final AmazonDynamoDBAsync c$1;
                        private final ExecutionContextExecutorService executor$1;

                        {
                            this.c$1 = amazonDynamoDBAsync;
                            this.executor$1 = newV1Executor;
                        }

                        // Converts the future returned by the SDK into a CompletableFuture using the captured executor.
                        public CompletableFuture apply(GetItemRequest getItemRequest) {
                            return CompletableFutureUtils$CompletableFutureOps$.MODULE$.toCompletableFuture$extension(CompletableFutureUtils$.MODULE$.CompletableFutureOps(this.c$1.getItemAsync(getItemRequest)), this.executor$1);
                        }
                    }).asScala();
                    Flow flow = withV1Dispatcher$extension;
                    // log$default$2 / log$default$3 are the Scala default arguments of Flow.log.
                    Function1 log$default$2 = flow.log$default$2();
                    return flowWithBackoffSettings(readBackoffConfig(), (Flow) flow.log("getFlow", log$default$2, flow.log$default$3("getFlow", log$default$2)));
                }
            }
            // Case (None, Some(sync)): use the synchronous client.
            if (None$.MODULE$.equals(some) && (some2 instanceof Some)) {
                AmazonDynamoDB amazonDynamoDB = (AmazonDynamoDB) some2.value();
                withV1Dispatcher$extension = DispatcherUtils$ApplyV1DispatcherOps$.MODULE$.withV1Dispatcher$extension(DispatcherUtils$.MODULE$.ApplyV1DispatcherOps(org.apache.pekko.stream.scaladsl.Flow$.MODULE$.apply().map(getItemRequest -> {
                    return amazonDynamoDB.getItem(getItemRequest);
                })), pluginContext().pluginConfig());
                Flow flow2 = withV1Dispatcher$extension;
                Function1 log$default$22 = flow2.log$default$2();
                return flowWithBackoffSettings(readBackoffConfig(), (Flow) flow2.log("getFlow", log$default$22, flow2.log$default$3("getFlow", log$default$22)));
            }
        }
        // Any other combination (both defined, both empty) is rejected.
        throw new IllegalStateException("invalid state");
    }

    /**
     * Builds the flow that performs one DynamoDB Query call per incoming request.
     *
     * <p>Exactly one of {@link #asyncClient()} and {@link #syncClient()} must be defined:
     * <ul>
     *   <li>async client only: each request is sent with {@code queryAsync} inside a
     *       {@code mapAsync} stage of parallelism 1;</li>
     *   <li>sync client only: each request is sent with {@code query} inside a {@code map}
     *       stage to which {@code withV1Dispatcher} is applied.</li>
     * </ul>
     * In both cases a {@code log} stage named "queryFlow" is appended and the result is passed
     * through {@link #flowWithBackoffSettings} with {@link #readBackoffConfig()}.
     *
     * @return the Query flow
     * @throws IllegalStateException if both clients are defined or both are empty
     */
    public Flow<QueryRequest, QueryResult, NotUsed> queryFlow() {
        Flow withV1Dispatcher$extension;
        // Decompiled form of a Scala pattern match on (asyncClient, syncClient).
        Tuple2 apply = Tuple2$.MODULE$.apply(asyncClient(), syncClient());
        if (apply != null) {
            Some some = (Option) apply._1();
            Some some2 = (Option) apply._2();
            // Case (Some(async), None): use the asynchronous client.
            if (some instanceof Some) {
                final AmazonDynamoDBAsync amazonDynamoDBAsync = (AmazonDynamoDBAsync) some.value();
                if (None$.MODULE$.equals(some2)) {
                    // NOTE(review): an executor is requested on every call of this method, and loop$1 calls
                    // queryFlow() once per page; whether newV1Executor creates or reuses one is not visible here - confirm.
                    final ExecutionContextExecutorService newV1Executor = DispatcherUtils$.MODULE$.newV1Executor(pluginContext());
                    withV1Dispatcher$extension = Flow$.MODULE$.create().mapAsync(1, new Function<QueryRequest, CompletableFuture<QueryResult>>(amazonDynamoDBAsync, newV1Executor) { // from class: com.github.j5ik2o.pekko.persistence.dynamodb.client.v1.StreamReadClient$$anon$2
                        private final AmazonDynamoDBAsync c$3;
                        private final ExecutionContextExecutorService executor$2;

                        {
                            this.c$3 = amazonDynamoDBAsync;
                            this.executor$2 = newV1Executor;
                        }

                        // Converts the future returned by the SDK into a CompletableFuture using the captured executor.
                        public CompletableFuture apply(QueryRequest queryRequest) {
                            return CompletableFutureUtils$CompletableFutureOps$.MODULE$.toCompletableFuture$extension(CompletableFutureUtils$.MODULE$.CompletableFutureOps(this.c$3.queryAsync(queryRequest)), this.executor$2);
                        }
                    }).asScala();
                    Flow flow = withV1Dispatcher$extension;
                    // log$default$2 / log$default$3 are the Scala default arguments of Flow.log.
                    Function1 log$default$2 = flow.log$default$2();
                    return flowWithBackoffSettings(readBackoffConfig(), (Flow) flow.log("queryFlow", log$default$2, flow.log$default$3("queryFlow", log$default$2)));
                }
            }
            // Case (None, Some(sync)): use the synchronous client.
            if (None$.MODULE$.equals(some) && (some2 instanceof Some)) {
                AmazonDynamoDB amazonDynamoDB = (AmazonDynamoDB) some2.value();
                withV1Dispatcher$extension = DispatcherUtils$ApplyV1DispatcherOps$.MODULE$.withV1Dispatcher$extension(DispatcherUtils$.MODULE$.ApplyV1DispatcherOps(org.apache.pekko.stream.scaladsl.Flow$.MODULE$.apply().map(queryRequest -> {
                    return amazonDynamoDB.query(queryRequest);
                })), pluginContext().pluginConfig());
                Flow flow2 = withV1Dispatcher$extension;
                Function1 log$default$22 = flow2.log$default$2();
                return flowWithBackoffSettings(readBackoffConfig(), (Flow) flow2.log("queryFlow", log$default$22, flow2.log$default$3("queryFlow", log$default$22)));
            }
        }
        // Any other combination (both defined, both empty) is rejected.
        throw new IllegalStateException("invalid state");
    }

    /**
     * Creates a source that runs the query and keeps following {@code LastEvaluatedKey} page by page.
     *
     * @param queryRequest the query to run
     * @param option       optional cap on the accumulated item count; when empty, paging goes on
     *                     until DynamoDB reports no further key
     * @return a source emitting one {@link QueryResult} per fetched page
     */
    public Source<QueryResult, NotUsed> recursiveQuerySource(QueryRequest queryRequest, Option<Object> option) {
        // Seed values for the first page: no start key, nothing accumulated, count 0, index 1.
        final Option noStartKey = None$.MODULE$;
        final Source accumulated = loop$default$4$1();
        final long countSoFar = loop$default$5$1();
        final int pageIndex = loop$default$6$1();
        return loop$1(queryRequest, option, noStartKey, accumulated, countSoFar, pageIndex);
    }

    /**
     * Builds the flow that performs one DynamoDB Scan call per incoming request.
     *
     * <p>Exactly one of {@link #asyncClient()} and {@link #syncClient()} must be defined:
     * <ul>
     *   <li>async client only: each request is sent with {@code scanAsync} inside a
     *       {@code mapAsync} stage of parallelism 1;</li>
     *   <li>sync client only: each request is sent with {@code scan} inside a {@code map}
     *       stage to which {@code withV1Dispatcher} is applied.</li>
     * </ul>
     * In both cases a {@code log} stage named "scanFlow" is appended and the result is passed
     * through {@link #flowWithBackoffSettings} with {@link #readBackoffConfig()}.
     *
     * @return the Scan flow
     * @throws IllegalStateException if both clients are defined or both are empty
     */
    public Flow<ScanRequest, ScanResult, NotUsed> scanFlow() {
        Flow withV1Dispatcher$extension;
        // Decompiled form of a Scala pattern match on (asyncClient, syncClient).
        Tuple2 apply = Tuple2$.MODULE$.apply(asyncClient(), syncClient());
        if (apply != null) {
            Some some = (Option) apply._1();
            Some some2 = (Option) apply._2();
            // Case (Some(async), None): use the asynchronous client.
            if (some instanceof Some) {
                final AmazonDynamoDBAsync amazonDynamoDBAsync = (AmazonDynamoDBAsync) some.value();
                if (None$.MODULE$.equals(some2)) {
                    // NOTE(review): an executor is requested on every call of this method, and loop$2 calls
                    // scanFlow() once per page; whether newV1Executor creates or reuses one is not visible here - confirm.
                    final ExecutionContextExecutorService newV1Executor = DispatcherUtils$.MODULE$.newV1Executor(pluginContext());
                    withV1Dispatcher$extension = Flow$.MODULE$.create().mapAsync(1, new Function<ScanRequest, CompletableFuture<ScanResult>>(amazonDynamoDBAsync, newV1Executor) { // from class: com.github.j5ik2o.pekko.persistence.dynamodb.client.v1.StreamReadClient$$anon$3
                        private final AmazonDynamoDBAsync c$5;
                        private final ExecutionContextExecutorService executor$3;

                        {
                            this.c$5 = amazonDynamoDBAsync;
                            this.executor$3 = newV1Executor;
                        }

                        // Converts the future returned by the SDK into a CompletableFuture using the captured executor.
                        public CompletableFuture apply(ScanRequest scanRequest) {
                            return CompletableFutureUtils$CompletableFutureOps$.MODULE$.toCompletableFuture$extension(CompletableFutureUtils$.MODULE$.CompletableFutureOps(this.c$5.scanAsync(scanRequest)), this.executor$3);
                        }
                    }).asScala();
                    Flow flow = withV1Dispatcher$extension;
                    // log$default$2 / log$default$3 are the Scala default arguments of Flow.log.
                    Function1 log$default$2 = flow.log$default$2();
                    return flowWithBackoffSettings(readBackoffConfig(), (Flow) flow.log("scanFlow", log$default$2, flow.log$default$3("scanFlow", log$default$2)));
                }
            }
            // Case (None, Some(sync)): use the synchronous client.
            if (None$.MODULE$.equals(some) && (some2 instanceof Some)) {
                AmazonDynamoDB amazonDynamoDB = (AmazonDynamoDB) some2.value();
                withV1Dispatcher$extension = DispatcherUtils$ApplyV1DispatcherOps$.MODULE$.withV1Dispatcher$extension(DispatcherUtils$.MODULE$.ApplyV1DispatcherOps(org.apache.pekko.stream.scaladsl.Flow$.MODULE$.apply().map(scanRequest -> {
                    return amazonDynamoDB.scan(scanRequest);
                })), pluginContext().pluginConfig());
                Flow flow2 = withV1Dispatcher$extension;
                Function1 log$default$22 = flow2.log$default$2();
                return flowWithBackoffSettings(readBackoffConfig(), (Flow) flow2.log("scanFlow", log$default$22, flow2.log$default$3("scanFlow", log$default$22)));
            }
        }
        // Any other combination (both defined, both empty) is rejected.
        throw new IllegalStateException("invalid state");
    }

    /**
     * Creates a source that runs the scan and keeps following {@code LastEvaluatedKey} page by page.
     *
     * @param scanRequest the scan to run
     * @param option      optional cap on the accumulated item count; when empty, paging goes on
     *                    until DynamoDB reports no further key
     * @return a source emitting one {@link ScanResult} per fetched page
     */
    public Source<ScanResult, NotUsed> recursiveScanSource(ScanRequest scanRequest, Option<Object> option) {
        // Seed values for the first page: no start key, nothing accumulated, count 0, index 1.
        final Option noStartKey = None$.MODULE$;
        final Source accumulated = loop$default$4$2();
        final long countSoFar = loop$default$5$2();
        final int pageIndex = loop$default$6$2();
        return loop$2(scanRequest, option, noStartKey, accumulated, countSoFar, pageIndex);
    }

    /**
     * Compiler-generated lambda body: creates a {@code Concat} graph for {@code i} inputs.
     * Passed to {@code Source.combine} in {@code loop$1} as the fan-in strategy.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final /* synthetic */ Graph $anonfun$5(int i) {
        return Concat$.MODULE$.apply(i);
    }

    /**
     * Compiler-generated lambda body: the "empty" branch of {@code option.fold} in {@code loop$1}.
     * Returning {@code true} means that without a limit the count check never stops paging.
     */
    private static final boolean loop$1$$anonfun$1$$anonfun$1() {
        return true;
    }

    /**
     * Fetches one query page and, if more data is available and wanted, recurses for the next one.
     *
     * <p>The page is requested through {@link #queryFlow()}. A response whose HTTP status is not
     * 200 fails the stream with an {@link IOException}. Otherwise the page is appended to
     * {@code source}; paging continues only while the response carries a
     * {@code LastEvaluatedKey} and, when a limit is given, {@code j + response.count < limit}.
     *
     * <p>The caller's request is never modified: the AWS SDK v1 {@code withExclusiveStartKey}
     * setter mutates its receiver, so follow-up pages set the key on a clone. Without the copy
     * the start key would leak into the caller's object, and a second materialization of the
     * returned source would begin at the previous run's last key instead of the first page.
     *
     * @param queryRequest the base request (left untouched)
     * @param option       optional cap on the accumulated item count
     * @param option2      {@code LastEvaluatedKey} of the previous page as a Scala map, or
     *                     {@code None} for the first page
     * @param source       the pages gathered so far
     * @param j            number of items counted on the previous pages
     * @param i            page index; only incremented and passed on
     * @return a source emitting every gathered page in fetch order
     */
    private final Source loop$1(QueryRequest queryRequest, Option option, Option option2, Source source, long j, int i) {
        QueryRequest withExclusiveStartKey;
        if (None$.MODULE$.equals(option2)) {
            // First page: the request is sent as given.
            withExclusiveStartKey = queryRequest;
        } else {
            if (!(option2 instanceof Some)) {
                throw new MatchError(option2);
            }
            // Set the start key on a copy so the caller's request object stays unchanged.
            withExclusiveStartKey = queryRequest.clone().withExclusiveStartKey((Map) option2.map(map -> {
                return CollectionConverters$.MODULE$.MapHasAsJava(map).asJava();
            }).orNull($less$colon$less$.MODULE$.refl()));
        }
        return Source$.MODULE$.single(withExclusiveStartKey).via(queryFlow()).flatMapConcat(queryResult -> {
            if (queryResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
                return Source$.MODULE$.failed(new IOException(new StringBuilder(12).append("statusCode: ").append(queryResult.getSdkHttpMetadata().getHttpStatusCode()).toString()));
            }
            // A null LastEvaluatedKey becomes None, which ends the paging below.
            Option map2 = Option$.MODULE$.apply(queryResult.getLastEvaluatedKey()).map(map3 -> {
                return CollectionConverters$.MODULE$.MapHasAsScala(map3).asScala().toMap($less$colon$less$.MODULE$.refl());
            });
            // Append this page after everything gathered so far.
            Source combine = Source$.MODULE$.combine(source, Source$.MODULE$.single(queryResult), ScalaRunTime$.MODULE$.wrapRefArray(new Source[0]), obj -> {
                return $anonfun$5(BoxesRunTime.unboxToInt(obj));
            });
            // Stop when there is no further key, or when the limit (if any) has been reached.
            if (!map2.nonEmpty() || !BoxesRunTime.unboxToBoolean(option.fold(StreamReadClient::loop$1$$anonfun$1$$anonfun$1, j2 -> {
                return j + ((long) Predef$.MODULE$.Integer2int(queryResult.getCount())) < j2;
            }))) {
                return combine;
            }
            this.log.debug("next loop: count = {}, response.count = {}", BoxesRunTime.boxToLong(j), queryResult.getCount());
            return loop$1(queryRequest, option, map2, combine, j + Predef$.MODULE$.Integer2int(queryResult.getCount()), i + 1);
        });
    }

    /** Default for the 4th parameter of {@code loop$1} (pages gathered so far): an empty source. */
    private static final Source loop$default$4$1() {
        return Source$.MODULE$.empty();
    }

    /** Default for the 5th parameter of {@code loop$1} (items counted so far): zero. */
    private static final long loop$default$5$1() {
        return 0L;
    }

    /** Default for the 6th parameter of {@code loop$1} (page index): one. */
    private static final int loop$default$6$1() {
        return 1;
    }

    /**
     * Compiler-generated lambda body: creates a {@code Concat} graph for {@code i} inputs.
     * Passed to {@code Source.combine} in {@code loop$2} as the fan-in strategy.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final /* synthetic */ Graph $anonfun$9(int i) {
        return Concat$.MODULE$.apply(i);
    }

    /**
     * Compiler-generated lambda body: the "empty" branch of {@code option.fold} in {@code loop$2}.
     * Returning {@code true} means that without a limit the count check never stops paging.
     */
    private static final boolean loop$2$$anonfun$1$$anonfun$1() {
        return true;
    }

    /**
     * Fetches one scan page and, if more data is available and wanted, recurses for the next one.
     *
     * <p>The page is requested through {@link #scanFlow()}. A response whose HTTP status is not
     * 200 fails the stream with an {@link IOException}. Otherwise the page is appended to
     * {@code source}; paging continues only while the response carries a
     * {@code LastEvaluatedKey} and, when a limit is given, {@code j + response.count < limit}.
     *
     * <p>The caller's request is never modified: the AWS SDK v1 {@code withExclusiveStartKey}
     * setter mutates its receiver, so follow-up pages set the key on a clone. Without the copy
     * the start key would leak into the caller's object, and a second materialization of the
     * returned source would begin at the previous run's last key instead of the first page.
     *
     * @param scanRequest the base request (left untouched)
     * @param option      optional cap on the accumulated item count
     * @param option2     {@code LastEvaluatedKey} of the previous page as a Scala map, or
     *                    {@code None} for the first page
     * @param source      the pages gathered so far
     * @param j           number of items counted on the previous pages
     * @param i           page index; only incremented and passed on
     * @return a source emitting every gathered page in fetch order
     */
    private final Source loop$2(ScanRequest scanRequest, Option option, Option option2, Source source, long j, int i) {
        ScanRequest withExclusiveStartKey;
        if (None$.MODULE$.equals(option2)) {
            // First page: the request is sent as given.
            withExclusiveStartKey = scanRequest;
        } else {
            if (!(option2 instanceof Some)) {
                throw new MatchError(option2);
            }
            // Set the start key on a copy so the caller's request object stays unchanged.
            withExclusiveStartKey = scanRequest.clone().withExclusiveStartKey((Map) option2.map(map -> {
                return CollectionConverters$.MODULE$.MapHasAsJava(map).asJava();
            }).orNull($less$colon$less$.MODULE$.refl()));
        }
        return Source$.MODULE$.single(withExclusiveStartKey).via(scanFlow()).flatMapConcat(scanResult -> {
            if (scanResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
                return Source$.MODULE$.failed(new IOException(new StringBuilder(12).append("statusCode: ").append(scanResult.getSdkHttpMetadata().getHttpStatusCode()).toString()));
            }
            // A null LastEvaluatedKey becomes None, which ends the paging below.
            Option map2 = Option$.MODULE$.apply(scanResult.getLastEvaluatedKey()).map(map3 -> {
                return CollectionConverters$.MODULE$.MapHasAsScala(map3).asScala().toMap($less$colon$less$.MODULE$.refl());
            });
            // Append this page after everything gathered so far.
            Source combine = Source$.MODULE$.combine(source, Source$.MODULE$.single(scanResult), ScalaRunTime$.MODULE$.wrapRefArray(new Source[0]), obj -> {
                return $anonfun$9(BoxesRunTime.unboxToInt(obj));
            });
            // Stop when there is no further key, or when the limit (if any) has been reached.
            if (!map2.nonEmpty() || !BoxesRunTime.unboxToBoolean(option.fold(StreamReadClient::loop$2$$anonfun$1$$anonfun$1, j2 -> {
                return j + ((long) Predef$.MODULE$.Integer2int(scanResult.getCount())) < j2;
            }))) {
                return combine;
            }
            this.log.debug("next loop: count = {}, response.count = {}", BoxesRunTime.boxToLong(j), scanResult.getCount());
            return loop$2(scanRequest, option, map2, combine, j + Predef$.MODULE$.Integer2int(scanResult.getCount()), i + 1);
        });
    }

    /** Default for the 4th parameter of {@code loop$2} (pages gathered so far): an empty source. */
    private static final Source loop$default$4$2() {
        return Source$.MODULE$.empty();
    }

    /** Default for the 5th parameter of {@code loop$2} (items counted so far): zero. */
    private static final long loop$default$5$2() {
        return 0L;
    }

    /** Default for the 6th parameter of {@code loop$2} (page index): one. */
    private static final int loop$default$6$2() {
        return 1;
    }
}
