package org.apache.beam.runners.core;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.ActiveWindowSet;
import org.apache.beam.runners.core.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.core.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/MergingActiveWindowSetTest.class */
public class MergingActiveWindowSetTest {
    private Sessions windowFn;
    private StateInternals<String> state;
    private MergingActiveWindowSet<IntervalWindow> set;
    private ActiveWindowSet.MergeCallback<IntervalWindow> callback;

    @Before
    public void setup() {
        this.windowFn = Sessions.withGapDuration(Duration.millis(10L));
        this.state = InMemoryStateInternals.forKey("dummyKey");
        this.set = new MergingActiveWindowSet<>(this.windowFn, this.state);
        this.callback = (ActiveWindowSet.MergeCallback) Mockito.mock(ActiveWindowSet.MergeCallback.class);
    }

    @After
    public void after() {
        this.set = null;
        this.state = null;
        this.windowFn = null;
    }

    private void add(long... jArr) {
        for (long j : jArr) {
            System.out.println("ADD " + j);
            Sessions sessions = this.windowFn;
            sessions.getClass();
            Iterator it = this.windowFn.assignWindows(new WindowFn<Object, IntervalWindow>.AssignContext(sessions, j) { // from class: org.apache.beam.runners.core.MergingActiveWindowSetTest.1
                final /* synthetic */ long val$instant;

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(sessions);
                    this.val$instant = j;
                    sessions.getClass();
                }

                public Object element() {
                    return Long.valueOf(this.val$instant);
                }

                public Instant timestamp() {
                    return new Instant(this.val$instant);
                }

                public BoundedWindow window() {
                    return GlobalWindow.INSTANCE;
                }
            }).iterator();
            while (it.hasNext()) {
                this.set.ensureWindowExists((IntervalWindow) it.next());
            }
        }
    }

    private Map<IntervalWindow, IntervalWindow> merge(List<IntervalWindow> list, IntervalWindow intervalWindow) throws Exception {
        IntervalWindow mergedWriteStateAddress = this.set.mergedWriteStateAddress(list, intervalWindow);
        System.out.println("BEFORE MERGE");
        System.out.println(this.set);
        HashMap hashMap = new HashMap();
        for (IntervalWindow intervalWindow2 : list) {
            System.out.println("WILL MERGE " + intervalWindow2 + " INTO " + intervalWindow);
            hashMap.put(intervalWindow2, intervalWindow);
        }
        System.out.println("AFTER MERGE");
        this.set.merge(this.callback);
        ((ActiveWindowSet.MergeCallback) Mockito.verify(this.callback)).onMerge(list, intervalWindow);
        System.out.println(this.set);
        Assert.assertEquals(mergedWriteStateAddress, this.set.writeStateAddress(intervalWindow));
        return hashMap;
    }

    private void activate(Map<IntervalWindow, IntervalWindow> map, long... jArr) {
        for (long j : jArr) {
            BoundedWindow window = window(j, 10L);
            BoundedWindow boundedWindow = (IntervalWindow) map.get(window);
            if (boundedWindow == null) {
                boundedWindow = window;
            }
            System.out.println("ACTIVATE " + boundedWindow);
            this.set.ensureWindowIsActive(boundedWindow);
        }
        this.set.checkInvariants();
    }

    private void cleanup() {
        System.out.println("CLEANUP");
        this.set.cleanupTemporaryWindows();
        this.set.checkInvariants();
        System.out.println(this.set);
        this.set.persist();
        MergingActiveWindowSet mergingActiveWindowSet = new MergingActiveWindowSet(this.windowFn, this.state);
        mergingActiveWindowSet.checkInvariants();
        Assert.assertEquals(this.set, mergingActiveWindowSet);
    }

    private IntervalWindow window(long j, long j2) {
        return new IntervalWindow(new Instant(j), new Duration(j2));
    }

    @Test
    public void testLifecycle() throws Exception {
        add(1, 2, 15);
        Assert.assertEquals(ImmutableSet.of(window(1L, 10L), window(2L, 10L), window(15L, 10L)), this.set.getActiveAndNewWindows());
        activate(merge(ImmutableList.of(window(1L, 10L), window(2L, 10L)), window(1L, 11L)), 1, 2, 15);
        Assert.assertEquals(ImmutableSet.of(window(1L, 11L), window(15L, 10L)), this.set.getActiveAndNewWindows());
        Assert.assertEquals(ImmutableSet.of(window(1L, 11L)), this.set.readStateAddresses(window(1L, 11L)));
        Assert.assertEquals(ImmutableSet.of(window(15L, 10L)), this.set.readStateAddresses(window(15L, 10L)));
        cleanup();
        add(3);
        Assert.assertEquals(ImmutableSet.of(window(3L, 10L), window(1L, 11L), window(15L, 10L)), this.set.getActiveAndNewWindows());
        activate(merge(ImmutableList.of(window(1L, 11L), window(3L, 10L)), window(1L, 12L)), 3);
        Assert.assertEquals(ImmutableSet.of(window(1L, 12L), window(15L, 10L)), this.set.getActiveAndNewWindows());
        Assert.assertEquals(ImmutableSet.of(window(1L, 11L)), this.set.readStateAddresses(window(1L, 12L)));
        Assert.assertEquals(ImmutableSet.of(window(15L, 10L)), this.set.readStateAddresses(window(15L, 10L)));
        cleanup();
        add(8);
        Assert.assertEquals(ImmutableSet.of(window(8L, 10L), window(1L, 12L), window(15L, 10L)), this.set.getActiveAndNewWindows());
        activate(merge(ImmutableList.of(window(1L, 12L), window(8L, 10L), window(15L, 10L)), window(1L, 24L)), 8);
        Assert.assertEquals(ImmutableSet.of(window(1L, 24L)), this.set.getActiveAndNewWindows());
        Assert.assertEquals(ImmutableSet.of(window(1L, 11L)), this.set.readStateAddresses(window(1L, 24L)));
        cleanup();
        add(9);
        Assert.assertEquals(ImmutableSet.of(window(9L, 10L), window(1L, 24L)), this.set.getActiveAndNewWindows());
        activate(merge(ImmutableList.of(window(1L, 24L), window(9L, 10L)), window(1L, 24L)), 9);
        Assert.assertEquals(ImmutableSet.of(window(1L, 24L)), this.set.getActiveAndNewWindows());
        Assert.assertEquals(ImmutableSet.of(window(1L, 11L)), this.set.readStateAddresses(window(1L, 24L)));
        cleanup();
        add(1);
        Assert.assertEquals(ImmutableSet.of(window(1L, 10L), window(1L, 24L)), this.set.getActiveAndNewWindows());
        activate(merge(ImmutableList.of(window(1L, 10L), window(1L, 24L)), window(1L, 24L)), 1);
        Assert.assertEquals(ImmutableSet.of(window(1L, 24L)), this.set.getActiveAndNewWindows());
        Assert.assertEquals(ImmutableSet.of(window(1L, 11L)), this.set.readStateAddresses(window(1L, 24L)));
        cleanup();
        this.set.remove(window(1L, 24L));
        cleanup();
        Assert.assertTrue(this.set.getActiveAndNewWindows().isEmpty());
    }

    @Test
    public void testLegacyState() {
        this.set.addActiveForTesting(window(1L, 12L), ImmutableList.of(window(1L, 10L), window(2L, 10L), window(3L, 10L)));
        Assert.assertTrue(this.set.isActive(window(1L, 12L)));
        Assert.assertEquals(ImmutableSet.of(window(1L, 10L), window(2L, 10L), window(3L, 10L)), this.set.readStateAddresses(window(1L, 12L)));
        Assert.assertEquals(window(1L, 10L), this.set.mergedWriteStateAddress(ImmutableList.of(window(1L, 10L), window(2L, 10L), window(3L, 10L)), window(1L, 12L)));
        this.set.merged(window(1L, 12L));
        cleanup();
        Assert.assertEquals(ImmutableSet.of(window(1L, 10L)), this.set.readStateAddresses(window(1L, 12L)));
    }
}
