package org.apache.beam.runners.direct;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.beam.runners.direct.WriteWithShardingFactory;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/WriteWithShardingFactoryTest.class */
public class WriteWithShardingFactoryTest implements Serializable {
    private static final int INPUT_SIZE = 10000;

    @Rule
    public transient TemporaryFolder tmp = new TemporaryFolder();
    private transient WriteWithShardingFactory<Object, Void> factory = new WriteWithShardingFactory<>();

    @Rule
    public final transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /* loaded from: input_file:org/apache/beam/runners/direct/WriteWithShardingFactoryTest$FakeFilenamePolicy.class */
    private static class FakeFilenamePolicy extends FileBasedSink.FilenamePolicy {
        private FakeFilenamePolicy() {
        }

        public ResourceId windowedFilename(int i, int i2, BoundedWindow boundedWindow, PaneInfo paneInfo, FileBasedSink.OutputFileHints outputFileHints) {
            throw new IllegalArgumentException("Should not be used");
        }

        @Nullable
        public ResourceId unwindowedFilename(int i, int i2, FileBasedSink.OutputFileHints outputFileHints) {
            throw new IllegalArgumentException("Should not be used");
        }
    }

    @Test
    public void dynamicallyReshardedWrite() throws Exception {
        ArrayList arrayList = new ArrayList(INPUT_SIZE);
        for (int i = 0; i < INPUT_SIZE; i++) {
            arrayList.add(UUID.randomUUID().toString());
        }
        Collections.shuffle(arrayList);
        String path = this.tmp.getRoot().toPath().resolve("resharded_write").toString();
        this.p.apply(Create.of(arrayList)).apply(TextIO.write().to(path));
        this.p.run();
        List metadata = FileSystems.match(path + '*').metadata();
        ArrayList arrayList2 = new ArrayList(arrayList.size());
        ArrayList arrayList3 = new ArrayList(arrayList.size());
        Iterator it = metadata.iterator();
        while (it.hasNext()) {
            String resourceId = ((MatchResult.Metadata) it.next()).resourceId().toString();
            arrayList3.add(resourceId);
            CharBuffer allocate = CharBuffer.allocate((int) new File(resourceId).length());
            FileReader fileReader = new FileReader(resourceId);
            Throwable th = null;
            try {
                try {
                    fileReader.read(allocate);
                    allocate.flip();
                    if (fileReader != null) {
                        if (0 != 0) {
                            try {
                                fileReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fileReader.close();
                        }
                    }
                    for (String str : allocate.toString().split("\n")) {
                        if (str.length() > 0) {
                            arrayList2.add(str);
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (fileReader != null) {
                    if (th != null) {
                        try {
                            fileReader.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        fileReader.close();
                    }
                }
                throw th3;
            }
        }
        Assert.assertThat(arrayList2, Matchers.containsInAnyOrder(arrayList.toArray()));
        Assert.assertThat(arrayList3, Matchers.hasSize(Matchers.allOf(Matchers.greaterThan(1), Matchers.lessThan(Integer.valueOf((int) (Math.log10(10000.0d) + 3.0d))))));
    }

    @Test
    public void withNoShardingSpecifiedReturnsNewTransform() {
        WriteFiles writeFiles = WriteFiles.to(new FileBasedSink<Object, Void, Object>(ValueProvider.StaticValueProvider.of(LocalResources.fromString("/foo", true)), DynamicFileDestinations.constant(new FakeFilenamePolicy())) { // from class: org.apache.beam.runners.direct.WriteWithShardingFactoryTest.1
            public FileBasedSink.WriteOperation<Void, Object> createWriteOperation() {
                throw new IllegalArgumentException("Should not be used");
            }
        });
        Assert.assertThat(this.factory.getReplacementTransform(AppliedPTransform.of("write", this.p.apply(Create.empty(VoidCoder.of())).expand(), Collections.emptyMap(), writeFiles, this.p)).getTransform(), Matchers.not(Matchers.equalTo(writeFiles)));
    }

    @Test
    public void keyBasedOnCountFnWithNoElements() throws Exception {
        Assert.assertThat(DoFnTester.of(new WriteWithShardingFactory.CalculateShardsFn(0)).processBundle(new Long[]{0L}), Matchers.containsInAnyOrder(new Integer[]{1}));
    }

    @Test
    public void keyBasedOnCountFnWithOneElement() throws Exception {
        Assert.assertThat(DoFnTester.of(new WriteWithShardingFactory.CalculateShardsFn(0)).processBundle(new Long[]{1L}), Matchers.containsInAnyOrder(new Integer[]{1}));
    }

    @Test
    public void keyBasedOnCountFnWithTwoElements() throws Exception {
        Assert.assertThat(DoFnTester.of(new WriteWithShardingFactory.CalculateShardsFn(0)).processBundle(new Long[]{2L}), Matchers.containsInAnyOrder(new Integer[]{2}));
    }

    @Test
    public void keyBasedOnCountFnFewElementsThreeShards() throws Exception {
        Assert.assertThat(DoFnTester.of(new WriteWithShardingFactory.CalculateShardsFn(0)).processBundle(new Long[]{5L}), Matchers.containsInAnyOrder(new Integer[]{3}));
    }

    @Test
    public void keyBasedOnCountFnManyElements() throws Exception {
        Assert.assertThat(DoFnTester.of(new WriteWithShardingFactory.CalculateShardsFn(0)).processBundle(new Long[]{Long.valueOf((long) Math.pow(10.0d, 10.0d))}), Matchers.containsInAnyOrder(new Integer[]{10}));
    }

    @Test
    public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(this.p.apply(Create.of(6L, new Long[0])), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
        DoFnTester of = DoFnTester.of(new WriteWithShardingFactory.CalculateShardsFn(3));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, 6L);
        Assert.assertThat(of.processBundle(new Long[]{10L}), Matchers.containsInAnyOrder(new Integer[]{6}));
    }

    @Test
    public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
        Assert.assertThat(DoFnTester.of(new WriteWithShardingFactory.CalculateShardsFn(3)).processBundle(new Long[]{Long.valueOf((long) Math.pow(10.0d, 10.0d))}), Matchers.containsInAnyOrder(new Integer[]{13}));
    }
}
