package com.github.j5ik2o.akka.persistence.dynamodb.journal.dao;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.actor.Scheduler;
import akka.persistence.PersistentRepr;
import akka.stream.Attributes;
import akka.stream.Attributes$;
import akka.stream.Attributes$LogLevels$;
import akka.stream.KillSwitches$;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.QueueOfferResult;
import akka.stream.QueueOfferResult$Dropped$;
import akka.stream.QueueOfferResult$Enqueued$;
import akka.stream.QueueOfferResult$QueueClosed$;
import akka.stream.SystemMaterializer$;
import akka.stream.UniqueKillSwitch;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.JournalPluginContext;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.JournalRow;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.serialization.FlowPersistentReprSerializer;
import com.github.j5ik2o.akka.persistence.dynamodb.metrics.MetricsReporter;
import com.github.j5ik2o.akka.persistence.dynamodb.model.PersistenceId;
import com.github.j5ik2o.akka.persistence.dynamodb.model.SequenceNumber;
import com.github.j5ik2o.akka.persistence.dynamodb.utils.LoggingSupport;
import org.slf4j.Logger;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.Statics;
import scala.util.Try;

/* compiled from: WriteJournalDaoImpl.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u001dd\u0001\u0002\u0017.\u0005yB\u0001B\u0015\u0001\u0003\u0002\u0003\u0006Ia\u0015\u0005\t/\u0002\u0011)\u0019!C\t1\"AA\f\u0001B\u0001B\u0003%\u0011\f\u0003\u0005^\u0001\t\u0015\r\u0011\"\u0001_\u0011!A\u0007A!A!\u0002\u0013y\u0006\u0002C5\u0001\u0005\u000b\u0007I1\u00016\t\u0011E\u0004!\u0011!Q\u0001\n-D\u0001B\u001d\u0001\u0003\u0002\u0003\u0006Ya\u001d\u0005\u0006u\u0002!\ta\u001f\u0005\b\u0003\u000f\u0001A\u0011KA\u0005\u0011%\ti\u0002\u0001b\u0001\n\u0007\ty\u0002\u0003\u0005\u0002.\u0001\u0001\u000b\u0011BA\u0011\u0011%\ty\u0003\u0001b\u0001\n\u0013\t\t\u0004\u0003\u0005\u0002:\u0001\u0001\u000b\u0011BA\u001a\u0011%\tY\u0004\u0001b\u0001\n\u0013\t\t\u0004\u0003\u0005\u0002>\u0001\u0001\u000b\u0011BA\u001a\u0011%\ty\u0004\u0001b\u0001\n\u0013\t\t\u0004\u0003\u0005\u0002B\u0001\u0001\u000b\u0011BA\u001a\u0011%\t\u0019\u0005\u0001b\u0001\n\u0013\t)\u0005\u0003\u0005\u0002N\u0001\u0001\u000b\u0011BA$\u0011%\ty\u0005\u0001b\u0001\n\u0013\t\t\u0006\u0003\u0005\u0002Z\u0001\u0001\u000b\u0011BA*\u0011\u001d\tY\u0006\u0001C\u0005\u0003;Bq!!'\u0001\t\u0013\tY\nC\u0005\u00028\u0002\u0011\r\u0011\"\u0003\u0002:\"A\u0011Q\u0018\u0001!\u0002\u0013\tY\fC\u0004\u0002@\u0002!I!!1\t\u000f\u0005M\u0007\u0001\"\u0003\u0002V\"9\u0011\u0011\u001c\u0001\u0005\n\u0005m\u0007bBAu\u0001\u0011%\u00111\u001e\u0005\n\u0003g\u0004!\u0019!C\u0005\u0003kD\u0001\"!?\u0001A\u0003%\u0011q\u001f\u0005\b\u0003w\u0004A\u0011IA\u007f\u0011\u001d\u0011)\u0001\u0001C\u0005\u0005\u000fAqAa\u0003\u0001\t\u0003\u0012i\u0001C\u0004\u0003 
\u0001!\tE!\t\t\u000f\tE\u0002\u0001\"\u0011\u00034!9!\u0011\b\u0001\u0005B\tm\u0002b\u0002B$\u0001\u0011%!\u0011\n\u0005\b\u0005#\u0002A\u0011\u0002B%\u0011\u001d\u0011\u0019\u0006\u0001C\u0005\u0005+BqAa\u0018\u0001\t\u0013\u0011\t\u0007C\u0004\u0003f\u0001!IA!\u0019\u0003']\u0013\u0018\u000e^3K_V\u0014h.\u00197EC>LU\u000e\u001d7\u000b\u00059z\u0013a\u00013b_*\u0011\u0001'M\u0001\bU>,(O\\1m\u0015\t\u00114'\u0001\u0005es:\fWn\u001c3c\u0015\t!T'A\u0006qKJ\u001c\u0018n\u001d;f]\u000e,'B\u0001\u001c8\u0003\u0011\t7n[1\u000b\u0005aJ\u0014A\u000266S.\u0014tN\u0003\u0002;w\u00051q-\u001b;ik\nT\u0011\u0001P\u0001\u0004G>l7\u0001A\n\u0006\u0001}*\u0015\n\u0014\t\u0003\u0001\u000ek\u0011!\u0011\u0006\u0002\u0005\u0006)1oY1mC&\u0011A)\u0011\u0002\u0007\u0003:L(+\u001a4\u0011\u0005\u0019;U\"A\u0017\n\u0005!k#!\u0006&pkJt\u0017\r\u001c#b_^KG\u000f[+qI\u0006$Xm\u001d\t\u0003\r*K!aS\u0017\u0003\u0015\u0011\u000bwnU;qa>\u0014H\u000f\u0005\u0002N!6\taJ\u0003\u0002Pc\u0005)Q\u000f^5mg&\u0011\u0011K\u0014\u0002\u000f\u0019><w-\u001b8h'V\u0004\bo\u001c:u\u00035\u0001H.^4j]\u000e{g\u000e^3yiB\u0011A+V\u0007\u0002_%\u0011ak\f\u0002\u0015\u0015>,(O\\1m!2,x-\u001b8D_:$X\r\u001f;\u0002!)|WO\u001d8bYJ{w\u000f\u0012:jm\u0016\u0014X#A-\u0011\u0005\u0019S\u0016BA..\u0005UQu.\u001e:oC2\u0014vn^,sSR,GI]5wKJ\f\u0011C[8ve:\fGNU8x\tJLg/\u001a:!\u0003)\u0019XM]5bY&TXM]\u000b\u0002?B\u0019\u0001mY3\u000e\u0003\u0005T!AY\u0018\u0002\u001bM,'/[1mSj\fG/[8o\u0013\t!\u0017M\u0001\u000fGY><\b+\u001a:tSN$XM\u001c;SKB\u00148+\u001a:jC2L'0\u001a:\u0011\u0005Q3\u0017BA40\u0005)Qu.\u001e:oC2\u0014vn^\u0001\fg\u0016\u0014\u0018.\u00197ju\u0016\u0014\b%\u0001\u0002fGV\t1\u000e\u0005\u0002m_6\tQN\u0003\u0002o\u0003\u0006Q1m\u001c8dkJ\u0014XM\u001c;\n\u0005Al'\u0001E#yK\u000e,H/[8o\u0007>tG/\u001a=u\u0003\r)7\rI\u0001\u0007gf\u001cH/Z7\u0011\u0005QDX\"A;\u000b\u0005Y<\u0018!B1di>\u0014(\"\u0001\u001c\n\u0005e,(aC!di>\u00148+_:uK6\fa\u0001P5oSRtDc\u0002?\u0002\u0002\u0005\r\u0011Q\u0001\u000b\u0004{z|\bC\u0001$\u00
01\u0011\u0015I\u0017\u0002q\u0001l\u0011\u0015\u0011\u0018\u0002q\u0001t\u0011\u0015\u0011\u0016\u00021\u0001T\u0011\u00159\u0016\u00021\u0001Z\u0011\u0015i\u0016\u00021\u0001`\u0003=iW\r\u001e:jGN\u0014V\r]8si\u0016\u0014XCAA\u0006!\u0015\u0001\u0015QBA\t\u0013\r\ty!\u0011\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\u0005M\u0011\u0011D\u0007\u0003\u0003+Q1!a\u00062\u0003\u001diW\r\u001e:jGNLA!a\u0007\u0002\u0016\tyQ*\u001a;sS\u000e\u001c(+\u001a9peR,'/A\u0002nCR,\"!!\t\u0011\t\u0005\r\u0012\u0011F\u0007\u0003\u0003KQ1!a\nx\u0003\u0019\u0019HO]3b[&!\u00111FA\u0013\u00051i\u0015\r^3sS\u0006d\u0017N_3s\u0003\u0011i\u0017\r\u001e\u0011\u0002\u001fE,X-^3Ck\u001a4WM]*ju\u0016,\"!a\r\u0011\u0007\u0001\u000b)$C\u0002\u00028\u0005\u00131!\u00138u\u0003A\tX/Z;f\u0005V4g-\u001a:TSj,\u0007%\u0001\trk\u0016,X\rU1sC2dW\r\\5t[\u0006\t\u0012/^3vKB\u000b'/\u00197mK2L7/\u001c\u0011\u0002!]\u0014\u0018\u000e^3QCJ\fG\u000e\\3mSNl\u0017!E<sSR,\u0007+\u0019:bY2,G.[:nA\u0005IAn\\4MKZ,Gn]\u000b\u0003\u0003\u000f\u0002B!a\t\u0002J%!\u00111JA\u0013\u0005)\tE\u000f\u001e:jEV$Xm]\u0001\u000bY><G*\u001a<fYN\u0004\u0013!F9vKV,wJ^3sM2|wo\u0015;sCR,w-_\u000b\u0003\u0003'\u0002B!a\t\u0002V%!\u0011qKA\u0013\u0005Aye/\u001a:gY><8\u000b\u001e:bi\u0016<\u00170\u0001\frk\u0016,Xm\u0014<fe\u001adwn^*ue\u0006$XmZ=!\u0003EIg\u000e^3s]\u0006d\u0007+\u001e;TiJ,\u0017-\u001c\u000b\u0007\u0003?\ni'! 
\u0011\u000b1\f\t'!\u001a\n\u0007\u0005\rTN\u0001\u0004GkR,(/\u001a\t\u0005\u0003O\nI'D\u0001x\u0013\r\tYg\u001e\u0002\u0005\t>tW\rC\u0004\u0002p]\u0001\r!!\u001d\u0002\u000fA\u0014x.\\5tKB)A.a\u001d\u0002x%\u0019\u0011QO7\u0003\u000fA\u0013x.\\5tKB\u0019\u0001)!\u001f\n\u0007\u0005m\u0014I\u0001\u0003M_:<\u0007bBA@/\u0001\u0007\u0011\u0011Q\u0001\u0005e><8\u000fE\u0003\u0002\u0004\u0006MUM\u0004\u0003\u0002\u0006\u0006=e\u0002BAD\u0003\u001bk!!!#\u000b\u0007\u0005-U(\u0001\u0004=e>|GOP\u0005\u0002\u0005&\u0019\u0011\u0011S!\u0002\u000fA\f7m[1hK&!\u0011QSAL\u0005\r\u0019V-\u001d\u0006\u0004\u0003#\u000b\u0015\u0001\u00039viF+X-^3\u0016\u0005\u0005u\u0005c\u0002!\u0002 \u0006\r\u0016\u0011W\u0005\u0004\u0003C\u000b%A\u0002+va2,'\u0007\u0005\u0004\u0002&\u0006-\u0016qV\u0007\u0003\u0003OSA!!+\u0002&\u0005A1oY1mC\u0012\u001cH.\u0003\u0003\u0002.\u0006\u001d&aF*pkJ\u001cW-U;fk\u0016<\u0016\u000e\u001e5D_6\u0004H.\u001a;f!\u001d\u0001\u0015qTA9\u0003\u0003\u0003B!a\t\u00024&!\u0011QWA\u0013\u0005A)f.[9vK.KG\u000e\\*xSR\u001c\u0007.A\u0005qkR\fV/Z;fgV\u0011\u00111\u0018\t\u0007\u0003\u0007\u000b\u0019*!(\u0002\u0015A,H/U;fk\u0016\u001c\b%A\u0006rk\u0016,X-\u00133Ge>lG\u0003BA\u001a\u0003\u0007Dq!!2\u001c\u0001\u0004\t9-A\u0007qKJ\u001c\u0018n\u001d;f]\u000e,\u0017\n\u001a\t\u0005\u0003\u0013\fy-\u0004\u0002\u0002L*\u0019\u0011QZ\u0019\u0002\u000b5|G-\u001a7\n\t\u0005E\u00171\u001a\u0002\u000e!\u0016\u00148/[:uK:\u001cW-\u00133\u0002\u001dM,G.Z2u!V$\u0018+^3vKR!\u00111UAl\u0011\u001d\t)\r\ba\u0001\u0003\u000f\fA#\u001b8uKJt\u0017\r\u001c#fY\u0016$Xm\u0015;sK\u0006lGCBA0\u0003;\fy\u000eC\u0004\u0002pu\u0001\r!!\u001d\t\u000f\u0005}T\u00041\u0001\u0002bB1\u00111QAJ\u0003G\u00042ARAs\u0013\r\t9/\f\u0002\u0017!\u0016\u00148/[:uK:\u001cW-\u00133XSRD7+Z9Oe\u0006YA-\u001a7fi\u0016\fV/Z;f+\t\ti\u000fE\u0004A\u0003?\u000by/!-\u0011\r\u0005\u0015\u00161VAy!\u001d\u0001\u0015qTA9\u0003C\fA\u0002Z3mKR,\u0017+^3vKN,\"!a>\u0011\r\u0005\r\u00151SAw\u00035!W\r\\3uKF+X-^3tA\u00059A-[:q_N,G
CAA��!\r\u0001%\u0011A\u0005\u0004\u0005\u0007\t%\u0001B+oSR\f\u0011c]3mK\u000e$H)\u001a7fi\u0016\fV/Z;f)\u0011\tyO!\u0003\t\u000f\u0005\u0015'\u00051\u0001\u0002H\u0006iQ\u000f\u001d3bi\u0016lUm]:bO\u0016$BAa\u0004\u0003\u001cAA\u0011Q\u0015B\t\u0003\u007f\u0014)\"\u0003\u0003\u0003\u0014\u0005\u001d&AB*pkJ\u001cW\r\u0005\u0003\u0002h\t]\u0011b\u0001B\ro\n9aj\u001c;Vg\u0016$\u0007B\u0002B\u000fG\u0001\u0007Q-\u0001\u0006k_V\u0014h.\u00197S_^\fa\u0002Z3mKR,W*Z:tC\u001e,7\u000f\u0006\u0004\u0003$\t\u0015\"q\u0005\t\t\u0003K\u0013\t\"a\u001e\u0003\u0016!9\u0011Q\u0019\u0013A\u0002\u0005\u001d\u0007b\u0002B\u0015I\u0001\u0007!1F\u0001\ri>\u001cV-];f]\u000e,gJ\u001d\t\u0005\u0003\u0013\u0014i#\u0003\u0003\u00030\u0005-'AD*fcV,gnY3Ok6\u0014WM]\u0001\faV$X*Z:tC\u001e,7\u000f\u0006\u0003\u0003$\tU\u0002b\u0002B\u001cK\u0001\u0007\u0011\u0011Q\u0001\t[\u0016\u001c8/Y4fg\u0006\t\u0002.[4iKN$8+Z9vK:\u001cWM\u0014:\u0015\r\tu\"\u0011\tB\"!!\t)K!\u0005\u0003@\tU\u0001#\u0002!\u0002\u000e\u0005]\u0004bBAcM\u0001\u0007\u0011q\u0019\u0005\b\u0005\u000b2\u0003\u0019\u0001B\u0016\u000391'o\\7TKF,XM\\2f\u001dJ\f\u0001E]3rk\u0016\u001cH\u000fU;u\u0015>,(O\\1m%><8\u000fU1tgRC'o\\;hQV\u0011!1\n\t\u000b\u0003K\u0013i%!!\u0002x\tU\u0011\u0002\u0002B(\u0003O\u0013AA\u00127po\u0006)\"/Z9vKN$\b+\u001e;K_V\u0014h.\u00197S_^\u001c\u0018\u0001\u00033fY\u0016$XMQ=\u0015\r\t\r\"q\u000bB-\u0011\u001d\t)-\u000ba\u0001\u0003\u000fDqAa\u0017*\u0001\u0004\u0011i&A\u0006tKF,XM\\2f\u001dJ\u001c\bCBAB\u0003'\u0013Y#A\u0012sKF,Xm\u001d;EK2,G/\u001a&pkJt\u0017\r\u001c*poN\u0004\u0016m]:UQJ|Wo\u001a5\u0016\u0005\t\r\u0004CCAS\u0005\u001b\n\t/a\u001e\u0003\u0016\u0005A\"/Z9vKN$H)\u001a7fi\u0016Tu.\u001e:oC2\u0014vn^:")
/* loaded from: input_file:com/github/j5ik2o/akka/persistence/dynamodb/journal/dao/WriteJournalDaoImpl.class */
/**
 * Write-side journal DAO that puts, soft-deletes and hard-deletes journal rows through a
 * {@link JournalRowWriteDriver}.
 *
 * <p>When {@code pluginConfig().queueEnable()} is true, put and delete requests are offered to one
 * of {@code queueParallelism} in-memory source queues, chosen from a hash of the persistence id
 * (see {@link #queueIdFrom}). Otherwise each request is executed directly ("pass-through").
 *
 * <p>This source is decompiler output of a Scala class; the {@code $anonfun$...} members and the
 * {@code ..._setter_...} method are compiler-generated and are kept for binary compatibility.
 */
public final class WriteJournalDaoImpl implements JournalDaoWithUpdates, DaoSupport, LoggingSupport {
    private final JournalPluginContext pluginContext;
    private final JournalRowWriteDriver journalRowDriver;
    private final FlowPersistentReprSerializer<JournalRow> serializer;
    private final ExecutionContext ec;
    private final Materializer mat;
    private final int queueBufferSize;
    private final int queueParallelism;
    private final int writeParallelism;
    private final Attributes logLevels;
    private final OverflowStrategy queueOverflowStrategy;
    private final Seq<Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<JournalRow>>>, UniqueKillSwitch>> putQueues;
    private final Seq<Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<PersistenceIdWithSeqNr>>>, UniqueKillSwitch>> deleteQueues;
    // Not final: the value is assigned through the LoggingSupport setter below rather than
    // directly in the constructor (presumably the setter is invoked by LoggingSupport.$init$).
    private Logger logger;

    /**
     * Forwards to the default implementation provided by {@link DaoSupport}.
     * An unqualified call here would resolve to this override and recurse forever.
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Source<JournalRow, NotUsed> getMessagesAsJournalRow(PersistenceId persistenceId, SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j, Option<Object> option) {
        return DaoSupport.super.getMessagesAsJournalRow(persistenceId, sequenceNumber, sequenceNumber2, j, option);
    }

    /** Forwards to the {@link DaoSupport} default value for the fifth parameter of {@code getMessagesAsJournalRow}. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Option<Object> getMessagesAsJournalRow$default$5() {
        return DaoSupport.super.getMessagesAsJournalRow$default$5();
    }

    /**
     * Forwards to the default implementation provided by {@link DaoSupport}.
     * An unqualified call here would resolve to this override and recurse forever.
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.JournalDaoWithReadMessages, com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Source<Try<PersistentRepr>, NotUsed> getMessagesAsPersistentRepr(PersistenceId persistenceId, SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j, Option<Object> option) {
        return DaoSupport.super.getMessagesAsPersistentRepr(persistenceId, sequenceNumber, sequenceNumber2, j, option);
    }

    /** Forwards to the {@link DaoSupport} default value for the fifth parameter of {@code getMessagesAsPersistentRepr}. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.JournalDaoWithReadMessages, com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Option<Object> getMessagesAsPersistentRepr$default$5() {
        return DaoSupport.super.getMessagesAsPersistentRepr$default$5();
    }

    /**
     * Forwards to the default implementation provided by {@link DaoSupport}.
     * An unqualified call here would resolve to this override and recurse forever.
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.JournalDaoWithReadMessages, com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Source<Try<PersistentRepr>, NotUsed> getMessagesAsPersistentReprWithBatch(String str, long j, long j2, int i, Option<Tuple2<FiniteDuration, Scheduler>> option) {
        return DaoSupport.super.getMessagesAsPersistentReprWithBatch(str, j, j2, i, option);
    }

    /** Returns the logger installed through the {@code LoggingSupport} setter. */
    public Logger logger() {
        return this.logger;
    }

    /** Compiler-generated setter through which {@code LoggingSupport} installs the logger. */
    public void com$github$j5ik2o$akka$persistence$dynamodb$utils$LoggingSupport$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /** Returns the driver that performs the actual journal row reads and writes. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public JournalRowWriteDriver journalRowDriver() {
        return this.journalRowDriver;
    }

    /** Returns the serializer supplied at construction time. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public FlowPersistentReprSerializer<JournalRow> serializer() {
        return this.serializer;
    }

    /** Returns the execution context used to chain futures in this DAO. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public ExecutionContext ec() {
        return this.ec;
    }

    /** Returns the metrics reporter configured on the plugin context, if any. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Option<MetricsReporter> metricsReporter() {
        return this.pluginContext.metricsReporter();
    }

    /** Returns the materializer obtained from the actor system in the constructor. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Materializer mat() {
        return this.mat;
    }

    /** Buffer size of each request queue; 0 when queueing is disabled. */
    private int queueBufferSize() {
        return this.queueBufferSize;
    }

    /** Number of put queues and of delete queues; 0 when queueing is disabled. */
    private int queueParallelism() {
        return this.queueParallelism;
    }

    /** Parallelism passed to the {@code mapAsync} stages that run write/delete streams. */
    private int writeParallelism() {
        return this.writeParallelism;
    }

    /** Stream log-level attributes (Debug, Debug, Error) applied to the flows built here. */
    private Attributes logLevels() {
        return this.logLevels;
    }

    /** Overflow strategy used when creating the request queues. */
    private OverflowStrategy queueOverflowStrategy() {
        return this.queueOverflowStrategy;
    }

    /**
     * Runs a stream that writes {@code seq} through the driver and completes {@code promise}
     * with the {@code long} emitted by the driver flow.
     *
     * <ul>
     *   <li>exactly one row: routed through the single-row put flow;</li>
     *   <li>more rows than {@code batchWriteItemLimit}: split into groups of that size, written
     *       with the multi-row put flow, and the emitted values are summed;</li>
     *   <li>otherwise: one call to the multi-row put flow.</li>
     * </ul>
     *
     * Stream failures are routed to the compiler-generated {@code recover} handler, which also
     * receives the promise (its body is not visible here; presumably it fails the promise).
     *
     * @param promise completed with the value produced by the write
     * @param seq     rows to write; callers in this class never pass an empty sequence
     * @return future that completes when the stream has finished
     */
    private Future<Done> internalPutStream(Promise<Object> promise, Seq<JournalRow> seq) {
        final int batchLimit = this.pluginContext.m4pluginConfig().clientConfig().batchWriteItemLimit();
        Source writes;
        if (seq.size() == 1) {
            writes = (Source) Source$.MODULE$.single(seq.head()).batch(batchLimit, journalRow -> {
                return package$.MODULE$.Vector().apply(Predef$.MODULE$.wrapRefArray(new JournalRow[]{journalRow}));
            }, (vector, journalRow2) -> {
                return (Vector) vector.$colon$plus(journalRow2, Vector$.MODULE$.canBuildFrom());
            }).flatMapConcat(vector2 -> {
                return vector2.size() == 1 ? Source$.MODULE$.single(vector2.head()).via(this.journalRowDriver().singlePutJournalRowFlow()) : Source$.MODULE$.single(vector2).via(this.journalRowDriver().multiPutJournalRowsFlow());
            });
        } else if (seq.size() > batchLimit) {
            writes = (Source) Source$.MODULE$.apply(seq.toVector()).grouped(batchLimit).via(journalRowDriver().multiPutJournalRowsFlow()).fold(BoxesRunTime.boxToLong(0L), (j, j2) -> {
                return j + j2;
            });
        } else {
            writes = Source$.MODULE$.single(seq).via(journalRowDriver().multiPutJournalRowsFlow());
        }
        return (Future) writes.map(obj -> {
            return $anonfun$internalPutStream$5(promise, BoxesRunTime.unboxToLong(obj));
        }).recover(new WriteJournalDaoImpl$$anonfun$internalPutStream$6(null, promise)).runWith(Sink$.MODULE$.ignore(), mat());
    }

    /**
     * Materializes one put queue: a source queue whose elements (promise, rows) are executed
     * with {@link #internalPutStream} at {@code writeParallelism}, guarded by a kill switch.
     *
     * @return the queue to offer requests to, paired with the kill switch that stops its stream
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<JournalRow>>>, UniqueKillSwitch> putQueue() {
        Tuple2 tuple2 = (Tuple2) Source$.MODULE$.queue(queueBufferSize(), queueOverflowStrategy()).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.both()).mapAsync(writeParallelism(), tuple22 -> {
            if (tuple22 != null) {
                return this.internalPutStream((Promise) tuple22._1(), (Seq) tuple22._2());
            }
            throw new MatchError(tuple22);
        }).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).withAttributes(logLevels()).run(mat());
        // Materialized value is ((queue, killSwitch), sinkFuture); only the first pair is kept.
        return new Tuple2<>(((Tuple2) tuple2._1())._1(), ((Tuple2) tuple2._1())._2());
    }

    /** Returns the put queues created in the constructor (empty when queueing is disabled). */
    private Seq<Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<JournalRow>>>, UniqueKillSwitch>> putQueues() {
        return this.putQueues;
    }

    /**
     * Maps a persistence id to a queue index in {@code [0, queueParallelism)}.
     *
     * <p>The modulo is taken before {@code Math.abs}: {@code Math.abs(Integer.MIN_VALUE)} is
     * still negative, so taking the absolute value first could yield a negative index. For every
     * other hash value the result is the same as {@code Math.abs(hash) % queueParallelism}.
     *
     * @throws ArithmeticException if {@code queueParallelism} is 0 (queueing disabled)
     */
    private int queueIdFrom(PersistenceId persistenceId) {
        int hash = Statics.anyHash(persistenceId.asString());
        return Math.abs(hash % queueParallelism());
    }

    /** Returns the put queue assigned to {@code persistenceId} by {@link #queueIdFrom}. */
    private SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<JournalRow>>> selectPutQueue(PersistenceId persistenceId) {
        return (SourceQueueWithComplete) ((Tuple2) putQueues().apply(queueIdFrom(persistenceId)))._1();
    }

    /**
     * Runs a stream that deletes the rows identified by {@code seq} through the driver and
     * completes {@code promise} with the {@code long} emitted by the driver flow.
     *
     * <p>Branching mirrors {@link #internalPutStream}: single-row delete flow for one key,
     * grouped multi-row deletes (summed) above {@code batchWriteItemLimit}, otherwise one
     * multi-row delete. Failures go to the compiler-generated {@code recover} handler (body not
     * visible here; presumably it fails the promise).
     *
     * @param promise completed with the value produced by the delete
     * @param seq     keys to delete; callers in this class never pass an empty sequence
     * @return future that completes when the stream has finished
     */
    private Future<Done> internalDeleteStream(Promise<Object> promise, Seq<PersistenceIdWithSeqNr> seq) {
        final int batchLimit = this.pluginContext.m4pluginConfig().clientConfig().batchWriteItemLimit();
        Source deletes;
        if (seq.size() == 1) {
            deletes = (Source) Source$.MODULE$.single(seq.head()).batch(batchLimit, persistenceIdWithSeqNr -> {
                return package$.MODULE$.Vector().apply(Predef$.MODULE$.wrapRefArray(new PersistenceIdWithSeqNr[]{persistenceIdWithSeqNr}));
            }, (vector, persistenceIdWithSeqNr2) -> {
                return (Vector) vector.$colon$plus(persistenceIdWithSeqNr2, Vector$.MODULE$.canBuildFrom());
            }).flatMapConcat(vector2 -> {
                return vector2.size() == 1 ? Source$.MODULE$.single(vector2.head()).via(this.journalRowDriver().singleDeleteJournalRowFlow()) : Source$.MODULE$.single(vector2).via(this.journalRowDriver().multiDeleteJournalRowsFlow());
            });
        } else if (seq.size() > batchLimit) {
            deletes = (Source) Source$.MODULE$.apply(seq.toVector()).grouped(batchLimit).via(journalRowDriver().multiDeleteJournalRowsFlow()).fold(BoxesRunTime.boxToLong(0L), (j, j2) -> {
                return j + j2;
            });
        } else {
            deletes = Source$.MODULE$.single(seq).via(journalRowDriver().multiDeleteJournalRowsFlow());
        }
        return (Future) deletes.map(obj -> {
            return $anonfun$internalDeleteStream$5(promise, BoxesRunTime.unboxToLong(obj));
        }).recover(new WriteJournalDaoImpl$$anonfun$internalDeleteStream$6(null, promise)).runWith(Sink$.MODULE$.ignore(), mat());
    }

    /**
     * Materializes one delete queue: a source queue whose elements (promise, keys) are executed
     * with {@link #internalDeleteStream} at {@code writeParallelism}, guarded by a kill switch.
     *
     * @return the queue to offer requests to, paired with the kill switch that stops its stream
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<PersistenceIdWithSeqNr>>>, UniqueKillSwitch> deleteQueue() {
        Tuple2 tuple2 = (Tuple2) Source$.MODULE$.queue(queueBufferSize(), queueOverflowStrategy()).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.both()).mapAsync(writeParallelism(), tuple22 -> {
            if (tuple22 != null) {
                return this.internalDeleteStream((Promise) tuple22._1(), (Seq) tuple22._2());
            }
            throw new MatchError(tuple22);
        }).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).withAttributes(logLevels()).run(mat());
        // Materialized value is ((queue, killSwitch), sinkFuture); only the first pair is kept.
        return new Tuple2<>(((Tuple2) tuple2._1())._1(), ((Tuple2) tuple2._1())._2());
    }

    /** Returns the delete queues created in the constructor (empty when queueing is disabled). */
    private Seq<Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<PersistenceIdWithSeqNr>>>, UniqueKillSwitch>> deleteQueues() {
        return this.deleteQueues;
    }

    /** Shuts down the kill switch of every put queue and every delete queue. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.WriteJournalDao
    public void dispose() {
        putQueues().foreach(tuple2 -> {
            $anonfun$dispose$1(tuple2);
            return BoxedUnit.UNIT;
        });
        deleteQueues().foreach(tuple22 -> {
            $anonfun$dispose$2(tuple22);
            return BoxedUnit.UNIT;
        });
    }

    /** Returns the delete queue assigned to {@code persistenceId} by {@link #queueIdFrom}. */
    private SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<PersistenceIdWithSeqNr>>> selectDeleteQueue(PersistenceId persistenceId) {
        return (SourceQueueWithComplete) ((Tuple2) deleteQueues().apply(queueIdFrom(persistenceId)))._1();
    }

    /** Delegates the update of a single journal row to the driver. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.JournalDaoWithUpdates
    public Source<BoxedUnit, NotUsed> updateMessage(JournalRow journalRow) {
        return journalRowDriver().updateMessage(journalRow);
    }

    /**
     * Deletes the rows the driver returns for {@code persistenceId} and {@code sequenceNumber}.
     *
     * <p>With {@code softDeleted} enabled the rows are re-written via {@link #putMessages} after
     * {@code withDeleted()}; otherwise their sequence numbers are passed to {@code deleteBy}.
     * NOTE(review): the meaning of the {@code false} flag passed to {@code getJournalRows} is
     * defined by the driver and not visible here.
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.WriteJournalDao
    public Source<Object, NotUsed> deleteMessages(PersistenceId persistenceId, SequenceNumber sequenceNumber) {
        return journalRowDriver().getJournalRows(persistenceId, sequenceNumber, false).flatMapConcat(seq -> {
            return this.pluginContext.m4pluginConfig().softDeleted() ? this.putMessages((Seq) seq.map(journalRow -> {
                return journalRow.withDeleted();
            }, Seq$.MODULE$.canBuildFrom())) : this.deleteBy(persistenceId, (Seq) seq.map(journalRow2 -> {
                return journalRow2.sequenceNumber();
            }, Seq$.MODULE$.canBuildFrom()));
        }).withAttributes(logLevels());
    }

    /**
     * Writes {@code seq}, through a queue when queueing is enabled and directly otherwise.
     *
     * @return a source emitting a single value: {@code 0L} for an empty input, otherwise the
     *         value the write completed its promise with
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.WriteJournalDao
    public Source<Object, NotUsed> putMessages(Seq<JournalRow> seq) {
        if (seq.isEmpty()) {
            return Source$.MODULE$.single(BoxesRunTime.boxToLong(0L));
        }
        return this.pluginContext.m4pluginConfig().queueEnable() ? Source$.MODULE$.single(seq).via(requestPutJournalRows()) : Source$.MODULE$.single(seq).via(requestPutJournalRowsPassThrough());
    }

    /** Delegates to the driver, passing {@code sequenceNumber} wrapped in {@code Some} and the driver's default third argument. */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.WriteJournalDao
    public Source<Option<Object>, NotUsed> highestSequenceNr(PersistenceId persistenceId, SequenceNumber sequenceNumber) {
        return journalRowDriver().highestSequenceNr(persistenceId, new Some(sequenceNumber), journalRowDriver().highestSequenceNr$default$3());
    }

    /** Flow that writes each batch directly with {@link #internalPutStream}, bypassing the queues. */
    private Flow<Seq<JournalRow>, Object, NotUsed> requestPutJournalRowsPassThrough() {
        return Flow$.MODULE$.apply().mapAsync(writeParallelism(), seq -> {
            Promise<Object> apply = Promise$.MODULE$.apply();
            return this.internalPutStream(apply, seq).flatMap(done -> {
                return apply.future();
            }, this.ec());
        });
    }

    /**
     * Flow that offers each batch to the put queue selected by the persistence id of its first
     * row and emits the value the queued write completes its promise with.
     * Note that this stage uses a parallelism of 1, unlike the delete counterpart.
     */
    private Flow<Seq<JournalRow>, Object, NotUsed> requestPutJournalRows() {
        return Flow$.MODULE$.apply().mapAsync(1, seq -> {
            Promise<Object> promise = Promise$.MODULE$.apply();
            return this.selectPutQueue(((JournalRow) seq.head()).persistenceId()).offer(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(promise), seq)).flatMap(queueOfferResult -> {
                return this.resolveOfferResult(queueOfferResult, promise);
            }, this.ec());
        }).withAttributes(logLevels());
    }

    /**
     * Deletes the given sequence numbers of {@code persistenceId}, through a queue when queueing
     * is enabled and directly otherwise. An empty input yields an empty source.
     */
    private Source<Object, NotUsed> deleteBy(PersistenceId persistenceId, Seq<SequenceNumber> seq) {
        if (seq.isEmpty()) {
            return Source$.MODULE$.empty();
        }
        return this.pluginContext.m4pluginConfig().queueEnable() ? Source$.MODULE$.single(seq.map(sequenceNumber -> {
            return new PersistenceIdWithSeqNr(persistenceId, sequenceNumber);
        }, Seq$.MODULE$.canBuildFrom())).via(requestDeleteJournalRows()) : Source$.MODULE$.single(seq.map(sequenceNumber2 -> {
            return new PersistenceIdWithSeqNr(persistenceId, sequenceNumber2);
        }, Seq$.MODULE$.canBuildFrom())).via(requestDeleteJournalRowsPassThrough());
    }

    /** Flow that deletes each batch directly with {@link #internalDeleteStream}, bypassing the queues. */
    private Flow<Seq<PersistenceIdWithSeqNr>, Object, NotUsed> requestDeleteJournalRowsPassThrough() {
        return Flow$.MODULE$.apply().mapAsync(writeParallelism(), seq -> {
            Promise<Object> apply = Promise$.MODULE$.apply();
            return this.internalDeleteStream(apply, seq).flatMap(done -> {
                return apply.future();
            }, this.ec());
        });
    }

    /**
     * Flow that offers each batch to the delete queue selected by the persistence id of its
     * first key and emits the value the queued delete completes its promise with.
     */
    private Flow<Seq<PersistenceIdWithSeqNr>, Object, NotUsed> requestDeleteJournalRows() {
        return Flow$.MODULE$.apply().mapAsync(writeParallelism(), seq -> {
            Promise<Object> promise = Promise$.MODULE$.apply();
            return this.selectDeleteQueue(((PersistenceIdWithSeqNr) seq.head()).persistenceId()).offer(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(promise), seq)).flatMap(queueOfferResult -> {
                return this.resolveOfferResult(queueOfferResult, promise);
            }, this.ec());
        }).withAttributes(logLevels());
    }

    /**
     * Translates the outcome of a queue offer into the future the caller should wait on:
     * the promise's future when the request was enqueued, a failed future otherwise.
     * Shared by the put and the delete request flows, which is why the messages say "write".
     *
     * @throws MatchError if {@code queueOfferResult} is none of the known result kinds
     */
    private Future<Object> resolveOfferResult(QueueOfferResult queueOfferResult, Promise<Object> promise) {
        if (QueueOfferResult$Enqueued$.MODULE$.equals(queueOfferResult)) {
            return promise.future();
        }
        if (queueOfferResult instanceof QueueOfferResult.Failure) {
            return Future$.MODULE$.failed(new Exception("Failed to write journal row batch", ((QueueOfferResult.Failure) queueOfferResult).cause()));
        }
        if (QueueOfferResult$Dropped$.MODULE$.equals(queueOfferResult)) {
            // NOTE(review): the message names a "jdbc-journal.bufferSize" setting, which looks
            // carried over from another plugin; confirm the correct configuration key before
            // changing the text.
            return Future$.MODULE$.failed(new Exception("Failed to enqueue journal row batch write, the queue buffer was full (" + this.queueBufferSize() + " elements) please check the jdbc-journal.bufferSize setting"));
        }
        if (QueueOfferResult$QueueClosed$.MODULE$.equals(queueOfferResult)) {
            return Future$.MODULE$.failed(new Exception("Failed to enqueue journal row batch write, the queue was closed"));
        }
        throw new MatchError(queueOfferResult);
    }

    /** Compiler-generated lambda body: completes {@code promise} successfully with {@code j}. */
    public static final /* synthetic */ Promise $anonfun$internalPutStream$5(Promise promise, long j) {
        return promise.success(BoxesRunTime.boxToLong(j));
    }

    /** Compiler-generated lambda body: completes {@code promise} successfully with {@code j}. */
    public static final /* synthetic */ Promise $anonfun$internalDeleteStream$5(Promise promise, long j) {
        return promise.success(BoxesRunTime.boxToLong(j));
    }

    /** Compiler-generated lambda body: shuts down the kill switch of one put queue entry. */
    public static final /* synthetic */ void $anonfun$dispose$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        ((UniqueKillSwitch) tuple2._2()).shutdown();
    }

    /** Compiler-generated lambda body: shuts down the kill switch of one delete queue entry. */
    public static final /* synthetic */ void $anonfun$dispose$2(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        ((UniqueKillSwitch) tuple2._2()).shutdown();
    }

    /**
     * Creates the DAO and eagerly materializes its request queues.
     *
     * <p>When {@code queueEnable} is false, buffer size and queue parallelism are forced to 0, so
     * the {@code 1 to queueParallelism} ranges below are empty and no queue stream is started.
     *
     * @param journalPluginContext         source of plugin configuration and metrics reporter
     * @param journalRowWriteDriver        driver performing the actual reads and writes
     * @param flowPersistentReprSerializer serializer exposed through {@link #serializer()}
     * @param executionContext             execution context used to chain futures
     * @param actorSystem                  actor system whose system materializer runs the streams
     */
    public WriteJournalDaoImpl(JournalPluginContext journalPluginContext, JournalRowWriteDriver journalRowWriteDriver, FlowPersistentReprSerializer<JournalRow> flowPersistentReprSerializer, ExecutionContext executionContext, ActorSystem actorSystem) {
        this.pluginContext = journalPluginContext;
        this.journalRowDriver = journalRowWriteDriver;
        this.serializer = flowPersistentReprSerializer;
        this.ec = executionContext;
        DaoSupport.$init$(this);
        LoggingSupport.$init$(this);
        this.mat = SystemMaterializer$.MODULE$.apply(actorSystem).materializer();
        this.queueBufferSize = journalPluginContext.m4pluginConfig().queueEnable() ? journalPluginContext.m4pluginConfig().queueBufferSize() : 0;
        this.queueParallelism = journalPluginContext.m4pluginConfig().queueEnable() ? journalPluginContext.m4pluginConfig().queueParallelism() : 0;
        this.writeParallelism = journalPluginContext.m4pluginConfig().writeParallelism();
        this.logLevels = Attributes$.MODULE$.logLevels(Attributes$LogLevels$.MODULE$.Debug(), Attributes$LogLevels$.MODULE$.Debug(), Attributes$LogLevels$.MODULE$.Error());
        this.queueOverflowStrategy = journalPluginContext.m4pluginConfig().queueOverflowStrategy();
        // The queues depend on mat, the sizes and the overflow strategy, so they are built last.
        this.putQueues = (Seq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), queueParallelism()).map(obj -> {
            BoxesRunTime.unboxToInt(obj);
            return this.putQueue();
        }, IndexedSeq$.MODULE$.canBuildFrom());
        this.deleteQueues = (Seq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), queueParallelism()).map(obj2 -> {
            BoxesRunTime.unboxToInt(obj2);
            return this.deleteQueue();
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }
}
