package org.apache.flink.api.scala.migration;

import java.util.Collection;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.VolatileByteRef;

/* compiled from: StatefulJobWBroadcastStateMigrationITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u00055r!B\u0001\u0003\u0011\u0003y\u0011!K*uCR,g-\u001e7K_\n<&I]8bI\u000e\f7\u000f^*uCR,W*[4sCRLwN\\%U\u0007\u0006\u001cXM\u0003\u0002\u0004\t\u0005IQ.[4sCRLwN\u001c\u0006\u0003\u000b\u0019\tQa]2bY\u0006T!a\u0002\u0005\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\n\u0015\u0005)a\r\\5oW*\u00111\u0002D\u0001\u0007CB\f7\r[3\u000b\u00035\t1a\u001c:h\u0007\u0001\u0001\"\u0001E\t\u000e\u0003\t1QA\u0005\u0002\t\u0002M\u0011\u0011f\u0015;bi\u00164W\u000f\u001c&pE^\u0013%o\\1eG\u0006\u001cHo\u0015;bi\u0016l\u0015n\u001a:bi&|g.\u0013+DCN,7cA\t\u00153A\u0011QcF\u0007\u0002-)\tQ!\u0003\u0002\u0019-\t1\u0011I\\=SK\u001a\u0004\"!\u0006\u000e\n\u0005m1\"\u0001D*fe&\fG.\u001b>bE2,\u0007\"B\u000f\u0012\t\u0003q\u0012A\u0002\u001fj]&$h\bF\u0001\u0010\u0011\u0015\u0001\u0013\u0003\"\u0001\"\u0003)\u0001\u0018M]1nKR,'o]\u000b\u0002EA\u00191\u0005\u000b\u0016\u000e\u0003\u0011R!!\n\u0014\u0002\tU$\u0018\u000e\u001c\u0006\u0002O\u0005!!.\u0019<b\u0013\tICE\u0001\u0006D_2dWm\u0019;j_:\u0004B!F\u0016.i%\u0011AF\u0006\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u00059\u0012T\"A\u0018\u000b\u0005\r\u0001$BA\u0019\t\u0003%!Xm\u001d;vi&d7/\u0003\u00024_\t\u0001R*[4sCRLwN\u001c,feNLwN\u001c\t\u0003kar!!\u0006\u001c\n\u0005]2\u0012A\u0002)sK\u0012,g-\u0003\u0002:u\t11\u000b\u001e:j]\u001eT!a\u000e\f)\t}a\u0004*\u0013\t\u0003{\u0015s!AP\"\u000e\u0003}R!\u0001Q!\u0002\u000fI,hN\\3sg*\u0011!\tD\u0001\u0006UVt\u0017\u000e^\u0005\u0003\t~\nQ\u0002U1sC6,G/\u001a:ju\u0016$\u0017B\u0001$H\u0005)\u0001\u0016M]1nKR,'o\u001d\u0006\u0003\t~\nAA\\1nK\u0006\n!*\u0001\u0011NS\u001e\u0014\u0018\r^3!'\u00064X\r]8j]R\u0004s\u0006\t\"bG.,g\u000e\u001a\u001e!wBj\bb\u0002'\u0012\u0005\u0004%\t!T\u0001\u0017\u000f\u0016sUIU!U\u000b~\u001b\u0016IV#Q\u001f&sEk\u0018,F%V\tQ\u0006\u0003\u0004P#\u0001\u0006I!L\u0001\u0018\u000f\u0016sUIU!U\u000b~\u001b\u0016IV#Q\u001f&sEk\u0018,F%\u0002Bq!U\tC\u0002\u0013\u0005!+A\u0010H\u000b:+%+\u0011+F?N\u000be+\u0012)P\u0013:#vLQ!D\u0017\u0016sEi\u0018+Z!\u0016+\u0012\u0001\u000e\u0005\u0007)F\u0001\u000b\u0011\u0002\u001b\u0002A\u001d+e*\u0012*B)\u0016{6+\u0011,F!>Ke\nV0C\u0003\u000e[UI\u0014#`)f\u0003V\t\t\u0005\b-F\u0011\r\u0011\"\u0001X\u00031qU+T0F\u0019\u0016kUI\u0014+T+\u0005A\u0006CA\u000bZ\u0013\tQfCA\u0002J]RDa\u0001X\t!\u0002\u0013A\u0016!\u0004(V\u001b~+E*R'F\u001dR\u001b\u0006\u0005C\u0004_#\u0005\u0005I\u0011B0\u0002\u0017I,\u0017\r\u001a*fg>dg/\u001a\u000b\u0002AB\u0011\u0011\rZ\u0007\u0002E*\u00111MJ\u0001\u0005Y\u0006tw-\u0003\u0002fE\n1qJ\u00196fGR4AA\u0005\u0002\u0001ON\u0019a\r[\r\u0011\u0005%\u0004X\"\u00016\u000b\u0005-d\u0017!B;uS2\u001c(BA7o\u00035\u0019\u0007.Z2la>Lg\u000e^5oO*\u0011q\u000eC\u0001\u0005i\u0016\u001cH/\u0003\u0002rU\nQ2+\u0019<fa>Lg\u000e^'jOJ\fG/[8o)\u0016\u001cHOQ1tK\"A1O\u001aB\u0001B\u0003%!&\u0001\u000enS\u001e\u0014\u0018\r^5p]Z+'o]5p]\u0006sGMQ1dW\u0016tG\rC\u0003\u001eM\u0012\u0005Q\u000f\u0006\u0002woB\u0011\u0001C\u001a\u0005\u0006gR\u0004\rA\u000b\u0005\u0006s\u001a$\tA_\u0001&i\u0016\u001cHo\u0011:fCR,7+\u0019<fa>Lg\u000e^,ji\"\u0014%o\\1eG\u0006\u001cHo\u0015;bi\u0016$\u0012a\u001f\t\u0003+qL!! \f\u0003\tUs\u0017\u000e\u001e\u0015\u0003q~\u0004B!!\u0001\u0002\u00045\t\u0011)C\u0002\u0002\u0006\u0005\u0013a!S4o_J,\u0007f\u0001=\u0002\nA!\u0011\u0011AA\u0006\u0013\r\ti!\u0011\u0002\u0005)\u0016\u001cH\u000f\u0003\u0004\u0002\u0012\u0019$\tA_\u0001\"i\u0016\u001cHOU3ti>\u0014XmU1wKB|\u0017N\u001c;XSRD'I]8bI\u000e\f7\u000f\u001e\u0015\u0005\u0003\u001f\tI\u0001K\u0004g\u0003/\t\u0019#!\n\u0011\t\u0005e\u0011qD\u0007\u0003\u00037Q1!!\bB\u0003\u0019\u0011XO\u001c8fe&!\u0011\u0011EA\u000e\u0005\u001d\u0011VO\\,ji\"\fQA^1mk\u0016\u001c#!a\n\u0011\u0007y\nI#C\u0002\u0002,}\u0012Q\u0002U1sC6,G/\u001a:ju\u0016$\u0007")
/* loaded from: input_file:org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.class */
public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase implements Serializable {
    private final Tuple2<MigrationVersion, String> migrationVersionAndBackend;

    public static int NUM_ELEMENTS() {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS();
    }

    public static String GENERATE_SAVEPOINT_BACKEND_TYPE() {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.GENERATE_SAVEPOINT_BACKEND_TYPE();
    }

    public static MigrationVersion GENERATE_SAVEPOINT_VER() {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.GENERATE_SAVEPOINT_VER();
    }

    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    public static Collection<Tuple2<MigrationVersion, String>> parameters() {
        return StatefulJobWBroadcastStateMigrationITCase$.MODULE$.parameters();
    }

    @Test
    @Ignore
    public void testCreateSavepointWithBroadcastState() {
        StreamExecutionEnvironment stateBackend;
        ObjectRef zero = ObjectRef.zero();
        ObjectRef zero2 = ObjectRef.zero();
        VolatileByteRef create = VolatileByteRef.create((byte) 0);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String GENERATE_SAVEPOINT_BACKEND_TYPE = StatefulJobWBroadcastStateMigrationITCase$.MODULE$.GENERATE_SAVEPOINT_BACKEND_TYPE();
        if ("rocksdb".equals(GENERATE_SAVEPOINT_BACKEND_TYPE)) {
            stateBackend = executionEnvironment.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
        } else {
            if (!"jobmanager".equals(GENERATE_SAVEPOINT_BACKEND_TYPE)) {
                throw new UnsupportedOperationException();
            }
            stateBackend = executionEnvironment.setStateBackend(new MemoryStateBackend());
        }
        executionEnvironment.setStateBackend(new MemoryStateBackend());
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(4);
        executionEnvironment.setMaxParallelism(4);
        executionEnvironment.addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource").keyBy(new KeySelector<Tuple2<Object, Object>, Object>(this) { // from class: org.apache.flink.api.scala.migration.StatefulJobWBroadcastStateMigrationITCase$$anon$24
            public long getKey(Tuple2<Object, Object> tuple2) {
                return tuple2._1$mcJ$sp();
            }

            public /* bridge */ /* synthetic */ Object getKey(Object obj) {
                return BoxesRunTime.boxToLong(getKey((Tuple2<Object, Object>) obj));
            }
        }).flatMap(new StatefulFlatMapper()).keyBy(new KeySelector<Tuple2<Object, Object>, Object>(this) { // from class: org.apache.flink.api.scala.migration.StatefulJobWBroadcastStateMigrationITCase$$anon$25
            public long getKey(Tuple2<Object, Object> tuple2) {
                return tuple2._1$mcJ$sp();
            }

            public /* bridge */ /* synthetic */ Object getKey(Object obj) {
                return BoxesRunTime.boxToLong(getKey((Tuple2<Object, Object>) obj));
            }
        }).connect(executionEnvironment.addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource").broadcast(new MapStateDescriptor[]{firstBroadcastStateDesc$1(zero, create), secondBroadcastStateDesc$1(zero2, create)})).process(new TestBroadcastProcessFunction()).addSink(new AccumulatorCountingSink());
        executeAndSavepoint(executionEnvironment, new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"src/test/resources/stateful-scala-with-broadcast"})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"-udf-migration-itcase-flink"})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{StatefulJobWBroadcastStateMigrationITCase$.MODULE$.GENERATE_SAVEPOINT_VER()}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"-", "-savepoint"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{StatefulJobWBroadcastStateMigrationITCase$.MODULE$.GENERATE_SAVEPOINT_BACKEND_TYPE()}))).toString(), new org.apache.flink.api.java.tuple.Tuple2<>(AccumulatorCountingSink$.MODULE$.NUM_ELEMENTS_ACCUMULATOR(), Predef$.MODULE$.int2Integer(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS())));
    }

    @Test
    public void testRestoreSavepointWithBroadcast() {
        StreamExecutionEnvironment stateBackend;
        ObjectRef zero = ObjectRef.zero();
        ObjectRef zero2 = ObjectRef.zero();
        VolatileByteRef create = VolatileByteRef.create((byte) 0);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String str = (String) this.migrationVersionAndBackend._2();
        if ("rocksdb".equals(str)) {
            stateBackend = executionEnvironment.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
        } else {
            if (!"jobmanager".equals(str)) {
                throw new UnsupportedOperationException();
            }
            stateBackend = executionEnvironment.setStateBackend(new MemoryStateBackend());
        }
        executionEnvironment.setStateBackend(new MemoryStateBackend());
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(4);
        executionEnvironment.setMaxParallelism(4);
        executionEnvironment.addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource").keyBy(new KeySelector<Tuple2<Object, Object>, Object>(this) { // from class: org.apache.flink.api.scala.migration.StatefulJobWBroadcastStateMigrationITCase$$anon$26
            public long getKey(Tuple2<Object, Object> tuple2) {
                return tuple2._1$mcJ$sp();
            }

            public /* bridge */ /* synthetic */ Object getKey(Object obj) {
                return BoxesRunTime.boxToLong(getKey((Tuple2<Object, Object>) obj));
            }
        }).flatMap(new StatefulFlatMapper()).keyBy(new KeySelector<Tuple2<Object, Object>, Object>(this) { // from class: org.apache.flink.api.scala.migration.StatefulJobWBroadcastStateMigrationITCase$$anon$27
            public long getKey(Tuple2<Object, Object> tuple2) {
                return tuple2._1$mcJ$sp();
            }

            public /* bridge */ /* synthetic */ Object getKey(Object obj) {
                return BoxesRunTime.boxToLong(getKey((Tuple2<Object, Object>) obj));
            }
        }).connect(executionEnvironment.addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource").broadcast(new MapStateDescriptor[]{firstBroadcastStateDesc$2(zero, create), secondBroadcastStateDesc$2(zero2, create)})).process(new VerifyingBroadcastProcessFunction(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(0L)), BoxesRunTime.boxToLong(0L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(1L)), BoxesRunTime.boxToLong(1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(2L)), BoxesRunTime.boxToLong(2L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(3L)), BoxesRunTime.boxToLong(3L))})), Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("0"), "0"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("1"), "1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("2"), "2"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("3"), "3")})))).addSink(new AccumulatorCountingSink());
        restoreAndExecute(executionEnvironment, SavepointMigrationTestBase.getResourceFilename(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"stateful-scala-with-broadcast"})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"-udf-migration-itcase-flink", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.migrationVersionAndBackend._1()}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"-", "-savepoint"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.migrationVersionAndBackend._2()}))).toString()), new org.apache.flink.api.java.tuple.Tuple2<>(AccumulatorCountingSink$.MODULE$.NUM_ELEMENTS_ACCUMULATOR(), Predef$.MODULE$.int2Integer(StatefulJobWBroadcastStateMigrationITCase$.MODULE$.NUM_ELEMENTS())));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private final MapStateDescriptor firstBroadcastStateDesc$lzycompute$1(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (volatileByteRef.elem & 1)) == 0) {
                objectRef.elem = new MapStateDescriptor("broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (MapStateDescriptor) objectRef.elem;
        }
    }

    private final MapStateDescriptor firstBroadcastStateDesc$1(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        return ((byte) (volatileByteRef.elem & 1)) == 0 ? firstBroadcastStateDesc$lzycompute$1(objectRef, volatileByteRef) : (MapStateDescriptor) objectRef.elem;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private final MapStateDescriptor secondBroadcastStateDesc$lzycompute$1(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (volatileByteRef.elem & 2)) == 0) {
                objectRef.elem = new MapStateDescriptor("broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 2);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (MapStateDescriptor) objectRef.elem;
        }
    }

    private final MapStateDescriptor secondBroadcastStateDesc$1(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        return ((byte) (volatileByteRef.elem & 2)) == 0 ? secondBroadcastStateDesc$lzycompute$1(objectRef, volatileByteRef) : (MapStateDescriptor) objectRef.elem;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private final MapStateDescriptor firstBroadcastStateDesc$lzycompute$2(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (volatileByteRef.elem & 1)) == 0) {
                objectRef.elem = new MapStateDescriptor("broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (MapStateDescriptor) objectRef.elem;
        }
    }

    private final MapStateDescriptor firstBroadcastStateDesc$2(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        return ((byte) (volatileByteRef.elem & 1)) == 0 ? firstBroadcastStateDesc$lzycompute$2(objectRef, volatileByteRef) : (MapStateDescriptor) objectRef.elem;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private final MapStateDescriptor secondBroadcastStateDesc$lzycompute$2(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (volatileByteRef.elem & 2)) == 0) {
                objectRef.elem = new MapStateDescriptor("broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 2);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (MapStateDescriptor) objectRef.elem;
        }
    }

    private final MapStateDescriptor secondBroadcastStateDesc$2(ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        return ((byte) (volatileByteRef.elem & 2)) == 0 ? secondBroadcastStateDesc$lzycompute$2(objectRef, volatileByteRef) : (MapStateDescriptor) objectRef.elem;
    }

    public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> tuple2) {
        this.migrationVersionAndBackend = tuple2;
    }
}
