package co.cask.cdap.template.etl.realtime.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.realtime.RealtimeSource;
import co.cask.cdap.template.etl.api.realtime.SourceState;
import co.cask.cdap.template.etl.common.Properties;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("DataGenerator")
@Description("Source that can generate test data for Real-time Stream and Table Sinks.")
@Plugin(type = "source")
/* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/DataGeneratorSource.class */
public class DataGeneratorSource extends RealtimeSource<StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
    private static final String COUNT = "count";
    private static final String TYPE_DESCRIPTION = "The type of data to be generated. Currently, only two types ('stream' and 'table') are supported. By default, it generates a structured record containing one field named 'data' of type String with the value 'Hello'.";
    public static final String PROPERTY_TYPE = "type";
    public static final String STREAM_TYPE = "stream";
    public static final String TABLE_TYPE = "table";
    private final DataGeneratorConfig config;

    /* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/DataGeneratorSource$DataGeneratorConfig.class */
    public static class DataGeneratorConfig extends PluginConfig {

        @Description(DataGeneratorSource.TYPE_DESCRIPTION)
        @Nullable
        String type;
    }

    public DataGeneratorSource(DataGeneratorConfig dataGeneratorConfig) {
        this.config = dataGeneratorConfig;
    }

    @Nullable
    public SourceState poll(Emitter<StructuredRecord> emitter, SourceState sourceState) {
        int i;
        try {
            TimeUnit.MILLISECONDS.sleep(100L);
        } catch (InterruptedException e) {
            LOG.error("Some Error in Source");
        }
        if (sourceState.getState(COUNT) != null) {
            i = Bytes.toInt(sourceState.getState(COUNT)) + 1;
            sourceState.setState(COUNT, Bytes.toBytes(i));
        } else {
            i = 1;
            sourceState = new SourceState();
            sourceState.setState(COUNT, Bytes.toBytes(1));
        }
        LOG.info("Emitting data! {}", Integer.valueOf(i));
        if (STREAM_TYPE.equalsIgnoreCase(this.config.type)) {
            writeRecordsForStreamConsumption(emitter);
        } else if (TABLE_TYPE.equalsIgnoreCase(this.config.type)) {
            writeRecordsForTableConsumption(emitter);
        } else {
            writeDefaultRecords(emitter);
        }
        return sourceState;
    }

    private void writeDefaultRecords(Emitter<StructuredRecord> emitter) {
        StructuredRecord.Builder builder = StructuredRecord.builder(Schema.recordOf("defaultRecord", new Schema.Field[]{Schema.Field.of(Properties.Stream.DEFAULT_BODY_FIELD, Schema.of(Schema.Type.STRING))}));
        builder.set(Properties.Stream.DEFAULT_BODY_FIELD, "Hello");
        emitter.emit(builder.build());
    }

    private void writeRecordsForStreamConsumption(Emitter<StructuredRecord> emitter) {
        Schema.Field of = Schema.Field.of(Properties.Stream.DEFAULT_BODY_FIELD, Schema.of(Schema.Type.STRING));
        Schema.Field of2 = Schema.Field.of(Properties.Stream.DEFAULT_HEADERS_FIELD, Schema.mapOf(Schema.of(Schema.Type.STRING), Schema.of(Schema.Type.STRING)));
        StructuredRecord.Builder builder = StructuredRecord.builder(Schema.recordOf("StringRecord", new Schema.Field[]{of}));
        builder.set(Properties.Stream.DEFAULT_BODY_FIELD, "Hello");
        emitter.emit(builder.build());
        StructuredRecord.Builder builder2 = StructuredRecord.builder(Schema.recordOf("StringHeadersRecord", new Schema.Field[]{of, of2}));
        builder2.set(Properties.Stream.DEFAULT_BODY_FIELD, "Hello");
        builder2.set(Properties.Stream.DEFAULT_HEADERS_FIELD, ImmutableMap.of("h1", "v1"));
        emitter.emit(builder2.build());
        Schema.Field of3 = Schema.Field.of(Properties.Stream.DEFAULT_BODY_FIELD, Schema.of(Schema.Type.BYTES));
        StructuredRecord.Builder builder3 = StructuredRecord.builder(Schema.recordOf("ByteArrayHeadersRecord", new Schema.Field[]{of3, of2}));
        builder3.set(Properties.Stream.DEFAULT_BODY_FIELD, "Hello".getBytes(Charsets.UTF_8));
        builder3.set(Properties.Stream.DEFAULT_HEADERS_FIELD, ImmutableMap.of("h1", "v1"));
        emitter.emit(builder3.build());
        StructuredRecord.Builder builder4 = StructuredRecord.builder(Schema.recordOf("ByteBufferHeadersRecord", new Schema.Field[]{of3, of2}));
        builder4.set(Properties.Stream.DEFAULT_BODY_FIELD, ByteBuffer.wrap("Hello".getBytes(Charsets.UTF_8)));
        builder4.set(Properties.Stream.DEFAULT_HEADERS_FIELD, ImmutableMap.of("h1", "v1"));
        emitter.emit(builder4.build());
    }

    private void writeRecordsForTableConsumption(Emitter<StructuredRecord> emitter) {
        StructuredRecord.Builder builder = StructuredRecord.builder(Schema.recordOf("tableRecord", new Schema.Field[]{Schema.Field.of("id", Schema.of(Schema.Type.INT)), Schema.Field.of("name", Schema.of(Schema.Type.STRING)), Schema.Field.of("score", Schema.of(Schema.Type.DOUBLE)), Schema.Field.of("graduated", Schema.of(Schema.Type.BOOLEAN)), Schema.Field.of("binary", Schema.of(Schema.Type.BYTES)), Schema.Field.of("time", Schema.of(Schema.Type.LONG))}));
        builder.set("id", 1).set("name", "Bob").set("score", Double.valueOf(3.4d)).set("graduated", false).set("binary", "Bob".getBytes(Charsets.UTF_8)).set("time", Long.valueOf(System.currentTimeMillis()));
        emitter.emit(builder.build());
    }
}
