package org.apache.beam.io.debezium;

import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.HistoryRecord;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/io/debezium/KafkaSourceConsumerFn.class */
public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
    /** Connector property under which {@link #getHashCode()} of the running DoFn instance is passed. */
    public static final String BEAM_INSTANCE_PROPERTY = "beam.parent.instance";

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceConsumerFn.class);

    /** Trackers of in-flight {@code process} calls, keyed by {@link #getHashCode()} of the owning instance. */
    private static final Map<String, RestrictionTracker<OffsetHolder, Map<String, Object>>> restrictionTrackers = new ConcurrentHashMap<>();

    /**
     * Reference point for the time limit; assigned in {@code getInitialRestriction}. It is static,
     * so every instance in the JVM shares (and overwrites) the same value.
     */
    private static DateTime startTime;

    /** Connector class instantiated reflectively for every poll cycle. */
    private final Class<? extends SourceConnector> connectorClass;

    /** Converts each polled {@link SourceRecord} into the output element. */
    private final SourceRecordMapper<T> fn;

    /** Optional time limit in milliseconds; {@code null} or non-positive means no limit in {@code process}. */
    private final Long milisecondsToRun;

    /** Optional record limit; {@code null} means no limit. */
    private final Integer maxRecords;

    /**
     * {@link SourceTaskContext} given to the source task. Its only working feature is an
     * {@link OffsetStorageReader} that serves the single offset map supplied at construction.
     */
    public static class BeamSourceTaskContext implements SourceTaskContext {
        /** Offset handed to every reader created by this context; may be {@code null}. */
        private final Map<String, ?> startingOffset;

        BeamSourceTaskContext(Map<String, ?> map) {
            startingOffset = map;
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public Map<String, String> configs() {
            throw new UnsupportedOperationException("unimplemented");
        }

        /** Creates a new reader backed by the offset this context was constructed with. */
        @Override
        public OffsetStorageReader offsetStorageReader() {
            KafkaSourceConsumerFn.LOG.debug("------------- Creating an offset storage reader");
            OffsetStorageReader reader = new DebeziumSourceOffsetStorageReader(startingOffset);
            return reader;
        }
    }

    /**
     * Database history that keeps its records in memory, in the {@code history} list of an
     * {@link OffsetHolder} restriction, instead of an external store.
     *
     * <p>On {@link #start()} the list is taken from the first tracker found in
     * {@code KafkaSourceConsumerFn.restrictionTrackers}. When several trackers are registered at the
     * same time, which one is picked is not defined by this class.
     */
    public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory {
        /** Serialized history documents; a list private to this instance until {@link #start()} rebinds it. */
        private List<byte[]> history = new ArrayList<>();

        /**
         * Starts the history and binds it to the history list of a registered restriction tracker.
         *
         * <p>If no tracker is registered, or the tracker's restriction carries no history list, the
         * list already held by this instance is kept, so the history still works in memory instead
         * of failing with {@code NoSuchElementException} / {@code NullPointerException}.
         */
        @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
        public void start() {
            super.start();
            KafkaSourceConsumerFn.LOG.debug("------------ STARTING THE DATABASE HISTORY! - trackers: {} - config: {}", KafkaSourceConsumerFn.restrictionTrackers, this.config.asMap());
            // Iterate the values directly: a key lookup after picking the key could return null if
            // the tracker is unregistered in between.
            Iterator<RestrictionTracker<OffsetHolder, Map<String, Object>>> registered = KafkaSourceConsumerFn.restrictionTrackers.values().iterator();
            if (!registered.hasNext()) {
                return;
            }
            OffsetHolder restriction = registered.next().currentRestriction();
            if (restriction == null || restriction.history == null) {
                return;
            }
            // The entries are expected to be the byte[] documents written by storeRecord.
            @SuppressWarnings("unchecked")
            List<byte[]> shared = (List<byte[]>) restriction.history;
            this.history = shared;
        }

        /** Appends the record's document, serialized to bytes, to the history list. */
        @Override // io.debezium.relational.history.AbstractDatabaseHistory
        protected void storeRecord(HistoryRecord historyRecord) throws DatabaseHistoryException {
            KafkaSourceConsumerFn.LOG.debug("------------- Adding history! {}", historyRecord);
            this.history.add(DocumentWriter.defaultWriter().writeAsBytes(historyRecord.document()));
        }

        /**
         * Deserializes every stored document and passes it to {@code consumer} in list order.
         *
         * @throws RuntimeException wrapping the {@link IOException} if a stored document cannot be read
         */
        @Override // io.debezium.relational.history.AbstractDatabaseHistory
        protected void recoverRecords(Consumer<HistoryRecord> consumer) {
            KafkaSourceConsumerFn.LOG.debug("------------- Trying to recover!");
            try {
                for (byte[] serialized : this.history) {
                    consumer.accept(new HistoryRecord(DocumentReader.defaultReader().read(serialized)));
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /** Returns {@code true} when at least one record has been stored. */
        @Override // io.debezium.relational.history.DatabaseHistory
        public boolean exists() {
            return hasRecords();
        }

        /** Returns {@code true} when at least one record has been stored. */
        @Override // io.debezium.relational.history.DatabaseHistory
        public boolean storageExists() {
            return hasRecords();
        }

        /** Shared emptiness check behind {@link #exists()} and {@link #storageExists()}. */
        private boolean hasRecords() {
            return this.history != null && !this.history.isEmpty();
        }
    }

    /**
     * {@link OffsetStorageReader} that answers every partition lookup with the same offset map,
     * the one supplied at construction (which may be {@code null}).
     */
    private static class DebeziumSourceOffsetStorageReader implements OffsetStorageReader {
        private final Map<String, ?> offset;

        DebeziumSourceOffsetStorageReader(Map<String, ?> map) {
            offset = map;
        }

        /** Looks up a single partition by delegating to {@link #offsets(Collection)}. */
        @Override // org.apache.kafka.connect.storage.OffsetStorageReader
        public <V> Map<String, Object> offset(Map<String, V> map) {
            Map<Map<String, V>, Map<String, Object>> single = offsets(Collections.singletonList(map));
            return single.getOrDefault(map, ImmutableMap.of());
        }

        /** Maps each requested partition to the one offset held by this reader. */
        @Override // org.apache.kafka.connect.storage.OffsetStorageReader
        @SuppressWarnings("unchecked")
        public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> collection) {
            KafkaSourceConsumerFn.LOG.debug("-------------- GETTING OFFSETS!");
            Map<Map<String, T>, Map<String, Object>> byPartition = new HashMap<>();
            for (Map<String, T> partition : collection) {
                byPartition.put(partition, (Map<String, Object>) this.offset);
            }
            KafkaSourceConsumerFn.LOG.debug("-------------- OFFSETS: {}", byPartition);
            return byPartition;
        }
    }

    /**
     * Serializable restriction of the splittable DoFn: the last claimed source offset, the
     * database history list, and the record/time limits checked when claiming.
     */
    public static class OffsetHolder implements Serializable {
        /** Source offset most recently claimed through the tracker; {@code null} before any claim. */
        public Map<String, ?> offset;
        /** Database history entries; never {@code null} after construction. */
        public List<?> history;
        /** Claim counter maintained by the tracker; {@code null} before the first claim. */
        public Integer fetchedRecords;
        /** Record limit, or {@code null} for none. */
        public Integer maxRecords;
        /** Time limit in milliseconds; the three-argument constructor sets it to {@code -1}. */
        public final Long milisToRun;

        OffsetHolder(Map<String, ?> map, List<?> list, Integer num, Integer num2, Long l) {
            this.offset = map;
            if (list != null) {
                this.history = list;
            } else {
                // A missing history list is replaced by a fresh, empty, mutable one.
                this.history = new ArrayList<>();
            }
            this.fetchedRecords = num;
            this.maxRecords = num2;
            this.milisToRun = l;
        }

        /** Creates a holder with no record limit and a time limit of {@code -1}. */
        public OffsetHolder(Map<String, ?> map, List<?> list, Integer num) {
            this(map, list, num, null, -1L);
        }
    }

    /**
     * {@link RestrictionTracker} over an {@link OffsetHolder}. Each claim records the claimed source
     * offset and a claim counter in the restriction and reports whether the configured record and
     * time limits still allow processing.
     */
    public static class OffsetTracker extends RestrictionTracker<OffsetHolder, Map<String, Object>> {
        private final OffsetHolder restriction;

        public OffsetTracker(OffsetHolder offsetHolder) {
            this.restriction = offsetHolder;
        }

        /**
         * Records {@code map} as the current offset, advances the claim counter, and checks the limits.
         *
         * <p>The counter starts at 0 on the first claim, so with {@code maxRecords = N} exactly N
         * claims succeed. A {@code null} or negative {@code milisToRun} means "no time limit";
         * previously a {@code null} value threw {@code NullPointerException} when no record limit
         * was set, and the {@code -1} sentinel rejected every claim once a record limit was set.
         *
         * <p>Elapsed time is measured from {@code KafkaSourceConsumerFn.startTime}, which must have
         * been assigned (see {@code getInitialRestriction}) before the first claim.
         *
         * @param map the source offset being claimed
         * @return {@code true} while both limits are still satisfied. The offset and counter are
         *     updated even when {@code false} is returned.
         */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public boolean tryClaim(Map<String, Object> map) {
            KafkaSourceConsumerFn.LOG.debug("-------------- Claiming {} used to have: {}", map, this.restriction.offset);
            long elapsedMillis = System.currentTimeMillis() - KafkaSourceConsumerFn.startTime.getMillis();
            int fetched = this.restriction.fetchedRecords == null ? 0 : this.restriction.fetchedRecords + 1;
            KafkaSourceConsumerFn.LOG.debug("------------Fetched records {} / {}", fetched, this.restriction.maxRecords);
            KafkaSourceConsumerFn.LOG.debug("-------------- Time running: {} / {}", elapsedMillis, this.restriction.milisToRun);
            this.restriction.offset = map;
            this.restriction.fetchedRecords = fetched;
            KafkaSourceConsumerFn.LOG.debug("-------------- History: {}", this.restriction.history);

            Integer recordLimit = this.restriction.maxRecords;
            Long timeLimit = this.restriction.milisToRun;
            boolean withinRecordLimit = recordLimit == null || fetched < recordLimit;
            boolean withinTimeLimit = timeLimit == null || timeLimit < 0 || elapsedMillis < timeLimit;
            return withinRecordLimit && withinTimeLimit;
        }

        /** Returns the restriction object this tracker mutates on every claim. */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public OffsetHolder currentRestriction() {
            return this.restriction;
        }

        /**
         * Always splits into an empty primary and the current restriction as residual, regardless
         * of {@code d}.
         */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public SplitResult<OffsetHolder> trySplit(double d) {
            KafkaSourceConsumerFn.LOG.debug("-------------- Trying to split: fractionOfRemainder={}", d);
            return SplitResult.of(new OffsetHolder(null, null, null), this.restriction);
        }

        /** No-op: this tracker performs no completeness check. */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public void checkDone() throws IllegalStateException {
        }

        /** Always reports the restriction as unbounded. */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public RestrictionTracker.IsBounded isBounded() {
            return RestrictionTracker.IsBounded.UNBOUNDED;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public KafkaSourceConsumerFn(Class<?> cls, SourceRecordMapper<T> sourceRecordMapper, Integer num, Long l) {
        this.connectorClass = cls;
        this.fn = sourceRecordMapper;
        this.maxRecords = num;
        this.milisecondsToRun = l;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSourceConsumerFn(Class<?> cls, SourceRecordMapper<T> sourceRecordMapper, Integer num) {
        this(cls, sourceRecordMapper, num, null);
    }

    /**
     * Stamps the shared start time with the current time and builds the initial restriction: no
     * offset, no history, no fetched-record count, and this instance's record and time limits.
     */
    @DoFn.GetInitialRestriction
    public OffsetHolder getInitialRestriction(@DoFn.Element Map<String, String> map) throws IOException {
        startTime = new DateTime();
        final Integer recordLimit = this.maxRecords;
        final Long timeLimit = this.milisecondsToRun;
        return new OffsetHolder(null, null, null, recordLimit, timeLimit);
    }

    /** Wraps the given restriction in a fresh {@link OffsetTracker}. */
    @DoFn.NewTracker
    public RestrictionTracker<OffsetHolder, Map<String, Object>> newTracker(@DoFn.Restriction OffsetHolder offsetHolder) {
        RestrictionTracker<OffsetHolder, Map<String, Object>> tracker = new OffsetTracker(offsetHolder);
        return tracker;
    }

    /** Returns a Java-serialization based coder for the {@link OffsetHolder} restriction. */
    @DoFn.GetRestrictionCoder
    public Coder<OffsetHolder> getRestrictionCoder() {
        Coder<OffsetHolder> restrictionCoder = SerializableCoder.of(OffsetHolder.class);
        return restrictionCoder;
    }

    /**
     * Starts a throw-away connector and task with no stored offset and returns the first record
     * the task produces.
     *
     * <p>The task is polled up to four times, with a two second pause after each poll that yields
     * nothing (a {@code null} result is treated like an empty one). The task and connector are
     * stopped on every exit path, including failures.
     *
     * @param map connector configuration
     * @return the first record returned by the task
     * @throws RuntimeException if no record arrives within the allowed polls, if the connector or
     *     task cannot be instantiated, or if the thread is interrupted (the interrupt flag is
     *     restored before throwing)
     */
    public SourceRecord getOneRecord(Map<String, String> map) {
        final int maxPolls = 4;
        final long pauseMillis = 2000L;
        try {
            SourceConnector connector = this.connectorClass.getDeclaredConstructor().newInstance();
            connector.start(map);
            try {
                SourceTask task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
                try {
                    task.initialize(new BeamSourceTaskContext(null));
                    task.start(connector.taskConfigs(1).get(0));
                    List<SourceRecord> records = task.poll();
                    int polls = 1;
                    while (records == null || records.isEmpty()) {
                        if (polls >= maxPolls) {
                            throw new RuntimeException("could not fetch database schema");
                        }
                        // Only wait when the last poll came back empty.
                        Thread.sleep(pauseMillis);
                        records = task.poll();
                        polls++;
                    }
                    return records.get(0);
                } finally {
                    task.stop();
                }
            } finally {
                connector.stop();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unexpected exception fetching database schema.", e);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Unexpected exception fetching database schema.", e);
        }
    }

    /** Publishes the tracker in the shared tracker map under this instance's {@link #getHashCode()} key. */
    public void register(RestrictionTracker<OffsetHolder, Map<String, Object>> restrictionTracker) {
        final String instanceKey = getHashCode();
        restrictionTrackers.put(instanceKey, restrictionTracker);
    }

    /** Removes this instance's entry, if any, from the shared tracker map. */
    public void reset() {
        final String instanceKey = getHashCode();
        restrictionTrackers.remove(instanceKey);
    }

    /** Uses the element timestamp, unchanged, as the initial watermark estimator state. */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        final Instant initialState = instant;
        return initialState;
    }

    /**
     * Builds a monotonically increasing watermark estimator that starts at the given state, after
     * clamping it to the timestamp range allowed by {@link BoundedWindow}.
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        Instant clamped = ensureTimestampWithinBounds(instant);
        return new WatermarkEstimators.MonotonicallyIncreasing(clamped);
    }

    /**
     * Clamps {@code instant} to the closed range
     * [{@code BoundedWindow.TIMESTAMP_MIN_VALUE}, {@code BoundedWindow.TIMESTAMP_MAX_VALUE}].
     */
    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }

    /**
     * Runs one poll cycle: starts a connector and task from the tracker's current offset, emits
     * every polled record whose offset can be claimed, and keeps polling until the task returns
     * nothing.
     *
     * <p>Cleanup runs exactly once, in a single {@code finally}: the tracker is unregistered, the
     * task is stopped and, if it was started, the connector is stopped. This also covers failures
     * while creating the connector or task.
     *
     * @param map connector configuration; it is copied and extended with
     *     {@link #BEAM_INSTANCE_PROPERTY} before being passed to the connector
     * @param restrictionTracker tracker holding the offset to resume from
     * @param outputReceiver receives each mapped record with its Debezium record timestamp
     * @return {@code stop()} when the first poll returns {@code null}, when a record has no offset
     *     or its offset cannot be claimed, or when a positive time limit has elapsed; otherwise
     *     {@code resume()} with a one second delay
     * @throws RuntimeException wrapping any exception raised while polling or emitting records
     * @throws Exception if the connector or task cannot be instantiated or the connector fails to start
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation process(@DoFn.Element Map<String, String> map, RestrictionTracker<OffsetHolder, Map<String, Object>> restrictionTracker, DoFn.OutputReceiver<T> outputReceiver) throws Exception {
        Map<String, String> config = new HashMap<>(map);
        register(restrictionTracker);
        config.put(BEAM_INSTANCE_PROPERTY, getHashCode());
        SourceConnector startedConnector = null;
        SourceTask task = null;
        try {
            SourceConnector connector = this.connectorClass.getDeclaredConstructor().newInstance();
            connector.start(config);
            // Only a connector that actually started is stopped during cleanup.
            startedConnector = connector;
            task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
            try {
                LOG.debug("--------- Consumer offset from Debezium Tracker: {}", restrictionTracker.currentRestriction().offset);
                task.initialize(new BeamSourceTaskContext(restrictionTracker.currentRestriction().offset));
                task.start(connector.taskConfigs(1).get(0));
                List<SourceRecord> records = task.poll();
                if (records == null) {
                    LOG.debug("-------- Pulled records null");
                    return DoFn.ProcessContinuation.stop();
                }
                LOG.debug("-------- {} records found", records.size());
                while (records != null && !records.isEmpty()) {
                    for (SourceRecord record : records) {
                        LOG.debug("-------- Record found: {}", record);
                        // The tracker's position type is Map<String, Object>; the record exposes
                        // its offset as Map<String, ?>.
                        @SuppressWarnings("unchecked")
                        Map<String, Object> sourceOffset = (Map<String, Object>) record.sourceOffset();
                        if (sourceOffset == null || !restrictionTracker.tryClaim(sourceOffset)) {
                            LOG.debug("-------- Offset null or could not be claimed");
                            return DoFn.ProcessContinuation.stop();
                        }
                        T mapped = this.fn.mapSourceRecord(record);
                        LOG.debug("****************** RECEIVED SOURCE AS JSON: {}", mapped);
                        outputReceiver.outputWithTimestamp(mapped, KafkaConnectUtils.debeziumRecordInstant(record));
                    }
                    task.commit();
                    records = task.poll();
                }
                boolean timeLimited = this.milisecondsToRun != null && this.milisecondsToRun > 0;
                if (timeLimited && System.currentTimeMillis() - startTime.getMillis() >= this.milisecondsToRun) {
                    return DoFn.ProcessContinuation.stop();
                }
                return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1L));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller while reporting the same wrapped error.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Error occurred when consuming changes from Database. ", e);
            } catch (Exception e) {
                throw new RuntimeException("Error occurred when consuming changes from Database. ", e);
            }
        } finally {
            reset();
            try {
                if (task != null) {
                    LOG.debug("------- Stopping SourceTask");
                    task.stop();
                }
            } finally {
                if (startedConnector != null) {
                    startedConnector.stop();
                }
            }
        }
    }

    /** Returns this instance's identity hash code as a decimal string; used as its tracker-map key. */
    public String getHashCode() {
        int identity = System.identityHashCode(this);
        return String.valueOf(identity);
    }
}
