/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.internal.Logging;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kinesis.KinesisCheckpointer$;
import org.apache.spark.streaming.kinesis.KinesisReceiver;
import org.apache.spark.streaming.kinesis.KinesisRecordProcessor$;
import org.apache.spark.streaming.util.RecurringTimer;
import org.apache.spark.util.Clock;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005d!\u0002\f\u0018\u0001]\t\u0003\u0002\u0003\u0018\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0019\t\u0011\u0001\u0003!\u0011!Q\u0001\n\u0005C\u0001\"\u0012\u0001\u0003\u0002\u0003\u0006IA\u0012\u0005\t#\u0002\u0011\t\u0011)A\u0005%\")\u0001\f\u0001C\u00013\"91\r\u0001b\u0001\n\u0013!\u0007BB?\u0001A\u0003%Q\rC\u0004\u007f\u0001\t\u0007I\u0011B@\t\u0011\u0005\r\u0001\u0001)A\u0005\u0003\u0003A\u0011\"!\u0002\u0001\u0005\u0004%I!a\u0002\t\u0011\u0005M\u0001\u0001)A\u0005\u0003\u0013Aq!!\u0006\u0001\t\u0003\t9\u0002C\u0004\u0002(\u0001!\t!!\u000b\t\u000f\u0005=\u0002\u0001\"\u0003\u00022!9\u0011q\u0007\u0001\u0005\n\u0005e\u0002bBA\u001e\u0001\u0011%\u0011Q\b\u0005\b\u0003\u007f\u0001A\u0011AA\u001d\u000f)\t\teFA\u0001\u0012\u00039\u00121\t\u0004\n-]\t\t\u0011#\u0001\u0018\u0003\u000bBa\u0001W\n\u0005\u0002\u0005\u001d\u0003\"CA%'E\u0005I\u0011AA&\u0005MY\u0015N\\3tSN\u001c\u0005.Z2la>Lg\u000e^3s\u0015\tA\u0012$A\u0004lS:,7/[:\u000b\u0005iY\u0012!C:ue\u0016\fW.\u001b8h\u0015\taR$A\u0003ta\u0006\u00148N\u0003\u0002\u001f?\u00051\u0011\r]1dQ\u0016T\u0011\u0001I\u0001\u0004_J<7c\u0001\u0001#QA\u00111EJ\u0007\u0002I)\tQ%A\u0003tG\u0006d\u0017-\u0003\u0002(I\t1\u0011I\\=SK\u001a\u0004\"!\u000b\u0017\u000e\u0003)R!aK\u000e\u0002\u0011%tG/\u001a:oC2L!!\f\u0016\u0003\u000f1{wmZ5oO\u0006A!/Z2fSZ,'o\u0001\u00011\u0005E:\u0004c\u0001\u001a4k5\tq#\u0003\u00025/\ty1*\u001b8fg&\u001c(+Z2fSZ,'\u000f\u0005\u00027o1\u0001A!\u0003\u001d\u0002\u0003\u0003\u0005\tQ!\u0001:\u0005\ryF%M\t\u0003uu\u0002\"aI\u001e\n\u0005q\"#a\u0002(pi\"Lgn\u001a\t\u0003GyJ!a\u0010\u0013\u0003\u0007\u0005s\u00170\u0001\ndQ\u0016\u001c7\u000e]8j]RLe\u000e^3sm\u0006d\u0007C\u0001\"D\u001b\u0005I\u0012B\u0001#\u001a\u0005!!UO]1uS>t\u0017\u0001C<pe.,'/\u00133\u0011\u0005\u001dseB\u0001%M!\tIE%D\u0001K\u0015\tYu&\u0001\u0004=e>|GOP\u0005\u0003\u001b\u0012\na\u0001\u0015:fI\u00164\u0017BA(Q\u0005\u0019\u0019FO]5oO*\u0011Q\nJ\u0001\u0006G2|7m\u001b\t\u0003'Zk\u0011\u000
1\u0016\u0006\u0003+n\tA!\u001e;jY&\u0011q\u000b\u0016\u0002\u0006\u00072|7m[\u0001\u0007y%t\u0017\u000e\u001e \u0015\u000bi[\u0006-\u00192\u0011\u0005I\u0002\u0001\"\u0002\u0018\u0006\u0001\u0004a\u0006GA/`!\r\u00114G\u0018\t\u0003m}#\u0011\u0002O.\u0002\u0002\u0003\u0005)\u0011A\u001d\t\u000b\u0001+\u0001\u0019A!\t\u000b\u0015+\u0001\u0019\u0001$\t\u000fE+\u0001\u0013!a\u0001%\u0006i1\r[3dWB|\u0017N\u001c;feN,\u0012!\u001a\t\u0005M24e.D\u0001h\u0015\tA\u0017.\u0001\u0006d_:\u001cWO\u001d:f]RT!!\u00166\u000b\u0003-\fAA[1wC&\u0011Qn\u001a\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bCA8|\u001b\u0005\u0001(BA9s\u0003)Ig\u000e^3sM\u0006\u001cWm\u001d\u0006\u0003gR\fQb\u00197jK:$H.\u001b2sCJL(B\u0001\rv\u0015\t1x/\u0001\u0005tKJ4\u0018nY3t\u0015\tA\u00180A\u0005b[\u0006TxN\\1xg*\t!0A\u0002d_6L!\u0001 9\u00039%\u0013VmY8sIB\u0013xnY3tg>\u00148\t[3dWB|\u0017N\u001c;fe\u0006q1\r[3dWB|\u0017N\u001c;feN\u0004\u0013a\u00067bgR\u001c\u0005.Z2la>Lg\u000e^3e'\u0016\fh*^7t+\t\t\t\u0001\u0005\u0003gY\u001a3\u0015\u0001\u00077bgR\u001c\u0005.Z2la>Lg\u000e^3e'\u0016\fh*^7tA\u0005\u00112\r[3dWB|\u0017N\u001c;feRC'/Z1e+\t\tI\u0001\u0005\u0003\u0002\f\u0005=QBAA\u0007\u0015\t)\u0016$\u0003\u0003\u0002\u0012\u00055!A\u0004*fGV\u0014(/\u001b8h)&lWM]\u0001\u0014G\",7m\u001b9pS:$XM\u001d+ie\u0016\fG\rI\u0001\u0010g\u0016$8\t[3dWB|\u0017N\u001c;feR1\u0011\u0011DA\u0010\u0003G\u00012aIA\u000e\u0013\r\ti\u0002\n\u0002\u0005+:LG\u000f\u0003\u0004\u0002\"1\u0001\rAR\u0001\bg\"\f'\u000fZ%e\u0011\u0019\t)\u0003\u0004a\u0001]\u0006a1\r[3dWB|\u0017N\u001c;fe\u0006\u0011\"/Z7pm\u0016\u001c\u0005.Z2la>Lg\u000e^3s)\u0019\tI\"a\u000b\u0002.!1\u0011\u0011E\u0007A\u0002\u0019Ca!!\n\u000e\u0001\u0004q\u0017AC2iK\u000e\\\u0007o\\5oiR1\u0011\u0011DA\u001a\u0003kAa!!\t\u000f\u0001\u00041\u0005BBA\u0013\u001d\u0001\u0007a.A\u0007dQ\u0016\u001c7\u000e]8j]R\fE\u000e\u001c\u000b\u0003\u00033\tqc\u001d;beR\u001c\u0005.Z2la>Lg\u000e^3s)\"\u0014X-\u00193\u0015\u0005\u0005%\u0011\u0001C:ikR$wn\u001e8\u
0002'-Kg.Z:jg\u000eCWmY6q_&tG/\u001a:\u0011\u0005I\u001a2CA\n#)\t\t\u0019%A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005N\u000b\u0003\u0003\u001bR3AUA(W\t\t\t\u0006\u0005\u0003\u0002T\u0005uSBAA+\u0015\u0011\t9&!\u0017\u0002\u0013Ut7\r[3dW\u0016$'bAA.I\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005}\u0013Q\u000b\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007")
public class KinesisCheckpointer
implements Logging {
    // Receiver queried for the latest sequence number to checkpoint for a shard.
    private final KinesisReceiver<?> receiver;
    // Interval whose millisecond value is used as the period of the recurring timer.
    private final Duration checkpointInterval;
    // Worker identifier; appears in log messages and in the timer's thread name.
    private final String workerId;
    // Clock handed to the RecurringTimer created in startCheckpointerThread().
    private final Clock clock;
    // shardId -> checkpointer registered through setCheckpointer().
    private final ConcurrentHashMap<String, IRecordProcessorCheckpointer> checkpointers;
    // shardId -> sequence number recorded after the last successful checkpoint for that shard.
    private final ConcurrentHashMap<String, String> lastCheckpointedSeqNums;
    // Recurring timer that invokes checkpointAll(); created and started by the constructor.
    private final RecurringTimer checkpointerThread;
    // Backing slot for the Logging mixin's logger; read and written via the accessors below.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Returns the default value of the constructor's fourth parameter (the clock), as supplied
     * by the companion object {@code KinesisCheckpointer$}.
     */
    public static Clock $lessinit$greater$default$4() {
        final Clock defaultClock = KinesisCheckpointer$.MODULE$.$lessinit$greater$default$4();
        return defaultClock;
    }

    /**
     * Returns the logger name by delegating to the static {@code Logging.logName$} helper.
     */
    public String logName() {
        return Logging.logName$(this);
    }

    /**
     * Returns the logger for this instance by delegating to the static {@code Logging.log$} helper.
     */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Forwards {@code msg} to the static {@code Logging.logInfo$} helper.
     *
     * @param msg supplier of the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards {@code msg} to the static {@code Logging.logDebug$} helper.
     *
     * @param msg supplier of the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards {@code msg} to the static {@code Logging.logTrace$} helper.
     *
     * @param msg supplier of the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards {@code msg} to the static {@code Logging.logWarning$} helper.
     *
     * @param msg supplier of the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards {@code msg} to the static {@code Logging.logError$} helper.
     *
     * @param msg supplier of the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the static {@code Logging.logInfo$} helper.
     *
     * @param msg supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the static {@code Logging.logDebug$} helper.
     *
     * @param msg supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the static {@code Logging.logTrace$} helper.
     *
     * @param msg supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the static {@code Logging.logWarning$} helper.
     *
     * @param msg supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the static {@code Logging.logError$} helper.
     *
     * @param msg supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /**
     * Returns the result of the static {@code Logging.isTraceEnabled$} helper for this instance.
     */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Forwards to the single-flag static {@code Logging.initializeLogIfNecessary$} helper.
     *
     * @param isInterpreter flag passed through unchanged
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /**
     * Forwards to the two-flag static {@code Logging.initializeLogIfNecessary$} helper and
     * returns its result.
     *
     * @param isInterpreter flag passed through unchanged
     * @param silent flag passed through unchanged
     * @return whatever the helper returns
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    /**
     * Returns the default value for the second argument of {@code initializeLogIfNecessary},
     * as provided by the static {@code Logging} helper.
     */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /**
     * Forwards to the static {@code Logging.initializeForcefully$} helper.
     *
     * @param isInterpreter flag passed through unchanged
     * @param silent flag passed through unchanged
     */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /**
     * Returns the current value of the logger slot backing the {@code Logging} mixin.
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Stores {@code x$1} in the logger slot backing the {@code Logging} mixin.
     *
     * @param x$1 logger to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Returns the map from shard id to the checkpointer registered for that shard.
     */
    private ConcurrentHashMap<String, IRecordProcessorCheckpointer> checkpointers() {
        return checkpointers;
    }

    /**
     * Returns the map from shard id to the sequence number last recorded as checkpointed.
     */
    private ConcurrentHashMap<String, String> lastCheckpointedSeqNums() {
        return lastCheckpointedSeqNums;
    }

    /**
     * Returns the recurring timer that drives periodic checkpointing.
     */
    private RecurringTimer checkpointerThread() {
        return checkpointerThread;
    }

    /**
     * Registers {@code checkpointer} for {@code shardId}, replacing any previous entry.
     *
     * @param shardId shard the checkpointer belongs to
     * @param checkpointer checkpointer to use for that shard
     */
    public void setCheckpointer(String shardId, IRecordProcessorCheckpointer checkpointer) {
        final ConcurrentHashMap<String, IRecordProcessorCheckpointer> registry = this.checkpointers();
        registry.put(shardId, checkpointer);
    }

    /**
     * Unregisters the checkpointer for {@code shardId} and, when a checkpointer is supplied,
     * performs one final parameterless checkpoint through it.
     *
     * <p>The removal runs while holding this instance's monitor, which is the same monitor the
     * synchronized {@code checkpointAll()} uses. Any failure of the final checkpoint is rethrown
     * to the caller; non-fatal failures are logged as errors first.
     *
     * @param shardId id of the shard whose checkpointer is being removed
     * @param checkpointer checkpointer used for the final checkpoint; {@code null} skips it
     */
    public void removeCheckpointer(String shardId, IRecordProcessorCheckpointer checkpointer) {
        synchronized (this) {
            this.checkpointers().remove(shardId);
        }
        if (checkpointer == null) {
            return;
        }
        try {
            // NOTE(review): 4 and 100 are presumably the retry count and the maximum back-off
            // in milliseconds -- confirm against KinesisRecordProcessor.retryRandom.
            KinesisRecordProcessor$.MODULE$.retryRandom((JFunction0.mcV.sp & Serializable & scala.Serializable)() -> checkpointer.checkpoint(), 4, 100);
        }
        catch (Throwable throwable) {
            // Only non-fatal throwables are logged; everything is rethrown unchanged.
            if (!NonFatal$.MODULE$.unapply(throwable).isEmpty()) {
                this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Exception:  WorkerId " + this.workerId + " encountered an exception while checkpointing to finish reading a shard of " + shardId + ".", throwable);
            }
            throw throwable;
        }
    }

    /**
     * Checkpoints {@code shardId} at the receiver's latest sequence number, if there is one and
     * it is newer than the sequence number last recorded for that shard.
     *
     * <p>Non-fatal failures are logged as warnings and suppressed so that a failure on one shard
     * does not abort the caller; fatal throwables are rethrown.
     *
     * @param shardId id of the shard to checkpoint
     * @param checkpointer checkpointer for the shard; {@code null} only logs a debug message
     */
    private void checkpoint(String shardId, IRecordProcessorCheckpointer checkpointer) {
        try {
            if (checkpointer == null) {
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Checkpointing skipped for shardId " + shardId + ". Checkpointer not set.");
                return;
            }
            this.receiver.getLatestSeqNumToCheckpoint(shardId).foreach((Function1 & Serializable & scala.Serializable)latestSeqNum -> {
                // The raw Function1 cast erases the element type, so narrow it once here.
                String latest = (String)latestSeqNum;
                String lastSeqNum = this.lastCheckpointedSeqNums().get(shardId);
                // Sequence numbers are compared as strings (lexicographically).
                if (lastSeqNum == null || latest.compareTo(lastSeqNum) > 0) {
                    KinesisRecordProcessor$.MODULE$.retryRandom((JFunction0.mcV.sp & Serializable & scala.Serializable)() -> checkpointer.checkpoint(latest), 4, 100);
                    this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Checkpoint:  WorkerId " + this.workerId + " completed checkpoint at sequence number " + latest + " for shardId " + shardId);
                    // Record the new position only after the checkpoint call has returned.
                    this.lastCheckpointedSeqNums().put(shardId, latest);
                }
                return BoxedUnit.UNIT;
            });
        }
        catch (Throwable throwable) {
            // Fatal throwables propagate; non-fatal ones are logged and swallowed.
            if (NonFatal$.MODULE$.unapply(throwable).isEmpty()) {
                throw throwable;
            }
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to checkpoint shardId " + shardId + " to DynamoDB.", throwable);
        }
    }

    /**
     * Checkpoints every shard that currently has a registered checkpointer.
     *
     * <p>Runs while holding this instance's monitor. Non-fatal failures are logged as warnings
     * and suppressed so the exception does not escape to the caller; fatal throwables are
     * rethrown.
     */
    private synchronized void checkpointAll() {
        try {
            Enumeration<String> shardIds = this.checkpointers().keys();
            while (shardIds.hasMoreElements()) {
                String shardId = shardIds.nextElement();
                // get() may return null if the entry vanished meanwhile; checkpoint() handles null.
                this.checkpoint(shardId, this.checkpointers().get(shardId));
            }
        }
        catch (Throwable throwable) {
            // Fatal throwables propagate; non-fatal ones are logged and swallowed.
            if (NonFatal$.MODULE$.unapply(throwable).isEmpty()) {
                throw throwable;
            }
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to checkpoint to DynamoDB.", throwable);
        }
    }

    /**
     * Creates and starts the recurring timer that calls {@code checkpointAll()} once per
     * checkpoint interval, then logs the timer's thread name at debug level.
     *
     * @return the started timer
     */
    private RecurringTimer startCheckpointerThread() {
        final String timerName = "Kinesis Checkpointer - Worker " + this.workerId;
        final long intervalMs = this.checkpointInterval.milliseconds();
        final RecurringTimer recurring = new RecurringTimer(this.clock, intervalMs, (Function1)(JFunction1.mcVJ.sp & Serializable & scala.Serializable)x$1 -> this.checkpointAll(), timerName);
        recurring.start();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Started checkpointer thread: " + timerName);
        return recurring;
    }

    /**
     * Stops the recurring timer (passing {@code false} to {@code stop}), then empties both
     * per-shard maps and logs completion at info level.
     */
    public void shutdown() {
        // The timer is stopped before the maps it reads are cleared.
        this.checkpointerThread.stop(false);
        this.checkpointers.clear();
        this.lastCheckpointedSeqNums.clear();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Successfully shutdown Kinesis Checkpointer.");
    }

    /**
     * Builds a checkpointer and immediately starts its recurring checkpoint timer.
     *
     * @param receiver receiver queried for the latest sequence number per shard
     * @param checkpointInterval interval between timer-driven checkpoint passes
     * @param workerId worker identifier used in log messages and the timer thread name
     * @param clock clock handed to the recurring timer
     */
    public KinesisCheckpointer(KinesisReceiver<?> receiver, Duration checkpointInterval, String workerId, Clock clock) {
        this.receiver = receiver;
        this.checkpointInterval = checkpointInterval;
        this.workerId = workerId;
        this.clock = clock;
        Logging.$init$(this);
        this.checkpointers = new ConcurrentHashMap<>();
        this.lastCheckpointedSeqNums = new ConcurrentHashMap<>();
        // Started last: the timer callback reads the fields initialised above.
        this.checkpointerThread = this.startCheckpointerThread();
    }
}

