package com.cloudera.nav.pushextractor.spark;

import com.cloudera.api.model.ApiService;
import com.cloudera.nav.core.model.Source;
import com.cloudera.nav.core.model.SourceType;
import com.cloudera.nav.extract.ClusterIdBasedSourceIdGenerator;
import com.cloudera.nav.extract.ClusterIdGenerator;
import com.cloudera.nav.extract.SourceIdGeneratorFactory;
import com.cloudera.nav.hdfs.extractor.HdfsIdGenerator;
import com.cloudera.nav.hive.extractor.HiveIdGenerator;
import com.cloudera.nav.integration.BaseIntegrationTest;
import com.cloudera.nav.persist.ClusterManager;
import com.cloudera.nav.persist.ExecEnvironment;
import com.cloudera.nav.persist.PushExtractorIterationGenerator;
import com.cloudera.nav.persist.SourceManager;
import com.cloudera.nav.persist.TransactionFactory;
import com.cloudera.nav.persistence.relational.DataSourceConfiguration;
import com.cloudera.nav.pushextractor.spark.model.SparkOperation;
import com.cloudera.nav.pushextractor.spark.model.SparkOperationExecution;
import com.cloudera.nav.utils.CdhExecutorFactory;
import com.cloudera.nav.utils.MD5IdGenerator;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.ContextHierarchy;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@ContextHierarchy({@ContextConfiguration(classes = {DataSourceConfiguration.class}), @ContextConfiguration({"classpath:spring-test-embedded-solr.xml"})})
@RunWith(SpringJUnit4ClassRunner.class)
@ActiveProfiles({"prod"})
/* loaded from: input_file:com/cloudera/nav/pushextractor/spark/SparkPushExtractorTest.class */
public class SparkPushExtractorTest extends BaseIntegrationTest {
    private SparkPushExtractorContext context;
    private HiveIdGenerator hiveIdGenerator;
    private HdfsIdGenerator hdfsIdGenerator;
    private SourceManager sourceManager;
    private ClusterIdGenerator clusterIdGenerator;
    private Source sparkSource;
    private static final String version = "1.0";
    private static final String extractorRunId = "dummyRunId";
    private static final String applicationID = "appID1";
    private static final String applicationID2 = "appID2";
    private static final String applicationExecutionID = "appExeID1";
    private static final String applicationExecutionID2 = "appExeID2";
    private static final String yarnApplicationId1 = "yarnApplicationId1";
    private static final String yarnApplicationId2 = "yarnApplicationId2";
    private static final String user = "admin";

    @Mock
    private SourceIdGeneratorFactory sourceIdGeneratorFactory;

    @Mock
    CdhExecutorFactory executorFactory;

    @Mock
    private ClusterManager clusterManager;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(Integer.valueOf(this.navOptions.getSolrCommitBatchDuration())).thenReturn(10);
        ClusterIdBasedSourceIdGenerator clusterIdBasedSourceIdGenerator = new ClusterIdBasedSourceIdGenerator();
        PushExtractorIterationGenerator pushExtractorIterationGenerator = new PushExtractorIterationGenerator(this.navOptions);
        Mockito.when(Integer.valueOf(pushExtractorIterationGenerator.getIterationCount())).thenAnswer(new Answer<Integer>() { // from class: com.cloudera.nav.pushextractor.spark.SparkPushExtractorTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Integer m6answer(InvocationOnMock invocationOnMock) throws Throwable {
                return new Integer(1);
            }
        });
        this.hdfsIdGenerator = new HdfsIdGenerator();
        this.hiveIdGenerator = new HiveIdGenerator();
        this.clusterIdGenerator = new ClusterIdGenerator(this.emf);
        this.sourceManager = (SourceManager) Mockito.spy(new SourceManager(this.emf, this.rmf, this.sequenceGenerator, pushExtractorIterationGenerator, this.sourceIdGeneratorFactory, this.executorFactory, this.navOptions, this.clusterIdGenerator, this.clusterManager, ExecEnvironment.UNIT_TEST));
        forceCommit();
        Source source = new Source(1001L, "Cluster 1", "clusterUuid", "HDFS-1", "hdfs://eng-nav-1.gce.cloudera.com:8020", SourceType.HDFS, clusterIdBasedSourceIdGenerator);
        ((SourceManager) Mockito.doReturn(Optional.of(source)).when(this.sourceManager)).getSourceWithUrl("clusterUuid", "hdfs://eng-nav-1.gce.cloudera.com:8020", SourceType.HDFS);
        ((SourceManager) Mockito.doReturn(Optional.of(source)).when(this.sourceManager)).getSourceWithUrl("clusterUuid", "hdfs://eng-nav-1.gce.cloudera.com", SourceType.HDFS);
        Source source2 = new Source(1004L, "Cluster 1", "clusterUuid", "HIVE-1", "thrift://eng-nav-1.gce.cloudera.com:9083", SourceType.HIVE, clusterIdBasedSourceIdGenerator);
        ((SourceManager) Mockito.doReturn(new Source(1005L, "Cluster 1", "clusterUuid", "S3", "s3a://eng-nav-1.gce.cloudera.com", SourceType.S3, clusterIdBasedSourceIdGenerator)).when(this.sourceManager)).createIfAbsentGlobalSource(Matchers.anyString(), Matchers.anyString(), (SourceType) Matchers.eq(SourceType.S3), Matchers.anyString());
        this.sparkSource = new Source(1006L, "Cluster 1", "clusterUuid", "SPARK_ON_YARN-1", "http://eng-nav-1.gce.cloudera.com:18088", SourceType.SPARK, clusterIdBasedSourceIdGenerator);
        ((SourceManager) Mockito.doReturn(ImmutableList.of(this.sparkSource)).when(this.sourceManager)).getSourcesForCluster(Matchers.anyString(), (SourceType) Matchers.eq(SourceType.SPARK));
        ((SourceManager) Mockito.doReturn(Arrays.asList(source2)).when(this.sourceManager)).getSiblingSourcesByType(this.sparkSource, SourceType.HIVE);
        ((SourceManager) Mockito.doReturn(Optional.of(source2)).when(this.sourceManager)).getSourceWithUrl("clusterUuid", "thrift://eng-nav-1.gce.cloudera.com:9083", SourceType.HIVE);
        ApiService apiService = new ApiService();
        apiService.setName("Spark-1");
        apiService.setType("SPARK");
        this.context = new SparkPushExtractorContext(new TransactionFactory(this.emf, this.rmf), this.hiveIdGenerator, this.hdfsIdGenerator, this.sequenceGenerator, this.sourceManager, this.sparkSource, this.navOptions, extractorRunId, "clusterUuid");
        this.em.persist(ImmutableList.of(this.sparkSource, source, source2), true);
        this.em.commit();
    }

    @Test(expected = Exception.class)
    public void testExceptionThrownOnError() {
        new SparkPushExtractor(this.context).extract((Collection) null);
    }

    private void forceCommit() {
        this.em.commit(true);
        this.rm.commit(true);
    }

    @Test(expected = Exception.class)
    public void testExceptionThrownWhenSourceNotExtracted() {
        new SparkPushExtractor(this.context).extract(ImmutableList.of(new SparkLineageGraphShim(false, 0, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, ImmutableList.of(new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://not-extracted.cloudera.com:9083", "default.sample_07", ImmutableList.of("code"))), (List) null)));
    }

    @Test
    public void testLineageCreationWhenOutputOccursFirst() {
        SparkPushExtractor sparkPushExtractor = new SparkPushExtractor(this.context);
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 0, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, new ArrayList(), ImmutableList.of(new SparkOutputShim("hdfs", "unusedDataQry", "", (String) null, "hdfs://eng-nav-1.gce.cloudera.com/user/customers/out.txt", (List) null)))));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 0, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, ImmutableList.of(new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://eng-nav-1.gce.cloudera.com:9083,thrift://eng-nav-4.gce.cloudera.com:9083", "default.sample_07", ImmutableList.of("code"))), (List) null)));
        Instant now = Instant.now();
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(true, 0, 0L, Long.valueOf(now.getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, (List) null, (List) null)));
        forceCommit();
        Assert.assertNotNull((SparkOperation) this.em.findById(MD5IdGenerator.generateIdentity(new String[]{applicationID})).get());
        SparkOperationExecution sparkOperationExecution = (SparkOperationExecution) this.em.findById(SparkIdGenerator.generateSparkAppExecId(yarnApplicationId1)).get();
        Assert.assertNotNull(sparkOperationExecution);
        Assert.assertEquals(1L, this.rm.query("type:INSTANCE_OF AND ep1Ids:" + r0.getId() + " AND ep2Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(1L, this.rm.query("type:DATA_FLOW AND ep2Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(1L, this.rm.query("type:DATA_FLOW AND ep1Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(now, sparkOperationExecution.getEnded());
    }

    @Test
    public void testLineageCreation() {
        SparkPushExtractor sparkPushExtractor = new SparkPushExtractor(this.context);
        SparkLineageGraphShim sparkLineageGraphShim = new SparkLineageGraphShim(false, 0, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, ImmutableList.of(new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://eng-nav-1.gce.cloudera.com:9083, thrift://eng-nav-2.gce.cloudera.com:9083", "default.sample_07", ImmutableList.of("code"))), (List) null);
        sparkPushExtractor.extract(ImmutableList.of(sparkLineageGraphShim));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 1, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, ImmutableList.of(new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://eng-nav-1.gce.cloudera.com:9083,thrift://eng-nav-2.gce.cloudera.com:9083", "default.sample_08", ImmutableList.of("id"))), (List) null)));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 2, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, new ArrayList(), ImmutableList.of(new SparkOutputShim("hdfs", "unusedDataQry", "", (String) null, "hdfs://eng-nav-1.gce.cloudera.com/user/customers/out.txt", (List) null)))));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 2, 100L, 0L, version, applicationID, applicationExecutionID, yarnApplicationId1, user, (List) null, ImmutableList.of(new SparkOutputShim("hive", "unusedDataQry", "", "thrift://eng-nav-1.gce.cloudera.com:9083", "default.result_table", ImmutableList.of("newColumn"))))));
        Instant now = Instant.now();
        SparkLineageGraphShim sparkLineageGraphShim2 = new SparkLineageGraphShim(true, 1, 0L, Long.valueOf(now.getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, (List) null, (List) null);
        sparkPushExtractor.extract(ImmutableList.of(sparkLineageGraphShim2));
        forceCommit();
        SparkOperation sparkOperation = (SparkOperation) this.em.findById(MD5IdGenerator.generateIdentity(new String[]{applicationID})).get();
        Assert.assertNotNull(sparkOperation);
        SparkOperationExecution sparkOperationExecution = (SparkOperationExecution) this.em.findById(SparkIdGenerator.generateSparkAppExecId(yarnApplicationId1)).get();
        Assert.assertNotNull(sparkOperationExecution);
        Collection errorCodes = sparkOperationExecution.getErrorCodes();
        Assert.assertEquals(errorCodes.size(), 2L);
        Assert.assertTrue(errorCodes.contains("1"));
        Assert.assertTrue(errorCodes.contains("2"));
        Assert.assertEquals(1L, this.rm.query("type:INSTANCE_OF AND ep1Ids:" + sparkOperation.getId() + " AND ep2Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(2L, this.rm.query("type:DATA_FLOW AND ep2Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(2L, this.rm.query("type:DATA_FLOW AND ep1Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(sparkOperationExecution.getEnded(), now);
        Assert.assertEquals(sparkOperation.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperation.getExtractorRunId(), extractorRunId);
        Assert.assertEquals(sparkOperation.getOriginalName(), applicationID);
        Assert.assertEquals(sparkOperationExecution.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperationExecution.getExtractorRunId(), extractorRunId);
        Assert.assertEquals(sparkOperationExecution.getPrincipal(), user);
        Assert.assertEquals(sparkOperationExecution.getOriginalName(), "yarnApplicationId1-Exec");
        Assert.assertEquals(sparkOperationExecution.getStarted(), sparkLineageGraphShim.getSparkLineageGraph().getStartTimestamp());
        Assert.assertEquals(sparkOperationExecution.getEnded(), sparkLineageGraphShim2.getSparkLineageGraph().getStartTimestamp());
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 0, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID2, applicationExecutionID2, yarnApplicationId2, user, ImmutableList.of(new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://eng-nav-1.gce.cloudera.com:9083,thrift://eng-nav-2.gce.cloudera.com:9083,thrift://eng-nav-6.gce.cloudera.com:9083 ", "default.movies", ImmutableList.of("actors"))), (List) null)));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 0, 100L, 0L, version, applicationID2, applicationExecutionID2, yarnApplicationId2, user, (List) null, ImmutableList.of(new SparkOutputShim("hdfs", (String) null, "", "unusedDataQry", "hdfs://eng-nav-1.gce.cloudera.com:8020/user/actors/actors.txt", (List) null)))));
        Instant now2 = Instant.now();
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(true, 0, 0L, Long.valueOf(now2.getMillis()), version, applicationID2, applicationExecutionID2, yarnApplicationId2, user, (List) null, (List) null)));
        forceCommit();
        SparkOperation sparkOperation2 = (SparkOperation) this.em.findById(MD5IdGenerator.generateIdentity(new String[]{applicationID2})).get();
        Assert.assertNotNull(sparkOperation2);
        SparkOperationExecution sparkOperationExecution2 = (SparkOperationExecution) this.em.findById(SparkIdGenerator.generateSparkAppExecId(yarnApplicationId2)).get();
        Assert.assertNotNull(sparkOperationExecution2);
        Collection query = this.rm.query("type:INSTANCE_OF AND ep1Ids:" + sparkOperation2.getId() + " AND ep2Ids:" + sparkOperationExecution2.getId());
        Assert.assertEquals(1L, query.size());
        this.rm.query("type:DATA_FLOW AND ep2Ids:" + sparkOperationExecution2.getId());
        Assert.assertEquals(1L, query.size());
        this.rm.query("type:DATA_FLOW AND ep1Ids:" + sparkOperationExecution2.getId());
        Assert.assertEquals(1L, query.size());
        Assert.assertEquals(sparkOperationExecution2.getEnded(), now2);
        Assert.assertEquals(sparkOperation.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperation.getExtractorRunId(), extractorRunId);
        Assert.assertEquals(sparkOperationExecution.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperationExecution.getExtractorRunId(), extractorRunId);
    }

    @Test
    public void testLineageCreationWithDups() {
        SparkPushExtractor sparkPushExtractor = new SparkPushExtractor(this.context);
        SparkLineageGraphShim sparkLineageGraphShim = new SparkLineageGraphShim(false, 0, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, ImmutableList.of(new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://eng-nav-1.gce.cloudera.com:9083, thrift://eng-nav-2.gce.cloudera.com:9083", "default.sample_07", ImmutableList.of("code"))), (List) null);
        sparkPushExtractor.extract(ImmutableList.of(sparkLineageGraphShim));
        SparkInputShim sparkInputShim = new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://eng-nav-1.gce.cloudera.com:9083,thrift://eng-nav-2.gce.cloudera.com:9083", "default.sample_08", ImmutableList.of("id"));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 1, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, ImmutableList.of(sparkInputShim), (List) null)));
        forceCommit();
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 1, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, ImmutableList.of(sparkInputShim), (List) null)));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 2, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, new ArrayList(), ImmutableList.of(new SparkOutputShim("hdfs", "unusedDataQry", "", (String) null, "hdfs://eng-nav-1.gce.cloudera.com/user/customers/out.txt", (List) null)))));
        SparkOutputShim sparkOutputShim = new SparkOutputShim("hive", "unusedDataQry", "", "thrift://eng-nav-1.gce.cloudera.com:9083", "default.result_table", ImmutableList.of("newColumn"));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 2, 100L, 0L, version, applicationID, applicationExecutionID, yarnApplicationId1, user, (List) null, ImmutableList.of(sparkOutputShim))));
        forceCommit();
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 2, 100L, 0L, version, applicationID, applicationExecutionID, yarnApplicationId1, user, (List) null, ImmutableList.of(sparkOutputShim))));
        Instant now = Instant.now();
        SparkLineageGraphShim sparkLineageGraphShim2 = new SparkLineageGraphShim(true, 1, 0L, Long.valueOf(now.getMillis()), version, applicationID, applicationExecutionID, yarnApplicationId1, user, (List) null, (List) null);
        sparkPushExtractor.extract(ImmutableList.of(sparkLineageGraphShim2));
        forceCommit();
        SparkOperation sparkOperation = (SparkOperation) this.em.findById(MD5IdGenerator.generateIdentity(new String[]{applicationID})).get();
        Assert.assertNotNull(sparkOperation);
        SparkOperationExecution sparkOperationExecution = (SparkOperationExecution) this.em.findById(SparkIdGenerator.generateSparkAppExecId(yarnApplicationId1)).get();
        Assert.assertNotNull(sparkOperationExecution);
        Collection errorCodes = sparkOperationExecution.getErrorCodes();
        Assert.assertEquals(errorCodes.size(), 2L);
        Assert.assertTrue(errorCodes.contains("1"));
        Assert.assertTrue(errorCodes.contains("2"));
        Assert.assertEquals(1L, this.rm.query("type:INSTANCE_OF AND ep1Ids:" + sparkOperation.getId() + " AND ep2Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(2L, this.rm.query("type:DATA_FLOW AND ep2Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(2L, this.rm.query("type:DATA_FLOW AND ep1Ids:" + sparkOperationExecution.getId()).size());
        Assert.assertEquals(sparkOperationExecution.getEnded(), now);
        Assert.assertEquals(sparkOperation.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperation.getExtractorRunId(), extractorRunId);
        Assert.assertEquals(sparkOperation.getOriginalName(), applicationID);
        Assert.assertEquals(sparkOperationExecution.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperationExecution.getExtractorRunId(), extractorRunId);
        Assert.assertEquals(sparkOperationExecution.getPrincipal(), user);
        Assert.assertEquals(sparkOperationExecution.getOriginalName(), "yarnApplicationId1-Exec");
        Assert.assertEquals(sparkOperationExecution.getStarted(), sparkLineageGraphShim.getSparkLineageGraph().getStartTimestamp());
        Assert.assertEquals(sparkOperationExecution.getEnded(), sparkLineageGraphShim2.getSparkLineageGraph().getStartTimestamp());
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 0, 100L, Long.valueOf(Instant.now().getMillis()), version, applicationID2, applicationExecutionID2, yarnApplicationId2, user, ImmutableList.of(new SparkInputShim("hiVe", "unusedDataQry", "unusedDataFormat", "thrift://eng-nav-1.gce.cloudera.com:9083,thrift://eng-nav-2.gce.cloudera.com:9083,thrift://eng-nav-6.gce.cloudera.com:9083 ", "default.movies", ImmutableList.of("actors"))), (List) null)));
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(false, 0, 100L, 0L, version, applicationID2, applicationExecutionID2, yarnApplicationId2, user, (List) null, ImmutableList.of(new SparkOutputShim("hdfs", (String) null, "", "unusedDataQry", "hdfs://eng-nav-1.gce.cloudera.com:8020/user/actors/actors.txt", (List) null)))));
        Instant now2 = Instant.now();
        sparkPushExtractor.extract(ImmutableList.of(new SparkLineageGraphShim(true, 0, 0L, Long.valueOf(now2.getMillis()), version, applicationID2, applicationExecutionID2, yarnApplicationId2, user, (List) null, (List) null)));
        forceCommit();
        SparkOperation sparkOperation2 = (SparkOperation) this.em.findById(MD5IdGenerator.generateIdentity(new String[]{applicationID2})).get();
        Assert.assertNotNull(sparkOperation2);
        SparkOperationExecution sparkOperationExecution2 = (SparkOperationExecution) this.em.findById(SparkIdGenerator.generateSparkAppExecId(yarnApplicationId2)).get();
        Assert.assertNotNull(sparkOperationExecution2);
        Collection query = this.rm.query("type:INSTANCE_OF AND ep1Ids:" + sparkOperation2.getId() + " AND ep2Ids:" + sparkOperationExecution2.getId());
        Assert.assertEquals(1L, query.size());
        this.rm.query("type:DATA_FLOW AND ep2Ids:" + sparkOperationExecution2.getId());
        Assert.assertEquals(1L, query.size());
        this.rm.query("type:DATA_FLOW AND ep1Ids:" + sparkOperationExecution2.getId());
        Assert.assertEquals(1L, query.size());
        Assert.assertEquals(sparkOperationExecution2.getEnded(), now2);
        Assert.assertEquals(sparkOperation.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperation.getExtractorRunId(), extractorRunId);
        Assert.assertEquals(sparkOperationExecution.getSourceId(), this.sparkSource.getId());
        Assert.assertEquals(sparkOperationExecution.getExtractorRunId(), extractorRunId);
    }
}
