package io.confluent.connect.s3;

import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.storage.format.RecordWriter;
import io.confluent.connect.storage.format.RecordWriterProvider;
import io.confluent.connect.storage.partitioner.Partitioner;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;
import io.confluent.connect.storage.schema.StorageSchemaCompatibility;
import io.confluent.connect.storage.util.DateTimeUtils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.errors.SchemaProjectorException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/s3/TopicPartitionWriter.class */
public class TopicPartitionWriter {
    // Per-encoded-partition bookkeeping. All three maps are keyed by the string returned
    // from partitioner.encodePartition(record).
    // Encoded partition -> object key of the file that will be committed for it.
    private final Map<String, String> commitFiles;
    // Encoded partition -> the open record writer for that partition.
    private final Map<String, RecordWriter> writers;
    // Encoded partition -> value schema currently used when projecting records.
    private final Map<String, Schema> currentSchemas;
    // The Kafka topic partition this writer is responsible for.
    private final TopicPartition tp;
    private final Partitioner<FieldSchema> partitioner;
    // Non-null only when the partitioner is a TimeBasedPartitioner (see constructor).
    private final TimestampExtractor timestampExtractor;
    // Value of the "topics.dir" setting; first segment of every generated file key.
    private String topicsDir;
    // Current position in the write state machine driven by write().
    private State state;
    // Records handed over via buffer(...) that have not been written yet.
    private final Queue<SinkRecord> buffer;
    private final SinkTaskContext context;
    // Number of records written since the last commit; reset to 0 in commitFiles().
    private int recordCount;
    // Value of "flush.size": record count that triggers a size-based rotation.
    private final int flushSize;
    // Value of "rotate.interval.ms"; record-time based rotation is only checked when > 0.
    private final long rotateIntervalMs;
    // Value of "rotate.schedule.interval.ms"; scheduled rotation is only checked when > 0.
    private final long rotateScheduleIntervalMs;
    // Time (ms) at or after which the next scheduled rotation is due.
    private long nextScheduledRotation;
    // Kafka offset of the most recently written record; -1 until a record is written.
    private long currentOffset;
    // Timestamp extracted from the most recently inspected record (null without an extractor).
    private Long currentTimestamp;
    // Encoded partition of the most recently written record.
    private String currentEncodedPartition;
    // First extracted timestamp since the last commit; reset to null in commitFiles().
    private Long baseRecordTimestamp;
    // Offset made available by the last commit (currentOffset + 1); cleared when read.
    private Long offsetToCommit;
    private final RecordWriterProvider<S3SinkConnectorConfig> writerProvider;
    // Encoded partition -> Kafka offset of the first record written to its current file.
    private final Map<String, Long> startOffsets;
    // Value of "retry.backoff.ms": how long write() stays idle after a failure.
    private long timeoutMs;
    // Time (ms) of the most recent recorded failure; -1 until a failure happens.
    private long failureTime;
    private final StorageSchemaCompatibility compatibility;
    // File extension reported by the record writer provider.
    private final String extension;
    // String.format pattern used to zero-pad the start offset inside file names.
    private final String zeroPadOffsetFormat;
    // Value of "directory.delim": separator between path segments of a file key.
    private final String dirDelim;
    // Value of "file.delim": separator between topic, partition and offset in a file name.
    private final String fileDelim;
    // Clock used for all "now" readings in this class.
    private final Time time;
    // Only assigned when rotateScheduleIntervalMs > 0 (see constructor); otherwise null.
    private DateTimeZone timeZone;
    private final S3SinkConnectorConfig connectorConfig;
    private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
    // Default clock used by the public constructor.
    private static final Time SYSTEM_TIME = new SystemTime();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/connect/s3/TopicPartitionWriter$State.class */
    /**
     * Steps of the write state machine, declared in the order they are visited.
     * {@link #next()} moves to the following step and wraps from the last constant
     * back to the first.
     */
    public enum State {
        WRITE_STARTED,
        WRITE_PARTITION_PAUSED,
        SHOULD_ROTATE,
        FILE_COMMITTED;

        /** Cached copy of {@code values()} so {@link #next()} does not re-clone the array. */
        private static final State[] VALS = values();

        /**
         * Returns the constant declared after this one, or the first constant when this
         * is the last one.
         *
         * @return the following state in declaration order, cyclically
         */
        public State next() {
            int following = ordinal() + 1;
            return following < VALS.length ? VALS[following] : VALS[0];
        }
    }

    /**
     * Creates a writer that uses the system clock.
     *
     * <p>Delegates to the six-argument constructor with {@code SYSTEM_TIME}. The
     * {@code s3Storage} argument is accepted but not used by this constructor.
     *
     * @param topicPartition        the topic partition this writer handles
     * @param s3Storage             not used here
     * @param recordWriterProvider  factory for the per-file record writers
     * @param partitioner           maps records to encoded partitions and paths
     * @param s3SinkConnectorConfig connector configuration
     * @param sinkTaskContext       task context used for pause/resume/timeout requests
     */
    public TopicPartitionWriter(
            TopicPartition topicPartition,
            S3Storage s3Storage,
            RecordWriterProvider<S3SinkConnectorConfig> recordWriterProvider,
            Partitioner<FieldSchema> partitioner,
            S3SinkConnectorConfig s3SinkConnectorConfig,
            SinkTaskContext sinkTaskContext) {
        this(
                topicPartition,
                recordWriterProvider,
                partitioner,
                s3SinkConnectorConfig,
                sinkTaskContext,
                SYSTEM_TIME);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a writer with an explicit clock.
     *
     * <p>Reads its settings from {@code s3SinkConnectorConfig} ("flush.size",
     * "topics.dir", "rotate.interval.ms", "rotate.schedule.interval.ms", "timezone",
     * "retry.backoff.ms", "schema.compatibility", "directory.delim", "file.delim",
     * "filename.offset.zero.pad.width"), starts in {@code WRITE_STARTED} and computes
     * the first scheduled rotation time.
     *
     * @param topicPartition        the topic partition this writer handles
     * @param recordWriterProvider  factory for record writers; also supplies the file extension
     * @param partitioner           maps records to encoded partitions and paths; when it is a
     *                              {@link TimeBasedPartitioner} its timestamp extractor is used
     * @param s3SinkConnectorConfig connector configuration
     * @param sinkTaskContext       task context used for pause/resume/timeout requests
     * @param time                  clock used for every "now" reading
     */
    public TopicPartitionWriter(TopicPartition topicPartition, RecordWriterProvider<S3SinkConnectorConfig> recordWriterProvider, Partitioner<FieldSchema> partitioner, S3SinkConnectorConfig s3SinkConnectorConfig, SinkTaskContext sinkTaskContext, Time time) {
        this.connectorConfig = s3SinkConnectorConfig;
        this.time = time;
        this.tp = topicPartition;
        this.context = sinkTaskContext;
        this.writerProvider = recordWriterProvider;
        this.partitioner = partitioner;
        // Only time-based partitioners expose a timestamp extractor; otherwise it stays null.
        this.timestampExtractor = partitioner instanceof TimeBasedPartitioner ? ((TimeBasedPartitioner) partitioner).getTimestampExtractor() : null;
        this.flushSize = s3SinkConnectorConfig.getInt("flush.size");
        this.topicsDir = s3SinkConnectorConfig.getString("topics.dir");
        this.rotateIntervalMs = s3SinkConnectorConfig.getLong("rotate.interval.ms");
        this.rotateScheduleIntervalMs = s3SinkConnectorConfig.getLong("rotate.schedule.interval.ms");
        // The time zone is only looked up when scheduled rotation is enabled.
        if (this.rotateScheduleIntervalMs > 0) {
            this.timeZone = DateTimeZone.forID(s3SinkConnectorConfig.getString("timezone"));
        }
        this.timeoutMs = s3SinkConnectorConfig.getLong("retry.backoff.ms");
        this.compatibility = StorageSchemaCompatibility.getCompatibility(s3SinkConnectorConfig.getString("schema.compatibility"));
        // Typed collections (diamond) instead of raw types, so no unchecked conversions.
        this.buffer = new LinkedList<>();
        this.commitFiles = new HashMap<>();
        this.writers = new HashMap<>();
        this.currentSchemas = new HashMap<>();
        this.startOffsets = new HashMap<>();
        this.state = State.WRITE_STARTED;
        this.failureTime = -1L;
        this.currentOffset = -1L;
        this.dirDelim = s3SinkConnectorConfig.getString("directory.delim");
        this.fileDelim = s3SinkConnectorConfig.getString("file.delim");
        this.extension = recordWriterProvider.getExtension();
        // Builds a pattern such as "%010d" for zero-padding offsets in file names.
        this.zeroPadOffsetFormat = "%0" + s3SinkConnectorConfig.getInt("filename.offset.zero.pad.width") + "d";
        setNextScheduledRotation();
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:12:0x0039. Please report as an issue. */
    /**
     * Drains the record buffer, writing records and committing files as rotation
     * conditions are met.
     *
     * <p>Does nothing while the back-off window after a recorded failure is still open.
     * Otherwise it runs the state machine until the buffer is empty:
     * <ul>
     *   <li>{@code WRITE_STARTED}: pauses the partition, then continues as
     *       {@code WRITE_PARTITION_PAUSED}.</li>
     *   <li>{@code WRITE_PARTITION_PAUSED}: inspects the head record; moves on to
     *       {@code SHOULD_ROTATE} on a schema change or a time-based rotation, otherwise
     *       writes the record and moves on only if the size threshold is reached.</li>
     *   <li>{@code SHOULD_ROTATE}: commits the files, then continues as
     *       {@code FILE_COMMITTED}.</li>
     *   <li>{@code FILE_COMMITTED}: returns to {@code WRITE_PARTITION_PAUSED}.</li>
     * </ul>
     * Once the buffer is empty, a pending time-based rotation is committed, the partition
     * is resumed and the state returns to {@code WRITE_STARTED}.
     *
     * @throws ConnectException wrapping a {@link SchemaProjectorException} or
     *                          {@link IllegalWorkerStateException} raised while processing
     */
    public void write() {
        long now = this.time.milliseconds();
        // Still inside the back-off window of an earlier failure: skip this round.
        if (this.failureTime > 0 && now - this.failureTime < this.timeoutMs) {
            return;
        }
        while (!this.buffer.isEmpty()) {
            try {
                switch (this.state) {
                    case WRITE_STARTED:
                        pause();
                        nextState();
                        // Intentional fall-through: start writing immediately.
                    case WRITE_PARTITION_PAUSED:
                        SinkRecord head = this.buffer.peek();
                        if (this.timestampExtractor != null) {
                            this.currentTimestamp = this.timestampExtractor.extract(head);
                            if (this.baseRecordTimestamp == null) {
                                this.baseRecordTimestamp = this.currentTimestamp;
                            }
                        }
                        Schema valueSchema = head.valueSchema();
                        String encodedPartition = this.partitioner.encodePartition(head);
                        Schema currentValueSchema = this.currentSchemas.get(encodedPartition);
                        if (currentValueSchema == null) {
                            this.currentSchemas.put(encodedPartition, valueSchema);
                            currentValueSchema = valueSchema;
                        }
                        if (this.compatibility.shouldChangeSchema(head, (Schema) null, currentValueSchema) && this.recordCount > 0) {
                            log.trace("Incompatible change of schema detected for record '{}' with encoded partition '{}' and current offset: '{}'", head, encodedPartition, this.currentOffset);
                            this.currentSchemas.put(encodedPartition, valueSchema);
                            nextState();
                        } else if (rotateOnTime(encodedPartition, this.currentTimestamp, now)) {
                            setNextScheduledRotation();
                            nextState();
                        } else {
                            this.currentEncodedPartition = encodedPartition;
                            writeRecord(this.compatibility.project(head, (Schema) null, currentValueSchema));
                            // The record is only removed from the buffer after a successful write.
                            this.buffer.poll();
                            if (rotateOnSize()) {
                                log.info("Starting commit and rotation for topic partition {} with start offset {}", this.tp, this.startOffsets);
                                nextState();
                            }
                        }
                        break;
                    case SHOULD_ROTATE:
                        commitFiles();
                        nextState();
                        // Intentional fall-through: go straight back to writing.
                    case FILE_COMMITTED:
                        setState(State.WRITE_PARTITION_PAUSED);
                        // Without this break every rotation fell into the default branch
                        // and logged a spurious "not a valid state" error.
                        break;
                    default:
                        log.error("{} is not a valid state to write record for topic partition {}.", this.state, this.tp);
                }
            } catch (SchemaProjectorException | IllegalWorkerStateException e) {
                throw new ConnectException(e);
            } catch (RetriableException e) {
                recordFailureAndBackOff(e);
                // Leave the loop so the retry happens after the back-off instead of
                // spinning on the same failing operation.
                break;
            }
        }
        if (this.buffer.isEmpty()) {
            if (this.recordCount > 0 && rotateOnTime(this.currentEncodedPartition, this.currentTimestamp, now)) {
                log.info("Committing files after waiting for rotateIntervalMs time but less than flush.size records available.");
                setNextScheduledRotation();
                try {
                    commitFiles();
                } catch (ConnectException e) {
                    recordFailureAndBackOff(e);
                }
            }
            resume();
            setState(State.WRITE_STARTED);
        }
    }

    /** Logs the failure, remembers when it happened and requests the retry back-off. */
    private void recordFailureAndBackOff(ConnectException cause) {
        log.error("Exception on topic partition {}: ", this.tp, cause);
        this.failureTime = this.time.milliseconds();
        setRetryTimeout(this.timeoutMs);
    }

    /**
     * Closes every open record writer and forgets all writers and start offsets.
     *
     * @throws ConnectException if a writer's {@code close()} raises it; writers not yet
     *                          visited are then left as they are
     */
    public void close() throws ConnectException {
        log.debug("Closing TopicPartitionWriter {}", this.tp);
        for (RecordWriter openWriter : this.writers.values()) {
            openWriter.close();
        }
        this.writers.clear();
        this.startOffsets.clear();
    }

    /**
     * Appends a record to the tail of the pending-record queue; it is processed by a
     * later call to {@link #write()}.
     *
     * @param sinkRecord the record to enqueue
     */
    public void buffer(SinkRecord sinkRecord) {
        final Queue<SinkRecord> pending = this.buffer;
        pending.add(sinkRecord);
    }

    /**
     * Hands out the offset recorded by the last file commit and clears it, so that the
     * same offset is returned at most once.
     *
     * @return the pending offset to commit, or {@code null} if none is pending
     */
    public Long getOffsetToCommitAndReset() {
        final Long pendingOffset = this.offsetToCommit;
        this.offsetToCommit = null;
        return pendingOffset;
    }

    /**
     * Asks the partitioner for the partitioned path of this writer's topic and the given
     * encoded partition.
     *
     * @param str the encoded partition
     * @return the path produced by {@code partitioner.generatePartitionedPath}
     */
    private String getDirectoryPrefix(String str) {
        String topicName = this.tp.topic();
        return this.partitioner.generatePartitionedPath(topicName, str);
    }

    /** Advances the state machine to the state that follows the current one. */
    private void nextState() {
        State following = this.state.next();
        this.state = following;
    }

    /**
     * Puts the state machine into the given state unconditionally.
     *
     * @param state the state to switch to
     */
    private void setState(State state) {
        this.state = state;
    }

    /**
     * Decides whether the current files should be rotated for time-related reasons.
     *
     * <p>Never rotates while no records have been written since the last commit.
     * Otherwise rotation is requested when either
     * <ul>
     *   <li>record-time rotation is enabled ({@code rotateIntervalMs > 0}), record
     *       timestamps are available, and either the record timestamp is at least
     *       {@code rotateIntervalMs} past the base timestamp or the encoded partition
     *       differs from the current one; or</li>
     *   <li>scheduled rotation is enabled ({@code rotateScheduleIntervalMs > 0}) and
     *       {@code j} has reached {@code nextScheduledRotation}.</li>
     * </ul>
     *
     * @param str the encoded partition of the record under inspection
     * @param l   the record's extracted timestamp; {@code null} when no timestamp
     *            extractor is in use
     * @param j   the current time in milliseconds
     * @return {@code true} if the files should be rotated now
     */
    private boolean rotateOnTime(String str, Long l, long j) {
        if (this.recordCount <= 0) {
            return false;
        }
        // Without a timestamp extractor both timestamps stay null; unboxing them used to
        // throw a NullPointerException, so record-time rotation is skipped in that case.
        boolean timestampsAvailable = l != null && this.baseRecordTimestamp != null;
        boolean periodicRotation = this.rotateIntervalMs > 0
                && timestampsAvailable
                && (l.longValue() - this.baseRecordTimestamp.longValue() >= this.rotateIntervalMs || !str.equals(this.currentEncodedPartition));
        log.trace("Checking rotation on time with recordCount '{}' and encodedPartition '{}'", Integer.valueOf(this.recordCount), str);
        log.trace("Should apply periodic time-based rotation (rotateIntervalMs: '{}', baseRecordTimestamp: '{}', timestamp: '{}')? {}", new Object[]{Long.valueOf(this.rotateIntervalMs), this.baseRecordTimestamp, l, Boolean.valueOf(periodicRotation)});
        boolean scheduledRotation = this.rotateScheduleIntervalMs > 0 && j >= this.nextScheduledRotation;
        log.trace("Should apply scheduled rotation: (rotateScheduleIntervalMs: '{}', nextScheduledRotation: '{}', now: '{}')? {}", new Object[]{Long.valueOf(this.rotateScheduleIntervalMs), Long.valueOf(this.nextScheduledRotation), Long.valueOf(j), Boolean.valueOf(scheduledRotation)});
        return periodicRotation || scheduledRotation;
    }

    /**
     * Recomputes the time of the next scheduled rotation from the current clock reading.
     * Has no effect when scheduled rotation is disabled
     * ({@code rotateScheduleIntervalMs <= 0}).
     */
    private void setNextScheduledRotation() {
        if (this.rotateScheduleIntervalMs <= 0) {
            return;
        }
        long now = this.time.milliseconds();
        this.nextScheduledRotation = DateTimeUtils.getNextTimeAdjustedByDay(now, this.rotateScheduleIntervalMs, this.timeZone);
        if (!log.isDebugEnabled()) {
            return;
        }
        // Formatting is only done when debug logging is on.
        String nextRotationText = new DateTime(this.nextScheduledRotation).withZone(this.timeZone).toString();
        log.debug("Update scheduled rotation timer. Next rotation for {} will be at {}", this.tp, nextRotationText);
    }

    /**
     * Reports whether the number of records written since the last commit has reached
     * the configured flush size.
     *
     * @return {@code true} when {@code recordCount >= flushSize}
     */
    private boolean rotateOnSize() {
        final boolean thresholdReached = !(this.recordCount < this.flushSize);
        log.trace("Should apply size-based rotation (count {} >= flush size {})? {}", new Object[]{Integer.valueOf(this.recordCount), Integer.valueOf(this.flushSize), Boolean.valueOf(thresholdReached)});
        return thresholdReached;
    }

    /** Asks the task context to pause this writer's topic partition. */
    private void pause() {
        log.trace("Pausing writer for topic-partition '{}'", this.tp);
        this.context.pause(this.tp);
    }

    /** Asks the task context to resume this writer's topic partition. */
    private void resume() {
        log.trace("Resuming writer for topic-partition '{}'", this.tp);
        this.context.resume(this.tp);
    }

    /**
     * Returns the record writer registered for an encoded partition, creating and
     * registering one first if none exists yet.
     *
     * @param sinkRecord not used by this method
     * @param str        the encoded partition
     * @return the writer for {@code str}
     * @throws ConnectException if it is raised while obtaining the writer
     */
    private RecordWriter getWriter(SinkRecord sinkRecord, String str) throws ConnectException {
        if (!this.writers.containsKey(str)) {
            String commitFilename = getCommitFilename(str);
            RecordWriter created = this.writerProvider.getRecordWriter(this.connectorConfig, commitFilename);
            this.writers.put(str, created);
            return created;
        }
        return this.writers.get(str);
    }

    /**
     * Returns the file key under which the given encoded partition will be committed.
     * The key is built from the partition's directory prefix and its start offset on
     * first use and remembered afterwards.
     *
     * @param str the encoded partition; a start offset must already be recorded for it
     *            when no file key has been remembered yet
     * @return the file key for {@code str}
     */
    private String getCommitFilename(String str) {
        if (this.commitFiles.containsKey(str)) {
            return this.commitFiles.get(str);
        }
        String directoryPrefix = getDirectoryPrefix(str);
        long startOffset = this.startOffsets.get(str).longValue();
        String newKey = fileKeyToCommit(directoryPrefix, startOffset);
        this.commitFiles.put(str, newKey);
        return newKey;
    }

    /**
     * Joins three path segments with the configured directory delimiter.
     *
     * @param str  first segment
     * @param str2 second segment
     * @param str3 third segment
     * @return {@code str + dirDelim + str2 + dirDelim + str3}
     */
    private String fileKey(String str, String str2, String str3) {
        StringBuilder key = new StringBuilder();
        key.append(str);
        key.append(this.dirDelim);
        key.append(str2);
        key.append(this.dirDelim);
        key.append(str3);
        return key.toString();
    }

    /**
     * Builds the full file key for a file that starts at the given offset.
     *
     * <p>The file name is topic, partition number and zero-padded offset separated by
     * the file delimiter, followed by the extension; it is placed under the topics
     * directory and the given directory prefix.
     *
     * @param str the directory prefix
     * @param j   the start offset of the file
     * @return the complete file key
     */
    private String fileKeyToCommit(String str, long j) {
        String paddedOffset = String.format(this.zeroPadOffsetFormat, Long.valueOf(j));
        String fileName = this.tp.topic()
                + this.fileDelim
                + this.tp.partition()
                + this.fileDelim
                + paddedOffset
                + this.extension;
        return fileKey(this.topicsDir, str, fileName);
    }

    /**
     * Writes one record to the writer of the current encoded partition.
     *
     * <p>Updates the current offset, records the partition's start offset if this is
     * its first record since the last commit, and increments the record count after a
     * successful write.
     *
     * @param sinkRecord the record to write
     */
    private void writeRecord(SinkRecord sinkRecord) {
        this.currentOffset = sinkRecord.kafkaOffset();
        final String partitionKey = this.currentEncodedPartition;
        boolean firstRecordOfFile = !this.startOffsets.containsKey(partitionKey);
        if (firstRecordOfFile) {
            log.trace("Setting writer's start offset for '{}' to {}", partitionKey, Long.valueOf(this.currentOffset));
            this.startOffsets.put(partitionKey, Long.valueOf(this.currentOffset));
        }
        RecordWriter target = getWriter(sinkRecord, partitionKey);
        target.write(sinkRecord);
        this.recordCount += 1;
    }

    /**
     * Commits the file of every encoded partition that has a pending file key, then
     * resets the per-file bookkeeping.
     *
     * <p>Afterwards the offset to commit is the current offset plus one, and the file
     * keys, schemas, record count and base timestamp are cleared.
     */
    private void commitFiles() {
        for (String partitionKey : this.commitFiles.keySet()) {
            commitFile(partitionKey);
            log.debug("Committed {} for {}", this.commitFiles.get(partitionKey), this.tp);
        }
        long nextOffset = this.currentOffset + 1;
        this.offsetToCommit = Long.valueOf(nextOffset);
        this.commitFiles.clear();
        this.currentSchemas.clear();
        this.recordCount = 0;
        this.baseRecordTimestamp = null;
        log.info("Files committed to S3. Target commit offset for {} is {}", this.tp, this.offsetToCommit);
    }

    /**
     * Commits the writer of one encoded partition and drops its bookkeeping.
     *
     * <p>If no start offset is recorded for the partition, a warning is logged and
     * nothing else happens. Otherwise the partition's writer, if any, is committed and
     * removed, and the start offset is removed.
     *
     * @param str the encoded partition whose file should be committed
     */
    private void commitFile(String str) {
        if (!this.startOffsets.containsKey(str)) {
            // The message has a placeholder for the partition; it was previously logged
            // without the argument, so the partition never appeared in the log.
            log.warn("Tried to commit file with missing starting offset partition: {}. Ignoring.", str);
            return;
        }
        if (this.writers.containsKey(str)) {
            // Commit before removing, so the writer stays registered if commit() throws.
            this.writers.get(str).commit();
            this.writers.remove(str);
            log.debug("Removed writer for '{}'", str);
        }
        this.startOffsets.remove(str);
    }

    /**
     * Passes the given timeout on to the task context.
     *
     * @param j the timeout in milliseconds
     */
    private void setRetryTimeout(long j) {
        final SinkTaskContext taskContext = this.context;
        taskContext.timeout(j);
    }
}
