package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.client.util.Preconditions;
import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.InstantCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Ascii;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink.class */
public class ShuffleSink<T> extends Sink<WindowedValue<T>> {
    static final long SHUFFLE_WRITER_BUFFER_SIZE = 134217728;
    final byte[] shuffleWriterConfig;
    final ShuffleKind shuffleKind;
    final PipelineOptions options;
    final CounterSet.AddCounterMutator addCounterMutator;
    boolean shardByKey;
    boolean groupValues;
    boolean sortValues;
    WindowedValue.WindowedValueCoder<T> windowedElemCoder;
    WindowedValue.WindowedValueCoder windowedValueCoder;
    Coder<T> elemCoder;
    Coder keyCoder;
    Coder valueCoder;
    Coder sortKeyCoder;
    Coder sortValueCoder;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$1, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$ShuffleSink$ShuffleKind = new int[ShuffleKind.values().length];

        static {
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$ShuffleSink$ShuffleKind[ShuffleKind.UNGROUPED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$ShuffleSink$ShuffleKind[ShuffleKind.PARTITION_KEYS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$ShuffleSink$ShuffleKind[ShuffleKind.GROUP_KEYS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$ShuffleSink$ShuffleKind[ShuffleKind.GROUP_KEYS_AND_SORT_VALUES.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink$ShuffleKind.class */
    public enum ShuffleKind {
        UNGROUPED,
        PARTITION_KEYS,
        GROUP_KEYS,
        GROUP_KEYS_AND_SORT_VALUES
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink$ShuffleSinkWriter.class */
    public class ShuffleSinkWriter implements Sink.SinkWriter<WindowedValue<T>> {
        private static final String COUNTER_WORKER_PREFIX = "worker-";
        private static final String COUNTER_DATASET_PREFIX = "-dataset-";
        private static final String COUNTER_SUFFIX = "-shuffle-bytes";
        private ShuffleEntryWriter writer;
        private long seqNum = 0;
        private final Counter<Long> perWorkerPerDatasetBytesCounter;
        private final Counter<Long> perDatasetBytesCounter;

        ShuffleSinkWriter(ShuffleEntryWriter shuffleEntryWriter, PipelineOptions pipelineOptions, CounterSet.AddCounterMutator addCounterMutator, String str) {
            this.writer = shuffleEntryWriter;
            DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions = (DataflowWorkerHarnessOptions) pipelineOptions.as(DataflowWorkerHarnessOptions.class);
            String valueOf = String.valueOf(COUNTER_WORKER_PREFIX);
            String workerId = dataflowWorkerHarnessOptions.getWorkerId();
            String valueOf2 = String.valueOf(COUNTER_DATASET_PREFIX);
            String valueOf3 = String.valueOf(COUNTER_SUFFIX);
            this.perWorkerPerDatasetBytesCounter = addCounterMutator.addCounter(Counter.longs(new StringBuilder(0 + String.valueOf(valueOf).length() + String.valueOf(workerId).length() + String.valueOf(valueOf2).length() + String.valueOf(str).length() + String.valueOf(valueOf3).length()).append(valueOf).append(workerId).append(valueOf2).append(str).append(valueOf3).toString(), Counter.AggregationKind.SUM));
            this.perDatasetBytesCounter = addCounterMutator.addCounter(Counter.longs(new StringBuilder(26 + String.valueOf(str).length()).append("dax-shuffle-").append(str).append("-written-bytes").toString(), Counter.AggregationKind.SUM));
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink.SinkWriter
        public long add(WindowedValue<T> windowedValue) throws IOException {
            byte[] encodeToByteArray;
            byte[] bArr;
            byte[] encodeToByteArray2;
            T value = windowedValue.getValue();
            if (!ShuffleSink.this.shardByKey) {
                BigEndianLongCoder of = BigEndianLongCoder.of();
                long j = this.seqNum;
                this.seqNum = j + 1;
                encodeToByteArray = CoderUtils.encodeToByteArray(of, Long.valueOf(j));
                bArr = null;
                encodeToByteArray2 = CoderUtils.encodeToByteArray(ShuffleSink.this.windowedElemCoder, windowedValue);
            } else {
                if (!(value instanceof KV)) {
                    throw new AssertionError("expecting the values written to a key-grouping shuffle to be KVs");
                }
                KV kv = (KV) value;
                Object key = kv.getKey();
                Object value2 = kv.getValue();
                encodeToByteArray = CoderUtils.encodeToByteArray(ShuffleSink.this.keyCoder, key);
                if (ShuffleSink.this.sortValues) {
                    if (!(value2 instanceof KV)) {
                        throw new AssertionError("expecting the value parts of the KVs written to a value-sorting shuffle to also be KVs");
                    }
                    KV kv2 = (KV) value2;
                    Object key2 = kv2.getKey();
                    Object value3 = kv2.getValue();
                    bArr = CoderUtils.encodeToByteArray(ShuffleSink.this.sortKeyCoder, key2);
                    encodeToByteArray2 = CoderUtils.encodeToByteArray(ShuffleSink.this.sortValueCoder, value3);
                } else if (ShuffleSink.this.groupValues) {
                    bArr = windowedValue.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE) ? null : CoderUtils.encodeToByteArray(InstantCoder.of(), windowedValue.getTimestamp());
                    encodeToByteArray2 = CoderUtils.encodeToByteArray(ShuffleSink.this.valueCoder, value2);
                } else {
                    bArr = null;
                    encodeToByteArray2 = CoderUtils.encodeToByteArray(ShuffleSink.this.windowedValueCoder, windowedValue.withValue(value2));
                }
            }
            ShuffleEntry shuffleEntry = new ShuffleEntry(encodeToByteArray, bArr, encodeToByteArray2);
            this.writer.put(shuffleEntry);
            long length = shuffleEntry.length();
            this.perWorkerPerDatasetBytesCounter.addValue(Long.valueOf(length));
            this.perDatasetBytesCounter.addValue(Long.valueOf(length));
            return length;
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink.SinkWriter, java.lang.AutoCloseable
        public void close() throws IOException {
            this.writer.close();
        }
    }

    public static ShuffleKind parseShuffleKind(String str) throws Exception {
        try {
            return (ShuffleKind) Enum.valueOf(ShuffleKind.class, str.trim().toUpperCase());
        } catch (IllegalArgumentException e) {
            throw new Exception("unexpected shuffle_kind", e);
        }
    }

    public ShuffleSink(PipelineOptions pipelineOptions, byte[] bArr, ShuffleKind shuffleKind, Coder<WindowedValue<T>> coder, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        this.shuffleWriterConfig = bArr;
        this.shuffleKind = shuffleKind;
        this.options = pipelineOptions;
        this.addCounterMutator = addCounterMutator;
        initCoder(coder);
    }

    private void initCoder(Coder<WindowedValue<T>> coder) throws Exception {
        switch (AnonymousClass1.$SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$ShuffleSink$ShuffleKind[this.shuffleKind.ordinal()]) {
            case 1:
                this.shardByKey = false;
                this.groupValues = false;
                this.sortValues = false;
                break;
            case 2:
                this.shardByKey = true;
                this.groupValues = false;
                this.sortValues = false;
                break;
            case Ascii.ETX /* 3 */:
                this.shardByKey = true;
                this.groupValues = true;
                this.sortValues = false;
                break;
            case 4:
                this.shardByKey = true;
                this.groupValues = true;
                this.sortValues = true;
                break;
            default:
                throw new AssertionError("unexpected shuffle kind");
        }
        this.windowedElemCoder = (WindowedValue.WindowedValueCoder) coder;
        this.elemCoder = this.windowedElemCoder.getValueCoder();
        if (!this.shardByKey) {
            this.keyCoder = null;
            this.valueCoder = null;
            this.sortKeyCoder = null;
            this.sortValueCoder = null;
            this.windowedValueCoder = null;
            return;
        }
        if (!(this.elemCoder instanceof KvCoder)) {
            throw new Exception("unexpected kind of coder for elements written to a key-grouping shuffle");
        }
        KvCoder kvCoder = (KvCoder) this.elemCoder;
        this.keyCoder = kvCoder.getKeyCoder();
        this.valueCoder = kvCoder.getValueCoder();
        if (!this.sortValues) {
            this.sortKeyCoder = null;
            this.sortValueCoder = null;
        } else {
            if (!(this.valueCoder instanceof KvCoder)) {
                throw new Exception("unexpected kind of coder for values written to a value-sorting shuffle");
            }
            KvCoder kvCoder2 = (KvCoder) this.valueCoder;
            this.sortKeyCoder = kvCoder2.getKeyCoder();
            this.sortValueCoder = kvCoder2.getValueCoder();
        }
        if (this.groupValues) {
            this.windowedValueCoder = null;
        } else {
            this.windowedValueCoder = this.windowedElemCoder.withValueCoder(this.valueCoder);
        }
    }

    public Sink.SinkWriter<WindowedValue<T>> writer(ShuffleEntryWriter shuffleEntryWriter, String str) {
        return new ShuffleSinkWriter(shuffleEntryWriter, this.options, this.addCounterMutator, str);
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink
    public Sink.SinkWriter<WindowedValue<T>> writer() throws IOException {
        Preconditions.checkArgument(this.shuffleWriterConfig != null);
        ApplianceShuffleWriter applianceShuffleWriter = new ApplianceShuffleWriter(this.shuffleWriterConfig, SHUFFLE_WRITER_BUFFER_SIZE);
        return writer(new ChunkingShuffleEntryWriter(applianceShuffleWriter), applianceShuffleWriter.getDatasetId());
    }
}
