package org.apache.beam.sdk.io;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest;
import org.apache.beam.sdk.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.sdk.repackaged.com.google.common.base.Optional;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/WriteTest.class */
public class WriteTest {
    /** Every element reported by writer results, collected while a write operation is finalized. */
    private static List<String> sinkContents = new ArrayList<>();

    /** Count of writers opened since the last reset; bumped each time a {@code TestSinkWriter} is opened. */
    private static AtomicInteger numShards = new AtomicInteger(0);

    /** Number of elements in each writer result, recorded while a write operation is finalized. */
    private static List<Integer> recordsPerShard = new ArrayList<>();

    /** Pass-through transform for tests that need no windowing or reshuffling before the write. */
    private static final MapElements<String, String> IDENTITY_MAP =
            MapElements.via(
                    new SimpleFunction<String, String>() {
                        /** Returns the input unchanged. */
                        public String apply(String input) {
                            return input;
                        }
                    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteTest$TestSink.class */
    /**
     * Sink stub that tracks whether {@link #validate} and {@link #createWriteOperation} were
     * invoked, and that checks the test flag is present on the options passed to each of them.
     */
    public static class TestSink extends Sink<String> {
        private boolean createCalled = false;
        private boolean validateCalled = false;

        private TestSink() {}

        /** Checks the test flag and records that validation happened. */
        public void validate(PipelineOptions pipelineOptions) {
            assertTestFlagPresent(pipelineOptions);
            validateCalled = true;
        }

        /**
         * Creates the write operation for this sink.
         *
         * <p>Fails the test if {@link #validate} has not been called on this instance beforehand.
         */
        public Sink.WriteOperation<String, ?> createWriteOperation(PipelineOptions pipelineOptions) {
            Assert.assertTrue(validateCalled);
            assertTestFlagPresent(pipelineOptions);
            createCalled = true;
            return new TestSinkWriteOperation(this);
        }

        /** Asserts that the options carry the flag value set by the test harness. */
        private void assertTestFlagPresent(PipelineOptions pipelineOptions) {
            WriteOptions writeOptions = pipelineOptions.as(WriteOptions.class);
            Assert.assertEquals("test_value", writeOptions.getTestFlag());
        }

        /** Returns true only when both validate and createWriteOperation have been called. */
        public boolean hasCorrectState() {
            if (!validateCalled) {
                return false;
            }
            return createCalled;
        }

        /** Any two {@code TestSink} instances compare equal; the call-tracking flags are ignored. */
        public boolean equals(Object obj) {
            return obj instanceof TestSink;
        }

        public int hashCode() {
            return Objects.hash(getClass());
        }

        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("createCalled", createCalled)
                    .add("validateCalled", validateCalled)
                    .toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteTest$TestSinkWriteOperation.class */
    /**
     * Write operation stub that asserts its lifecycle order (initialize, then finalize) and copies
     * the writer results into the static fixtures of {@code WriteTest} during finalization.
     */
    public static class TestSinkWriteOperation extends Sink.WriteOperation<String, TestWriterResult> {
        /** Set once {@link #getWriterResultCoder()} has been called on any instance. */
        private static volatile boolean coderCalled = false;

        private final TestSink sink;
        private final UUID id = UUID.randomUUID();
        private State state = State.INITIAL;

        /** Lifecycle stages of the write operation. */
        private enum State {
            INITIAL,
            INITIALIZED,
            FINALIZED
        }

        public TestSinkWriteOperation(TestSink testSink) {
            this.sink = testSink;
        }

        /**
         * Returns the sink this operation was created from.
         *
         * <p>NOTE(review): the decompiler reports this method was renamed from {@code getSink}
         * (merged bridge method); confirm the name against the original source.
         */
        public TestSink m59getSink() {
            return this.sink;
        }

        /**
         * Checks the test flag and moves to {@code INITIALIZED}.
         *
         * <p>Accepts being called in either the {@code INITIAL} or the {@code INITIALIZED} state,
         * so a repeated call does not fail the test.
         */
        public void initialize(PipelineOptions pipelineOptions) throws Exception {
            Assert.assertEquals("test_value", pipelineOptions.as(WriteOptions.class).getTestFlag());
            Assert.assertThat(
                    this.state,
                    Matchers.anyOf(Matchers.equalTo(State.INITIAL), Matchers.equalTo(State.INITIALIZED)));
            this.state = State.INITIALIZED;
        }

        /**
         * Records every writer result in the shared fixtures and asserts that each result carries
         * a distinct id.
         *
         * <p>Requires the operation to be {@code INITIALIZED} and the result coder to have been
         * requested already.
         */
        public void finalize(Iterable<TestWriterResult> iterable, PipelineOptions pipelineOptions) throws Exception {
            Assert.assertEquals("test_value", pipelineOptions.as(WriteOptions.class).getTestFlag());
            Assert.assertEquals(State.INITIALIZED, this.state);
            Assert.assertTrue(coderCalled);

            HashSet<String> distinctIds = new HashSet<>();
            int resultCount = 0;
            this.state = State.FINALIZED;
            for (TestWriterResult result : iterable) {
                resultCount++;
                distinctIds.add(result.uId);
                WriteTest.sinkContents.addAll(result.elementsWritten);
                WriteTest.recordsPerShard.add(result.elementsWritten.size());
            }
            // A duplicate id would make the set smaller than the number of results seen.
            Assert.assertEquals(resultCount, distinctIds.size());
        }

        public Sink.Writer<String, TestWriterResult> createWriter(PipelineOptions pipelineOptions) {
            return new TestSinkWriter(this);
        }

        /** Returns a serializable coder for the results and remembers that it was requested. */
        public Coder<TestWriterResult> getWriterResultCoder() {
            coderCalled = true;
            return SerializableCoder.of(TestWriterResult.class);
        }

        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("id", this.id)
                    .add("sink", this.sink)
                    .add("state", this.state)
                    .add("coderCalled", coderCalled)
                    .toString();
        }

        /** Two operations are equal when both their sinks and their random ids match. */
        public boolean equals(Object obj) {
            if (!(obj instanceof TestSinkWriteOperation)) {
                return false;
            }
            TestSinkWriteOperation other = (TestSinkWriteOperation) obj;
            return this.sink.equals(other.sink) && this.id.equals(other.id);
        }

        public int hashCode() {
            return Objects.hash(this.id, this.sink);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteTest$TestSinkWriter.class */
    /**
     * Writer stub that buffers the elements it is given and asserts that open, write and close
     * happen in a valid order.
     */
    private static class TestSinkWriter extends Sink.Writer<String, TestWriterResult> {
        private State state = State.INITIAL;
        private List<String> elementsWritten = new ArrayList<>();
        private String uId;
        private final TestSinkWriteOperation writeOperation;

        /** Lifecycle stages of the writer. */
        public enum State {
            INITIAL,
            OPENED,
            WRITING,
            CLOSED
        }

        public TestSinkWriter(TestSinkWriteOperation testSinkWriteOperation) {
            this.writeOperation = testSinkWriteOperation;
        }

        /**
         * Returns the write operation that created this writer.
         *
         * <p>NOTE(review): the decompiler reports this method was renamed from
         * {@code getWriteOperation} (merged bridge method); confirm against the original source.
         */
        public TestSinkWriteOperation m61getWriteOperation() {
            return this.writeOperation;
        }

        /** Counts the new shard, stores the id, and requires the writer to be unopened. */
        public void open(String str) throws Exception {
            WriteTest.numShards.incrementAndGet();
            this.uId = str;
            Assert.assertEquals(State.INITIAL, this.state);
            this.state = State.OPENED;
        }

        /** Buffers one element; only valid once the writer has been opened. */
        public void write(String str) throws Exception {
            Assert.assertThat(
                    this.state,
                    Matchers.anyOf(Matchers.equalTo(State.OPENED), Matchers.equalTo(State.WRITING)));
            this.state = State.WRITING;
            this.elementsWritten.add(str);
        }

        /**
         * Closes the writer and returns its id together with the buffered elements. Closing a
         * writer that was opened but never written to is allowed.
         *
         * <p>NOTE(review): the decompiler reports this method was renamed from {@code close}
         * (merged bridge method); confirm against the original source.
         */
        public TestWriterResult m62close() throws Exception {
            Assert.assertThat(
                    this.state,
                    Matchers.anyOf(Matchers.equalTo(State.OPENED), Matchers.equalTo(State.WRITING)));
            this.state = State.CLOSED;
            return new TestWriterResult(this.uId, this.elementsWritten);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteTest$TestWriterResult.class */
    /** Serializable pairing of a writer's id with the elements that writer received. */
    public static class TestWriterResult implements Serializable {
        /** Id the writer was opened with. */
        String uId;

        /** Elements handed to the writer, in write order. */
        List<String> elementsWritten;

        /**
         * @param id the id the writer was opened with
         * @param written the elements the writer received; stored by reference, not copied
         */
        public TestWriterResult(String id, List<String> written) {
            uId = id;
            elementsWritten = written;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteTest$WindowAndReshuffle.class */
    /**
     * Applies the given windowing, then keys every element with a random integer, groups by that
     * key and emits the grouped values again without the key.
     */
    private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
        private final Window.Bound<T> window;

        /** Pairs each element with a random integer key. */
        public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
            private AddArbitraryKey() {
            }

            public void processElement(DoFn<T, KV<Integer, T>>.ProcessContext processContext) throws Exception {
                Integer key = ThreadLocalRandom.current().nextInt();
                processContext.output(KV.of(key, processContext.element()));
            }
        }

        /** Emits each grouped value individually, discarding the key. */
        public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
            private RemoveArbitraryKey() {
            }

            public void processElement(DoFn<KV<Integer, Iterable<T>>, T>.ProcessContext processContext) throws Exception {
                for (T value : processContext.element().getValue()) {
                    processContext.output(value);
                }
            }
        }

        public WindowAndReshuffle(Window.Bound<T> bound) {
            this.window = bound;
        }

        /** Windows the input, then routes it through the add-key / group / remove-key steps. */
        public PCollection<T> apply(PCollection<T> pCollection) {
            return pCollection
                    .apply(this.window)
                    .apply(ParDo.of(new AddArbitraryKey<T>()))
                    .apply(GroupByKey.<Integer, T>create())
                    .apply(ParDo.of(new RemoveArbitraryKey<T>()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteTest$WriteOptions.class */
    /** Pipeline options carrying the flag that the test sink, write operation and harness check. */
    public interface WriteOptions extends PipelineOptionsFactoryTest.TestPipelineOptions {
        /** Returns the test flag value. */
        @Description("Test flag and value")
        String getTestFlag();

        /** Sets the test flag value. */
        void setTestFlag(String value);
    }

    /** Writes a handful of strings through the identity transform with no explicit sharding. */
    @Test
    @Category({NeedsRunner.class})
    public void testWrite() {
        List<String> inputs =
                Arrays.asList(
                        "Critical canary",
                        "Apprehensive eagle",
                        "Intimidating pigeon",
                        "Pedantic gull",
                        "Frisky finch");
        runWrite(inputs, IDENTITY_MAP);
    }

    /** Writes an empty input and checks that at least one writer was still opened. */
    @Test
    @Category({NeedsRunner.class})
    public void testEmptyWrite() {
        List<String> noInputs = Collections.emptyList();
        runWrite(noInputs, IDENTITY_MAP);

        Integer shardsOpened = numShards.intValue();
        Assert.assertThat(shardsOpened, Matchers.greaterThan(0));
    }

    /** Writes six elements with the shard count fixed to one. */
    @Test
    @Category({NeedsRunner.class})
    public void testShardedWrite() {
        List<String> inputs = Arrays.asList("one", "two", "three", "four", "five", "six");
        Optional<Integer> singleShard = Optional.of(1);
        runShardedWrite(inputs, IDENTITY_MAP, singleShard);
    }

    /** Writes six elements while requesting twenty shards, i.e. more shards than elements. */
    @Test
    @Category({NeedsRunner.class})
    public void testExpandShardedWrite() {
        List<String> inputs = Arrays.asList("one", "two", "three", "four", "five", "six");
        Optional<Integer> twentyShards = Optional.of(20);
        runShardedWrite(inputs, IDENTITY_MAP, twentyShards);
    }

    /**
     * Writes 1000 elements into ten shards and checks that the smallest shard holds at least 90%
     * as many records as the largest one.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testShardedWriteBalanced() {
        List<String> inputs = new ArrayList<>(1000);
        for (int i = 0; i < 1000; i++) {
            inputs.add(String.format("elt%04d", i));
        }

        runShardedWrite(
                inputs,
                new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1L)))),
                Optional.of(10));

        // recordsPerShard was filled in by the write operation's finalize step during the run.
        int smallestShard = Integer.MAX_VALUE;
        int largestShard = Integer.MIN_VALUE;
        for (Integer shardSize : recordsPerShard) {
            smallestShard = Math.min(smallestShard, shardSize.intValue());
            largestShard = Math.max(largestShard, shardSize.intValue());
        }
        Assert.assertThat(
                Double.valueOf(smallestShard),
                Matchers.greaterThanOrEqualTo(Double.valueOf(largestShard * 0.9d)));
    }

    /** Runs the write against an input list that contains no elements. */
    @Test
    @Category({NeedsRunner.class})
    public void testWriteWithEmptyPCollection() {
        List<String> noInputs = new ArrayList<>();
        runWrite(noInputs, IDENTITY_MAP);
    }

    /** Writes elements that were placed into 2 ms fixed windows and reshuffled before the write. */
    @Test
    @Category({NeedsRunner.class})
    public void testWriteWindowed() {
        List<String> inputs =
                Arrays.asList(
                        "Critical canary",
                        "Apprehensive eagle",
                        "Intimidating pigeon",
                        "Pedantic gull",
                        "Frisky finch");
        runWrite(
                inputs,
                new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2L)))));
    }

    /** Writes elements that were placed into session windows (1 ms gap) and reshuffled first. */
    @Test
    @Category({NeedsRunner.class})
    public void testWriteWithSessions() {
        List<String> inputs =
                Arrays.asList(
                        "Critical canary",
                        "Apprehensive eagle",
                        "Intimidating pigeon",
                        "Pedantic gull",
                        "Frisky finch");
        runWrite(
                inputs,
                new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1L)))));
    }

    /**
     * Checks that {@code withNumShards} returns a transform with the new shard count and the same
     * sink, and that the transform it was called on keeps its own shard count.
     */
    @Test
    public void testBuildWrite() {
        // Declared as Sink<String> so the matcher type lines up with what getSink() returns.
        Sink<String> sink = new TestSink() { // from class: org.apache.beam.sdk.io.WriteTest.2
        };

        Write.Bound<String> threeShards = Write.to(sink).withNumShards(3);
        Assert.assertEquals(3L, threeShards.getNumShards());
        Assert.assertThat(threeShards.getSink(), Matchers.is(sink));

        Write.Bound<String> sevenShards = threeShards.withNumShards(7);
        Assert.assertEquals(7L, sevenShards.getNumShards());
        Assert.assertThat(sevenShards.getSink(), Matchers.is(sink));

        // The first transform must be unaffected by deriving a second one from it.
        Assert.assertEquals(3L, threeShards.getNumShards());
    }

    /** Checks that the write's display data names the sink class and includes the sink's items. */
    @Test
    public void testDisplayData() {
        TestSink sink = new TestSink() { // from class: org.apache.beam.sdk.io.WriteTest.3
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };

        Write.Bound<String> write = Write.to(sink);
        DisplayData displayData = DisplayData.from(write);

        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("sink", sink.getClass()));
        Assert.assertThat(displayData, DisplayDataMatchers.includesDisplayDataFrom(sink));
    }

    /**
     * Checks that a write with an explicit shard count reports the sink class, the sink's own
     * items and the {@code numShards} value in its display data.
     */
    @Test
    public void testShardedDisplayData() {
        TestSink sink = new TestSink() { // from class: org.apache.beam.sdk.io.WriteTest.4
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };

        Write.Bound<String> write = Write.to(sink).withNumShards(1);
        DisplayData displayData = DisplayData.from(write);

        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("sink", sink.getClass()));
        Assert.assertThat(displayData, DisplayDataMatchers.includesDisplayDataFrom(sink));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("numShards", 1L));
    }

    /**
     * Runs the write with no explicit shard count.
     *
     * @param list the elements to write
     * @param pTransform the transform applied to the input before the write
     */
    private static void runWrite(List<String> list, PTransform<PCollection<String>, PCollection<String>> pTransform) {
        Optional<Integer> noShardCount = Optional.absent();
        runShardedWrite(list, pTransform, noShardCount);
    }

    /**
     * Builds and runs a pipeline that writes {@code list} to a fresh {@code TestSink}, then
     * verifies what the sink recorded.
     *
     * <p>The static fixtures are reset before the run. After it, the sink contents must match the
     * input in any order and the sink must have seen both validate and createWriteOperation. When
     * a shard count is given, the number of opened writers and the number of recorded shard sizes
     * must both equal it.
     *
     * @param list the elements to write; element {@code i} gets timestamp {@code i + 1}
     * @param pTransform the transform applied to the input before the write
     * @param optional the shard count to configure on the write, or absent to leave it unset
     */
    private static void runShardedWrite(List<String> list, PTransform<PCollection<String>, PCollection<String>> pTransform, Optional<Integer> optional) {
        WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        options.setTestFlag("test_value");
        Pipeline pipeline = TestPipeline.create(options);

        // Reset state shared with the sink, write operation and writers.
        sinkContents.clear();
        numShards.set(0);
        recordsPerShard.clear();

        // One timestamp per input element: 1, 2, ..., list.size().
        List<Long> timestamps = new ArrayList<>(list.size());
        for (long timestamp = 1; timestamp <= list.size(); timestamp++) {
            timestamps.add(timestamp);
        }

        TestSink sink = new TestSink();
        Write.Bound<String> write = Write.to(sink);
        if (optional.isPresent()) {
            write = write.withNumShards(optional.get().intValue());
        }

        pipeline
                .apply(Create.timestamped(list, timestamps).withCoder(StringUtf8Coder.of()))
                .apply(pTransform)
                .apply(write);
        pipeline.run();

        Assert.assertThat(sinkContents, Matchers.containsInAnyOrder(list.toArray()));
        Assert.assertTrue(sink.hasCorrectState());
        if (optional.isPresent()) {
            int expectedShards = optional.get().intValue();
            Assert.assertEquals(expectedShards, numShards.intValue());
            Assert.assertEquals(expectedShards, recordsPerShard.size());
        }
    }
}
