package com.twitter.finatra.kafkastreams.integration.punctuator;

import com.twitter.conversions.DurationOps$;
import com.twitter.conversions.DurationOps$RichDuration$;
import com.twitter.finatra.kafka.serde.ScalaSerdes$;
import com.twitter.finatra.kafkastreams.test.FinatraTopologyTester;
import com.twitter.finatra.kafkastreams.test.FinatraTopologyTester$;
import com.twitter.finatra.kafkastreams.test.TopologyFeatureTest;
import com.twitter.finatra.kafkastreams.test.TopologyTesterTopic;
import org.joda.time.DateTime;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import scala.Predef$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: HeartBeatServerTopologyFeatureTest.scala */
@ScalaSignature(bytes = "\u0006\u0005U3AAD\b\u00019!)1\u0005\u0001C\u0001I!9q\u0005\u0001b\u0001\n\u0013A\u0003BB\u001a\u0001A\u0003%\u0011\u0006C\u00045\u0001\t\u0007I\u0011I\u001b\t\re\u0002\u0001\u0015!\u00037\u0011\u001dQ\u0004A1A\u0005\nmBa!\u0012\u0001!\u0002\u0013a\u0004b\u0002$\u0001\u0005\u0004%Ia\u000f\u0005\u0007\u000f\u0002\u0001\u000b\u0011\u0002\u001f\t\u000f!\u0003!\u0019!C\u0005\u0013\"1!\u000b\u0001Q\u0001\n)Cqa\u0015\u0001C\u0002\u0013%\u0011\n\u0003\u0004U\u0001\u0001\u0006IA\u0013\u0002#\u0011\u0016\f'\u000f\u001e\"fCR\u001cVM\u001d<feR{\u0007o\u001c7pOf4U-\u0019;ve\u0016$Vm\u001d;\u000b\u0005A\t\u0012A\u00039v]\u000e$X/\u0019;pe*\u0011!cE\u0001\fS:$Xm\u001a:bi&|gN\u0003\u0002\u0015+\u0005a1.\u00194lCN$(/Z1ng*\u0011acF\u0001\bM&t\u0017\r\u001e:b\u0015\tA\u0012$A\u0004uo&$H/\u001a:\u000b\u0003i\t1aY8n\u0007\u0001\u0019\"\u0001A\u000f\u0011\u0005y\tS\"A\u0010\u000b\u0005\u0001\u001a\u0012\u0001\u0002;fgRL!AI\u0010\u0003'Q{\u0007o\u001c7pOf4U-\u0019;ve\u0016$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005)\u0003C\u0001\u0014\u0001\u001b\u0005y\u0011!F:uCJ$\u0018N\\4XC2d7\t\\8dWRKW.Z\u000b\u0002SA\u0011!&M\u0007\u0002W)\u0011A&L\u0001\u0005i&lWM\u0003\u0002/_\u0005!!n\u001c3b\u0015\u0005\u0001\u0014aA8sO&\u0011!g\u000b\u0002\t\t\u0006$X\rV5nK\u000612\u000f^1si&twmV1mY\u000ecwnY6US6,\u0007%\u0001\bu_B|Gn\\4z)\u0016\u001cH/\u001a:\u0016\u0003Y\u0002\"AH\u001c\n\u0005az\"!\u0006$j]\u0006$(/\u0019+pa>dwnZ=UKN$XM]\u0001\u0010i>\u0004x\u000e\\8hsR+7\u000f^3sA\u0005Q\u0011N\u001c9viR{\u0007/[2\u0016\u0003q\u0002BAH\u001f@\u007f%\u0011ah\b\u0002\u0014)>\u0004x\u000e\\8hsR+7\u000f^3s)>\u0004\u0018n\u0019\t\u0003\u0001\u000ek\u0011!\u0011\u0006\u0002\u0005\u0006)1oY1mC&\u0011A)\u0011\u0002\u0005\u0019>tw-A\u0006j]B,H\u000fV8qS\u000e\u0004\u0013aC8viB,H\u000fV8qS\u000e\fAb\\;uaV$Hk\u001c9jG\u0002\n\u0011\u0003\u001e:b]N4wN]7Ti\u0006$h*Y7f+\u0005Q\u0005CA&Q\u001b\u0005a%BA'O\u0003\u0011a\u0017M\\4\u000b\u0003=\u000bAA[1wC&\u0011\u0011\u000b\u0014\u0002\u0007'R\u0014\u0018N\\4\u0002%Q\u0014\u0018M\\:g_Jl7\u000b^1u\u001d\u0006lW\rI\u0001\u0012aVt7\r^;bi\u0016\u001cF/\u0019;OC6,\u0017A\u00059v]\u000e$X/\u0019;f'R\fGOT1nK\u0002\u0002")
/* loaded from: input_file:com/twitter/finatra/kafkastreams/integration/punctuator/HeartBeatServerTopologyFeatureTest.class */
public class HeartBeatServerTopologyFeatureTest extends TopologyFeatureTest {
    private final DateTime startingWallClockTime = new DateTime("1970-01-01T00:00:00Z");
    private final FinatraTopologyTester topologyTester = FinatraTopologyTester$.MODULE$.apply("wordcount-prod-bob", new HeartBeatServer(), startingWallClockTime(), FinatraTopologyTester$.MODULE$.apply$default$4(), FinatraTopologyTester$.MODULE$.apply$default$5(), FinatraTopologyTester$.MODULE$.apply$default$6(), FinatraTopologyTester$.MODULE$.apply$default$7(), FinatraTopologyTester$.MODULE$.apply$default$8(), FinatraTopologyTester$.MODULE$.apply$default$9());
    private final TopologyTesterTopic<Object, Object> inputTopic = topologyTester().topic("input-topic", ScalaSerdes$.MODULE$.Long(), ScalaSerdes$.MODULE$.Long());
    private final TopologyTesterTopic<Object, Object> outputTopic = topologyTester().topic("output-topic", ScalaSerdes$.MODULE$.Long(), ScalaSerdes$.MODULE$.Long());
    private final String transformStatName = "kafka/stream/transform";
    private final String punctuateStatName = "kafka/stream/punctuate";

    private DateTime startingWallClockTime() {
        return this.startingWallClockTime;
    }

    @Override // com.twitter.finatra.kafkastreams.test.TopologyFeatureTest
    public FinatraTopologyTester topologyTester() {
        return this.topologyTester;
    }

    private TopologyTesterTopic<Object, Object> inputTopic() {
        return this.inputTopic;
    }

    private TopologyTesterTopic<Object, Object> outputTopic() {
        return this.outputTopic;
    }

    private String transformStatName() {
        return this.transformStatName;
    }

    private String punctuateStatName() {
        return this.punctuateStatName;
    }

    public static final /* synthetic */ DateTime $anonfun$new$8(HeartBeatServerTopologyFeatureTest heartBeatServerTopologyFeatureTest, int i) {
        return heartBeatServerTopologyFeatureTest.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(1L)));
    }

    public HeartBeatServerTopologyFeatureTest() {
        test("Publish single value from input to output", Nil$.MODULE$, () -> {
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
            this.inputTopic().pipeInput(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(1L), this.inputTopic().pipeInput$default$3());
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(1L), this.outputTopic().assertOutput$default$3());
            this.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(this.outputTopic().readAllOutput().isEmpty()), new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 31), Prettifier$.MODULE$.default()).should(this.be().apply(true));
            this.topologyTester().stats().assertCounter(this.transformStatName(), 1L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
        }, new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 26));
        test("Publish single value and single heartbeat from input to output", Nil$.MODULE$, () -> {
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
            this.inputTopic().pipeInput(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(1L), this.inputTopic().pipeInput$default$3());
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(1L), this.outputTopic().assertOutput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(1L)));
            long millis = this.startingWallClockTime().getMillis() + DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)).inMillis();
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(millis), BoxesRunTime.boxToLong(millis), this.outputTopic().assertOutput$default$3());
            this.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(this.outputTopic().readAllOutput().isEmpty()), new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 44), Prettifier$.MODULE$.default()).should(this.be().apply(true));
            this.topologyTester().stats().assertCounter(this.transformStatName(), 1L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 1L);
        }, new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 36));
        test("Publish heartbeat from advanced wall clock time to output", Nil$.MODULE$, () -> {
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(1L)));
            long millis = this.startingWallClockTime().getMillis() + DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)).inMillis();
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(millis), BoxesRunTime.boxToLong(millis), this.outputTopic().assertOutput$default$3());
            this.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(this.outputTopic().readAllOutput().isEmpty()), new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 55), Prettifier$.MODULE$.default()).should(this.be().apply(true));
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 1L);
        }, new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 49));
        test("Publish multiple values from input to output", Nil$.MODULE$, () -> {
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
            Range.Inclusive inclusive = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 10);
            inclusive.foreach$mVc$sp(i -> {
                this.inputTopic().pipeInput(BoxesRunTime.boxToLong(i), BoxesRunTime.boxToLong(i), this.inputTopic().pipeInput$default$3());
            });
            inclusive.foreach$mVc$sp(i2 -> {
                this.outputTopic().assertOutput(BoxesRunTime.boxToLong(i2), BoxesRunTime.boxToLong(i2), this.outputTopic().assertOutput$default$3());
            });
            this.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(this.outputTopic().readAllOutput().isEmpty()), new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 77), Prettifier$.MODULE$.default()).should(this.be().apply(true));
            this.topologyTester().stats().assertCounter(this.transformStatName(), 10L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
        }, new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 60));
        test("Publish multiple heartbeat from advanced wall clock time to output", Nil$.MODULE$, () -> {
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
            Range.Inclusive inclusive = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 10);
            inclusive.foreach(obj -> {
                return $anonfun$new$8(this, BoxesRunTime.unboxToInt(obj));
            });
            inclusive.foreach$mVc$sp(i -> {
                long millis = this.startingWallClockTime().getMillis() + DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(i)).inMillis();
                this.outputTopic().assertOutput(BoxesRunTime.boxToLong(millis), BoxesRunTime.boxToLong(millis), this.outputTopic().assertOutput$default$3());
            });
            this.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(this.outputTopic().readAllOutput().isEmpty()), new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 100), Prettifier$.MODULE$.default()).should(this.be().apply(true));
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 10L);
        }, new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 82));
        test("Publish multiple values and multiple heartbeats from input to output", Nil$.MODULE$, () -> {
            this.topologyTester().stats().assertCounter(this.transformStatName(), 0L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 0L);
            this.inputTopic().pipeInput(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(1L), this.inputTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(1L)));
            this.inputTopic().pipeInput(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToLong(2L), this.inputTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(1L)));
            this.inputTopic().pipeInput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.inputTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(1L)));
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(1L), this.outputTopic().assertOutput$default$3());
            long millis = this.startingWallClockTime().getMillis() + DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)).inMillis();
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(millis), BoxesRunTime.boxToLong(millis), this.outputTopic().assertOutput$default$3());
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToLong(2L), this.outputTopic().assertOutput$default$3());
            long inMillis = millis + DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)).inMillis();
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(inMillis), BoxesRunTime.boxToLong(inMillis), this.outputTopic().assertOutput$default$3());
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.outputTopic().assertOutput$default$3());
            long inMillis2 = inMillis + DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)).inMillis();
            this.outputTopic().assertOutput(BoxesRunTime.boxToLong(inMillis2), BoxesRunTime.boxToLong(inMillis2), this.outputTopic().assertOutput$default$3());
            this.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(this.outputTopic().readAllOutput().isEmpty()), new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 127), Prettifier$.MODULE$.default()).should(this.be().apply(true));
            this.topologyTester().stats().assertCounter(this.transformStatName(), 3L);
            this.topologyTester().stats().assertCounter(this.punctuateStatName(), 3L);
        }, new Position("HeartBeatServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 105));
    }
}
