package org.apache.beam.io.debezium;

import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.HistoryRecord;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/io/debezium/KafkaSourceConsumerFn.class */
public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
    public static final String BEAM_INSTANCE_PROPERTY = "beam.parent.instance";
    private final Class<? extends SourceConnector> connectorClass;
    private final SourceRecordMapper<T> fn;
    private long minutesToRun;
    private Integer maxRecords;
    private static DateTime startTime;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceConsumerFn.class);
    private static final Map<String, RestrictionTracker<OffsetHolder, Map<String, Object>>> restrictionTrackers = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/beam/io/debezium/KafkaSourceConsumerFn$BeamSourceTaskContext.class */
    private static class BeamSourceTaskContext implements SourceTaskContext {
        private final Map<String, ?> initialOffset;

        BeamSourceTaskContext(Map<String, ?> map) {
            this.initialOffset = map;
        }

        public Map<String, String> configs() {
            throw new UnsupportedOperationException("unimplemented");
        }

        public OffsetStorageReader offsetStorageReader() {
            KafkaSourceConsumerFn.LOG.debug("------------- Creating an offset storage reader");
            return new DebeziumSourceOffsetStorageReader(this.initialOffset);
        }
    }

    /* loaded from: input_file:org/apache/beam/io/debezium/KafkaSourceConsumerFn$DebeziumSDFDatabaseHistory.class */
    public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory {
        private List<byte[]> history = new ArrayList();

        public void start() {
            super.start();
            KafkaSourceConsumerFn.LOG.debug("------------ STARTING THE DATABASE HISTORY! - trackers: {} - config: {}", KafkaSourceConsumerFn.restrictionTrackers, this.config.asMap());
            this.history = ((OffsetHolder) ((RestrictionTracker) KafkaSourceConsumerFn.restrictionTrackers.get(KafkaSourceConsumerFn.restrictionTrackers.keySet().iterator().next())).currentRestriction()).history;
        }

        protected void storeRecord(HistoryRecord historyRecord) throws DatabaseHistoryException {
            KafkaSourceConsumerFn.LOG.debug("------------- Adding history! {}", historyRecord);
            this.history.add(DocumentWriter.defaultWriter().writeAsBytes(historyRecord.document()));
        }

        protected void recoverRecords(Consumer<HistoryRecord> consumer) {
            KafkaSourceConsumerFn.LOG.debug("------------- Trying to recover!");
            try {
                Iterator<byte[]> it = this.history.iterator();
                while (it.hasNext()) {
                    consumer.accept(new HistoryRecord(DocumentReader.defaultReader().read(it.next())));
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public boolean exists() {
            return (this.history == null || this.history.isEmpty()) ? false : true;
        }

        public boolean storageExists() {
            return (this.history == null || this.history.isEmpty()) ? false : true;
        }
    }

    /* loaded from: input_file:org/apache/beam/io/debezium/KafkaSourceConsumerFn$DebeziumSourceOffsetStorageReader.class */
    private static class DebeziumSourceOffsetStorageReader implements OffsetStorageReader {
        private final Map<String, ?> offset;

        DebeziumSourceOffsetStorageReader(Map<String, ?> map) {
            this.offset = map;
        }

        public <V> Map<String, Object> offset(Map<String, V> map) {
            return offsets(Collections.singletonList(map)).getOrDefault(map, ImmutableMap.of());
        }

        public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> collection) {
            KafkaSourceConsumerFn.LOG.debug("-------------- GETTING OFFSETS!");
            HashMap hashMap = new HashMap();
            Iterator<Map<String, T>> it = collection.iterator();
            while (it.hasNext()) {
                hashMap.put(it.next(), this.offset);
            }
            KafkaSourceConsumerFn.LOG.debug("-------------- OFFSETS: {}", hashMap);
            return hashMap;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/io/debezium/KafkaSourceConsumerFn$OffsetHolder.class */
    public static class OffsetHolder implements Serializable {
        public final Map<String, ?> offset;
        public final List<?> history;
        public final Integer fetchedRecords;
        public final Integer maxRecords;
        public final long minutesToRun;

        OffsetHolder(Map<String, ?> map, List<?> list, Integer num, Integer num2, long j) {
            this.offset = map;
            this.history = list == null ? new ArrayList<>() : list;
            this.fetchedRecords = num;
            this.maxRecords = num2;
            this.minutesToRun = j;
        }

        OffsetHolder(Map<String, ?> map, List<?> list, Integer num) {
            this(map, list, num, null, -1L);
        }
    }

    /* loaded from: input_file:org/apache/beam/io/debezium/KafkaSourceConsumerFn$OffsetTracker.class */
    static class OffsetTracker extends RestrictionTracker<OffsetHolder, Map<String, Object>> {
        private OffsetHolder restriction;
        private static final long MILLIS = 60000;

        OffsetTracker(OffsetHolder offsetHolder) {
            this.restriction = offsetHolder;
        }

        public boolean tryClaim(Map<String, Object> map) {
            KafkaSourceConsumerFn.LOG.debug("-------------- Claiming {} used to have: {}", map, this.restriction.offset);
            long currentTimeMillis = System.currentTimeMillis() - KafkaSourceConsumerFn.startTime.getMillis();
            int intValue = this.restriction.fetchedRecords == null ? 0 : this.restriction.fetchedRecords.intValue() + 1;
            KafkaSourceConsumerFn.LOG.debug("------------Fetched records {} / {}", Integer.valueOf(intValue), this.restriction.maxRecords);
            KafkaSourceConsumerFn.LOG.debug("-------------- Time running: {} / {}", Long.valueOf(currentTimeMillis), Long.valueOf(this.restriction.minutesToRun * MILLIS));
            this.restriction = new OffsetHolder(map, this.restriction.history, Integer.valueOf(intValue), this.restriction.maxRecords, this.restriction.minutesToRun);
            KafkaSourceConsumerFn.LOG.debug("-------------- History: {}", this.restriction.history);
            if (this.restriction.maxRecords == null && this.restriction.minutesToRun == -1) {
                return true;
            }
            return this.restriction.maxRecords != null ? intValue < this.restriction.maxRecords.intValue() : currentTimeMillis < this.restriction.minutesToRun * MILLIS;
        }

        /* renamed from: currentRestriction, reason: merged with bridge method [inline-methods] */
        public OffsetHolder m4currentRestriction() {
            return this.restriction;
        }

        public SplitResult<OffsetHolder> trySplit(double d) {
            KafkaSourceConsumerFn.LOG.debug("-------------- Trying to split: fractionOfRemainder={}", Double.valueOf(d));
            return SplitResult.of(new OffsetHolder(null, null, null), this.restriction);
        }

        public void checkDone() throws IllegalStateException {
        }

        public RestrictionTracker.IsBounded isBounded() {
            return RestrictionTracker.IsBounded.BOUNDED;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    KafkaSourceConsumerFn(Class<?> cls, SourceRecordMapper<T> sourceRecordMapper, long j) {
        this.minutesToRun = -1L;
        this.connectorClass = cls;
        this.fn = sourceRecordMapper;
        this.minutesToRun = j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public KafkaSourceConsumerFn(Class<?> cls, SourceRecordMapper<T> sourceRecordMapper, Integer num) {
        this.minutesToRun = -1L;
        this.connectorClass = cls;
        this.fn = sourceRecordMapper;
        this.maxRecords = num;
    }

    @DoFn.GetInitialRestriction
    public OffsetHolder getInitialRestriction(@DoFn.Element Map<String, String> map) throws IOException {
        startTime = new DateTime();
        return new OffsetHolder(null, null, null, this.maxRecords, this.minutesToRun);
    }

    @DoFn.NewTracker
    public RestrictionTracker<OffsetHolder, Map<String, Object>> newTracker(@DoFn.Restriction OffsetHolder offsetHolder) {
        return new OffsetTracker(offsetHolder);
    }

    @DoFn.GetRestrictionCoder
    public Coder<OffsetHolder> getRestrictionCoder() {
        return SerializableCoder.of(OffsetHolder.class);
    }

    @DoFn.ProcessElement
    public DoFn.ProcessContinuation process(@DoFn.Element Map<String, String> map, RestrictionTracker<OffsetHolder, Map<String, Object>> restrictionTracker, DoFn.OutputReceiver<T> outputReceiver) throws Exception {
        List<SourceRecord> poll;
        HashMap hashMap = new HashMap(map);
        restrictionTrackers.put(getHashCode(), restrictionTracker);
        hashMap.put(BEAM_INSTANCE_PROPERTY, getHashCode());
        SourceConnector newInstance = this.connectorClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        newInstance.start(hashMap);
        SourceTask sourceTask = (SourceTask) newInstance.taskClass().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        try {
            try {
                LOG.debug("--------- Consumer offset from Debezium Tracker: {}", ((OffsetHolder) restrictionTracker.currentRestriction()).offset);
                sourceTask.initialize(new BeamSourceTaskContext(((OffsetHolder) restrictionTracker.currentRestriction()).offset));
                sourceTask.start((Map) newInstance.taskConfigs(1).get(0));
                poll = sourceTask.poll();
            } catch (Exception e) {
                LOG.error("-------- Error on consumer: {}. with stacktrace: {}", e.getMessage(), e.getStackTrace());
                restrictionTrackers.remove(getHashCode());
                LOG.debug("------- Stopping SourceTask");
                sourceTask.stop();
            }
            if (poll == null) {
                LOG.debug("-------- Pulled records null");
                DoFn.ProcessContinuation stop = DoFn.ProcessContinuation.stop();
                restrictionTrackers.remove(getHashCode());
                LOG.debug("------- Stopping SourceTask");
                sourceTask.stop();
                return stop;
            }
            LOG.debug("-------- {} records found", Integer.valueOf(poll.size()));
            if (!poll.isEmpty()) {
                for (SourceRecord sourceRecord : poll) {
                    LOG.debug("-------- Record found: {}", sourceRecord);
                    Map sourceOffset = sourceRecord.sourceOffset();
                    if (sourceOffset == null || !restrictionTracker.tryClaim(sourceOffset)) {
                        LOG.debug("-------- Offset null or could not be claimed");
                        DoFn.ProcessContinuation stop2 = DoFn.ProcessContinuation.stop();
                        restrictionTrackers.remove(getHashCode());
                        LOG.debug("------- Stopping SourceTask");
                        sourceTask.stop();
                        return stop2;
                    }
                    T mapSourceRecord = this.fn.mapSourceRecord(sourceRecord);
                    LOG.debug("****************** RECEIVED SOURCE AS JSON: {}", mapSourceRecord);
                    outputReceiver.output(mapSourceRecord);
                }
                sourceTask.commit();
            }
            restrictionTrackers.remove(getHashCode());
            LOG.debug("------- Stopping SourceTask");
            sourceTask.stop();
            return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1L));
        } catch (Throwable th) {
            restrictionTrackers.remove(getHashCode());
            LOG.debug("------- Stopping SourceTask");
            sourceTask.stop();
            throw th;
        }
    }

    public String getHashCode() {
        return Integer.toString(System.identityHashCode(this));
    }
}
