/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.internal.Logging;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kinesis.KinesisCheckpointer$;
import org.apache.spark.streaming.kinesis.KinesisReceiver;
import org.apache.spark.streaming.kinesis.KinesisRecordProcessor$;
import org.apache.spark.streaming.util.RecurringTimer;
import org.apache.spark.util.Clock;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005d!B\u0001\u0003\u0001\ta!aE&j]\u0016\u001c\u0018n]\"iK\u000e\\\u0007o\\5oi\u0016\u0014(BA\u0002\u0005\u0003\u001dY\u0017N\\3tSNT!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\u0007\u0001i1\u0003\u0005\u0002\u000f#5\tqBC\u0001\u0011\u0003\u0015\u00198-\u00197b\u0013\t\u0011rB\u0001\u0004B]f\u0014VM\u001a\t\u0003)]i\u0011!\u0006\u0006\u0003-\u0019\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u00031U\u0011q\u0001T8hO&tw\r\u0003\u0005\u001b\u0001\t\u0005\t\u0015!\u0003\u001d\u0003!\u0011XmY3jm\u0016\u00148\u0001\u0001\u0019\u0003;\r\u00022AH\u0010\"\u001b\u0005\u0011\u0011B\u0001\u0011\u0003\u0005=Y\u0015N\\3tSN\u0014VmY3jm\u0016\u0014\bC\u0001\u0012$\u0019\u0001!\u0011\u0002J\r\u0002\u0002\u0003\u0005)\u0011A\u0013\u0003\u0007}#\u0013'\u0005\u0002'SA\u0011abJ\u0005\u0003Q=\u0011qAT8uQ&tw\r\u0005\u0002\u000fU%\u00111f\u0004\u0002\u0004\u0003:L\b\u0002C\u0017\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0018\u0002%\rDWmY6q_&tG/\u00138uKJ4\u0018\r\u001c\t\u0003_Aj\u0011\u0001B\u0005\u0003c\u0011\u0011\u0001\u0002R;sCRLwN\u001c\u0005\tg\u0001\u0011\t\u0011)A\u0005i\u0005Aqo\u001c:lKJLE\r\u0005\u00026y9\u0011aG\u000f\t\u0003o=i\u0011\u0001\u000f\u0006\u0003sm\ta\u0001\u0010:p_Rt\u0014BA\u001e\u0010\u0003\u0019\u0001&/\u001a3fM&\u0011QH\u0010\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005mz\u0001\u0002\u0003!\u0001\u0005\u0003\u0005\u000b\u0011B!\u0002\u000b\rdwnY6\u0011\u0005\t+U\"A\"\u000b\u0005\u00113\u0011\u0001B;uS2L!AR\"\u0003\u000b\rcwnY6\t\u000b!\u0003A\u0011A%\u0002\rqJg.\u001b;?)\u0015Q5\nU)S!\tq\u0002\u0001C\u0003\u001b\u000f\u0002\u0007A\n\r\u0002N\u001fB\u0019ad\b(\u0011\u0005\tzE!\u0003\u0013L\u0003\u0003\u0005\tQ!\u0001&\u0011\u0015is\t1\u0001/\u0011\u0015\u0019t\t1\u00015\u0011\u001d\u0001u\t%AA\u0002\u0005Cq\u0001\u0016\u0001C\u0002\u0013%Q+A\u0007dQ\u0016\u001c7\u000e]8j]R,'o]\u000b\u0002-B!q+\u0018\u001b`\u001b\u0005A&BA-[\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003\tnS\u0011\u0001X\u0001\u0005U\u00064\u0018-\u0003\u0002_1\n\t2i\u001c8dkJ\u0014XM\u001c;ICNDW*\u00199\u0011\u0005\u0001dW\"A1\u000b\u0005\t\u001c\u0017AC5oi\u0016\u0014h-Y2fg*\u0011A-Z\u0001\u000eG2LWM\u001c;mS\n\u0014\u0018M]=\u000b\u0005\r1'BA4i\u0003!\u0019XM\u001d<jG\u0016\u001c(BA5k\u0003%\tW.\u0019>p]\u0006<8OC\u0001l\u0003\r\u0019w.\\\u0005\u0003[\u0006\u0014A$\u0013*fG>\u0014H\r\u0015:pG\u0016\u001c8o\u001c:DQ\u0016\u001c7\u000e]8j]R,'\u000f\u0003\u0004p\u0001\u0001\u0006IAV\u0001\u000fG\",7m\u001b9pS:$XM]:!\u0011\u001d\t\bA1A\u0005\nI\fq\u0003\\1ti\u000eCWmY6q_&tG/\u001a3TKFtU/\\:\u0016\u0003M\u0004BaV/5i!1Q\u000f\u0001Q\u0001\nM\f\u0001\u0004\\1ti\u000eCWmY6q_&tG/\u001a3TKFtU/\\:!\u0011\u001d9\bA1A\u0005\na\f!c\u00195fG.\u0004x.\u001b8uKJ$\u0006N]3bIV\t\u0011\u0010\u0005\u0002{y6\t1P\u0003\u0002E\t%\u0011Qp\u001f\u0002\u000f%\u0016\u001cWO\u001d:j]\u001e$\u0016.\\3s\u0011\u0019y\b\u0001)A\u0005s\u0006\u00192\r[3dWB|\u0017N\u001c;feRC'/Z1eA!9\u00111\u0001\u0001\u0005\u0002\u0005\u0015\u0011aD:fi\u000eCWmY6q_&tG/\u001a:\u0015\r\u0005\u001d\u0011QBA\t!\rq\u0011\u0011B\u0005\u0004\u0003\u0017y!\u0001B+oSRDq!a\u0004\u0002\u0002\u0001\u0007A'A\u0004tQ\u0006\u0014H-\u00133\t\u000f\u0005M\u0011\u0011\u0001a\u0001?\u0006a1\r[3dWB|\u0017N\u001c;fe\"9\u0011q\u0003\u0001\u0005\u0002\u0005e\u0011A\u0005:f[>4Xm\u00115fG.\u0004x.\u001b8uKJ$b!a\u0002\u0002\u001c\u0005u\u0001bBA\b\u0003+\u0001\r\u0001\u000e\u0005\b\u0003'\t)\u00021\u0001`\u0011\u001d\t\t\u0003\u0001C\u0005\u0003G\t!b\u00195fG.\u0004x.\u001b8u)\u0019\t9!!\n\u0002(!9\u0011qBA\u0010\u0001\u0004!\u0004bBA\n\u0003?\u0001\ra\u0018\u0005\b\u0003W\u0001A\u0011BA\u0017\u00035\u0019\u0007.Z2la>Lg\u000e^!mYR\u0011\u0011q\u0001\u0005\b\u0003c\u0001A\u0011BA\u001a\u0003]\u0019H/\u0019:u\u0007\",7m\u001b9pS:$XM\u001d+ie\u0016\fG\rF\u0001z\u0011\u001d\t9\u0004\u0001C\u0001\u0003[\t\u0001b\u001d5vi\u0012|wO\\\u0004\u000b\u0003w\u0011\u0011\u0011!E\u0001\u0005\u0005u\u0012aE&j]\u0016\u001c\u0018n]\"iK\u000e\\\u0007o\\5oi\u0016\u0014\bc\u0001\u0010\u0002@\u0019I\u0011AAA\u0001\u0012\u0003\u0011\u0011\u0011I\n\u0004\u0003\u007fi\u0001b\u0002%\u0002@\u0011\u0005\u0011Q\t\u000b\u0003\u0003{A!\"!\u0013\u0002@E\u0005I\u0011AA&\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%iU\u0011\u0011Q\n\u0016\u0004\u0003\u0006=3FAA)!\u0011\t\u0019&!\u0018\u000e\u0005\u0005U#\u0002BA,\u00033\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005ms\"\u0001\u0006b]:|G/\u0019;j_:LA!a\u0018\u0002V\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
public class KinesisCheckpointer
implements Logging {
    private final KinesisReceiver<?> receiver;
    private final Duration checkpointInterval;
    private final String workerId;
    private final Clock clock;
    private final ConcurrentHashMap<String, IRecordProcessorCheckpointer> checkpointers;
    private final ConcurrentHashMap<String, String> lastCheckpointedSeqNums;
    private final RecurringTimer checkpointerThread;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static Clock $lessinit$greater$default$4() {
        return KinesisCheckpointer$.MODULE$.$lessinit$greater$default$4();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private ConcurrentHashMap<String, IRecordProcessorCheckpointer> checkpointers() {
        return this.checkpointers;
    }

    private ConcurrentHashMap<String, String> lastCheckpointedSeqNums() {
        return this.lastCheckpointedSeqNums;
    }

    private RecurringTimer checkpointerThread() {
        return this.checkpointerThread;
    }

    public void setCheckpointer(String shardId, IRecordProcessorCheckpointer checkpointer) {
        this.checkpointers().put(shardId, checkpointer);
    }

    public void removeCheckpointer(String shardId, IRecordProcessorCheckpointer checkpointer) {
        KinesisCheckpointer kinesisCheckpointer = this;
        synchronized (kinesisCheckpointer) {
            this.checkpointers().remove(shardId);
        }
        if (checkpointer != null) {
            try {
                KinesisRecordProcessor$.MODULE$.retryRandom((JFunction0.mcV.sp & Serializable & scala.Serializable)() -> checkpointer.checkpoint(), 4, 100);
            }
            catch (Throwable throwable) {
                Throwable throwable2 = throwable;
                Option option = NonFatal$.MODULE$.unapply(throwable2);
                if (!option.isEmpty()) {
                    Throwable e = (Throwable)option.get();
                    this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Exception:  WorkerId ").append($this.workerId).append(" encountered an exception while checkpointing").append(new StringBuilder(30).append("to finish reading a shard of ").append(shardId).append(".").toString()).toString(), e);
                    throw e;
                }
                throw throwable;
            }
        }
    }

    private void checkpoint(String shardId, IRecordProcessorCheckpointer checkpointer) {
        try {
            if (checkpointer != null) {
                this.receiver.getLatestSeqNumToCheckpoint(shardId).foreach((Function1 & Serializable & scala.Serializable)latestSeqNum -> {
                    Object object;
                    String lastSeqNum = this.lastCheckpointedSeqNums().get(shardId);
                    if (lastSeqNum == null || new StringOps(Predef$.MODULE$.augmentString(latestSeqNum)).$greater((Object)lastSeqNum)) {
                        KinesisRecordProcessor$.MODULE$.retryRandom((JFunction0.mcV.sp & Serializable & scala.Serializable)() -> checkpointer.checkpoint(latestSeqNum), 4, 100);
                        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(62).append("Checkpoint:  WorkerId ").append($this.workerId).append(" completed checkpoint at sequence number").append(new StringBuilder(14).append(" ").append((String)latestSeqNum).append(" for shardId ").append(shardId).toString()).toString());
                        object = this.lastCheckpointedSeqNums().put(shardId, (String)latestSeqNum);
                    } else {
                        object = BoxedUnit.UNIT;
                    }
                    return object;
                });
            } else {
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(57).append("Checkpointing skipped for shardId ").append(shardId).append(". Checkpointer not set.").toString());
            }
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (!option.isEmpty()) {
                Throwable e = (Throwable)option.get();
                this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("Failed to checkpoint shardId ").append(shardId).append(" to DynamoDB.").toString(), e);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            throw throwable;
        }
    }

    private synchronized void checkpointAll() {
        try {
            Enumeration<String> shardIds = this.checkpointers().keys();
            while (shardIds.hasMoreElements()) {
                String shardId = shardIds.nextElement();
                this.checkpoint(shardId, this.checkpointers().get(shardId));
            }
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (!option.isEmpty()) {
                Throwable e = (Throwable)option.get();
                this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to checkpoint to DynamoDB.", e);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            throw throwable;
        }
    }

    private RecurringTimer startCheckpointerThread() {
        long period = this.checkpointInterval.milliseconds();
        String threadName = new StringBuilder(30).append("Kinesis Checkpointer - Worker ").append(this.workerId).toString();
        RecurringTimer timer = new RecurringTimer(this.clock, period, (Function1)(JFunction1.mcVJ.sp & Serializable & scala.Serializable)x$1 -> this.checkpointAll(), threadName);
        timer.start();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(29).append("Started checkpointer thread: ").append(threadName).toString());
        return timer;
    }

    public void shutdown() {
        this.checkpointerThread().stop(false);
        this.checkpointers().clear();
        this.lastCheckpointedSeqNums().clear();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Successfully shutdown Kinesis Checkpointer.");
    }

    public KinesisCheckpointer(KinesisReceiver<?> receiver, Duration checkpointInterval, String workerId, Clock clock) {
        this.receiver = receiver;
        this.checkpointInterval = checkpointInterval;
        this.workerId = workerId;
        this.clock = clock;
        Logging.$init$((Logging)this);
        this.checkpointers = new ConcurrentHashMap();
        this.lastCheckpointedSeqNums = new ConcurrentHashMap();
        this.checkpointerThread = this.startCheckpointerThread();
    }
}

