package ai.chronon.flink;

import ai.chronon.api.Extensions$;
import ai.chronon.api.Source;
import ai.chronon.online.GroupByServingInfoParsed;
import ai.chronon.online.KVStore;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.OptionTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.spark.sql.Encoder;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichInt$;

/* compiled from: FlinkJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ua\u0001\u0002\b\u0010\u0001YA\u0001B\b\u0001\u0003\u0002\u0003\u0006Ia\b\u0005\t]\u0001\u0011\t\u0011)A\u0005_!AQ\u000b\u0001B\u0001B\u0003%a\u000b\u0003\u0005[\u0001\t\u0005\t\u0015!\u0003\\\u0011!\u0019\u0007A!A!\u0002\u0013!\u0007\"B4\u0001\t\u0003A\u0007bB8\u0001\u0005\u0004%\t\u0002\u001d\u0005\u0007i\u0002\u0001\u000b\u0011B9\t\u000fU\u0004!\u0019!C\u0001m\"1q\u0010\u0001Q\u0001\n]D\u0001\"!\u0001\u0001\u0005\u0004%\tA\u001e\u0005\b\u0003\u0007\u0001\u0001\u0015!\u0003x\u0011\u001d\t)\u0001\u0001C\u0001\u0003\u000f\u0011\u0001B\u00127j].TuN\u0019\u0006\u0003!E\tQA\u001a7j].T!AE\n\u0002\u000f\rD'o\u001c8p]*\tA#\u0001\u0002bS\u000e\u0001QCA\f&'\t\u0001\u0001\u0004\u0005\u0002\u001a95\t!DC\u0001\u001c\u0003\u0015\u00198-\u00197b\u0013\ti\"D\u0001\u0004B]f\u0014VMZ\u0001\tKZ,g\u000e^*sGB\u0019\u0001%I\u0012\u000e\u0003=I!AI\b\u0003\u0017\u0019c\u0017N\\6T_V\u00148-\u001a\t\u0003I\u0015b\u0001\u0001B\u0003'\u0001\t\u0007qEA\u0001U#\tA3\u0006\u0005\u0002\u001aS%\u0011!F\u0007\u0002\b\u001d>$\b.\u001b8h!\tIB&\u0003\u0002.5\t\u0019\u0011I\\=\u0002\rMLgn\u001b$o!\u0011\u0001d\b\u0011*\u000e\u0003ER!AM\u001a\u0002\u000b\u0005\u001c\u0018P\\2\u000b\u0005Q*\u0014!\u00034v]\u000e$\u0018n\u001c8t\u0015\t1t'A\u0002ba&T!\u0001O\u001d\u0002\u0013M$(/Z1nS:<'B\u0001\t;\u0015\tYD(\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002{\u0005\u0019qN]4\n\u0005}\n$!\u0005*jG\"\f5/\u001f8d\rVt7\r^5p]B\u0011\u0011i\u0014\b\u0003\u00052s!a\u0011&\u000f\u0005\u0011KeBA#I\u001b\u00051%BA$\u0016\u0003\u0019a$o\\8u}%\tA#\u0003\u0002\u0013'%\u00111*E\u0001\u0007_:d\u0017N\\3\n\u00055s\u0015aB&W'R|'/\u001a\u0006\u0003\u0017FI!\u0001U)\u0003\u0015A+HOU3rk\u0016\u001cHO\u0003\u0002N\u001dB\u0011\u0001eU\u0005\u0003)>\u0011Qb\u0016:ji\u0016\u0014Vm\u001d9p]N,\u0017\u0001G4s_V\u0004()_*feZLgnZ%oM>\u0004\u0016M]:fIB\u0011q\u000bW\u0007\u0002\u001d&\u0011\u0011L\u0014\u0002\u0019\u000fJ|W\u000f\u001d\"z'\u0016\u0014h/\u001b8h\u0013:4w\u000eU1sg\u0016$\u0017
aB3oG>$WM\u001d\t\u00049\u0006\u001cS\"A/\u000b\u0005y{\u0016aA:rY*\u0011\u0001MO\u0001\u0006gB\f'o[\u0005\u0003Ev\u0013q!\u00128d_\u0012,'/A\u0006qCJ\fG\u000e\\3mSNl\u0007CA\rf\u0013\t1'DA\u0002J]R\fa\u0001P5oSRtDCB5kW2lg\u000eE\u0002!\u0001\rBQA\b\u0004A\u0002}AQA\f\u0004A\u0002=BQ!\u0016\u0004A\u0002YCQA\u0017\u0004A\u0002mCQa\u0019\u0004A\u0002\u0011\f\u0001\"\u001a=qe\u00163\u0018\r\\\u000b\u0002cB\u0019\u0001E]\u0012\n\u0005M|!!F*qCJ\\W\t\u001f9sKN\u001c\u0018n\u001c8Fm\u0006dgI\\\u0001\nKb\u0004(/\u0012<bY\u0002\n\u0001CZ3biV\u0014Xm\u0012:pkBt\u0015-\\3\u0016\u0003]\u0004\"\u0001\u001f?\u000f\u0005eT\bCA#\u001b\u0013\tY($\u0001\u0004Qe\u0016$WMZ\u0005\u0003{z\u0014aa\u0015;sS:<'BA>\u001b\u0003E1W-\u0019;ve\u0016<%o\\;q\u001d\u0006lW\rI\u0001\u000bW\u000647.\u0019+pa&\u001c\u0017aC6bM.\fGk\u001c9jG\u0002\nQB];o\u000fJ|W\u000f\u001d\"z\u0015>\u0014G\u0003BA\u0005\u0003'\u0001R!a\u0003\u0002\u0010Ik!!!\u0004\u000b\u0005m)\u0014\u0002BA\t\u0003\u001b\u0011!\u0002R1uCN#(/Z1n\u0011\u001d\t)\"\u0004a\u0001\u0003/\t1!\u001a8w!\u0011\tY!!\u0007\n\t\u0005m\u0011Q\u0002\u0002\u001b'R\u0014X-Y7Fq\u0016\u001cW\u000f^5p]\u0016sg/\u001b:p]6,g\u000e\u001e")
/* loaded from: input_file:ai/chronon/flink/FlinkJob.class */
/**
 * Wires a Flink streaming pipeline for a single feature group: events read from a
 * {@link FlinkSource} are passed through a {@link SparkExpressionEvalFn}, then through an
 * {@code AvroCodecFn}, and the resulting {@link KVStore.PutRequest}s are handed to
 * {@code AsyncKVStoreWriter.withUnorderedWaits} together with the configured sink function.
 *
 * <p>This file is decompiler output of a Scala class, so several constructs below (synthetic
 * anonymous classes, {@code super(...)} calls inside initializer blocks, {@code $colon.colon}
 * list cells) mirror compiler-generated code rather than hand-written Java.
 *
 * @param <T> element type of the stream returned by the event source and consumed by the
 *            expression-evaluation function
 */
public class FlinkJob<T> {
    // Constructor arguments, stored as-is.
    private final FlinkSource<T> eventSrc;
    private final RichAsyncFunction<KVStore.PutRequest, WriteResponse> sinkFn;
    private final GroupByServingInfoParsed groupByServingInfoParsed;
    private final int parallelism;
    // Values derived in the constructor from groupByServingInfoParsed.groupBy.
    private final SparkExpressionEvalFn<T> exprEval;
    private final String featureGroupName;
    private final String kafkaTopic;

    /**
     * @return the expression-evaluation function built in the constructor from the encoder and
     *         {@code groupByServingInfoParsed.groupBy}
     */
    public SparkExpressionEvalFn<T> exprEval() {
        return this.exprEval;
    }

    /**
     * @return the name read from the group-by's metadata in the constructor; used below to build
     *         operator uids and display names
     */
    public String featureGroupName() {
        return this.featureGroupName;
    }

    /**
     * @return the topic of the group-by's streaming source, resolved in the constructor
     */
    public String kafkaTopic() {
        return this.kafkaTopic;
    }

    /**
     * Builds the job graph on the given environment and returns the stream produced by
     * {@code AsyncKVStoreWriter.withUnorderedWaits}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Obtain the input stream from {@code eventSrc.getDataStream(kafkaTopic, featureGroupName,
     *       env, parallelism)}.</li>
     *   <li>{@code flatMap} with {@link #exprEval()}; operator uid
     *       {@code "spark-expr-eval-flatmap-" + featureGroupName}, name
     *       {@code "Spark expression eval for " + featureGroupName}, parallelism copied from the
     *       input stream.</li>
     *   <li>{@code flatMap} with a new {@code AvroCodecFn(groupByServingInfoParsed)}; operator uid
     *       {@code "avro-conversion-" + featureGroupName}, name
     *       {@code "Avro conversion for " + featureGroupName}, parallelism copied from the input
     *       stream.</li>
     *   <li>Pass that stream, {@code sinkFn} and {@code featureGroupName} to
     *       {@code AsyncKVStoreWriter.withUnorderedWaits}, using the values returned by
     *       {@code withUnorderedWaits$default$4()} and {@code withUnorderedWaits$default$5()} for
     *       the last two arguments.</li>
     * </ol>
     *
     * @param streamExecutionEnvironment environment forwarded to the event source
     * @return the stream of {@code WriteResponse} returned by {@code withUnorderedWaits}
     */
    public DataStream<WriteResponse> runGroupByJob(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStream<T> dataStream = this.eventSrc.getDataStream(kafkaTopic(), featureGroupName(), streamExecutionEnvironment, this.parallelism);
        // Both locals are null and are only passed as the constructor argument of the two anonymous
        // CaseClassTypeInfo subclasses below.
        // NOTE(review): presumably decompiler stand-ins for the captured outer instance - confirm.
        final FlinkJob flinkJob = null;
        final FlinkJob flinkJob2 = null;
        // First flatMap: exprEval with type information supplied by FlinkJob$$anon$3 (defined
        // outside this file), which is given an anonymous CaseClassTypeInfo for Tuple2<String, Object>.
        return AsyncKVStoreWriter$.MODULE$.withUnorderedWaits(dataStream.flatMap(exprEval(), new FlinkJob$$anon$3(null, new CaseClassTypeInfo<Tuple2<String, Object>>(flinkJob) { // from class: ai.chronon.flink.FlinkJob$$anon$1
            // Synthetic accessor exposing the inherited `types` array to the lambda below.
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$1 flinkJob$$anon$1) {
                return flinkJob$$anon$1.types;
            }

            // Creates one serializer per field (getArity() of them) from the field type infos,
            // then returns a ScalaCaseClassSerializer over this type class and those serializers.
            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // NOTE(review): this anonymous subclass instance (which builds a Tuple2 from the
                // two field values) is constructed but never assigned or returned; the method
                // returns a plain ScalaCaseClassSerializer instead. Possibly a decompiler
                // artifact - confirm against the Scala source / bytecode.
                new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$1$$anon$2
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple2<String, Object> m3createInstance(Object[] objArr) {
                        return new Tuple2<>((String) objArr[0], objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                // Super-constructor arguments: Tuple2.class; an array of [String info, Object info];
                // a list of the same two type infos; and the names "_1", "_2".
                super(Tuple2.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)), new $colon.colon("_1", new $colon.colon("_2", Nil$.MODULE$)));
            }
            // Next line: uid/name/parallelism for the first flatMap, then the second flatMap
            // (AvroCodecFn) with an anonymous CaseClassTypeInfo for KVStore.PutRequest.
        })).uid(new StringBuilder(24).append("spark-expr-eval-flatmap-").append(featureGroupName()).toString()).name(new StringBuilder(26).append("Spark expression eval for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism()).flatMap(new AvroCodecFn(this.groupByServingInfoParsed), new CaseClassTypeInfo<KVStore.PutRequest>(flinkJob2) { // from class: ai.chronon.flink.FlinkJob$$anon$5
            // Synthetic accessor exposing the inherited `types` array to the lambda below.
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$5 flinkJob$$anon$5) {
                return flinkJob$$anon$5.types;
            }

            // Same shape as the Tuple2 serializer factory above, for KVStore.PutRequest.
            public TypeSerializer<KVStore.PutRequest> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // NOTE(review): as above, this anonymous subclass instance (which builds a
                // PutRequest from four field values) is created and discarded; a plain
                // ScalaCaseClassSerializer is what gets returned. Confirm against the original.
                new ScalaCaseClassSerializer<KVStore.PutRequest>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$5$$anon$6
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public KVStore.PutRequest m5createInstance(Object[] objArr) {
                        return new KVStore.PutRequest((byte[]) objArr[0], (byte[]) objArr[1], (String) objArr[2], (Option) objArr[3]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                // Super-constructor arguments: KVStore.PutRequest.class; an empty TypeInformation
                // array; field type infos [byte[], byte[], String, Option of Long.TYPE]; and the
                // names "keyBytes", "valueBytes", "dataset", "tsMillis".
                super(KVStore.PutRequest.class, (TypeInformation[]) Nil$.MODULE$.toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(PrimitiveArrayTypeInfo.getInfoFor(byte[].class), new $colon.colon(PrimitiveArrayTypeInfo.getInfoFor(byte[].class), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(new OptionTypeInfo(BasicTypeInfo.getInfoFor(Long.TYPE)), Nil$.MODULE$)))), new $colon.colon("keyBytes", new $colon.colon("valueBytes", new $colon.colon("dataset", new $colon.colon("tsMillis", Nil$.MODULE$)))));
            }
            // Next line: uid/name/parallelism for the Avro flatMap, then the remaining
            // withUnorderedWaits arguments (sinkFn, featureGroupName, two default values).
        }).uid(new StringBuilder(16).append("avro-conversion-").append(featureGroupName()).toString()).name(new StringBuilder(20).append("Avro conversion for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism()), this.sinkFn, featureGroupName(), AsyncKVStoreWriter$.MODULE$.withUnorderedWaits$default$4(), AsyncKVStoreWriter$.MODULE$.withUnorderedWaits$default$5());
    }

    /**
     * Stores the collaborators and derives the expression evaluator, feature group name and
     * Kafka topic from {@code groupByServingInfoParsed.groupBy}.
     *
     * @param flinkSource              source that {@link #runGroupByJob} asks for the input stream
     * @param richAsyncFunction        async function passed to {@code AsyncKVStoreWriter.withUnorderedWaits}
     * @param groupByServingInfoParsed serving info whose {@code groupBy} supplies the metadata name
     *                                 and streaming source; also passed to {@code AvroCodecFn}
     * @param encoder                  encoder handed to the {@link SparkExpressionEvalFn} constructor
     * @param i                        parallelism forwarded to {@code flinkSource.getDataStream}
     * @throws IllegalArgumentException if the group-by's {@code streamingSource()} is empty
     */
    public FlinkJob(FlinkSource<T> flinkSource, RichAsyncFunction<KVStore.PutRequest, WriteResponse> richAsyncFunction, GroupByServingInfoParsed groupByServingInfoParsed, Encoder<T> encoder, int i) {
        this.eventSrc = flinkSource;
        this.sinkFn = richAsyncFunction;
        this.groupByServingInfoParsed = groupByServingInfoParsed;
        this.parallelism = i;
        this.exprEval = new SparkExpressionEvalFn<>(encoder, groupByServingInfoParsed.groupBy);
        this.featureGroupName = groupByServingInfoParsed.groupBy.getMetaData().getName();
        // Validate before calling .get() on the streaming source below; featureGroupName is
        // already assigned at this point so it can appear in the error message.
        if (Extensions$.MODULE$.GroupByOps(groupByServingInfoParsed.groupBy).streamingSource().isEmpty()) {
            throw new IllegalArgumentException(new StringBuilder(44).append("Invalid feature group: ").append(featureGroupName()).append(". No streaming source").toString());
        }
        this.kafkaTopic = Extensions$.MODULE$.SourceOps((Source) Extensions$.MODULE$.GroupByOps(groupByServingInfoParsed.groupBy).streamingSource().get()).topic();
    }
}
