package com.twitter.finatra.kafkastreams.flushing;

import com.twitter.finagle.stats.Gauge;
import com.twitter.finagle.stats.Stat;
import com.twitter.finagle.stats.Stat$;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging;
import com.twitter.finatra.kafkastreams.transformer.lifecycle.OnClose;
import com.twitter.finatra.kafkastreams.transformer.lifecycle.OnFlush;
import com.twitter.finatra.kafkastreams.transformer.lifecycle.OnInit;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.logging.Logger;
import com.twitter.util.logging.Logger$;
import java.util.concurrent.Semaphore;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import scala.Function0;
import scala.Predef$;
import scala.Tuple3;
import scala.collection.Iterable;
import scala.collection.immutable.Nil$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: AsyncProcessor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005b!B\u0007\u000f\u0003\u0003I\u0002\u0002\u0003#\u0001\u0005\u000b\u0007I\u0011I#\t\u00119\u0003!\u0011!Q\u0001\n\u0019C\u0001b\u0014\u0001\u0003\u0006\u0004%\t\u0005\u0015\u0005\t)\u0002\u0011\t\u0011)A\u0005#\"AQ\u000b\u0001BC\u0002\u0013\u0005c\u000b\u0003\u0005^\u0001\t\u0005\t\u0015!\u0003X\u0011!q\u0006A!b\u0001\n\u00032\u0006\u0002C0\u0001\u0005\u0003\u0005\u000b\u0011B,\t\u000b\u0001\u0004A\u0011A1\t\r\u001d\u0004\u0001\u0015!\u0003i\u0011\u0015Y\u0007A\"\u0005m\u0011\u001d\tI\u0002\u0001C#\u00037\u0011a\"Q:z]\u000e\u0004&o\\2fgN|'O\u0003\u0002\u0010!\u0005Aa\r\\;tQ&twM\u0003\u0002\u0012%\u0005a1.\u00194lCN$(/Z1ng*\u00111\u0003F\u0001\bM&t\u0017\r\u001e:b\u0015\t)b#A\u0004uo&$H/\u001a:\u000b\u0003]\t1aY8n\u0007\u0001)2AG\u00169'\u0011\u00011D\u000f \u0011\tq9\u0013fN\u0007\u0002;)\u0011adH\u0001\naJ|7-Z:t_JT!\u0001I\u0011\u0002\u000fM$(/Z1ng*\u0011!eI\u0001\u0006W\u000647.\u0019\u0006\u0003I\u0015\na!\u00199bG\",'\"\u0001\u0014\u0002\u0007=\u0014x-\u0003\u0002);\t\t\u0012IY:ue\u0006\u001cG\u000f\u0015:pG\u0016\u001c8o\u001c:\u0011\u0005)ZC\u0002\u0001\u0003\u0006Y\u0001\u0011\r!\f\u0002\u0002\u0017F\u0011a\u0006\u000e\t\u0003_Ij\u0011\u0001\r\u0006\u0002c\u0005)1oY1mC&\u00111\u0007\r\u0002\b\u001d>$\b.\u001b8h!\tyS'\u0003\u00027a\t\u0019\u0011I\\=\u0011\u0005)BD!B\u001d\u0001\u0005\u0004i#!\u0001,\u0011\tmb\u0014fN\u0007\u0002\u001d%\u0011QH\u0004\u0002\u0012\r2,8\u000f[5oOB\u0013xnY3tg>\u0014\bCB\u001e@S]\n\u0015)\u0003\u0002A\u001d\ti\u0011i]=oG\u001acWo\u001d5j]\u001e\u0004\"a\f\"\n\u0005\r\u0003$\u0001B+oSR\fQb\u001d;biN\u0014VmY3jm\u0016\u0014X#\u0001$\u0011\u0005\u001dcU\"\u0001%\u000b\u0005%S\u0015!B:uCR\u001c(BA&\u0015\u0003\u001d1\u0017N\\1hY\u0016L!!\u0014%\u0003\u001bM#\u0018\r^:SK\u000e,\u0017N^3s\u00039\u0019H/\u0019;t%\u0016\u001cW-\u001b<fe\u0002\nA$\\1y\u001fV$8\u000f^1oI&twMR;ukJ,7\u000fU3s)\u0006\u001c8.F\u0001R!\ty#+\u0003\u0002Ta\t\u0019\u0011J\u001c;\u0002;5\f\u0007pT;ugR\fg\u000eZ5oO\u001a+H/\u001e:fgB+'\u000fV1tW\u0002\nabY8n[&$\u0018J\u001c;feZ\fG.F\u0001X!\tA6,D\u0001Z\u0015\tQF#\u0001\u0003vi&d\u0017B\u0001/Z\u0005!!UO]1uS>t\u0017aD2p[6LG/\u00138uKJ4\u0018\r\u001c\u0011\u0002\u0019\u0019dWo\u001d5US6,w.\u001e;\u0002\u001b\u0019dWo\u001d5US6,w.\u001e;!\u0003\u0019a\u0014N\\5u}Q)!m\u00193fMB!1\bA\u00158\u0011\u0015!\u0015\u00021\u0001G\u0011\u0015y\u0015\u00021\u0001R\u0011\u0015)\u0016\u00021\u0001X\u0011\u0015q\u0016\u00021\u0001X\u0003-a\u0017\r^3oGf\u001cF/\u0019;\u0011\u0005\u001dK\u0017B\u00016I\u0005\u0011\u0019F/\u0019;\u0002\u0019A\u0014xnY3tg\u0006\u001b\u0018P\\2\u0015\t5\u0004(\u000f\u001e\t\u00041:\f\u0015BA8Z\u0005\u00191U\u000f^;sK\")\u0011o\u0003a\u0001S\u0005\u00191.Z=\t\u000bM\\\u0001\u0019A\u001c\u0002\u000bY\fG.^3\t\u000bU\\\u0001\u0019\u0001<\u0002\u0013QLW.Z:uC6\u0004\bcA<\u0002\u00149\u0019\u00010!\u0004\u000f\u0007e\fIAD\u0002{\u0003\u000fq1a_A\u0003\u001d\ra\u00181\u0001\b\u0004{\u0006\u0005Q\"\u0001@\u000b\u0005}D\u0012A\u0002\u001fs_>$h(C\u0001\u0018\u0013\t)b#\u0003\u0002\u0014)%\u0011\u0011CE\u0005\u0004\u0003\u0017\u0001\u0012!B;uS2\u001c\u0018\u0002BA\b\u0003#\tq\u0001]1dW\u0006<WMC\u0002\u0002\fAIA!!\u0006\u0002\u0018\t\u0001R*Z:tC\u001e,G+[7fgR\fW\u000e\u001d\u0006\u0005\u0003\u001f\t\t\"A\u0004qe>\u001cWm]:\u0015\u000b\u0005\u000bi\"a\b\t\u000bEd\u0001\u0019A\u0015\t\u000bMd\u0001\u0019A\u001c")
/* loaded from: input_file:com/twitter/finatra/kafkastreams/flushing/AsyncProcessor.class */
public abstract class AsyncProcessor<K, V> extends AbstractProcessor<K, V> implements FlushingProcessor<K, V>, AsyncFlushing<K, V, BoxedUnit, BoxedUnit> {
    private final StatsReceiver statsReceiver;
    private final int maxOutstandingFuturesPerTask;
    private final Duration commitInterval;
    private final Duration flushTimeout;
    private final Stat latencyStat;
    private volatile Future<BoxedUnit> com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures;
    private volatile Throwable com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure;
    private final Semaphore com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits;
    private final Gauge com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge;
    private ProcessorContext com$twitter$finatra$kafkastreams$flushing$FlushingProcessor$$_context;
    private volatile Cancellable com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable;
    private final Logger com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$$_logger;

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public /* synthetic */ void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$super$onClose() {
        onClose();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public void addFuture(K k, V v, Future<Iterable<Tuple3<BoxedUnit, BoxedUnit, ProcessorRecordContext>>> future) {
        addFuture(k, v, future);
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public void onFutureSuccess(K k, V v, Iterable<Tuple3<BoxedUnit, BoxedUnit, ProcessorRecordContext>> iterable) {
        onFutureSuccess(k, v, iterable);
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public void onFutureFailure(K k, V v, Throwable th) {
        onFutureFailure(k, v, th);
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public void setAsyncFailure(Throwable th) {
        setAsyncFailure(th);
    }

    @Override // com.twitter.finatra.kafkastreams.transformer.lifecycle.OnFlush
    public void onFlush() {
        onFlush();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public void throwIfAsyncFailure() {
        throwIfAsyncFailure();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public int numOutstandingFutures() {
        int numOutstandingFutures;
        numOutstandingFutures = numOutstandingFutures();
        return numOutstandingFutures;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing, com.twitter.finatra.kafkastreams.transformer.lifecycle.OnClose
    public void onClose() {
        onClose();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.FlushingProcessor
    public void init(ProcessorContext processorContext) {
        init(processorContext);
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.FlushingProcessor, com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public ProcessorContext processorContext() {
        ProcessorContext processorContext;
        processorContext = processorContext();
        return processorContext;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.FlushingProcessor
    public final void close() {
        close();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing
    public /* synthetic */ void com$twitter$finatra$kafkastreams$flushing$Flushing$$super$onInit() {
        onInit();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing
    public /* synthetic */ void com$twitter$finatra$kafkastreams$flushing$Flushing$$super$onClose() {
        onClose();
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing, com.twitter.finatra.kafkastreams.transformer.lifecycle.OnInit
    public void onInit() {
        onInit();
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final void error(Function0<Object> function0) {
        error(function0);
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final void info(Function0<Object> function0) {
        info(function0);
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final void warn(Function0<Object> function0) {
        warn(function0);
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final void debug(Function0<Object> function0) {
        debug(function0);
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final void trace(Function0<Object> function0) {
        trace(function0);
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final String timeStr() {
        String timeStr;
        timeStr = timeStr();
        return timeStr;
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final String taskIdStr() {
        String taskIdStr;
        taskIdStr = taskIdStr();
        return taskIdStr;
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public ProcessorContextLogging.RichLong RichLong(long j) {
        ProcessorContextLogging.RichLong RichLong;
        RichLong = RichLong(j);
        return RichLong;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public Future<BoxedUnit> com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures() {
        return this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures_$eq(Future<BoxedUnit> future) {
        this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFutures = future;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public Throwable com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure() {
        return this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure_$eq(Throwable th) {
        this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$asyncFailure = th;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public Semaphore com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits() {
        return this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public Gauge com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge() {
        return this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public final void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$_setter_$com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits_$eq(Semaphore semaphore) {
        this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$addPermits = semaphore;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public final void com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$_setter_$com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge_$eq(Gauge gauge) {
        this.com$twitter$finatra$kafkastreams$flushing$AsyncFlushing$$outstandingFuturesGauge = gauge;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.FlushingProcessor
    public ProcessorContext com$twitter$finatra$kafkastreams$flushing$FlushingProcessor$$_context() {
        return this.com$twitter$finatra$kafkastreams$flushing$FlushingProcessor$$_context;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.FlushingProcessor
    public void com$twitter$finatra$kafkastreams$flushing$FlushingProcessor$$_context_$eq(ProcessorContext processorContext) {
        this.com$twitter$finatra$kafkastreams$flushing$FlushingProcessor$$_context = processorContext;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing
    public Cancellable com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable() {
        return this.com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing
    public void com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable_$eq(Cancellable cancellable) {
        this.com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable = cancellable;
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public Logger com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$$_logger() {
        return this.com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$$_logger;
    }

    @Override // com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging
    public final void com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$_setter_$com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$$_logger_$eq(Logger logger) {
        this.com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$$_logger = logger;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public StatsReceiver statsReceiver() {
        return this.statsReceiver;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public int maxOutstandingFuturesPerTask() {
        return this.maxOutstandingFuturesPerTask;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.Flushing
    public Duration commitInterval() {
        return this.commitInterval;
    }

    @Override // com.twitter.finatra.kafkastreams.flushing.AsyncFlushing
    public Duration flushTimeout() {
        return this.flushTimeout;
    }

    public abstract Future<BoxedUnit> processAsync(K k, V v, long j);

    public final void process(K k, V v) {
        addFuture(k, v, Stat$.MODULE$.timeFuture(this.latencyStat, () -> {
            return this.processAsync(k, v, this.processorContext().timestamp());
        }).map(boxedUnit -> {
            return package$.MODULE$.Iterable().apply(Nil$.MODULE$);
        }));
    }

    public AsyncProcessor(StatsReceiver statsReceiver, int i, Duration duration, Duration duration2) {
        this.statsReceiver = statsReceiver;
        this.maxOutstandingFuturesPerTask = i;
        this.commitInterval = duration;
        this.flushTimeout = duration2;
        OnInit.$init$(this);
        OnClose.$init$(this);
        OnFlush.$init$(this);
        com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$_setter_$com$twitter$finatra$kafkastreams$internal$utils$ProcessorContextLogging$$_logger_$eq(Logger$.MODULE$.apply(getClass()));
        Flushing.$init$((Flushing) this);
        FlushingProcessor.$init$((FlushingProcessor) this);
        AsyncFlushing.$init$((AsyncFlushing) this);
        this.latencyStat = statsReceiver.stat(Predef$.MODULE$.wrapRefArray(new String[]{"process_async_latency_ms"}));
    }
}
