package gobblin.test;

import avro.shaded.com.google.common.base.Throwables;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import gobblin.configuration.SourceState;
import gobblin.configuration.WorkUnitState;
import gobblin.runtime.TaskConfigurationKeys;
import gobblin.source.Source;
import gobblin.source.extractor.CheckpointableWatermark;
import gobblin.source.extractor.DataRecordException;
import gobblin.source.extractor.DefaultCheckpointableWatermark;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.StreamingExtractor;
import gobblin.source.extractor.WatermarkInterval;
import gobblin.source.extractor.extract.LongWatermark;
import gobblin.source.extractor.partition.Partitioner;
import gobblin.source.workunit.Extract;
import gobblin.source.workunit.ExtractFactory;
import gobblin.source.workunit.WorkUnit;
import gobblin.stream.RecordEnvelope;
import gobblin.util.ConfigUtils;
import gobblin.writer.WatermarkStorage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-core-base-0.11.0.jar:gobblin/test/SequentialTestSource.class */
public class SequentialTestSource implements Source<String, Object> {
    /** Default parallelism, i.e. the default number of work units created on the first run. */
    private static final int DEFAULT_NUM_PARALLELISM = 1;
    /** Fallback extract namespace used when "source.namespace" is not configured. */
    private static final String DEFAULT_NAMESPACE = "TestDB";
    /** Fallback extract table name used when "source.table" is not configured. */
    private static final String DEFAULT_TABLE = "TestTable";
    /**
     * Work unit property holding the work unit's index. getExtractor reads it back with
     * getPropAsInt and hands it to the extractor as its partition.
     */
    public static final String WORK_UNIT_INDEX = "workUnitIndex";
    /** Upper bound of the loop in initialWorkUnits(); read from "source.numParallelism". */
    private int num_parallelism;
    /** Namespace passed to newExtract(); read from "source.namespace". */
    private String namespace;
    /** Table name passed to newExtract(); read from "source.table". */
    private String table;
    /**
     * Size of each work unit's watermark interval and the record limit handed to the batch
     * extractor; read from "source.numRecordsPerExtract" and forced to Integer.MAX_VALUE when
     * streaming is enabled.
     */
    private int numRecordsPerExtract;
    /** Milliseconds the batch extractor sleeps before each record; read from "source.sleepTimePerRecordMillis". */
    private long sleepTimePerRecord;
    /** Shared logger; the nested extractor classes log through it as well. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SequentialTestSource.class);
    /** Fallback for "source.numRecordsPerExtract". */
    private static final Integer DEFAULT_NUM_RECORDS_PER_EXTRACT = 100;
    /** Fallback for "source.sleepTimePerRecordMillis". */
    private static final Long DEFAULT_SLEEP_TIME_PER_RECORD_MILLIS = 10L;
    /** Set to true at the end of configureIfNeeded(); later calls return immediately while it is true. */
    private final AtomicBoolean configured = new AtomicBoolean(false);
    /** Table type of the extracts this source creates; always APPEND_ONLY. */
    private final Extract.TableType tableType = Extract.TableType.APPEND_ONLY;
    /** Factory behind newExtract(); constructed with Partitioner.WATERMARKTIMEFORMAT. */
    private final ExtractFactory _extractFactory = new ExtractFactory(Partitioner.WATERMARKTIMEFORMAT);
    /** True when the configured task execution mode equals "STREAMING" (case-insensitive). */
    private boolean streaming = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/gobblin-core-base-0.11.0.jar:gobblin/test/SequentialTestSource$TestBatchExtractor.class */
    /**
     * Batch extractor that produces a bounded sequence of {@link TestRecord}s.
     *
     * <p>Every call to {@link #readRecord(Object)} sleeps for the configured time, emits one record
     * carrying this extractor's partition and the current watermark value, and then increments the
     * watermark. Once the configured number of records has been emitted, {@code readRecord}
     * returns {@code null}.
     */
    public static class TestBatchExtractor implements Extractor<String, Object> {
        /** Maximum number of records this extractor emits. */
        private final long numRecordsPerExtract;
        /** Milliseconds to sleep before emitting each record. */
        private final long sleepTimePerRecord;
        /** Partition stamped on every emitted record. */
        private final int partition;
        /** Number of records emitted so far. */
        private long recordsExtracted = 0;
        /** Watermark whose value becomes the next record's sequence; replaceable via the setter. */
        private LongWatermark currentWatermark;
        /** State used to report the high watermark and to store the actual high watermark on close. */
        WorkUnitState workUnitState;

        /**
         * @param partition            partition stamped on every record
         * @param startWatermark       watermark whose value is used for the first record
         * @param numRecordsPerExtract maximum number of records to emit
         * @param sleepTimePerRecord   milliseconds to sleep before each record
         * @param workUnitState        state that receives the actual high watermark on close
         */
        public TestBatchExtractor(int partition, LongWatermark startWatermark, long numRecordsPerExtract,
                long sleepTimePerRecord, WorkUnitState workUnitState) {
            this.partition = partition;
            this.currentWatermark = startWatermark;
            this.numRecordsPerExtract = numRecordsPerExtract;
            this.sleepTimePerRecord = sleepTimePerRecord;
            this.workUnitState = workUnitState;
        }

        /** @return an empty string; this extractor does not describe a schema */
        @Override
        public String getSchema() throws IOException {
            return "";
        }

        /**
         * Emits the next record, or {@code null} when the record limit has been reached.
         *
         * @param obj unused
         * @return the next {@link TestRecord}, or {@code null} when exhausted
         * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
         *     interrupted; the thread's interrupt flag is restored before throwing
         */
        @Override
        public Object readRecord(@Deprecated Object obj) throws DataRecordException, IOException {
            if (this.recordsExtracted >= this.numRecordsPerExtract) {
                return null;
            }
            try {
                Thread.sleep(this.sleepTimePerRecord);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag. Restore it so code
                // further up the stack can still see the interruption, then fail with the same
                // unchecked wrapper that callers received before.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            TestRecord testRecord = new TestRecord(this.partition, this.currentWatermark.getValue(), null);
            SequentialTestSource.log.debug("Extracted record -> {}", testRecord);
            this.currentWatermark.increment();
            this.recordsExtracted++;
            return testRecord;
        }

        /** @return the configured record limit */
        @Override
        public long getExpectedRecordCount() {
            return this.numRecordsPerExtract;
        }

        /** @return the high watermark reported by the work unit state */
        @Override
        public long getHighWatermark() {
            return this.workUnitState.getHighWaterMark();
        }

        /** Stores the current watermark on the work unit state as the actual high watermark. */
        @Override
        public void close() throws IOException {
            this.workUnitState.setActualHighWatermark(this.currentWatermark);
        }

        /** Replaces the watermark used for subsequent records. */
        public void setCurrentWatermark(LongWatermark longWatermark) {
            this.currentWatermark = longWatermark;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/gobblin-core-base-0.11.0.jar:gobblin/test/SequentialTestSource$TestStreamingExtractor.class */
    /**
     * Streaming wrapper around a {@link TestBatchExtractor}.
     *
     * <p>Records come from the wrapped extractor and are returned in a {@link RecordEnvelope}
     * together with a {@link DefaultCheckpointableWatermark} built from the record's partition and
     * sequence. {@link #start(WatermarkStorage)} repositions the wrapped extractor at the watermark
     * previously committed for its partition, or at -1 when none is found.
     */
    static class TestStreamingExtractor implements StreamingExtractor<String, Object> {
        /** Batch extractor that actually produces the records. */
        private final TestBatchExtractor extractor;
        /** Storage handed to {@link #start(WatermarkStorage)}; absent until then. */
        private Optional<WatermarkStorage> watermarkStorage = Optional.absent();

        public TestStreamingExtractor(TestBatchExtractor testBatchExtractor) {
            this.extractor = testBatchExtractor;
        }

        /** Closes the wrapped extractor. */
        @Override
        public void close() throws IOException {
            this.extractor.close();
        }

        /** @return the wrapped extractor's schema */
        @Override
        public String getSchema() throws IOException {
            return this.extractor.getSchema();
        }

        /**
         * Reads the next record from the wrapped extractor and wraps it with its watermark.
         *
         * @return the enveloped record, or {@code null} when the wrapped extractor returns
         *     {@code null} (it has reached its record limit)
         */
        @Override
        public RecordEnvelope<Object> readRecordEnvelope() throws DataRecordException, IOException {
            TestRecord testRecord = (TestRecord) this.extractor.readRecord(null);
            if (testRecord == null) {
                // The wrapped extractor signals exhaustion with null; pass that on instead of
                // dereferencing it.
                return null;
            }
            String source = String.valueOf(testRecord.getPartition());
            LongWatermark sequence = new LongWatermark(testRecord.getSequence());
            return new RecordEnvelope<>(testRecord, new DefaultCheckpointableWatermark(source, sequence));
        }

        /** @return the wrapped extractor's expected record count */
        @Override
        public long getExpectedRecordCount() {
            return this.extractor.getExpectedRecordCount();
        }

        /** @return the wrapped extractor's high watermark */
        @Override
        public long getHighWatermark() {
            return this.extractor.getHighWatermark();
        }

        /**
         * Looks up the watermark committed for this extractor's partition and makes it the wrapped
         * extractor's current watermark. A failed lookup is logged and treated as "nothing
         * committed"; in that case, or when the partition has no entry, the watermark is set to -1.
         *
         * @param watermarkStorage storage to query for committed watermarks; must not be null
         */
        @Override
        public void start(WatermarkStorage watermarkStorage) throws IOException {
            this.watermarkStorage = Optional.of(watermarkStorage);
            String partitionKey = String.valueOf(this.extractor.partition);

            Map<String, CheckpointableWatermark> committed;
            try {
                committed = watermarkStorage.getCommittedWatermarks(DefaultCheckpointableWatermark.class, ImmutableList.of(partitionKey));
            } catch (IOException e) {
                SequentialTestSource.log.warn("Failed to get watermarks... will start from the beginning", e);
                committed = Collections.emptyMap();
            }
            for (Map.Entry<String, CheckpointableWatermark> entry : committed.entrySet()) {
                SequentialTestSource.log.info("{}: Found these committed watermarks: key: {}, value: {}", this, entry.getKey(), entry.getValue());
            }

            // One lookup covers both the empty-map case and the missing-key case.
            CheckpointableWatermark previous = committed.get(partitionKey);
            LongWatermark startingWatermark =
                    (previous == null) ? new LongWatermark(-1L) : (LongWatermark) previous.getWatermark();
            this.extractor.setCurrentWatermark(startingWatermark);
            SequentialTestSource.log.info("{}: Set current watermark to : {}", this, startingWatermark);
        }
    }

    /**
     * Loads this source's settings from {@code config} unless a previous call already did so.
     *
     * <p>Reads parallelism, namespace, table, records per extract and per-record sleep time from
     * the "source.*" keys, falling back to the class defaults. When the task execution mode is
     * "STREAMING" (case-insensitive), the records-per-extract limit is raised to
     * {@link Integer#MAX_VALUE}.
     *
     * @param config configuration to read the settings from
     */
    private void configureIfNeeded(Config config) {
        if (!this.configured.get()) {
            this.num_parallelism = ConfigUtils.getInt(config, "source.numParallelism", DEFAULT_NUM_PARALLELISM);
            this.namespace = ConfigUtils.getString(config, "source.namespace", DEFAULT_NAMESPACE);
            this.table = ConfigUtils.getString(config, "source.table", DEFAULT_TABLE);
            this.numRecordsPerExtract =
                    ConfigUtils.getInt(config, "source.numRecordsPerExtract", DEFAULT_NUM_RECORDS_PER_EXTRACT);
            this.sleepTimePerRecord =
                    ConfigUtils.getLong(config, "source.sleepTimePerRecordMillis", DEFAULT_SLEEP_TIME_PER_RECORD_MILLIS);

            String executionMode = ConfigUtils.getString(
                    config, TaskConfigurationKeys.TASK_EXECUTION_MODE, TaskConfigurationKeys.DEFAULT_TASK_EXECUTION_MODE);
            this.streaming = executionMode.equalsIgnoreCase("STREAMING");
            if (this.streaming) {
                // Streaming runs replace the configured record limit with the largest int value.
                this.numRecordsPerExtract = Integer.MAX_VALUE;
            }
            this.configured.set(true);
        }
    }

    /**
     * Creates the work units for this run.
     *
     * <p>With no previous work unit states, the initial work units are returned. Otherwise one
     * work unit is created per previous state: a COMMITTED state continues from its actual high
     * watermark, any other state starts again from its previous low watermark. In both cases the
     * interval spans {@code numRecordsPerExtract} and the previous work unit index is carried over.
     *
     * @param sourceState state holding the job properties and the previous work unit states
     * @return the work units to run
     */
    @Override
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        configureIfNeeded(ConfigFactory.parseProperties(sourceState.getProperties()));
        List<WorkUnitState> previousWorkUnitStates = sourceState.getPreviousWorkUnitStates();
        if (previousWorkUnitStates.isEmpty()) {
            return initialWorkUnits();
        }
        List<WorkUnit> workUnits = Lists.newArrayListWithCapacity(previousWorkUnitStates.size());
        for (WorkUnitState previousState : previousWorkUnitStates) {
            // The two cases differ only in where the new interval starts.
            LongWatermark start;
            if (previousState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
                start = (LongWatermark) previousState.getActualHighWatermark(LongWatermark.class);
            } else {
                start = (LongWatermark) previousState.getWorkunit().getLowWatermark(LongWatermark.class);
            }
            WatermarkInterval interval =
                    new WatermarkInterval(start, new LongWatermark(start.getValue() + this.numRecordsPerExtract));
            WorkUnit workUnit = WorkUnit.create(newExtract(this.tableType, this.namespace, this.table), interval);
            if (log.isDebugEnabled()) {
                // Guarded so the JSON is only built when debug logging is on.
                log.debug("Will be setting watermark interval to {}", interval.toJson());
            }
            workUnit.setProp(WORK_UNIT_INDEX, previousState.getWorkunit().getProp(WORK_UNIT_INDEX));
            workUnits.add(workUnit);
        }
        return workUnits;
    }

    /**
     * Builds the work units for a first run, one per unit of parallelism.
     *
     * <p>Work unit {@code i} gets the watermark interval
     * {@code [i * numRecordsPerExtract + 1, (i + 1) * numRecordsPerExtract]} and stores {@code i}
     * under {@link #WORK_UNIT_INDEX}.
     *
     * @return the initial work units; empty when the configured parallelism is not positive
     */
    private List<WorkUnit> initialWorkUnits() {
        List<WorkUnit> workUnits = Lists.newArrayList();
        for (int i = 0; i < this.num_parallelism; i++) {
            // Widen to long before multiplying. In streaming mode numRecordsPerExtract is
            // Integer.MAX_VALUE, so int arithmetic would wrap to negative bounds for every
            // index above 0.
            long lowValue = (long) i * this.numRecordsPerExtract + 1;
            long expectedHighValue = (long) (i + 1) * this.numRecordsPerExtract;

            WorkUnit workUnit = WorkUnit.create(newExtract(this.tableType, this.namespace, this.table));
            workUnit.setWatermarkInterval(
                    new WatermarkInterval(new LongWatermark(lowValue), new LongWatermark(expectedHighValue)));
            workUnit.setProp(WORK_UNIT_INDEX, Integer.valueOf(i));
            workUnits.add(workUnit);
        }
        return workUnits;
    }

    /**
     * Obtains an extract for the given table by delegating to the extract factory's
     * {@code getUniqueExtract}.
     *
     * @param type             table type of the extract
     * @param extractNamespace namespace of the extract
     * @param extractTable     table name of the extract
     * @return the extract returned by the factory
     */
    private Extract newExtract(Extract.TableType type, String extractNamespace, String extractTable) {
        Extract extract = this._extractFactory.getUniqueExtract(type, extractNamespace, extractTable);
        return extract;
    }

    /**
     * Creates the extractor for one work unit.
     *
     * <p>A {@link TestBatchExtractor} is built from the work unit's index, its low watermark and
     * this source's record limit and sleep time. In streaming mode it is wrapped in a
     * {@link TestStreamingExtractor}; otherwise it is returned directly.
     *
     * @param workUnitState state of the work unit to extract
     * @return the batch extractor, or a streaming wrapper around it
     */
    @Override
    public Extractor<String, Object> getExtractor(WorkUnitState workUnitState) throws IOException {
        configureIfNeeded(ConfigFactory.parseProperties(workUnitState.getProperties()));

        int partition = workUnitState.getPropAsInt(WORK_UNIT_INDEX);
        LongWatermark lowWatermark = (LongWatermark) workUnitState.getWorkunit().getLowWatermark(LongWatermark.class);
        TestBatchExtractor batchExtractor = new TestBatchExtractor(
                partition, lowWatermark, this.numRecordsPerExtract, this.sleepTimePerRecord, workUnitState);

        if (this.streaming) {
            return new TestStreamingExtractor(batchExtractor);
        }
        return batchExtractor;
    }

    /**
     * Does nothing; the body is empty.
     *
     * @param sourceState unused
     */
    @Override // gobblin.source.Source
    public void shutdown(SourceState sourceState) {
    }
}
