package org.apache.beam.runners.direct;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.class */
public class EncodabilityEnforcementFactoryTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
    private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
    private PCollection<Record> inputPCollection;
    private DirectRunner.CommittedBundle<Record> inputBundle;
    private PCollection<Record> outputPCollection;

    /* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest$IdentityDoFn.class */
    private static class IdentityDoFn extends DoFn<Record, Record> {
        private IdentityDoFn() {
        }

        @DoFn.ProcessElement
        public void proc(DoFn<Record, Record>.ProcessContext processContext) {
            processContext.output(processContext.element());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest$Record.class */
    public static class Record {
        private Record() {
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest$RecordNoDecodeCoder.class */
    private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
        private RecordNoDecodeCoder() {
        }

        public void encode(Record record, OutputStream outputStream, Coder.Context context) throws IOException {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Record m6decode(InputStream inputStream, Coder.Context context) throws IOException {
            throw new CoderException("Decode not allowed");
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest$RecordNoEncodeCoder.class */
    private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
        private RecordNoEncodeCoder() {
        }

        public void encode(Record record, OutputStream outputStream, Coder.Context context) throws IOException {
            throw new CoderException("Encode not allowed");
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Record m7decode(InputStream inputStream, Coder.Context context) throws IOException {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest$RecordNotConsistentWithEqualsStructuralValueCoder.class */
    private static class RecordNotConsistentWithEqualsStructuralValueCoder extends AtomicCoder<Record> {
        private RecordNotConsistentWithEqualsStructuralValueCoder() {
        }

        public void encode(Record record, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Record m8decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
            return new Record() { // from class: org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordNotConsistentWithEqualsStructuralValueCoder.1
                public String toString() {
                    return "DecodedRecord";
                }
            };
        }

        public boolean consistentWithEquals() {
            return false;
        }

        public Object structuralValue(Record record) {
            return record;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest$RecordStructuralValueCoder.class */
    private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
        private RecordStructuralValueCoder() {
        }

        public void encode(Record record, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Record m9decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
            return new Record() { // from class: org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordStructuralValueCoder.1
                public String toString() {
                    return "DecodedRecord";
                }
            };
        }

        public boolean consistentWithEquals() {
            return true;
        }

        public Object structuralValue(Record record) {
            return record;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest$SimpleIdentity.class */
    public static class SimpleIdentity extends SimpleFunction<Record, Record> {
        private SimpleIdentity() {
        }

        public Record apply(Record record) {
            return record;
        }
    }

    @Before
    public void setup() {
        this.inputPCollection = TestPipeline.create().apply(Create.of(new Record[]{new Record()}).withCoder(new RecordNoDecodeCoder()));
        this.outputPCollection = this.inputPCollection.apply(ParDo.of(new IdentityDoFn()));
        this.inputBundle = this.bundleFactory.createRootBundle().add(WindowedValue.valueInGlobalWindow(new Record())).commit(Instant.now());
    }

    @Test
    public void encodeFailsThrows() {
        WindowedValue<Record> valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new Record());
        ModelEnforcement<Record> createEnforcement = createEnforcement(new RecordNoEncodeCoder(), valueInGlobalWindow);
        DirectRunner.UncommittedBundle add = this.bundleFactory.createBundle(this.outputPCollection).add(valueInGlobalWindow);
        createEnforcement.beforeElement(valueInGlobalWindow);
        createEnforcement.afterElement(valueInGlobalWindow);
        this.thrown.expect(UserCodeException.class);
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Encode not allowed");
        createEnforcement.afterFinish(this.inputBundle, StepTransformResult.withoutHold(this.outputPCollection.getProducingTransformInternal()).addOutput(add, new DirectRunner.UncommittedBundle[0]).build(), Collections.singleton(add.commit(Instant.now())));
    }

    @Test
    public void decodeFailsThrows() {
        WindowedValue<Record> valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new Record());
        ModelEnforcement<Record> createEnforcement = createEnforcement(new RecordNoDecodeCoder(), valueInGlobalWindow);
        DirectRunner.UncommittedBundle add = this.bundleFactory.createBundle(this.outputPCollection).add(valueInGlobalWindow);
        createEnforcement.beforeElement(valueInGlobalWindow);
        createEnforcement.afterElement(valueInGlobalWindow);
        this.thrown.expect(UserCodeException.class);
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Decode not allowed");
        createEnforcement.afterFinish(this.inputBundle, StepTransformResult.withoutHold(this.outputPCollection.getProducingTransformInternal()).addOutput(add, new DirectRunner.UncommittedBundle[0]).build(), Collections.singleton(add.commit(Instant.now())));
    }

    @Test
    public void consistentWithEqualsStructuralValueNotEqualThrows() {
        WindowedValue<Record> valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new Record() { // from class: org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.1
            public String toString() {
                return "OriginalRecord";
            }
        });
        ModelEnforcement<Record> createEnforcement = createEnforcement(new RecordStructuralValueCoder(), valueInGlobalWindow);
        DirectRunner.UncommittedBundle add = this.bundleFactory.createBundle(this.outputPCollection).add(valueInGlobalWindow);
        createEnforcement.beforeElement(valueInGlobalWindow);
        createEnforcement.afterElement(valueInGlobalWindow);
        this.thrown.expect(UserCodeException.class);
        this.thrown.expectCause(Matchers.isA(IllegalArgumentException.class));
        this.thrown.expectMessage("does not maintain structural value equality");
        this.thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
        this.thrown.expectMessage("OriginalRecord");
        createEnforcement.afterFinish(this.inputBundle, StepTransformResult.withoutHold(this.outputPCollection.getProducingTransformInternal()).addOutput(add, new DirectRunner.UncommittedBundle[0]).build(), Collections.singleton(add.commit(Instant.now())));
    }

    @Test
    public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
        this.outputPCollection.setCoder(new RecordNotConsistentWithEqualsStructuralValueCoder());
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new Record());
        ModelEnforcement forBundle = this.factory.forBundle(this.inputBundle, this.outputPCollection.getProducingTransformInternal());
        DirectRunner.UncommittedBundle add = this.bundleFactory.createBundle(this.outputPCollection).add(valueInGlobalWindow);
        forBundle.beforeElement(valueInGlobalWindow);
        forBundle.afterElement(valueInGlobalWindow);
        forBundle.afterFinish(this.inputBundle, StepTransformResult.withoutHold(this.outputPCollection.getProducingTransformInternal()).addOutput(add, new DirectRunner.UncommittedBundle[0]).build(), Collections.singleton(add.commit(Instant.now())));
    }

    private ModelEnforcement<Record> createEnforcement(Coder<Record> coder, WindowedValue<Record> windowedValue) {
        PCollection apply = TestPipeline.create().apply(Create.of(new Record[0]).withCoder(coder));
        this.outputPCollection = apply.apply(MapElements.via(new SimpleIdentity()));
        AppliedPTransform producingTransformInternal = this.outputPCollection.getProducingTransformInternal();
        return this.factory.forBundle(this.bundleFactory.createBundle(apply).add(windowedValue).commit(Instant.now()), producingTransformInternal);
    }

    @Test
    public void structurallyEqualResultsSucceeds() {
        PCollection apply = TestPipeline.create().apply(Create.of(new Integer[]{1}).withCoder(VarIntCoder.of()));
        AppliedPTransform producingTransformInternal = apply.apply(Count.globally()).getProducingTransformInternal();
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(1);
        DirectRunner.CommittedBundle commit = this.bundleFactory.createBundle(apply).add(valueInGlobalWindow).commit(Instant.now());
        ModelEnforcement forBundle = this.factory.forBundle(commit, producingTransformInternal);
        forBundle.beforeElement(valueInGlobalWindow);
        forBundle.afterElement(valueInGlobalWindow);
        forBundle.afterFinish(commit, StepTransformResult.withoutHold(producingTransformInternal).build(), Collections.emptyList());
    }
}
