package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.Config;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespaceAdmin;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data.runtime.LocationStreamFileWriterFactory;
import co.cask.cdap.data.stream.StreamFileWriterFactory;
import co.cask.cdap.data.stream.service.StreamHandler;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetCache;
import co.cask.cdap.data2.dataset2.SingleThreadDatasetCache;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.internal.MockResponder;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.proto.DatasetSpecificationSummary;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.XSlowTests;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.common.Threads;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MapReduceRunnerTestBase.class */
public class MapReduceRunnerTestBase {
    private static Injector injector;
    private static TransactionManager txService;
    private static StreamHandler streamHandler;
    protected static TransactionExecutorFactory txExecutorFactory;
    protected static DatasetFramework dsFramework;
    protected static DynamicDatasetCache datasetCache;
    protected static MetricStore metricStore;

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    protected static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRunnerTestBase.1
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public File m53get() {
            try {
                return MapReduceRunnerTestBase.TEMP_FOLDER.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    @BeforeClass
    public static void beforeClass() throws Exception {
        CConfiguration create = CConfiguration.create();
        Integer integer = Integer.getInteger("data.tx.timeout");
        if (integer != null) {
            create.setInt("data.tx.timeout", integer.intValue());
        }
        Integer integer2 = Integer.getInteger("data.tx.cleanup.interval");
        if (integer2 != null) {
            create.setInt("data.tx.cleanup.interval", integer2.intValue());
        }
        injector = AppFabricTestHelper.getInjector(create, new AbstractModule() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRunnerTestBase.2
            protected void configure() {
                bind(StreamFileWriterFactory.class).to(LocationStreamFileWriterFactory.class);
            }
        });
        txService = (TransactionManager) injector.getInstance(TransactionManager.class);
        txExecutorFactory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
        dsFramework = (DatasetFramework) injector.getInstance(DatasetFramework.class);
        datasetCache = new SingleThreadDatasetCache(new SystemDatasetInstantiator(dsFramework, MapReduceRunnerTestBase.class.getClassLoader(), (Iterable) null), (TransactionSystemClient) injector.getInstance(TransactionSystemClient.class), NamespaceId.DEFAULT, DatasetDefinition.NO_ARGUMENTS, (MetricsContext) null, (Map) null);
        metricStore = (MetricStore) injector.getInstance(MetricStore.class);
        txService.startAndWait();
        streamHandler = (StreamHandler) injector.getInstance(StreamHandler.class);
        ((NamespaceAdmin) injector.getInstance(NamespaceAdmin.class)).create(NamespaceMeta.DEFAULT);
    }

    @AfterClass
    public static void afterClass() throws Exception {
        txService.stopAndWait();
    }

    @After
    public void after() throws Exception {
        Iterator it = dsFramework.getInstances(DefaultId.NAMESPACE).iterator();
        while (it.hasNext()) {
            dsFramework.deleteInstance(DefaultId.NAMESPACE.dataset(((DatasetSpecificationSummary) it.next()).getName()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeToStream(String str, String str2) throws IOException {
        writeToStream(Id.Stream.from(Id.Namespace.DEFAULT, str), str2);
    }

    protected void writeToStream(Id.Stream stream, String str) throws IOException {
        DefaultHttpRequest defaultHttpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, String.format("/v3/namespaces/%s/streams/%s", stream.getNamespaceId(), stream.getId()));
        ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(ByteBuffer.wrap(Bytes.toBytes(str)));
        defaultHttpRequest.setContent(wrappedBuffer);
        defaultHttpRequest.setHeader("Content-Length", Integer.valueOf(wrappedBuffer.readableBytes()));
        MockResponder mockResponder = new MockResponder();
        try {
            streamHandler.enqueue(defaultHttpRequest, mockResponder, stream.getNamespaceId(), stream.getId());
            if (mockResponder.getStatus() != HttpResponseStatus.OK) {
                throw new IOException("Failed to write to stream. Status = " + mockResponder.getStatus());
            }
        } catch (Exception e) {
            Throwables.propagateIfPossible(e, IOException.class);
            throw Throwables.propagate(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ApplicationWithPrograms deployApp(Class<?> cls) throws Exception {
        return AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ApplicationWithPrograms deployApp(Class<?> cls, Config config) throws Exception {
        return AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER, config);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ApplicationWithPrograms deployApp(Id.Namespace namespace, Class<?> cls) throws Exception {
        return AppFabricTestHelper.deployApplicationWithManager(namespace, cls, TEMP_FOLDER_SUPPLIER);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Arguments arguments) throws Exception {
        return waitForCompletion(AppFabricTestHelper.submit(applicationWithPrograms, cls.getName(), arguments, TEMP_FOLDER_SUPPLIER));
    }

    private boolean waitForCompletion(ProgramController programController) throws InterruptedException {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        programController.addListener(new AbstractListener() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRunnerTestBase.3
            public void completed() {
                atomicBoolean.set(true);
                countDownLatch.countDown();
            }

            public void error(Throwable th) {
                countDownLatch.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        countDownLatch.await(10L, TimeUnit.MINUTES);
        return atomicBoolean.get();
    }
}
