package org.apache.beam.runners.direct;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/CloningBundleFactoryTest.class */
public class CloningBundleFactoryTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Rule
    public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
    private CloningBundleFactory factory = CloningBundleFactory.create();

    /* loaded from: input_file:org/apache/beam/runners/direct/CloningBundleFactoryTest$Record.class */
    static class Record {
        Record() {
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/CloningBundleFactoryTest$RecordNoDecodeCoder.class */
    static class RecordNoDecodeCoder extends AtomicCoder<Record> {
        RecordNoDecodeCoder() {
        }

        public void encode(Record record, OutputStream outputStream) throws IOException {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Record m209decode(InputStream inputStream) throws IOException {
            throw new CoderException("Decode not allowed");
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/CloningBundleFactoryTest$RecordNoEncodeCoder.class */
    static class RecordNoEncodeCoder extends AtomicCoder<Record> {
        RecordNoEncodeCoder() {
        }

        public void encode(Record record, OutputStream outputStream) throws IOException {
            throw new CoderException("Encode not allowed");
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Record m210decode(InputStream inputStream) throws IOException {
            return null;
        }
    }

    @Test
    public void rootBundleSucceedsIgnoresCoder() {
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new Record());
        WindowedValue valueInGlobalWindow2 = WindowedValue.valueInGlobalWindow(new Record());
        MatcherAssert.assertThat(this.factory.createRootBundle().add(valueInGlobalWindow).add(valueInGlobalWindow2).commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{valueInGlobalWindow, valueInGlobalWindow2}));
    }

    @Test
    public void bundleWorkingCoderSucceedsClonesOutput() {
        PCollection coder = this.p.apply(Create.of(1, new Integer[]{3}).withCoder(VarIntCoder.of())).apply(WithKeys.of("foo")).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(KV.of("foo", 1));
        WindowedValue valueInGlobalWindow2 = WindowedValue.valueInGlobalWindow(KV.of("foo", 3));
        CommittedBundle commit = this.factory.createBundle(coder).add(valueInGlobalWindow).add(valueInGlobalWindow2).commit(Instant.now());
        MatcherAssert.assertThat(commit.getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{valueInGlobalWindow, valueInGlobalWindow2}));
        MatcherAssert.assertThat(commit.getElements(), Matchers.not(Matchers.containsInAnyOrder(new Matcher[]{Matchers.theInstance(valueInGlobalWindow), Matchers.theInstance(valueInGlobalWindow2)})));
        Iterator it = commit.getElements().iterator();
        while (it.hasNext()) {
            MatcherAssert.assertThat((KV) ((WindowedValue) it.next()).getValue(), Matchers.not(Matchers.anyOf(Matchers.theInstance((KV) valueInGlobalWindow.getValue()), Matchers.theInstance((KV) valueInGlobalWindow2.getValue()))));
        }
        MatcherAssert.assertThat(commit.getPCollection(), Matchers.equalTo(coder));
    }

    @Test
    public void keyedBundleWorkingCoderSucceedsClonesOutput() {
        PCollection apply = this.p.apply(Create.of(1, new Integer[]{3}).withCoder(VarIntCoder.of())).apply(WithKeys.of("foo")).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())).apply(GroupByKey.create());
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(KV.of("foo", ImmutableList.of(1, 3)));
        CommittedBundle commit = this.factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), apply).add(valueInGlobalWindow).commit(Instant.now());
        MatcherAssert.assertThat(commit.getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{valueInGlobalWindow}));
        MatcherAssert.assertThat((KV) ((WindowedValue) Iterables.getOnlyElement(commit.getElements())).getValue(), Matchers.not(Matchers.theInstance((KV) valueInGlobalWindow.getValue())));
        MatcherAssert.assertThat(commit.getPCollection(), Matchers.equalTo(apply));
        MatcherAssert.assertThat(commit.getKey(), Matchers.equalTo(StructuralKey.of("foo", StringUtf8Coder.of())));
    }

    @Test
    public void bundleEncodeFailsAddFails() {
        UncommittedBundle createBundle = this.factory.createBundle(this.p.apply(Create.empty(new RecordNoEncodeCoder())));
        this.thrown.expect(UserCodeException.class);
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Encode not allowed");
        createBundle.add(WindowedValue.valueInGlobalWindow(new Record()));
    }

    @Test
    public void bundleDecodeFailsAddFails() {
        UncommittedBundle createBundle = this.factory.createBundle(this.p.apply(Create.empty(new RecordNoDecodeCoder())));
        this.thrown.expect(UserCodeException.class);
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Decode not allowed");
        createBundle.add(WindowedValue.valueInGlobalWindow(new Record()));
    }

    @Test
    public void keyedBundleEncodeFailsAddFails() {
        UncommittedBundle createKeyedBundle = this.factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), this.p.apply(Create.empty(new RecordNoEncodeCoder())));
        this.thrown.expect(UserCodeException.class);
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Encode not allowed");
        createKeyedBundle.add(WindowedValue.valueInGlobalWindow(new Record()));
    }

    @Test
    public void keyedBundleDecodeFailsAddFails() {
        UncommittedBundle createKeyedBundle = this.factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), this.p.apply(Create.empty(new RecordNoDecodeCoder())));
        this.thrown.expect(UserCodeException.class);
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Decode not allowed");
        createKeyedBundle.add(WindowedValue.valueInGlobalWindow(new Record()));
    }
}
