package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.io.CountingOutputStream;
import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.util.RandomAccessData;
import com.google.cloud.dataflow.sdk.util.ScalableBloomFilter;
import com.google.cloud.dataflow.sdk.util.VarInt;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/IsmSink.class */
/**
 * A {@link Sink} that writes key-value records to a single file.
 *
 * <p>As written by {@link IsmSinkWriter}, the file consists of, in order:
 * <ol>
 *   <li>the data records, each a key prefix descriptor, the key bytes not shared with the
 *       previous key, and the encoded value;</li>
 *   <li>a Bloom filter built over every encoded key;</li>
 *   <li>a sparse index mapping selected keys to the file offset of their record;</li>
 *   <li>a footer holding the index offset, the Bloom filter offset and the number of keys.</li>
 * </ol>
 *
 * <p>Keys must be added in strictly increasing order of their encoded bytes, compared as
 * unsigned values; otherwise {@link IllegalArgumentException} is thrown.
 *
 * @param <K> the type of the keys being written
 * @param <V> the type of the values being written
 */
public class IsmSink<K, V> extends Sink<WindowedValue<KV<K, V>>> {
    private final String filename;
    private final Coder<K> keyCoder;
    private final Coder<V> valueCoder;

    /** Writer that streams records to the output channel and appends the trailing sections on close. */
    private class IsmSinkWriter implements Sink.SinkWriter<WindowedValue<KV<K, V>>> {
        /** Bytes of output (1 MiB) that must accumulate since the last indexed record before the next one is indexed. */
        private static final long MAX_BLOCK_SIZE = 1048576;

        /** Destination stream; its byte count doubles as the current file offset. */
        private final CountingOutputStream out;
        /** In-memory buffer for the index, flushed to {@link #out} by {@link #finish()}. */
        private final RandomAccessData indexOut;
        /** Encoded bytes of the previously written key. */
        private RandomAccessData lastKeyBytes;
        /** Scratch buffer for the key being written; swapped with {@link #lastKeyBytes} after each record. */
        private RandomAccessData currentKeyBytes;
        /** Encoded bytes of the key most recently added to the index. */
        private RandomAccessData lastIndexKeyBytes;
        /** File offset of the record most recently added to the index (0 until the first index entry). */
        private long lastIndexedPosition;
        private long numberOfKeysWritten;
        private final ScalableBloomFilter.Builder bloomFilterBuilder;

        /**
         * @param writableByteChannel the channel that receives the file contents; must not be null
         */
        private IsmSinkWriter(WritableByteChannel writableByteChannel) {
            Preconditions.checkNotNull(writableByteChannel);
            this.out = new CountingOutputStream(Channels.newOutputStream(writableByteChannel));
            this.indexOut = new RandomAccessData();
            this.lastKeyBytes = new RandomAccessData();
            this.currentKeyBytes = new RandomAccessData();
            this.lastIndexKeyBytes = new RandomAccessData();
            this.bloomFilterBuilder = ScalableBloomFilter.builder();
        }

        /**
         * Appends one key-value record to the output, and possibly an entry to the index.
         *
         * @param windowedValue the element whose {@link KV} payload is written
         * @return the number of bytes this call appended to the output stream
         * @throws IOException if encoding or writing fails
         * @throws IllegalArgumentException if the encoded key is not strictly greater than the
         *     previously written key
         */
        @Override
        public long add(WindowedValue<KV<K, V>> windowedValue) throws IOException {
            // Only the KV payload is consulted; nothing else from the WindowedValue is written.
            KV<K, V> value = windowedValue.getValue();
            long recordStart = this.out.getCount();

            // Encode the key into the scratch buffer and find how much it shares with the last key.
            IsmSink.this.keyCoder.encode(
                    value.getKey(), this.currentKeyBytes.asOutputStream(), Coder.Context.OUTER);
            int keySize = this.currentKeyBytes.size();
            int sharedKeySize = commonPrefixLength(this.lastKeyBytes, this.currentKeyBytes);
            int unsharedKeySize = keySize - sharedKeySize;

            // Record layout: key prefix descriptor, unshared key suffix, value.
            IsmFormat.KeyPrefixCoder.of().encode(
                    new IsmFormat.KeyPrefix(sharedKeySize, unsharedKeySize),
                    (OutputStream) this.out,
                    Coder.Context.NESTED);
            this.currentKeyBytes.writeTo(this.out, sharedKeySize, unsharedKeySize);
            IsmSink.this.valueCoder.encode(value.getValue(), this.out, Coder.Context.NESTED);

            // Add an index entry once more than MAX_BLOCK_SIZE bytes lie between the last indexed
            // record and the current end of output.
            if (this.lastIndexedPosition + MAX_BLOCK_SIZE < this.out.getCount()) {
                // Index keys are prefix-compressed against the previous *index* key.
                int sharedIndexKeySize =
                        commonPrefixLength(this.lastIndexKeyBytes, this.currentKeyBytes);
                int unsharedIndexKeySize = keySize - sharedIndexKeySize;
                IsmFormat.KeyPrefixCoder.of().encode(
                        new IsmFormat.KeyPrefix(sharedIndexKeySize, unsharedIndexKeySize),
                        this.indexOut.asOutputStream(),
                        Coder.Context.NESTED);
                this.currentKeyBytes.writeTo(
                        this.indexOut.asOutputStream(), sharedIndexKeySize, unsharedIndexKeySize);
                VarInt.encode(recordStart, this.indexOut.asOutputStream());

                this.lastIndexKeyBytes.resetTo(0);
                this.currentKeyBytes.writeTo(
                        this.lastIndexKeyBytes.asOutputStream(), 0, this.currentKeyBytes.size());

                // Advance the block boundary to the offset just indexed. Without this the
                // boundary stays at 0 and every record past the first MAX_BLOCK_SIZE bytes
                // would receive its own index entry.
                this.lastIndexedPosition = recordStart;
            }

            this.bloomFilterBuilder.put(
                    this.currentKeyBytes.array(), 0, this.currentKeyBytes.size());

            // Swap the buffers so the current key becomes the "last" key, then recycle the old
            // buffer as the scratch space for the next key.
            RandomAccessData recycled = this.lastKeyBytes;
            this.lastKeyBytes = this.currentKeyBytes;
            this.currentKeyBytes = recycled;
            this.currentKeyBytes.resetTo(0);

            this.numberOfKeysWritten++;
            return this.out.getCount() - recordStart;
        }

        /**
         * Returns the number of leading bytes shared by the two keys, verifying on the way that
         * the second key is strictly greater than the first (bytes compared as unsigned).
         *
         * @param randomAccessData the previous key
         * @param randomAccessData2 the current key
         * @return the length of the common prefix
         * @throws IllegalArgumentException if the current key is less than or equal to the
         *     previous key
         */
        private int commonPrefixLength(RandomAccessData randomAccessData, RandomAccessData randomAccessData2) {
            byte[] previousBytes = randomAccessData.array();
            byte[] currentBytes = randomAccessData2.array();
            int minSize = Math.min(randomAccessData.size(), randomAccessData2.size());
            for (int position = 0; position < minSize; position++) {
                int previous = previousBytes[position] & 255;
                int current = currentBytes[position] & 255;
                if (previous > current) {
                    throw new IllegalArgumentException(
                            IsmSinkWriter.class.getSimpleName()
                                    + " expects keys to be written in strictly increasing order but was given "
                                    + randomAccessData
                                    + " as the previous key and "
                                    + randomAccessData2
                                    + " as the current key. Expected "
                                    + previous
                                    + " <= "
                                    + current
                                    + " at position "
                                    + position
                                    + ".");
                }
                if (previous != current) {
                    return position;
                }
            }
            // One key is a prefix of the other: only a strictly longer current key is in order.
            if (randomAccessData.size() < randomAccessData2.size()) {
                return minSize;
            }
            throw new IllegalArgumentException(
                    IsmSinkWriter.class.getSimpleName()
                            + " expects keys to be written in strictly increasing order but was given "
                            + randomAccessData
                            + " as the previous key and "
                            + randomAccessData2
                            + " as the current key. Expected length of previous key "
                            + randomAccessData.size()
                            + " <= "
                            + randomAccessData2.size()
                            + " to current key.");
        }

        /**
         * Writes the trailing sections after the data records: the Bloom filter, the buffered
         * index, and a footer recording where the index and Bloom filter start and how many keys
         * were written.
         *
         * @throws IOException if writing fails
         */
        private void finish() throws IOException {
            long bloomFilterPosition = this.out.getCount();
            ScalableBloomFilter.ScalableBloomFilterCoder.of().encode(
                    this.bloomFilterBuilder.build(), (OutputStream) this.out, Coder.Context.NESTED);

            long indexPosition = this.out.getCount();
            this.indexOut.writeTo(this.out, 0, this.indexOut.size());

            IsmFormat.FooterCoder.of().encode(
                    new IsmFormat.Footer(indexPosition, bloomFilterPosition, this.numberOfKeysWritten),
                    (OutputStream) this.out,
                    Coder.Context.OUTER);
        }

        /**
         * Writes the trailing sections and closes the output stream. The stream is closed even
         * when writing the trailing sections fails.
         *
         * @throws IOException if writing the trailing sections or closing the stream fails
         */
        @Override
        public void close() throws IOException {
            // try-with-resources guarantees the stream is released if finish() throws, and
            // records a secondary close() failure as suppressed instead of masking the cause.
            try (CountingOutputStream stream = this.out) {
                finish();
            }
        }
    }

    /**
     * @param str the name of the file to create
     * @param coder the coder used to encode keys
     * @param coder2 the coder used to encode values
     */
    IsmSink(String str, Coder<K> coder, Coder<V> coder2) {
        this.filename = str;
        this.keyCoder = coder;
        this.valueCoder = coder2;
    }

    /**
     * Creates the output file as a binary channel and returns a writer for it.
     *
     * @return a new writer positioned at the start of the file
     * @throws IOException if the output channel cannot be created
     */
    @Override
    public Sink.SinkWriter<WindowedValue<KV<K, V>>> writer() throws IOException {
        return new IsmSinkWriter(IOChannelUtils.create(this.filename, MimeTypes.BINARY));
    }
}
