package gobblin.test;

import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import gobblin.configuration.SourceState;
import gobblin.configuration.WorkUnitState;
import gobblin.source.Source;
import gobblin.source.extractor.DataRecordException;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.WatermarkInterval;
import gobblin.source.extractor.extract.LongWatermark;
import gobblin.source.workunit.Extract;
import gobblin.source.workunit.ExtractFactory;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.ConfigUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/test/SequentialTestSource.class */
/**
 * Test {@link Source} that produces deterministic, sequentially numbered string records.
 *
 * <p>On the first run (no previous work unit states) it creates {@code source.numParallelism}
 * work units. Work unit {@code i} is assigned the watermark interval
 * {@code [i * n + 1, (i + 1) * n]}, where {@code n} is {@code source.numRecordsPerExtract}.
 * On later runs it creates one follow-up work unit per previous work unit state: a committed
 * work unit continues from its actual high watermark, any other work unit is retried from its
 * original low watermark.
 *
 * <p>Recognised configuration keys (with defaults): {@code source.numParallelism} (1),
 * {@code source.namespace} ("TestDB"), {@code source.table} ("TestTable"),
 * {@code source.numRecordsPerExtract} (100).
 *
 * <p>NOTE(review): configuration is read once per instance; the "configured" flag is checked and
 * set in two separate steps and the configured fields are plain fields, so concurrent first calls
 * may both run the configuration step - confirm callers do not rely on stricter guarantees.
 */
public class SequentialTestSource implements Source<String, String> {
    /** Work unit property holding the index of the parallel sequence the work unit belongs to. */
    public static final String WORK_UNIT_INDEX = "workUnitIndex";

    private static final Logger log = LoggerFactory.getLogger(SequentialTestSource.class);

    private static final String NUM_PARALLELISM_KEY = "source.numParallelism";
    private static final String NAMESPACE_KEY = "source.namespace";
    private static final String TABLE_KEY = "source.table";
    private static final String NUM_RECORDS_PER_EXTRACT_KEY = "source.numRecordsPerExtract";

    private static final int DEFAULT_NUM_PARALLELISM = 1;
    private static final String DEFAULT_NAMESPACE = "TestDB";
    private static final String DEFAULT_TABLE = "TestTable";
    private static final int DEFAULT_NUM_RECORDS_PER_EXTRACT = 100;

    private final AtomicBoolean configured = new AtomicBoolean(false);
    private final Extract.TableType tableType = Extract.TableType.APPEND_ONLY;
    private final ExtractFactory extractFactory = new ExtractFactory("yyyyMMddHHmmss");

    private int numParallelism;
    private String namespace;
    private String table;
    private int numRecordsPerExtract;

    /**
     * Reads the source settings from {@code config} the first time it is called on this instance;
     * subsequent calls return immediately without re-reading the configuration.
     *
     * @param config configuration to read the {@code source.*} keys from
     */
    private void configureIfNeeded(Config config) {
        if (this.configured.get()) {
            return;
        }
        this.numParallelism = ConfigUtils.getInt(config, NUM_PARALLELISM_KEY, DEFAULT_NUM_PARALLELISM);
        this.namespace = ConfigUtils.getString(config, NAMESPACE_KEY, DEFAULT_NAMESPACE);
        this.table = ConfigUtils.getString(config, TABLE_KEY, DEFAULT_TABLE);
        this.numRecordsPerExtract =
                ConfigUtils.getInt(config, NUM_RECORDS_PER_EXTRACT_KEY, DEFAULT_NUM_RECORDS_PER_EXTRACT);
        this.configured.set(true);
    }

    /**
     * Builds the work units for the next run.
     *
     * @param sourceState job state; its properties supply the configuration and its previous
     *     work unit states decide where each sequence resumes
     * @return the initial work units when there are no previous work unit states, otherwise one
     *     follow-up work unit per previous work unit state, in the same order
     */
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        configureIfNeeded(ConfigFactory.parseProperties(sourceState.getProperties()));
        List<WorkUnitState> previousStates = sourceState.getPreviousWorkUnitStates();
        if (previousStates.isEmpty()) {
            return initialWorkUnits();
        }
        List<WorkUnit> workUnits = Lists.newArrayListWithCapacity(previousStates.size());
        for (WorkUnitState previousState : previousStates) {
            workUnits.add(followUpWorkUnit(previousState));
        }
        return workUnits;
    }

    /**
     * Creates the work unit that follows {@code previousState}, carrying over its
     * {@link #WORK_UNIT_INDEX} property.
     *
     * @param previousState state of the work unit from the previous run
     * @return a new work unit whose interval starts at the previous actual high watermark if the
     *     previous work unit was committed, or at the previous low watermark otherwise, and
     *     extends by {@code numRecordsPerExtract}
     */
    private WorkUnit followUpWorkUnit(WorkUnitState previousState) {
        boolean committed = previousState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED;
        // Committed work is continued from where it stopped; anything else is retried from its start.
        LongWatermark start = committed
                ? previousState.getActualHighWatermark(LongWatermark.class)
                : previousState.getWorkunit().getLowWatermark(LongWatermark.class);
        WatermarkInterval interval =
                new WatermarkInterval(start, new LongWatermark(start.getValue() + this.numRecordsPerExtract));
        WorkUnit workUnit = WorkUnit.create(newExtract(this.tableType, this.namespace, this.table), interval);
        log.debug("Will be setting watermark interval to {}", interval.toJson());
        workUnit.setProp(WORK_UNIT_INDEX, previousState.getWorkunit().getProp(WORK_UNIT_INDEX));
        return workUnit;
    }

    /**
     * Creates the work units for a first run: one per configured unit of parallelism, with
     * consecutive, non-overlapping watermark intervals of {@code numRecordsPerExtract} values each.
     *
     * @return the initial work units, indexed {@code 0 .. numParallelism - 1}
     */
    private List<WorkUnit> initialWorkUnits() {
        List<WorkUnit> workUnits = Lists.newArrayList();
        for (int i = 0; i < this.numParallelism; i++) {
            WorkUnit workUnit = WorkUnit.create(newExtract(Extract.TableType.APPEND_ONLY, this.namespace, this.table));
            LongWatermark low = new LongWatermark((i * this.numRecordsPerExtract) + 1);
            LongWatermark expectedHigh = new LongWatermark((i + 1) * this.numRecordsPerExtract);
            workUnit.setWatermarkInterval(new WatermarkInterval(low, expectedHigh));
            workUnit.setProp(WORK_UNIT_INDEX, i);
            workUnits.add(workUnit);
        }
        return workUnits;
    }

    /** Obtains a unique {@link Extract} for the given table coordinates from the extract factory. */
    private Extract newExtract(Extract.TableType tableType, String namespace, String table) {
        return this.extractFactory.getUniqueExtract(tableType, namespace, table);
    }

    /**
     * Creates an extractor that emits up to {@code numRecordsPerExtract} records of the form
     * {@code ":index:<workUnitIndex>:seq:<watermark>:"}, starting at the work unit's low watermark
     * and incrementing the watermark after each record.
     *
     * @param workUnitState state of the work unit to extract; receives the actual high watermark
     *     when the extractor is closed
     * @return the extractor for this work unit
     * @throws IOException declared by the {@link Source} contract; not thrown by this implementation
     */
    public Extractor<String, String> getExtractor(final WorkUnitState workUnitState) throws IOException {
        configureIfNeeded(ConfigFactory.parseProperties(workUnitState.getProperties()));
        final LongWatermark lowWatermark = workUnitState.getWorkunit().getLowWatermark(LongWatermark.class);
        final int workUnitIndex = workUnitState.getPropAsInt(WORK_UNIT_INDEX);
        return new Extractor<String, String>() {
            private long recordsExtracted = 0;
            // Advanced in place after every record; reported as the actual high watermark on close.
            private final LongWatermark currentWatermark = lowWatermark;

            /** Returns an empty schema string. */
            public String getSchema() throws IOException {
                return "";
            }

            /**
             * Returns the next record, or {@code null} once {@code numRecordsPerExtract} records
             * have been produced. The {@code reuse} argument is ignored.
             */
            public String readRecord(@Deprecated String reuse) throws DataRecordException, IOException {
                if (this.recordsExtracted >= SequentialTestSource.this.numRecordsPerExtract) {
                    return null;
                }
                String record = ":index:" + workUnitIndex + ":seq:" + this.currentWatermark.getValue() + ":";
                log.debug("Extracted record -> {}", record);
                this.currentWatermark.increment();
                this.recordsExtracted++;
                return record;
            }

            /** Returns the configured number of records per extract. */
            public long getExpectedRecordCount() {
                return SequentialTestSource.this.numRecordsPerExtract;
            }

            /** Returns the high watermark recorded on the work unit state. */
            public long getHighWatermark() {
                return workUnitState.getHighWaterMark();
            }

            /** Records the current watermark as the work unit's actual high watermark. */
            public void close() throws IOException {
                workUnitState.setActualHighWatermark(this.currentWatermark);
            }
        };
    }

    /** No-op: this source holds no resources that need releasing. */
    public void shutdown(SourceState sourceState) {
    }
}
