package co.cask.cdap.etl.mock.spark.streaming;

import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.plugin.PluginClass;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.api.plugin.PluginPropertyField;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.test.DataSetManager;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/etl/mock/spark/streaming/MockSink.class */
public class MockSink extends SparkSink<StructuredRecord> {
    public static final PluginClass PLUGIN_CLASS = getPluginClass();
    private final Config config;

    /* loaded from: input_file:co/cask/cdap/etl/mock/spark/streaming/MockSink$Config.class */
    public static class Config extends PluginConfig {

        @Macro
        private String tableName;
    }

    public MockSink(Config config) {
        this.config = config;
    }

    public void prepareRun(SparkPluginContext sparkPluginContext) throws Exception {
        if (sparkPluginContext.datasetExists(this.config.tableName)) {
            return;
        }
        sparkPluginContext.createDataset(this.config.tableName, KeyValueTable.class.getName(), DatasetProperties.EMPTY);
    }

    public void run(SparkExecutionPluginContext sparkExecutionPluginContext, JavaRDD<StructuredRecord> javaRDD) throws Exception {
        sparkExecutionPluginContext.saveAsDataset(javaRDD.mapToPair(new PairFunction<StructuredRecord, byte[], byte[]>() { // from class: co.cask.cdap.etl.mock.spark.streaming.MockSink.1
            public Tuple2<byte[], byte[]> call(StructuredRecord structuredRecord) throws Exception {
                return new Tuple2<>(Bytes.toBytes((String) structuredRecord.get("id")), Bytes.toBytes((String) structuredRecord.get("name")));
            }
        }), this.config.tableName);
    }

    public static ETLPlugin getPlugin(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("tableName", str);
        return new ETLPlugin("Mock", "sparksink", hashMap, (ArtifactSelectorConfig) null);
    }

    private static PluginClass getPluginClass() {
        HashMap hashMap = new HashMap();
        hashMap.put("tableName", new PluginPropertyField("tableName", "", "string", true, true));
        return new PluginClass("sparksink", "Mock", "", MockSink.class.getName(), "config", hashMap);
    }

    public static Map<String, String> getValues(Set<String> set, DataSetManager<KeyValueTable> dataSetManager) throws Exception {
        dataSetManager.flush();
        KeyValueTable keyValueTable = (KeyValueTable) dataSetManager.get();
        HashMap hashMap = new HashMap();
        for (String str : set) {
            hashMap.put(str, Bytes.toString(keyValueTable.read(str)));
        }
        return hashMap;
    }
}
