package com.github.j5ik2o.ak.kcl.stage;

import akka.event.LoggingAdapter;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import akka.stream.stage.TimerGraphStageLogic;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.github.j5ik2o.ak.kcl.stage.KCLSourceStage;
import scala.MatchError;
import scala.Option;
import scala.Tuple2;
import scala.collection.immutable.Queue;
import scala.collection.immutable.Queue$;
import scala.collection.immutable.Vector;
import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;

/* compiled from: KCLSourceStage.scala */
/* loaded from: input_file:com/github/j5ik2o/ak/kcl/stage/KCLSourceStage$$anon$1.class */
public final class KCLSourceStage$$anon$1 extends TimerGraphStageLogic implements StageLogging {
    private Worker worker;
    private String shardId;
    private Queue<KCLSourceStage.RecordSet> buffer;
    private final AsyncCallback<InitializationInput> onInitializeCallback;
    private final AsyncCallback<KCLSourceStage.RecordSet> onRecordSetCallback;
    private final AsyncCallback<ShutdownInput> onShutdownCallback;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    private final /* synthetic */ KCLSourceStage $outer;
    private final Promise workerPromise$1;

    public Class<?> logSource() {
        return StageLogging.logSource$(this);
    }

    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    private Worker worker() {
        return this.worker;
    }

    private void worker_$eq(Worker worker) {
        this.worker = worker;
    }

    private String shardId() {
        return this.shardId;
    }

    private void shardId_$eq(String str) {
        this.shardId = str;
    }

    private Queue<KCLSourceStage.RecordSet> buffer() {
        return this.buffer;
    }

    private void buffer_$eq(Queue<KCLSourceStage.RecordSet> queue) {
        this.buffer = queue;
    }

    private AsyncCallback<InitializationInput> onInitializeCallback() {
        return this.onInitializeCallback;
    }

    private AsyncCallback<KCLSourceStage.RecordSet> onRecordSetCallback() {
        return this.onRecordSetCallback;
    }

    private AsyncCallback<ShutdownInput> onShutdownCallback() {
        return this.onShutdownCallback;
    }

    public void com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$anon$$tryToProduce() {
        if (buffer().nonEmpty() && isAvailable(this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$out())) {
            Tuple2 dequeue = buffer().dequeue();
            if (dequeue == null) {
                throw new MatchError(dequeue);
            }
            Tuple2 tuple2 = new Tuple2((KCLSourceStage.RecordSet) dequeue._1(), (Queue) dequeue._2());
            KCLSourceStage.RecordSet recordSet = (KCLSourceStage.RecordSet) tuple2._1();
            buffer_$eq((Queue) tuple2._2());
            Vector vector = (Vector) recordSet.records().map(record -> {
                return new CommittableRecord(recordSet.shardId(), recordSet.extendedSequenceNumber(), recordSet.millisBehindLatest(), record, recordSet.recordProcessor(), recordSet.recordProcessorCheckPointer());
            });
            emitMultiple(this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$out(), vector);
            log().debug(new StringBuilder(38).append("tryToProduce: emitMultiple: records = ").append(vector).toString());
        }
    }

    public void onTimer(Object obj) {
        BoxedUnit boxedUnit;
        if (!"check-worker-shutdown".equals(obj)) {
            throw new IllegalStateException(new StringBuilder(29).append("Invalid timerKey: timerKey = ").append(obj.toString()).toString());
        }
        if (worker().hasGracefulShutdownStarted() && isAvailable(this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$out())) {
            log().warning(new StringBuilder(48).append("onTimer(").append(obj).append("): failStage: worker unexpected shutdown").toString());
            failStage(WorkerUnexpectedShutdown$.MODULE$);
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public void preStart() {
        try {
            worker_$eq((Worker) this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$workerF.apply(onInitializeCallback(), onRecordSetCallback(), onShutdownCallback()));
            log().info(new StringBuilder(34).append("Created Worker instance: worker = ").append(worker().getApplicationName()).toString());
            scheduleAtFixedRate("check-worker-shutdown", this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$checkWorkerPeriodicity, this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$checkWorkerPeriodicity);
            this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$ec.execute(worker());
            this.workerPromise$1.success(worker());
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    Throwable th2 = (Throwable) unapply.get();
                    this.workerPromise$1.failure(th2);
                    throw th2;
                }
            }
            throw th;
        }
    }

    public void postStop() {
        buffer_$eq(Queue$.MODULE$.empty());
        worker().startGracefulShutdown().get();
        log().info(new StringBuilder(36).append("Shut down Worker instance: worker = ").append(worker().getApplicationName()).toString());
    }

    public static final /* synthetic */ void $anonfun$onInitializeCallback$1(KCLSourceStage$$anon$1 kCLSourceStage$$anon$1, InitializationInput initializationInput) {
        kCLSourceStage$$anon$1.log().debug(new StringBuilder(111).append("onInitializeCallback: initializationInput = shardId:").append(initializationInput.getShardId()).append(", extendedSequenceNumber:").append(initializationInput.getExtendedSequenceNumber()).append(", pendingCheckpointSequenceNumber:").append(initializationInput.getPendingCheckpointSequenceNumber()).toString());
        kCLSourceStage$$anon$1.shardId_$eq(initializationInput.getShardId());
    }

    public static final /* synthetic */ void $anonfun$onRecordSetCallback$1(KCLSourceStage$$anon$1 kCLSourceStage$$anon$1, KCLSourceStage.RecordSet recordSet) {
        kCLSourceStage$$anon$1.log().debug(new StringBuilder(33).append("onRecordSetCallback: recordSet = ").append(recordSet).toString());
        kCLSourceStage$$anon$1.buffer_$eq(kCLSourceStage$$anon$1.buffer().enqueue(recordSet));
        kCLSourceStage$$anon$1.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$anon$$tryToProduce();
    }

    public static final /* synthetic */ void $anonfun$onShutdownCallback$1(KCLSourceStage$$anon$1 kCLSourceStage$$anon$1, ShutdownInput shutdownInput) {
        Throwable th;
        kCLSourceStage$$anon$1.log().debug(new StringBuilder(61).append("onShutdownCallback: shutdownInput = shardId:").append(kCLSourceStage$$anon$1.shardId()).append(", shutdownReason:").append(shutdownInput.getShutdownReason()).toString());
        ShutdownReason shutdownReason = shutdownInput.getShutdownReason();
        ShutdownReason shutdownReason2 = ShutdownReason.TERMINATE;
        if (shutdownReason == null) {
            if (shutdownReason2 != null) {
                return;
            }
        } else if (!shutdownReason.equals(shutdownReason2)) {
            return;
        }
        try {
            shutdownInput.getCheckpointer().checkpoint();
            kCLSourceStage$$anon$1.log().debug(new StringBuilder(53).append("onShutdownCallback: checkpoint is success! shardId = ").append(kCLSourceStage$$anon$1.shardId()).toString());
        } catch (Throwable th2) {
            if (th2 != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th2);
                if (!unapply.isEmpty() && (th = (Throwable) unapply.get()) != null) {
                    kCLSourceStage$$anon$1.log().error("onShutdownCallback: checkpoint is failure!!! shardId = $shardId", th);
                    kCLSourceStage$$anon$1.fail(kCLSourceStage$$anon$1.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$out(), th);
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
            }
            throw th2;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KCLSourceStage$$anon$1(KCLSourceStage kCLSourceStage, Promise promise) {
        super(kCLSourceStage.m3shape());
        if (kCLSourceStage == null) {
            throw null;
        }
        this.$outer = kCLSourceStage;
        this.workerPromise$1 = promise;
        StageLogging.$init$(this);
        this.buffer = Queue$.MODULE$.empty();
        this.onInitializeCallback = getAsyncCallback(initializationInput -> {
            $anonfun$onInitializeCallback$1(this, initializationInput);
            return BoxedUnit.UNIT;
        });
        this.onRecordSetCallback = getAsyncCallback(recordSet -> {
            $anonfun$onRecordSetCallback$1(this, recordSet);
            return BoxedUnit.UNIT;
        });
        this.onShutdownCallback = getAsyncCallback(shutdownInput -> {
            $anonfun$onShutdownCallback$1(this, shutdownInput);
            return BoxedUnit.UNIT;
        });
        setHandler(kCLSourceStage.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$out(), new OutHandler(this) { // from class: com.github.j5ik2o.ak.kcl.stage.KCLSourceStage$$anon$1$$anon$2
            private final /* synthetic */ KCLSourceStage$$anon$1 $outer;

            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            public void onPull() {
                this.$outer.com$github$j5ik2o$ak$kcl$stage$KCLSourceStage$$anon$$tryToProduce();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
