package org.apache.flink.streaming.api.functions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.types.Value;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest.class */
public class FromElementsFunctionTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest$DeserializeTooMuchType.class */
    /**
     * Test fixture whose serialized form is deliberately asymmetric: writing emits a single
     * {@code int}, whereas reading consumes a {@code long}.
     *
     * <p>Used by {@code testDeSerializationError}, which expects a source built from one element
     * of this type to fail with an {@link IOException} mentioning "user-defined serialization".
     */
    public static class DeserializeTooMuchType implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        /** Value emitted by the write method; it is never read back as an int. */
        private static final int WRITTEN_PAYLOAD = 42;

        /**
         * Reads one {@code long} from the view, i.e. a different primitive than the write method
         * produced.
         *
         * @param dataInputView the view to read from
         * @throws IOException if the underlying read fails
         */
        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readLong();
        }

        /**
         * Writes a single {@code int} to the view.
         *
         * @param dataOutputView the view to write to
         * @throws IOException if the underlying write fails
         */
        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeInt(WRITTEN_PAYLOAD);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest$MyPojo.class */
    /**
     * Plain data holder with two public fields that does not implement
     * {@code java.io.Serializable}; used as the element type in
     * {@code testNonJavaSerializableType}.
     *
     * <p>NOTE(review): the public fields and the no-argument constructor are presumably what lets
     * the type extractor treat this class as a POJO; confirm before changing either.
     */
    public static class MyPojo {
        public long val1;
        public int val2;

        /** Creates an instance with both fields left at their default values. */
        public MyPojo() {
        }

        /**
         * Creates an instance with the given field values.
         *
         * @param j value assigned to {@code val1}
         * @param i value assigned to {@code val2}
         */
        public MyPojo(long j, int i) {
            val1 = j;
            val2 = i;
        }

        /**
         * Two instances are equal when both {@code val1} and {@code val2} match.
         *
         * @param obj the object to compare with; {@code null} and foreign types are never equal
         * @return {@code true} if {@code obj} is a {@code MyPojo} with identical field values
         */
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof MyPojo) {
                final MyPojo other = (MyPojo) obj;
                return val1 == other.val1 && val2 == other.val2;
            }
            return false;
        }

        /**
         * Hashes on {@code val2} only; this is consistent with {@link #equals(Object)} because
         * equal instances always share the same {@code val2}.
         *
         * @return the value of {@code val2}
         */
        @Override
        public int hashCode() {
            return val2;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest$SerializationErrorType.class */
    /**
     * Test fixture that can neither be written nor read: both operations unconditionally throw an
     * {@link IOException} with the message "test exception".
     *
     * <p>Used by {@code testSerializationError}, which expects constructing a source from an
     * element of this type to fail with an {@link IOException} carrying that message.
     */
    public static class SerializationErrorType implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        /** Message carried by every exception this type throws; the test matches on it. */
        private static final String FAILURE_MESSAGE = "test exception";

        /**
         * Always fails.
         *
         * @param dataInputView ignored
         * @throws IOException always
         */
        public void read(DataInputView dataInputView) throws IOException {
            throw new IOException(FAILURE_MESSAGE);
        }

        /**
         * Always fails.
         *
         * @param dataOutputView ignored
         * @throws IOException always
         */
        public void write(DataOutputView dataOutputView) throws IOException {
            throw new IOException(FAILURE_MESSAGE);
        }
    }

    /**
     * Verifies that a {@link FromElementsFunction} created from an array of strings emits exactly
     * those strings, in the same order, when run against a list-collecting source context.
     *
     * @throws Exception if creating or running the source fails; the exception is propagated to
     *     the test runner so the report keeps the full stack trace and cause
     */
    @Test
    public void testStrings() throws Exception {
        String[] data = {"Oh", "boy", "what", "a", "show", "!"};

        FromElementsFunction<String> source = new FromElementsFunction<String>(
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), data);

        List<String> result = new ArrayList<String>();
        source.run(new ListSourceContext<String>(result));

        Assert.assertEquals(Arrays.asList(data), result);
    }

    /**
     * Verifies that a {@link FromElementsFunction} works for an element type that is not
     * {@code java.io.Serializable}: the {@link MyPojo} elements handed to the source must come
     * back equal and in the same order.
     *
     * @throws Exception if creating or running the source fails; the exception is propagated to
     *     the test runner so the report keeps the full stack trace and cause
     */
    @Test
    public void testNonJavaSerializableType() throws Exception {
        MyPojo[] data = {new MyPojo(1L, 2), new MyPojo(3L, 4), new MyPojo(5L, 6)};

        FromElementsFunction<MyPojo> source = new FromElementsFunction<MyPojo>(
                TypeExtractor.getForClass(MyPojo.class).createSerializer(new ExecutionConfig()), data);

        List<MyPojo> result = new ArrayList<MyPojo>();
        source.run(new ListSourceContext<MyPojo>(result));

        Assert.assertEquals(Arrays.asList(data), result);
    }

    /**
     * Verifies that constructing a {@link FromElementsFunction} fails with an {@link IOException}
     * when an element cannot be serialized, and that the failure carries the element's own
     * "test exception" message.
     *
     * @throws Exception if anything other than the expected {@link IOException} is raised; such
     *     exceptions are propagated to the test runner with their stack trace intact
     */
    @Test
    public void testSerializationError() throws Exception {
        ValueTypeInfo<SerializationErrorType> typeInfo =
                new ValueTypeInfo<SerializationErrorType>(SerializationErrorType.class);

        try {
            new FromElementsFunction<SerializationErrorType>(
                    typeInfo.createSerializer(new ExecutionConfig()), new SerializationErrorType());
            // Assert.fail throws an AssertionError, which the IOException handler does not catch.
            Assert.fail("should fail with an exception");
        } catch (IOException e) {
            Assert.assertTrue(ExceptionUtils.stringifyException(e).contains("test exception"));
        }
    }

    /**
     * Verifies that a source whose element type reads something different from what it wrote
     * fails with an {@link IOException} that mentions "user-defined serialization".
     *
     * @throws Exception if anything other than the expected {@link IOException} is raised; such
     *     exceptions are propagated to the test runner with their stack trace intact
     */
    @Test
    public void testDeSerializationError() throws Exception {
        ValueTypeInfo<DeserializeTooMuchType> typeInfo =
                new ValueTypeInfo<DeserializeTooMuchType>(DeserializeTooMuchType.class);

        try {
            FromElementsFunction<DeserializeTooMuchType> source =
                    new FromElementsFunction<DeserializeTooMuchType>(
                            typeInfo.createSerializer(new ExecutionConfig()), new DeserializeTooMuchType());

            source.run(new ListSourceContext<DeserializeTooMuchType>(new ArrayList<DeserializeTooMuchType>()));
            // Assert.fail throws an AssertionError, which the IOException handler does not catch.
            Assert.fail("should fail with an exception");
        } catch (IOException e) {
            Assert.assertTrue(ExceptionUtils.stringifyException(e).contains("user-defined serialization"));
        }
    }

    /**
     * Verifies that a {@link FromElementsFunction} can be snapshotted while running and resumed
     * from that snapshot by a serialized copy.
     *
     * <p>The source is run on a separate thread. After a pause the test takes a snapshot and
     * copies the elements emitted so far, both under the context's checkpoint lock, then cancels
     * the source. A copy of the source is restored from the snapshot and run into the copied
     * list; the combined output must equal the full input.
     *
     * @throws Exception if the source, the copy, or the thread coordination fails; propagated to
     *     the test runner so the report keeps the full stack trace and cause
     */
    @Test
    public void testCheckpointAndRestore() throws Exception {
        final int numElements = 10000;

        List<Integer> data = new ArrayList<Integer>(numElements);
        List<Integer> result = new ArrayList<Integer>(numElements);
        for (int i = 0; i < numElements; i++) {
            data.add(i);
        }

        final FromElementsFunction<Integer> source =
                new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
        // Copy taken before the source runs, so the copy starts from the initial state.
        FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source);

        // NOTE(review): the second argument (2L) is presumably a per-element delay that keeps
        // the source from finishing before the snapshot; confirm against ListSourceContext.
        final ListSourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L);

        // Single-slot holder so the anonymous class can report a failure to the test thread.
        final Throwable[] error = new Throwable[1];

        Thread runner = new Thread() {
            @Override
            public void run() {
                try {
                    source.run(ctx);
                } catch (Throwable t) {
                    error[0] = t;
                }
            }
        };
        runner.start();

        // Give the runner time to emit some elements first (timing-based, not a guarantee).
        Thread.sleep(1000L);

        List<Integer> checkpointData = new ArrayList<Integer>(numElements);
        int snapshot;
        // Snapshot and copy under the same lock so they describe the same point in the stream.
        synchronized (ctx.getCheckpointLock()) {
            snapshot = source.snapshotState(566L, System.currentTimeMillis());
            checkpointData.addAll(result);
        }

        source.cancel();
        runner.join();

        if (error[0] != null) {
            // Attach the runner's failure as the cause instead of only printing it.
            AssertionError failure = new AssertionError("Error in asynchronous source runner");
            failure.initCause(error[0]);
            throw failure;
        }

        // Resume from the snapshot, appending to the elements seen before the snapshot.
        sourceCopy.restoreState(snapshot);
        sourceCopy.run(new ListSourceContext<Integer>(checkpointData));

        Assert.assertEquals(data, checkpointData);
    }
}
