package net.snowflake.ingest.streaming.internal;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import net.snowflake.ingest.internal.apache.arrow.memory.BufferAllocator;
import net.snowflake.ingest.internal.apache.arrow.memory.RootAllocator;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.class */
/**
 * Internal implementation of {@link SnowflakeStreamingIngestChannel}.
 *
 * <p>A channel is identified by its database, schema, table and channel names. It keeps an offset
 * token, a row sequencer and a channel sequencer, buffers inserted rows in an {@code
 * ArrowRowBuffer}, and delegates flushing and commit verification to the client that owns it.
 */
public class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIngestChannel {
    private static final Logging logger = new Logging(SnowflakeStreamingIngestChannelInternal.class);

    /**
     * Buffer size, as reported by {@code ArrowRowBuffer#getSize()}, at or above which the owning
     * client is told that a flush is needed. NOTE(review): presumably a byte count - confirm
     * against ArrowRowBuffer.
     */
    private static final float FLUSH_TRIGGER_BUFFER_SIZE = 1.6E7f;

    /** Upper bound on the number of sleep intervals a single throttled insert waits for. */
    private static final int MAX_INSERT_THROTTLE_RETRIES = 10;

    private final String channelName;
    private final String dbName;
    private final String schemaName;
    private final String tableName;
    private volatile String offsetToken;
    private final AtomicLong rowSequencer;
    private final Long channelSequencer;
    private final ArrowRowBuffer arrowBuffer;
    private volatile boolean isValid;
    private volatile boolean isClosed;
    private final SnowflakeStreamingIngestClientInternal owningClient;
    private final BufferAllocator allocator;
    private final String encryptionKey;
    private final Long encryptionKeyId;
    // Only ever assigned in the constructor, hence final.
    private final boolean isTestMode;
    private final OpenChannelRequest.OnErrorOption onErrorOption;

    /**
     * Creates a channel. The channel starts out valid and not closed.
     *
     * @param str channel name
     * @param str2 database name
     * @param str3 schema name
     * @param str4 table name
     * @param str5 initial offset token
     * @param l channel sequencer
     * @param l2 initial row sequencer value; must not be {@code null} because it is unboxed
     * @param snowflakeStreamingIngestClientInternal client that owns this channel
     * @param str6 encryption key
     * @param l3 encryption key id
     * @param onErrorOption on-error option returned by {@link #getOnErrorOption()}
     * @param z test-mode flag; when {@code true} (or when the owning client reports test mode) a
     *     standalone {@code RootAllocator} is used instead of a child of the client's allocator
     */
    SnowflakeStreamingIngestChannelInternal(String str, String str2, String str3, String str4, String str5, Long l, Long l2, SnowflakeStreamingIngestClientInternal snowflakeStreamingIngestClientInternal, String str6, Long l3, OpenChannelRequest.OnErrorOption onErrorOption, boolean z) {
        this.channelName = str;
        this.dbName = str2;
        this.schemaName = str3;
        this.tableName = str4;
        this.offsetToken = str5;
        this.channelSequencer = l;
        this.rowSequencer = new AtomicLong(l2.longValue());
        this.isValid = true;
        this.isClosed = false;
        this.owningClient = snowflakeStreamingIngestClientInternal;
        this.isTestMode = z;
        if (z || this.owningClient.isTestMode()) {
            this.allocator = new RootAllocator();
        } else {
            // Child allocator named after the channel, capped at the parent allocator's limit.
            this.allocator = this.owningClient.getAllocator().newChildAllocator(str, 0L, this.owningClient.getAllocator().getLimit());
        }
        // The row buffer receives this channel, so it is created only after the allocator is set.
        this.arrowBuffer = new ArrowRowBuffer(this);
        this.encryptionKey = str6;
        this.encryptionKeyId = l3;
        this.onErrorOption = onErrorOption;
        logger.logDebug("Channel={} created for table={}", this.channelName, this.tableName);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a channel with the test-mode flag set to {@code false}; see the full constructor for
     * the meaning of each argument.
     */
    public SnowflakeStreamingIngestChannelInternal(String str, String str2, String str3, String str4, String str5, Long l, Long l2, SnowflakeStreamingIngestClientInternal snowflakeStreamingIngestClientInternal, String str6, Long l3, OpenChannelRequest.OnErrorOption onErrorOption) {
        this(str, str2, str3, str4, str5, l, l2, snowflakeStreamingIngestClientInternal, str6, l3, onErrorOption, false);
    }

    /** @return the name formatted as {@code db.schema.table.channel} */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getFullyQualifiedName() {
        return String.format("%s.%s.%s.%s", this.dbName, this.schemaName, this.tableName, this.channelName);
    }

    /** @return the channel name */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getName() {
        return this.channelName;
    }

    /** @return the database name */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getDBName() {
        return this.dbName;
    }

    /** @return the schema name */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getSchemaName() {
        return this.schemaName;
    }

    /** @return the table name */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getTableName() {
        return this.tableName;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the current offset token */
    public String getOffsetToken() {
        return this.offsetToken;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the current offset token.
     *
     * @param str the new offset token
     */
    public void setOffsetToken(String str) {
        this.offsetToken = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the channel sequencer supplied at construction */
    public Long getChannelSequencer() {
        return this.channelSequencer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the row sequencer value after atomically incrementing it by one */
    public long incrementAndGetRowSequencer() {
        return this.rowSequencer.incrementAndGet();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the current row sequencer value */
    public long getRowSequencer() {
        return this.rowSequencer.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the encryption key supplied at construction */
    public String getEncryptionKey() {
        return this.encryptionKey;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the encryption key id supplied at construction */
    public Long getEncryptionKeyId() {
        return this.encryptionKeyId;
    }

    /** @return the table name formatted as {@code db.schema.table} */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getFullyQualifiedTableName() {
        return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the result of flushing the underlying row buffer */
    public ChannelData getData() {
        return this.arrowBuffer.flush();
    }

    /** @return {@code true} until {@link #invalidate()} has been called */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public boolean isValid() {
        return this.isValid;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Marks the channel invalid, closes its row buffer and logs a warning. */
    public void invalidate() {
        this.isValid = false;
        this.arrowBuffer.close();
        logger.logWarn("Channel is invalidated, name={}, channel sequencer={}", getFullyQualifiedName(), this.channelSequencer);
    }

    /** @return {@code true} once {@link #markClosed()} has been called */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public boolean isClosed() {
        return this.isClosed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Sets the closed flag and logs it; does not flush or release any resources. */
    public void markClosed() {
        this.isClosed = true;
        logger.logDebug("Channel is closed, name={}, channel sequencer={}", getFullyQualifiedName(), this.channelSequencer);
    }

    /**
     * Flushes buffered data through the owning client.
     *
     * @param z {@code true} when called as part of closing; this bypasses the closed-channel check
     *     because {@link #close()} marks the channel closed before flushing
     * @return an already-completed future when the row buffer reports size zero, otherwise the
     *     future returned by the owning client's flush
     * @throws SFException with {@code CLOSED_CHANNEL} if the channel is closed and {@code z} is
     *     {@code false}
     */
    CompletableFuture<Void> flush(boolean z) {
        if (isClosed() && !z) {
            throw new SFException(ErrorCode.CLOSED_CHANNEL, new Object[0]);
        }
        if (this.arrowBuffer.getSize() == 0.0f) {
            return CompletableFuture.completedFuture(null);
        }
        return this.owningClient.flush(false);
    }

    /**
     * Closes the channel: marks it closed, removes it from the owning client, flushes remaining
     * data and then asynchronously verifies that everything was committed.
     *
     * @return an already-completed future if the channel was closed before this call; otherwise a
     *     future that completes after verification, exceptionally with {@code
     *     CHANNEL_WITH_UNCOMMITTED_ROWS} if the client reports this channel as not fully committed
     * @throws SFException with {@code INVALID_CHANNEL} if the channel has been invalidated
     */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public CompletableFuture<Void> close() {
        checkValidation();
        if (isClosed()) {
            return CompletableFuture.completedFuture(null);
        }
        markClosed();
        this.owningClient.removeChannelIfSequencersMatch(this);
        return flush(true).thenRunAsync(() -> {
            List<SnowflakeStreamingIngestChannelInternal> uncommittedChannels;
            try {
                uncommittedChannels = this.owningClient.verifyChannelsAreFullyCommitted(Collections.singletonList(this));
            } finally {
                // The channel is already marked closed, so a later close() returns early and would
                // never get another chance to release the buffer; release it even if verification
                // throws.
                this.arrowBuffer.close();
            }
            if (!uncommittedChannels.isEmpty()) {
                throw new SFException(ErrorCode.CHANNEL_WITH_UNCOMMITTED_ROWS, uncommittedChannels.stream().map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName).collect(Collectors.toList()));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the allocator chosen for this channel at construction */
    public BufferAllocator getAllocator() {
        return this.allocator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Logs the schema and forwards it to the row buffer.
     *
     * @param list column metadata describing the schema
     */
    public void setupSchema(List<ColumnMetadata> list) {
        logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), list);
        this.arrowBuffer.setupSchema(list);
    }

    /**
     * Inserts a single row by delegating to {@link #insertRows(Iterable, String)} with a
     * one-element list.
     *
     * @param map the row to insert
     * @param str value forwarded unchanged as the second argument of {@code insertRows}
     * @return the validation response produced by {@code insertRows}
     */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public InsertValidationResponse insertRow(Map<String, Object> map, String str) {
        return insertRows(Collections.singletonList(map), str);
    }

    /**
     * Inserts rows into the row buffer, throttling first if free JVM memory is low, and asks the
     * owning client for a flush once the buffer reaches {@link #FLUSH_TRIGGER_BUFFER_SIZE}.
     *
     * @param iterable the rows to insert
     * @param str value forwarded unchanged to the row buffer together with the rows
     *     (NOTE(review): presumably the offset token for this batch - confirm against
     *     ArrowRowBuffer)
     * @return the validation response returned by the row buffer
     * @throws SFException with {@code INVALID_CHANNEL} if the channel has been invalidated, or
     *     {@code CLOSED_CHANNEL} if it is closed
     */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public InsertValidationResponse insertRows(Iterable<Map<String, Object>> iterable, String str) {
        checkValidation();
        if (isClosed()) {
            throw new SFException(ErrorCode.CLOSED_CHANNEL, new Object[0]);
        }
        throttleInsertIfNeeded(Runtime.getRuntime());
        InsertValidationResponse response = this.arrowBuffer.insertRows(iterable, str);
        if (this.arrowBuffer.getSize() >= FLUSH_TRIGGER_BUFFER_SIZE) {
            this.owningClient.setNeedFlush();
        }
        return response;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports a row size to the owning client's input-throughput meter, if one is present.
     *
     * @param f the value to mark on the meter
     */
    public void collectRowSize(float f) {
        if (this.owningClient.inputThroughput != null) {
            this.owningClient.inputThroughput.mark(f);
        }
    }

    /**
     * Asks the owning client for this channel's status and returns the persisted offset token of
     * the first entry in the response.
     *
     * @return the persisted offset token reported for this channel
     * @throws SFException with {@code INVALID_CHANNEL} if the channel has been invalidated
     */
    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getLatestCommittedOffsetToken() {
        checkValidation();
        return this.owningClient.getChannelsStatus(Collections.singletonList(this)).getChannels().get(0).getPersistedOffsetToken();
    }

    /**
     * Blocks the calling thread while the free-memory percentage of {@code runtime} is below the
     * configured threshold, sleeping one configured interval at a time for at most {@link
     * #MAX_INSERT_THROTTLE_RETRIES} intervals, then logs how long the insert was throttled.
     *
     * @param runtime the runtime whose memory figures are inspected
     * @throws SFException with {@code INTERNAL_ERROR} if the thread is interrupted while sleeping;
     *     the thread's interrupt flag is restored before the exception is thrown
     */
    void throttleInsertIfNeeded(Runtime runtime) {
        int insertThrottleThresholdInPercentage = this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage();
        if (!isFreeMemoryBelowThreshold(runtime, insertThrottleThresholdInPercentage)) {
            return;
        }
        // Snapshot the memory figures before waiting so the log can show before/after values.
        long oldTotalMemory = runtime.totalMemory();
        long oldFreeMemory = runtime.freeMemory();
        int retries = 0;
        long insertThrottleIntervalInMs = this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs();
        while (isFreeMemoryBelowThreshold(runtime, insertThrottleThresholdInPercentage) && retries < MAX_INSERT_THROTTLE_RETRIES) {
            try {
                Thread.sleep(insertThrottleIntervalInMs);
                retries++;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new SFException(ErrorCode.INTERNAL_ERROR, "Insert throttle get interrupted");
            }
        }
        logger.logWarn("Insert throttled for {} ms due to JVM memory pressure, max memory={}, old total memory={}, old free memory={}, new total memory={}, new free memory={}.", Long.valueOf(retries * insertThrottleIntervalInMs), Long.valueOf(runtime.maxMemory()), Long.valueOf(oldTotalMemory), Long.valueOf(oldFreeMemory), Long.valueOf(runtime.totalMemory()), Long.valueOf(runtime.freeMemory()));
    }

    /** Returns whether free memory, as an integer percentage of total memory, is below the threshold. */
    private static boolean isFreeMemoryBelowThreshold(Runtime runtime, int thresholdInPercentage) {
        return (runtime.freeMemory() * 100) / runtime.totalMemory() < thresholdInPercentage;
    }

    /**
     * Ensures the channel is still valid. An invalid channel is removed from the owning client and
     * its row buffer is closed before the exception is thrown.
     *
     * @throws SFException with {@code INVALID_CHANNEL} if the channel has been invalidated
     */
    private void checkValidation() {
        if (isValid()) {
            return;
        }
        this.owningClient.removeChannelIfSequencersMatch(this);
        this.arrowBuffer.close();
        throw new SFException(ErrorCode.INVALID_CHANNEL, new Object[0]);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the on-error option supplied at construction */
    public OpenChannelRequest.OnErrorOption getOnErrorOption() {
        return this.onErrorOption;
    }
}
