package co.cask.cdap.template.etl.realtime.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.template.etl.api.PipelineConfigurer;
import co.cask.cdap.template.etl.api.realtime.DataWriter;
import co.cask.cdap.template.etl.api.realtime.RealtimeContext;
import co.cask.cdap.template.etl.api.realtime.RealtimeSink;
import co.cask.cdap.template.etl.common.Properties;
import co.cask.cdap.template.etl.common.RecordPutTransformer;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Realtime sink plugin, registered under the name "Table", that writes
 * {@link StructuredRecord}s into a CDAP {@link Table} dataset.
 *
 * <p>Each incoming record is converted to a put by a {@link RecordPutTransformer}
 * that is keyed on the configured row field, and the put is then written to the
 * table named in the plugin configuration.
 */
@Name("Table")
@Description("Real Time Sink for CDAP Table dataset")
@Plugin(type = "sink")
public class RealtimeTableSink extends RealtimeSink<StructuredRecord> {
    private static final String NAME_DESC = "Name of the table. If the table does not already exist, one will be created.";
    // The trailing stray quote character that used to end this description has been removed.
    private static final String PROPERTY_SCHEMA_DESC = "Optional schema of the table as a JSON Object. If the table does not already exist, one will be created with this schema, which will allow the table to be explored through Hive.";
    private static final String PROPERTY_SCHEMA_ROW_FIELD_DESC = "The name of the record field that should be used as the row key when writing to the table.";

    /** Converts records to table puts; created in {@link #initialize(RealtimeContext)}. */
    private RecordPutTransformer recordPutTransformer;

    /** Plugin configuration supplied at construction time. */
    private final TableConfig tableConfig;

    /**
     * Configuration for {@link RealtimeTableSink}: the table name, an optional
     * schema string, and the record field used as the row key.
     */
    public static class TableConfig extends PluginConfig {

        @Description(RealtimeTableSink.NAME_DESC)
        private String name;

        @Name("schema")
        @Description(RealtimeTableSink.PROPERTY_SCHEMA_DESC)
        @Nullable
        String schemaStr;

        @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
        @Description(RealtimeTableSink.PROPERTY_SCHEMA_ROW_FIELD_DESC)
        String rowField;

        /**
         * Creates a configuration with all three settings given explicitly.
         *
         * @param name name of the table to write to
         * @param schemaStr schema of the table as a string; the field is marked nullable
         * @param rowField name of the record field to use as the row key
         */
        public TableConfig(String name, String schemaStr, String rowField) {
            this.name = name;
            this.schemaStr = schemaStr;
            this.rowField = rowField;
        }
    }

    /**
     * Creates a sink that uses the given configuration.
     *
     * @param tableConfig configuration holding the table name and row key field
     */
    public RealtimeTableSink(TableConfig tableConfig) {
        this.tableConfig = tableConfig;
    }

    /**
     * Validates the configuration and registers the target table dataset with
     * the pipeline configurer, passing along all plugin properties as dataset
     * properties.
     *
     * @param pipelineConfigurer configurer on which the dataset is created
     * @throws IllegalArgumentException if the table name or the row key field is null or empty
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Validate first so a bad configuration is reported before anything else is touched.
        Preconditions.checkArgument(!Strings.isNullOrEmpty(this.tableConfig.name), "Dataset name must be given.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(this.tableConfig.rowField), "Field to be used as rowkey must be given.");

        // The properties map is handed over inline, which avoids the raw Map local.
        DatasetProperties datasetProperties = DatasetProperties.builder()
            .addAll(this.tableConfig.getProperties().getProperties())
            .build();
        pipelineConfigurer.createDataset(this.tableConfig.name, Table.class.getName(), datasetProperties);
    }

    /**
     * Initializes the parent sink and creates the record-to-put transformer for
     * the configured row key field.
     *
     * @param realtimeContext runtime context passed on to the superclass
     * @throws Exception if the superclass initialization or transformer creation fails
     */
    public void initialize(RealtimeContext realtimeContext) throws Exception {
        super.initialize(realtimeContext);
        this.recordPutTransformer = new RecordPutTransformer(this.tableConfig.rowField);
    }

    /**
     * Writes every given record to the configured table.
     *
     * @param iterable records to write
     * @param dataWriter writer used to look up the target table dataset by name
     * @return the number of records that were put into the table
     * @throws Exception if the dataset lookup, the record conversion, or a put fails
     */
    public int write(Iterable<StructuredRecord> iterable, DataWriter dataWriter) throws Exception {
        Table table = dataWriter.getDataset(this.tableConfig.name);
        int written = 0;
        for (StructuredRecord record : iterable) {
            table.put(this.recordPutTransformer.toPut(record));
            written++;
        }
        return written;
    }
}
