package com.github.j5ik2o.akka.persistence.dynamodb.query.dao;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.Attributes;
import akka.stream.scaladsl.Source;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.Select;
import com.github.j5ik2o.akka.persistence.dynamodb.client.v1.StreamReadClient;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.JournalRow;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.config.JournalColumnsDefConfig;
import com.github.j5ik2o.akka.persistence.dynamodb.metrics.MetricsReporter;
import com.github.j5ik2o.akka.persistence.dynamodb.model.PersistenceId;
import com.github.j5ik2o.akka.persistence.dynamodb.model.PersistenceId$;
import com.github.j5ik2o.akka.persistence.dynamodb.model.SequenceNumber;
import com.github.j5ik2o.akka.persistence.dynamodb.query.config.QueryPluginConfig;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.concurrent.ExecutionContext;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering$;
import scala.math.Ordering$Long$;
import scala.math.Ordering$String$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: V1QueryProcessor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005me\u0001\u0002\u000b\u0016\u0001\u0019B\u0001\"\r\u0001\u0003\u0002\u0003\u0006IA\r\u0005\ts\u0001\u0011)\u0019!C\u0001u!A\u0001\n\u0001B\u0001B\u0003%1\b\u0003\u0005J\u0001\t\u0015\r\u0011\"\u0001K\u0011!y\u0005A!A!\u0002\u0013Y\u0005\u0002\u0003)\u0001\u0005\u000b\u0007I\u0011A)\t\u0011a\u0003!\u0011!Q\u0001\nIC\u0001\"\u0017\u0001\u0003\u0006\u0004%\tA\u0017\u0005\tE\u0002\u0011\t\u0011)A\u00057\"A1\r\u0001B\u0001B\u0003-A\rC\u0003k\u0001\u0011\u00051\u000eC\u0004u\u0001\t\u0007I\u0011B;\t\ru\u0004\u0001\u0015!\u0003w\u0011\u001dq\bA1A\u0005\n}D\u0001\"!\u0005\u0001A\u0003%\u0011\u0011\u0001\u0005\b\u0003'\u0001A\u0011IA\u000b\u0011\u001d\t)\u0005\u0001C!\u0003\u000fBq!a\u001e\u0001\t\u0003\nI\bC\u0004\u0002\u0004\u0002!\t\"!\"\u0003!Y\u000b\u0014+^3ssB\u0013xnY3tg>\u0014(B\u0001\f\u0018\u0003\r!\u0017m\u001c\u0006\u00031e\tQ!];fefT!AG\u000e\u0002\u0011\u0011Lh.Y7pI\nT!\u0001H\u000f\u0002\u0017A,'o]5ti\u0016t7-\u001a\u0006\u0003=}\tA!Y6lC*\u0011\u0001%I\u0001\u0007UVJ7NM8\u000b\u0005\t\u001a\u0013AB4ji\",(MC\u0001%\u0003\r\u0019w.\\\u0002\u0001'\r\u0001q%\f\t\u0003Q-j\u0011!\u000b\u0006\u0002U\u0005)1oY1mC&\u0011A&\u000b\u0002\u0007\u0003:L(+\u001a4\u0011\u00059zS\"A\u000b\n\u0005A*\"AD)vKJL\bK]8dKN\u001cxN]\u0001\u0007gf\u001cH/Z7\u0011\u0005M:T\"\u0001\u001b\u000b\u0005U2\u0014!B1di>\u0014(\"\u0001\u0010\n\u0005a\"$aC!di>\u00148+_:uK6\f1\"Y:z]\u000e\u001cE.[3oiV\t1\bE\u0002)yyJ!!P\u0015\u0003\r=\u0003H/[8o!\tyd)D\u0001A\u0015\t\t%)\u0001\u0006es:\fWn\u001c3cmJR!a\u0011#\u0002\u0011M,'O^5dKNT!!R\u0012\u0002\u0013\u0005l\u0017M_8oC^\u001c\u0018BA$A\u0005M\tU.\u0019>p]\u0012Kh.Y7p\t\n\u000b5/\u001f8d\u00031\t7/\u001f8d\u00072LWM\u001c;!\u0003)\u0019\u0018P\\2DY&,g\u000e^\u000b\u0002\u0017B\u0019\u0001\u0006\u0010'\u0011\u0005}j\u0015B\u0001(A\u00059\tU.\u0019>p]\u0012Kh.Y7p\t\n\u000b1b]=oG\u000ec\u0017.\u001a8uA\u0005a\u0001\u000f\\;hS:\u001cuN\u001c4jOV\t!\u000b\u0005\u0002T-6\tAK\u0003\u0002V/\u000511m\u001c8gS\u001eL!a\u0016+\u0003#E+XM]=QYV<\u0017N\\\"p]\u001aLw-A\u0007qYV<\u0017N\\\"p]\u001aLw\rI\u0001\u0010[\u0016$(/[2t%\u0016\u0004xN\u001d;feV\t1\fE\u0002)yq\u0003\"!\u00181\u000e\u0003yS!aX\r\u0002\u000f5,GO]5dg&\u0011\u0011M\u0018\u0002\u0010\u001b\u0016$(/[2t%\u0016\u0004xN\u001d;fe\u0006\u0001R.\u001a;sS\u000e\u001c(+\u001a9peR,'\u000fI\u0001\u0003K\u000e\u0004\"!\u001a5\u000e\u0003\u0019T!aZ\u0015\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002jM\n\u0001R\t_3dkRLwN\\\"p]R,\u0007\u0010^\u0001\u0007y%t\u0017\u000e\u001e \u0015\r1|\u0007/\u001d:t)\tig\u000e\u0005\u0002/\u0001!)1m\u0003a\u0002I\")\u0011g\u0003a\u0001e!)\u0011h\u0003a\u0001w!)\u0011j\u0003a\u0001\u0017\")\u0001k\u0003a\u0001%\")\u0011l\u0003a\u00017\u0006\u00012m\u001c7v[:\u001cH)\u001a4D_:4\u0017nZ\u000b\u0002mB\u0011qo_\u0007\u0002q*\u0011Q+\u001f\u0006\u0003uf\tqA[8ve:\fG.\u0003\u0002}q\n9\"j\\;s]\u0006d7i\u001c7v[:\u001cH)\u001a4D_:4\u0017nZ\u0001\u0012G>dW/\u001c8t\t\u001647i\u001c8gS\u001e\u0004\u0013\u0001D:ue\u0016\fWn\u00117jK:$XCAA\u0001!\u0011\t\u0019!!\u0004\u000e\u0005\u0005\u0015!\u0002BA\u0004\u0003\u0013\t!A^\u0019\u000b\u0007\u0005-\u0011$\u0001\u0004dY&,g\u000e^\u0005\u0005\u0003\u001f\t)A\u0001\tTiJ,\u0017-\u001c*fC\u0012\u001cE.[3oi\u0006i1\u000f\u001e:fC6\u001cE.[3oi\u0002\n\u0011#\u00197m!\u0016\u00148/[:uK:\u001cW-\u00133t)\u0011\t9\"a\u000f\u0011\u0011\u0005e\u00111EA\u0014\u0003gi!!a\u0007\u000b\t\u0005u\u0011qD\u0001\tg\u000e\fG.\u00193tY*\u0019\u0011\u0011\u0005\u001c\u0002\rM$(/Z1n\u0013\u0011\t)#a\u0007\u0003\rM{WO]2f!\u0011\tI#a\f\u000e\u0005\u0005-\"bAA\u00173\u0005)Qn\u001c3fY&!\u0011\u0011GA\u0016\u00055\u0001VM]:jgR,gnY3JIB!\u0011QGA\u001c\u001b\u00051\u0014bAA\u001dm\t9aj\u001c;Vg\u0016$\u0007bBA\u001f!\u0001\u0007\u0011qH\u0001\u0004[\u0006D\bc\u0001\u0015\u0002B%\u0019\u00111I\u0015\u0003\t1{gnZ\u0001\u0018KZ,g\u000e^:CsR\u000bw-Q:K_V\u0014h.\u00197S_^$\"\"!\u0013\u0002T\u00055\u0014\u0011OA;!!\tI\"a\t\u0002L\u0005M\u0002\u0003BA'\u0003\u001fj\u0011!_\u0005\u0004\u0003#J(A\u0003&pkJt\u0017\r\u001c*po\"9\u0011QK\tA\u0002\u0005]\u0013a\u0001;bOB!\u0011\u0011LA4\u001d\u0011\tY&a\u0019\u0011\u0007\u0005u\u0013&\u0004\u0002\u0002`)\u0019\u0011\u0011M\u0013\u0002\rq\u0012xn\u001c;?\u0013\r\t)'K\u0001\u0007!J,G-\u001a4\n\t\u0005%\u00141\u000e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005\u0015\u0014\u0006C\u0004\u0002pE\u0001\r!a\u0010\u0002\r=4gm]3u\u0011\u001d\t\u0019(\u0005a\u0001\u0003\u007f\t\u0011\"\\1y\u001f\u001a47/\u001a;\t\u000f\u0005u\u0012\u00031\u0001\u0002@\u0005y!n\\;s]\u0006d7+Z9vK:\u001cW\r\u0006\u0004\u0002|\u0005u\u0014q\u0010\t\t\u00033\t\u0019#a\u0010\u00024!9\u0011q\u000e\nA\u0002\u0005}\u0002bBAA%\u0001\u0007\u0011qH\u0001\u0006Y&l\u0017\u000e^\u0001\u0014G>tg/\u001a:u)>Tu.\u001e:oC2\u0014vn\u001e\u000b\u0005\u0003\u0017\n9\tC\u0004\u0002\nN\u0001\r!a#\u0002\u00075\f\u0007\u000f\u0005\u0005\u0002Z\u00055\u0015qKAI\u0013\u0011\ty)a\u001b\u0003\u00075\u000b\u0007\u000f\u0005\u0003\u0002\u0014\u0006]UBAAK\u0015\r\ti\u0003Q\u0005\u0005\u00033\u000b)J\u0001\bBiR\u0014\u0018NY;uKZ\u000bG.^3")
/* loaded from: input_file:com/github/j5ik2o/akka/persistence/dynamodb/query/dao/V1QueryProcessor.class */
public class V1QueryProcessor implements QueryProcessor {
    private final Option<AmazonDynamoDBAsync> asyncClient;
    private final Option<AmazonDynamoDB> syncClient;
    private final QueryPluginConfig pluginConfig;
    private final Option<MetricsReporter> metricsReporter;
    private final JournalColumnsDefConfig columnsDefConfig;
    private final StreamReadClient streamClient;
    private final Source<Object, NotUsed> startTimeSource;
    private final Attributes logLevels;

    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.QueryProcessor
    public Source<Object, NotUsed> startTimeSource() {
        return this.startTimeSource;
    }

    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.QueryProcessor
    public Attributes logLevels() {
        return this.logLevels;
    }

    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.QueryProcessor
    public void com$github$j5ik2o$akka$persistence$dynamodb$query$dao$QueryProcessor$_setter_$startTimeSource_$eq(Source<Object, NotUsed> source) {
        this.startTimeSource = source;
    }

    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.QueryProcessor
    public void com$github$j5ik2o$akka$persistence$dynamodb$query$dao$QueryProcessor$_setter_$logLevels_$eq(Attributes attributes) {
        this.logLevels = attributes;
    }

    public Option<AmazonDynamoDBAsync> asyncClient() {
        return this.asyncClient;
    }

    public Option<AmazonDynamoDB> syncClient() {
        return this.syncClient;
    }

    public QueryPluginConfig pluginConfig() {
        return this.pluginConfig;
    }

    public Option<MetricsReporter> metricsReporter() {
        return this.metricsReporter;
    }

    private JournalColumnsDefConfig columnsDefConfig() {
        return this.columnsDefConfig;
    }

    private StreamReadClient streamClient() {
        return this.streamClient;
    }

    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.QueryProcessor
    public Source<PersistenceId, NotUsed> allPersistenceIds(long j) {
        return streamClient().recursiveScanSource(new ScanRequest().withTableName(pluginConfig().tableName()).withSelect(Select.SPECIFIC_ATTRIBUTES).withAttributesToGet(new String[]{columnsDefConfig().deletedColumnName(), columnsDefConfig().persistenceIdColumnName()}).withLimit(Predef$.MODULE$.int2Integer(pluginConfig().scanBatchSize())).withConsistentRead(Predef$.MODULE$.boolean2Boolean(pluginConfig().consistentRead())), new Some(BoxesRunTime.boxToLong(j))).mapConcat(scanResult -> {
            return (Vector) Option$.MODULE$.apply(scanResult.getItems()).map(list -> {
                return ((TraversableOnce) CollectionConverters$.MODULE$.asScalaBufferConverter(list).asScala()).toVector();
            }).map(vector -> {
                return (Vector) vector.map(map -> {
                    return ((TraversableOnce) CollectionConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).toMap(Predef$.MODULE$.$conforms());
                }, Vector$.MODULE$.canBuildFrom());
            }).getOrElse(() -> {
                return package$.MODULE$.Vector().empty();
            });
        }).filterNot(map -> {
            return BoxesRunTime.boxToBoolean($anonfun$allPersistenceIds$6(this, map));
        }).map(map2 -> {
            return ((AttributeValue) map2.apply(this.columnsDefConfig().persistenceIdColumnName())).getS();
        }).fold(Predef$.MODULE$.Set().empty(), (set, str) -> {
            return set.$plus(str);
        }).mapConcat(set2 -> {
            return set2.toVector();
        }).map(str2 -> {
            return PersistenceId$.MODULE$.apply(str2);
        }).take(j).withAttributes(logLevels());
    }

    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.QueryProcessor
    public Source<JournalRow, NotUsed> eventsByTagAsJournalRow(String str, long j, long j2, long j3) {
        return streamClient().recursiveScanSource(new ScanRequest().withTableName(pluginConfig().tableName()).withIndexName(pluginConfig().tagsIndexName()).withFilterExpression("contains(#tags, :tag)").withExpressionAttributeNames((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("#tags"), columnsDefConfig().tagsColumnName())}))).asJava()).withExpressionAttributeValues((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(":tag"), new AttributeValue().withS(str))}))).asJava()).withLimit(Predef$.MODULE$.int2Integer(pluginConfig().scanBatchSize())), new Some(BoxesRunTime.boxToLong(j3))).mapConcat(scanResult -> {
            return (Vector) Option$.MODULE$.apply(scanResult.getItems()).map(list -> {
                return ((TraversableOnce) CollectionConverters$.MODULE$.asScalaBufferConverter(list).asScala()).toVector();
            }).map(vector -> {
                return (Vector) vector.map(map -> {
                    return ((TraversableOnce) CollectionConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).toMap(Predef$.MODULE$.$conforms());
                }, Vector$.MODULE$.canBuildFrom());
            }).getOrElse(() -> {
                return package$.MODULE$.Vector().empty();
            });
        }).map(map -> {
            return this.convertToJournalRow(map);
        }).fold(ArrayBuffer$.MODULE$.empty(), (arrayBuffer, journalRow) -> {
            return arrayBuffer.$plus$eq(journalRow);
        }).map(arrayBuffer2 -> {
            return (ArrayBuffer) arrayBuffer2.sortBy(journalRow2 -> {
                return new Tuple2(journalRow2.persistenceId().asString(), BoxesRunTime.boxToLong(journalRow2.sequenceNumber().value()));
            }, Ordering$.MODULE$.Tuple2(Ordering$String$.MODULE$, Ordering$Long$.MODULE$));
        }).mapConcat(arrayBuffer3 -> {
            return arrayBuffer3.toVector();
        }).statefulMapConcat(() -> {
            AtomicLong atomicLong = new AtomicLong();
            return journalRow2 -> {
                return new $colon.colon(journalRow2.withOrdering(atomicLong.incrementAndGet()), Nil$.MODULE$);
            };
        }).filter(journalRow2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$eventsByTagAsJournalRow$13(j, j2, journalRow2));
        }).take(j3).withAttributes(logLevels());
    }

    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.QueryProcessor
    public Source<Object, NotUsed> journalSequence(long j, long j2) {
        return streamClient().recursiveScanSource(new ScanRequest().withTableName(pluginConfig().tableName()).withSelect(Select.SPECIFIC_ATTRIBUTES).withAttributesToGet(new String[]{columnsDefConfig().orderingColumnName()}).withLimit(Predef$.MODULE$.int2Integer(pluginConfig().scanBatchSize())).withConsistentRead(Predef$.MODULE$.boolean2Boolean(pluginConfig().consistentRead())), None$.MODULE$).mapConcat(scanResult -> {
            return (Vector) Option$.MODULE$.apply(scanResult.getItems()).map(list -> {
                return ((TraversableOnce) CollectionConverters$.MODULE$.asScalaBufferConverter(list).asScala()).toVector();
            }).map(vector -> {
                return (Vector) vector.map(map -> {
                    return ((TraversableOnce) CollectionConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).toMap(Predef$.MODULE$.$conforms());
                }, Vector$.MODULE$.canBuildFrom());
            }).getOrElse(() -> {
                return package$.MODULE$.Vector().empty();
            });
        }).map(map -> {
            return BoxesRunTime.boxToLong($anonfun$journalSequence$6(this, map));
        }).drop(j).take(j2);
    }

    public JournalRow convertToJournalRow(scala.collection.immutable.Map<String, AttributeValue> map) {
        return new JournalRow(PersistenceId$.MODULE$.apply(((AttributeValue) map.apply(columnsDefConfig().persistenceIdColumnName())).getS()), new SequenceNumber(new StringOps(Predef$.MODULE$.augmentString(((AttributeValue) map.apply(columnsDefConfig().sequenceNrColumnName())).getN())).toLong()), Predef$.MODULE$.Boolean2boolean(((AttributeValue) map.apply(columnsDefConfig().deletedColumnName())).getBOOL()), (byte[]) map.get(columnsDefConfig().messageColumnName()).map(attributeValue -> {
            return attributeValue.getB().array();
        }).get(), new StringOps(Predef$.MODULE$.augmentString(((AttributeValue) map.apply(columnsDefConfig().orderingColumnName())).getN())).toLong(), map.get(columnsDefConfig().tagsColumnName()).map(attributeValue2 -> {
            return attributeValue2.getS();
        }));
    }

    public static final /* synthetic */ boolean $anonfun$allPersistenceIds$6(V1QueryProcessor v1QueryProcessor, scala.collection.immutable.Map map) {
        return Predef$.MODULE$.Boolean2boolean(((AttributeValue) map.apply(v1QueryProcessor.columnsDefConfig().deletedColumnName())).getBOOL());
    }

    public static final /* synthetic */ boolean $anonfun$eventsByTagAsJournalRow$13(long j, long j2, JournalRow journalRow) {
        return journalRow.ordering() > j && journalRow.ordering() <= j2;
    }

    public static final /* synthetic */ long $anonfun$journalSequence$6(V1QueryProcessor v1QueryProcessor, scala.collection.immutable.Map map) {
        return new StringOps(Predef$.MODULE$.augmentString(((AttributeValue) map.apply(v1QueryProcessor.columnsDefConfig().orderingColumnName())).getN())).toLong();
    }

    public V1QueryProcessor(ActorSystem actorSystem, Option<AmazonDynamoDBAsync> option, Option<AmazonDynamoDB> option2, QueryPluginConfig queryPluginConfig, Option<MetricsReporter> option3, ExecutionContext executionContext) {
        this.asyncClient = option;
        this.syncClient = option2;
        this.pluginConfig = queryPluginConfig;
        this.metricsReporter = option3;
        QueryProcessor.$init$(this);
        this.columnsDefConfig = queryPluginConfig.columnsDefConfig();
        this.streamClient = new StreamReadClient(actorSystem, option, option2, queryPluginConfig, queryPluginConfig.readBackoffConfig());
    }
}
