package akka.stream.alpakka.orientdb;

import akka.NotUsed$;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.record.ORecord;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.object.db.OObjectDatabaseTx;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Queue;
import scala.concurrent.Future$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: OrientDBFlowStage.scala */
/* loaded from: input_file:akka/stream/alpakka/orientdb/OrientDBFlowStage$$anon$1.class */
public final class OrientDBFlowStage$$anon$1 extends TimerGraphStageLogic implements InHandler, OutHandler {
    // Incoming messages buffered by onPush; drained in batches of at most settings.bufferSize().
    private final Queue<OIncomingMessage<T, C>> queue;
    // Async callback that forwards a (failed batch, cause) pair to handleFailure.
    private final AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Throwable>> failureHandler;
    // Async callback that forwards a (messages, optional error text) pair to handleResponse.
    private final AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Option<String>>> responseHandler;
    // Batch stashed by handleFailure; re-sent and then cleared by onTimer.
    private Seq<OIncomingMessage<T, C>> failedMessages;
    // Retries so far; incremented in handleFailure and reset to 0 in handleResponse.
    private int retryCount;
    // Document database handle acquired from the settings' pool in preStart and closed in postStop.
    private ODatabaseDocumentTx client;
    // Object-database wrapper constructed around client in preStart; closed in postStop.
    private OObjectDatabaseTx oObjectClient;
    // Enclosing OrientDBFlowStage; source of the settings, ports, class name / entity class and pusher.
    private final /* synthetic */ OrientDBFlowStage $outer;

    /**
     * Delegates to the default {@code OutHandler.onDownstreamFinish$} implementation;
     * this logic adds no behavior of its own for this signal.
     *
     * @throws Exception whatever the default implementation throws
     */
    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    /** @return the buffer of messages received but not yet sent to the database */
    private Queue<OIncomingMessage<T, C>> queue() {
        return queue;
    }

    /** @return the async callback that routes a failed batch to {@code handleFailure} */
    private AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Throwable>> failureHandler() {
        return failureHandler;
    }

    /** @return the async callback that routes a batch outcome to {@code handleResponse} */
    private AsyncCallback<Tuple2<Seq<OIncomingMessage<T, C>>, Option<String>>> responseHandler() {
        return responseHandler;
    }

    /** @return the batch currently waiting for the retry timer */
    private Seq<OIncomingMessage<T, C>> failedMessages() {
        return failedMessages;
    }

    /** Replaces the batch waiting for the retry timer. */
    private void failedMessages_$eq(Seq<OIncomingMessage<T, C>> seq) {
        failedMessages = seq;
    }

    /** @return the number of retries attempted so far */
    private int retryCount() {
        return retryCount;
    }

    /** Overwrites the retry counter. */
    private void retryCount_$eq(int i) {
        retryCount = i;
    }

    /** @return the document database handle (unset until {@code preStart} has run) */
    private ODatabaseDocumentTx client() {
        return client;
    }

    /** Stores the document database handle. */
    private void client_$eq(ODatabaseDocumentTx oDatabaseDocumentTx) {
        client = oDatabaseDocumentTx;
    }

    /** @return the object-database wrapper (unset until {@code preStart} has run) */
    private OObjectDatabaseTx oObjectClient() {
        return oObjectClient;
    }

    /** Stores the object-database wrapper. */
    private void oObjectClient_$eq(OObjectDatabaseTx oObjectDatabaseTx) {
        oObjectClient = oObjectDatabaseTx;
    }

    /**
     * Acquires a document database handle from the configured pool, wraps it in an
     * object-database handle, and issues the first pull on the inlet.
     */
    public void preStart() {
        final ODatabaseDocumentTx acquired =
                this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$settings.oDatabasePool().acquire();
        client_$eq(acquired);
        // The object wrapper shares the same underlying document handle.
        final OObjectDatabaseTx objectView = new OObjectDatabaseTx(acquired);
        oObjectClient_$eq(objectView);
        pull(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$in());
    }

    /**
     * Releases both database handles when the stage stops.
     *
     * <p>Either handle is still {@code null} if {@code preStart} failed before assigning it
     * (for example when acquiring from the pool throws), so each one is checked before use.
     * The document handle is closed in a {@code finally} block so that an exception thrown
     * while closing the object wrapper cannot prevent the pooled handle from being released.
     */
    public void postStop() {
        try {
            final OObjectDatabaseTx objectDb = oObjectClient();
            if (objectDb != null) {
                objectDb.close();
            }
        } finally {
            final ODatabaseDocumentTx documentDb = client();
            if (documentDb != null) {
                documentDb.close();
            }
        }
    }

    /**
     * Pulls the inlet when, and only when, the buffer holds fewer than
     * {@code settings.bufferSize()} messages, the inlet is still open, and no pull is
     * already outstanding. The checks are evaluated in that order and short-circuit.
     */
    private void tryPull() {
        final boolean hasRoom =
                queue().size() < this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$settings.bufferSize();
        if (hasRoom
                && !isClosed(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$in())
                && !hasBeenPulled(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$in())) {
            pull(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$in());
        }
    }

    /**
     * Retry timer callback: re-sends the batch stashed by {@code handleFailure} and then
     * clears the stash.
     *
     * @param obj the timer key (not inspected)
     */
    public void onTimer(Object obj) {
        final Seq pending = failedMessages();
        sendOSQLBulkInsertRequest(pending);
        // Cleared only after the send call returns, matching the original statement order.
        failedMessages_$eq(Nil$.MODULE$);
    }

    /**
     * Reacts to a failed batch write.
     *
     * <p>If the retry budget ({@code settings.maxRetry()}) is exhausted the stage is failed
     * with the original cause. Otherwise the retry counter is incremented, the batch is
     * stashed in {@code failedMessages}, and a one-shot timer is scheduled after
     * {@code settings.retryInterval()}; {@code onTimer} then re-sends the stashed batch.
     *
     * @param tuple2 pair of (messages in the failed batch, exception that was caught)
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void handleFailure(Tuple2<Seq<OIncomingMessage<T, C>>, Throwable> tuple2) {
        // The decompiler emitted `tuple2 == 0`, which is not valid Java for a reference;
        // the compiled pattern match performs a null check.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Seq seq = (Seq) tuple2._1();
        Throwable th = (Throwable) tuple2._2();
        if (retryCount() >= this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$settings.maxRetry()) {
            failStage(th);
            return;
        }
        retryCount_$eq(retryCount() + 1);
        failedMessages_$eq(seq);
        scheduleOnce(NotUsed$.MODULE$, this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$settings.retryInterval());
    }

    /**
     * Completes the stage. Invoked from {@code onUpstreamFinish} and from
     * {@code handleResponse} when no further messages are buffered.
     */
    private void handleSuccess() {
        completeStage();
    }

    /**
     * Processes the outcome of a batch write.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>reset the retry counter;</li>
     *   <li>keep the supplied messages only when an error text is present (they are the
     *       rejected ones), otherwise keep none;</li>
     *   <li>dequeue up to {@code settings.bufferSize()} buffered messages; if there are none,
     *       complete the stage, otherwise send them as the next batch;</li>
     *   <li>push the kept messages, mapped through the stage's {@code pusher} and wrapped in
     *       an already-completed future, to the outlet.</li>
     * </ol>
     *
     * @param tuple2 pair of (messages, optional error description)
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void handleResponse(Tuple2<Seq<OIncomingMessage<T, C>>, Option<String>> tuple2) {
        // The counter is reset before the null check, exactly as in the compiled code.
        retryCount_$eq(0);
        // The decompiler emitted `tuple2 == 0`, which is not valid Java for a reference;
        // the compiled pattern match performs a null check.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Seq seq = (Seq) tuple2._1();
        Option option = (Option) tuple2._2();
        Seq seq2 = (Seq) seq.flatMap(oIncomingMessage -> {
            return option.isEmpty() ? Option$.MODULE$.option2Iterable(None$.MODULE$) : Option$.MODULE$.option2Iterable(new Some(oIncomingMessage));
        }, Seq$.MODULE$.canBuildFrom());
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$settings.bufferSize()).flatMap(obj -> {
            return $anonfun$handleResponse$2(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        if (indexedSeq.isEmpty()) {
            handleSuccess();
        } else {
            sendOSQLBulkInsertRequest(indexedSeq);
        }
        // NOTE(review): this push runs even after handleSuccess() has called completeStage(),
        // and after sendOSQLBulkInsertRequest may have emitted on the same outlet — confirm
        // against the Akka GraphStageLogic contract that this ordering is intended.
        push(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$out(), Future$.MODULE$.successful(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$pusher.apply(seq2)));
    }

    /**
     * Writes one batch of messages to OrientDB and reports the outcome through the async
     * callbacks.
     *
     * <p>Untyped mode (no entity class configured): ensures the target class exists in the
     * schema, then saves every message inside an optimistic transaction. Typed mode:
     * registers the entity class with the object database and saves every message through
     * it. In both modes the per-message helpers sort messages into a "rejected" and a
     * "stored" list. If anything was rejected, the rejected messages are handed to the
     * response callback with an error text; otherwise the stored messages are emitted
     * downstream and an empty, error-free response is reported. Any {@link Exception} is
     * routed to the failure callback together with the whole batch.
     *
     * @param seq the batch to persist
     */
    private void sendOSQLBulkInsertRequest(Seq<OIncomingMessage<T, C>> seq) {
        try {
            ODatabaseRecordThreadLocal.instance().set(client());
            // Accumulators filled by the per-message helpers; both modes report them the same way.
            final ObjectRef rejected = ObjectRef.create(Nil$.MODULE$);
            final ObjectRef stored = ObjectRef.create(Nil$.MODULE$);

            if (this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$clazz.isEmpty()) {
                // Untyped mode: create the schema class on first use, then write transactionally.
                if (!client().getMetadata().getSchema().existsClass(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$className)) {
                    client().getMetadata().getSchema().createClass(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$className);
                }
                client().begin(OTransaction.TXTYPE.OPTIMISTIC);
                seq.foreach(message -> {
                    $anonfun$sendOSQLBulkInsertRequest$1(this, rejected, stored, message);
                    return BoxedUnit.UNIT;
                });
                client().commit();
            } else {
                // Typed mode: no explicit transaction is opened here.
                client().setDatabaseOwner(oObjectClient());
                oObjectClient().getEntityManager().registerEntityClass((Class) this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$clazz.getOrElse(() -> {
                    throw new RuntimeException("Typed stream class is invalid");
                }));
                seq.foreach(message -> {
                    $anonfun$sendOSQLBulkInsertRequest$4(this, rejected, stored, message);
                    return BoxedUnit.UNIT;
                });
            }

            if (((List) rejected.elem).nonEmpty()) {
                responseHandler().invoke(new Tuple2((List) rejected.elem, new Some("Records are invalid OrientDB Records")));
            } else {
                emit(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$out(), Future$.MODULE$.successful(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$pusher.apply((List) stored.elem)));
                responseHandler().invoke(new Tuple2(Seq$.MODULE$.apply(Nil$.MODULE$), None$.MODULE$));
            }
        } catch (Exception e) {
            // NOTE(review): a transaction opened by begin() above is not rolled back on this
            // path — confirm whether a retry can safely call begin() again on the same handle.
            failureHandler().invoke(new Tuple2(seq, e));
        }
    }

    /** Downstream demand: forwards it upstream via {@code tryPull} when its conditions allow. */
    public void onPull() {
        tryPull();
    }

    /**
     * Upstream element: buffers the grabbed message, immediately drains up to
     * {@code settings.bufferSize()} buffered messages into a batch write, and then tries to
     * pull the next element.
     */
    public void onPush() {
        final OIncomingMessage incoming =
                (OIncomingMessage) grab(this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$in());
        queue().enqueue(Predef$.MODULE$.wrapRefArray(new OIncomingMessage[]{incoming}));

        // One dequeue attempt per slot in 1..bufferSize; empty attempts contribute nothing.
        final IndexedSeq batch = (IndexedSeq) RichInt$.MODULE$
                .to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$settings.bufferSize())
                .flatMap(slot -> {
                    return $anonfun$onPush$1(this, BoxesRunTime.unboxToInt(slot));
                }, IndexedSeq$.MODULE$.canBuildFrom());
        sendOSQLBulkInsertRequest(batch);

        tryPull();
    }

    /**
     * Fails the stage with the upstream error.
     *
     * @param th the failure signalled by upstream
     */
    public void onUpstreamFailure(Throwable th) {
        failStage(th);
    }

    /**
     * Completes the stage (through {@code handleSuccess}) as soon as upstream finishes.
     * NOTE(review): nothing here flushes messages still held in {@code queue} or
     * {@code failedMessages} — confirm that completing immediately is intended.
     */
    public void onUpstreamFinish() {
        handleSuccess();
    }

    /**
     * Always-true predicate passed to {@code dequeueFirst} so that the first queued message,
     * whatever it is, gets dequeued.
     */
    public static final /* synthetic */ boolean $anonfun$handleResponse$3(OIncomingMessage oIncomingMessage) {
        return true;
    }

    /**
     * Dequeues at most one message from the stage's buffer and returns it as a zero- or
     * one-element iterable. The slot index {@code i} is not used.
     */
    public static final /* synthetic */ Iterable $anonfun$handleResponse$2(OrientDBFlowStage$$anon$1 orientDBFlowStage$$anon$1, int i) {
        final Option head = orientDBFlowStage$$anon$1.queue().dequeueFirst(
                candidate -> BoxesRunTime.boxToBoolean($anonfun$handleResponse$3(candidate)));
        return Option$.MODULE$.option2Iterable(head);
    }

    /**
     * Copies one (field name, field value) pair onto the given document.
     *
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$sendOSQLBulkInsertRequest$2(ODocument oDocument, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        final String fieldName = (String) tuple2._1();
        final Object fieldValue = tuple2._2();
        oDocument.field(fieldName, fieldValue);
    }

    /**
     * Untyped-mode handler for a single message.
     *
     * <ul>
     *   <li>{@code ODocument} payload: its fields are copied into a fresh document tagged with
     *       the stage's class name, the copy is saved, and the message is appended to the
     *       stored list ({@code objectRef2});</li>
     *   <li>any other {@code ORecord} payload: saved as is and appended to the stored list;</li>
     *   <li>any other non-null payload: appended to the rejected list ({@code objectRef})
     *       without touching the database.</li>
     * </ul>
     *
     * @throws MatchError if the message, its payload, or its pass-through value is {@code null}
     */
    public static final /* synthetic */ void $anonfun$sendOSQLBulkInsertRequest$1(OrientDBFlowStage$$anon$1 orientDBFlowStage$$anon$1, ObjectRef objectRef, ObjectRef objectRef2, OIncomingMessage oIncomingMessage) {
        if (oIncomingMessage == null) {
            throw new MatchError(oIncomingMessage);
        }
        final Object payload = oIncomingMessage.oDocument();
        final Object passThrough = oIncomingMessage.passThrough();
        // The compiled `x instanceof Object` guards are plain non-null tests; every match arm
        // requires both values to be non-null.
        if (payload == null || passThrough == null) {
            throw new MatchError(oIncomingMessage);
        }

        final ObjectRef destination;
        if (payload instanceof ODocument) {
            final ODocument source = (ODocument) payload;
            final ODocument copy = new ODocument();
            final Object[] namedValues = (Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(source.fieldNames()))
                    .zip(Predef$.MODULE$.wrapRefArray(source.fieldValues()), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)));
            new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(namedValues)).foreach(pair -> {
                $anonfun$sendOSQLBulkInsertRequest$2(copy, pair);
                return BoxedUnit.UNIT;
            });
            copy.setClassName(orientDBFlowStage$$anon$1.$outer.akka$stream$alpakka$orientdb$OrientDBFlowStage$$className);
            orientDBFlowStage$$anon$1.client().save(copy);
            destination = objectRef2;
        } else if (payload instanceof ORecord) {
            orientDBFlowStage$$anon$1.client().save((ORecord) payload);
            destination = objectRef2;
        } else {
            destination = objectRef;
        }

        // The recorded message always carries the original payload, never the saved copy.
        final List soFar = (List) destination.elem;
        final List singleton = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new OIncomingMessage[]{new OIncomingMessage(payload, passThrough)}));
        destination.elem = (List) soFar.$plus$plus(singleton, List$.MODULE$.canBuildFrom());
    }

    /**
     * Typed-mode handler for a single message: saves the payload through the object database
     * and appends the message to the stored list ({@code objectRef2}).
     *
     * <p>The compiled match had a second arm with exactly the same guard as the first
     * (payload and pass-through both non-null) that appended to the rejected list. That arm
     * could never be reached, so it has been removed; {@code objectRef} is kept only so the
     * signature expected by the caller is unchanged.
     *
     * @param objectRef  rejected-message accumulator (never written in typed mode)
     * @param objectRef2 stored-message accumulator
     * @throws MatchError if the message, its payload, or its pass-through value is {@code null}
     */
    public static final /* synthetic */ void $anonfun$sendOSQLBulkInsertRequest$4(OrientDBFlowStage$$anon$1 orientDBFlowStage$$anon$1, ObjectRef objectRef, ObjectRef objectRef2, OIncomingMessage oIncomingMessage) {
        if (oIncomingMessage != null) {
            Object oDocument = oIncomingMessage.oDocument();
            Object passThrough = oIncomingMessage.passThrough();
            // `x instanceof Object` in the compiled code is simply a non-null test.
            if (oDocument != null && passThrough != null) {
                orientDBFlowStage$$anon$1.oObjectClient().save(oDocument);
                objectRef2.elem = (List) ((List) objectRef2.elem).$plus$plus(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new OIncomingMessage[]{new OIncomingMessage(oDocument, passThrough)})), List$.MODULE$.canBuildFrom());
                return;
            }
        }
        throw new MatchError(oIncomingMessage);
    }

    /**
     * Always-true predicate passed to {@code dequeueFirst} so that the first queued message,
     * whatever it is, gets dequeued.
     */
    public static final /* synthetic */ boolean $anonfun$onPush$2(OIncomingMessage oIncomingMessage) {
        return true;
    }

    /**
     * Dequeues at most one message from the stage's buffer and returns it as a zero- or
     * one-element iterable. The slot index {@code i} is not used.
     */
    public static final /* synthetic */ Iterable $anonfun$onPush$1(OrientDBFlowStage$$anon$1 orientDBFlowStage$$anon$1, int i) {
        final Option head = orientDBFlowStage$$anon$1.queue().dequeueFirst(
                candidate -> BoxesRunTime.boxToBoolean($anonfun$onPush$2(candidate)));
        return Option$.MODULE$.option2Iterable(head);
    }

    /**
     * Builds the stage logic for the given flow stage: creates the message buffer, registers
     * the two async callbacks (failure → {@code handleFailure}, response →
     * {@code handleResponse}), initialises the retry state, and installs this object as the
     * handler for both the inlet and the outlet.
     *
     * @param orientDBFlowStage the enclosing stage; must not be {@code null}
     * @throws NullPointerException if {@code orientDBFlowStage} is {@code null}
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public OrientDBFlowStage$$anon$1(OrientDBFlowStage<T, C, R> orientDBFlowStage) {
        super(orientDBFlowStage.m4shape());
        // The decompiler emitted `orientDBFlowStage == 0`, which is not valid Java for a
        // reference; the compiled outer-instance check is a null check that throws an NPE.
        if (orientDBFlowStage == null) {
            throw new NullPointerException();
        }
        this.$outer = orientDBFlowStage;
        InHandler.$init$(this);
        OutHandler.$init$(this);
        this.queue = new Queue<>();
        this.failureHandler = getAsyncCallback(tuple2 -> {
            this.handleFailure(tuple2);
            return BoxedUnit.UNIT;
        });
        this.responseHandler = getAsyncCallback(tuple22 -> {
            this.handleResponse(tuple22);
            return BoxedUnit.UNIT;
        });
        this.failedMessages = Nil$.MODULE$;
        this.retryCount = 0;
        setHandlers(orientDBFlowStage.akka$stream$alpakka$orientdb$OrientDBFlowStage$$in(), orientDBFlowStage.akka$stream$alpakka$orientdb$OrientDBFlowStage$$out(), this);
    }
}
