package co.cask.cdap.etl.batch.connector;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.format.StructuredRecordStringConverter;
import javax.annotation.Nullable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/* loaded from: input_file:lib/cdap-etl-core-4.3.0.jar:co/cask/cdap/etl/batch/connector/SingleConnectorSource.class */
/**
 * A {@link ConnectorSource} that converts each text value passed to
 * {@link #transform(KeyValue, Emitter)} into a {@link StructuredRecord} and emits it.
 *
 * <p>The JSON layout expected in the text depends on whether a schema was supplied at construction:
 * <ul>
 *   <li>schema supplied: the text is parsed directly as a record of that schema;</li>
 *   <li>no schema: the text is parsed as a {@link #RECORD_WITH_SCHEMA} envelope whose {@code "schema"}
 *       field holds the JSON of the record's schema and whose {@code "record"} field holds the JSON
 *       of the record itself.</li>
 * </ul>
 */
public class SingleConnectorSource extends ConnectorSource<StructuredRecord> {
    /**
     * Envelope schema used when no fixed schema is configured: a record named {@code "record"} with
     * two string fields, {@code "schema"} and {@code "record"}.
     */
    static final Schema RECORD_WITH_SCHEMA = Schema.recordOf(
        "record",
        Schema.Field.of("schema", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("record", Schema.of(Schema.Type.STRING)));

    /** Schema used to parse every input directly; {@code null} means each input carries its own schema. */
    @Nullable
    private final Schema schema;

    /**
     * @param str    value forwarded unchanged to the {@link ConnectorSource} constructor
     * @param schema schema to parse every input with, or {@code null} if each input is a
     *               {@link #RECORD_WITH_SCHEMA} envelope carrying its own schema
     */
    public SingleConnectorSource(String str, @Nullable Schema schema) {
        super(str);
        this.schema = schema;
    }

    /**
     * Parses the text value of {@code keyValue} as JSON and emits the resulting record.
     * The key of {@code keyValue} is not read.
     *
     * @param keyValue pair whose value holds the JSON text to parse
     * @param emitter  receives exactly one record per call when parsing succeeds
     * @throws Exception if thrown by the JSON/schema parsing calls or by the emitter
     */
    @Override
    public void transform(KeyValue<LongWritable, Text> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        String text = keyValue.getValue().toString();

        StructuredRecord output;
        if (this.schema != null) {
            // A fixed schema is configured: the text is the record itself, so there is no envelope to unwrap.
            output = StructuredRecordStringConverter.fromJsonString(text, this.schema);
        } else {
            // The envelope is parsed only on this path, the only one that reads it.
            StructuredRecord envelope = StructuredRecordStringConverter.fromJsonString(text, RECORD_WITH_SCHEMA);
            String recordJson = (String) envelope.get("record");
            Schema embeddedSchema = Schema.parseJson((String) envelope.get("schema"));
            output = StructuredRecordStringConverter.fromJsonString(recordJson, embeddedSchema);
        }
        emitter.emit(output);
    }

    // No explicit transform(Object, Emitter) overload is declared here: the compiler synthesizes that
    // bridge method for the generic override above, and declaring it by hand clashes with it.
}
