package com.twitter.finatra.kafkastreams.flushing;

import com.twitter.finagle.stats.Gauge;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.util.Await$;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.Future$;
import com.twitter.util.Return;
import com.twitter.util.Throw;
import com.twitter.util.Try;
import java.util.concurrent.Semaphore;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple3;
import scala.collection.Iterable;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: AsyncFlushing.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]ea\u0002\u000b\u0016!\u0003\r\t\u0001\t\u0005\u0006o\u0001!\t\u0001\u000f\u0005\by\u0001\u0001\r\u0011\"\u0003>\u0011\u001dA\u0005\u00011A\u0005\n%C\u0011\u0002\u0014\u0001A\u0002\u0003\u0007I\u0011B'\t\u0013m\u0003\u0001\u0019!a\u0001\n\u0013a\u0006b\u00020\u0001\u0005\u0004%Ia\u0018\u0005\bS\u0002\u0011\r\u0011\"\u0003k\u0011\u0015\u0019\bA\"\u0005u\u0011\u0015A\bA\"\u0005z\u0011\u0015i\bA\"\u0005\u007f\u0011\u001d\t)\u0001\u0001C\t\u0003\u000fAq!a\u001b\u0001\t#\ti\u0007C\u0004\u0002x\u0001!\t\"!\u001f\t\u000f\u0005\r\u0005\u0001\"\u0005\u0002\u0006\"1\u00111\u0012\u0001\u0005BaBa!!$\u0001\t#A\u0004BBAH\u0001\u0011E\u0011\u0010\u0003\u0004\u0002\u0012\u0002!\t\u0005\u000f\u0005\u000e\u0003'\u0003\u0001\u0013aA\u0001\u0002\u0013%\u0001(!&\u0003\u001b\u0005\u001b\u0018P\\2GYV\u001c\b.\u001b8h\u0015\t1r#\u0001\u0005gYV\u001c\b.\u001b8h\u0015\tA\u0012$\u0001\u0007lC\u001a\\\u0017m\u001d;sK\u0006l7O\u0003\u0002\u001b7\u00059a-\u001b8biJ\f'B\u0001\u000f\u001e\u0003\u001d!x/\u001b;uKJT\u0011AH\u0001\u0004G>l7\u0001A\u000b\nC\u0005E\u0011\u0011FA!\u0003\u000f\u001aR\u0001\u0001\u0012)YQ\u0002\"a\t\u0014\u000e\u0003\u0011R\u0011!J\u0001\u0006g\u000e\fG.Y\u0005\u0003O\u0011\u0012a!\u00118z%\u00164\u0007CA\u0015+\u001b\u0005)\u0012BA\u0016\u0016\u0005!1E.^:iS:<\u0007CA\u00173\u001b\u0005q#BA\u00181\u0003%a\u0017NZ3ds\u000edWM\u0003\u00022/\u0005YAO]1og\u001a|'/\\3s\u0013\t\u0019dF\u0001\u0004P]&s\u0017\u000e\u001e\t\u0003[UJ!A\u000e\u0018\u0003\u000f=s7\t\\8tK\u00061A%\u001b8ji\u0012\"\u0012!\u000f\t\u0003GiJ!a\u000f\u0013\u0003\tUs\u0017\u000e^\u0001\u0013_V$8\u000f^1oI&twMR;ukJ,7/F\u0001?!\ry$)O\u0007\u0002\u0001*\u0011\u0011iG\u0001\u0005kRLG.\u0003\u0002D\u0001\n1a)\u001e;ve\u0016D#AA#\u0011\u0005\r2\u0015BA$%\u0005!1x\u000e\\1uS2,\u0017AF8viN$\u0018M\u001c3j]\u001e4U\u000f^;sKN|F%Z9\u0015\u0005eR\u0005bB&\u0004\u0003\u0003\u0005\rAP\u0001\u0004q\u0012\n\u0014\u0001D1ts:\u001cg)Y5mkJ,W#\u0001(\u0011\u0005=;fB\u0001)V\u001d\t\tF+D\u0001S\u0015\t\u0019v$\u0001\u0004=e>|GOP\u0005\u0002K%\u0011a\u000bJ\u0001\ba\u0006\u001c7.Y4f\u0013\tA\u0016LA\u0005UQJ|w/\u00192mK*\u0011a\u000b\n\u0015\u0003\t\u0015\u000b\u0001#Y:z]\u000e4\u0015-\u001b7ve\u0016|F%Z9\u0015\u0005ej\u0006bB&\u0006\u0003\u0003\u0005\rAT\u0001\u000bC\u0012$\u0007+\u001a:nSR\u001cX#\u00011\u0011\u0005\u0005<W\"\u00012\u000b\u0005\r$\u0017AC2p]\u000e,(O]3oi*\u0011\u0011)\u001a\u0006\u0002M\u0006!!.\u0019<b\u0013\tA'MA\u0005TK6\f\u0007\u000f[8sK\u00069r.\u001e;ti\u0006tG-\u001b8h\rV$XO]3t\u000f\u0006,x-Z\u000b\u0002WB\u0011A.]\u0007\u0002[*\u0011an\\\u0001\u0006gR\fGo\u001d\u0006\u0003an\tqAZ5oC\u001edW-\u0003\u0002s[\n)q)Y;hK\u0006i1\u000f^1ugJ+7-Z5wKJ,\u0012!\u001e\t\u0003YZL!a^7\u0003\u001bM#\u0018\r^:SK\u000e,\u0017N^3s\u0003qi\u0017\r_(viN$\u0018M\u001c3j]\u001e4U\u000f^;sKN\u0004VM\u001d+bg.,\u0012A\u001f\t\u0003GmL!\u0001 \u0013\u0003\u0007%sG/\u0001\u0007gYV\u001c\b\u000eV5nK>,H/F\u0001��!\ry\u0014\u0011A\u0005\u0004\u0003\u0007\u0001%\u0001\u0003#ve\u0006$\u0018n\u001c8\u0002\u0013\u0005$GMR;ukJ,GcB\u001d\u0002\n\u0005\r\u0012Q\u0006\u0005\b\u0003\u0017Y\u0001\u0019AA\u0007\u0003\rYW-\u001f\t\u0005\u0003\u001f\t\t\u0002\u0004\u0001\u0005\u000f\u0005M\u0001A1\u0001\u0002\u0016\t\u00111*M\t\u0005\u0003/\ti\u0002E\u0002$\u00033I1!a\u0007%\u0005\u001dqu\u000e\u001e5j]\u001e\u00042aIA\u0010\u0013\r\t\t\u0003\n\u0002\u0004\u0003:L\bbBA\u0013\u0017\u0001\u0007\u0011qE\u0001\u0006m\u0006dW/\u001a\t\u0005\u0003\u001f\tI\u0003B\u0004\u0002,\u0001\u0011\r!!\u0006\u0003\u0005Y\u000b\u0004bBA\u0018\u0017\u0001\u0007\u0011\u0011G\u0001\u0007MV$XO]3\u0011\t}\u0012\u00151\u0007\t\u0006\u001f\u0006U\u0012\u0011H\u0005\u0004\u0003oI&\u0001C%uKJ\f'\r\\3\u0011\u0013\r\nY$a\u0010\u0002F\u0005-\u0013bAA\u001fI\t1A+\u001e9mKN\u0002B!a\u0004\u0002B\u00119\u00111\t\u0001C\u0002\u0005U!AA&3!\u0011\ty!a\u0012\u0005\u000f\u0005%\u0003A1\u0001\u0002\u0016\t\u0011aK\r\t\u0005\u0003\u001b\n9'\u0004\u0002\u0002P)!\u0011\u0011KA*\u0003%Ig\u000e^3s]\u0006d7O\u0003\u0003\u0002V\u0005]\u0013!\u00039s_\u000e,7o]8s\u0015\u0011\tI&a\u0017\u0002\u000fM$(/Z1ng*!\u0011QLA0\u0003\u0015Y\u0017MZ6b\u0015\u0011\t\t'a\u0019\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\t)'A\u0002pe\u001eLA!!\u001b\u0002P\t1\u0002K]8dKN\u001cxN\u001d*fG>\u0014HmQ8oi\u0016DH/A\bp]\u001a+H/\u001e:f'V\u001c7-Z:t)\u001dI\u0014qNA9\u0003gBq!a\u0003\r\u0001\u0004\ti\u0001C\u0004\u0002&1\u0001\r!a\n\t\u000f\u0005UD\u00021\u0001\u00024\u00051!/Z:vYR\fqb\u001c8GkR,(/\u001a$bS2,(/\u001a\u000b\bs\u0005m\u0014QPA@\u0011\u001d\tY!\u0004a\u0001\u0003\u001bAq!!\n\u000e\u0001\u0004\t9\u0003\u0003\u0004\u0002\u00026\u0001\rAT\u0001\u0002i\u0006y1/\u001a;Bgft7MR1jYV\u0014X\rF\u0002:\u0003\u000fCa!!#\u000f\u0001\u0004q\u0015!A3\u0002\u000f=tg\t\\;tQ\u0006\u0019B\u000f\u001b:po&3\u0017i]=oG\u001a\u000b\u0017\u000e\\;sK\u0006)b.^7PkR\u001cH/\u00198eS:<g)\u001e;ve\u0016\u001c\u0018aB8o\u00072|7/Z\u0001\u000egV\u0004XM\u001d\u0013p]\u000ecwn]3\n\u0007\u0005E%\u0006")
/* loaded from: input_file:com/twitter/finatra/kafkastreams/flushing/AsyncFlushing.class */
public interface AsyncFlushing<K1, V1, K2, V2> extends Flushing {
    void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$_setter_$com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits_$eq(Semaphore semaphore);

    void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$_setter_$com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge_$eq(Gauge gauge);

    /* synthetic */ void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$super$onClose();

    Future<BoxedUnit> com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures();

    void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures_$eq(Future<BoxedUnit> future);

    Throwable com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure();

    void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure_$eq(Throwable th);

    Semaphore com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits();

    Gauge com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge();

    StatsReceiver statsReceiver();

    int maxOutstandingFuturesPerTask();

    Duration flushTimeout();

    default void addFuture(K1 k1, V1 v1, Future<Iterable<Tuple3<K2, V2, ProcessorRecordContext>>> future) {
        throwIfAsyncFailure();
        com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits().acquire();
        com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures_$eq(com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures().join(future.respond(r8 -> {
            $anonfun$addFuture$1(this, k1, v1, r8);
            return BoxedUnit.UNIT;
        })).unit());
    }

    default void onFutureSuccess(K1 k1, V1 v1, Iterable<Tuple3<K2, V2, ProcessorRecordContext>> iterable) {
        debug(() -> {
            return new StringBuilder(16).append("FutureSuccess ").append(k1).append(" ").append(v1).append(" ").append(iterable).toString();
        });
    }

    default void onFutureFailure(K1 k1, V1 v1, Throwable th) {
        error(() -> {
            return new StringBuilder(20).append("Async asyncFailure: ").append(th).toString();
        });
        setAsyncFailure(th);
    }

    default void setAsyncFailure(Throwable th) {
        com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure_$eq(th);
    }

    @Override // com.twitter.finatra.kafkastreams.transformer.lifecycle.OnFlush
    default void onFlush() {
        debug(() -> {
            return "Flush: Waiting on async results";
        });
        Await$.MODULE$.result(com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures(), flushTimeout());
        com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures_$eq(Future$.MODULE$.Unit());
        Predef$.MODULE$.assert(numOutstandingFutures() == 0);
        debug(() -> {
            return "Finished waiting on async results";
        });
    }

    default void throwIfAsyncFailure() {
        if (com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure() != null) {
            throw com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure();
        }
    }

    default int numOutstandingFutures() {
        return maxOutstandingFuturesPerTask() - com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits().availablePermits();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing, com.twitter.finatra.kafkastreams.transformer.lifecycle.OnClose
    default void onClose() {
        com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$super$onClose();
        debug(() -> {
            return "Close";
        });
        com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge().remove();
    }

    static /* synthetic */ void $anonfun$addFuture$1(AsyncFlushing asyncFlushing, Object obj, Object obj2, Try r8) {
        if (r8 instanceof Throw) {
            Throwable e = ((Throw) r8).e();
            asyncFlushing.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits().release();
            asyncFlushing.onFutureFailure(obj, obj2, e);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!(r8 instanceof Return)) {
            throw new MatchError(r8);
        }
        Iterable<Tuple3<K2, V2, ProcessorRecordContext>> iterable = (Iterable) ((Return) r8).r();
        asyncFlushing.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits().release();
        asyncFlushing.onFutureSuccess(obj, obj2, iterable);
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    static void $init$(AsyncFlushing asyncFlushing) {
        asyncFlushing.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures_$eq(Future$.MODULE$.Unit());
        asyncFlushing.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$_setter_$com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits_$eq(new Semaphore(asyncFlushing.maxOutstandingFuturesPerTask(), false));
        asyncFlushing.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$_setter_$com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge_$eq(asyncFlushing.statsReceiver().addGauge(Predef$.MODULE$.wrapRefArray(new String[]{"outstandingFutures"}), () -> {
            return asyncFlushing.numOutstandingFutures();
        }));
    }
}
