package net.snowflake.ingest.streaming.internal;

import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
import net.snowflake.ingest.internal.com.codahale.metrics.Timer;
import net.snowflake.ingest.internal.com.google.common.annotations.VisibleForTesting;
import net.snowflake.ingest.streaming.internal.BlobBuilder;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/snowflake/ingest/streaming/internal/FlushService.class */
public class FlushService<T> {
    private static final Logging logger = new Logging(FlushService.class);
    private final AtomicLong counter;
    private final SnowflakeStreamingIngestClientInternal<T> owningClient;

    @VisibleForTesting
    ScheduledExecutorService flushWorker;

    @VisibleForTesting
    ExecutorService registerWorker;

    @VisibleForTesting
    ExecutorService buildUploadWorkers;
    private final ChannelCache<T> channelCache;
    private final StreamingIngestStage targetStage;
    private final RegisterService<T> registerService;

    @VisibleForTesting
    volatile boolean isNeedFlush;

    @VisibleForTesting
    volatile long lastFlushTime;
    private final boolean isTestMode;
    private final Map<String, Timer.Context> latencyTimerContextMap;
    private final Constants.BdecVersion bdecVersion;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/snowflake/ingest/streaming/internal/FlushService$BlobData.class */
    public static class BlobData<T> {
        private final String filePath;
        private final List<List<ChannelData<T>>> data;

        BlobData(String str, List<List<ChannelData<T>>> list) {
            this.filePath = str;
            this.data = list;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public String getFilePath() {
            return this.filePath;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public List<List<ChannelData<T>>> getData() {
            return this.data;
        }
    }

    FlushService(SnowflakeStreamingIngestClientInternal<T> snowflakeStreamingIngestClientInternal, ChannelCache<T> channelCache, StreamingIngestStage streamingIngestStage, boolean z) {
        this.owningClient = snowflakeStreamingIngestClientInternal;
        this.channelCache = channelCache;
        this.targetStage = streamingIngestStage;
        this.counter = new AtomicLong(0L);
        this.registerService = new RegisterService<>(snowflakeStreamingIngestClientInternal, z);
        this.isNeedFlush = false;
        this.lastFlushTime = System.currentTimeMillis();
        this.isTestMode = z;
        this.latencyTimerContextMap = new ConcurrentHashMap();
        this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
        createWorkers();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlushService(SnowflakeStreamingIngestClientInternal<T> snowflakeStreamingIngestClientInternal, ChannelCache<T> channelCache, boolean z) {
        this.owningClient = snowflakeStreamingIngestClientInternal;
        this.channelCache = channelCache;
        try {
            this.targetStage = new StreamingIngestStage(z, snowflakeStreamingIngestClientInternal.getRole(), snowflakeStreamingIngestClientInternal.getHttpClient(), snowflakeStreamingIngestClientInternal.getRequestBuilder(), snowflakeStreamingIngestClientInternal.getName());
            this.registerService = new RegisterService<>(snowflakeStreamingIngestClientInternal, z);
            this.counter = new AtomicLong(0L);
            this.isNeedFlush = false;
            this.lastFlushTime = System.currentTimeMillis();
            this.isTestMode = z;
            this.latencyTimerContextMap = new HashMap();
            this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
            createWorkers();
        } catch (SnowflakeSQLException | IOException e) {
            throw new SFException(e, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, new Object[0]);
        }
    }

    private CompletableFuture<Void> statsFuture() {
        return CompletableFuture.runAsync(() -> {
            if (this.owningClient.cpuHistogram != null) {
                this.owningClient.cpuHistogram.update((long) (ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class).getProcessCpuLoad() * 100.0d));
            }
        }, this.flushWorker);
    }

    private CompletableFuture<Void> distributeFlush(boolean z, long j) {
        return CompletableFuture.runAsync(() -> {
            logFlushTask(z, j);
            distributeFlushTasks();
            this.isNeedFlush = false;
            this.lastFlushTime = System.currentTimeMillis();
        }, this.flushWorker);
    }

    private void logFlushTask(boolean z, long j) {
        String format = String.format("Submit forced or ad-hoc flush task on client=%s, isForce=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", this.owningClient.getName(), Boolean.valueOf(z), Boolean.valueOf(this.isNeedFlush), Long.valueOf(j), Long.valueOf(System.currentTimeMillis() - this.lastFlushTime));
        if (logger.isTraceEnabled()) {
            logger.logTrace(format);
        }
        if (logger.isTraceEnabled()) {
            return;
        }
        if (this.isNeedFlush || z) {
            logger.logDebug(format);
        }
    }

    private CompletableFuture<Void> registerFuture() {
        return CompletableFuture.runAsync(() -> {
            this.registerService.registerBlobs(this.latencyTimerContextMap);
        }, this.registerWorker);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> flush(boolean z) {
        long currentTimeMillis = System.currentTimeMillis() - this.lastFlushTime;
        return (z || (!isTestMode() && (this.isNeedFlush || currentTimeMillis >= this.owningClient.getParameterProvider().getBufferFlushIntervalInMs()))) ? statsFuture().thenCompose(r9 -> {
            return distributeFlush(z, currentTimeMillis);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r3 -> {
            return registerFuture();
        }) : statsFuture();
    }

    private void createWorkers() {
        this.flushWorker = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ingest-flush-thread").build());
        this.flushWorker.scheduleWithFixedDelay(() -> {
            flush(false).exceptionally(th -> {
                String format = String.format("Background flush task failed, client=%s, exception=%s, detail=%s, trace=%s.", this.owningClient.getName(), th.getCause(), th.getCause().getMessage(), Utils.getStackTrace(th.getCause()));
                logger.logError(format);
                if (this.owningClient.getTelemetryService() == null) {
                    return null;
                }
                this.owningClient.getTelemetryService().reportClientFailure(getClass().getSimpleName(), format);
                return null;
            });
        }, 0L, this.owningClient.getParameterProvider().getBufferFlushCheckIntervalInMs(), TimeUnit.MILLISECONDS);
        this.registerWorker = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("ingest-register-thread").build());
        ThreadFactory build = new ThreadFactoryBuilder().setNameFormat("ingest-build-upload-thread-%d").build();
        int min = Math.min(Runtime.getRuntime().availableProcessors() * (1 + this.owningClient.getParameterProvider().getIOTimeCpuRatio()), Integer.MAX_VALUE);
        this.buildUploadWorkers = Executors.newFixedThreadPool(min, build);
        logger.logInfo("Create {} threads for build/upload blobs for client={}, total available processors={}", Integer.valueOf(min), this.owningClient.getName(), Integer.valueOf(Runtime.getRuntime().availableProcessors()));
    }

    void distributeFlushTasks() {
        Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>> it = this.channelCache.iterator();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        while (true) {
            if (!it.hasNext() && arrayList2.isEmpty()) {
                this.registerService.addBlobs(arrayList);
                return;
            }
            ArrayList arrayList3 = new ArrayList();
            float f = 0.0f;
            String filePath = getFilePath(this.targetStage.getClientPrefix());
            while (true) {
                if (!it.hasNext() && arrayList2.isEmpty()) {
                    break;
                }
                List synchronizedList = Collections.synchronizedList(new ArrayList());
                if (arrayList2.isEmpty()) {
                    it.next().getValue().values().parallelStream().forEach(snowflakeStreamingIngestChannelInternal -> {
                        ChannelData<T> data;
                        if (!snowflakeStreamingIngestChannelInternal.isValid() || (data = snowflakeStreamingIngestChannelInternal.getData(filePath)) == null) {
                            return;
                        }
                        synchronizedList.add(data);
                    });
                } else {
                    synchronizedList.addAll(arrayList2);
                    arrayList2.clear();
                }
                if (!synchronizedList.isEmpty()) {
                    int i = 0;
                    float f2 = 0.0f;
                    while (true) {
                        if (i >= synchronizedList.size()) {
                            break;
                        }
                        ChannelData<T> channelData = (ChannelData) synchronizedList.get(i);
                        if (i > 0 && shouldStopProcessing(f, f2, channelData, (ChannelData) synchronizedList.get(i - 1))) {
                            arrayList2.addAll(synchronizedList.subList(i, synchronizedList.size()));
                            logger.logInfo("Creation of another blob is needed because of blob/chunk size limit or different encryption ids or different schema, client={}, table={}, fileSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={}, encryptionId2={}, schema1={}, schema2={}", this.owningClient.getName(), channelData.getChannelContext().getTableName(), Float.valueOf(f), Float.valueOf(f2), Float.valueOf(channelData.getBufferSize()), channelData.getChannelContext().getEncryptionKeyId(), ((ChannelData) synchronizedList.get(i - 1)).getChannelContext().getEncryptionKeyId(), channelData.getColumnEps().keySet(), ((ChannelData) synchronizedList.get(i - 1)).getColumnEps().keySet());
                            break;
                        } else {
                            f += channelData.getBufferSize();
                            f2 += channelData.getBufferSize();
                            i++;
                        }
                    }
                    arrayList3.add(synchronizedList.subList(0, i));
                    if (i != synchronizedList.size()) {
                        break;
                    }
                }
            }
            if (arrayList3.isEmpty()) {
                this.counter.decrementAndGet();
            } else {
                long currentTimeMillis = System.currentTimeMillis();
                if (this.owningClient.flushLatency != null) {
                    this.latencyTimerContextMap.putIfAbsent(filePath, this.owningClient.flushLatency.time());
                }
                arrayList.add(new Pair(new BlobData(filePath, arrayList3), CompletableFuture.supplyAsync(() -> {
                    try {
                        BlobMetadata buildAndUpload = buildAndUpload(filePath, arrayList3);
                        buildAndUpload.getBlobStats().setFlushStartMs(currentTimeMillis);
                        return buildAndUpload;
                    } catch (Throwable th) {
                        Throwable cause = th.getCause() == null ? th : th.getCause();
                        String format = String.format("Building blob failed, client=%s, file=%s, exception=%s, detail=%s, trace=%s, all channels in the blob will be invalidated", this.owningClient.getName(), filePath, cause, cause.getMessage(), Utils.getStackTrace(cause));
                        logger.logError(format);
                        if (this.owningClient.getTelemetryService() != null) {
                            this.owningClient.getTelemetryService().reportClientFailure(getClass().getSimpleName(), format);
                        }
                        if (th instanceof IOException) {
                            invalidateAllChannelsInBlob(arrayList3);
                            return null;
                        }
                        if (th instanceof NoSuchAlgorithmException) {
                            throw new SFException(th, ErrorCode.MD5_HASHING_NOT_AVAILABLE, new Object[0]);
                        }
                        if (((th instanceof InvalidAlgorithmParameterException) | (th instanceof NoSuchPaddingException) | (th instanceof IllegalBlockSizeException) | (th instanceof BadPaddingException)) || (th instanceof InvalidKeyException)) {
                            throw new SFException(th, ErrorCode.ENCRYPTION_FAILURE, new Object[0]);
                        }
                        throw new SFException(th, ErrorCode.INTERNAL_ERROR, th.getMessage());
                    }
                }, this.buildUploadWorkers)));
                logger.logInfo("buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}", this.owningClient.getName(), filePath, this.buildUploadWorkers.toString());
            }
        }
    }

    private boolean shouldStopProcessing(float f, float f2, ChannelData<T> channelData, ChannelData<T> channelData2) {
        return f + channelData.getBufferSize() > 2.56E8f || f2 + channelData.getBufferSize() > ((float) this.owningClient.getParameterProvider().getMaxChunkSizeInBytes()) || !Objects.equals(channelData.getChannelContext().getEncryptionKeyId(), channelData2.getChannelContext().getEncryptionKeyId()) || !channelData.getColumnEps().keySet().equals(channelData2.getColumnEps().keySet());
    }

    BlobMetadata buildAndUpload(String str, List<List<ChannelData<T>>> list) throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException {
        Timer.Context createTimerContext = Utils.createTimerContext(this.owningClient.buildLatency);
        BlobBuilder.Blob constructBlobAndMetadata = BlobBuilder.constructBlobAndMetadata(str, list, this.bdecVersion);
        constructBlobAndMetadata.blobStats.setBuildDurationMs(createTimerContext);
        return upload(str, constructBlobAndMetadata.blobBytes, constructBlobAndMetadata.chunksMetadataList, constructBlobAndMetadata.blobStats);
    }

    BlobMetadata upload(String str, byte[] bArr, List<ChunkMetadata> list, BlobStats blobStats) throws NoSuchAlgorithmException {
        logger.logInfo("Start uploading file={}, size={}", str, Integer.valueOf(bArr.length));
        long currentTimeMillis = System.currentTimeMillis();
        Timer.Context createTimerContext = Utils.createTimerContext(this.owningClient.uploadLatency);
        this.targetStage.put(str, bArr);
        if (createTimerContext != null) {
            blobStats.setUploadDurationMs(createTimerContext);
            this.owningClient.uploadThroughput.mark(bArr.length);
            this.owningClient.blobSizeHistogram.update(bArr.length);
            this.owningClient.blobRowCountHistogram.update(list.stream().mapToLong(chunkMetadata -> {
                return chunkMetadata.getEpInfo().getRowCount();
            }).sum());
        }
        logger.logInfo("Finish uploading file={}, size={}, timeInMillis={}", str, Integer.valueOf(bArr.length), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        return BlobMetadata.createBlobMetadata(str, BlobBuilder.computeMD5(bArr), this.bdecVersion, list, blobStats);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() throws InterruptedException {
        this.flushWorker.shutdown();
        this.registerWorker.shutdown();
        this.buildUploadWorkers.shutdown();
        if (this.flushWorker.awaitTermination(300L, TimeUnit.SECONDS) && this.registerWorker.awaitTermination(300L, TimeUnit.SECONDS) && this.buildUploadWorkers.awaitTermination(300L, TimeUnit.SECONDS)) {
            return;
        }
        logger.logWarn("Tasks can't be terminated within the timeout, force shutdown now.");
        this.flushWorker.shutdownNow();
        this.registerWorker.shutdownNow();
        this.buildUploadWorkers.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setNeedFlush() {
        this.isNeedFlush = true;
    }

    private String getFilePath(String str) {
        return getFilePath(Calendar.getInstance(TimeZone.getTimeZone("UTC")), str);
    }

    String getFilePath(Calendar calendar, String str) {
        if (this.isTestMode && str == null) {
            str = "testPrefix";
        }
        Utils.assertStringNotNullOrEmpty("client prefix", str);
        return calendar.get(1) + "/" + (calendar.get(2) + 1) + "/" + calendar.get(5) + "/" + calendar.get(11) + "/" + calendar.get(12) + "/" + (Long.toString(TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis()), 36) + "_" + str + "_" + Thread.currentThread().getId() + "_" + this.counter.getAndIncrement() + "." + Constants.BLOB_EXTENSION_TYPE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <CD> void invalidateAllChannelsInBlob(List<List<ChannelData<CD>>> list) {
        list.forEach(list2 -> {
            list2.forEach(channelData -> {
                this.owningClient.getChannelCache().invalidateChannelIfSequencersMatch(channelData.getChannelContext().getDbName(), channelData.getChannelContext().getSchemaName(), channelData.getChannelContext().getTableName(), channelData.getChannelContext().getName(), channelData.getChannelContext().getChannelSequencer());
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getClientPrefix() {
        return this.targetStage.getClientPrefix();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean throttleDueToQueuedFlushTasks() {
        boolean z = ((ThreadPoolExecutor) this.buildUploadWorkers).getQueue().size() > Runtime.getRuntime().availableProcessors();
        if (z) {
            logger.logWarn("Throttled due too many queue flush tasks (probably because of slow uploading speed), client={}, buildUploadWorkers stats={}", this.owningClient.getName(), this.buildUploadWorkers.toString());
        }
        return z;
    }

    boolean isTestMode() {
        return this.isTestMode;
    }
}
