package org.apache.beam.sdk.io.cdap;

import io.cdap.cdap.api.plugin.PluginConfig;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.cdap.CdapIO;
import org.apache.beam.sdk.io.cdap.PluginConstants;
import org.apache.beam.sdk.io.cdap.batch.EmployeeBatchSink;
import org.apache.beam.sdk.io.cdap.batch.EmployeeBatchSource;
import org.apache.beam.sdk.io.cdap.batch.EmployeeInputFormat;
import org.apache.beam.sdk.io.cdap.batch.EmployeeInputFormatProvider;
import org.apache.beam.sdk.io.cdap.batch.EmployeeOutputFormat;
import org.apache.beam.sdk.io.cdap.batch.EmployeeOutputFormatProvider;
import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
import org.apache.beam.sdk.io.cdap.streaming.EmployeeReceiver;
import org.apache.beam.sdk.io.cdap.streaming.EmployeeStreamingSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/cdap/CdapIOTest.class */
public class CdapIOTest {
    private static final long PULL_FREQUENCY_SEC = 1;
    private static final long START_OFFSET = 0;

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    private static final Map<String, Object> TEST_EMPLOYEE_PARAMS_MAP = ImmutableMap.builder().put(EmployeeConfig.OBJECT_TYPE, "employee").put(ConfigWrapperTest.REFERENCE_NAME_PARAM_NAME, ConfigWrapperTest.REFERENCE_NAME_PARAM_NAME).build();

    @Before
    public void setUp() {
        EmployeeOutputFormat.initWrittenOutput((OutputCommitter) Mockito.mock(OutputCommitter.class));
    }

    @Test
    public void testReadBuildsCorrectly() {
        EmployeeConfig build = new ConfigWrapper(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
        CdapIO.Read withValueClass = CdapIO.read().withCdapPlugin(Plugin.createBatch(EmployeeBatchSource.class, EmployeeInputFormat.class, EmployeeInputFormatProvider.class)).withPluginConfig(build).withKeyClass(String.class).withValueClass(String.class);
        Plugin cdapPlugin = withValueClass.getCdapPlugin();
        Assert.assertNotNull(cdapPlugin);
        Assert.assertEquals(EmployeeBatchSource.class, cdapPlugin.getPluginClass());
        Assert.assertEquals(EmployeeInputFormat.class, cdapPlugin.getFormatClass());
        Assert.assertEquals(EmployeeInputFormatProvider.class, cdapPlugin.getFormatProviderClass());
        Assert.assertNotNull(cdapPlugin.getContext());
        Assert.assertEquals(BatchSourceContextImpl.class, cdapPlugin.getContext().getClass());
        Assert.assertEquals(PluginConstants.PluginType.SOURCE, cdapPlugin.getPluginType());
        Assert.assertNotNull(cdapPlugin.getHadoopConfiguration());
        Assert.assertEquals(build, withValueClass.getPluginConfig());
        Assert.assertEquals(String.class, withValueClass.getKeyClass());
        Assert.assertEquals(String.class, withValueClass.getValueClass());
    }

    @Test
    public void testReadObjectCreationFailsIfCdapPluginClassIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.read().withCdapPluginClass((Class) null);
        });
    }

    @Test
    public void testReadObjectCreationFailsIfPluginConfigIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.read().withPluginConfig((PluginConfig) null);
        });
    }

    @Test
    public void testReadObjectCreationFailsIfKeyClassIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.read().withKeyClass((Class) null);
        });
    }

    @Test
    public void testReadObjectCreationFailsIfValueClassIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.read().withValueClass((Class) null);
        });
    }

    @Test
    public void testReadObjectCreationFailsIfPullFrequencySecIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.read().withPullFrequencySec((Long) null);
        });
    }

    @Test
    public void testReadObjectCreationFailsIfStartOffsetIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.read().withStartOffset((Long) null);
        });
    }

    @Test
    public void testReadExpandingFailsMissingCdapPluginClass() {
        PBegin in = PBegin.in(TestPipeline.create());
        CdapIO.Read read = CdapIO.read();
        Assert.assertThrows(IllegalStateException.class, () -> {
            read.expand(in);
        });
    }

    @Test
    public void testReadObjectCreationFailsIfCdapPluginClassIsNotSupported() {
        Assert.assertThrows(UnsupportedOperationException.class, () -> {
            CdapIO.read().withCdapPluginClass(EmployeeBatchSource.class);
        });
    }

    @Test
    public void testReadFromCdapBatchPlugin() {
        CdapIO.Read withValueClass = CdapIO.read().withCdapPlugin(Plugin.createBatch(EmployeeBatchSource.class, EmployeeInputFormat.class, EmployeeInputFormatProvider.class)).withPluginConfig(new ConfigWrapper(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build()).withKeyClass(String.class).withValueClass(String.class);
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i < 1000; i++) {
            arrayList.add(KV.of(String.valueOf(i), EmployeeInputFormat.EMPLOYEE_NAME_PREFIX + i));
        }
        PAssert.that(this.p.apply("ReadBatchTest", withValueClass)).containsInAnyOrder(arrayList);
        this.p.run();
    }

    @Test
    public void testReadFromCdapStreamingPlugin() {
        DirectOptions as = PipelineOptionsFactory.as(DirectOptions.class);
        as.setBlockOnRun(false);
        as.setRunner(DirectRunner.class);
        Pipeline create = Pipeline.create(as);
        CdapIO.Read withStartOffset = CdapIO.read().withCdapPlugin(Plugin.createStreaming(EmployeeStreamingSource.class, Long::valueOf, EmployeeReceiver.class, pluginConfig -> {
            return new Object[]{pluginConfig};
        })).withPluginConfig(new ConfigWrapper(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build()).withKeyClass(String.class).withValueClass(String.class).withPullFrequencySec(Long.valueOf(PULL_FREQUENCY_SEC)).withStartOffset(Long.valueOf(START_OFFSET));
        PAssert.that(create.apply("ReadStreamingTest", withStartOffset).setCoder(KvCoder.of(NullableCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of())).apply(Values.create())).containsInAnyOrder(EmployeeReceiver.getStoredRecords());
        create.run().waitUntilFinish(Duration.standardSeconds(15L));
    }

    @Test
    public void testWriteBuildsCorrectly() {
        EmployeeConfig build = new ConfigWrapper(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
        CdapIO.Write withLocksDirPath = CdapIO.write().withCdapPlugin(Plugin.createBatch(EmployeeBatchSink.class, EmployeeOutputFormat.class, EmployeeOutputFormatProvider.class)).withPluginConfig(build).withKeyClass(String.class).withValueClass(String.class).withLocksDirPath(this.tmpFolder.getRoot().getAbsolutePath());
        Plugin cdapPlugin = withLocksDirPath.getCdapPlugin();
        Assert.assertNotNull(cdapPlugin);
        Assert.assertNotNull(withLocksDirPath.getLocksDirPath());
        Assert.assertEquals(EmployeeBatchSink.class, cdapPlugin.getPluginClass());
        Assert.assertEquals(EmployeeOutputFormat.class, cdapPlugin.getFormatClass());
        Assert.assertEquals(EmployeeOutputFormatProvider.class, cdapPlugin.getFormatProviderClass());
        Assert.assertNotNull(cdapPlugin.getContext());
        Assert.assertEquals(BatchSinkContextImpl.class, cdapPlugin.getContext().getClass());
        Assert.assertEquals(PluginConstants.PluginType.SINK, cdapPlugin.getPluginType());
        Assert.assertNotNull(cdapPlugin.getHadoopConfiguration());
        Assert.assertEquals(build, withLocksDirPath.getPluginConfig());
        Assert.assertEquals(String.class, withLocksDirPath.getKeyClass());
        Assert.assertEquals(String.class, withLocksDirPath.getValueClass());
    }

    @Test
    public void testWriteObjectCreationFailsIfCdapPluginClassIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.write().withCdapPluginClass((Class) null);
        });
    }

    @Test
    public void testWriteObjectCreationFailsIfPluginConfigIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.write().withPluginConfig((PluginConfig) null);
        });
    }

    @Test
    public void testWriteObjectCreationFailsIfKeyClassIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.write().withKeyClass((Class) null);
        });
    }

    @Test
    public void testWriteObjectCreationFailsIfValueClassIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.write().withValueClass((Class) null);
        });
    }

    @Test
    public void testWriteObjectCreationFailsIfLockDirIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            CdapIO.write().withLocksDirPath((String) null);
        });
    }

    @Test
    public void testWriteExpandingFailsMissingCdapPluginClass() {
        PCollection expand = Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).expand(PBegin.in(TestPipeline.create()));
        CdapIO.Write write = CdapIO.write();
        Assert.assertThrows(IllegalStateException.class, () -> {
            write.expand(expand);
        });
    }

    @Test
    public void testWriteObjectCreationFailsIfCdapPluginClassIsNotSupported() {
        Assert.assertThrows(UnsupportedOperationException.class, () -> {
            CdapIO.write().withCdapPluginClass(EmployeeBatchSink.class);
        });
    }

    @Test
    public void testWriteWithCdapBatchSinkPlugin() throws IOException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 1000; i++) {
            arrayList.add(KV.of(String.valueOf(i), EmployeeInputFormat.EMPLOYEE_NAME_PREFIX + i));
        }
        this.p.apply(Create.of(arrayList)).apply("Write", CdapIO.write().withCdapPlugin(Plugin.createBatch(EmployeeBatchSink.class, EmployeeOutputFormat.class, EmployeeOutputFormatProvider.class)).withPluginConfig(new ConfigWrapper(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build()).withKeyClass(String.class).withValueClass(String.class).withLocksDirPath(this.tmpFolder.getRoot().getAbsolutePath()));
        this.p.run();
        List<KV<String, String>> writtenOutput = EmployeeOutputFormat.getWrittenOutput();
        Assert.assertEquals(arrayList.size(), writtenOutput.size());
        Assert.assertTrue(arrayList.containsAll(writtenOutput));
        Assert.assertTrue(writtenOutput.containsAll(arrayList));
        ((OutputCommitter) Mockito.verify(EmployeeOutputFormat.getOutputCommitter())).commitJob((JobContext) Mockito.any());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1453291964:
                if (implMethodName.equals("lambda$testReadFromCdapStreamingPlugin$a048f903$1")) {
                    z = false;
                    break;
                }
                break;
            case 231605032:
                if (implMethodName.equals("valueOf")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/cdap/CdapIOTest") && serializedLambda.getImplMethodSignature().equals("(Lio/cdap/cdap/api/plugin/PluginConfig;)[Ljava/lang/Object;")) {
                    return pluginConfig -> {
                        return new Object[]{pluginConfig};
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Long;")) {
                    return Long::valueOf;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
