package co.cask.cdap.etl.batch.connector;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSinkContext;
import co.cask.cdap.format.StructuredRecordStringConverter;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

/* loaded from: input_file:lib/cdap-etl-batch-3.5.5.jar:co/cask/cdap/etl/batch/connector/ConnectorSink.class */
/**
 * Batch sink that serializes each incoming (stage name, record) pair to a JSON {@link Text}
 * value and writes it to the dataset named at construction time.
 *
 * <p>Every input is wrapped in a record built against {@code ConnectorSource.RECORD_WITH_SCHEMA}
 * carrying three string fields: {@code stageName} (the input key), {@code schema} (the string
 * form of the input record's schema) and {@code record} (the input record as JSON). The wrapper
 * is itself rendered to JSON and emitted with a {@link NullWritable} key.
 */
public class ConnectorSink extends BatchSink<KeyValue<String, StructuredRecord>, NullWritable, Text> {
    private final String datasetName;
    private final String phaseName;

    /**
     * @param datasetName name of the dataset registered as this sink's output
     * @param phaseName value stored in the {@code "phase"} field of the output partition key
     */
    public ConnectorSink(String datasetName, String phaseName) {
        this.datasetName = datasetName;
        this.phaseName = phaseName;
    }

    /**
     * Registers the dataset as an output of the run, with output arguments that carry a
     * partition key whose single string field {@code "phase"} is set to the phase name.
     *
     * @param context sink context on which the output is registered
     * @throws Exception declared by the overridden method
     */
    @Override // co.cask.cdap.etl.api.batch.BatchConfigurable
    public void prepareRun(BatchSinkContext context) throws Exception {
        HashMap<String, String> arguments = new HashMap<>();
        PartitionKey outputPartition = PartitionKey.builder().addStringField("phase", this.phaseName).build();
        PartitionedFileSetArguments.setOutputPartitionKey(arguments, outputPartition);
        context.addOutput(this.datasetName, arguments);
    }

    /**
     * Wraps the input pair in a record-with-schema envelope and emits it as JSON text.
     *
     * <p>No explicit {@code transform(Object, Emitter)} overload is declared: the compiler
     * generates that bridge method for this override, and declaring it by hand clashes with it.
     *
     * @param input pair of originating stage name (key) and the record to write (value)
     * @param emitter receives one {@code (NullWritable, Text)} pair per call
     * @throws Exception if the record cannot be converted to JSON or the emitter fails
     */
    @Override // co.cask.cdap.etl.api.batch.BatchSink, co.cask.cdap.etl.api.Transformation
    public void transform(KeyValue<String, StructuredRecord> input,
                          Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
        StructuredRecord envelope = modifyRecord(input);
        String json = StructuredRecordStringConverter.toJsonString(envelope);
        emitter.emit(new KeyValue<>(NullWritable.get(), new Text(json)));
    }

    /**
     * Builds the envelope record holding the stage name, the input record's schema as a string,
     * and the input record serialized to JSON.
     *
     * @param input pair of stage name and record
     * @return a record built against {@code ConnectorSource.RECORD_WITH_SCHEMA}
     * @throws IOException if the input record cannot be serialized to JSON
     */
    private StructuredRecord modifyRecord(KeyValue<String, StructuredRecord> input) throws IOException {
        StructuredRecord record = input.getValue();
        return StructuredRecord.builder(ConnectorSource.RECORD_WITH_SCHEMA)
            .set("stageName", input.getKey())
            .set("schema", record.getSchema().toString())
            .set("record", StructuredRecordStringConverter.toJsonString(record))
            .build();
    }
}
