package org.apache.flink.api.scala.migration;

import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.util.MigrationTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Tuple2;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;

/* compiled from: StatefulJobWBroadcastStateMigrationITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005Uu!B\u0001\u0003\u0011\u0003y\u0011!K*uCR,g-\u001e7K_\n<&I]8bI\u000e\f7\u000f^*uCR,W*[4sCRLwN\\%U\u0007\u0006\u001cXM\u0003\u0002\u0004\t\u0005IQ.[4sCRLwN\u001c\u0006\u0003\u000b\u0019\tQa]2bY\u0006T!a\u0002\u0005\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\n\u0015\u0005)a\r\\5oW*\u00111\u0002D\u0001\u0007CB\f7\r[3\u000b\u00035\t1a\u001c:h\u0007\u0001\u0001\"\u0001E\t\u000e\u0003\t1QA\u0005\u0002\t\u0002M\u0011\u0011f\u0015;bi\u00164W\u000f\u001c&pE^\u0013%o\\1eG\u0006\u001cHo\u0015;bi\u0016l\u0015n\u001a:bi&|g.\u0013+DCN,7cA\t\u00153A\u0011QcF\u0007\u0002-)\tQ!\u0003\u0002\u0019-\t1\u0011I\\=SK\u001a\u0004\"!\u0006\u000e\n\u0005m1\"\u0001D*fe&\fG.\u001b>bE2,\u0007\"B\u000f\u0012\t\u0003q\u0012A\u0002\u001fj]&$h\bF\u0001\u0010\u0011\u0015\u0001\u0013\u0003\"\u0001\"\u0003Y\u0019'/Z1uKN\u0003XmY:G_J$Vm\u001d;Sk:\u001cX#\u0001\u0012\u0011\u0007\rB#&D\u0001%\u0015\t)c%\u0001\u0003vi&d'\"A\u0014\u0002\t)\fg/Y\u0005\u0003S\u0011\u0012!bQ8mY\u0016\u001cG/[8o!\tY3I\u0004\u0002-\u0001:\u0011Q&\u0010\b\u0003]ir!a\f\u001d\u000f\u0005A:dBA\u00197\u001d\t\u0011T'D\u00014\u0015\t!d\"\u0001\u0004=e>|GOP\u0005\u0002\u001b%\u00111\u0002D\u0005\u0003\u0013)I!!\u000f\u0005\u0002\tQ,7\u000f^\u0005\u0003wq\nQb\u00195fG.\u0004x.\u001b8uS:<'BA\u001d\t\u0013\tqt(A\u0003vi&d7O\u0003\u0002<y%\u0011\u0011IQ\u0001\u001a':\f\u0007o\u001d5pi6KwM]1uS>tG+Z:u\u0005\u0006\u001cXM\u0003\u0002?\u007f%\u0011A)\u0012\u0002\r':\f\u0007o\u001d5piN\u0003Xm\u0019\u0006\u0003\u0003\nCCaH$T)B\u0011\u0001\n\u0015\b\u0003\u0013:k\u0011A\u0013\u0006\u0003\u00172\u000bqA];o]\u0016\u00148O\u0003\u0002N\u0019\u0005)!.\u001e8ji&\u0011qJS\u0001\u000e!\u0006\u0014\u0018-\\3uKJL'0\u001a3\n\u0005E\u0013&A\u0003)be\u0006lW\r^3sg*\u0011qJS\u0001\u0005]\u0006lW-I\u0001V\u0003I!Vm\u001d;!g:\f\u0007o\u001d5pij\u00023\u0010M?\t\u000b]\u000bB\u0011\u0001-\u0002A\r\u0014X-\u0019;f'B,7m\u001d$peR+7\u000f\u001e#bi\u0006<UM\\3sCRLwN\u001c\u000b\u0003EeCQA\u0017,A\u0002m\u000bqA^3sg&|g\u000e\u0005\u0002];6\t\u0001\"\u0003\u0002_\u0011\taa\t\\5oWZ+'o]5p]\")\u0001-\u0005C\u0001C\u0006\u0011\u0012N\u001c;fe:\fG\u000eU1sC6,G/\u001a:t)\t\u0011#\rC\u0003d?\u0002\u00071,A\fuCJ<W\r^$f]\u0016\u0014\u0018\r^5oOZ+'o]5p]\"\u0012!-\u001a\t\u0003M.l\u0011a\u001a\u0006\u0003Q&\f!\"\u00198o_R\fG/[8o\u0015\u0005Q\u0017!\u00026bm\u0006D\u0018B\u00017h\u0005!qU\u000f\u001c7bE2,\u0007\"\u00028\u0012\t\u0003y\u0017aD4fiNs\u0017\r]:i_R\u0004\u0016\r\u001e5\u0015\u0005AD\bCA9v\u001d\t\u00118\u000f\u0005\u00023-%\u0011AOF\u0001\u0007!J,G-\u001a4\n\u0005Y<(AB*ue&twM\u0003\u0002u-!)\u00110\u001ca\u0001U\u0005a1O\\1qg\"|Go\u00159fG\"910\u0005b\u0001\n\u0003a\u0018\u0001\u0004(V\u001b~+E*R'F\u001dR\u001bV#A?\u0011\u0005Uq\u0018BA@\u0017\u0005\rIe\u000e\u001e\u0005\b\u0003\u0007\t\u0002\u0015!\u0003~\u00035qU+T0F\u0019\u0016kUI\u0014+TA!I\u0011qA\t\u0002\u0002\u0013%\u0011\u0011B\u0001\fe\u0016\fGMU3t_24X\r\u0006\u0002\u0002\fA!\u0011QBA\n\u001b\t\tyAC\u0002\u0002\u0012\u0019\nA\u0001\\1oO&!\u0011QCA\b\u0005\u0019y%M[3di\u001a)!C\u0001\u0001\u0002\u001aM9\u0011qCA\u000e3\u0005\r\u0002\u0003BA\u000f\u0003?i\u0011AQ\u0005\u0004\u0003C\u0011%!G*oCB\u001c\bn\u001c;NS\u001e\u0014\u0018\r^5p]R+7\u000f\u001e\"bg\u0016\u0004B!!\n\u0002*5\u0011\u0011q\u0005\u0006\u0003KqJA!a\u000b\u0002(\tiQ*[4sCRLwN\u001c+fgRD\u0011\"_A\f\u0005\u0003\u0005\u000b\u0011\u0002\u0016\t\u000fu\t9\u0002\"\u0001\u00022Q!\u00111GA\u001b!\r\u0001\u0012q\u0003\u0005\u0007s\u0006=\u0002\u0019\u0001\u0016\t\u0011\u0005e\u0012q\u0003C\u0001\u0003w\t\u0011cZ3oKJ\fG/Z*oCB\u001c\bn\u001c;t)\u0011\ti$a\u0011\u0011\u0007U\ty$C\u0002\u0002BY\u0011A!\u00168ji\"1\u00110a\u000eA\u0002)B\u0003\"a\u000e\u0002H\u0005e\u00131\f\t\u0005\u0003\u0013\n\u0019F\u0004\u0003\u0002L\u0005=cb\u0001\u0018\u0002N%\u0011Q\u0005P\u0005\u0005\u0003#\n9#A\u0007NS\u001e\u0014\u0018\r^5p]R+7\u000f^\u0005\u0005\u0003+\n9FA\u0010QCJ\fW.\u001a;fe&TX\rZ*oCB\u001c\bn\u001c;t\u000f\u0016tWM]1u_JTA!!\u0015\u0002(\u0005)a/\u00197vK\u0006\nq\u000b\u0003\u0005\u0002`\u0005]A\u0011AA1\u0003i!Xm\u001d;TCZ,\u0007o\\5oi^KG\u000f\u001b\"s_\u0006$7-Y:u)\t\ti\u0004\u000b\u0003\u0002^\u0005\u0015\u0004\u0003BA4\u0003Sj\u0011\u0001T\u0005\u0004\u0003Wb%\u0001\u0002+fgRD\u0001\"a\u001c\u0002\u0018\u0011\u0005\u0011\u0011O\u0001#i\u0016\u001cHo\u0014:De\u0016\fG/Z*bm\u0016\u0004x.\u001b8u/&$\bN\u0011:pC\u0012\u001c\u0017m\u001d;\u0015\r\u0005u\u00121OA?\u0011!\t)(!\u001cA\u0002\u0005]\u0014!D3yK\u000e,H/[8o\u001b>$W\rE\u0002,\u0003sJ1!a\u001fF\u00055)\u00050Z2vi&|g.T8eK\"1\u00110!\u001cA\u0002)B\u0003\"a\u0006\u0002\u0002\u0006e\u0013Q\u0012\t\u0005\u0003\u0007\u000bI)\u0004\u0002\u0002\u0006*\u0019\u0011q\u0011'\u0002\rI,hN\\3s\u0013\u0011\tY)!\"\u0003\u000fI+hnV5uQ\u000e\u0012\u0011q\u0012\t\u0004\u0013\u0006E\u0015bAAJ\u0015\ni\u0001+\u0019:b[\u0016$XM]5{K\u0012\u0004")
/* loaded from: input_file:org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.class */
public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase implements Serializable, MigrationTest {
    private final SnapshotMigrationTestBase.SnapshotSpec snapshotSpec;

    public static int NUM_ELEMENTS() {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS();
    }

    public static String getSnapshotPath(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.getSnapshotPath(snapshotSpec);
    }

    public static Collection<SnapshotMigrationTestBase.SnapshotSpec> internalParameters(@Nullable FlinkVersion flinkVersion) {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.internalParameters(flinkVersion);
    }

    public static Collection<SnapshotMigrationTestBase.SnapshotSpec> createSpecsForTestDataGeneration(FlinkVersion flinkVersion) {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.createSpecsForTestDataGeneration(flinkVersion);
    }

    @Parameterized.Parameters(name = "Test snapshot: {0}")
    public static Collection<SnapshotMigrationTestBase.SnapshotSpec> createSpecsForTestRuns() {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.createSpecsForTestRuns();
    }

    @MigrationTest.ParameterizedSnapshotsGenerator("createSpecsForTestDataGeneration")
    public void generateSnapshots(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        testOrCreateSavepointWithBroadcast(SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT, snapshotSpec);
    }

    @Test
    public void testSavepointWithBroadcast() {
        testOrCreateSavepointWithBroadcast(SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT, this.snapshotSpec);
    }

    public void testOrCreateSavepointWithBroadcast(SnapshotMigrationTestBase.ExecutionMode executionMode, SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        StreamExecutionEnvironment stateBackend;
        LazyRef lazyRef = new LazyRef();
        LazyRef lazyRef2 = new LazyRef();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String stateBackendType = snapshotSpec.getStateBackendType();
        if ("rocksdb".equals(stateBackendType)) {
            executionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend());
            SnapshotMigrationTestBase.ExecutionMode executionMode2 = SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT;
            stateBackend = (executionMode != null ? !executionMode.equals(executionMode2) : executionMode2 != null) ? BoxedUnit.UNIT : executionEnvironment.enableChangelogStateBackend(false);
        } else if ("jobmanager".equals(stateBackendType)) {
            stateBackend = executionEnvironment.setStateBackend(new MemoryStateBackend());
        } else {
            if (!"hashmap".equals(stateBackendType)) {
                throw new UnsupportedOperationException();
            }
            stateBackend = executionEnvironment.setStateBackend(new HashMapStateBackend());
        }
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(4);
        executionEnvironment.setMaxParallelism(4);
        final StatefulJobWBroadcastStateMigrationITCase statefulJobWBroadcastStateMigrationITCase = null;
        SingleOutputStreamOperator flatMap = executionEnvironment.addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource").keyBy(new KeySelector<Tuple2<Object, Object>, Object>(statefulJobWBroadcastStateMigrationITCase) { // from class: org.apache.flink.api.scala.migration.StatefulJobWBroadcastStateMigrationITCase$$anon$24
            public long getKey(Tuple2<Object, Object> tuple2) {
                return tuple2._1$mcJ$sp();
            }

            public /* bridge */ /* synthetic */ Object getKey(Object obj) {
                return BoxesRunTime.boxToLong(getKey((Tuple2<Object, Object>) obj));
            }
        }).flatMap(new StatefulFlatMapper());
        final StatefulJobWBroadcastStateMigrationITCase statefulJobWBroadcastStateMigrationITCase2 = null;
        KeyedStream keyBy = flatMap.keyBy(new KeySelector<Tuple2<Object, Object>, Object>(statefulJobWBroadcastStateMigrationITCase2) { // from class: org.apache.flink.api.scala.migration.StatefulJobWBroadcastStateMigrationITCase$$anon$25
            public long getKey(Tuple2<Object, Object> tuple2) {
                return tuple2._1$mcJ$sp();
            }

            public /* bridge */ /* synthetic */ Object getKey(Object obj) {
                return BoxesRunTime.boxToLong(getKey((Tuple2<Object, Object>) obj));
            }
        });
        BroadcastStream broadcast = executionEnvironment.addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource").broadcast(new MapStateDescriptor[]{firstBroadcastStateDesc$1(lazyRef), secondBroadcastStateDesc$1(lazyRef2)});
        SnapshotMigrationTestBase.ExecutionMode executionMode3 = SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT;
        if (executionMode != null ? executionMode.equals(executionMode3) : executionMode3 == null) {
            keyBy.connect(broadcast).process(new TestBroadcastProcessFunction()).addSink(new AccumulatorCountingSink());
            executeAndSnapshot(executionEnvironment, new StringBuilder(19).append("src/test/resources/").append(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.getSnapshotPath(snapshotSpec)).toString(), snapshotSpec.getSnapshotType(), new org.apache.flink.api.java.tuple.Tuple2<>(AccumulatorCountingSink$.MODULE$.NUM_ELEMENTS_ACCUMULATOR(), Predef$.MODULE$.int2Integer(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS())));
            return;
        }
        SnapshotMigrationTestBase.ExecutionMode executionMode4 = SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT;
        if (executionMode != null ? !executionMode.equals(executionMode4) : executionMode4 != null) {
            throw new UnsupportedOperationException("Unsupported execution mode.");
        }
        keyBy.connect(broadcast).process(new VerifyingBroadcastProcessFunction(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(0L)), BoxesRunTime.boxToLong(0L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(1L)), BoxesRunTime.boxToLong(1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(2L)), BoxesRunTime.boxToLong(2L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(3L)), BoxesRunTime.boxToLong(3L))})), Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("0"), "0"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("1"), "1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("2"), "2"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("3"), "3")})))).addSink(new AccumulatorCountingSink());
        restoreAndExecute(executionEnvironment, SnapshotMigrationTestBase.getResourceFilename(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.getSnapshotPath(snapshotSpec)), new org.apache.flink.api.java.tuple.Tuple2<>(AccumulatorCountingSink$.MODULE$.NUM_ELEMENTS_ACCUMULATOR(), Predef$.MODULE$.int2Integer(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS())));
    }

    private static final /* synthetic */ MapStateDescriptor firstBroadcastStateDesc$lzycompute$1(LazyRef lazyRef) {
        MapStateDescriptor mapStateDescriptor;
        synchronized (lazyRef) {
            mapStateDescriptor = lazyRef.initialized() ? (MapStateDescriptor) lazyRef.value() : (MapStateDescriptor) lazyRef.initialize(new MapStateDescriptor("broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
        }
        return mapStateDescriptor;
    }

    private static final MapStateDescriptor firstBroadcastStateDesc$1(LazyRef lazyRef) {
        return lazyRef.initialized() ? (MapStateDescriptor) lazyRef.value() : firstBroadcastStateDesc$lzycompute$1(lazyRef);
    }

    private static final /* synthetic */ MapStateDescriptor secondBroadcastStateDesc$lzycompute$1(LazyRef lazyRef) {
        MapStateDescriptor mapStateDescriptor;
        synchronized (lazyRef) {
            mapStateDescriptor = lazyRef.initialized() ? (MapStateDescriptor) lazyRef.value() : (MapStateDescriptor) lazyRef.initialize(new MapStateDescriptor("broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
        }
        return mapStateDescriptor;
    }

    private static final MapStateDescriptor secondBroadcastStateDesc$1(LazyRef lazyRef) {
        return lazyRef.initialized() ? (MapStateDescriptor) lazyRef.value() : secondBroadcastStateDesc$lzycompute$1(lazyRef);
    }

    public StatefulJobWBroadcastStateMigrationITCase(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        this.snapshotSpec = snapshotSpec;
    }
}
