package org.apache.kafka.connect.integration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.runtime.TestSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/integration/MonitorableSourceConnector.class */
/**
 * A source connector used in integration tests. It looks up its {@link ConnectorHandle} from
 * {@link RuntimeHandles} using the {@code name} property, and generates one task configuration per
 * requested task, each carrying the connector name and a task id of the form
 * {@code <connector-name>-<index>}.
 */
public class MonitorableSourceConnector extends TestSourceConnector {
    private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);

    // Property keys shared between the connector and the tasks it configures.
    private static final String NAME_CONFIG = "name";
    private static final String CONNECTOR_NAME_CONFIG = "connector.name";
    private static final String TASK_ID_CONFIG = "task.id";
    private static final String TOPIC_CONFIG = "topic";
    private static final String THROUGHPUT_CONFIG = "throughput";
    private static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
    // Key under which the last emitted sequence number is stored in the source offset.
    private static final String SAVED_OFFSET_KEY = "saved";

    private String connectorName;
    private ConnectorHandle connectorHandle;
    private Map<String, String> commonConfigs;

    /**
     * A source task that emits string records with monotonically increasing sequence numbers and
     * reports polled batches and committed records to its {@link TaskHandle}.
     */
    public static class MonitorableSourceTask extends SourceTask {
        private String connectorName;
        private String taskId;
        private String topicName;
        private TaskHandle taskHandle;
        // Written by stop() and read by poll(); volatile so the write is visible across threads.
        private volatile boolean stopped;
        private long startingSeqno;
        private long seqno;
        private long throughput;
        private int batchSize;
        private ThroughputThrottler throttler;

        @Override
        public String version() {
            return "unknown";
        }

        /**
         * Reads the task configuration, resolves the task handle and restores the sequence number
         * from the stored source offset, if one exists.
         *
         * @param map task properties; {@code task.id} and {@code connector.name} are read directly,
         *            {@code topic} defaults to {@code sequential-topic}, {@code throughput} to
         *            {@code -1} and {@code messages.per.poll} to {@code 1}
         * @throws NumberFormatException if {@code throughput} or {@code messages.per.poll} is not numeric
         */
        @Override
        public void start(Map<String, String> map) {
            this.taskId = map.get(TASK_ID_CONFIG);
            this.connectorName = map.get(CONNECTOR_NAME_CONFIG);
            this.topicName = map.getOrDefault(TOPIC_CONFIG, "sequential-topic");
            this.throughput = Long.parseLong(map.getOrDefault(THROUGHPUT_CONFIG, "-1"));
            this.batchSize = Integer.parseInt(map.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
            this.taskHandle = RuntimeHandles.get().connectorHandle(this.connectorName).taskHandle(this.taskId);

            // A missing offset map or a missing "saved" entry both mean "start from zero".
            Map<String, Object> storedOffset = Optional
                    .ofNullable(this.context.offsetStorageReader()
                            .offset(Collections.singletonMap(TASK_ID_CONFIG, this.taskId)))
                    .orElse(Collections.emptyMap());
            this.startingSeqno = Optional.ofNullable((Long) storedOffset.get(SAVED_OFFSET_KEY)).orElse(0L);
            // Resume numbering after the last saved offset; otherwise a restarted task would
            // re-emit sequence numbers from 1 and hand the throttler a negative record count.
            this.seqno = this.startingSeqno;

            log.info("Started {} task {}", getClass().getSimpleName(), this.taskId);
            this.throttler = new ThroughputThrottler(this.throughput, System.currentTimeMillis());
        }

        /**
         * Produces the next batch of records.
         *
         * @return {@code null} once the task has been stopped; otherwise a list of
         *         {@code messages.per.poll} records whose source offset holds the record's
         *         sequence number under the {@code saved} key
         */
        @Override
        public List<SourceRecord> poll() {
            if (this.stopped) {
                return null;
            }
            // The throttler is fed the number of records emitted since this task was started.
            if (this.throttler.shouldThrottle(this.seqno - this.startingSeqno, System.currentTimeMillis())) {
                this.throttler.throttle();
            }
            this.taskHandle.record(this.batchSize);

            List<SourceRecord> records = new ArrayList<>();
            for (int i = 0; i < this.batchSize; i++) {
                // Advance first so that offset, key and value all carry the same sequence number.
                this.seqno++;
                records.add(new SourceRecord(
                        Collections.singletonMap(TASK_ID_CONFIG, this.taskId),
                        Collections.singletonMap(SAVED_OFFSET_KEY, this.seqno),
                        this.topicName,
                        null,
                        Schema.STRING_SCHEMA,
                        "key-" + this.taskId + "-" + this.seqno,
                        Schema.STRING_SCHEMA,
                        "value-" + this.taskId + "-" + this.seqno));
            }
            return records;
        }

        /** Logs that offsets are being committed; no other action is taken. */
        @Override
        public void commit() {
            log.info("Task {} committing offsets", this.taskId);
        }

        /**
         * Reports a committed record to the task handle.
         *
         * @param sourceRecord the record being committed; only used for trace logging
         */
        @Override
        public void commitRecord(SourceRecord sourceRecord) {
            log.trace("Committing record: {}", sourceRecord);
            this.taskHandle.commit();
        }

        /** Marks the task as stopped so that subsequent {@link #poll()} calls return {@code null}. */
        @Override
        public void stop() {
            log.info("Stopped {} task {}", getClass().getSimpleName(), this.taskId);
            this.stopped = true;
        }
    }

    /**
     * Resolves the connector handle from the {@code name} property and keeps the supplied
     * properties as the base configuration for every task.
     *
     * @param map connector properties; the same map instance is retained, not copied
     */
    @Override
    public void start(Map<String, String> map) {
        this.connectorHandle = RuntimeHandles.get().connectorHandle(map.get(NAME_CONFIG));
        this.connectorName = this.connectorHandle.name();
        this.commonConfigs = map;
        log.info("Started {} connector {}", getClass().getSimpleName(), this.connectorName);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MonitorableSourceTask.class;
    }

    /**
     * Builds one configuration per task: a copy of the connector properties plus the connector
     * name and a task id of the form {@code <connector-name>-<index>}.
     *
     * @param i number of task configurations to generate
     * @return a list of {@code i} independent, mutable configuration maps (empty if {@code i <= 0})
     */
    @Override
    public List<Map<String, String>> taskConfigs(int i) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int taskIndex = 0; taskIndex < i; taskIndex++) {
            Map<String, String> taskConfig = new HashMap<>(this.commonConfigs);
            taskConfig.put(CONNECTOR_NAME_CONFIG, this.connectorName);
            taskConfig.put(TASK_ID_CONFIG, this.connectorName + "-" + taskIndex);
            configs.add(taskConfig);
        }
        return configs;
    }

    @Override
    public void stop() {
        log.info("Stopped {} connector {}", getClass().getSimpleName(), this.connectorName);
    }

    /**
     * @return a new, empty {@link ConfigDef}; this connector declares no configuration keys
     */
    @Override
    public ConfigDef config() {
        log.info("Configured {} connector {}", getClass().getSimpleName(), this.connectorName);
        return new ConfigDef();
    }
}
