/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Callback;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.header.Header;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.AbstractRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.CompressionType;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.MemoryRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.Record;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.RecordBatch;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.TimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A group of records for a single {@link TopicPartition} that is accumulated through a
 * {@link MemoryRecordsBuilder} and completed as a unit.
 *
 * <p>Each appended record is tracked by a {@link Thunk} pairing the caller's callback with the
 * per-record future. When the batch reaches a final state (see {@link FinalState}) the shared
 * {@link ProduceRequestResult} is completed and every callback is invoked exactly once.
 *
 * <p>The final state is guarded by an atomic compare-and-set; the remaining mutable fields are
 * plain fields with no synchronization in this class.
 */
public final class ProducerBatch {
    private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);

    /** Timestamp supplied at construction; the reference point for {@link #createdTimeMs(long)}. */
    final long createdMs;
    final TopicPartition topicPartition;
    /** Result shared by every record future created by this batch. */
    final ProduceRequestResult produceFuture;
    /** One entry per appended record, in append order. */
    private final List<Thunk> thunks = new ArrayList<>();
    private final MemoryRecordsBuilder recordsBuilder;
    /** Number of times {@link #reenqueued(long)} has been called. */
    private final AtomicInteger attempts = new AtomicInteger(0);
    private final boolean isSplitBatch;
    /** {@code null} until the batch is aborted or done; set at most once via compare-and-set. */
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
    int recordCount;
    /** Largest upper-bound size estimate seen among appended records. */
    int maxRecordSize;
    private long lastAttemptMs;
    private long lastAppendTime;
    private long drainedMs;
    /** Non-null once {@link #maybeExpire} has decided the batch is expired. */
    private String expiryErrorMessage;
    private boolean retry;
    /** True between {@link #resetProducerState} and the next {@link #close()}. */
    private boolean reopened = false;

    /**
     * Creates a batch that is not the product of a split.
     *
     * @param tp             partition the batch belongs to
     * @param recordsBuilder builder that receives the appended records
     * @param now            creation timestamp in milliseconds
     */
    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
        this(tp, recordsBuilder, now, false);
    }

    /**
     * Creates a batch and seeds the builder with the current compression-ratio estimate for the
     * partition's topic and the builder's compression type.
     *
     * @param tp             partition the batch belongs to
     * @param recordsBuilder builder that receives the appended records
     * @param now            creation timestamp in milliseconds; also used as the initial
     *                       last-attempt and last-append time
     * @param isSplitBatch   whether this batch was produced by {@link #split(int)}
     */
    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) {
        this.createdMs = now;
        this.lastAttemptMs = now;
        this.recordsBuilder = recordsBuilder;
        this.topicPartition = tp;
        this.lastAppendTime = this.createdMs;
        this.produceFuture = new ProduceRequestResult(this.topicPartition);
        this.retry = false;
        this.isSplitBatch = isSplitBatch;
        float compressionRatioEstimation =
                CompressionRatioEstimator.estimation(this.topicPartition.topic(), recordsBuilder.compressionType());
        recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
    }

    /**
     * Appends a record if the builder reports room for it.
     *
     * @param timestamp record timestamp
     * @param key       record key, may be {@code null}
     * @param value     record value, may be {@code null}
     * @param headers   record headers
     * @param callback  callback to invoke on completion, may be {@code null}
     * @param now       current time in milliseconds, recorded as the last append time
     * @return the future for the appended record, or {@code null} if the builder has no room
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        if (!this.recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        }
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        int sizeUpperBound = AbstractRecords.estimateSizeInBytesUpperBound(
                this.magic(), this.recordsBuilder.compressionType(), key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, sizeUpperBound);
        this.lastAppendTime = now;
        // A null key/value is reported with a serialized size of -1.
        int keySize = key == null ? -1 : key.length;
        int valueSize = value == null ? -1 : value.length;
        // recordCount (before increment) is passed as this record's position within the batch.
        FutureRecordMetadata future =
                new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, keySize, valueSize);
        this.thunks.add(new Thunk(callback, future));
        ++this.recordCount;
        return future;
    }

    /**
     * Appends a record taken from a batch being split, re-using the original thunk so the caller's
     * future and callback follow the record into this batch.
     *
     * @return {@code true} if the record was appended, {@code false} if the builder has no room
     */
    private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
        if (!this.recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return false;
        }
        this.recordsBuilder.append(timestamp, key, value, headers);
        int sizeUpperBound = AbstractRecords.estimateSizeInBytesUpperBound(
                this.magic(), this.recordsBuilder.compressionType(), key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, sizeUpperBound);
        int keySize = key == null ? -1 : key.remaining();
        int valueSize = value == null ? -1 : value.remaining();
        FutureRecordMetadata future = new FutureRecordMetadata(
                this.produceFuture, this.recordCount, timestamp, thunk.future.checksumOrNull(), keySize, valueSize);
        // Link the original future to the new one created against this batch's produceFuture.
        thunk.future.chain(future);
        this.thunks.add(thunk);
        ++this.recordCount;
        return true;
    }

    /**
     * Moves the batch to {@link FinalState#ABORTED} and completes all futures and callbacks with
     * the given exception, using {@code -1} for both base offset and log append time.
     *
     * @param exception the failure reported to callbacks
     * @throws IllegalStateException if the batch already reached a final state
     */
    public void abort(RuntimeException exception) {
        if (!this.finalState.compareAndSet(null, FinalState.ABORTED)) {
            throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
        }
        log.trace("Aborting batch for partition {}", this.topicPartition, exception);
        this.completeFutureAndFireCallbacks(-1L, -1L, exception);
    }

    /**
     * Completes the batch as succeeded ({@code exception == null}) or failed, then completes all
     * futures and fires all callbacks.
     *
     * @param baseOffset    base offset passed on to the produce future
     * @param logAppendTime log append time passed on to the produce future
     * @param exception     failure cause, or {@code null} on success
     * @return {@code true} if this call completed the batch; {@code false} if the batch had
     *         already been aborted, in which case nothing is done
     * @throws IllegalStateException if the batch was already completed as succeeded or failed
     */
    public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
        final FinalState tryFinalState;
        if (exception == null) {
            log.trace("Successfully produced messages to {} with base offset {}.", this.topicPartition, baseOffset);
            tryFinalState = FinalState.SUCCEEDED;
        } else {
            log.trace("Failed to produce messages to {}.", this.topicPartition, exception);
            tryFinalState = FinalState.FAILED;
        }
        if (!this.finalState.compareAndSet(null, tryFinalState)) {
            if (this.finalState.get() == FinalState.ABORTED) {
                log.debug("ProduceResponse returned for {} after batch had already been aborted.", this.topicPartition);
                return false;
            }
            throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
        }
        this.completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }

    /**
     * Sets the outcome on the produce future, invokes each thunk's callback, then marks the
     * produce future done. Exceptions thrown while handling one thunk are logged and do not
     * prevent the remaining thunks from being processed.
     */
    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // The outcome is set before callbacks run so thunk.future.value() can be evaluated below.
        this.produceFuture.set(baseOffset, logAppendTime, exception);
        for (Thunk thunk : this.thunks) {
            try {
                if (exception == null) {
                    // value() is evaluated even when there is no callback, as in the original flow.
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null) {
                        thunk.callback.onCompletion(metadata, null);
                    }
                } else if (thunk.callback != null) {
                    thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", this.topicPartition, e);
            }
        }
        this.produceFuture.done();
    }

    /**
     * Re-distributes this batch's records into new batches whose builders are sized from
     * {@code splitBatchSize}, carrying each record's thunk over to the batch that receives it.
     * This batch's produce future is completed with a {@link RecordBatchTooLargeException}. If this
     * batch has a sequence, the new batches are given consecutive base sequences starting from it.
     *
     * @param splitBatchSize target size for each new batch; a larger buffer is allocated when a
     *                       record's upper-bound size estimate exceeds it
     * @return the new batches in record order
     * @throws IllegalStateException    if the built records contain no record batch
     * @throws IllegalArgumentException if the record batch is uncompressed with magic below 2, or
     *                                  if more than one record batch is present
     */
    public Deque<ProducerBatch> split(int splitBatchSize) {
        Deque<ProducerBatch> batches = new ArrayDeque<>();
        MemoryRecords memoryRecords = this.recordsBuilder.build();
        Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();
        if (!recordBatchIter.hasNext()) {
            throw new IllegalStateException("Cannot split an empty producer batch.");
        }
        RecordBatch recordBatch = recordBatchIter.next();
        if (recordBatch.magic() < 2 && !recordBatch.isCompressed()) {
            throw new IllegalArgumentException("Batch splitting cannot be used with non-compressed messages with version v0 and v1");
        }
        if (recordBatchIter.hasNext()) {
            throw new IllegalArgumentException("A producer batch should only have one record batch.");
        }

        // Records and thunks are walked in lock-step: the i-th record belongs to the i-th thunk.
        Iterator<Thunk> thunkIter = this.thunks.iterator();
        ProducerBatch batch = null;
        for (Record record : recordBatch) {
            assert thunkIter.hasNext();
            Thunk thunk = thunkIter.next();
            if (batch == null) {
                batch = this.createBatchOffAccumulatorForRecord(record, splitBatchSize);
            }
            boolean appended =
                    batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
            if (!appended) {
                // Current batch is full: emit it and start a new one sized for this record.
                batches.add(batch);
                batch = this.createBatchOffAccumulatorForRecord(record, splitBatchSize);
                batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
            }
        }
        if (batch != null) {
            batches.add(batch);
        }

        this.produceFuture.set(-1L, -1L, new RecordBatchTooLargeException());
        this.produceFuture.done();

        if (this.hasSequence()) {
            int sequence = this.baseSequence();
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(this.producerId(), this.producerEpoch());
            for (ProducerBatch newBatch : batches) {
                newBatch.setProducerState(producerIdAndEpoch, sequence, this.isTransactional());
                sequence += newBatch.recordCount;
            }
        }
        return batches;
    }

    /**
     * Creates a split batch backed by a freshly allocated heap buffer whose capacity is the larger
     * of {@code batchSize} and the upper-bound size estimate of {@code record}. The new batch
     * keeps this batch's partition, magic, compression type and creation time.
     */
    private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
        int recordSizeUpperBound = AbstractRecords.estimateSizeInBytesUpperBound(
                this.magic(), this.recordsBuilder.compressionType(), record.key(), record.value(), record.headers());
        int initialSize = Math.max(recordSizeUpperBound, batchSize);
        ByteBuffer buffer = ByteBuffer.allocate(initialSize);
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer, this.magic(), this.recordsBuilder.compressionType(), TimestampType.CREATE_TIME, 0L);
        return new ProducerBatch(this.topicPartition, builder, this.createdMs, true);
    }

    /** Returns whether the builder's compression type is anything other than {@code NONE}. */
    public boolean isCompressed() {
        return this.recordsBuilder.compressionType() != CompressionType.NONE;
    }

    @Override
    public String toString() {
        return "ProducerBatch(topicPartition=" + this.topicPartition + ", recordCount=" + this.recordCount + ")";
    }

    /**
     * Decides whether the batch has expired and, if so, aborts further record appends.
     *
     * <p>The first matching rule sets the expiry message:
     * <ol>
     *   <li>not in retry, {@code isFull}, and more than {@code requestTimeoutMs} since last append;</li>
     *   <li>not in retry and time since creation minus {@code lingerMs} exceeds {@code requestTimeoutMs};</li>
     *   <li>in retry and time since last attempt minus {@code retryBackoffMs} exceeds {@code requestTimeoutMs}.</li>
     * </ol>
     * The message is never cleared, so once expired the batch stays expired on later calls.
     *
     * @return {@code true} if the batch is expired
     */
    boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
        if (!this.inRetry() && isFull && (long) requestTimeoutMs < now - this.lastAppendTime) {
            this.expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
        } else if (!this.inRetry() && (long) requestTimeoutMs < this.createdTimeMs(now) - lingerMs) {
            this.expiryErrorMessage = (this.createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
        } else if (this.inRetry() && (long) requestTimeoutMs < this.waitedTimeMs(now) - retryBackoffMs) {
            this.expiryErrorMessage = (this.waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
        }
        boolean expired = this.expiryErrorMessage != null;
        if (expired) {
            this.abortRecordAppends();
        }
        return expired;
    }

    /**
     * Builds the timeout exception describing why the batch expired.
     *
     * @throws IllegalStateException if {@link #maybeExpire} has not marked the batch as expired
     */
    TimeoutException timeoutException() {
        if (this.expiryErrorMessage == null) {
            throw new IllegalStateException("Batch has not expired");
        }
        return new TimeoutException("Expiring " + this.recordCount + " record(s) for " + this.topicPartition + ": " + this.expiryErrorMessage);
    }

    /** Returns how many times the batch has been re-enqueued. */
    int attempts() {
        return this.attempts.get();
    }

    /**
     * Records a re-enqueue: increments the attempt count, moves both the last-attempt and
     * last-append times to the later of {@code now} and the previous last-append time, and marks
     * the batch as being in retry.
     */
    void reenqueued(long now) {
        this.attempts.getAndIncrement();
        this.lastAttemptMs = Math.max(this.lastAppendTime, now);
        this.lastAppendTime = Math.max(this.lastAppendTime, now);
        this.retry = true;
    }

    /** Returns the drained time minus the creation time. */
    long queueTimeMs() {
        return this.drainedMs - this.createdMs;
    }

    /** Returns milliseconds elapsed since creation, never negative. */
    long createdTimeMs(long nowMs) {
        return Math.max(0L, nowMs - this.createdMs);
    }

    /** Returns milliseconds elapsed since the last attempt, never negative. */
    long waitedTimeMs(long nowMs) {
        return Math.max(0L, nowMs - this.lastAttemptMs);
    }

    /** Records the drain time; the stored value never moves backwards. */
    void drained(long nowMs) {
        this.drainedMs = Math.max(this.drainedMs, nowMs);
    }

    /** Returns the {@code isSplitBatch} flag given at construction. */
    boolean isSplitBatch() {
        return this.isSplitBatch;
    }

    /** Returns whether {@link #reenqueued(long)} has been called on this batch. */
    public boolean inRetry() {
        return this.retry;
    }

    /** Returns the records built by the underlying builder. */
    public MemoryRecords records() {
        return this.recordsBuilder.build();
    }

    public int estimatedSizeInBytes() {
        return this.recordsBuilder.estimatedSizeInBytes();
    }

    public double compressionRatio() {
        return this.recordsBuilder.compressionRatio();
    }

    public boolean isFull() {
        return this.recordsBuilder.isFull();
    }

    /** Passes the producer id, epoch, base sequence and transactional flag to the builder. */
    public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
        this.recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
    }

    /**
     * Reopens the builder and rewrites its producer state, and flags the batch as reopened until
     * the next {@link #close()}.
     */
    public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
        this.reopened = true;
        this.recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
    }

    public void closeForRecordAppends() {
        this.recordsBuilder.closeForRecordAppends();
    }

    /**
     * Closes the builder, feeds the observed compression ratio back into the estimator unless the
     * builder holds a control batch, and clears the reopened flag.
     */
    public void close() {
        this.recordsBuilder.close();
        if (!this.recordsBuilder.isControlBatch()) {
            CompressionRatioEstimator.updateEstimation(
                    this.topicPartition.topic(),
                    this.recordsBuilder.compressionType(),
                    (float) this.recordsBuilder.compressionRatio());
        }
        this.reopened = false;
    }

    /** Aborts the underlying builder. */
    public void abortRecordAppends() {
        this.recordsBuilder.abort();
    }

    public boolean isClosed() {
        return this.recordsBuilder.isClosed();
    }

    public ByteBuffer buffer() {
        return this.recordsBuilder.buffer();
    }

    public int initialCapacity() {
        return this.recordsBuilder.initialCapacity();
    }

    /** Returns {@code true} while the underlying builder has not been closed. */
    public boolean isWritable() {
        return !this.recordsBuilder.isClosed();
    }

    public byte magic() {
        return this.recordsBuilder.magic();
    }

    public long producerId() {
        return this.recordsBuilder.producerId();
    }

    public short producerEpoch() {
        return this.recordsBuilder.producerEpoch();
    }

    public int baseSequence() {
        return this.recordsBuilder.baseSequence();
    }

    /** Returns whether the builder's base sequence differs from {@code -1}. */
    public boolean hasSequence() {
        return this.baseSequence() != -1;
    }

    public boolean isTransactional() {
        return this.recordsBuilder.isTransactional();
    }

    /** Returns whether producer state was reset since the batch was last closed. */
    public boolean sequenceHasBeenReset() {
        return this.reopened;
    }

    /** Pairs a record's callback (possibly {@code null}) with its future. */
    private static final class Thunk {
        final Callback callback;
        final FutureRecordMetadata future;

        Thunk(Callback callback, FutureRecordMetadata future) {
            this.callback = callback;
            this.future = future;
        }
    }

    /** Terminal states of a batch; the batch has none until it is aborted or done. */
    private enum FinalState {
        ABORTED,
        FAILED,
        SUCCEEDED
    }
}

