package org.apache.kafka.connect.integration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/integration/MonitorableSourceConnector.class */
public class MonitorableSourceConnector extends SampleSourceConnector {
    /** Logger shared by the connector and its nested task class. */
    private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);

    /** Task property naming the topic records are produced to; tasks fall back to "sequential-topic" when it is absent. */
    public static final String TOPIC_CONFIG = "topic";

    /** Task property giving the number of records returned by each poll; tasks fall back to "1" when it is absent. */
    public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";

    /** Property that selects what {@code exactlyOnceSupport} reports; compared case-insensitively against the EXACTLY_ONCE_* values. */
    public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG = "custom.exactly.once.support";
    public static final String EXACTLY_ONCE_SUPPORTED = "supported";
    public static final String EXACTLY_ONCE_UNSUPPORTED = "unsupported";
    public static final String EXACTLY_ONCE_NULL = "null";
    public static final String EXACTLY_ONCE_FAIL = "fail";

    /** Property that selects what {@code canDefineTransactionBoundaries} reports; compared case-insensitively against the TRANSACTION_BOUNDARIES_* values. */
    public static final String CUSTOM_TRANSACTION_BOUNDARIES_CONFIG = "custom.transaction.boundaries";
    public static final String TRANSACTION_BOUNDARIES_SUPPORTED = "supported";
    public static final String TRANSACTION_BOUNDARIES_UNSUPPORTED = "unsupported";
    public static final String TRANSACTION_BOUNDARIES_NULL = "null";
    public static final String TRANSACTION_BOUNDARIES_FAIL = "fail";

    /** Name reported by the connector handle; assigned in {@code start}. */
    private String connectorName;
    /** Handle looked up from {@code RuntimeHandles} in {@code start}; connector start/stop events are recorded on it. */
    private ConnectorHandle connectorHandle;
    /** The properties passed to {@code start}; copied into every task configuration. */
    private Map<String, String> commonConfigs;

    /* loaded from: input_file:org/apache/kafka/connect/integration/MonitorableSourceConnector$MonitorableSourceTask.class */
    public static class MonitorableSourceTask extends SourceTask {
        private String taskId;
        private String topicName;
        private TaskHandle taskHandle;
        private volatile boolean stopped;
        private long startingSeqno;
        private long seqno;
        private int batchSize;
        private ThroughputThrottler throttler;
        private long priorTransactionBoundary;
        private long nextTransactionBoundary;

        public String version() {
            return "unknown";
        }

        public void start(Map<String, String> map) {
            this.taskId = map.get("task.id");
            String str = map.get("connector.name");
            this.topicName = map.getOrDefault(MonitorableSourceConnector.TOPIC_CONFIG, "sequential-topic");
            this.batchSize = Integer.parseInt(map.getOrDefault(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "1"));
            this.taskHandle = RuntimeHandles.get().connectorHandle(str).taskHandle(this.taskId);
            this.startingSeqno = ((Long) Optional.ofNullable((Long) ((Map) Optional.ofNullable(this.context.offsetStorageReader().offset(MonitorableSourceConnector.sourcePartition(this.taskId))).orElse(Collections.emptyMap())).get("saved")).orElse(0L)).longValue();
            this.seqno = this.startingSeqno;
            MonitorableSourceConnector.log.info("Started {} task {} with properties {}", new Object[]{getClass().getSimpleName(), this.taskId, map});
            this.throttler = new ThroughputThrottler(Long.parseLong(map.getOrDefault("throughput", "-1")), System.currentTimeMillis());
            this.taskHandle.recordTaskStart();
            this.priorTransactionBoundary = 0L;
            this.nextTransactionBoundary = 1L;
            if (Boolean.parseBoolean(map.getOrDefault("task-" + this.taskId + ".start.inject.error", "false"))) {
                throw new RuntimeException("Injecting errors during task start");
            }
            calculateNextBoundary();
        }

        public List<SourceRecord> poll() {
            if (this.stopped) {
                return null;
            }
            if (this.throttler.shouldThrottle(this.seqno - this.startingSeqno, System.currentTimeMillis())) {
                this.throttler.throttle();
            }
            this.taskHandle.record(this.batchSize);
            MonitorableSourceConnector.log.trace("Returning batch of {} records", Integer.valueOf(this.batchSize));
            return (List) LongStream.range(0L, this.batchSize).mapToObj(j -> {
                this.seqno++;
                SourceRecord sourceRecord = new SourceRecord(MonitorableSourceConnector.sourcePartition(this.taskId), MonitorableSourceConnector.sourceOffset(this.seqno), this.topicName, (Integer) null, Schema.STRING_SCHEMA, "key-" + this.taskId + "-" + this.seqno, Schema.STRING_SCHEMA, "value-" + this.taskId + "-" + this.seqno, (Long) null, new ConnectHeaders().addLong("header-" + this.seqno, this.seqno));
                maybeDefineTransactionBoundary(sourceRecord);
                return sourceRecord;
            }).collect(Collectors.toList());
        }

        public void commit() {
            MonitorableSourceConnector.log.info("Task {} committing offsets", this.taskId);
        }

        public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
            MonitorableSourceConnector.log.trace("Committing record: {}", sourceRecord);
            this.taskHandle.commit();
        }

        public void stop() {
            MonitorableSourceConnector.log.info("Stopped {} task {}", getClass().getSimpleName(), this.taskId);
            this.stopped = true;
            this.taskHandle.recordTaskStop();
        }

        private void calculateNextBoundary() {
            while (this.nextTransactionBoundary <= this.seqno) {
                this.nextTransactionBoundary += this.priorTransactionBoundary;
                this.priorTransactionBoundary = this.nextTransactionBoundary - this.priorTransactionBoundary;
            }
        }

        private void maybeDefineTransactionBoundary(SourceRecord sourceRecord) {
            if (this.context.transactionContext() == null || this.seqno != this.nextTransactionBoundary) {
                return;
            }
            boolean z = this.nextTransactionBoundary % 2 == 0;
            calculateNextBoundary();
            if (z) {
                this.context.transactionContext().abortTransaction(sourceRecord);
            } else {
                this.context.transactionContext().commitTransaction(sourceRecord);
            }
        }
    }

    /**
     * Looks up this connector's handle by the {@code name} property, remembers the
     * supplied properties for later task configuration, and records the start on the handle.
     *
     * @param map connector properties
     * @throws RuntimeException if {@code connector.start.inject.error} is {@code true}
     */
    @Override
    public void start(Map<String, String> map) {
        final RuntimeHandles handles = RuntimeHandles.get();
        this.connectorHandle = handles.connectorHandle(map.get("name"));
        this.connectorName = this.connectorHandle.name();
        this.commonConfigs = map;

        final String connectorType = getClass().getSimpleName();
        log.info("Started {} connector {}", connectorType, this.connectorName);
        this.connectorHandle.recordConnectorStart();

        // The start is recorded on the handle before the optional injected failure.
        final boolean injectError =
                Boolean.parseBoolean(map.getOrDefault("connector.start.inject.error", "false"));
        if (injectError) {
            throw new RuntimeException("Injecting errors during connector start");
        }
    }

    /**
     * Identifies the task implementation used by this connector.
     *
     * @return {@link MonitorableSourceTask}
     */
    @Override // org.apache.kafka.connect.runtime.SampleSourceConnector
    public Class<? extends Task> taskClass() {
        return MonitorableSourceTask.class;
    }

    /**
     * Builds one configuration per task. Each is a copy of the properties given to
     * {@code start}, extended with {@code connector.name} and a {@code task.id} of the
     * form {@code <connectorName>-<index>}.
     *
     * @param i number of task configurations to create
     * @return a list of {@code i} task configurations (empty when {@code i} is not positive)
     */
    @Override
    public List<Map<String, String>> taskConfigs(int i) {
        final List<Map<String, String>> configs = new ArrayList<>();
        for (int taskIndex = 0; taskIndex < i; taskIndex++) {
            final Map<String, String> taskConfig = new HashMap<>(this.commonConfigs);
            taskConfig.put("connector.name", this.connectorName);
            taskConfig.put("task.id", taskId(this.connectorName, taskIndex));
            configs.add(taskConfig);
        }
        return configs;
    }

    /** Logs the shutdown and records the connector stop on the connector handle. */
    @Override
    public void stop() {
        final String connectorType = getClass().getSimpleName();
        log.info("Stopped {} connector {}", connectorType, this.connectorName);
        this.connectorHandle.recordConnectorStop();
    }

    /**
     * Logs the call and returns a fresh configuration definition with no entries added.
     *
     * @return a new, empty {@link ConfigDef}
     */
    @Override
    public ConfigDef config() {
        final String connectorType = getClass().getSimpleName();
        log.info("Configured {} connector {}", connectorType, this.connectorName);
        return new ConfigDef();
    }

    /**
     * Reports the exactly-once support level selected by the
     * {@code custom.exactly.once.support} property.
     *
     * <p>The property value is lower-cased with {@link Locale#ROOT} before matching and
     * defaults to {@code "null"} when the property is absent.
     *
     * @param map connector properties
     * @return {@link ExactlyOnceSupport#SUPPORTED} for {@code "supported"},
     *         {@link ExactlyOnceSupport#UNSUPPORTED} for {@code "unsupported"}, and
     *         {@code null} for {@code "null"} or any unrecognized value
     * @throws ConnectException if the value is {@code "fail"}
     */
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> map) {
        final String supportLevel = map.getOrDefault(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, EXACTLY_ONCE_NULL)
                .toLowerCase(Locale.ROOT);
        switch (supportLevel) {
            case EXACTLY_ONCE_SUPPORTED:
                return ExactlyOnceSupport.SUPPORTED;
            case EXACTLY_ONCE_UNSUPPORTED:
                return ExactlyOnceSupport.UNSUPPORTED;
            case EXACTLY_ONCE_FAIL:
                throw new ConnectException("oops");
            case EXACTLY_ONCE_NULL:
            default:
                // Unrecognized values are treated the same as an explicit "null".
                return null;
        }
    }

    /**
     * Reports whether this connector can define its own transaction boundaries, as
     * selected by the {@code custom.transaction.boundaries} property.
     *
     * <p>The property value is lower-cased with {@link Locale#ROOT} before matching and
     * defaults to {@code "unsupported"} when the property is absent.
     *
     * @param map connector properties
     * @return {@link ConnectorTransactionBoundaries#SUPPORTED} for {@code "supported"},
     *         {@code null} for {@code "null"}, and
     *         {@link ConnectorTransactionBoundaries#UNSUPPORTED} for {@code "unsupported"}
     *         or any unrecognized value
     * @throws ConnectException if the value is {@code "fail"}
     */
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> map) {
        final String supportLevel = map
                .getOrDefault(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, TRANSACTION_BOUNDARIES_UNSUPPORTED)
                .toLowerCase(Locale.ROOT);
        switch (supportLevel) {
            case TRANSACTION_BOUNDARIES_SUPPORTED:
                return ConnectorTransactionBoundaries.SUPPORTED;
            case TRANSACTION_BOUNDARIES_FAIL:
                throw new ConnectException("oh no :(");
            case TRANSACTION_BOUNDARIES_NULL:
                return null;
            case TRANSACTION_BOUNDARIES_UNSUPPORTED:
            default:
                // Unrecognized values are treated the same as an explicit "unsupported".
                return ConnectorTransactionBoundaries.UNSUPPORTED;
        }
    }

    /**
     * Builds a task identifier of the form {@code <str>-<i>}.
     *
     * @param str connector name used as the prefix
     * @param i task index used as the suffix
     * @return the prefix and index joined by a hyphen
     */
    public static String taskId(String str, int i) {
        final StringBuilder id = new StringBuilder();
        id.append(str).append("-").append(i);
        return id.toString();
    }

    /**
     * Builds the source partition for a task: a single-entry immutable map from
     * {@code "task.id"} to the given identifier.
     *
     * @param str task identifier
     * @return the source partition map
     */
    public static Map<String, Object> sourcePartition(String str) {
        final Object partitionValue = str;
        return Collections.<String, Object>singletonMap("task.id", partitionValue);
    }

    /**
     * Builds the source offset for a sequence number: a single-entry immutable map from
     * {@code "saved"} to the boxed value.
     *
     * @param j sequence number to store
     * @return the source offset map
     */
    public static Map<String, Object> sourceOffset(long j) {
        final Object boxedSeqno = j;
        return Collections.<String, Object>singletonMap("saved", boxedSeqno);
    }
}
