package org.apache.flink.test.misc;

import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.types.Value;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/misc/CustomSerializationITCase.class */
public class CustomSerializationITCase extends TestLogger {
    private static final int PARLLELISM = 5;

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getConfiguration()).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARLLELISM).build());

    /* loaded from: input_file:org/apache/flink/test/misc/CustomSerializationITCase$ConsumesTooLittle.class */
    public static class ConsumesTooLittle implements Value {
        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeLong(42L);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readInt();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/misc/CustomSerializationITCase$ConsumesTooLittleSpanning.class */
    public static class ConsumesTooLittleSpanning implements Value {
        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(new byte[32941]);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readFully(new byte[22541]);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/misc/CustomSerializationITCase$ConsumesTooMuch.class */
    public static class ConsumesTooMuch implements Value {
        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeInt(42);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readLong();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/misc/CustomSerializationITCase$ConsumesTooMuchSpanning.class */
    public static class ConsumesTooMuchSpanning implements Value {
        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(new byte[22541]);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readFully(new byte[32941]);
        }
    }

    public static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "30m");
        return configuration;
    }

    @Test
    public void testIncorrectSerializer1() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARLLELISM);
            executionEnvironment.getConfig().disableSysoutLogging();
            executionEnvironment.generateSequence(1L, 50L).map(new MapFunction<Long, ConsumesTooMuch>() { // from class: org.apache.flink.test.misc.CustomSerializationITCase.1
                public ConsumesTooMuch map(Long l) throws Exception {
                    return new ConsumesTooMuch();
                }
            }).rebalance().output(new DiscardingOutputFormat());
            executionEnvironment.execute();
        } catch (JobExecutionException e) {
            Throwable cause = e.getCause();
            Assert.assertTrue(cause instanceof IOException);
            Assert.assertTrue(cause.getMessage().contains("broken serialization"));
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer2() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARLLELISM);
            executionEnvironment.getConfig().disableSysoutLogging();
            executionEnvironment.generateSequence(1L, 50L).map(new MapFunction<Long, ConsumesTooMuchSpanning>() { // from class: org.apache.flink.test.misc.CustomSerializationITCase.2
                public ConsumesTooMuchSpanning map(Long l) throws Exception {
                    return new ConsumesTooMuchSpanning();
                }
            }).rebalance().output(new DiscardingOutputFormat());
            executionEnvironment.execute();
        } catch (JobExecutionException e) {
            Throwable cause = e.getCause();
            Assert.assertTrue(cause instanceof IOException);
            Assert.assertTrue(cause.getMessage().contains("broken serialization"));
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer3() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARLLELISM);
            executionEnvironment.getConfig().disableSysoutLogging();
            executionEnvironment.generateSequence(1L, 50L).map(new MapFunction<Long, ConsumesTooLittle>() { // from class: org.apache.flink.test.misc.CustomSerializationITCase.3
                public ConsumesTooLittle map(Long l) throws Exception {
                    return new ConsumesTooLittle();
                }
            }).rebalance().output(new DiscardingOutputFormat());
            executionEnvironment.execute();
        } catch (JobExecutionException e) {
            Throwable cause = e.getCause();
            Assert.assertTrue(cause instanceof IOException);
            Assert.assertTrue(cause.getMessage().contains("broken serialization"));
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer4() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARLLELISM);
            executionEnvironment.getConfig().disableSysoutLogging();
            executionEnvironment.generateSequence(1L, 50L).map(new MapFunction<Long, ConsumesTooLittleSpanning>() { // from class: org.apache.flink.test.misc.CustomSerializationITCase.4
                public ConsumesTooLittleSpanning map(Long l) throws Exception {
                    return new ConsumesTooLittleSpanning();
                }
            }).rebalance().output(new DiscardingOutputFormat());
            executionEnvironment.execute();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } catch (ProgramInvocationException e2) {
            Throwable cause = e2.getCause().getCause();
            Assert.assertTrue(cause instanceof IOException);
            Assert.assertTrue(cause.getMessage().contains("broken serialization"));
        }
    }
}
