package ai.chronon.flink;

import ai.chronon.online.Api;
import ai.chronon.online.KVStore;
import java.util.Arrays;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: AsyncKVStoreWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001\t%r!B\u0013'\u0011\u0003ic!B\u0018'\u0011\u0003\u0001\u0004\"\u0002\u001e\u0002\t\u0003Y\u0004b\u0002\u001f\u0002\u0005\u0004%I!\u0010\u0005\u0007\u0003\u0006\u0001\u000b\u0011\u0002 \t\u000f\t\u000b!\u0019!C\u0005\u0007\"1q)\u0001Q\u0001\n\u0011CQ\u0001S\u0001\u0005\u0002%C\u0011\"!\u0005\u0002#\u0003%\t!a\u0005\t\u0013\u0005%\u0012!%A\u0005\u0002\u0005-bABA\u0018\u0003\u0011\t\t\u0004\u0003\u0004;\u0015\u0011\u0005\u0011q\b\u0005\b\u0003\u000bRA\u0011IA$\u0011\u001d\t\u0019G\u0003C!\u0003KBq!! \u000b\t\u0003\ny\bC\u0005\u0002\u0002\u0006\u0011\r\u0011\"\u0003\u0002\u0004\"A\u0011QQ\u0001!\u0002\u0013\t\u0019\u0004C\u0005\u0002\b\u0006\t\t\u0011\"\u0003\u0002\n\u001a)qF\n\u0001\u0002\u0012\"Q\u00111\u0013\n\u0003\u0002\u0003\u0006I!!&\t\u0011m\u0014\"\u0011!Q\u0001\nqDaA\u000f\n\u0005\u0002\u0005u\u0005BCAS%!\u0015\r\u0011\"\u0001\u0002(\"Y\u0011Q\u0018\nA\u0002\u0003\u0007I\u0011BA`\u0011-\t9M\u0005a\u0001\u0002\u0004%I!!3\t\u0017\u0005='\u00031A\u0001B\u0003&\u0011\u0011\u0019\u0005\f\u0003'\u0014\u0002\u0019!a\u0001\n\u0013\t)\u000eC\u0006\u0002dJ\u0001\r\u00111A\u0005\n\u0005\u0015\bbCAu%\u0001\u0007\t\u0011)Q\u0005\u0003/D1\"!<\u0013\u0001\u0004\u0005\r\u0011\"\u0003\u0002V\"Y\u0011q\u001e\nA\u0002\u0003\u0007I\u0011BAy\u0011-\t)P\u0005a\u0001\u0002\u0003\u0006K!a6\t\u0015\u0005e(\u0003#b\u0001\n\u0007\t\u0019\tC\u0004\u0002|J!\t\"a0\t\u000f\u0005u(\u0003\"\u0011\u0002��\"9!q\u0002\n\u0005B\tE\u0001b\u0002B\u0011%\u0011\u0005#1E\u0001\u0013\u0003NLhnY&W'R|'/Z,sSR,'O\u0003\u0002(Q\u0005)a\r\\5oW*\u0011\u0011FK\u0001\bG\"\u0014xN\\8o\u0015\u0005Y\u0013AA1j\u0007\u0001\u0001\"AL\u0001\u000e\u0003\u0019\u0012!#Q:z]\u000e\\ek\u0015;pe\u0016<&/\u001b;feN\u0019\u0011!M\u001c\u0011\u0005I*T\"A\u001a\u000b\u0003Q\nQa]2bY\u0006L!AN\u001a\u0003\r\u0005s\u0017PU3g!\t\u0011\u0004(\u0003\u0002:g\ta1+\u001a:jC2L'0\u00192mK\u00061A(\u001b8jiz\"\u0012!L\u0001\u0013WZ\u001cFo\u001c:f\u0007>t7-\u001e:sK:\u001c\u00170F\u0001?!\t\u0011t(\u0003\u0002Ag\t\u0019\u0011J\u001c;\u0002'-48\u000b^8sK\u000e{gnY;se\u0016t7-\u001f\u0011\u0002)\u0011,g-Y;miRKW.Z8vi6KG\u000e\\5t+\u0005!\u0005C\u0001\u001aF\u0013\t15G\u0001\u0003M_:<\u0017!\u00063fM\u0006,H\u000e\u001e+j[\u0016|W\u000f^'jY2L7\u000fI\u0001\u0013o&$\b.\u00168pe\u0012,'/\u001a3XC&$8\u000f\u0006\u0005K7BT\u0018\u0011BA\u0007!\rYe\u000bW\u0007\u0002\u0019*\u0011A'\u0014\u0006\u0003\u001d>\u000b1!\u00199j\u0015\t\u0001\u0016+A\u0005tiJ,\u0017-\\5oO*\u0011qE\u0015\u0006\u0003'R\u000ba!\u00199bG\",'\"A+\u0002\u0007=\u0014x-\u0003\u0002X\u0019\nQA)\u0019;b'R\u0014X-Y7\u0011\u00059J\u0016B\u0001.'\u000559&/\u001b;f%\u0016\u001c\bo\u001c8tK\")Al\u0002a\u0001;\u00069\u0011N\u001c9vi\u0012\u001b\u0006cA&W=B\u0011q,\u001c\b\u0003A*t!!\u00195\u000f\u0005\t<gBA2g\u001b\u0005!'BA3-\u0003\u0019a$o\\8u}%\t1&\u0003\u0002*U%\u0011\u0011\u000eK\u0001\u0007_:d\u0017N\\3\n\u0005-d\u0017aB&W'R|'/\u001a\u0006\u0003S\"J!A\\8\u0003\u0015A+HOU3rk\u0016\u001cHO\u0003\u0002lY\")\u0011o\u0002a\u0001e\u0006y1N^*u_J,wK]5uKJ4e\u000e\u0005\u0003tqzCV\"\u0001;\u000b\u0005U4\u0018!B1ts:\u001c'BA<N\u0003%1WO\\2uS>t7/\u0003\u0002zi\n\t\"+[2i\u0003NLhn\u0019$v]\u000e$\u0018n\u001c8\t\u000bm<\u0001\u0019\u0001?\u0002!\u0019,\u0017\r^;sK\u001e\u0013x.\u001e9OC6,\u0007cA?\u0002\u00049\u0011ap \t\u0003GNJ1!!\u00014\u0003\u0019\u0001&/\u001a3fM&!\u0011QAA\u0004\u0005\u0019\u0019FO]5oO*\u0019\u0011\u0011A\u001a\t\u0011\u0005-q\u0001%AA\u0002\u0011\u000bQ\u0002^5nK>,H/T5mY&\u001c\b\u0002CA\b\u000fA\u0005\t\u0019\u0001 \u0002\u0011\r\f\u0007/Y2jif\fAd^5uQVswN\u001d3fe\u0016$w+Y5ug\u0012\"WMZ1vYR$C'\u0006\u0002\u0002\u0016)\u001aA)a\u0006,\u0005\u0005e\u0001\u0003BA\u000e\u0003Ki!!!\b\u000b\t\u0005}\u0011\u0011E\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a\t4\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003O\tiBA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fAd^5uQVswN\u001d3fe\u0016$w+Y5ug\u0012\"WMZ1vYR$S'\u0006\u0002\u0002.)\u001aa(a\u0006\u0003-\u0011K'/Z2u\u000bb,7-\u001e;j_:\u001cuN\u001c;fqR\u001cBAC\u0019\u00024A!\u0011QGA\u001e\u001b\t\t9DC\u0002\u0002:M\n!bY8oGV\u0014(/\u001a8u\u0013\u0011\ti$a\u000e\u0003!\u0015CXmY;uS>t7i\u001c8uKb$HCAA!!\r\t\u0019EC\u0007\u0002\u0003\u00059Q\r_3dkR,G\u0003BA%\u0003\u001f\u00022AMA&\u0013\r\tie\r\u0002\u0005+:LG\u000fC\u0004\u0002R1\u0001\r!a\u0015\u0002\u0011I,hN\\1cY\u0016\u0004B!!\u0016\u0002`5\u0011\u0011q\u000b\u0006\u0005\u00033\nY&\u0001\u0003mC:<'BAA/\u0003\u0011Q\u0017M^1\n\t\u0005\u0005\u0014q\u000b\u0002\t%Vtg.\u00192mK\u0006i!/\u001a9peR4\u0015-\u001b7ve\u0016$B!!\u0013\u0002h!9\u0011\u0011N\u0007A\u0002\u0005-\u0014!B2bkN,\u0007\u0003BA7\u0003orA!a\u001c\u0002t9\u00191-!\u001d\n\u0003QJ1!!\u001e4\u0003\u001d\u0001\u0018mY6bO\u0016LA!!\u001f\u0002|\tIA\u000b\u001b:po\u0006\u0014G.\u001a\u0006\u0004\u0003k\u001a\u0014a\u00029sKB\f'/\u001a\u000b\u0003\u0003g\t\u0001$\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;J]N$\u0018M\\2f+\t\t\u0019$A\rFq\u0016\u001cW\u000f^5p]\u000e{g\u000e^3yi&s7\u000f^1oG\u0016\u0004\u0013a\u0003:fC\u0012\u0014Vm]8mm\u0016$\"!a#\u0011\t\u0005U\u0013QR\u0005\u0005\u0003\u001f\u000b9F\u0001\u0004PE*,7\r^\n\u0003%I\f!b\u001c8mS:,\u0017*\u001c9m!\u0011\t9*!'\u000e\u00031L1!a'm\u0005\r\t\u0005/\u001b\u000b\u0007\u0003?\u000b\t+a)\u0011\u00059\u0012\u0002bBAJ+\u0001\u0007\u0011Q\u0013\u0005\u0006wV\u0001\r\u0001`\u0001\u0007Y><w-\u001a:\u0016\u0005\u0005%\u0006\u0003BAV\u0003ck!!!,\u000b\u0007\u0005=F+A\u0003tY\u001a$$.\u0003\u0003\u00024\u00065&A\u0002'pO\u001e,'\u000fK\u0002\u0017\u0003o\u00032AMA]\u0013\r\tYl\r\u0002\niJ\fgn]5f]R\fqa\u001b<Ti>\u0014X-\u0006\u0002\u0002BB!\u0011qSAb\u0013\r\t)\r\u001c\u0002\b\u0017Z\u001bFo\u001c:f\u0003-Ygo\u0015;pe\u0016|F%Z9\u0015\t\u0005%\u00131\u001a\u0005\n\u0003\u001bD\u0012\u0011!a\u0001\u0003\u0003\f1\u0001\u001f\u00132\u0003!Ygo\u0015;pe\u0016\u0004\u0003fA\r\u00028\u0006aQM\u001d:pe\u000e{WO\u001c;feV\u0011\u0011q\u001b\t\u0005\u00033\fy.\u0004\u0002\u0002\\*\u0019\u0011Q\\)\u0002\u000f5,GO]5dg&!\u0011\u0011]An\u0005\u001d\u0019u.\u001e8uKJ\f\u0001#\u001a:s_J\u001cu.\u001e8uKJ|F%Z9\u0015\t\u0005%\u0013q\u001d\u0005\n\u0003\u001b\\\u0012\u0011!a\u0001\u0003/\fQ\"\u001a:s_J\u001cu.\u001e8uKJ\u0004\u0003f\u0001\u000f\u00028\u0006q1/^2dKN\u001c8i\\;oi\u0016\u0014\u0018AE:vG\u000e,7o]\"pk:$XM]0%KF$B!!\u0013\u0002t\"I\u0011Q\u001a\u0010\u0002\u0002\u0003\u0007\u0011q[\u0001\u0010gV\u001c7-Z:t\u0007>,h\u000e^3sA!\u001aq$a.\u0002\u0011\u0015DXmY;u_J\f!bZ3u\u0017Z\u001bFo\u001c:f\u0003\u0011y\u0007/\u001a8\u0015\t\u0005%#\u0011\u0001\u0005\b\u0005\u0007\u0011\u0003\u0019\u0001B\u0003\u00035\u0019wN\u001c4jOV\u0014\u0018\r^5p]B!!q\u0001B\u0006\u001b\t\u0011IAC\u0002\u0003\u0004EKAA!\u0004\u0003\n\ti1i\u001c8gS\u001e,(/\u0019;j_:\fq\u0001^5nK>,H\u000f\u0006\u0004\u0002J\tM!q\u0003\u0005\u0007\u0005+\u0019\u0003\u0019\u00010\u0002\u000b%t\u0007/\u001e;\t\u000f\te1\u00051\u0001\u0003\u001c\u0005a!/Z:vYR4U\u000f^;sKB!1O!\bY\u0013\r\u0011y\u0002\u001e\u0002\r%\u0016\u001cX\u000f\u001c;GkR,(/Z\u0001\fCNLhnY%om>\\W\r\u0006\u0004\u0002J\t\u0015\"q\u0005\u0005\u0007\u0005+!\u0003\u0019\u00010\t\u000f\teA\u00051\u0001\u0003\u001c\u0001")
/* loaded from: input_file:ai/chronon/flink/AsyncKVStoreWriter.class */
public class AsyncKVStoreWriter extends RichAsyncFunction<KVStore.PutRequest, WriteResponse> {
    private transient Logger logger;
    private ExecutionContext executor;
    private final Api onlineImpl;
    private final String featureGroupName;
    private transient KVStore kvStore;
    private transient Counter errorCounter;
    private transient Counter successCounter;
    private volatile transient boolean bitmap$trans$0;
    private volatile boolean bitmap$0;

    /* compiled from: AsyncKVStoreWriter.scala */
    /* loaded from: input_file:ai/chronon/flink/AsyncKVStoreWriter$DirectExecutionContext.class */
    public static class DirectExecutionContext implements ExecutionContext {
        public void execute(Runnable runnable) {
            runnable.run();
        }

        public void reportFailure(Throwable th) {
            throw new IllegalStateException("Error in direct execution context.", th);
        }

        public ExecutionContext prepare() {
            return this;
        }

        public DirectExecutionContext() {
            ExecutionContext.$init$(this);
        }
    }

    public static DataStream<WriteResponse> withUnorderedWaits(DataStream<KVStore.PutRequest> dataStream, RichAsyncFunction<KVStore.PutRequest, WriteResponse> richAsyncFunction, String str, long j, int i) {
        return AsyncKVStoreWriter$.MODULE$.withUnorderedWaits(dataStream, richAsyncFunction, str, j, i);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [ai.chronon.flink.AsyncKVStoreWriter] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LoggerFactory.getLogger(getClass());
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    private KVStore kvStore() {
        return this.kvStore;
    }

    private void kvStore_$eq(KVStore kVStore) {
        this.kvStore = kVStore;
    }

    private Counter errorCounter() {
        return this.errorCounter;
    }

    private void errorCounter_$eq(Counter counter) {
        this.errorCounter = counter;
    }

    private Counter successCounter() {
        return this.successCounter;
    }

    private void successCounter_$eq(Counter counter) {
        this.successCounter = counter;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [ai.chronon.flink.AsyncKVStoreWriter] */
    private ExecutionContext executor$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.executor = AsyncKVStoreWriter$.MODULE$.ai$chronon$flink$AsyncKVStoreWriter$$ExecutionContextInstance();
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.executor;
    }

    public ExecutionContext executor() {
        return !this.bitmap$0 ? executor$lzycompute() : this.executor;
    }

    public KVStore getKVStore() {
        return this.onlineImpl.genKvStore();
    }

    public void open(Configuration configuration) {
        MetricGroup addGroup = getRuntimeContext().getMetricGroup().addGroup("chronon").addGroup("feature_group", this.featureGroupName);
        errorCounter_$eq(addGroup.counter("kvstore_writer.errors"));
        successCounter_$eq(addGroup.counter("kvstore_writer.successes"));
        kvStore_$eq(getKVStore());
    }

    public void timeout(KVStore.PutRequest putRequest, ResultFuture<WriteResponse> resultFuture) {
        logger().error(new StringBuilder(42).append("Timed out writing to KV Store for object: ").append(putRequest).toString());
        errorCounter().inc();
        resultFuture.complete(Arrays.asList(new WriteResponse(putRequest, false)));
    }

    public void asyncInvoke(KVStore.PutRequest putRequest, ResultFuture<WriteResponse> resultFuture) {
        kvStore().multiPut(new $colon.colon(putRequest, Nil$.MODULE$)).onComplete(r8 -> {
            $anonfun$asyncInvoke$1(this, putRequest, resultFuture, r8);
            return BoxedUnit.UNIT;
        }, executor());
    }

    public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) {
        asyncInvoke((KVStore.PutRequest) obj, (ResultFuture<WriteResponse>) resultFuture);
    }

    public /* bridge */ /* synthetic */ void timeout(Object obj, ResultFuture resultFuture) {
        timeout((KVStore.PutRequest) obj, (ResultFuture<WriteResponse>) resultFuture);
    }

    public static final /* synthetic */ boolean $anonfun$asyncInvoke$2(boolean z) {
        return BoxesRunTime.unboxToBoolean(Predef$.MODULE$.identity(BoxesRunTime.boxToBoolean(z)));
    }

    public static final /* synthetic */ void $anonfun$asyncInvoke$1(AsyncKVStoreWriter asyncKVStoreWriter, KVStore.PutRequest putRequest, ResultFuture resultFuture, Try r12) {
        if (r12 instanceof Success) {
            boolean forall = ((Seq) ((Success) r12).value()).forall(obj -> {
                return BoxesRunTime.boxToBoolean($anonfun$asyncInvoke$2(BoxesRunTime.unboxToBoolean(obj)));
            });
            if (forall) {
                asyncKVStoreWriter.successCounter().inc();
            } else {
                asyncKVStoreWriter.errorCounter().inc();
                asyncKVStoreWriter.logger().error(new StringBuilder(39).append("Failed to write to KVStore for object: ").append(putRequest).toString());
            }
            resultFuture.complete(Arrays.asList(new WriteResponse(putRequest, forall)));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!(r12 instanceof Failure)) {
            throw new MatchError(r12);
        }
        Throwable exception = ((Failure) r12).exception();
        asyncKVStoreWriter.errorCounter().inc();
        asyncKVStoreWriter.logger().error(new StringBuilder(51).append("Caught exception writing to KVStore for object: ").append(putRequest).append(" - ").append(exception).toString());
        resultFuture.complete(Arrays.asList(new WriteResponse(putRequest, false)));
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    public AsyncKVStoreWriter(Api api, String str) {
        this.onlineImpl = api;
        this.featureGroupName = str;
    }
}
