package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.TaskContext;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
import org.apache.samza.storage.kv.RocksDbKeyValueStore;
import org.apache.samza.storage.kv.SerializedKeyValueStore;
import org.apache.samza.storage.kv.SerializedKeyValueStoreMetrics;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.WriteOptions;

/** Unit tests for {@link SamzaTimerInternalsFactory}, run against a RocksDB-backed key-value store. */
public class SamzaTimerInternalsFactoryTest {

    /**
     * JUnit-managed temporary folder. {@link #createStore()} passes its root directory to the
     * RocksDB store as the on-disk location.
     */
    @Rule
    public transient TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest$TestTimerRegistry.class */
    private static class TestTimerRegistry implements Scheduler<KeyedTimerData<String>> {
        private final List<KeyedTimerData<String>> timers;

        private TestTimerRegistry() {
            this.timers = new ArrayList();
        }

        public void schedule(KeyedTimerData<String> keyedTimerData, long j) {
            this.timers.add(keyedTimerData);
        }

        public void delete(KeyedTimerData<String> keyedTimerData) {
            this.timers.remove(keyedTimerData);
        }
    }

    /**
     * Opens a RocksDB store named {@code "beamStore"} in the root of {@link #temporaryFolder} and
     * wraps it in a {@link SerializedKeyValueStore} using the Beam byte-array key serde and
     * state-value serde.
     *
     * <p>Every call targets the same directory; the restore tests close a store and call this again
     * to reopen it.
     *
     * @return the serialized store; the caller is responsible for closing it
     */
    private KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> createStore() {
        Options rocksOptions = new Options();
        rocksOptions.setCreateIfMissing(true);

        RocksDbKeyValueStore rocksStore = new RocksDbKeyValueStore(
                this.temporaryFolder.getRoot(),
                rocksOptions,
                new MapConfig(),
                false,
                "beamStore",
                new WriteOptions(),
                new FlushOptions(),
                new KeyValueStoreMetrics("beamStore", new MetricsRegistryMap()));

        return new SerializedKeyValueStore(
                rocksStore,
                new SamzaStoreStateInternals.ByteArraySerdeFactory.ByteArraySerde(),
                new SamzaStoreStateInternals.StateValueSerdeFactory.StateValueSerde(),
                new SerializedKeyValueStoreMetrics("beamStore", new MetricsRegistryMap()));
    }

    /**
     * Builds a non-keyed state-internals factory (id {@code "42"}) whose mocked {@link TaskContext}
     * returns the supplied store for any store name.
     *
     * @param samzaPipelineOptions pipeline options forwarded to the factory
     * @param keyValueStore store the mocked task context hands out
     * @return the state-internals factory backed by {@code keyValueStore}
     */
    private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(SamzaPipelineOptions samzaPipelineOptions, KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> keyValueStore) {
        TaskContext mockedContext = Mockito.mock(TaskContext.class);
        // Any store lookup resolves to the store under test.
        Mockito.when(mockedContext.getStore(ArgumentMatchers.anyString())).thenReturn(keyValueStore);

        return SamzaStoreStateInternals.createNonKeyedStateInternalsFactory(
                "42", mockedContext, samzaPipelineOptions);
    }

    /**
     * Creates a String-keyed {@link SamzaTimerInternalsFactory} over the given store, using the
     * global default windowing strategy and a bounded input.
     *
     * @param scheduler scheduler handed to the factory; the event-time tests pass {@code null}
     * @param str timer state id forwarded to the factory
     * @param samzaPipelineOptions pipeline options forwarded to the factory
     * @param keyValueStore store backing the timer state
     * @return the timer-internals factory
     */
    private static SamzaTimerInternalsFactory<String> createTimerInternalsFactory(Scheduler<KeyedTimerData<String>> scheduler, String str, SamzaPipelineOptions samzaPipelineOptions, KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> keyValueStore) {
        SamzaStoreStateInternals.Factory<?> stateInternalsFactory =
                createNonKeyedStateInternalsFactory(samzaPipelineOptions, keyValueStore);

        return SamzaTimerInternalsFactory.createTimerInternalFactory(
                StringUtf8Coder.of(),
                scheduler,
                str,
                stateInternalsFactory,
                WindowingStrategy.globalDefault(),
                PCollection.IsBounded.BOUNDED,
                samzaPipelineOptions);
    }

    /**
     * Sets two event-time timers (at 10 and 100) and checks that each is returned by
     * {@code removeReadyTimers()} only after the input watermark has moved past its timestamp.
     */
    @Test
    public void testEventTimeTimers() {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);

        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
        TimerInternals.TimerData timer1 = TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer1);
        TimerInternals.TimerData timer2 = TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer2);

        // Watermark 5: neither timer is due yet.
        timerInternalsFactory.setInputWatermark(new Instant(5L));
        Collection<KeyedTimerData<String>> readyTimers = timerInternalsFactory.removeReadyTimers();
        Assert.assertTrue(readyTimers.isEmpty());

        // Watermark 20: only timer1 is due.
        timerInternalsFactory.setInputWatermark(new Instant(20L));
        readyTimers = timerInternalsFactory.removeReadyTimers();
        Assert.assertEquals(1, readyTimers.size());
        Assert.assertEquals(timer1, readyTimers.iterator().next().getTimerData());

        // Watermark 150: timer2 is due; timer1 was already removed.
        timerInternalsFactory.setInputWatermark(new Instant(150L));
        readyTimers = timerInternalsFactory.removeReadyTimers();
        Assert.assertEquals(1, readyTimers.size());
        Assert.assertEquals(timer2, readyTimers.iterator().next().getTimerData());

        store.close();
    }

    /**
     * Checks the event-time buffer size across a restore: a timer written before the store is
     * closed shows up in the buffer of a factory created over the reopened store, leaves the buffer
     * once fired, and a timer set afterwards is buffered and fired normally.
     */
    @Test
    public void testRestoreEventBufferSize() throws Exception {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);

        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals.TimerData timer1 = TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME);
        timerInternalsFactory.timerInternalsForKey("testKey").setTimer(timer1);
        store.close();

        // Restore by building a new factory over the reopened store.
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> restoredStore = createStore();
        SamzaTimerInternalsFactory<String> restoredFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, restoredStore);
        Assert.assertEquals(1, restoredFactory.getEventTimeBuffer().size());

        restoredFactory.setInputWatermark(new Instant(150L));
        Collection<KeyedTimerData<String>> readyTimers = restoredFactory.removeReadyTimers();
        Assert.assertEquals(1, readyTimers.size());
        Assert.assertTrue(restoredFactory.getEventTimeBuffer().isEmpty());

        // A timer set after the restore goes through the buffer as usual.
        TimerInternals restoredTimerInternals = restoredFactory.timerInternalsForKey("testKey");
        TimerInternals.TimerData timer2 = TimerInternals.TimerData.of("timer2", nameSpace, new Instant(200L), new Instant(200L), TimeDomain.EVENT_TIME);
        restoredTimerInternals.setTimer(timer2);
        Assert.assertEquals(1, restoredFactory.getEventTimeBuffer().size());
        // Watermark is still 150, so timer2 (at 200) is not ready.
        Assert.assertEquals(0, restoredFactory.removeReadyTimers().size());

        restoredFactory.setInputWatermark(new Instant(250L));
        readyTimers = restoredFactory.removeReadyTimers();
        Assert.assertEquals(1, readyTimers.size());

        ByteArrayOutputStream encodedKey = new ByteArrayOutputStream();
        StringUtf8Coder.of().encode("testKey", encodedKey);
        byte[] keyBytes = encodedKey.toByteArray();
        Assert.assertEquals(readyTimers, Arrays.asList(new KeyedTimerData<>(keyBytes, "testKey", timer2)));

        restoredStore.close();
    }

    /**
     * Writes two event-time timers, closes the store, and checks that a factory created over the
     * reopened store returns both timers, in timestamp order, once the watermark passes them.
     */
    @Test
    public void testRestore() throws Exception {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);

        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
        TimerInternals.TimerData timer1 = TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer1);
        TimerInternals.TimerData timer2 = TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer2);
        store.close();

        // Restore by building a new factory over the reopened store.
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> restoredStore = createStore();
        SamzaTimerInternalsFactory<String> restoredFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, restoredStore);

        restoredFactory.setInputWatermark(new Instant(150L));
        Collection<KeyedTimerData<String>> readyTimers = restoredFactory.removeReadyTimers();
        Assert.assertEquals(2, readyTimers.size());

        ByteArrayOutputStream encodedKey = new ByteArrayOutputStream();
        StringUtf8Coder.of().encode("testKey", encodedKey);
        byte[] keyBytes = encodedKey.toByteArray();
        Assert.assertEquals(
                readyTimers,
                Arrays.asList(
                        new KeyedTimerData<>(keyBytes, "testKey", timer1),
                        new KeyedTimerData<>(keyBytes, "testKey", timer2)));

        restoredStore.close();
    }

    /**
     * Sets three processing-time timers and checks that all three reach the scheduler, both on the
     * original factory and on a factory created over the reopened store with a fresh scheduler.
     * The timers are then removed from the restored factory.
     */
    @Test
    public void testProcessingTimeTimers() throws IOException {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        // Keep a handle on the registry so the scheduled timers can be inspected.
        TestTimerRegistry timerRegistry = new TestTimerRegistry();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = createTimerInternalsFactory(timerRegistry, "timer", pipelineOptions, store);

        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
        TimerInternals.TimerData timer1 = TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10L), new Instant(10L), TimeDomain.PROCESSING_TIME);
        timerInternals.setTimer(timer1);
        TimerInternals.TimerData timer2 = TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100L), new Instant(100L), TimeDomain.PROCESSING_TIME);
        timerInternals.setTimer(timer2);
        TimerInternals.TimerData timer3 = TimerInternals.TimerData.of("timer3", "timerFamilyId3", nameSpace, new Instant(100L), new Instant(100L), TimeDomain.PROCESSING_TIME);
        timerInternals.setTimer(timer3);
        Assert.assertEquals(3, timerRegistry.timers.size());
        store.close();

        // Restore by building a new factory, with a fresh registry, over the reopened store.
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> restoredStore = createStore();
        TestTimerRegistry restoredRegistry = new TestTimerRegistry();
        SamzaTimerInternalsFactory<String> restoredFactory = createTimerInternalsFactory(restoredRegistry, "timer", pipelineOptions, restoredStore);
        Assert.assertEquals(3, restoredRegistry.timers.size());

        ByteArrayOutputStream encodedKey = new ByteArrayOutputStream();
        StringUtf8Coder.of().encode("testKey", encodedKey);
        byte[] keyBytes = encodedKey.toByteArray();
        restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer1));
        restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer2));
        restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer3));

        restoredStore.close();
    }

    /**
     * Sets the timer id {@code "timerId"} twice (at 10, then at 100) plus {@code "timerId2"} at 200,
     * and checks that nothing fires at watermark 50, one timer at 150 and one at 250 — i.e. the
     * second {@code setTimer} for the same id replaced the first.
     */
    @Test
    public void testOverride() {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> factory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);

        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = factory.timerInternalsForKey("testKey");

        TimerInternals.TimerData original = TimerInternals.TimerData.of("timerId", nameSpace, new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME);
        timerInternals.setTimer(original);
        // Same timer id, later timestamp.
        TimerInternals.TimerData replacement = TimerInternals.TimerData.of("timerId", nameSpace, new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
        timerInternals.setTimer(replacement);
        TimerInternals.TimerData other = TimerInternals.TimerData.of("timerId2", nameSpace, new Instant(200L), new Instant(200L), TimeDomain.EVENT_TIME);
        timerInternals.setTimer(other);

        factory.setInputWatermark(new Instant(50L));
        int firedAt50 = factory.removeReadyTimers().size();
        Assert.assertEquals(0L, firedAt50);

        factory.setInputWatermark(new Instant(150L));
        int firedAt150 = factory.removeReadyTimers().size();
        Assert.assertEquals(1L, firedAt150);

        factory.setInputWatermark(new Instant(250L));
        int firedAt250 = factory.removeReadyTimers().size();
        Assert.assertEquals(1L, firedAt250);

        store.close();
    }

    /**
     * Runs the parameterized check with 10 stored timers and watermark 10, for a processing limit
     * below, equal to and above the number of timers.
     */
    @Test
    public void testMaxExpiredEventTimersProcessAtOnce() {
        // Each row: {max ready timers to process at once, expected number of ready timers}.
        int[][] limitAndExpected = {
            {5, 5},
            {10, 10},
            {20, 10},
        };
        for (int[] scenario : limitAndExpected) {
            testMaxExpiredEventTimersProcessAtOnce(10, 10, scenario[0], scenario[1]);
        }
    }

    /**
     * Registers {@code i} event-time timers with timestamps {@code 0..i-1}, advances the watermark
     * and asserts how many timers a single {@code removeReadyTimers()} call returns.
     *
     * @param i number of timers to set
     * @param i2 input watermark, in millis, applied after the timers are set
     * @param i3 value for the {@code maxReadyTimersToProcessOnce} pipeline option
     * @param i4 expected size of the collection returned by {@code removeReadyTimers()}
     */
    private void testMaxExpiredEventTimersProcessAtOnce(int i, int i2, int i3, int i4) {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setMaxReadyTimersToProcessOnce(i3);

        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> factory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = factory.timerInternalsForKey("testKey");

        for (int index = 0; index < i; index++) {
            TimerInternals.TimerData timer = TimerInternals.TimerData.of("timer" + index, nameSpace, new Instant(index), new Instant(index), TimeDomain.EVENT_TIME);
            timerInternals.setTimer(timer);
        }

        factory.setInputWatermark(new Instant(i2));
        int readyCount = factory.removeReadyTimers().size();
        Assert.assertEquals(i4, readyCount);

        store.close();
    }

    /**
     * With the event-timer buffer size option set to 2, sets five timers and checks that the
     * in-memory event-time buffer holds exactly two of them.
     */
    @Test
    public void testBufferSizeNotExceedingPipelineOptionValue() {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setEventTimerBufferSize(2);

        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> factory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = factory.timerInternalsForKey("testKey");

        // Five timers, more than the buffer is configured to hold.
        for (int index = 0; index < 5; index++) {
            String timerId = "timer" + index;
            timerInternals.setTimer(nameSpace, timerId, "", new Instant(index), new Instant(index), TimeDomain.EVENT_TIME);
        }

        int bufferedCount = factory.getEventTimeBuffer().size();
        Assert.assertEquals(2L, bufferedCount);

        store.close();
    }

    /**
     * With the event-timer buffer size option set to 2, sets three timers (timestamps 0..2) and
     * checks that a single {@code removeReadyTimers()} call at watermark 3 still returns all three.
     */
    @Test
    public void testAllTimersAreFiredWithReload() {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setEventTimerBufferSize(2);

        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> factory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = factory.timerInternalsForKey("testKey");

        // One timer more than the configured buffer size.
        for (int index = 0; index < 3; index++) {
            String timerId = "timer" + index;
            timerInternals.setTimer(nameSpace, timerId, "", new Instant(index), new Instant(index), TimeDomain.EVENT_TIME);
        }

        factory.setInputWatermark(new Instant(3L));
        int readyCount = factory.removeReadyTimers().size();
        Assert.assertEquals(3L, readyCount);

        store.close();
    }

    /**
     * With a buffer size of 5, sets timers 0..7 and then 8..19, and checks that each batch returned
     * by {@code removeReadyTimers()} is in non-decreasing timestamp order and has the expected
     * size (2 at watermark 1, then 18 at watermark 20).
     */
    @Test
    public void testAllTimersAreFiredInOrder() {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setEventTimerBufferSize(5);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");

        for (int i = 0; i < 8; i++) {
            timerInternals.setTimer(nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
        }

        // Watermark 1: timers 0 and 1 are due.
        timerInternalsFactory.setInputWatermark(new Instant(1L));
        Collection<KeyedTimerData<String>> earlyTimers = timerInternalsFactory.removeReadyTimers();
        long lastTimestamp = 0;
        for (KeyedTimerData<String> keyedTimerData : earlyTimers) {
            long timestamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
            Assert.assertTrue(lastTimestamp <= timestamp);
            lastTimestamp = timestamp;
        }
        Assert.assertEquals(2, earlyTimers.size());

        for (int i = 8; i < 20; i++) {
            timerInternals.setTimer(nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
        }

        // Watermark 20: the remaining timers 2..19 are due.
        timerInternalsFactory.setInputWatermark(new Instant(20L));
        Collection<KeyedTimerData<String>> lateTimers = timerInternalsFactory.removeReadyTimers();
        lastTimestamp = 0;
        for (KeyedTimerData<String> keyedTimerData : lateTimers) {
            long timestamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
            Assert.assertTrue(lastTimestamp <= timestamp);
            lastTimestamp = timestamp;
        }
        Assert.assertEquals(18, lateTimers.size());

        store.close();
    }

    /**
     * With a buffer size of 5, sets timers 0..9, fires the first two, sets timers 0..2 again and
     * checks that subsequent batches are still returned in non-decreasing timestamp order with the
     * expected batch and buffer sizes.
     */
    @Test
    public void testNewTimersAreInsertedInOrder() {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setEventTimerBufferSize(5);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");

        for (int i = 0; i < 10; i++) {
            timerInternals.setTimer(nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
        }

        // Watermark 1: timers 0 and 1 are due.
        timerInternalsFactory.setInputWatermark(new Instant(1L));
        Collection<KeyedTimerData<String>> firstBatch = timerInternalsFactory.removeReadyTimers();
        long lastTimestamp = 0;
        for (KeyedTimerData<String> keyedTimerData : firstBatch) {
            long timestamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
            Assert.assertTrue(lastTimestamp <= timestamp);
            lastTimestamp = timestamp;
        }
        Assert.assertEquals(2, firstBatch.size());

        // Set timers 0..2 again: 0 and 1 have already fired, 2 is still pending.
        for (int i = 0; i < 3; i++) {
            timerInternals.setTimer(nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
        }

        // Watermark 5: timers 0..5 are due; 6..9 stay buffered.
        timerInternalsFactory.setInputWatermark(new Instant(5L));
        Collection<KeyedTimerData<String>> secondBatch = timerInternalsFactory.removeReadyTimers();
        lastTimestamp = 0;
        for (KeyedTimerData<String> keyedTimerData : secondBatch) {
            long timestamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
            Assert.assertTrue(lastTimestamp <= timestamp);
            lastTimestamp = timestamp;
        }
        Assert.assertEquals(6, secondBatch.size());
        Assert.assertEquals(4, timerInternalsFactory.getEventTimeBuffer().size());

        // Watermark 10: the last four timers are due. lastTimestamp is deliberately not reset, so
        // this batch must also come after the previous one.
        timerInternalsFactory.setInputWatermark(new Instant(10L));
        Collection<KeyedTimerData<String>> thirdBatch = timerInternalsFactory.removeReadyTimers();
        for (KeyedTimerData<String> keyedTimerData : thirdBatch) {
            long timestamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
            Assert.assertTrue(lastTimestamp <= timestamp);
            lastTimestamp = timestamp;
        }
        Assert.assertEquals(4, thirdBatch.size());
        Assert.assertEquals(0, timerInternalsFactory.getEventTimeBuffer().size());

        store.close();
    }

    /**
     * With a buffer size of 5, fires timers until the buffer holds a single entry, then sets more
     * timers and checks that the buffer fills back up to 5 and that the next batch is returned in
     * non-decreasing timestamp order.
     */
    @Test
    public void testBufferRefilledAfterRestoreToNonFullState() {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setEventTimerBufferSize(5);
        KeyValueStore<SamzaStoreStateInternals.ByteArray, SamzaStoreStateInternals.StateValue<?>> store = createStore();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");

        // Six timers: one more than the buffer holds.
        for (int i = 0; i < 6; i++) {
            timerInternals.setTimer(nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
        }

        // Watermark 4: timers 0..4 are due, leaving only timer 5 in the buffer.
        timerInternalsFactory.setInputWatermark(new Instant(4L));
        Assert.assertEquals(5, timerInternalsFactory.removeReadyTimers().size());
        Assert.assertEquals(1, timerInternalsFactory.getEventTimeBuffer().size());

        for (int i = 6; i < 13; i++) {
            timerInternals.setTimer(nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
        }
        // The non-full buffer accepted new timers up to its configured size.
        Assert.assertEquals(5, timerInternalsFactory.getEventTimeBuffer().size());

        // Watermark 10: timers 5..10 are due; 11 and 12 remain buffered.
        timerInternalsFactory.setInputWatermark(new Instant(10L));
        Collection<KeyedTimerData<String>> readyTimers = timerInternalsFactory.removeReadyTimers();
        long lastTimestamp = 0;
        for (KeyedTimerData<String> keyedTimerData : readyTimers) {
            long timestamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
            Assert.assertTrue(lastTimestamp <= timestamp);
            lastTimestamp = timestamp;
        }
        Assert.assertEquals(6, readyTimers.size());
        Assert.assertEquals(2, timerInternalsFactory.getEventTimeBuffer().size());

        store.close();
    }

    /**
     * Round-trips a {@code ByteArray} through its serde and checks that the copy is equal to the
     * original and usable interchangeably with it as a {@link HashMap} key.
     */
    @Test
    public void testByteArray() {
        SamzaStoreStateInternals.ByteArray key1 = SamzaStoreStateInternals.ByteArray.of("hello world".getBytes(StandardCharsets.UTF_8));
        Serde<SamzaStoreStateInternals.ByteArray> serde = new SamzaStoreStateInternals.ByteArraySerdeFactory().getSerde("", (Config) null);
        byte[] keyBytes = serde.toBytes(key1);
        SamzaStoreStateInternals.ByteArray key2 = serde.fromBytes(keyBytes);
        Assert.assertEquals(key1, key2);

        // Look up and remove through the other instance to exercise equals/hashCode.
        HashMap<SamzaStoreStateInternals.ByteArray, String> map = new HashMap<>();
        map.put(key1, "found it");
        Assert.assertEquals("found it", map.get(key2));

        map.remove(key1);
        Assert.assertFalse(map.containsKey(key2));
        Assert.assertTrue(map.isEmpty());
    }
}
