package co.cask.cdap.etl.realtime.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.realtime.DataWriter;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.cdap.etl.api.realtime.RealtimeSink;
import co.cask.cdap.etl.common.RecordPutTransformer;
import co.cask.cdap.etl.common.TableSinkConfig;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.Iterator;
import java.util.Map;

/**
 * Real-time sink plugin that writes {@link StructuredRecord}s into a CDAP {@link Table} dataset.
 *
 * <p>The target dataset name, the record field used as the row key, and the dataset properties
 * are all taken from the supplied {@link TableSinkConfig}.
 */
@Name("Table")
@Description("Real-time Sink for CDAP Table dataset")
@Plugin(type = "realtimesink")
public class RealtimeTableSink extends RealtimeSink<StructuredRecord> {
    /** Converts incoming records into table puts; created in {@link #initialize(RealtimeContext)}. */
    private RecordPutTransformer recordPutTransformer;

    /** Configuration holding the dataset name, row key field and dataset properties. */
    private final TableSinkConfig tableSinkConfig;

    /**
     * Creates the sink with the given configuration.
     *
     * @param tableSinkConfig configuration describing the target table
     */
    public RealtimeTableSink(TableSinkConfig tableSinkConfig) {
        this.tableSinkConfig = tableSinkConfig;
    }

    /**
     * Validates the configuration and registers the target {@link Table} dataset with the pipeline.
     *
     * @param pipelineConfigurer configurer used to create the dataset
     * @throws IllegalArgumentException if the dataset name or the row key field is null or empty
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Validate the mandatory settings first so that a misconfiguration is reported with a
        // clear message before any other part of the config is touched.
        Preconditions.checkArgument(!Strings.isNullOrEmpty(this.tableSinkConfig.getName()), "Dataset name must be given.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(this.tableSinkConfig.getRowField()), "Field to be used as rowkey must be given.");
        pipelineConfigurer.createDataset(
            this.tableSinkConfig.getName(),
            Table.class.getName(),
            DatasetProperties.builder()
                .addAll(this.tableSinkConfig.getProperties().getProperties())
                .build());
    }

    /**
     * Initializes the sink and builds the record-to-put transformer.
     *
     * <p>If the plugin properties contain a {@code "schema"} entry it is parsed as a JSON schema and
     * handed to the transformer; otherwise the transformer is created with a {@code null} schema.
     *
     * @param realtimeContext runtime context providing the plugin properties
     * @throws Exception if the superclass initialization or the schema parsing fails
     */
    public void initialize(RealtimeContext realtimeContext) throws Exception {
        super.initialize(realtimeContext);
        String schemaJson = realtimeContext.getPluginProperties().getProperties().get("schema");
        // The schema is optional: a missing property leaves it null.
        Schema schema = (schemaJson == null) ? null : Schema.parseJson(schemaJson);
        this.recordPutTransformer = new RecordPutTransformer(this.tableSinkConfig.getRowField(), schema);
    }

    /**
     * Writes every record of the batch to the configured table.
     *
     * @param iterable records to write
     * @param dataWriter writer giving access to the target dataset
     * @return the number of records that were put into the table
     * @throws Exception if a record cannot be converted or written
     */
    public int write(Iterable<StructuredRecord> iterable, DataWriter dataWriter) throws Exception {
        Table table = dataWriter.getDataset(this.tableSinkConfig.getName());
        int written = 0;
        for (StructuredRecord record : iterable) {
            table.put(this.recordPutTransformer.toPut(record));
            written++;
        }
        return written;
    }
}
