package akka.stream.alpakka.kinesis.scaladsl;

import akka.NotUsed;
import akka.dispatch.ExecutionContexts$;
import akka.stream.ActorAttributes$;
import akka.stream.Attributes$;
import akka.stream.alpakka.kinesis.CommittableRecord;
import akka.stream.alpakka.kinesis.CommittableRecord$;
import akka.stream.alpakka.kinesis.KinesisSchedulerCheckpointSettings;
import akka.stream.alpakka.kinesis.KinesisSchedulerSourceSettings;
import akka.stream.alpakka.kinesis.impl.KinesisSchedulerSourceStage;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.RunnableGraph;
import akka.stream.scaladsl.Sink;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SubFlow;
import scala.Function1;
import scala.Predef$;
import scala.collection.immutable.Seq;
import scala.concurrent.Future;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* compiled from: KinesisSchedulerSource.scala */
/* loaded from: input_file:akka/stream/alpakka/kinesis/scaladsl/KinesisSchedulerSource$.class */
/**
 * Singleton holder (decompiled Scala {@code object}) exposing factories for a Kinesis
 * scheduler-backed {@link Source} and for flows/sinks that checkpoint the records it emits.
 *
 * <p>The single instance is published through {@link #MODULE$} by the private constructor,
 * which is invoked once from the static initializer.
 */
public final class KinesisSchedulerSource$ {
    /** The single instance of this class; assigned inside the private constructor. */
    public static KinesisSchedulerSource$ MODULE$;

    /** Shared flow that checkpoints one batch of records and then emits each underlying record. */
    private final Flow<Seq<CommittableRecord>, KinesisClientRecord, NotUsed> checkpointRecordBatch;

    /** Value passed as the maximum number of substreams whenever records are grouped by shard id. */
    private final int MAX_KINESIS_SHARDS;

    static {
        // The constructor stores the new instance into MODULE$ itself.
        new KinesisSchedulerSource$();
    }

    /**
     * Creates a source backed by a {@link KinesisSchedulerSourceStage}.
     *
     * <p>The stage is built at materialization time so it can be handed the materializer's
     * execution context. {@code fromMaterializer} wraps the stage's own materialized
     * {@code Future<Scheduler>} in another future; the nested future is flattened here.
     *
     * @param function1 builds the {@link Scheduler} from a {@link ShardRecordProcessorFactory}
     * @param kinesisSchedulerSourceSettings settings forwarded to the source stage
     * @return a source of committable records materializing a future of the scheduler
     */
    public Source<CommittableRecord, Future<Scheduler>> apply(Function1<ShardRecordProcessorFactory, Scheduler> function1, KinesisSchedulerSourceSettings kinesisSchedulerSourceSettings) {
        return Source$.MODULE$.fromMaterializer((materializer, attributes) -> {
            return Source$.MODULE$.fromGraph(new KinesisSchedulerSourceStage(kinesisSchedulerSourceSettings, function1, materializer.executionContext()));
        }).mapMaterializedValue(nestedFuture -> {
            // The two lambda parameters need distinct names: Java does not allow a lambda
            // parameter to redeclare a variable of the enclosing lambda.
            return nestedFuture.flatMap(schedulerFuture -> {
                return (Future) Predef$.MODULE$.identity(schedulerFuture);
            }, ExecutionContexts$.MODULE$.parasitic());
        });
    }

    /**
     * Same source as {@link #apply}, split into substreams keyed by
     * {@code processorData().shardId()} of each record.
     *
     * @param function1 builds the {@link Scheduler} from a {@link ShardRecordProcessorFactory}
     * @param kinesisSchedulerSourceSettings settings forwarded to the source stage
     * @return a sub-flow with one substream per shard id, capped at {@code MAX_KINESIS_SHARDS}
     */
    public SubFlow<CommittableRecord, Future<Scheduler>, ?, RunnableGraph<Future<Scheduler>>> sharded(Function1<ShardRecordProcessorFactory, Scheduler> function1, KinesisSchedulerSourceSettings kinesisSchedulerSourceSettings) {
        return apply(function1, kinesisSchedulerSourceSettings)
                .groupBy(MAX_KINESIS_SHARDS(), committableRecord -> committableRecord.processorData().shardId());
    }

    /**
     * Builds a flow that groups records by shard id, batches each substream by size and time,
     * runs every batch through the checkpointing flow, and merges the substreams back together.
     *
     * @param kinesisSchedulerCheckpointSettings supplies {@code maxBatchSize} and
     *     {@code maxBatchWait} for {@code groupedWithin}
     * @return a flow emitting the {@link KinesisClientRecord} of every incoming record
     */
    public Flow<CommittableRecord, KinesisClientRecord, NotUsed> checkpointRecordsFlow(KinesisSchedulerCheckpointSettings kinesisSchedulerCheckpointSettings) {
        return (Flow) Flow$.MODULE$.apply()
                .groupBy(MAX_KINESIS_SHARDS(), committableRecord -> committableRecord.processorData().shardId())
                .groupedWithin(kinesisSchedulerCheckpointSettings.maxBatchSize(), kinesisSchedulerCheckpointSettings.maxBatchWait())
                .via(checkpointRecordBatch())
                .mergeSubstreams();
    }

    /** Accessor for the shared batch-checkpointing flow built in the constructor. */
    private Flow<Seq<CommittableRecord>, KinesisClientRecord, NotUsed> checkpointRecordBatch() {
        return this.checkpointRecordBatch;
    }

    /**
     * Builds a sink that runs records through {@link #checkpointRecordsFlow} and discards the output.
     *
     * @param kinesisSchedulerCheckpointSettings batching settings forwarded to the flow
     * @return a sink consuming committable records
     */
    public Sink<CommittableRecord, NotUsed> checkpointRecordsSink(KinesisSchedulerCheckpointSettings kinesisSchedulerCheckpointSettings) {
        return checkpointRecordsFlow(kinesisSchedulerCheckpointSettings).to(Sink$.MODULE$.ignore());
    }

    /** Accessor for the substream cap used by the {@code groupBy} calls in this class. */
    private int MAX_KINESIS_SHARDS() {
        return this.MAX_KINESIS_SHARDS;
    }

    /**
     * Publishes this instance as {@link #MODULE$} and builds the shared checkpointing flow.
     */
    private KinesisSchedulerSource$() {
        MODULE$ = this;
        this.checkpointRecordBatch = Flow$.MODULE$.apply().map(batch -> {
            // Only the record that is greatest under orderBySequenceNumber is checkpointed.
            // NOTE(review): max() on an empty batch would throw; presumably groupedWithin
            // never emits empty groups - confirm.
            ((CommittableRecord) batch.max(CommittableRecord$.MODULE$.orderBySequenceNumber())).tryToCheckpoint();
            return batch;
        }).mapConcat(batch -> {
            // Flatten the batch back into individual records.
            return (Seq) Predef$.MODULE$.identity(batch);
        }).map(committableRecord -> {
            return committableRecord.record();
        }).addAttributes(Attributes$.MODULE$.apply(ActorAttributes$.MODULE$.IODispatcher()));
        this.MAX_KINESIS_SHARDS = 500;
    }
}
