package org.apache.streampark.spark.connector.kafka.offset;

import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.streampark.common.util.RedisEndpoint;
import org.apache.streampark.common.util.RedisUtils$;
import org.apache.streampark.shaded.org.slf4j.Logger;
import redis.clients.jedis.Jedis;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try$;

/* compiled from: RedisOffset.scala */
@ScalaSignature(bytes = "\u0006\u0001!4Q\u0001C\u0005\u0001\u0017]A\u0001B\t\u0001\u0003\u0006\u0004%\t\u0001\n\u0005\tU\u0001\u0011\t\u0011)A\u0005K!)1\u0006\u0001C\u0001Y!1q\u0006\u0001Q\u0005\fABQ!\u0013\u0001\u0005B)CQ\u0001\u0018\u0001\u0005BuCQ\u0001\u001a\u0001\u0005B\u0015\u00141BU3eSN|eMZ:fi*\u0011!bC\u0001\u0007_\u001a47/\u001a;\u000b\u00051i\u0011!B6bM.\f'B\u0001\b\u0010\u0003%\u0019wN\u001c8fGR|'O\u0003\u0002\u0011#\u0005)1\u000f]1sW*\u0011!cE\u0001\u000bgR\u0014X-Y7qCJ\\'B\u0001\u000b\u0016\u0003\u0019\t\u0007/Y2iK*\ta#A\u0002pe\u001e\u001c2\u0001\u0001\r\u001f!\tIB$D\u0001\u001b\u0015\u0005Y\u0012!B:dC2\f\u0017BA\u000f\u001b\u0005\u0019\te.\u001f*fMB\u0011q\u0004I\u0007\u0002\u0013%\u0011\u0011%\u0003\u0002\u0007\u001f\u001a47/\u001a;\u0002\u0013M\u0004\u0018M]6D_:47\u0001A\u000b\u0002KA\u0011a\u0005K\u0007\u0002O)\u0011\u0001cE\u0005\u0003S\u001d\u0012\u0011b\u00159be.\u001cuN\u001c4\u0002\u0015M\u0004\u0018M]6D_:4\u0007%\u0001\u0004=S:LGO\u0010\u000b\u0003[9\u0002\"a\b\u0001\t\u000b\t\u001a\u0001\u0019A\u0013\u0002\u0011\u0015tG\r]8j]R$\"!M\u001d\u0011\u0005I:T\"A\u001a\u000b\u0005Q*\u0014\u0001B;uS2T!AN\t\u0002\r\r|W.\\8o\u0013\tA4GA\u0007SK\u0012L7/\u00128ea>Lg\u000e\u001e\u0005\u0006u\u0011\u0001\u001daO\u0001\u0007a\u0006\u0014\u0018-\\:\u0011\tq\u001aeI\u0012\b\u0003{\u0005\u0003\"A\u0010\u000e\u000e\u0003}R!\u0001Q\u0012\u0002\rq\u0012xn\u001c;?\u0013\t\u0011%$\u0001\u0004Qe\u0016$WMZ\u0005\u0003\t\u0016\u00131!T1q\u0015\t\u0011%\u0004\u0005\u0002=\u000f&\u0011\u0001*\u0012\u0002\u0007'R\u0014\u0018N\\4\u0002\u0007\u001d,G\u000fF\u0002L+^\u0003B\u0001P\"M%B\u0011Q\nU\u0007\u0002\u001d*\u0011ag\u0014\u0006\u0003\u0019MI!!\u0015(\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]B\u0011\u0011dU\u0005\u0003)j\u0011A\u0001T8oO\")a+\u0002a\u0001\r\u00069qM]8va&#\u0007\"\u0002-\u0006\u0001\u0004I\u0016A\u0002;pa&\u001c7\u000fE\u0002=5\u001aK!aW#\u0003\u0007M+G/\u0001\u0004va\u0012\fG/\u001a\u000b\u0004=\u0006\u0014\u0007CA\r`\u0013\t\u0001'D\u0001\u0003V]&$\b\"\u0002,\u0007\u0001\u00041\u0005\"B2\u0007\u0001\u0004Y\u0015aB8gMN,Go]\u0001\u0007I\u0016dW\r^3\u0015\u0007y3w\rC\u0003W\u000f\u0001\u0007a\tC\u0003Y\u000f\u0001\u0007\u0011\f")
/* loaded from: input_file:org/apache/streampark/spark/connector/kafka/offset/RedisOffset.class */
public class RedisOffset implements Offset {
    private final SparkConf sparkConf;
    private String storeType;
    private Map<String, String> storeParams;
    private String reset;
    private Tuple2<String, Object> org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1;
    private String host;
    private int port;
    private transient Logger org$apache$streampark$common$util$Logger$$_logger;
    private final String org$apache$streampark$common$util$Logger$$prefix;
    private volatile byte bitmap$0;

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public Properties toProperty(Map<String, String> map) {
        Properties property;
        property = toProperty(map);
        return property;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public String key(String str, String str2) {
        String key;
        key = key(str, str2);
        return key;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public Map<TopicPartition, Object> getEarliestOffsets(Seq<String> seq) {
        Map<TopicPartition, Object> earliestOffsets;
        earliestOffsets = getEarliestOffsets(seq);
        return earliestOffsets;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public Map<TopicPartition, Object> getLatestOffsets(Seq<String> seq) {
        Map<TopicPartition, Object> latestOffsets;
        latestOffsets = getLatestOffsets(seq);
        return latestOffsets;
    }

    public String logName() {
        return org.apache.streampark.common.util.Logger.logName$(this);
    }

    public Logger logger() {
        return org.apache.streampark.common.util.Logger.logger$(this);
    }

    public void logInfo(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logInfo$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logDebug$(this, function0);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logTrace$(this, function0);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logTrace$(this, function0, th);
    }

    public void logWarn(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logWarn$(this, function0);
    }

    public void logWarn(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logWarn$(this, function0, th);
    }

    public void logError(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logError$(this, function0);
    }

    public void logError(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return org.apache.streampark.common.util.Logger.isTraceEnabled$(this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.offset.RedisOffset] */
    private String storeType$lzycompute() {
        String storeType;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                storeType = storeType();
                this.storeType = storeType;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.storeType;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public String storeType() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? storeType$lzycompute() : this.storeType;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.offset.RedisOffset] */
    private Map<String, String> storeParams$lzycompute() {
        Map<String, String> storeParams;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                storeParams = storeParams();
                this.storeParams = storeParams;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.storeParams;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public Map<String, String> storeParams() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? storeParams$lzycompute() : this.storeParams;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.offset.RedisOffset] */
    private String reset$lzycompute() {
        String reset;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                reset = reset();
                this.reset = reset;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
        }
        return this.reset;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public String reset() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? reset$lzycompute() : this.reset;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.offset.RedisOffset] */
    private Tuple2<String, Object> org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1$lzycompute() {
        Tuple2<String, Object> org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 8)) == 0) {
                org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1 = org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1();
                this.org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1 = org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 8);
            }
        }
        return this.org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public /* synthetic */ Tuple2 org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1() {
        return ((byte) (this.bitmap$0 & 8)) == 0 ? org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1$lzycompute() : this.org$apache$streampark$spark$connector$kafka$offset$Offset$$x$1;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.offset.RedisOffset] */
    private String host$lzycompute() {
        String host;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 16)) == 0) {
                host = host();
                this.host = host;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 16);
            }
        }
        return this.host;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public String host() {
        return ((byte) (this.bitmap$0 & 16)) == 0 ? host$lzycompute() : this.host;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.offset.RedisOffset] */
    private int port$lzycompute() {
        int port;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 32)) == 0) {
                port = port();
                this.port = port;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 32);
            }
        }
        return this.port;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public int port() {
        return ((byte) (this.bitmap$0 & 32)) == 0 ? port$lzycompute() : this.port;
    }

    public Logger org$apache$streampark$common$util$Logger$$_logger() {
        return this.org$apache$streampark$common$util$Logger$$_logger;
    }

    public void org$apache$streampark$common$util$Logger$$_logger_$eq(Logger logger) {
        this.org$apache$streampark$common$util$Logger$$_logger = logger;
    }

    public String org$apache$streampark$common$util$Logger$$prefix() {
        return this.org$apache$streampark$common$util$Logger$$prefix;
    }

    public final void org$apache$streampark$common$util$Logger$_setter_$org$apache$streampark$common$util$Logger$$prefix_$eq(String str) {
        this.org$apache$streampark$common$util$Logger$$prefix = str;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public SparkConf sparkConf() {
        return this.sparkConf;
    }

    private RedisEndpoint endpoint(Map<String, String> map) {
        return new RedisEndpoint((String) map.getOrElse("redis.hosts", () -> {
            return "localhost";
        }), new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("redis.port", () -> {
            return BoxesRunTime.boxToInteger(6379).toString();
        }))).toInt(), (String) Try$.MODULE$.apply(() -> {
            return (String) map.apply("redis.auth");
        }).getOrElse(() -> {
            return null;
        }), new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("redis.db", () -> {
            return BoxesRunTime.boxToInteger(0).toString();
        }))).toInt(), new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("redis.timeout", () -> {
            return BoxesRunTime.boxToInteger(2000).toString();
        }))).toInt());
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public Map<TopicPartition, Object> get(String str, Set<String> set) {
        Map<TopicPartition, Object> earliestOffsets = getEarliestOffsets(set.toSeq());
        Set set2 = (Set) RedisUtils$.MODULE$.doRedis(jedis -> {
            return (Set) set.flatMap(str2 -> {
                return (scala.collection.mutable.Map) JavaConversions$.MODULE$.deprecated$u0020mapAsScalaMap(jedis.hgetAll(this.key(str, str2))).map(tuple2 -> {
                    long j;
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    String str2 = (String) tuple2._1();
                    String str3 = (String) tuple2._2();
                    TopicPartition topicPartition = new TopicPartition(str2, new StringOps(Predef$.MODULE$.augmentString(str2)).toInt());
                    Some some = earliestOffsets.get(topicPartition);
                    if (some instanceof Some) {
                        long unboxToLong = BoxesRunTime.unboxToLong(some.value());
                        if (unboxToLong > new StringOps(Predef$.MODULE$.augmentString(str3)).toLong()) {
                            this.logWarn(() -> {
                                return new StringBuilder(75).append("storeType:Redis,consumer group:").append(str).append(",topic:").append(topicPartition.topic()).append(",partition:").append(topicPartition.partition()).append(" offsets Outdated,updated:").append(unboxToLong).toString();
                            });
                            j = unboxToLong;
                            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(j));
                        }
                    }
                    j = new StringOps(Predef$.MODULE$.augmentString(str3)).toLong();
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(j));
                }, Map$.MODULE$.canBuildFrom());
            }, Set$.MODULE$.canBuildFrom());
        }, RedisUtils$.MODULE$.doRedis$default$2(), endpoint(storeParams()));
        Map<TopicPartition, Object> $plus$plus = "largest".equals(reset().toLowerCase()) ? getLatestOffsets(set.toSeq()).$plus$plus(set2) : getEarliestOffsets(set.toSeq()).$plus$plus(set2);
        logInfo(() -> {
            return new StringBuilder(15).append("getOffsets [").append(str).append(",").append($plus$plus.mkString(",")).append("] ").toString();
        });
        return $plus$plus;
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public void update(String str, Map<TopicPartition, Object> map) {
        RedisUtils$.MODULE$.doRedis(jedis -> {
            $anonfun$update$1(this, map, str, jedis);
            return BoxedUnit.UNIT;
        }, RedisUtils$.MODULE$.doRedis$default$2(), endpoint(storeParams()));
        logInfo(() -> {
            return new StringBuilder(35).append("storeType:Redis,updateOffsets [ ").append(str).append(",").append(map.mkString(",")).append(" ]").toString();
        });
    }

    @Override // org.apache.streampark.spark.connector.kafka.offset.Offset
    public void delete(String str, Set<String> set) {
        RedisUtils$.MODULE$.doRedis(jedis -> {
            $anonfun$delete$1(this, set, str, jedis);
            return BoxedUnit.UNIT;
        }, RedisUtils$.MODULE$.doRedis$default$2(), endpoint(storeParams()));
        logInfo(() -> {
            return new StringBuilder(35).append("storeType:Redis,deleteOffsets [ ").append(str).append(",").append(set.mkString(",")).append(" ]").toString();
        });
    }

    public static final /* synthetic */ void $anonfun$update$1(RedisOffset redisOffset, Map map, String str, Jedis jedis) {
        map.foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            return jedis.hset(redisOffset.key(str, topicPartition.topic()), BoxesRunTime.boxToInteger(topicPartition.partition()).toString(), BoxesRunTime.boxToLong(tuple2._2$mcJ$sp()).toString());
        });
    }

    public static final /* synthetic */ void $anonfun$delete$1(RedisOffset redisOffset, Set set, String str, Jedis jedis) {
        set.foreach(str2 -> {
            return jedis.del(redisOffset.key(str, str2));
        });
    }

    public RedisOffset(SparkConf sparkConf) {
        this.sparkConf = sparkConf;
        org.apache.streampark.common.util.Logger.$init$(this);
        Offset.$init$(this);
    }
}
