package org.apache.beam.sdk.extensions.sorter;

import java.io.IOException;
import java.util.Iterator;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.dataflow.qual.Pure;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sorter/SortValues.class */
/**
 * A {@link PTransform} that, for every primary key, re-emits the grouped
 * {@code (secondary key, value)} pairs in the order produced by a {@link BufferedExternalSorter}.
 *
 * <p>Each pair is encoded to bytes with the coders taken from the input {@link PCollection},
 * handed to the sorter, and decoded again lazily while the output iterable is traversed. The
 * secondary key coder must be deterministic; {@link #expand} rejects the input otherwise.
 *
 * <p>The input coder must be a {@link KvCoder} whose value coder is an {@link IterableCoder} of a
 * {@link KvCoder}; any other shape is rejected with an {@link IllegalStateException}.
 *
 * @param <PrimaryKeyT> type of the grouping key
 * @param <SecondaryKeyT> type of the key the values are sorted by
 * @param <ValueT> type of the payload that travels with each secondary key
 */
public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT> extends PTransform<PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>>, PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>>> {
    private final BufferedExternalSorter.Options sorterOptions;

    /**
     * {@link DoFn} that sorts the iterable of one element at a time.
     *
     * <p>A fresh {@link BufferedExternalSorter} is created for every processed element.
     */
    public static class SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT> extends DoFn<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>, KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> {
        private final BufferedExternalSorter.Options sorterOptions;
        private final Coder<SecondaryKeyT> keyCoder;
        private final Coder<ValueT> valueCoder;

        /**
         * View over the sorter's byte-level output that decodes pairs on demand.
         *
         * <p>Every call to {@link #iterator()} asks the wrapped iterable for a new iterator, so
         * re-iteration works only as far as the wrapped iterable supports it.
         */
        private class DecodingIterable implements Iterable<KV<SecondaryKeyT, ValueT>> {
            final Iterable<KV<byte[], byte[]>> iterable;

            DecodingIterable(Iterable<KV<byte[], byte[]>> iterable) {
                this.iterable = iterable;
            }

            @Override // java.lang.Iterable
            @Nonnull
            public Iterator<KV<SecondaryKeyT, ValueT>> iterator() {
                return new DecodingIterator(this.iterable.iterator());
            }
        }

        /** Iterator that decodes one encoded pair per {@link #next()} call. */
        private class DecodingIterator implements Iterator<KV<SecondaryKeyT, ValueT>> {
            final Iterator<KV<byte[], byte[]>> iterator;

            DecodingIterator(Iterator<KV<byte[], byte[]>> iterator) {
                this.iterator = iterator;
            }

            @Override // java.util.Iterator
            @Pure
            public boolean hasNext() {
                return this.iterator.hasNext();
            }

            /**
             * Decodes and returns the next pair.
             *
             * @throws RuntimeException wrapping the {@link IOException} if decoding fails
             */
            @Override // java.util.Iterator
            public KV<SecondaryKeyT, ValueT> next() {
                KV<byte[], byte[]> encoded = this.iterator.next();
                try {
                    SecondaryKeyT key = CoderUtils.decodeFromByteArray(SortValuesDoFn.this.keyCoder, encoded.getKey());
                    ValueT value = CoderUtils.decodeFromByteArray(SortValuesDoFn.this.valueCoder, encoded.getValue());
                    return KV.of(key, value);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            /** Always throws; the decoded view is read-only. */
            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException("Iterator does not support remove");
            }
        }

        /**
         * @param options configuration passed to {@link BufferedExternalSorter#create} per element
         * @param keyCoder coder used to encode/decode secondary keys
         * @param valueCoder coder used to encode/decode values
         */
        SortValuesDoFn(BufferedExternalSorter.Options options, Coder<SecondaryKeyT> keyCoder, Coder<ValueT> valueCoder) {
            this.sorterOptions = options;
            this.keyCoder = keyCoder;
            this.valueCoder = valueCoder;
        }

        /**
         * Encodes every pair of the element's iterable, feeds it to a new sorter and outputs the
         * primary key together with a lazily decoding view of the sorter's result.
         *
         * @throws RuntimeException wrapping the {@link IOException} if encoding or sorting fails
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext processContext) {
            KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>> element = processContext.element();
            Iterable<KV<SecondaryKeyT, ValueT>> records = element.getValue();
            try {
                BufferedExternalSorter sorter = BufferedExternalSorter.create(this.sorterOptions);
                for (KV<SecondaryKeyT, ValueT> record : records) {
                    byte[] encodedKey = CoderUtils.encodeToByteArray(this.keyCoder, record.getKey());
                    byte[] encodedValue = CoderUtils.encodeToByteArray(this.valueCoder, record.getValue());
                    sorter.add(KV.of(encodedKey, encodedValue));
                }
                Iterable<KV<SecondaryKeyT, ValueT>> sorted = new DecodingIterable(sorter.sort());
                processContext.output(KV.of(element.getKey(), sorted));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private SortValues(BufferedExternalSorter.Options options) {
        this.sorterOptions = options;
    }

    /**
     * Creates a {@code SortValues} transform that configures its sorter with {@code options}.
     *
     * @param options options forwarded to {@link BufferedExternalSorter#create}
     * @return a new transform instance
     */
    public static <PrimaryKeyT, SecondaryKeyT, ValueT> SortValues<PrimaryKeyT, SecondaryKeyT, ValueT> create(BufferedExternalSorter.Options options) {
        return new SortValues<>(options);
    }

    /**
     * Applies the sorting {@link DoFn} to {@code pCollection} and reuses the input coder for the
     * output.
     *
     * @throws IllegalStateException if the input coder does not have the required
     *     {@code KvCoder}/{@code IterableCoder}/{@code KvCoder} shape, or if the secondary key
     *     coder is not deterministic
     */
    @Override
    public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> expand(PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> pCollection) {
        Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> inputCoder = pCollection.getCoder();
        Coder<SecondaryKeyT> secondaryKeyCoder = getSecondaryKeyCoder(inputCoder);
        // Only the determinism check can raise the checked exception, so the try covers it alone.
        try {
            secondaryKeyCoder.verifyDeterministic();
        } catch (Coder.NonDeterministicException e) {
            throw new IllegalStateException("the secondary key coder of SortValues must be deterministic", e);
        }
        Coder<ValueT> valueCoder = getValueCoder(inputCoder);
        SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT> sortFn = new SortValuesDoFn<>(this.sorterOptions, secondaryKeyCoder, valueCoder);
        return pCollection.apply(ParDo.of(sortFn)).setCoder(inputCoder);
    }

    /**
     * Unwraps the coder of the {@code (secondary key, value)} pairs from the input coder.
     *
     * <p>Each downcast below is preceded by the matching {@code instanceof} check, and the target
     * parameterization is the only one compatible with the static source type.
     *
     * @throws IllegalStateException if any layer of the coder has an unexpected class
     */
    private static <PrimaryKeyT, SecondaryKeyT, ValueT> KvCoder<SecondaryKeyT, ValueT> getSecondaryKeyValueCoder(Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> coder) {
        if (!(coder instanceof KvCoder)) {
            throw new IllegalStateException("SortValues requires its input to use KvCoder");
        }
        KvCoder<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>> kvCoder = (KvCoder<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>) coder;

        Coder<Iterable<KV<SecondaryKeyT, ValueT>>> groupedCoder = kvCoder.getValueCoder();
        if (!(groupedCoder instanceof IterableCoder)) {
            throw new IllegalStateException("SortValues requires the values be encoded with IterableCoder");
        }
        IterableCoder<KV<SecondaryKeyT, ValueT>> iterableCoder = (IterableCoder<KV<SecondaryKeyT, ValueT>>) groupedCoder;

        Coder<KV<SecondaryKeyT, ValueT>> elemCoder = iterableCoder.getElemCoder();
        if (!(elemCoder instanceof KvCoder)) {
            throw new IllegalStateException("SortValues requires the secondary key-value pairs to use KvCoder");
        }
        return (KvCoder<SecondaryKeyT, ValueT>) elemCoder;
    }

    /** Returns the coder of the secondary key, extracted from the input coder. */
    private static <PrimaryKeyT, SecondaryKeyT, ValueT> Coder<SecondaryKeyT> getSecondaryKeyCoder(Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> coder) {
        return getSecondaryKeyValueCoder(coder).getKeyCoder();
    }

    /** Returns the coder of the values paired with secondary keys, extracted from the input coder. */
    private static <PrimaryKeyT, SecondaryKeyT, ValueT> Coder<ValueT> getValueCoder(Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> coder) {
        return getSecondaryKeyValueCoder(coder).getValueCoder();
    }
}
