/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.misc;

import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.types.Value;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class CustomSerializationITCase {
    private static final int PARLLELISM = 5;
    private static ForkableFlinkMiniCluster cluster;

    @BeforeClass
    public static void startCluster() {
        try {
            Configuration config = new Configuration();
            config.setInteger("taskmanager.numberOfTaskSlots", 5);
            config.setInteger("taskmanager.memory.size", 30);
            cluster = new ForkableFlinkMiniCluster(config, false);
            cluster.start();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)("Failed to start test cluster: " + e.getMessage()));
        }
    }

    @AfterClass
    public static void shutdownCluster() {
        try {
            cluster.shutdown();
            cluster = null;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)("Failed to stop test cluster: " + e.getMessage()));
        }
    }

    @Test
    public void testIncorrectSerializer1() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(5);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooMuch>(){

                public ConsumesTooMuch map(Long value) throws Exception {
                    return new ConsumesTooMuch();
                }
            }).rebalance().output((OutputFormat)new DiscardingOutputFormat());
            env.execute();
        }
        catch (ProgramInvocationException e) {
            Throwable rootCause = e.getCause().getCause();
            Assert.assertTrue((boolean)(rootCause instanceof IOException));
            Assert.assertTrue((boolean)rootCause.getMessage().contains("broken serialization"));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer2() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(5);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooMuchSpanning>(){

                public ConsumesTooMuchSpanning map(Long value) throws Exception {
                    return new ConsumesTooMuchSpanning();
                }
            }).rebalance().output((OutputFormat)new DiscardingOutputFormat());
            env.execute();
        }
        catch (ProgramInvocationException e) {
            Throwable rootCause = e.getCause().getCause();
            Assert.assertTrue((boolean)(rootCause instanceof IOException));
            Assert.assertTrue((boolean)rootCause.getMessage().contains("broken serialization"));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer3() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(5);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooLittle>(){

                public ConsumesTooLittle map(Long value) throws Exception {
                    return new ConsumesTooLittle();
                }
            }).rebalance().output((OutputFormat)new DiscardingOutputFormat());
            env.execute();
        }
        catch (ProgramInvocationException e) {
            Throwable rootCause = e.getCause().getCause();
            Assert.assertTrue((boolean)(rootCause instanceof IOException));
            Assert.assertTrue((boolean)rootCause.getMessage().contains("broken serialization"));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer4() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(5);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooLittleSpanning>(){

                public ConsumesTooLittleSpanning map(Long value) throws Exception {
                    return new ConsumesTooLittleSpanning();
                }
            }).rebalance().output((OutputFormat)new DiscardingOutputFormat());
            env.execute();
        }
        catch (ProgramInvocationException e) {
            Throwable rootCause = e.getCause().getCause();
            Assert.assertTrue((boolean)(rootCause instanceof IOException));
            Assert.assertTrue((boolean)rootCause.getMessage().contains("broken serialization"));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    public static class ConsumesTooLittleSpanning
    implements Value {
        public void write(DataOutputView out) throws IOException {
            byte[] bytes = new byte[32941];
            out.write(bytes);
        }

        public void read(DataInputView in) throws IOException {
            byte[] bytes = new byte[22541];
            in.readFully(bytes);
        }
    }

    public static class ConsumesTooLittle
    implements Value {
        public void write(DataOutputView out) throws IOException {
            out.writeLong(42L);
        }

        public void read(DataInputView in) throws IOException {
            in.readInt();
        }
    }

    public static class ConsumesTooMuchSpanning
    implements Value {
        public void write(DataOutputView out) throws IOException {
            byte[] bytes = new byte[22541];
            out.write(bytes);
        }

        public void read(DataInputView in) throws IOException {
            byte[] bytes = new byte[32941];
            in.readFully(bytes);
        }
    }

    public static class ConsumesTooMuch
    implements Value {
        public void write(DataOutputView out) throws IOException {
            out.writeInt(42);
        }

        public void read(DataInputView in) throws IOException {
            in.readLong();
        }
    }
}

