package org.apache.beam.sdk.io.cdap;

import com.google.auto.value.AutoValue;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.SubmitterLifecycle;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.cdap.AutoValue_Plugin;
import org.apache.beam.sdk.io.cdap.PluginConstants;
import org.apache.beam.sdk.io.cdap.context.BatchContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
import org.apache.beam.sdk.io.cdap.context.StreamingSourceContextImpl;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoValue
/* loaded from: input_file:org/apache/beam/sdk/io/cdap/Plugin.class */
public abstract class Plugin<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(Plugin.class);
    private static final String PREPARE_RUN_METHOD_NAME = "prepareRun";
    private static final String GET_STREAM_METHOD_NAME = "getStream";

    @Nullable
    protected PluginConfig pluginConfig;

    @Nullable
    protected Configuration hadoopConfiguration;

    @Nullable
    protected SubmitterLifecycle cdapPluginObj;

    @AutoValue.Builder
    /* loaded from: input_file:org/apache/beam/sdk/io/cdap/Plugin$Builder.class */
    public static abstract class Builder<K, V> {
        public abstract Builder<K, V> setPluginClass(Class<?> cls);

        public abstract Builder<K, V> setFormatClass(Class<?> cls);

        public abstract Builder<K, V> setFormatProviderClass(Class<?> cls);

        public abstract Builder<K, V> setGetOffsetFn(SerializableFunction<V, Long> serializableFunction);

        public abstract Builder<K, V> setGetReceiverArgsFromConfigFn(SerializableFunction<PluginConfig, Object[]> serializableFunction);

        public abstract Builder<K, V> setReceiverClass(Class<? extends Receiver<V>> cls);

        public abstract Builder<K, V> setPluginType(PluginConstants.PluginType pluginType);

        public abstract Builder<K, V> setContext(BatchContextImpl batchContextImpl);

        public abstract Plugin<K, V> build();
    }

    public abstract BatchContextImpl getContext();

    public abstract Class<?> getPluginClass();

    @Nullable
    public abstract Class<?> getFormatClass();

    @Nullable
    public abstract Class<?> getFormatProviderClass();

    @Nullable
    public abstract Class<? extends Receiver<V>> getReceiverClass();

    @Nullable
    public abstract SerializableFunction<V, Long> getGetOffsetFn();

    @Nullable
    public abstract SerializableFunction<PluginConfig, Object[]> getGetReceiverArgsFromConfigFn();

    public Plugin<K, V> withConfig(PluginConfig pluginConfig) {
        this.pluginConfig = pluginConfig;
        return this;
    }

    @Nullable
    public PluginConfig getPluginConfig() {
        return this.pluginConfig;
    }

    public void prepareRun() {
        if (isUnbounded().booleanValue()) {
            return;
        }
        if (this.cdapPluginObj == null) {
            instantiateCdapPluginObj();
        }
        Preconditions.checkStateNotNull(this.cdapPluginObj, "Cdap Plugin object can't be null!");
        try {
            this.cdapPluginObj.prepareRun(getContext());
            if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
                for (Map.Entry<K, V> entry : getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
                    getHadoopConfiguration().set((String) entry.getKey(), (String) entry.getValue());
                }
                return;
            }
            for (Map.Entry<K, V> entry2 : getContext().getOutputFormatProvider().getOutputFormatConfiguration().entrySet()) {
                getHadoopConfiguration().set((String) entry2.getKey(), (String) entry2.getValue());
            }
            getHadoopConfiguration().set("mapreduce.job.id", String.valueOf(1));
        } catch (Exception e) {
            LOG.error("Error while prepareRun", e);
            throw new IllegalStateException("Error while prepareRun", e);
        }
    }

    private void instantiateCdapPluginObj() {
        PluginConfig pluginConfig = getPluginConfig();
        Preconditions.checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
        try {
            Constructor<?> declaredConstructor = getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
            declaredConstructor.setAccessible(true);
            this.cdapPluginObj = (SubmitterLifecycle) declaredConstructor.newInstance(pluginConfig);
        } catch (Exception e) {
            LOG.error("Can not instantiate CDAP plugin class", e);
            throw new IllegalStateException("Can not call prepareRun");
        }
    }

    public Plugin<K, V> withHadoopConfiguration(Class<K> cls, Class<V> cls2) {
        Class<?> formatClass = getFormatClass();
        Preconditions.checkStateNotNull(formatClass, "Format class can't be null!");
        PluginConstants.Format formatType = getFormatType();
        PluginConstants.Hadoop hadoopType = getHadoopType();
        getHadoopConfiguration().setClass(hadoopType.getFormatClass(), formatClass, formatType.getFormatClass());
        getHadoopConfiguration().setClass(hadoopType.getKeyClass(), cls, Object.class);
        getHadoopConfiguration().setClass(hadoopType.getValueClass(), cls2, Object.class);
        return this;
    }

    public Plugin<K, V> withHadoopConfiguration(Configuration configuration) {
        this.hadoopConfiguration = configuration;
        return this;
    }

    public Configuration getHadoopConfiguration() {
        if (this.hadoopConfiguration == null) {
            this.hadoopConfiguration = new Configuration(false);
        }
        return this.hadoopConfiguration;
    }

    public abstract PluginConstants.PluginType getPluginType();

    private PluginConstants.Format getFormatType() {
        return getPluginType() == PluginConstants.PluginType.SOURCE ? PluginConstants.Format.INPUT : PluginConstants.Format.OUTPUT;
    }

    private PluginConstants.Hadoop getHadoopType() {
        return getPluginType() == PluginConstants.PluginType.SOURCE ? PluginConstants.Hadoop.SOURCE : PluginConstants.Hadoop.SINK;
    }

    public static PluginConstants.PluginType initPluginType(Class<?> cls) throws IllegalArgumentException {
        if (StreamingSource.class.isAssignableFrom(cls) || BatchSource.class.isAssignableFrom(cls)) {
            return PluginConstants.PluginType.SOURCE;
        }
        if (BatchSink.class.isAssignableFrom(cls)) {
            return PluginConstants.PluginType.SINK;
        }
        throw new IllegalArgumentException("Provided class should be source or sink plugin");
    }

    public static BatchContextImpl initContext(Class<?> cls) {
        ArrayList<Method> arrayList = new ArrayList(Arrays.asList(cls.getDeclaredMethods()));
        Class<? super Object> superclass = cls.getSuperclass();
        if (superclass != null) {
            arrayList.addAll(Arrays.asList(superclass.getDeclaredMethods()));
        }
        for (Method method : arrayList) {
            if (method.getName().equals(PREPARE_RUN_METHOD_NAME)) {
                Class<?> cls2 = method.getParameterTypes()[0];
                if (cls2.equals(BatchSourceContext.class)) {
                    return new BatchSourceContextImpl();
                }
                if (cls2.equals(BatchSinkContext.class)) {
                    return new BatchSinkContextImpl();
                }
            } else if (method.getName().equals(GET_STREAM_METHOD_NAME)) {
                return new StreamingSourceContextImpl();
            }
        }
        throw new IllegalStateException("Cannot determine context class");
    }

    public Boolean isUnbounded() {
        Boolean bool = null;
        for (io.cdap.cdap.api.annotation.Plugin plugin : getPluginClass().getDeclaredAnnotations()) {
            if (plugin.annotationType().equals(io.cdap.cdap.api.annotation.Plugin.class)) {
                String type = plugin.type();
                bool = Boolean.valueOf(type != null && type.startsWith("streaming"));
            }
        }
        if (bool == null) {
            throw new IllegalArgumentException("CDAP plugin class must have Plugin annotation!");
        }
        return bool;
    }

    public ReceiverBuilder<V, ? extends Receiver<V>> getReceiverBuilder() {
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(isUnbounded().booleanValue(), "Receiver Builder is supported only for unbounded plugins");
        Class<?> pluginClass = getPluginClass();
        Class<? extends Receiver<V>> receiverClass = getReceiverClass();
        SerializableFunction<PluginConfig, Object[]> getReceiverArgsFromConfigFn = getGetReceiverArgsFromConfigFn();
        PluginConfig pluginConfig = getPluginConfig();
        Preconditions.checkStateNotNull(pluginConfig, "Plugin config can not be null!");
        Preconditions.checkStateNotNull(pluginClass, "Plugin class can not be null!");
        Preconditions.checkStateNotNull(receiverClass, "Receiver class can not be null!");
        Preconditions.checkStateNotNull(getReceiverArgsFromConfigFn, "Get receiver args from config function can not be null!");
        return new ReceiverBuilder(receiverClass).withConstructorArgs((Object[]) getReceiverArgsFromConfigFn.apply(pluginConfig));
    }

    public static <K, V> Plugin<K, V> createBatch(Class<?> cls, Class<?> cls2, Class<?> cls3) {
        return builder().setPluginClass(cls).setFormatClass(cls2).setFormatProviderClass(cls3).setPluginType(initPluginType(cls)).setContext(initContext(cls)).build();
    }

    public static <K, V> Plugin<K, V> createStreaming(Class<?> cls, SerializableFunction<V, Long> serializableFunction, Class<? extends Receiver<V>> cls2, SerializableFunction<PluginConfig, Object[]> serializableFunction2) {
        return builder().setPluginClass(cls).setPluginType(initPluginType(cls)).setContext(initContext(cls)).setGetOffsetFn(serializableFunction).setReceiverClass(cls2).setGetReceiverArgsFromConfigFn(serializableFunction2).build();
    }

    public static <K, V> Plugin<K, V> createStreaming(Class<?> cls, SerializableFunction<V, Long> serializableFunction, Class<? extends Receiver<V>> cls2) {
        return createStreaming(cls, serializableFunction, cls2, pluginConfig -> {
            return new Object[]{pluginConfig};
        });
    }

    public static <K, V> Builder<K, V> builder() {
        return new AutoValue_Plugin.Builder();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -665742424:
                if (implMethodName.equals("lambda$createStreaming$44f6d5cb$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/cdap/Plugin") && serializedLambda.getImplMethodSignature().equals("(Lio/cdap/cdap/api/plugin/PluginConfig;)[Ljava/lang/Object;")) {
                    return pluginConfig -> {
                        return new Object[]{pluginConfig};
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
