package com.twitter.finatra.kafkastreams.integration.delay;

import com.twitter.conversions.DurationOps$;
import com.twitter.conversions.DurationOps$RichDuration$;
import com.twitter.finatra.kafkastreams.test.FinatraTopologyTester;
import com.twitter.finatra.kafkastreams.test.FinatraTopologyTester$;
import com.twitter.finatra.kafkastreams.test.TopologyFeatureTest;
import com.twitter.finatra.kafkastreams.test.TopologyTesterTopic;
import com.twitter.util.Duration;
import org.apache.kafka.streams.scala.Serdes$;
import org.joda.time.DateTime;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Tag;
import org.scalatest.enablers.Emptiness$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: DelayStoreServerTopologyFeatureTest.scala */
@ScalaSignature(bytes = "\u0006\u0001U2A\u0001C\u0005\u0001-!)Q\u0004\u0001C\u0001=!9\u0011\u0005\u0001b\u0001\n#\u0012\u0003B\u0002\u0014\u0001A\u0003%1\u0005C\u0004(\u0001\t\u0007I\u0011\u0002\u0015\t\rI\u0002\u0001\u0015!\u0003*\u0011\u001d\u0019\u0004A1A\u0005\n!Ba\u0001\u000e\u0001!\u0002\u0013I#a\t#fY\u0006L8\u000b^8sKN+'O^3s)>\u0004x\u000e\\8hs\u001a+\u0017\r^;sKR+7\u000f\u001e\u0006\u0003\u0015-\tQ\u0001Z3mCfT!\u0001D\u0007\u0002\u0017%tG/Z4sCRLwN\u001c\u0006\u0003\u001d=\tAb[1gW\u0006\u001cHO]3b[NT!\u0001E\t\u0002\u000f\u0019Lg.\u0019;sC*\u0011!cE\u0001\bi^LG\u000f^3s\u0015\u0005!\u0012aA2p[\u000e\u00011C\u0001\u0001\u0018!\tA2$D\u0001\u001a\u0015\tQR\"\u0001\u0003uKN$\u0018B\u0001\u000f\u001a\u0005M!v\u000e]8m_\u001eLh)Z1ukJ,G+Z:u\u0003\u0019a\u0014N\\5u}Q\tq\u0004\u0005\u0002!\u00015\t\u0011\"\u0001\bu_B|Gn\\4z)\u0016\u001cH/\u001a:\u0016\u0003\r\u0002\"\u0001\u0007\u0013\n\u0005\u0015J\"!\u0006$j]\u0006$(/\u0019+pa>dwnZ=UKN$XM]\u0001\u0010i>\u0004x\u000e\\8hsR+7\u000f^3sA\u0005i\u0011N\\2p[&tw\rV8qS\u000e,\u0012!\u000b\t\u00051)bC&\u0003\u0002,3\t\u0019Bk\u001c9pY><\u0017\u0010V3ti\u0016\u0014Hk\u001c9jGB\u0011Q\u0006M\u0007\u0002])\tq&A\u0003tG\u0006d\u0017-\u0003\u00022]\t!Aj\u001c8h\u00039IgnY8nS:<Gk\u001c9jG\u0002\nQb\\;uO>Lgn\u001a+pa&\u001c\u0017AD8vi\u001e|\u0017N\\4U_BL7\r\t")
/* loaded from: input_file:com/twitter/finatra/kafkastreams/integration/delay/DelayStoreServerTopologyFeatureTest.class */
// Feature test for DelayStoreServer, driven through a FinatraTopologyTester.
//
// The constructor registers three tests. Each test pipes Long key/value records into
// DelayStoreServer's incoming topic, advances the tester's wall clock, and then checks
// which records show up on the outgoing topic.
//
// This file is decompiler output of DelayStoreServerTopologyFeatureTest.scala, so Scala
// constructs (default arguments, `->` tuples, `::` lists, value-class extension methods)
// appear here in their desugared `$`-mangled form.
public class DelayStoreServerTopologyFeatureTest extends TopologyFeatureTest {
    // Tester built from: DelayStoreServer's IncomingTopic() value as the first argument, a fresh
    // DelayStoreServer instance, a DateTime parsed from "2018-01-01T00:00:00Z", and the companion
    // object's default values for parameters 4 through 9.
    // NOTE(review): the DateTime is presumably the tester's starting wall-clock time — confirm
    // against FinatraTopologyTester.apply.
    private final FinatraTopologyTester topologyTester = FinatraTopologyTester$.MODULE$.apply(DelayStoreServer$.MODULE$.IncomingTopic(), new DelayStoreServer(), new DateTime("2018-01-01T00:00:00Z"), FinatraTopologyTester$.MODULE$.apply$default$4(), FinatraTopologyTester$.MODULE$.apply$default$5(), FinatraTopologyTester$.MODULE$.apply$default$6(), FinatraTopologyTester$.MODULE$.apply$default$7(), FinatraTopologyTester$.MODULE$.apply$default$8(), FinatraTopologyTester$.MODULE$.apply$default$9());
    // Test handle for DelayStoreServer's incoming topic, using Long serdes for key and value.
    // Obtained through topologyTester(), so it relies on the field above being initialized first.
    private final TopologyTesterTopic<Object, Object> incomingTopic = topologyTester().topic(DelayStoreServer$.MODULE$.IncomingTopic(), Serdes$.MODULE$.Long(), Serdes$.MODULE$.Long());
    // Test handle for DelayStoreServer's outgoing topic, using Long serdes for key and value.
    private final TopologyTesterTopic<Object, Object> outgoingTopic = topologyTester().topic(DelayStoreServer$.MODULE$.OutgoingTopic(), Serdes$.MODULE$.Long(), Serdes$.MODULE$.Long());

    /**
     * Returns the topology tester created in this class's field initializer.
     *
     * @return the FinatraTopologyTester shared by all tests in this class
     */
    @Override // com.twitter.finatra.kafkastreams.test.TopologyFeatureTest
    public FinatraTopologyTester topologyTester() {
        return this.topologyTester;
    }

    /**
     * Accessor for the incoming-topic handle.
     *
     * @return the handle the tests use to pipe records into the topology
     */
    private TopologyTesterTopic<Object, Object> incomingTopic() {
        return this.incomingTopic;
    }

    /**
     * Accessor for the outgoing-topic handle.
     *
     * @return the handle the tests use to read and assert the topology's output
     */
    private TopologyTesterTopic<Object, Object> outgoingTopic() {
        return this.outgoingTopic;
    }

    /**
     * Registers the three test cases by calling {@code test(name, tags, body, position)} with an
     * empty tag array each time.
     *
     * <p>NOTE(review): the integer passed to each {@code Position} is presumably the line number
     * in the original Scala source; the long "Please set the environment variable..." string is
     * the value the compiled class carries for the file pathname argument.
     */
    public DelayStoreServerTopologyFeatureTest() {
        // Test 1: a single record is absent from the output at first and is asserted on the
        // output only after the clock has moved past Delay.
        test("Published event gets published after a delay", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // Pipe key 3 / value 3; the third argument is pipeInput's own default value.
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.incomingTopic().pipeInput$default$3());
            // Nothing may be on the outgoing topic yet.
            this.convertToAnyShouldWrapper(this.outgoingTopic().readAllOutput(), new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 34), Prettifier$.MODULE$.default()).shouldBe(this.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
            // Advance the wall clock by Delay plus one second.
            this.topologyTester().advanceWallClockTime((Duration) DelayStoreServer$.MODULE$.Delay().$plus(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L))));
            // NOTE(review): presumably this second record (key 4) is what gives the topology a
            // chance to act on the advanced clock — confirm against DelayStoreServer.
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToLong(4L), this.incomingTopic().pipeInput$default$3());
            // Advance one more second (this call uses seconds$extension; the others use second$extension).
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.seconds$extension(DurationOps$.MODULE$.RichDuration(1L)));
            // The record with key 3 / value 3 is now asserted on the outgoing topic.
            this.outgoingTopic().assertOutput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.outgoingTopic().assertOutput$default$3());
        }, new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 32));
        // Test 2: key 3 is piped twice, with the clock advanced in Delay/2 steps (plus one second
        // after each following record); the output is checked after each phase.
        test("Two published events within delay window gets published twice", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // First record with key 3; output must still be empty.
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.incomingTopic().pipeInput$default$3());
            this.convertToAnyShouldWrapper(this.outgoingTopic().readAllOutput(), new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 47), Prettifier$.MODULE$.default()).shouldBe(this.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
            // Advance by Delay / 2, pipe key 4, advance one second: output is still expected to be empty.
            this.topologyTester().advanceWallClockTime(DelayStoreServer$.MODULE$.Delay().$div(2L));
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToLong(4L), this.incomingTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)));
            this.convertToAnyShouldWrapper(this.outgoingTopic().readAllOutput(), new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 55), Prettifier$.MODULE$.default()).shouldBe(this.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
            // Second record with key 3, then another Delay / 2, key 5, and one second.
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.incomingTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DelayStoreServer$.MODULE$.Delay().$div(2L));
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(5L), BoxesRunTime.boxToLong(5L), this.incomingTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)));
            // Output records are mapped to (key -> value) pairs; expected result is exactly List(3 -> 3).
            this.convertToAnyShouldWrapper(this.outgoingTopic().readAllOutput().map(producerRecord -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(producerRecord.key()), producerRecord.value());
            }, Seq$.MODULE$.canBuildFrom()), new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 66), Prettifier$.MODULE$.default()).shouldBe(new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(3L)), BoxesRunTime.boxToLong(3L)), Nil$.MODULE$));
            // A third Delay / 2 step, key 6, and one second.
            this.topologyTester().advanceWallClockTime(DelayStoreServer$.MODULE$.Delay().$div(2L));
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(6L), BoxesRunTime.boxToLong(6L), this.incomingTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)));
            // Expected output of this read, in order: List(4 -> 4, 3 -> 3).
            return this.convertToAnyShouldWrapper(this.outgoingTopic().readAllOutput().map(producerRecord2 -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(producerRecord2.key()), producerRecord2.value());
            }, Seq$.MODULE$.canBuildFrom()), new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 76), Prettifier$.MODULE$.default()).shouldBe(new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(4L)), BoxesRunTime.boxToLong(4L)), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(3L)), BoxesRunTime.boxToLong(3L)), Nil$.MODULE$)));
        }, new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 45));
        // Test 3: key 3 is piped twice with more than Delay of wall-clock time between the two
        // records; all output is read once at the end.
        // NOTE(review): "deuplicate" in the test name looks like a typo for "duplicate"; it is
        // left unchanged here because the name is a runtime string.
        test("Two published deuplicate events outside delay window gets published twice", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // First record with key 3; output must still be empty.
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.incomingTopic().pipeInput$default$3());
            this.convertToAnyShouldWrapper(this.outgoingTopic().readAllOutput(), new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 84), Prettifier$.MODULE$.default()).shouldBe(this.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
            // Advance by Delay plus one second, pipe key 4, advance one second.
            this.topologyTester().advanceWallClockTime((Duration) DelayStoreServer$.MODULE$.Delay().$plus(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L))));
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToLong(4L), this.incomingTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)));
            // Second record with key 3, followed by the same advance / pipe (key 5) / advance sequence.
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L), this.incomingTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime((Duration) DelayStoreServer$.MODULE$.Delay().$plus(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L))));
            this.incomingTopic().pipeInput(BoxesRunTime.boxToLong(5L), BoxesRunTime.boxToLong(5L), this.incomingTopic().pipeInput$default$3());
            this.topologyTester().advanceWallClockTime(DurationOps$RichDuration$.MODULE$.second$extension(DurationOps$.MODULE$.RichDuration(1L)));
            // Expected accumulated output, in order: List(3 -> 3, 4 -> 4, 3 -> 3).
            return this.convertToAnyShouldWrapper(this.outgoingTopic().readAllOutput().map(producerRecord -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(producerRecord.key()), producerRecord.value());
            }, Seq$.MODULE$.canBuildFrom()), new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 101), Prettifier$.MODULE$.default()).shouldBe(new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(3L)), BoxesRunTime.boxToLong(3L)), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(4L)), BoxesRunTime.boxToLong(4L)), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(3L)), BoxesRunTime.boxToLong(3L)), Nil$.MODULE$))));
        }, new Position("DelayStoreServerTopologyFeatureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 82));
    }
}
