package co.cask.cdap.runtime;

import co.cask.cdap.DummyAppWithTrackingTable;
import co.cask.cdap.TrackingTable;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.common.discovery.RandomEndpointStrategy;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.namespace.NamespaceAdmin;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.stream.StreamEventCodec;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingRuntimeDatasets;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.test.XSlowTests;
import com.google.common.base.Charsets;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.filesystem.Location;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/runtime/OpenCloseDataSetTest.class */
public class OpenCloseDataSetTest {
    private static Location namespaceHomeLocation;

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() { // from class: co.cask.cdap.runtime.OpenCloseDataSetTest.1
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public File m95get() {
            try {
                return OpenCloseDataSetTest.TEMP_FOLDER.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    @BeforeClass
    public static void setup() throws Exception {
        namespaceHomeLocation = ((NamespacedLocationFactory) AppFabricTestHelper.getInjector().getInstance(NamespacedLocationFactory.class)).get(DefaultId.NAMESPACE.toId());
        ((NamespaceAdmin) AppFabricTestHelper.getInjector().getInstance(NamespaceAdmin.class)).create(new NamespaceMeta.Builder().setName(DefaultId.NAMESPACE).build());
        Locations.mkdirsIfNotExists(namespaceHomeLocation);
    }

    @Test(timeout = 120000)
    public void testDataSetsAreClosed() throws Exception {
        TrackingTable.resetTracker();
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(DummyAppWithTrackingTable.class, TEMP_FOLDER_SUPPLIER);
        ArrayList newArrayList = Lists.newArrayList();
        for (ProgramDescriptor programDescriptor : deployApplicationWithManager.getPrograms()) {
            if (!programDescriptor.getProgramId().getType().equals(ProgramType.MAPREDUCE)) {
                newArrayList.add(AppFabricTestHelper.submit(deployApplicationWithManager, programDescriptor.getSpecification().getClassName(), new BasicArguments(), TEMP_FOLDER_SUPPLIER));
            }
        }
        TransactionSystemClient transactionSystemClient = (TransactionSystemClient) AppFabricTestHelper.getInjector().getInstance(TransactionSystemClient.class);
        TransactionAware createProducer = ((QueueClientFactory) AppFabricTestHelper.getInjector().getInstance(QueueClientFactory.class)).createProducer(QueueName.fromStream(deployApplicationWithManager.getApplicationId().getNamespace(), "xx"));
        Transaction startShort = transactionSystemClient.startShort();
        createProducer.startTx(startShort);
        StreamEventCodec streamEventCodec = new StreamEventCodec();
        for (int i = 0; i < 4; i++) {
            createProducer.enqueue(new QueueEntry(streamEventCodec.encodePayload(new StreamEvent(ImmutableMap.of(), ByteBuffer.wrap((AppWithMapReduceUsingRuntimeDatasets.FileMapper.ONLY_KEY + i).getBytes(Charsets.UTF_8))))));
        }
        createProducer.commitTx();
        transactionSystemClient.commit(startShort);
        while (TrackingTable.getTracker("foo", "write") < 4) {
            TimeUnit.MILLISECONDS.sleep(50L);
        }
        Assert.assertEquals(4L, TrackingTable.getTracker("foo", "write"));
        Assert.assertEquals(2L, TrackingTable.getTracker("foo", "open"));
        Gson gson = new Gson();
        Discoverable pick = new RandomEndpointStrategy(((DiscoveryServiceClient) AppFabricTestHelper.getInjector().getInstance(DiscoveryServiceClient.class)).discover(String.format("service.%s.%s.%s", DefaultId.NAMESPACE.getEntityName(), "dummy", "DummyService"))).pick(5L, TimeUnit.SECONDS);
        Assert.assertNotNull(pick);
        DefaultHttpClient defaultHttpClient = new DefaultHttpClient();
        String str = (String) gson.fromJson(new InputStreamReader(defaultHttpClient.execute(new HttpGet(String.format("http://%s:%d/v3/namespaces/default/apps/%s/services/%s/methods/%s", pick.getSocketAddress().getHostName(), Integer.valueOf(pick.getSocketAddress().getPort()), "dummy", "DummyService", "x1"))).getEntity().getContent(), Charsets.UTF_8), String.class);
        defaultHttpClient.getConnectionManager().shutdown();
        Assert.assertEquals("x1", str);
        Assert.assertEquals(1L, TrackingTable.getTracker("foo", "read"));
        Assert.assertEquals(3L, TrackingTable.getTracker("foo", "open"));
        Assert.assertEquals(1L, TrackingTable.getTracker("foo", "close"));
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((ProgramController) it.next()).stop().get();
        }
        int tracker = TrackingTable.getTracker("foo", "open");
        Assert.assertTrue(tracker >= 2);
        Assert.assertEquals(tracker, TrackingTable.getTracker("foo", "close"));
        ProgramController programController = null;
        for (ProgramDescriptor programDescriptor2 : deployApplicationWithManager.getPrograms()) {
            if (programDescriptor2.getProgramId().getType().equals(ProgramType.MAPREDUCE)) {
                programController = AppFabricTestHelper.submit(deployApplicationWithManager, programDescriptor2.getSpecification().getClassName(), new BasicArguments(), TEMP_FOLDER_SUPPLIER);
            }
        }
        Assert.assertNotNull(programController);
        while (!programController.getState().equals(ProgramController.State.COMPLETED)) {
            TimeUnit.MILLISECONDS.sleep(100L);
        }
        Assert.assertTrue(tracker < TrackingTable.getTracker("foo", "open"));
        Assert.assertEquals(TrackingTable.getTracker("foo", "open"), TrackingTable.getTracker("foo", "close"));
        Assert.assertTrue(0 < TrackingTable.getTracker("bar", "open"));
        Assert.assertEquals(TrackingTable.getTracker("bar", "open"), TrackingTable.getTracker("bar", "close"));
    }

    @AfterClass
    public static void tearDown() throws IOException {
        Locations.deleteQuietly(namespaceHomeLocation, true);
    }
}
