/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.logging.ats;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestATSHistoryLoggingService {
    private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class);
    private ATSHistoryLoggingService atsHistoryLoggingService;
    private AppContext appContext;
    private Configuration conf;
    private int atsInvokeCounter;
    private int atsEntitiesCounter;
    private SystemClock clock = new SystemClock();

    @Before
    public void setup() throws Exception {
        this.appContext = (AppContext)Mockito.mock(AppContext.class);
        this.atsHistoryLoggingService = new ATSHistoryLoggingService();
        this.atsHistoryLoggingService.setAppContext(this.appContext);
        this.conf = new Configuration(false);
        this.conf.setLong("tez.yarn.ats.event.flush.timeout.millis", 1000L);
        this.conf.setInt("tez.yarn.ats.max.events.per.batch", 2);
        this.conf.setBoolean("tez.allow.disabled.timeline-domains", true);
        this.atsInvokeCounter = 0;
        this.atsEntitiesCounter = 0;
        this.atsHistoryLoggingService.init(this.conf);
        this.atsHistoryLoggingService.timelineClient = (TimelineClient)Mockito.mock(TimelineClient.class);
        this.atsHistoryLoggingService.start();
        Mockito.when((Object)this.appContext.getClock()).thenReturn((Object)this.clock);
        Mockito.when((Object)this.appContext.getCurrentDAGID()).thenReturn(null);
        Mockito.when((Object)this.atsHistoryLoggingService.timelineClient.putEntities((TimelineEntity[])Matchers.anyVararg())).thenAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                ++TestATSHistoryLoggingService.this.atsInvokeCounter;
                TestATSHistoryLoggingService.this.atsEntitiesCounter += invocation.getArguments().length;
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return null;
            }
        });
    }

    @After
    public void teardown() {
        this.atsHistoryLoggingService.stop();
        this.atsHistoryLoggingService = null;
    }

    @Test(timeout=20000L)
    public void testATSHistoryLoggingServiceShutdown() {
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, (HistoryEvent)new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
        for (int i = 0; i < 100; ++i) {
            this.atsHistoryLoggingService.handle(historyEvent);
        }
        try {
            Thread.sleep(2500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.atsHistoryLoggingService.stop();
        LOG.info((Object)("ATS entitiesSent=" + this.atsEntitiesCounter + ", timelineInvocations=" + this.atsInvokeCounter));
        Assert.assertTrue((this.atsEntitiesCounter >= 4 ? 1 : 0) != 0);
        Assert.assertTrue((this.atsEntitiesCounter < 20 ? 1 : 0) != 0);
    }

    @Test(timeout=20000L)
    public void testATSEventBatching() {
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, (HistoryEvent)new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
        for (int i = 0; i < 100; ++i) {
            this.atsHistoryLoggingService.handle(historyEvent);
        }
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        LOG.info((Object)("ATS entitiesSent=" + this.atsEntitiesCounter + ", timelineInvocations=" + this.atsInvokeCounter));
        Assert.assertTrue((this.atsEntitiesCounter > this.atsInvokeCounter ? 1 : 0) != 0);
        Assert.assertEquals((long)(this.atsEntitiesCounter / 2), (long)this.atsInvokeCounter);
    }

    @Test(timeout=20000L)
    public void testTimelineServiceDisable() throws Exception {
        AppContext appContext1 = (AppContext)Mockito.mock(AppContext.class);
        ATSHistoryLoggingService atsHistoryLoggingService1 = new ATSHistoryLoggingService();
        atsHistoryLoggingService1.setAppContext(this.appContext);
        atsHistoryLoggingService1.timelineClient = (TimelineClient)Mockito.mock(TimelineClient.class);
        Mockito.when((Object)atsHistoryLoggingService1.timelineClient.putEntities((TimelineEntity[])Matchers.anyVararg())).thenAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                ++TestATSHistoryLoggingService.this.atsInvokeCounter;
                TestATSHistoryLoggingService.this.atsEntitiesCounter += invocation.getArguments().length;
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return null;
            }
        });
        this.conf.setBoolean("yarn.timeline-service.enabled", false);
        this.conf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
        atsHistoryLoggingService1.init(this.conf);
        atsHistoryLoggingService1.start();
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, (HistoryEvent)new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
        for (int i = 0; i < 100; ++i) {
            atsHistoryLoggingService1.handle(historyEvent);
        }
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            // empty catch block
        }
        LOG.info((Object)("ATS entitiesSent=" + this.atsEntitiesCounter + ", timelineInvocations=" + this.atsInvokeCounter));
        Assert.assertEquals((long)this.atsInvokeCounter, (long)0L);
        Assert.assertEquals((long)this.atsEntitiesCounter, (long)0L);
        Assert.assertNull((Object)atsHistoryLoggingService1.timelineClient);
    }
}

