package co.cask.cdap.etl.batch.connector;

import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetProperties;
import co.cask.cdap.api.dataset.lib.Partitioning;
import co.cask.cdap.api.workflow.WorkflowConfigurer;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.cdap.format.StructuredRecordStringConverter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/* loaded from: input_file:lib/cdap-etl-batch-3.5.5.jar:co/cask/cdap/etl/batch/connector/ConnectorSource.class */
public class ConnectorSource extends BatchSource<LongWritable, Text, KeyValue<String, StructuredRecord>> {
    static final Schema RECORD_WITH_SCHEMA = Schema.recordOf("record", Schema.Field.of("stageName", Schema.of(Schema.Type.STRING)), Schema.Field.of("schema", Schema.of(Schema.Type.STRING)), Schema.Field.of("record", Schema.of(Schema.Type.STRING)));
    private final String datasetName;

    @Nullable
    private final Schema schema;

    public ConnectorSource(String str, @Nullable Schema schema) {
        this.datasetName = str;
        this.schema = schema;
    }

    public void configure(WorkflowConfigurer workflowConfigurer) {
        workflowConfigurer.createLocalDataset(this.datasetName, PartitionedFileSet.class, PartitionedFileSetProperties.builder().setPartitioning(Partitioning.builder().addField("phase", Partitioning.FieldType.STRING).build()).setInputFormat(TextInputFormat.class).setOutputFormat(TextOutputFormat.class).build());
    }

    @Override // co.cask.cdap.etl.api.batch.BatchConfigurable
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        HashMap hashMap = new HashMap();
        Iterator it = batchSourceContext.getDataset(this.datasetName).getPartitions(PartitionFilter.ALWAYS_MATCH).iterator();
        while (it.hasNext()) {
            PartitionedFileSetArguments.addInputPartition(hashMap, (PartitionDetail) it.next());
        }
        batchSourceContext.setInput(Input.ofDataset(this.datasetName, hashMap));
    }

    @Override // co.cask.cdap.etl.api.batch.BatchSource
    public void transform(KeyValue<LongWritable, Text> keyValue, Emitter<KeyValue<String, StructuredRecord>> emitter) throws Exception {
        StructuredRecord fromJsonString;
        String text = ((Text) keyValue.getValue()).toString();
        StructuredRecord fromJsonString2 = StructuredRecordStringConverter.fromJsonString(text, RECORD_WITH_SCHEMA);
        String str = (String) fromJsonString2.get("stageName");
        if (this.schema == null) {
            fromJsonString = StructuredRecordStringConverter.fromJsonString((String) fromJsonString2.get("record"), Schema.parseJson((String) fromJsonString2.get("schema")));
        } else {
            fromJsonString = StructuredRecordStringConverter.fromJsonString(text, this.schema);
        }
        emitter.emit(new KeyValue<>(str, fromJsonString));
    }

    @Override // co.cask.cdap.etl.api.batch.BatchSource, co.cask.cdap.etl.api.Transformation
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<LongWritable, Text>) obj, (Emitter<KeyValue<String, StructuredRecord>>) emitter);
    }
}
