package org.apache.beam.runners.samza.runtime;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.TestSamzaRunner;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.samza.config.Config;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.class */
public class SamzaStoreStateInternalsTest implements Serializable {

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(new String[]{"--runner=TestSamzaRunner"}).create());

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest$TestStorageEngine.class */
    public static class TestStorageEngine extends InMemoryKeyValueStorageEngineFactory {
        public KeyValueStore<byte[], byte[]> getKVStore(String str, File file, MetricsRegistry metricsRegistry, SystemStreamPartition systemStreamPartition, JobContext jobContext, ContainerContext containerContext, StorageEngineFactory.StoreMode storeMode) {
            return new TestStore(new KeyValueStoreMetrics(str, metricsRegistry));
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest$TestStore.class */
    public static class TestStore extends InMemoryKeyValueStore {
        static List<TestKeyValueIteraor> iterators = Collections.synchronizedList(new ArrayList());

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest$TestStore$TestKeyValueIteraor.class */
        public static class TestKeyValueIteraor implements KeyValueIterator<byte[], byte[]> {
            private final KeyValueIterator<byte[], byte[]> iter;
            boolean closed = false;

            TestKeyValueIteraor(KeyValueIterator<byte[], byte[]> keyValueIterator) {
                this.iter = keyValueIterator;
            }

            public void close() {
                this.iter.close();
                this.closed = true;
            }

            public boolean hasNext() {
                return this.iter.hasNext();
            }

            /* renamed from: next, reason: merged with bridge method [inline-methods] */
            public Entry<byte[], byte[]> m12next() {
                return (Entry) this.iter.next();
            }
        }

        public TestStore(KeyValueStoreMetrics keyValueStoreMetrics) {
            super(keyValueStoreMetrics);
        }

        public KeyValueIterator<byte[], byte[]> range(byte[] bArr, byte[] bArr2) {
            TestKeyValueIteraor testKeyValueIteraor = new TestKeyValueIteraor(super.range(bArr, bArr2));
            iterators.add(testKeyValueIteraor);
            return testKeyValueIteraor;
        }
    }

    @Test
    public void testMapStateIterator() {
        PAssert.that(this.pipeline.apply(Create.of(KV.of("hello", KV.of("a", 97)), new KV[]{KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))})).apply(ParDo.of(new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternalsTest.1

            @DoFn.StateId("foo")
            private final StateSpec<MapState<String, Integer>> mapState = StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());

            @DoFn.StateId("count")
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>.ProcessContext processContext, @DoFn.StateId("foo") MapState<String, Integer> mapState, @DoFn.StateId("count") CombiningState<Integer, int[], Integer> combiningState) {
                SamzaMapState samzaMapState = (SamzaMapState) mapState;
                KV kv = (KV) ((KV) processContext.element()).getValue();
                samzaMapState.put((String) kv.getKey(), (Integer) kv.getValue());
                combiningState.add(1);
                if (((Integer) combiningState.read()).intValue() >= 4) {
                    ArrayList arrayList = new ArrayList();
                    Iterator it = (Iterator) samzaMapState.readIterator().read();
                    while (it.hasNext()) {
                        Map.Entry entry = (Map.Entry) it.next();
                        arrayList.add(KV.of((String) entry.getKey(), (Integer) entry.getValue()));
                        processContext.output(KV.of((String) entry.getKey(), (Integer) entry.getValue()));
                    }
                    Assert.assertEquals(arrayList, ImmutableList.of(KV.of("a", 97), KV.of("b", 42), KV.of("c", 12)));
                }
            }
        }))).containsInAnyOrder(new KV[]{KV.of("a", 97), KV.of("b", 42), KV.of("c", 12)});
        this.pipeline.run();
    }

    @Test
    public void testSetStateIterator() {
        PAssert.that(this.pipeline.apply(Create.of(KV.of("hello", 97), new KV[]{KV.of("hello", 42), KV.of("hello", 42), KV.of("hello", 12)})).apply(ParDo.of(new DoFn<KV<String, Integer>, Set<Integer>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternalsTest.2

            @DoFn.StateId("foo")
            private final StateSpec<SetState<Integer>> setState = StateSpecs.set(VarIntCoder.of());

            @DoFn.StateId("count")
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Integer>, Set<Integer>>.ProcessContext processContext, @DoFn.StateId("foo") SetState<Integer> setState, @DoFn.StateId("count") CombiningState<Integer, int[], Integer> combiningState) {
                SamzaSetState samzaSetState = (SamzaSetState) setState;
                ReadableState isEmpty = samzaSetState.isEmpty();
                samzaSetState.add((Integer) ((KV) processContext.element()).getValue());
                Assert.assertFalse(((Boolean) isEmpty.read()).booleanValue());
                combiningState.add(1);
                if (((Integer) combiningState.read()).intValue() >= 4) {
                    HashSet hashSet = new HashSet();
                    Iterator it = (Iterator) samzaSetState.readIterator().read();
                    while (it.hasNext()) {
                        hashSet.add((Integer) it.next());
                    }
                    processContext.output(hashSet);
                    Assert.assertEquals(hashSet, Sets.newHashSet(new Integer[]{97, 42, 12}));
                }
            }
        }))).containsInAnyOrder(new Set[]{Sets.newHashSet(new Integer[]{97, 42, 12})});
        this.pipeline.run();
    }

    @Test
    public void testIteratorClosed() {
        this.pipeline.apply(Create.of(KV.of("hello", 97), new KV[]{KV.of("hello", 42), KV.of("hello", 42), KV.of("hello", 12)})).apply(ParDo.of(new DoFn<KV<String, Integer>, Set<Integer>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternalsTest.3

            @DoFn.StateId("foo")
            private final StateSpec<SetState<Integer>> setState = StateSpecs.set(VarIntCoder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Integer>, Set<Integer>>.ProcessContext processContext, @DoFn.StateId("foo") SetState<Integer> setState) {
                SamzaSetState samzaSetState = (SamzaSetState) setState;
                samzaSetState.add((Integer) ((KV) processContext.element()).getValue());
                if (Iterators.size((Iterator) samzaSetState.readIterator().read()) > 1) {
                    Iterator it = (Iterator) samzaSetState.readIterator().read();
                    Assert.assertTrue(it.hasNext());
                    it.next();
                }
            }
        }));
        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class).setRunner(TestSamzaRunner.class);
        HashMap hashMap = new HashMap(ConfigBuilder.localRunConfig());
        hashMap.put("stores.foo.factory", TestStorageEngine.class.getName());
        this.pipeline.getOptions().as(SamzaPipelineOptions.class).setConfigOverride(hashMap);
        this.pipeline.run();
        Assert.assertEquals(8L, TestStore.iterators.size());
        TestStore.iterators.forEach(testKeyValueIteraor -> {
            Assert.assertTrue(testKeyValueIteraor.closed);
        });
    }

    @Test
    public void testStateValueSerde() throws IOException {
        Serde serde = new SamzaStoreStateInternals.StateValueSerdeFactory().getSerde("Test", (Config) null);
        VarIntCoder of = VarIntCoder.of();
        byte[] bytes = serde.toBytes(SamzaStoreStateInternals.StateValue.of(123, of));
        SamzaStoreStateInternals.StateValue stateValue = (SamzaStoreStateInternals.StateValue) serde.fromBytes(bytes);
        SamzaStoreStateInternals.StateValue of2 = SamzaStoreStateInternals.StateValue.of(bytes);
        Assert.assertEquals(((Integer) stateValue.getValue(of)).intValue(), 123);
        Assert.assertEquals(((Integer) of2.getValue(of)).intValue(), 123);
        byte[] bytes2 = serde.toBytes(SamzaStoreStateInternals.StateValue.of((Object) null, of));
        SamzaStoreStateInternals.StateValue stateValue2 = (SamzaStoreStateInternals.StateValue) serde.fromBytes(bytes2);
        Assert.assertNull(bytes2);
        Assert.assertNull(stateValue2.getValue(of));
    }
}
