package org.apache.beam.runners.direct;

import java.io.File;
import java.io.FileReader;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.direct.WriteWithShardingFactory;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Function;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/WriteWithShardingFactoryTest.class */
public class WriteWithShardingFactoryTest {
    public static final int INPUT_SIZE = 10000;

    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();
    private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();

    /* loaded from: input_file:org/apache/beam/runners/direct/WriteWithShardingFactoryTest$TestSink.class */
    private static class TestSink extends Sink<Object> {
        private TestSink() {
        }

        public void validate(PipelineOptions pipelineOptions) {
        }

        public Sink.WriteOperation<Object, ?> createWriteOperation(PipelineOptions pipelineOptions) {
            throw new IllegalArgumentException("Should not be used");
        }
    }

    @Test
    public void dynamicallyReshardedWrite() throws Exception {
        ArrayList arrayList = new ArrayList(INPUT_SIZE);
        for (int i = 0; i < 10000; i++) {
            arrayList.add(UUID.randomUUID().toString());
        }
        Collections.shuffle(arrayList);
        String absolutePath = this.tmp.getRoot().getAbsolutePath();
        String resolve = IOChannelUtils.resolve(absolutePath, new String[]{"resharded_write"});
        TestPipeline create = TestPipeline.create();
        create.apply(Create.of(arrayList)).apply(TextIO.Write.to(resolve));
        create.run();
        Collection<String> match = IOChannelUtils.getFactory(absolutePath).match(resolve + "*");
        ArrayList arrayList2 = new ArrayList(arrayList.size());
        for (String str : match) {
            CharBuffer allocate = CharBuffer.allocate((int) new File(str).length());
            FileReader fileReader = new FileReader(str);
            Throwable th = null;
            try {
                try {
                    fileReader.read(allocate);
                    allocate.flip();
                    if (fileReader != null) {
                        if (0 != 0) {
                            try {
                                fileReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fileReader.close();
                        }
                    }
                    for (String str2 : allocate.toString().split("\n")) {
                        if (str2.length() > 0) {
                            arrayList2.add(str2);
                        }
                    }
                } catch (Throwable th3) {
                    if (fileReader != null) {
                        if (th != null) {
                            try {
                                fileReader.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            fileReader.close();
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        }
        Assert.assertThat(arrayList2, Matchers.containsInAnyOrder(arrayList.toArray()));
        Assert.assertThat(match, Matchers.hasSize(Matchers.allOf(Matchers.greaterThan(1), Matchers.lessThan(Integer.valueOf((int) (Math.log10(10000.0d) + 3.0d))))));
    }

    @Test
    public void withShardingSpecifiesOriginalTransform() {
        Write.Bound withNumShards = Write.to(new TestSink()).withNumShards(3);
        Assert.assertThat(this.factory.override(withNumShards), Matchers.equalTo(withNumShards));
    }

    @Test
    public void withNoShardingSpecifiedReturnsNewTransform() {
        Write.Bound bound = Write.to(new TestSink());
        Assert.assertThat(this.factory.override(bound), Matchers.not(Matchers.equalTo(bound)));
    }

    @Test
    public void keyBasedOnCountFnWithOneElement() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
        DoFnTester of = DoFnTester.of(new WriteWithShardingFactory.KeyBasedOnCountFn(singletonView, 0));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, 1L);
        Assert.assertThat(of.processBundle(new String[]{"foo", "bar", "bazbar"}), Matchers.containsInAnyOrder(new KV[]{KV.of(0, "foo"), KV.of(0, "bar"), KV.of(0, "bazbar")}));
    }

    @Test
    public void keyBasedOnCountFnWithTwoElements() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
        DoFnTester of = DoFnTester.of(new WriteWithShardingFactory.KeyBasedOnCountFn(singletonView, 0));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, 2L);
        Assert.assertThat(of.processBundle(new String[]{"foo", "bar"}), Matchers.anyOf(Matchers.containsInAnyOrder(new KV[]{KV.of(0, "foo"), KV.of(1, "bar")}), Matchers.containsInAnyOrder(new KV[]{KV.of(1, "foo"), KV.of(0, "bar")})));
    }

    @Test
    public void keyBasedOnCountFnFewElementsThreeShards() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
        DoFnTester of = DoFnTester.of(new WriteWithShardingFactory.KeyBasedOnCountFn(singletonView, 0));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, 100L);
        Assert.assertThat(Iterables.transform(of.processBundle(new String[]{"foo", "bar", "baz", "foobar", "foobaz", "barbaz"}), new Function<KV<Integer, String>, Integer>() { // from class: org.apache.beam.runners.direct.WriteWithShardingFactoryTest.1
            public Integer apply(KV<Integer, String> kv) {
                return (Integer) kv.getKey();
            }
        }), Matchers.containsInAnyOrder(new Integer[]{0, 0, 1, 1, 2, 2}));
    }

    @Test
    public void keyBasedOnCountFnManyElements() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
        DoFnTester of = DoFnTester.of(new WriteWithShardingFactory.KeyBasedOnCountFn(singletonView, 0));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, Long.valueOf((long) Math.pow(10.0d, 10.0d)));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(Long.toHexString(ThreadLocalRandom.current().nextLong()));
        }
        long j = -1;
        Iterator it = of.processBundle(arrayList).iterator();
        while (it.hasNext()) {
            j = Math.max(j, ((Integer) ((KV) it.next()).getKey()).intValue());
        }
        Assert.assertThat(Long.valueOf(j), Matchers.equalTo(9L));
    }

    @Test
    public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
        DoFnTester of = DoFnTester.of(new WriteWithShardingFactory.KeyBasedOnCountFn(singletonView, 10));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, 6L);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(Long.toHexString(ThreadLocalRandom.current().nextLong()));
        }
        long j = -1;
        Iterator it = of.processBundle(arrayList).iterator();
        while (it.hasNext()) {
            j = Math.max(j, ((Integer) ((KV) it.next()).getKey()).intValue());
        }
        Assert.assertThat(Long.valueOf(j), Matchers.equalTo(Long.valueOf(6 - 1)));
    }

    @Test
    public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
        DoFnTester of = DoFnTester.of(new WriteWithShardingFactory.KeyBasedOnCountFn(singletonView, 3));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, Long.valueOf((long) Math.pow(10.0d, 10.0d)));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(Long.toHexString(ThreadLocalRandom.current().nextLong()));
        }
        long j = -1;
        Iterator it = of.processBundle(arrayList).iterator();
        while (it.hasNext()) {
            j = Math.max(j, ((Integer) ((KV) it.next()).getKey()).intValue());
        }
        Assert.assertThat(Long.valueOf(j), Matchers.equalTo(12L));
    }
}
