package akka.stream.alpakka.orientdb.impl;

import akka.NotUsed$;
import akka.stream.alpakka.orientdb.OIncomingMessage;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.object.db.OObjectDatabaseTx;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Seq$;
import scala.collection.mutable.Queue;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: OrientDBFlowStage.scala */
/* loaded from: input_file:akka/stream/alpakka/orientdb/impl/OrientDBFlowStage$$anon$1.class */
public final class OrientDBFlowStage$$anon$1 extends TimerGraphStageLogic implements InHandler, OutHandler {
    private final Queue<OIncomingMessage<T, C>> akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$queue;
    private final AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Throwable>> failureHandler;
    private final AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Option<String>>> responseHandler;
    private Seq<OIncomingMessage<T, C>> failedMessages;
    private int retryCount;
    private ODatabaseDocumentTx akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client;
    private OObjectDatabaseTx akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient;
    private final /* synthetic */ OrientDBFlowStage $outer;

    public void onDownstreamFinish() throws Exception {
        OutHandler.class.onDownstreamFinish(this);
    }

    public Queue<OIncomingMessage<T, C>> akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$queue() {
        return this.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$queue;
    }

    private AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Throwable>> failureHandler() {
        return this.failureHandler;
    }

    private AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Option<String>>> responseHandler() {
        return this.responseHandler;
    }

    private Seq<OIncomingMessage<T, C>> failedMessages() {
        return this.failedMessages;
    }

    private void failedMessages_$eq(Seq<OIncomingMessage<T, C>> seq) {
        this.failedMessages = seq;
    }

    private int retryCount() {
        return this.retryCount;
    }

    private void retryCount_$eq(int i) {
        this.retryCount = i;
    }

    public ODatabaseDocumentTx akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client() {
        return this.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client;
    }

    private void akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client_$eq(ODatabaseDocumentTx oDatabaseDocumentTx) {
        this.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client = oDatabaseDocumentTx;
    }

    public OObjectDatabaseTx akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient() {
        return this.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient;
    }

    private void akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient_$eq(OObjectDatabaseTx oObjectDatabaseTx) {
        this.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient = oObjectDatabaseTx;
    }

    public void preStart() {
        akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client_$eq(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$settings.oDatabasePool().acquire());
        akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient_$eq(new OObjectDatabaseTx(akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client()));
        pull(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$in());
    }

    public void postStop() {
        akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient().close();
        akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client().close();
    }

    private void tryPull() {
        if (akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$queue().size() >= this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$settings.bufferSize() || isClosed(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$in()) || hasBeenPulled(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$in())) {
            return;
        }
        pull(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$in());
    }

    public void onTimer(Object obj) {
        sendOSQLBulkInsertRequest(failedMessages());
        failedMessages_$eq(Nil$.MODULE$);
    }

    public void akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$handleFailure(Tuple2<Seq<OIncomingMessage<T, C>>, Throwable> tuple2) {
        if (tuple2 == 0) {
            throw new MatchError(tuple2);
        }
        Tuple2 tuple22 = new Tuple2((Seq) tuple2._1(), (Throwable) tuple2._2());
        Seq seq = (Seq) tuple22._1();
        Throwable th = (Throwable) tuple22._2();
        if (retryCount() >= this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$settings.maxRetry()) {
            failStage(th);
            return;
        }
        retryCount_$eq(retryCount() + 1);
        failedMessages_$eq(seq);
        scheduleOnce(NotUsed$.MODULE$, this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$settings.retryInterval());
    }

    private void handleSuccess() {
        completeStage();
    }

    public void akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$handleResponse(Tuple2<Seq<OIncomingMessage<T, C>>, Option<String>> tuple2) {
        retryCount_$eq(0);
        if (tuple2 == 0) {
            throw new MatchError(tuple2);
        }
        Tuple2 tuple22 = new Tuple2((Seq) tuple2._1(), (Option) tuple2._2());
        Seq seq = (Seq) ((Seq) tuple22._1()).flatMap(new OrientDBFlowStage$$anon$1$$anonfun$3(this, (Option) tuple22._2()), Seq$.MODULE$.canBuildFrom());
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$settings.bufferSize()).flatMap(new OrientDBFlowStage$$anon$1$$anonfun$4(this), IndexedSeq$.MODULE$.canBuildFrom());
        if (indexedSeq.isEmpty()) {
            handleSuccess();
        } else {
            sendOSQLBulkInsertRequest(indexedSeq);
        }
        push(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$out(), seq);
    }

    private void sendOSQLBulkInsertRequest(Seq<OIncomingMessage<T, C>> seq) {
        try {
            ODatabaseRecordThreadLocal.instance().set(akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client());
            if (!this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$clazz.isEmpty()) {
                akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client().setDatabaseOwner(akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient());
                akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$oObjectClient().getEntityManager().registerEntityClass((Class) this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$clazz.getOrElse(new OrientDBFlowStage$$anon$1$$anonfun$sendOSQLBulkInsertRequest$2(this)));
                ObjectRef create = ObjectRef.create(Nil$.MODULE$);
                ObjectRef create2 = ObjectRef.create(Nil$.MODULE$);
                seq.foreach(new OrientDBFlowStage$$anon$1$$anonfun$sendOSQLBulkInsertRequest$3(this, create, create2));
                if (((List) create.elem).nonEmpty()) {
                    responseHandler().invoke(new Tuple2((List) create.elem, new Some("Records are invalid OrientDB Records")));
                    return;
                } else {
                    emit(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$out(), (List) create2.elem);
                    responseHandler().invoke(new Tuple2(Seq$.MODULE$.empty(), None$.MODULE$));
                    return;
                }
            }
            if (akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client().getMetadata().getSchema().existsClass(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$className)) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client().getMetadata().getSchema().createClass(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$className);
            }
            akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client().begin(OTransaction.TXTYPE.OPTIMISTIC);
            ObjectRef create3 = ObjectRef.create(Nil$.MODULE$);
            ObjectRef create4 = ObjectRef.create(Nil$.MODULE$);
            seq.foreach(new OrientDBFlowStage$$anon$1$$anonfun$sendOSQLBulkInsertRequest$1(this, create3, create4));
            akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$client().commit();
            if (((List) create3.elem).nonEmpty()) {
                responseHandler().invoke(new Tuple2((List) create3.elem, new Some("Records are invalid OrientDB Records")));
            } else {
                emit(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$out(), (List) create4.elem);
                responseHandler().invoke(new Tuple2(Seq$.MODULE$.empty(), None$.MODULE$));
            }
        } catch (Exception e) {
            failureHandler().invoke(new Tuple2(seq, e));
        }
    }

    public void onPull() {
        tryPull();
    }

    public void onPush() {
        akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$queue().enqueue(Predef$.MODULE$.wrapRefArray(new OIncomingMessage[]{(OIncomingMessage) grab(this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$in())}));
        sendOSQLBulkInsertRequest((IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$settings.bufferSize()).flatMap(new OrientDBFlowStage$$anon$1$$anonfun$5(this), IndexedSeq$.MODULE$.canBuildFrom()));
        tryPull();
    }

    public void onUpstreamFailure(Throwable th) {
        failStage(th);
    }

    public void onUpstreamFinish() {
        handleSuccess();
    }

    public /* synthetic */ OrientDBFlowStage akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$$outer() {
        return this.$outer;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public OrientDBFlowStage$$anon$1(OrientDBFlowStage<T, C> orientDBFlowStage) {
        super(orientDBFlowStage.m5shape());
        if (orientDBFlowStage == 0) {
            throw null;
        }
        this.$outer = orientDBFlowStage;
        InHandler.class.$init$(this);
        OutHandler.class.$init$(this);
        this.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$anon$$queue = new Queue<>();
        this.failureHandler = getAsyncCallback(new OrientDBFlowStage$$anon$1$$anonfun$1(this));
        this.responseHandler = getAsyncCallback(new OrientDBFlowStage$$anon$1$$anonfun$2(this));
        this.failedMessages = Nil$.MODULE$;
        this.retryCount = 0;
        setHandlers(orientDBFlowStage.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$in(), orientDBFlowStage.akka$stream$alpakka$orientdb$impl$OrientDBFlowStage$$out(), this);
    }
}
