package org.apache.flink.api.scala.migration;

import java.util.Collection;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Tuple2;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;

/* compiled from: StatefulJobWBroadcastStateMigrationITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005Es!B\u0001\u0003\u0011\u0003y\u0011!K*uCR,g-\u001e7K_\n<&I]8bI\u000e\f7\u000f^*uCR,W*[4sCRLwN\\%U\u0007\u0006\u001cXM\u0003\u0002\u0004\t\u0005IQ.[4sCRLwN\u001c\u0006\u0003\u000b\u0019\tQa]2bY\u0006T!a\u0002\u0005\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\n\u0015\u0005)a\r\\5oW*\u00111\u0002D\u0001\u0007CB\f7\r[3\u000b\u00035\t1a\u001c:h\u0007\u0001\u0001\"\u0001E\t\u000e\u0003\t1QA\u0005\u0002\t\u0002M\u0011\u0011f\u0015;bi\u00164W\u000f\u001c&pE^\u0013%o\\1eG\u0006\u001cHo\u0015;bi\u0016l\u0015n\u001a:bi&|g.\u0013+DCN,7cA\t\u00153A\u0011QcF\u0007\u0002-)\tQ!\u0003\u0002\u0019-\t1\u0011I\\=SK\u001a\u0004\"!\u0006\u000e\n\u0005m1\"\u0001D*fe&\fG.\u001b>bE2,\u0007\"B\u000f\u0012\t\u0003q\u0012A\u0002\u001fj]&$h\bF\u0001\u0010\u0011\u001d\u0001\u0013C1A\u0005\u0002\u0005\nabY;se\u0016tGOV3sg&|g.F\u0001#!\t\u0019C%D\u0001\t\u0013\t)\u0003B\u0001\u0007GY&t7NV3sg&|g\u000e\u0003\u0004(#\u0001\u0006IAI\u0001\u0010GV\u0014(/\u001a8u-\u0016\u00148/[8oA!9\u0011&\u0005b\u0001\n\u0003Q\u0013!D3yK\u000e,H/[8o\u001b>$W-F\u0001,!\taS'D\u0001.\u0015\tqs&A\rT]\u0006\u00048\u000f[8u\u001b&<'/\u0019;j_:$Vm\u001d;CCN,'B\u0001\u00192\u0003\u0015)H/\u001b7t\u0015\t\u00114'A\u0007dQ\u0016\u001c7\u000e]8j]RLgn\u001a\u0006\u0003i!\tA\u0001^3ti&\u0011a'\f\u0002\u000e\u000bb,7-\u001e;j_:lu\u000eZ3\t\ra\n\u0002\u0015!\u0003,\u00039)\u00070Z2vi&|g.T8eK\u0002BQAO\t\u0005\u0002m\n!\u0002]1sC6,G/\u001a:t+\u0005a\u0004cA\u001fC\t6\taH\u0003\u0002@\u0001\u0006!Q\u000f^5m\u0015\u0005\t\u0015\u0001\u00026bm\u0006L!a\u0011 
\u0003\u0015\r{G\u000e\\3di&|g\u000e\u0005\u0002F-:\u0011a)\u0016\b\u0003\u000fRs!\u0001S*\u000f\u0005%\u0013fB\u0001&R\u001d\tY\u0005K\u0004\u0002M\u001f6\tQJ\u0003\u0002O\u001d\u00051AH]8pizJ\u0011!D\u0005\u0003\u00171I!!\u0003\u0006\n\u0005QB\u0011B\u0001\u001a4\u0013\t\u0001\u0014'\u0003\u0002/_%\u0011q+\f\u0002\r':\f\u0007o\u001d5piN\u0003Xm\u0019\u0015\u0005se+g\r\u0005\u0002[E:\u00111\fY\u0007\u00029*\u0011QLX\u0001\beVtg.\u001a:t\u0015\tyF\"A\u0003kk:LG/\u0003\u0002b9\u0006i\u0001+\u0019:b[\u0016$XM]5{K\u0012L!a\u00193\u0003\u0015A\u000b'/Y7fi\u0016\u00148O\u0003\u0002b9\u0006!a.Y7fC\u00059\u0017A\u0005+fgR\u00043O\\1qg\"|GO\u000f\u0011|auDQ![\t\u0005\u0002)\fqbZ3u':\f\u0007o\u001d5piB\u000bG\u000f\u001b\u000b\u0003WN\u0004\"\u0001\u001c9\u000f\u00055t\u0007C\u0001'\u0017\u0013\tyg#\u0001\u0004Qe\u0016$WMZ\u0005\u0003cJ\u0014aa\u0015;sS:<'BA8\u0017\u0011\u0015!\b\u000e1\u0001E\u00031\u0019h.\u00199tQ>$8\u000b]3d\u0011\u001d1\u0018C1A\u0005\u0002]\fABT+N?\u0016cU)T#O)N+\u0012\u0001\u001f\t\u0003+eL!A\u001f\f\u0003\u0007%sG\u000f\u0003\u0004}#\u0001\u0006I\u0001_\u0001\u000e\u001dVku,\u0012'F\u001b\u0016sEk\u0015\u0011\t\u000fy\f\u0012\u0011!C\u0005\u007f\u0006Y!/Z1e%\u0016\u001cx\u000e\u001c<f)\t\t\t\u0001\u0005\u0003\u0002\u0004\u0005%QBAA\u0003\u0015\r\t9\u0001Q\u0001\u0005Y\u0006tw-\u0003\u0003\u0002\f\u0005\u0015!AB(cU\u0016\u001cGOB\u0003\u0013\u0005\u0001\tyaE\u0003\u0002\u000e\u0005E\u0011\u0004\u0005\u0003\u0002\u0014\u0005UQ\"A\u0018\n\u0007\u0005]qFA\rT]\u0006\u00048\u000f[8u\u001b&<'/\u0019;j_:$Vm\u001d;CCN,\u0007\"\u0003;\u0002\u000e\t\u0005\t\u0015!\u0003E\u0011\u001di\u0012Q\u0002C\u0001\u0003;!B!a\b\u0002\"A\u0019\u0001#!\u0004\t\rQ\fY\u00021\u0001E\u0011!\t)#!\u0004\u0005\u0002\u0005\u001d\u0012A\u0007;fgR\u001c\u0016M^3q_&tGoV5uQ\n\u0013x.\u00193dCN$HCAA\u0015!\r)\u00121F\u0005\u0004\u0003[1\"\u0001B+oSRDC!a\t\u00022A!\u00111GA\u001b\u001b\u0005q\u0016bAA\u001c=\n!A+Z:uQ!\ti!a\u000f\u0002H\u0005%\u0003\u0003BA\u001f\u0003\u0007j!!a\u0010\u000
b\u0007\u0005\u0005c,\u0001\u0004sk:tWM]\u0005\u0005\u0003\u000b\nyDA\u0004Sk:<\u0016\u000e\u001e5\u0002\u000bY\fG.^3$\u0005\u0005-\u0003cA.\u0002N%\u0019\u0011q\n/\u0003\u001bA\u000b'/Y7fi\u0016\u0014\u0018N_3e\u0001")
/* loaded from: input_file:org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.class */
public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase implements Serializable {
    private final SnapshotMigrationTestBase.SnapshotSpec snapshotSpec;

    /**
     * Static forwarder that exposes the companion object's {@code NUM_ELEMENTS} value.
     *
     * @return whatever {@code StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS()}
     *     returns
     */
    public static int NUM_ELEMENTS() {
        StatefulJobWBroadcastStateMigrationITCase$ companion = StatefulJobWBroadcastStateMigrationITCase$.MODULE$;
        return companion.NUM_ELEMENTS();
    }

    /**
     * Static forwarder to the companion object's {@code getSnapshotPath}.
     *
     * @param snapshotSpec the snapshot specification handed through unchanged
     * @return the path string produced by the companion object for {@code snapshotSpec}
     */
    public static String getSnapshotPath(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        StatefulJobWBroadcastStateMigrationITCase$ companion = StatefulJobWBroadcastStateMigrationITCase$.MODULE$;
        return companion.getSnapshotPath(snapshotSpec);
    }

    /**
     * Supplies the parameter set for the {@code Parameterized} runner by delegating to the
     * companion object.
     *
     * @return the collection of snapshot specifications returned by the companion object
     */
    @Parameterized.Parameters(name = "Test snapshot: {0}")
    public static Collection<SnapshotMigrationTestBase.SnapshotSpec> parameters() {
        StatefulJobWBroadcastStateMigrationITCase$ companion = StatefulJobWBroadcastStateMigrationITCase$.MODULE$;
        return companion.parameters();
    }

    /**
     * Static forwarder to the companion object's {@code executionMode} accessor.
     *
     * @return the execution mode reported by the companion object
     */
    public static SnapshotMigrationTestBase.ExecutionMode executionMode() {
        StatefulJobWBroadcastStateMigrationITCase$ companion = StatefulJobWBroadcastStateMigrationITCase$.MODULE$;
        return companion.executionMode();
    }

    /**
     * Static forwarder to the companion object's {@code currentVersion} accessor.
     *
     * @return the {@link FlinkVersion} reported by the companion object
     */
    public static FlinkVersion currentVersion() {
        StatefulJobWBroadcastStateMigrationITCase$ companion = StatefulJobWBroadcastStateMigrationITCase$.MODULE$;
        return companion.currentVersion();
    }

    /**
     * Builds a keyed stream connected to a two-descriptor broadcast stream and, depending on the
     * companion object's execution mode, either writes a snapshot or restores one and verifies it.
     *
     * <p>The state backend is chosen from {@code snapshotSpec.getStateBackendType()}
     * ({@code "rocksdb"}, {@code "jobmanager"} or {@code "hashmap"}). In both modes the job is
     * expected to report {@code NUM_ELEMENTS} under the sink's accumulator name.
     *
     * <p>NOTE(review): if the inherited {@code executeAndSnapshot} / {@code restoreAndExecute}
     * helpers declare checked exceptions, this method needs a matching {@code throws} clause —
     * confirm against {@code SnapshotMigrationTestBase}.
     *
     * @throws UnsupportedOperationException if the state backend type or the execution mode is
     *     not one of the handled values
     */
    @Test
    public void testSavepointWithBroadcast() {
        LazyRef firstDescRef = new LazyRef();
        LazyRef secondDescRef = new LazyRef();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String stateBackendType = this.snapshotSpec.getStateBackendType();
        if ("rocksdb".equals(stateBackendType)) {
            env.setStateBackend(new EmbeddedRocksDBStateBackend());
        } else if ("jobmanager".equals(stateBackendType)) {
            env.setStateBackend(new MemoryStateBackend());
        } else if ("hashmap".equals(stateBackendType)) {
            env.setStateBackend(new HashMapStateBackend());
        } else {
            throw new UnsupportedOperationException("Unsupported state backend type: " + stateBackendType);
        }

        env.enableChangelogStateBackend(false);
        env.enableCheckpointing(500L);
        env.setParallelism(4);
        env.setMaxParallelism(4);

        SingleOutputStreamOperator flatMapped = env.addSource(new CheckpointedSource(4))
                .setMaxParallelism(1)
                .uid("checkpointedSource")
                .keyBy(firstFieldKeySelector())
                .flatMap(new StatefulFlatMapper());
        KeyedStream keyed = flatMapped.keyBy(firstFieldKeySelector());

        BroadcastStream broadcast = env.addSource(new CheckpointedSource(4))
                .setMaxParallelism(1)
                .uid("checkpointedBroadcastSource")
                .broadcast(new MapStateDescriptor[]{firstBroadcastStateDesc$1(firstDescRef), secondBroadcastStateDesc$1(secondDescRef)});

        // Read the mode once; it is compared with equals() so a null mode falls through to the
        // "unsupported" branch, as before.
        SnapshotMigrationTestBase.ExecutionMode mode = StatefulJobWBroadcastStateMigrationITCase$.MODULE$.executionMode();

        if (mode != null && mode.equals(SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT)) {
            keyed.connect(broadcast)
                    .process(new TestBroadcastProcessFunction())
                    .addSink(new AccumulatorCountingSink());
            executeAndSnapshot(
                    env,
                    "src/test/resources/" + StatefulJobWBroadcastStateMigrationITCase$.MODULE$.getSnapshotPath(this.snapshotSpec),
                    this.snapshotSpec.getSnapshotType(),
                    new org.apache.flink.api.java.tuple.Tuple2<>(AccumulatorCountingSink$.MODULE$.NUM_ELEMENTS_ACCUMULATOR(), Integer.valueOf(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS())));
            return;
        }

        if (mode == null || !mode.equals(SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT)) {
            throw new UnsupportedOperationException("Unsupported execution mode.");
        }

        // Expected contents of the two broadcast states: 0..3 mapped to themselves, once as longs
        // and once as strings. Each pair is what Scala's `k -> v` produces, i.e. a Tuple2(k, v).
        Tuple2[] expectedLongEntries = new Tuple2[]{
            new Tuple2<Object, Object>(BoxesRunTime.boxToLong(0L), BoxesRunTime.boxToLong(0L)),
            new Tuple2<Object, Object>(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(1L)),
            new Tuple2<Object, Object>(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToLong(2L)),
            new Tuple2<Object, Object>(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(3L))
        };
        Tuple2[] expectedStringEntries = new Tuple2[]{
            new Tuple2<Object, Object>("0", "0"),
            new Tuple2<Object, Object>("1", "1"),
            new Tuple2<Object, Object>("2", "2"),
            new Tuple2<Object, Object>("3", "3")
        };

        keyed.connect(broadcast)
                .process(new VerifyingBroadcastProcessFunction(
                        Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(expectedLongEntries)),
                        Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(expectedStringEntries))))
                .addSink(new AccumulatorCountingSink());
        restoreAndExecute(
                env,
                SnapshotMigrationTestBase.getResourceFilename(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.getSnapshotPath(this.snapshotSpec)),
                new org.apache.flink.api.java.tuple.Tuple2<>(AccumulatorCountingSink$.MODULE$.NUM_ELEMENTS_ACCUMULATOR(), Integer.valueOf(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS())));
    }

    /**
     * Creates a key selector that keys a tuple by its first field, read as a primitive long and
     * boxed. Built in a static context so the selector does not capture the test instance.
     */
    private static KeySelector<Tuple2<Object, Object>, Object> firstFieldKeySelector() {
        return new KeySelector<Tuple2<Object, Object>, Object>() {
            @Override
            public Object getKey(Tuple2<Object, Object> tuple2) {
                return BoxesRunTime.boxToLong(tuple2._1$mcJ$sp());
            }
        };
    }

    /**
     * Returns the descriptor held by {@code lazyRef}, creating it if necessary.
     *
     * <p>While holding the monitor of {@code lazyRef}, checks {@code initialized()}: if set, the
     * stored value is returned; otherwise a new {@code "broadcast-state-1"} descriptor with
     * {@code LONG_TYPE_INFO} key and value types is passed to {@code initialize} and its result
     * is returned.
     *
     * @param lazyRef the holder for the lazily created descriptor
     * @return the descriptor stored in (or just placed into) {@code lazyRef}
     */
    private static final /* synthetic */ MapStateDescriptor firstBroadcastStateDesc$lzycompute$1(LazyRef lazyRef) {
        synchronized (lazyRef) {
            if (lazyRef.initialized()) {
                return (MapStateDescriptor) lazyRef.value();
            }
            MapStateDescriptor created = new MapStateDescriptor("broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
            return (MapStateDescriptor) lazyRef.initialize(created);
        }
    }

    /**
     * Accessor for the first broadcast state descriptor: returns the value already stored in
     * {@code lazyRef} when it reports itself initialized, and otherwise defers to
     * {@code firstBroadcastStateDesc$lzycompute$1}.
     *
     * @param lazyRef the holder for the lazily created descriptor
     * @return the first broadcast state descriptor
     */
    private static final MapStateDescriptor firstBroadcastStateDesc$1(LazyRef lazyRef) {
        if (!lazyRef.initialized()) {
            return firstBroadcastStateDesc$lzycompute$1(lazyRef);
        }
        return (MapStateDescriptor) lazyRef.value();
    }

    /**
     * Returns the descriptor held by {@code lazyRef}, creating it if necessary.
     *
     * <p>While holding the monitor of {@code lazyRef}, checks {@code initialized()}: if set, the
     * stored value is returned; otherwise a new {@code "broadcast-state-2"} descriptor with
     * {@code STRING_TYPE_INFO} key and value types is passed to {@code initialize} and its result
     * is returned.
     *
     * @param lazyRef the holder for the lazily created descriptor
     * @return the descriptor stored in (or just placed into) {@code lazyRef}
     */
    private static final /* synthetic */ MapStateDescriptor secondBroadcastStateDesc$lzycompute$1(LazyRef lazyRef) {
        synchronized (lazyRef) {
            if (lazyRef.initialized()) {
                return (MapStateDescriptor) lazyRef.value();
            }
            MapStateDescriptor created = new MapStateDescriptor("broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            return (MapStateDescriptor) lazyRef.initialize(created);
        }
    }

    /**
     * Accessor for the second broadcast state descriptor: returns the value already stored in
     * {@code lazyRef} when it reports itself initialized, and otherwise defers to
     * {@code secondBroadcastStateDesc$lzycompute$1}.
     *
     * @param lazyRef the holder for the lazily created descriptor
     * @return the second broadcast state descriptor
     */
    private static final MapStateDescriptor secondBroadcastStateDesc$1(LazyRef lazyRef) {
        if (!lazyRef.initialized()) {
            return secondBroadcastStateDesc$lzycompute$1(lazyRef);
        }
        return (MapStateDescriptor) lazyRef.value();
    }

    /**
     * Creates a test instance for one snapshot specification.
     *
     * @param snapshotSpec the specification stored in this instance's {@code snapshotSpec} field
     */
    public StatefulJobWBroadcastStateMigrationITCase(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        super();
        this.snapshotSpec = snapshotSpec;
    }
}
