/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.typeserializerupgrade;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests that state written with one version of a POJO class can (or cannot) be restored after the
 * POJO class definition has changed.
 *
 * <p>Each test compiles two variants of a class named {@code Pojo} at runtime, runs a stateful
 * operator with the first variant to produce a snapshot, and then runs the operator again with the
 * second variant, restoring from that snapshot. The test is parameterized over the state backend
 * names returned by {@link #parameters()}.
 */
@RunWith(Parameterized.class)
public class PojoSerializerUpgradeTest
extends TestLogger {
    @ClassRule
    public static final TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** State backend under test, loaded from the parameterized backend name. */
    private final StateBackend stateBackend;

    /** Simple name of the class that every source variant below declares. */
    private static final String POJO_NAME = "Pojo";

    /** Baseline POJO: {@code long a; String b}. */
    private static final String SOURCE_A = "import java.util.Objects;public class Pojo { private long a; private String b; public long getA() { return a;} public void setA(long value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b); } @Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";

    /** Same fields as {@link #SOURCE_A}, declared in the opposite order. */
    private static final String SOURCE_B = "import java.util.Objects;public class Pojo { private String b; private long a; public long getA() { return a;} public void setA(long value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b); } @Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";

    /** Like {@link #SOURCE_A}, but field {@code a} is a {@code double} instead of a {@code long}. */
    private static final String SOURCE_C = "import java.util.Objects;public class Pojo { private double a; private String b; public double getA() { return a;} public void setA(double value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b); } @Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";

    /** Like {@link #SOURCE_A}, with an additional {@code double c} field. */
    private static final String SOURCE_D = "import java.util.Objects;public class Pojo { private long a; private String b; private double c; public long getA() { return a;} public void setA(long value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }public double getC() { return c; } public void setC(double value) { c = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b) && c == other.c;} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b, c); } @Override public String toString() {return \"(\" + a + \", \" + b + \", \" + c + \")\";}}";

    /** Like {@link #SOURCE_A}, with field {@code b} removed. */
    private static final String SOURCE_E = "import java.util.Objects;public class Pojo { private long a; public long getA() { return a;} public void setA(long value) { a = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a;} else { return false; }}@Override public int hashCode() { return Objects.hash(a); } @Override public String toString() {return \"(\" + a + \")\";}}";

    /** Returns the state backend names the test is run against. */
    @Parameterized.Parameters(name="StateBackend: {0}")
    public static Collection<String> parameters() {
        return Arrays.asList("hashmap", "rocksdb");
    }

    /**
     * Loads the state backend named by {@code backendType}, pointing its checkpoint directory at a
     * fresh temporary folder.
     *
     * @param backendType state backend name, one of the values from {@link #parameters()}
     * @throws IOException if the temporary checkpoint folder cannot be created
     * @throws DynamicCodeLoadingException declared by the state backend loader
     */
    public PojoSerializerUpgradeTest(String backendType) throws IOException, DynamicCodeLoadingException {
        Configuration config = new Configuration();
        // The option values are passed with their static type; casting them to Object would
        // defeat the generic inference of Configuration#set.
        config.set(StateBackendOptions.STATE_BACKEND, backendType);
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toURI().toString());
        this.stateBackend = StateBackendLoader.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
    }

    @Test
    public void testChangedFieldOrderWithKeyedState() throws Exception {
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, true);
    }

    @Test
    public void testChangedFieldOrderWithOperatorState() throws Exception {
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, false);
    }

    @Test
    public void testChangedFieldTypesWithKeyedState() throws Exception {
        assertUpgradeFailsWithStateMigrationException(SOURCE_A, SOURCE_C, true);
    }

    @Test
    public void testChangedFieldTypesWithOperatorState() throws Exception {
        assertUpgradeFailsWithStateMigrationException(SOURCE_A, SOURCE_C, false);
    }

    @Test
    public void testAdditionalFieldWithKeyedState() throws Exception {
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true);
    }

    @Test
    public void testAdditionalFieldWithOperatorState() throws Exception {
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, false);
    }

    @Test
    public void testMissingFieldWithKeyedState() throws Exception {
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true);
    }

    @Test
    public void testMissingFieldWithOperatorState() throws Exception {
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, false);
    }

    /**
     * Runs an upgrade scenario that must fail with a {@link StateMigrationException} somewhere in
     * the cause chain of the thrown exception.
     *
     * <p>Any other exception is rethrown unchanged; completing without an exception fails the
     * test.
     */
    private void assertUpgradeFailsWithStateMigrationException(String classSourceA, String classSourceB, boolean isKeyedState) throws Exception {
        try {
            testPojoSerializerUpgrade(classSourceA, classSourceB, true, isKeyedState);
        } catch (Exception e) {
            if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
                return;
            }
            throw e;
        }
        Assert.fail("Expected a state migration exception.");
    }

    /**
     * Writes state with the POJO compiled from {@code classSourceA}, then restores and verifies it
     * with the POJO compiled from {@code classSourceB}.
     *
     * @param classSourceA source of the POJO class used when writing state
     * @param classSourceB source of the POJO class used when restoring state
     * @param hasBField whether the mapper should populate field {@code b} of the POJO
     * @param isKeyedState {@code true} to exercise keyed state, {@code false} for operator state
     */
    private void testPojoSerializerUpgrade(String classSourceA, String classSourceB, boolean hasBField, boolean isKeyedState) throws Exception {
        final Configuration taskConfiguration = new Configuration();
        final ExecutionConfig executionConfig = new ExecutionConfig();
        final KeySelector<Long, Long> keySelector = new IdentityKeySelector<>();
        final List<Long> inputs = Arrays.asList(1L, 2L, 45L, 67L, 1337L);

        // First run: populate state (verify = false) and take a snapshot.
        final OperatorSubtaskState stateHandles;
        try (URLClassLoader classLoaderA = compileAndLoadPojo(classSourceA)) {
            stateHandles = runOperator(taskConfiguration, executionConfig, new StreamMap<>(new StatefulMapper(isKeyedState, false, hasBField)), keySelector, isKeyedState, this.stateBackend, classLoaderA, null, inputs);
        }

        // Second run: restore from the snapshot with the changed class and verify (verify = true).
        try (URLClassLoader classLoaderB = compileAndLoadPojo(classSourceB)) {
            runOperator(taskConfiguration, executionConfig, new StreamMap<>(new StatefulMapper(isKeyedState, true, hasBField)), keySelector, isKeyedState, this.stateBackend, classLoaderB, stateHandles, inputs);
        }
    }

    /**
     * Compiles {@code classSource} into a fresh temporary folder and returns a class loader that
     * resolves classes from that folder, delegating to the context class loader as parent.
     *
     * <p>Fails the test immediately if the compiler reports a non-zero exit code, so that a broken
     * source does not surface later as a class loading error. The caller owns the returned loader
     * and is responsible for closing it.
     */
    private static URLClassLoader compileAndLoadPojo(String classSource) throws IOException {
        File rootPath = temporaryFolder.newFolder();
        File sourceFile = writeSourceFile(rootPath, POJO_NAME + ".java", classSource);
        Assert.assertEquals("Compilation of " + sourceFile + " failed.", 0, compileClass(sourceFile));
        return URLClassLoader.newInstance(new URL[]{rootPath.toURI().toURL()}, Thread.currentThread().getContextClassLoader());
    }

    /**
     * Runs {@code operator} in a test harness over {@code input} and returns a snapshot of its
     * state.
     *
     * @param operatorSubtaskState state to initialize the operator with; the first run in this
     *     test passes {@code null}
     * @param classLoader user code class loader installed in the mock environment
     * @return the snapshot taken after all input elements were processed
     */
    private OperatorSubtaskState runOperator(Configuration taskConfiguration, ExecutionConfig executionConfig, OneInputStreamOperator<Long, Long> operator, KeySelector<Long, Long> keySelector, boolean isKeyedState, StateBackend stateBackend, ClassLoader classLoader, OperatorSubtaskState operatorSubtaskState, Iterable<Long> input) throws Exception {
        try (MockEnvironment environment = new MockEnvironmentBuilder().setTaskName("test task").setManagedMemorySize(32768L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(256).setTaskConfiguration(taskConfiguration).setExecutionConfig(executionConfig).setMaxParallelism(16).setUserCodeClassLoader(classLoader).build()) {
            OneInputStreamOperatorTestHarness<Long, Long> harness = null;
            try {
                if (isKeyedState) {
                    harness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.LONG_TYPE_INFO, environment);
                } else {
                    harness = new OneInputStreamOperatorTestHarness<>(operator, LongSerializer.INSTANCE, environment);
                }
                harness.setStateBackend(stateBackend);
                harness.setup();
                harness.initializeState(operatorSubtaskState);
                harness.open();

                long timestamp = 0L;
                for (Long value : input) {
                    harness.processElement(value, timestamp++);
                }

                long checkpointId = 1L;
                long checkpointTimestamp = timestamp + 1L;
                return harness.snapshot(checkpointId, checkpointTimestamp);
            } finally {
                // Closed quietly on both the success and the failure path so that a close error
                // never masks the exception (or result) of the run itself.
                IOUtils.closeQuietly(harness);
            }
        }
    }

    /**
     * Writes {@code source} to the file {@code name} below {@code root}, creating parent
     * directories as needed.
     *
     * @return the written source file
     */
    private static File writeSourceFile(File root, String name, String source) throws IOException {
        File sourceFile = new File(root, name);
        sourceFile.getParentFile().mkdirs();
        try (FileWriter writer = new FileWriter(sourceFile)) {
            writer.write(source);
        }
        return sourceFile;
    }

    /**
     * Compiles {@code sourceFile} with the system Java compiler, with annotation processing
     * disabled.
     *
     * @return the compiler's exit code
     * @throws IllegalStateException if no system Java compiler is available
     */
    private static int compileClass(File sourceFile) {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler available; cannot compile " + sourceFile);
        }
        return compiler.run(null, null, null, "-proc:none", sourceFile.getPath());
    }

    /** Key selector that uses the element itself as its key. */
    private static final class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = -3263628393881929147L;

        private IdentityKeySelector() {
        }

        @Override
        public T getKey(T value) throws Exception {
            return value;
        }
    }

    /** Reduce function that always keeps the first of the two values. */
    private static final class FirstValueReducer<T>
    implements ReduceFunction<T> {
        private static final long serialVersionUID = -9222976423336835926L;

        private FirstValueReducer() {
        }

        @Override
        public T reduce(T value1, T value2) throws Exception {
            return value1;
        }
    }

    /**
     * Mapper that builds a {@code Pojo} instance for every input value via reflection and either
     * stores it in state or, in verify mode, asserts that an equal instance is found in the state.
     *
     * <p>The POJO class is loaded from the user code class loader in {@link #initializeState}, so
     * the same mapper code works against whichever variant of the class was compiled for the run.
     */
    private static final class StatefulMapper
    extends RichMapFunction<Long, Long>
    implements CheckpointedFunction {
        private static final long serialVersionUID = -520490739059396832L;
        private final boolean keyed;
        private final boolean verify;
        private final boolean hasBField;
        private transient ValueState<Object> keyedValueState;
        private transient ListState<Object> keyedListState;
        private transient ReducingState<Object> keyedReducingState;
        private transient ListState<Object> partitionableListState;
        private transient ListState<Object> unionListState;
        private transient Class<?> pojoClass;
        private transient Field fieldA;
        private transient Field fieldB;

        /**
         * @param keyed use keyed state if {@code true}, operator state otherwise
         * @param verify assert against existing state if {@code true}, write state otherwise
         * @param hasBField whether field {@code b} of the POJO should be looked up and populated
         */
        StatefulMapper(boolean keyed, boolean verify, boolean hasBField) {
            this.keyed = keyed;
            this.verify = verify;
            this.hasBField = hasBField;
        }

        @Override
        public Long map(Long value) throws Exception {
            Object pojo = this.pojoClass.getDeclaredConstructor().newInstance();
            this.fieldA.set(pojo, value);
            if (this.hasBField) {
                this.fieldB.set(pojo, "" + value);
            }

            if (this.verify) {
                if (this.keyed) {
                    Assert.assertEquals(pojo, this.keyedValueState.value());
                    Assert.assertTrue(containsElement(this.keyedListState.get(), pojo));
                    Assert.assertEquals(pojo, this.keyedReducingState.get());
                } else {
                    Assert.assertTrue(containsElement(this.partitionableListState.get(), pojo));
                    Assert.assertTrue(containsElement(this.unionListState.get(), pojo));
                }
            } else if (this.keyed) {
                this.keyedValueState.update(pojo);
                this.keyedListState.add(pojo);
                this.keyedReducingState.add(pojo);
            } else {
                this.partitionableListState.add(pojo);
                this.unionListState.add(pojo);
            }
            return value;
        }

        /** Returns whether {@code expected} equals at least one of {@code elements}. */
        private static boolean containsElement(Iterable<?> elements, Object expected) {
            for (Object element : elements) {
                if (expected.equals(element)) {
                    return true;
                }
            }
            return false;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Nothing to do: all state is held in the registered state objects.
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.pojoClass = this.getRuntimeContext().getUserCodeClassLoader().loadClass(PojoSerializerUpgradeTest.POJO_NAME);
            this.fieldA = this.pojoClass.getDeclaredField("a");
            this.fieldA.setAccessible(true);
            if (this.hasBField) {
                this.fieldB = this.pojoClass.getDeclaredField("b");
                this.fieldB.setAccessible(true);
            }

            // The POJO class is only known at runtime, so the state is handled as Object; the
            // cast merely gives the descriptors a type argument matching the state fields.
            @SuppressWarnings("unchecked")
            final Class<Object> stateType = (Class<Object>) this.pojoClass;

            if (this.keyed) {
                this.keyedValueState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("keyedValueState", stateType));
                this.keyedListState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("keyedListState", stateType));
                ReduceFunction<Object> reduceFunction = new FirstValueReducer<>();
                this.keyedReducingState = context.getKeyedStateStore().getReducingState(new ReducingStateDescriptor<>("keyedReducingState", reduceFunction, stateType));
            } else {
                this.partitionableListState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("partitionableListState", stateType));
                this.unionListState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>("unionListState", stateType));
            }
        }
    }
}

