package org.apache.pekko.stream.connectors.kinesisfirehose.scaladsl;

import java.io.Serializable;
import org.apache.pekko.NotUsed;
import org.apache.pekko.dispatch.ExecutionContexts$;
import org.apache.pekko.stream.ThrottleMode$Shaping$;
import org.apache.pekko.stream.connectors.kinesisfirehose.KinesisFirehoseErrors$FailurePublishingRecords$;
import org.apache.pekko.stream.connectors.kinesisfirehose.KinesisFirehoseFlowSettings;
import org.apache.pekko.stream.connectors.kinesisfirehose.KinesisFirehoseFlowSettings$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.util.FutureConverters$;
import org.apache.pekko.util.FutureConverters$CompletionStageOps$;
import org.apache.pekko.util.ccompat.package$JavaConverters$;
import scala.Int$;
import scala.Predef$;
import scala.collection.immutable.Queue;
import scala.collection.immutable.Queue$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.ScalaRunTime$;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
import software.amazon.awssdk.services.firehose.model.Record;

/* compiled from: KinesisFirehoseFlow.scala */
/* loaded from: input_file:org/apache/pekko/stream/connectors/kinesisfirehose/scaladsl/KinesisFirehoseFlow$.class */
public final class KinesisFirehoseFlow$ implements Serializable {
    public static final KinesisFirehoseFlow$ MODULE$ = new KinesisFirehoseFlow$();

    private KinesisFirehoseFlow$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(KinesisFirehoseFlow$.class);
    }

    public Flow<Record, PutRecordBatchResponseEntry, NotUsed> apply(String str, KinesisFirehoseFlowSettings kinesisFirehoseFlowSettings, FirehoseAsyncClient firehoseAsyncClient) {
        return Flow$.MODULE$.apply().throttle(kinesisFirehoseFlowSettings.maxRecordsPerSecond(), new package.DurationInt(package$.MODULE$.DurationInt(1)).second(), kinesisFirehoseFlowSettings.maxRecordsPerSecond(), ThrottleMode$Shaping$.MODULE$).throttle(kinesisFirehoseFlowSettings.maxBytesPerSecond(), new package.DurationInt(package$.MODULE$.DurationInt(1)).second(), kinesisFirehoseFlowSettings.maxBytesPerSecond(), record -> {
            return getByteSize(record);
        }, ThrottleMode$Shaping$.MODULE$).batch(Int$.MODULE$.int2long(kinesisFirehoseFlowSettings.maxBatchSize()), record2 -> {
            return Queue$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Record[]{record2}));
        }, (queue, record3) -> {
            return (Queue) queue.$colon$plus(record3);
        }).mapAsync(kinesisFirehoseFlowSettings.parallelism(), queue2 -> {
            return FutureConverters$CompletionStageOps$.MODULE$.asScala$extension(FutureConverters$.MODULE$.CompletionStageOps(firehoseAsyncClient.putRecordBatch((PutRecordBatchRequest) PutRecordBatchRequest.builder().deliveryStreamName(str).records(package$JavaConverters$.MODULE$.IterableHasAsJava(queue2).asJavaCollection()).build()))).transform(putRecordBatchResponse -> {
                return (PutRecordBatchResponse) Predef$.MODULE$.identity(putRecordBatchResponse);
            }, th -> {
                return KinesisFirehoseErrors$FailurePublishingRecords$.MODULE$.apply(th);
            }, ExecutionContexts$.MODULE$.parasitic());
        }).mapConcat(putRecordBatchResponse -> {
            return package$JavaConverters$.MODULE$.ListHasAsScala(putRecordBatchResponse.requestResponses()).asScala().toIndexedSeq();
        });
    }

    public KinesisFirehoseFlowSettings apply$default$2() {
        return KinesisFirehoseFlowSettings$.MODULE$.Defaults();
    }

    private int getByteSize(Record record) {
        return record.data().asByteBuffer().position();
    }
}
