package org.apache.beam.examples.complete.game;

import org.apache.beam.examples.complete.game.StatefulTeamScore;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/examples/complete/game/StatefulTeamScoreTest.class */
public class StatefulTeamScoreTest {
    private Instant baseTime = new Instant(0);

    @Rule
    public TestPipeline p = TestPipeline.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/examples/complete/game/StatefulTeamScoreTest$TestUser.class */
    public enum TestUser {
        RED_ONE("scarlet", "red"),
        RED_TWO("burgundy", "red"),
        BLUE_ONE("navy", "blue"),
        BLUE_TWO("sky", "blue");

        private final String userName;
        private final String teamName;

        TestUser(String str, String str2) {
            this.userName = str;
            this.teamName = str2;
        }

        public String getUser() {
            return this.userName;
        }

        public String getTeam() {
            return this.teamName;
        }
    }

    @Test
    public void testScoreUpdatesOneTeam() {
        PCollection apply = this.p.apply(TestStream.create(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(UserScore.GameActionInfo.class))).advanceWatermarkTo(this.baseTime).addElements(event(TestUser.RED_TWO, 99, Duration.standardSeconds(10L)), new TimestampedValue[]{event(TestUser.RED_ONE, 1, Duration.standardSeconds(20L)), event(TestUser.RED_ONE, 0, Duration.standardSeconds(30L)), event(TestUser.RED_TWO, 100, Duration.standardSeconds(40L)), event(TestUser.RED_TWO, 201, Duration.standardSeconds(50L))}).advanceWatermarkToInfinity()).apply(ParDo.of(new StatefulTeamScore.UpdateTeamScoreFn(100)));
        String team = TestUser.RED_ONE.getTeam();
        PAssert.that(apply).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(new KV[]{KV.of(team, 100), KV.of(team, 200), KV.of(team, 401)});
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testScoreUpdatesPerTeam() {
        PCollection apply = this.p.apply(TestStream.create(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(UserScore.GameActionInfo.class))).advanceWatermarkTo(this.baseTime).addElements(event(TestUser.RED_ONE, 50, Duration.standardSeconds(10L)), new TimestampedValue[]{event(TestUser.RED_TWO, 50, Duration.standardSeconds(20L)), event(TestUser.BLUE_ONE, 70, Duration.standardSeconds(30L)), event(TestUser.BLUE_TWO, 80, Duration.standardSeconds(40L)), event(TestUser.BLUE_TWO, 50, Duration.standardSeconds(50L))}).advanceWatermarkToInfinity()).apply(ParDo.of(new StatefulTeamScore.UpdateTeamScoreFn(100)));
        String team = TestUser.RED_ONE.getTeam();
        String team2 = TestUser.BLUE_ONE.getTeam();
        PAssert.that(apply).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(new KV[]{KV.of(team, 100), KV.of(team2, 150), KV.of(team2, 200)});
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testScoreUpdatesPerWindow() {
        TestStream advanceWatermarkToInfinity = TestStream.create(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(UserScore.GameActionInfo.class))).advanceWatermarkTo(this.baseTime).addElements(event(TestUser.RED_ONE, 50, Duration.standardMinutes(1L)), new TimestampedValue[]{event(TestUser.RED_TWO, 50, Duration.standardMinutes(2L)), event(TestUser.RED_ONE, 50, Duration.standardMinutes(3L)), event(TestUser.RED_ONE, 60, Duration.standardMinutes(6L)), event(TestUser.RED_TWO, 60, Duration.standardMinutes(7L))}).advanceWatermarkToInfinity();
        Duration standardMinutes = Duration.standardMinutes(5L);
        PCollection apply = this.p.apply(advanceWatermarkToInfinity).apply(Window.into(FixedWindows.of(standardMinutes))).apply(ParDo.of(new StatefulTeamScore.UpdateTeamScoreFn(100)));
        String team = TestUser.RED_ONE.getTeam();
        IntervalWindow intervalWindow = new IntervalWindow(this.baseTime, standardMinutes);
        IntervalWindow intervalWindow2 = new IntervalWindow(intervalWindow.end(), standardMinutes);
        PAssert.that(apply).inWindow(intervalWindow).containsInAnyOrder(new KV[]{KV.of(team, 100)});
        PAssert.that(apply).inWindow(intervalWindow2).containsInAnyOrder(new KV[]{KV.of(team, 120)});
        this.p.run().waitUntilFinish();
    }

    private TimestampedValue<KV<String, UserScore.GameActionInfo>> event(TestUser testUser, int i, Duration duration) {
        return TimestampedValue.of(KV.of(testUser.getTeam(), new UserScore.GameActionInfo(testUser.getUser(), testUser.getTeam(), Integer.valueOf(i), Long.valueOf(this.baseTime.plus(duration).getMillis()))), this.baseTime.plus(duration));
    }
}
