/*
 * Decompiled with CFR 0.152.
 */
package tech.figure.kafka.coroutines.retry.flow;

import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.Map;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.comparisons.ComparisonsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.Boxing;
import kotlin.coroutines.jvm.internal.ContinuationImpl;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.ranges.IntRange;
import kotlinx.coroutines.flow.Flow;
import kotlinx.coroutines.flow.FlowKt;
import mu.KLogger;
import mu.KotlinLogging;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import tech.figure.coroutines.retry.flow.FlowRetry;
import tech.figure.coroutines.retry.store.RetryRecord;
import tech.figure.coroutines.retry.store.RetryRecordStore;
import tech.figure.kafka.coroutines.retry.ByteArrayExtensionsKt;
import tech.figure.kafka.coroutines.retry.flow.KafkaFlowRetry;

/*
 * Illegal identifiers - consider using --renameillegalidents true
 */
@Metadata(mv={1, 6, 0}, k=1, xi=48, d1={"\u0000p\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010$\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0003\n\u0002\b\u0007\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0010\u0012\n\u0000\b\u0016\u0018\u0000*\u0004\b\u0000\u0010\u0001*\u0004\b\u0001\u0010\u00022\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00020\u00040\u0003Bh\u0012:\u0010\u0005\u001a6\u0012\u0004\u0012\u00020\u0007\u0012,\u0012*\b\u0001\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u0004\u0012\n\u0012\b\u0012\u0004\u0012\u00020\n0\t\u0012\u0006\u0012\u0004\u0018\u00010\u000b0\b0\u0006\u0012\u0018\u0010\f\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u00040\r\u0012\b\b\u0002\u0010\u000e\u001a\u00020\u000f\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010\u0010J\u0011\u0010\u0013\u001a\u00020\u0014H\u0096@\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010\u0015J3\u0010\u0016\u001a\u00020\n2\u0018\u0010\u0017\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u00040\u00182\u0006\u0010\u0019\u001a\u00020\u001aH\u0096@\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010\u001bJ+\u0010\u001c\u001a\u00020\n2\u0018\u0010\u0017\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u00040\u0018H\u0096@\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010\u001dJ-\u0010\u001e\u001a\u00020\n2\u0012\u0010\u0017\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u00042\u0006\u0010\u001f\u001a\u00020\u000fH\u0096@\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010 JA\u0010!\u001a\u001a\u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u00040\u00180\"2\u0006\u0010#\u001a\u00020$2\u0006\u0010%\u001a\u00020&2\u0006\u0010'\u001a\u00020\u000fH\u0096@\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010(J-\u0010)\u001a\u00020\n2\u0012\u0010\u0017\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u00042\u0006\u0010\u0019\u001a\u00020\u001aH\u0096@\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010*J@\u0010+\u001a\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00020\u0004\"\u0004\b\u0002\u0010\u0001\"\u0004\b\u0003\u0010\u0002*\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00020\u00042\u0006\u0010,\u001a\u00020\u00072\u0006\u0010-\u001a\u00020.H\u0002R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004\u00a2\u0006\u0002\n\u0000RE\u0010\u0005\u001a6\u0012\u0004\u0012\u00020\u0007\u0012,\u0012*\b\u0001\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u0004\u0012\n\u0012\b\u0012\u0004\u0012\u00020\n0\t\u0012\u0006\u0012\u0004\u0018\u00010\u000b0\b0\u0006X\u0082\u0004\u00f8\u0001\u0000\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0011\u001a\u00020\u0012X\u0082\u0004\u00a2\u0006\u0002\n\u0000R \u0010\f\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u00040\rX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u0082\u0002\u0004\n\u0002\b\u0019\u00a8\u0006/"}, d2={"Ltech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry;", "K", "V", "Ltech/figure/coroutines/retry/flow/FlowRetry;", "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", "handlers", "", "", "Lkotlin/Function2;", "Lkotlin/coroutines/Continuation;", "", "", "store", "Ltech/figure/coroutines/retry/store/RetryRecordStore;", "groupSize", "", "(Ljava/util/Map;Ltech/figure/coroutines/retry/store/RetryRecordStore;I)V", "log", "Lmu/KLogger;", "hasNext", "", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "onFailure", "item", "Ltech/figure/coroutines/retry/store/RetryRecord;", "e", "", "(Ltech/figure/coroutines/retry/store/RetryRecord;Ljava/lang/Throwable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "onSuccess", "(Ltech/figure/coroutines/retry/store/RetryRecord;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "process", "attempt", "(Lorg/apache/kafka/clients/consumer/ConsumerRecord;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;", "produceNext", "Lkotlinx/coroutines/flow/Flow;", "attemptRange", "Lkotlin/ranges/IntRange;", "olderThan", "Ljava/time/OffsetDateTime;", "limit", "(Lkotlin/ranges/IntRange;Ljava/time/OffsetDateTime;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;", "send", "(Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Throwable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "setHeader", "key", "value", "", "ft-coroutines-kafka-retry"})
public class KafkaFlowRetry<K, V>
implements FlowRetry<ConsumerRecord<K, V>> {
    @NotNull
    private final Map<String, Function2<ConsumerRecord<K, V>, Continuation<? super Unit>, Object>> handlers;
    @NotNull
    private final RetryRecordStore<ConsumerRecord<K, V>> store;
    private final int groupSize;
    @NotNull
    private final KLogger log;

    public KafkaFlowRetry(@NotNull Map<String, ? extends Function2<? super ConsumerRecord<K, V>, ? super Continuation<? super Unit>, ? extends Object>> handlers, @NotNull RetryRecordStore<ConsumerRecord<K, V>> store, int groupSize) {
        Intrinsics.checkNotNullParameter(handlers, (String)"handlers");
        Intrinsics.checkNotNullParameter(store, (String)"store");
        this.handlers = handlers;
        this.store = store;
        this.groupSize = groupSize;
        this.log = KotlinLogging.INSTANCE.logger((Function0)log.1.INSTANCE);
    }

    public /* synthetic */ KafkaFlowRetry(Map map, RetryRecordStore retryRecordStore, int n, int n2, DefaultConstructorMarker defaultConstructorMarker) {
        if ((n2 & 4) != 0) {
            n = 40;
        }
        this(map, retryRecordStore, n);
    }

    @Nullable
    public Object hasNext(@NotNull Continuation<? super Boolean> $completion) {
        return KafkaFlowRetry.hasNext$suspendImpl(this, $completion);
    }

    /*
     * Unable to fully structure code
     */
    static /* synthetic */ Object hasNext$suspendImpl(KafkaFlowRetry var0, Continuation var1_1) {
        if (!(var1_1 instanceof hasNext.1)) ** GOTO lbl-1000
        var3_2 = var1_1;
        if ((var3_2.label & -2147483648) != 0) {
            var3_2.label -= -2147483648;
        } else lbl-1000:
        // 2 sources

        {
            $continuation = new ContinuationImpl(var0, (Continuation<? super hasNext.1>)var1_1){
                /* synthetic */ Object result;
                final /* synthetic */ KafkaFlowRetry<K, V> this$0;
                int label;
                {
                    this.this$0 = this$0;
                    super($completion);
                }

                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                    this.result = $result;
                    this.label |= Integer.MIN_VALUE;
                    return KafkaFlowRetry.hasNext$suspendImpl(this.this$0, (Continuation)this);
                }
            };
        }
        $result = $continuation.result;
        var4_4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch ($continuation.label) {
            case 0: {
                ResultKt.throwOnFailure((Object)$result);
                $continuation.label = 1;
                v0 = this.store.isEmpty((Continuation)$continuation);
                if (v0 == var4_4) {
                    return var4_4;
                }
                ** GOTO lbl20
            }
            case 1: {
                ResultKt.throwOnFailure((Object)$result);
                v0 = $result;
lbl20:
                // 2 sources

                return Boxing.boxBoolean((boolean)((Boolean)v0 == false));
            }
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    @Nullable
    public Object produceNext(@NotNull IntRange attemptRange, @NotNull OffsetDateTime olderThan, int limit, @NotNull Continuation<? super Flow<? extends RetryRecord<ConsumerRecord<K, V>>>> $completion) {
        return KafkaFlowRetry.produceNext$suspendImpl(this, attemptRange, olderThan, limit, $completion);
    }

    /*
     * Unable to fully structure code
     */
    static /* synthetic */ Object produceNext$suspendImpl(KafkaFlowRetry var0, IntRange var1_1, OffsetDateTime var2_2, int var3_3, Continuation var4_4) {
        if (!(var4_4 instanceof produceNext.1)) ** GOTO lbl-1000
        var8_5 = var4_4;
        if ((var8_5.label & -2147483648) != 0) {
            var8_5.label -= -2147483648;
        } else lbl-1000:
        // 2 sources

        {
            $continuation = new ContinuationImpl(var0, (Continuation<? super produceNext.1>)var4_4){
                /* synthetic */ Object result;
                final /* synthetic */ KafkaFlowRetry<K, V> this$0;
                int label;
                {
                    this.this$0 = this$0;
                    super($completion);
                }

                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                    this.result = $result;
                    this.label |= Integer.MIN_VALUE;
                    return KafkaFlowRetry.produceNext$suspendImpl(this.this$0, null, null, 0, (Continuation)this);
                }
            };
        }
        $result = $continuation.result;
        var9_7 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch ($continuation.label) {
            case 0: {
                ResultKt.throwOnFailure((Object)$result);
                $continuation.label = 1;
                v0 = this.store.select((IntRange)attemptRange, (OffsetDateTime)olderThan, (int)limit, (Continuation)$continuation);
                if (v0 == var9_7) {
                    return var9_7;
                }
                ** GOTO lbl20
            }
            case 1: {
                ResultKt.throwOnFailure((Object)$result);
                v0 = $result;
lbl20:
                // 2 sources

                $this$sortedByDescending$iv = (Iterable)v0;
                $i$f$sortedByDescending = false;
                return FlowKt.asFlow((Iterable)CollectionsKt.sortedWith((Iterable)$this$sortedByDescending$iv, (Comparator)new Comparator(){

                    public final int compare(T a, T b) {
                        RetryRecord it = (RetryRecord)b;
                        boolean bl = false;
                        Comparable comparable = it.getLastAttempted();
                        it = (RetryRecord)a;
                        Comparable comparable2 = comparable;
                        bl = false;
                        return ComparisonsKt.compareValues((Comparable)comparable2, (Comparable)it.getLastAttempted());
                    }
                }));
            }
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    @Nullable
    public Object send(@NotNull ConsumerRecord<K, V> item, @NotNull Throwable e, @NotNull Continuation<? super Unit> $completion) {
        return KafkaFlowRetry.send$suspendImpl(this, item, e, $completion);
    }

    static /* synthetic */ Object send$suspendImpl(KafkaFlowRetry this_, ConsumerRecord item, Throwable e, Continuation $completion) {
        this_.log.debug((Function0)new Function0<Object>(item){
            final /* synthetic */ ConsumerRecord<K, V> $item;
            {
                this.$item = $item;
                super(0);
            }

            @Nullable
            public final Object invoke() {
                return "adding record to retry queue key:" + this.$item.key() + " source:" + this.$item.topic() + "-" + this.$item.partition();
            }
        });
        Object object = this_.store.putOne((Object)item, e, new Function1<RetryRecord<ConsumerRecord<K, V>>, Unit>(e){
            final /* synthetic */ Throwable $e;
            {
                this.$e = $e;
                super(1);
            }

            public final void invoke(@NotNull RetryRecord<ConsumerRecord<K, V>> $this$putOne) {
                Intrinsics.checkNotNullParameter($this$putOne, (String)"$this$putOne");
                $this$putOne.setAttempt(0);
                OffsetDateTime offsetDateTime = OffsetDateTime.now();
                Intrinsics.checkNotNullExpressionValue((Object)offsetDateTime, (String)"now()");
                $this$putOne.setLastAttempted(offsetDateTime);
                String string = this.$e.getLocalizedMessage();
                Intrinsics.checkNotNullExpressionValue((Object)string, (String)"e.localizedMessage");
                $this$putOne.setLastException(string);
            }
        }, $completion);
        if (object == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            return object;
        }
        return Unit.INSTANCE;
    }

    @Nullable
    public Object onSuccess(@NotNull RetryRecord<ConsumerRecord<K, V>> item, @NotNull Continuation<? super Unit> $completion) {
        return KafkaFlowRetry.onSuccess$suspendImpl(this, item, $completion);
    }

    static /* synthetic */ Object onSuccess$suspendImpl(KafkaFlowRetry this_, RetryRecord item, Continuation $completion) {
        this_.log.debug((Function0)new Function0<Object>(item){
            final /* synthetic */ RetryRecord<ConsumerRecord<K, V>> $item;
            {
                this.$item = $item;
                super(0);
            }

            @Nullable
            public final Object invoke() {
                return "successful reprocess attempt:" + this.$item.getAttempt() + " key:" + ((ConsumerRecord)this.$item.getData()).key() + " source:" + ((ConsumerRecord)this.$item.getData()).topic() + "-" + ((ConsumerRecord)this.$item.getData()).partition();
            }
        });
        Object object = this_.store.remove(item.getData(), $completion);
        if (object == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            return object;
        }
        return Unit.INSTANCE;
    }

    @Nullable
    public Object onFailure(@NotNull RetryRecord<ConsumerRecord<K, V>> item, @NotNull Throwable e, @NotNull Continuation<? super Unit> $completion) {
        return KafkaFlowRetry.onFailure$suspendImpl(this, item, e, $completion);
    }

    static /* synthetic */ Object onFailure$suspendImpl(KafkaFlowRetry this_, RetryRecord item, Throwable e, Continuation $completion) {
        this_.log.debug((Function0)new Function0<Object>(item){
            final /* synthetic */ RetryRecord<ConsumerRecord<K, V>> $item;
            {
                this.$item = $item;
                super(0);
            }

            @Nullable
            public final Object invoke() {
                return "failed reprocess attempt:" + this.$item.getAttempt() + " Error: " + this.$item.getLastException() + " key:" + ((ConsumerRecord)this.$item.getData()).key() + " source:" + ((ConsumerRecord)this.$item.getData()).topic() + "-" + ((ConsumerRecord)this.$item.getData()).partition();
            }
        });
        Object object = this_.store.putOne(item.getData(), e, new Function1<RetryRecord<ConsumerRecord<K, V>>, Unit>(e){
            final /* synthetic */ Throwable $e;
            {
                this.$e = $e;
                super(1);
            }

            public final void invoke(@NotNull RetryRecord<ConsumerRecord<K, V>> $this$putOne) {
                Intrinsics.checkNotNullParameter($this$putOne, (String)"$this$putOne");
                int cfr_ignored_0 = $this$putOne.getAttempt() + 1;
                OffsetDateTime offsetDateTime = OffsetDateTime.now();
                Intrinsics.checkNotNullExpressionValue((Object)offsetDateTime, (String)"now()");
                $this$putOne.setLastAttempted(offsetDateTime);
                String string = this.$e.getLocalizedMessage();
                Intrinsics.checkNotNullExpressionValue((Object)string, (String)"e.localizedMessage");
                $this$putOne.setLastException(string);
            }
        }, $completion);
        if (object == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            return object;
        }
        return Unit.INSTANCE;
    }

    @Nullable
    public Object process(@NotNull ConsumerRecord<K, V> item, int attempt, @NotNull Continuation<? super Unit> $completion) {
        return KafkaFlowRetry.process$suspendImpl(this, item, attempt, $completion);
    }

    static /* synthetic */ Object process$suspendImpl(KafkaFlowRetry this_, ConsumerRecord item, int attempt, Continuation $completion) {
        String topic = item.topic();
        Function2 function2 = this_.handlers.get(topic);
        if (function2 == null) {
            throw new RuntimeException("topic '" + topic + "' not handled by this retry handler");
        }
        Function2 handler = function2;
        this_.log.debug((Function0)new Function0<Object>(item, attempt){
            final /* synthetic */ ConsumerRecord<K, V> $item;
            final /* synthetic */ int $attempt;
            {
                this.$item = $item;
                this.$attempt = $attempt;
                super(0);
            }

            @Nullable
            public final Object invoke() {
                return "processing key:" + this.$item.key() + " attempt:" + this.$attempt + " source:" + this.$item.topic() + "-" + this.$item.partition();
            }
        });
        byte[] byArray = ByteArrayExtensionsKt.toByteArray(attempt);
        Intrinsics.checkNotNullExpressionValue((Object)byArray, (String)"attempt.toByteArray()");
        Object object = handler.invoke(this_.setHeader(item, "x-kafka-retry-attempt", byArray), (Object)$completion);
        if (object == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            return object;
        }
        return Unit.INSTANCE;
    }

    private final <K, V> ConsumerRecord<K, V> setHeader(ConsumerRecord<K, V> $this$setHeader, String key, byte[] value) {
        ConsumerRecord<K, V> consumerRecord;
        ConsumerRecord<K, V> $this$setHeader_u24lambda_u2d2 = consumerRecord = $this$setHeader;
        boolean bl = false;
        Headers headers = $this$setHeader_u24lambda_u2d2.headers();
        Intrinsics.checkNotNullExpressionValue((Object)headers, (String)"headers()");
        KafkaFlowRetry.setHeader$lambda-2$addOrUpdate(headers, (Header)new RecordHeader(key, value));
        return consumerRecord;
    }

    private static final Headers setHeader$lambda-2$addOrUpdate(Headers $this$setHeader_u24lambda_u2d2_u24addOrUpdate, Header header) {
        Object v0;
        block3: {
            Iterable iterable = (Iterable)$this$setHeader_u24lambda_u2d2_u24addOrUpdate;
            for (Object t : iterable) {
                Header it = (Header)t;
                boolean bl = false;
                if (!Intrinsics.areEqual((Object)it.key(), (Object)header.key())) continue;
                v0 = t;
                break block3;
            }
            v0 = null;
        }
        Header h = v0;
        if (h == null) {
            $this$setHeader_u24lambda_u2d2_u24addOrUpdate.add(header);
        } else {
            $this$setHeader_u24lambda_u2d2_u24addOrUpdate.remove(header.key());
            $this$setHeader_u24lambda_u2d2_u24addOrUpdate.add(header.key(), header.value());
        }
        return $this$setHeader_u24lambda_u2d2_u24addOrUpdate;
    }
}

