package com.twitter.finatra.kafkastreams.flushing;

import com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging;
import com.twitter.finatra.kafkastreams.transformer.lifecycle.OnClose;
import com.twitter.finatra.kafkastreams.transformer.lifecycle.OnFlush;
import com.twitter.finatra.kafkastreams.transformer.lifecycle.OnInit;
import com.twitter.util.Duration;
import com.twitter.util.Duration$;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import scala.Predef$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: Flushing.scala */
@ScalaSignature(bytes = "\u0006\u0001u3q!\u0001\u0002\u0011\u0002\u0007\u0005QB\u0001\u0005GYV\u001c\b.\u001b8h\u0015\t\u0019A!\u0001\u0005gYV\u001c\b.\u001b8h\u0015\t)a!\u0001\u0007lC\u001a\\\u0017m\u001d;sK\u0006l7O\u0003\u0002\b\u0011\u00059a-\u001b8biJ\f'BA\u0005\u000b\u0003\u001d!x/\u001b;uKJT\u0011aC\u0001\u0004G>l7\u0001A\n\u0007\u00019!Bd\b\u0012\u0011\u0005=\u0011R\"\u0001\t\u000b\u0003E\tQa]2bY\u0006L!a\u0005\t\u0003\r\u0005s\u0017PU3g!\t)\"$D\u0001\u0017\u0015\t9\u0002$A\u0005mS\u001a,7-_2mK*\u0011\u0011\u0004B\u0001\fiJ\fgn\u001d4pe6,'/\u0003\u0002\u001c-\t1qJ\\%oSR\u0004\"!F\u000f\n\u0005y1\"aB(o\u00072|7/\u001a\t\u0003+\u0001J!!\t\f\u0003\u000f=sg\t\\;tQB\u00111\u0005K\u0007\u0002I)\u0011QEJ\u0001\u0006kRLGn\u001d\u0006\u0003O\u0011\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003S\u0011\u0012q\u0003\u0015:pG\u0016\u001c8o\u001c:D_:$X\r\u001f;M_\u001e<\u0017N\\4\t\u000b-\u0002A\u0011\u0001\u0017\u0002\r\u0011Jg.\u001b;%)\u0005i\u0003CA\b/\u0013\ty\u0003C\u0001\u0003V]&$\b\"C\u0019\u0001\u0001\u0004\u0005\r\u0011\"\u00033\u0003m\u0019w.\\7jiB+hn\u0019;vCR|'oQ1oG\u0016dG.\u00192mKV\t1\u0007\u0005\u00025\u007f5\tQG\u0003\u00027o\u0005I\u0001O]8dKN\u001cxN\u001d\u0006\u0003qe\nqa\u001d;sK\u0006l7O\u0003\u0002;w\u0005)1.\u00194lC*\u0011A(P\u0001\u0007CB\f7\r[3\u000b\u0003y\n1a\u001c:h\u0013\t\u0001UGA\u0006DC:\u001cW\r\u001c7bE2,\u0007F\u0001\u0019C!\ty1)\u0003\u0002E!\tAao\u001c7bi&dW\rC\u0005G\u0001\u0001\u0007\t\u0019!C\u0005\u000f\u0006y2m\\7nSR\u0004VO\\2uk\u0006$xN]\"b]\u000e,G\u000e\\1cY\u0016|F%Z9\u0015\u00055B\u0005bB%F\u0003\u0003\u0005\raM\u0001\u0004q\u0012\n\u0004\"B&\u0001\r#a\u0015AD2p[6LG/\u00138uKJ4\u0018\r\\\u000b\u0002\u001bB\u0011a*U\u0007\u0002\u001f*\u0011\u0001\u000bC\u0001\u0005kRLG.\u0003\u0002S\u001f\nAA)\u001e:bi&|g\u000eC\u0003U\u0001\u0011\u0005C&\u0001\u0004p]&s\u0017\u000e\u001e\u0005\u0006-\u0002!\t\u0005L\u0001\b_:\u001cEn\\:f\u0011-A\u0006\u0001%A\u0002\u0002\u0003%I\u0001L-\u0002\u0019M,\b/\u001a:%_:Le.\u001b;\n\u0005QS\u0002bC.\u0001!\u0003\r\t\u0011!C\u0005Yq\u000bQb];qKJ$sN\\\"m_N,\u0017B\u0001,\u001e\u0001")
/* loaded from: input_file:com/twitter/finatra/kafkastreams/flushing/Flushing.class */
public interface Flushing extends OnInit, OnClose, OnFlush, ProcessorContextLogging {
    /* synthetic */ void com$twitter$finatra$kafkastreams$flushing$Flushing$$super$onInit();

    /* synthetic */ void com$twitter$finatra$kafkastreams$flushing$Flushing$$super$onClose();

    Cancellable com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable();

    void com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable_$eq(Cancellable cancellable);

    Duration commitInterval();

    @Override // com.twitter.finatra.kafkastreams.transformer.lifecycle.OnInit
    default void onInit() {
        com$twitter$finatra$kafkastreams$flushing$Flushing$$super$onInit();
        Predef$.MODULE$.assert(BoxesRunTime.equalsNumObject((Long) processorContext().appConfigs().get("commit.interval.ms"), BoxesRunTime.boxToLong(Duration$.MODULE$.Top().inMillis())), () -> {
            return "You're using an operator that requires 'Flushing' functionality (e.g. FlushingProcessor/Transformer or AsyncProcessor/Transformer). As such, your server must mixin FlushingAwareServer so that automatic Kafka Streams commit will be disabled.";
        });
        Duration commitInterval = commitInterval();
        Duration Top = Duration$.MODULE$.Top();
        if (commitInterval == null) {
            if (Top == null) {
                return;
            }
        } else if (commitInterval.equals(Top)) {
            return;
        }
        info(() -> {
            return new StringBuilder(38).append("Scheduling timer to call commit every ").append(this.commitInterval()).toString();
        });
        com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable_$eq(processorContext().schedule(commitInterval().inMillis(), PunctuationType.WALL_CLOCK_TIME, new Punctuator(this) { // from class: com.twitter.finatra.kafkastreams.flushing.Flushing$$anon$1
            private final /* synthetic */ Flushing $outer;

            public void punctuate(long j) {
                this.$outer.onFlush();
                this.$outer.processorContext().commit();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }));
    }

    default void onClose() {
        com$twitter$finatra$kafkastreams$flushing$Flushing$$super$onClose();
        if (com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable() != null) {
            com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable().cancel();
            com$twitter$finatra$kafkastreams$flushing$Flushing$$commitPunctuatorCancellable_$eq(null);
        }
    }

    static void $init$(Flushing flushing) {
    }
}
