package org.apache.pekko.stream.connectors.kinesis.scaladsl;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.ActorAttributes$;
import org.apache.pekko.stream.Attributes$;
import org.apache.pekko.stream.connectors.kinesis.CommittableRecord;
import org.apache.pekko.stream.connectors.kinesis.CommittableRecord$;
import org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerCheckpointSettings;
import org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerSourceSettings;
import org.apache.pekko.stream.connectors.kinesis.impl.KinesisSchedulerSourceStage;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.stream.scaladsl.RunnableGraph;
import org.apache.pekko.stream.scaladsl.Sink;
import org.apache.pekko.stream.scaladsl.Sink$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.stream.scaladsl.SubFlow;
import scala.Function1;
import scala.Predef$;
import scala.collection.immutable.Seq;
import scala.concurrent.Future;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* compiled from: KinesisSchedulerSource.scala */
/* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisSchedulerSource$.class */
public final class KinesisSchedulerSource$ {
    public static final KinesisSchedulerSource$ MODULE$ = new KinesisSchedulerSource$();
    private static final Flow<Seq<CommittableRecord>, KinesisClientRecord, NotUsed> checkpointRecordBatch = Flow$.MODULE$.apply().map(seq -> {
        ((CommittableRecord) seq.max(CommittableRecord$.MODULE$.orderBySequenceNumber())).tryToCheckpoint();
        return seq;
    }).mapConcat(seq2 -> {
        return (Seq) Predef$.MODULE$.identity(seq2);
    }).map(committableRecord -> {
        return committableRecord.record();
    }).addAttributes(Attributes$.MODULE$.apply(ActorAttributes$.MODULE$.IODispatcher()));
    private static final int MAX_KINESIS_SHARDS = 500;

    public Source<CommittableRecord, Future<Scheduler>> apply(Function1<ShardRecordProcessorFactory, Scheduler> function1, KinesisSchedulerSourceSettings kinesisSchedulerSourceSettings) {
        return Source$.MODULE$.fromGraph(new KinesisSchedulerSourceStage(kinesisSchedulerSourceSettings, function1));
    }

    public SubFlow<CommittableRecord, Future<Scheduler>, ?, RunnableGraph<Future<Scheduler>>> sharded(Function1<ShardRecordProcessorFactory, Scheduler> function1, KinesisSchedulerSourceSettings kinesisSchedulerSourceSettings) {
        return Source$.MODULE$.fromGraph(new KinesisSchedulerSourceStage(kinesisSchedulerSourceSettings, function1)).groupBy(MAX_KINESIS_SHARDS(), committableRecord -> {
            return committableRecord.processorData().shardId();
        });
    }

    public Flow<CommittableRecord, KinesisClientRecord, NotUsed> checkpointRecordsFlow(KinesisSchedulerCheckpointSettings kinesisSchedulerCheckpointSettings) {
        return (Flow) Flow$.MODULE$.apply().groupBy(MAX_KINESIS_SHARDS(), committableRecord -> {
            return committableRecord.processorData().shardId();
        }).groupedWithin(kinesisSchedulerCheckpointSettings.maxBatchSize(), kinesisSchedulerCheckpointSettings.maxBatchWait()).via(checkpointRecordBatch()).mergeSubstreams();
    }

    private Flow<Seq<CommittableRecord>, KinesisClientRecord, NotUsed> checkpointRecordBatch() {
        return checkpointRecordBatch;
    }

    public Sink<CommittableRecord, NotUsed> checkpointRecordsSink(KinesisSchedulerCheckpointSettings kinesisSchedulerCheckpointSettings) {
        return checkpointRecordsFlow(kinesisSchedulerCheckpointSettings).to(Sink$.MODULE$.ignore());
    }

    private int MAX_KINESIS_SHARDS() {
        return MAX_KINESIS_SHARDS;
    }

    private KinesisSchedulerSource$() {
    }
}
