package co.cask.cdap.etl.mock.spark.streaming;

import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.plugin.PluginClass;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.api.plugin.PluginPropertyField;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.format.StructuredRecordStringConverter;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.receiver.Receiver;

/**
 * Streaming source plugin, registered under the name "Mock", that emits a fixed list of
 * pre-configured records.
 *
 * <p>The output schema and the records are supplied as JSON strings through {@link Conf}.
 * Use {@link #getPlugin(Schema, List)} or {@link #getPlugin(Schema, List, Long)} to build a
 * stage configuration whose properties match what this class reads.
 */
@Name("Mock")
@Plugin(type = "streamingsource")
public class MockSource extends StreamingSource<StructuredRecord> {
    /** Plugin descriptor (type, name, class name and property fields) for this source. */
    public static final PluginClass PLUGIN_CLASS = getPluginClass();
    private static final Gson GSON = new Gson();
    private static final Type STRING_LIST_TYPE = new TypeToken<List<String>>() {
    }.getType();
    private final Conf conf;

    /**
     * Configuration of the mock source.
     *
     * <p>No setters or constructors are declared here; presumably the plugin framework populates
     * the fields reflectively from the stage properties (TODO confirm).
     */
    public static class Conf extends PluginConfig {
        /** Output schema as a JSON string, parsed with {@link Schema#parseJson(String)}. */
        private String schema;

        /** JSON array of strings; each element is one record serialized as JSON. */
        private String records;

        /** Milliseconds to sleep after each stored record; a missing value is treated as 0. */
        @Nullable
        private Long intervalMillis = 0L;
    }

    public MockSource(Conf conf) {
        this.conf = conf;
    }

    /**
     * Parses the configured schema and registers it as the output schema of this stage.
     *
     * @param pipelineConfigurer configurer whose stage configurer receives the output schema
     * @throws IllegalArgumentException if the configured schema cannot be parsed; the underlying
     *     {@link IOException} is attached as the cause
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        try {
            pipelineConfigurer.getStageConfigurer().setOutputSchema(Schema.parseJson(conf.schema));
        } catch (IOException e) {
            // Keep the cause so the actual parse failure is visible to whoever debugs the pipeline.
            throw new IllegalArgumentException("Could not parse schema " + conf.schema, e);
        }
    }

    /**
     * Builds a receiver-backed stream that emits the configured records in order.
     *
     * @param streamingContext context providing the Spark streaming context
     * @return a stream fed by a receiver that stores each configured record
     * @throws Exception if the schema or any record cannot be parsed
     */
    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws Exception {
        Schema schema = Schema.parseJson(conf.schema);
        List<String> recordJsons = GSON.fromJson(conf.records, STRING_LIST_TYPE);
        List<StructuredRecord> records = new ArrayList<>(recordJsons.size());
        for (String recordJson : recordJsons) {
            records.add(StructuredRecordStringConverter.fromJsonString(recordJson, schema));
        }
        // intervalMillis is @Nullable: fall back to "no delay" instead of failing on unboxing.
        long intervalMillis = conf.intervalMillis == null ? 0L : conf.intervalMillis;
        return streamingContext.getSparkStreamingContext().receiverStream(createReceiver(records, intervalMillis));
    }

    /**
     * Creates the receiver that replays {@code records}.
     *
     * <p>This is a static method on purpose: an anonymous class created in a static context has
     * no enclosing instance, so the receiver only carries the record list and the interval
     * rather than a reference to the whole {@code MockSource}.
     */
    private static Receiver<StructuredRecord> createReceiver(final List<StructuredRecord> records,
                                                             final long intervalMillis) {
        return new Receiver<StructuredRecord>(StorageLevel.MEMORY_ONLY()) {
            public StorageLevel storageLevel() {
                return StorageLevel.MEMORY_ONLY();
            }

            public void onStart() {
                // Emit on a separate thread so that onStart() itself returns immediately.
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (StructuredRecord record : records) {
                            // Records seen while the receiver is not started are skipped, not
                            // retried; the loop keeps going rather than exiting.
                            if (isStarted()) {
                                store(record);
                                try {
                                    TimeUnit.MILLISECONDS.sleep(intervalMillis);
                                } catch (InterruptedException e) {
                                    // Restore the interrupt flag and stop emitting.
                                    Thread.currentThread().interrupt();
                                    return;
                                }
                            }
                        }
                    }
                }).start();
            }

            public void onStop() {
                // Nothing to release: the emitter thread checks isStarted() on every record.
            }
        };
    }

    /**
     * Builds the stage configuration for a mock source with no delay between records.
     *
     * @param schema schema of the records
     * @param list records the source should emit
     * @return plugin configuration for a "Mock" streaming source
     * @throws IOException if a record cannot be serialized to JSON
     */
    public static ETLPlugin getPlugin(Schema schema, List<StructuredRecord> list) throws IOException {
        return getPlugin(schema, list, 0L);
    }

    /**
     * Builds the stage configuration for a mock source.
     *
     * @param schema schema of the records
     * @param list records the source should emit
     * @param l milliseconds to wait after each record; {@code null} is treated as 0
     * @return plugin configuration for a "Mock" streaming source
     * @throws IOException if a record cannot be serialized to JSON
     */
    public static ETLPlugin getPlugin(Schema schema, List<StructuredRecord> list, Long l) throws IOException {
        List<String> recordJsons = new ArrayList<>(list.size());
        for (StructuredRecord record : list) {
            recordJsons.add(StructuredRecordStringConverter.toJsonString(record));
        }
        long intervalMillis = l == null ? 0L : l;
        return new ETLPlugin(
            "Mock",
            "streamingsource",
            ImmutableMap.of(
                "schema", schema.toString(),
                "records", GSON.toJson(recordJsons),
                "intervalMillis", Long.toString(intervalMillis)),
            (ArtifactSelectorConfig) null);
    }

    /** Describes the plugin and its three properties; only "schema" and "records" are required. */
    private static PluginClass getPluginClass() {
        HashMap<String, PluginPropertyField> properties = new HashMap<>();
        properties.put("schema", new PluginPropertyField("schema", "", "string", true, false));
        properties.put("records", new PluginPropertyField("records", "", "string", true, false));
        properties.put("intervalMillis", new PluginPropertyField("intervalMillis", "", "long", false, false));
        return new PluginClass("streamingsource", "Mock", "", MockSource.class.getName(), "conf", properties);
    }
}
