package ai.chronon.spark.streaming;

import ai.chronon.online.DataStream;
import ai.chronon.online.Metrics;
import ai.chronon.online.Metrics$Name$;
import ai.chronon.online.Mutation;
import ai.chronon.online.StreamDecoder;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.runtime.AbstractFunction1;

/* compiled from: JoinSourceRunner.scala */
/* loaded from: input_file:ai/chronon/spark/streaming/JoinSourceRunner$$anonfun$5.class */
public final class JoinSourceRunner$$anonfun$5 extends AbstractFunction1<byte[], Mutation> implements Serializable {
    public static final long serialVersionUID = 0;
    private final DataStream dataStream$1;
    private final StreamDecoder streamDecoder$1;
    private final Metrics.Context ingressContext$1;

    public final Mutation apply(byte[] bArr) {
        this.ingressContext$1.increment(Metrics$Name$.MODULE$.RowCount());
        this.ingressContext$1.count(Metrics$Name$.MODULE$.Bytes(), bArr.length);
        try {
            return this.streamDecoder$1.decode(bArr);
        } catch (Throwable th) {
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error while decoding streaming events from stream: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.dataStream$1.topicInfo().name()})));
            th.printStackTrace();
            this.ingressContext$1.incrementException(th);
            return null;
        }
    }

    public JoinSourceRunner$$anonfun$5(JoinSourceRunner joinSourceRunner, DataStream dataStream, StreamDecoder streamDecoder, Metrics.Context context) {
        this.dataStream$1 = dataStream;
        this.streamDecoder$1 = streamDecoder;
        this.ingressContext$1 = context;
    }
}
