/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.misc;

import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Value;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class CustomSerializationITCase
extends TestLogger {
    private static final int PARLLELISM = 5;
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(CustomSerializationITCase.getConfiguration()).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(5).build());

    public static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"30m"));
        return config;
    }

    @Test
    public void testIncorrectSerializer1() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(5);
            env.fromSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooMuch>(){

                public ConsumesTooMuch map(Long value) throws Exception {
                    return new ConsumesTooMuch();
                }
            }).rebalance().sinkTo((Sink)new DiscardingSink());
            env.execute();
        }
        catch (JobExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, candidate -> candidate.getMessage().contains("broken serialization.")).isPresent());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer2() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(5);
            env.fromSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooMuchSpanning>(){

                public ConsumesTooMuchSpanning map(Long value) throws Exception {
                    return new ConsumesTooMuchSpanning();
                }
            }).rebalance().sinkTo((Sink)new DiscardingSink());
            env.execute();
        }
        catch (JobExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, candidate -> candidate.getMessage().contains("broken serialization.")).isPresent());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer3() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(5);
            env.fromSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooLittle>(){

                public ConsumesTooLittle map(Long value) throws Exception {
                    return new ConsumesTooLittle();
                }
            }).rebalance().sinkTo((Sink)new DiscardingSink());
            env.execute();
        }
        catch (JobExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, candidate -> candidate.getMessage().contains("broken serialization.")).isPresent());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testIncorrectSerializer4() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(5);
            env.fromSequence(1L, 50L).map((MapFunction)new MapFunction<Long, ConsumesTooLittleSpanning>(){

                public ConsumesTooLittleSpanning map(Long value) throws Exception {
                    return new ConsumesTooLittleSpanning();
                }
            }).rebalance().sinkTo((Sink)new DiscardingSink());
            env.execute();
        }
        catch (ProgramInvocationException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, candidate -> candidate.getMessage().contains("broken serialization.")).isPresent());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    public static class ConsumesTooLittleSpanning
    implements Value {
        public void write(DataOutputView out) throws IOException {
            byte[] bytes = new byte[32941];
            out.write(bytes);
        }

        public void read(DataInputView in) throws IOException {
            byte[] bytes = new byte[22541];
            in.readFully(bytes);
        }
    }

    public static class ConsumesTooLittle
    implements Value {
        public void write(DataOutputView out) throws IOException {
            out.writeLong(42L);
        }

        public void read(DataInputView in) throws IOException {
            in.readInt();
        }
    }

    public static class ConsumesTooMuchSpanning
    implements Value {
        public void write(DataOutputView out) throws IOException {
            byte[] bytes = new byte[22541];
            out.write(bytes);
        }

        public void read(DataInputView in) throws IOException {
            byte[] bytes = new byte[32941];
            in.readFully(bytes);
        }
    }

    public static class ConsumesTooMuch
    implements Value {
        public void write(DataOutputView out) throws IOException {
            out.writeInt(42);
        }

        public void read(DataInputView in) throws IOException {
            in.readLong();
        }
    }
}

