package com.cloudera.nav.pushextractor.spark;

import com.cloudera.api.model.ApiService;
import com.cloudera.cdx.client.impl.NullImporter;
import com.cloudera.cdx.extractor.model.graph.SparkInput;
import com.cloudera.cdx.extractor.model.graph.SparkLineageGraph;
import com.cloudera.cdx.extractor.model.graph.SparkOutput;
import com.cloudera.nav.core.model.Source;
import com.cloudera.nav.core.model.SourceType;
import com.cloudera.nav.extract.ClusterIdBasedSourceIdGenerator;
import com.cloudera.nav.extract.ClusterIdGenerator;
import com.cloudera.nav.extract.SourceIdGeneratorFactory;
import com.cloudera.nav.hdfs.extractor.HdfsIdGenerator;
import com.cloudera.nav.hive.extractor.HiveIdGenerator;
import com.cloudera.nav.integration.BaseIntegrationTest;
import com.cloudera.nav.persist.CDXTransaction;
import com.cloudera.nav.persist.CDXTransactionFactory;
import com.cloudera.nav.persist.ClusterManager;
import com.cloudera.nav.persist.ExecEnvironment;
import com.cloudera.nav.persist.PushExtractorIterationGenerator;
import com.cloudera.nav.persist.SourceManager;
import com.cloudera.nav.persistence.relational.DataSourceConfiguration;
import com.cloudera.nav.pushextractor.PushExtractorDao;
import com.cloudera.nav.pushextractor.spark.model.SparkOperation;
import com.cloudera.nav.pushextractor.spark.model.SparkOperationExecution;
import com.cloudera.nav.utils.CdhExecutorFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.ContextHierarchy;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration tests for {@link CDXSparkExtractor}.
 *
 * <p>Each test feeds a sequence of {@link SparkLineageGraph} messages (wrapped in
 * {@link CDXSparkLineageGraph}) to the extractor, commits the transaction, and then checks the
 * persisted {@link SparkOperation} / {@link SparkOperationExecution} entities and the
 * {@code INSTANCE_OF} and {@code DATA_FLOW} relations that reference them.
 */
@ContextHierarchy({
    @ContextConfiguration(classes = {DataSourceConfiguration.class}),
    @ContextConfiguration({"classpath:spring-test-embedded-solr.xml"})
})
@RunWith(SpringJUnit4ClassRunner.class)
@ActiveProfiles({"prod"})
public class CDXSparkPushExtractorTest extends BaseIntegrationTest {

    @Mock
    private SourceIdGeneratorFactory idGeneratorFactory;

    @Mock
    CdhExecutorFactory executorFactory;
    private SparkPushExtractorContext context;
    private HiveIdGenerator hiveIdGenerator;
    private HdfsIdGenerator hdfsIdGenerator;
    private SourceManager sourceManager;

    @Mock
    ClusterIdGenerator clusterIdGenerator;
    private Source sparkSource;
    private CDXTransactionFactory tf;
    CDXTransaction trans;
    private PushExtractorDao dao;

    @Mock
    private ClusterManager clusterManger;
    private static final String version = "1.0";
    private static final String extractorRunId = "dummyRunId";
    private static final String applicationID = "appID1";
    private static final String applicationID2 = "appID2";
    private static final String yarnApplicationId1 = "yarnApplicationId1";
    private static final String yarnApplicationId2 = "yarnApplicationId2";
    private static final String user = "admin";
    private static final String UNUSED_DATAFORMAT = "unusedDataFormat";
    private static final String HDFS_SRC_ID = "hdfsSrcId";
    private static final String HMS_SRC_ID = "hiveSrcId";
    private static final String S3_SRC_ID = "S3SrcId";
    private static final String SPARK_SRC_ID = "SparkSrcId";

    private static final String CLUSTER_NAME = "Cluster 1";
    private static final String HDFS_URL = "hdfs://eng-nav-1.gce.cloudera.com:8020";
    private static final String HIVE_URL = "thrift://eng-nav-1.gce.cloudera.com:9083";
    private static final String UNUSED_DATA_QUERY = "unusedDataQry";

    /** Typed {@code null} used for lineage messages that carry no inputs. */
    private static final Collection<SparkInput> NO_INPUTS = null;

    /** Typed {@code null} used for lineage messages that carry no outputs. */
    private static final Collection<SparkOutput> NO_OUTPUTS = null;

    /**
     * Wires the mocks, registers the HDFS, Hive, S3 and Spark sources on a spied
     * {@link SourceManager}, persists the sources and opens the transaction used by the tests.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.navOptions.getSolrCommitBatchDuration()).thenReturn(10);
        Mockito.doReturn(new ClusterIdBasedSourceIdGenerator()).when(this.idGeneratorFactory).getSourceIdGenerator();

        // NOTE(review): iterationGenerator is a real instance, not a mock, so this stubbing only
        // takes effect if getIterationCount() delegates to a mock (presumably navOptions) - confirm.
        PushExtractorIterationGenerator iterationGenerator = new PushExtractorIterationGenerator(this.navOptions);
        Mockito.when(iterationGenerator.getIterationCount()).thenAnswer(new Answer<Integer>() {
            @Override
            public Integer answer(InvocationOnMock invocation) {
                return 1;
            }
        });

        this.hdfsIdGenerator = new HdfsIdGenerator();
        this.hiveIdGenerator = new HiveIdGenerator();
        this.sourceManager = Mockito.spy(new SourceManager(this.emf, this.rmf, this.sequenceGenerator,
                iterationGenerator, this.idGeneratorFactory, this.executorFactory, this.navOptions,
                this.clusterIdGenerator, this.clusterManger, ExecEnvironment.UNIT_TEST));
        NullImporter nullImporter = new NullImporter();
        this.em.commit(true);
        this.rm.commit(true);

        Source hdfsSource = new Source(HDFS_SRC_ID, 1001L, CLUSTER_NAME, "HDFS-1", HDFS_URL, SourceType.HDFS);
        Mockito.doReturn(hdfsSource).when(this.sourceManager).getTransientSourceByIdentity(HDFS_SRC_ID);

        Source hiveSource = new Source(HMS_SRC_ID, 1004L, CLUSTER_NAME, "HIVE-1", HIVE_URL, SourceType.HIVE);
        Mockito.doReturn(hiveSource).when(this.sourceManager).getTransientSourceByIdentity(HMS_SRC_ID);

        Source s3Source = new Source(S3_SRC_ID, 1005L, CLUSTER_NAME, "S3", "s3a://eng-nav-1.gce.cloudera.com", SourceType.S3);
        Mockito.doReturn(ImmutableSet.of(s3Source)).when(this.sourceManager).getSources(ImmutableSet.of(SourceType.S3));

        this.sparkSource = new Source(SPARK_SRC_ID, 1006L, CLUSTER_NAME, "SPARK_ON_YARN-1",
                "http://eng-nav-1.gce.cloudera.com:18088", SourceType.SPARK);
        Mockito.doReturn(this.sparkSource).when(this.sourceManager).getTransientSourceByIdentity(SPARK_SRC_ID);
        Mockito.doReturn(ImmutableList.of(this.sparkSource)).when(this.sourceManager)
                .getSources(ImmutableSet.of(SourceType.SPARK));

        this.context = new CDXSparkPushExtractorContext(this.hiveIdGenerator, this.hdfsIdGenerator,
                this.sequenceGenerator, this.sourceManager, this.sparkSource, this.navOptions, extractorRunId,
                CLUSTER_NAME, HMS_SRC_ID, HDFS_SRC_ID);
        this.em.persist(ImmutableList.of(this.sparkSource, hdfsSource, hiveSource), true);
        this.em.commit(true);

        this.tf = new CDXTransactionFactory(this.emf, this.rmf, nullImporter);
        this.trans = this.tf.createTransaction();
        this.dao = new PushExtractorDao(this.trans);
        this.trans.begin();
    }

    /** Extracting a {@code null} lineage graph must fail with an exception. */
    @Test(expected = Exception.class)
    public void testExceptionThrownOnError() {
        new CDXSparkExtractor(this.context).extract((CDXSparkLineageGraph) null, this.dao);
    }

    /** Commits the open transaction and then the element and relation managers. */
    private void forceCommit() {
        this.trans.commit(true);
        this.em.commit(true);
        this.rm.commit(true);
    }

    /**
     * Builds a lineage message and stamps it with the given operation and execution ids.
     *
     * <p>NOTE(review): the parameter names reflect how this test uses the constructor arguments
     * (the flag is {@code true} only on the final message of an execution, the code values show up
     * in {@code getErrorCodes()}, and the timestamp is compared against started/ended); confirm
     * against {@link SparkLineageGraph}. The meaning of {@code duration} is not visible here.
     *
     * @param ended first constructor argument
     * @param errorCode second constructor argument
     * @param timestampMillis third constructor argument
     * @param duration fourth constructor argument
     * @param operationId application id, also set as the operation id
     * @param executionId YARN application id, also set as the operation execution id
     * @param inputs inputs carried by the message, may be {@code null}
     * @param outputs outputs carried by the message, may be {@code null}
     * @return the populated lineage graph
     */
    private static SparkLineageGraph newGraph(boolean ended, String errorCode, long timestampMillis, long duration,
            String operationId, String executionId, Collection<SparkInput> inputs, Collection<SparkOutput> outputs) {
        SparkLineageGraph graph = new SparkLineageGraph(ended, errorCode, timestampMillis, duration, version,
                operationId, executionId, user, inputs, outputs);
        graph.setOperationId(operationId);
        graph.setOperationExecutionId(executionId);
        return graph;
    }

    /** Creates a Hive input on the test metastore; the mixed-case type "hiVe" is intentional test data. */
    private static SparkInput hiveInput(String table, String column) {
        return new SparkInput("hiVe", UNUSED_DATA_QUERY, HIVE_URL, table, ImmutableList.of(column), UNUSED_DATAFORMAT);
    }

    /** Creates an HDFS output for {@code path} with an empty source URL and no columns. */
    private static SparkOutput hdfsOutput(String dataQuery, String path) {
        return new SparkOutput("hdfs", dataQuery, "", path, (Collection<String>) null);
    }

    /** Runs the extractor on {@code graph} using the test DAO and returns the same graph. */
    private SparkLineageGraph submit(CDXSparkExtractor extractor, SparkLineageGraph graph) {
        extractor.extract(new CDXSparkLineageGraph(graph), this.dao);
        return graph;
    }

    /**
     * Asserts that exactly one {@code INSTANCE_OF} relation links {@code operation} (endpoint 1) to
     * {@code execution} (endpoint 2), and checks the number of {@code DATA_FLOW} relations that
     * have the execution as endpoint 2 and as endpoint 1 respectively.
     *
     * @param operation the persisted operation
     * @param execution the persisted operation execution
     * @param expectedAsEndpoint2 expected DATA_FLOW count with the execution in {@code ep2Ids}
     *     (presumably flows from inputs into the execution)
     * @param expectedAsEndpoint1 expected DATA_FLOW count with the execution in {@code ep1Ids}
     *     (presumably flows from the execution to outputs)
     */
    private void assertRelations(SparkOperation operation, SparkOperationExecution execution,
            int expectedAsEndpoint2, int expectedAsEndpoint1) {
        Assert.assertEquals("INSTANCE_OF relations", 1,
                this.rm.query("type:INSTANCE_OF AND ep1Ids:" + operation.getId() + " AND ep2Ids:" + execution.getId()).size());
        Assert.assertEquals("DATA_FLOW relations with execution as ep2", expectedAsEndpoint2,
                this.rm.query("type:DATA_FLOW AND ep2Ids:" + execution.getId()).size());
        Assert.assertEquals("DATA_FLOW relations with execution as ep1", expectedAsEndpoint1,
                this.rm.query("type:DATA_FLOW AND ep1Ids:" + execution.getId()).size());
    }

    /** Lineage must be linked correctly even when the output message arrives before the input message. */
    @Test
    public void testLineageCreationWhenOutputOccursFirst() {
        CDXSparkExtractor extractor = new CDXSparkExtractor(this.context);

        submit(extractor, newGraph(false, "0", Instant.now().getMillis(), 100L, applicationID, yarnApplicationId1,
                new ArrayList<SparkInput>(),
                ImmutableList.of(hdfsOutput(UNUSED_DATA_QUERY, HDFS_URL + "/user/customers/out.txt"))));
        submit(extractor, newGraph(false, "0", Instant.now().getMillis(), 100L, applicationID, yarnApplicationId1,
                ImmutableList.of(hiveInput("default.sample_07", "code")), NO_OUTPUTS));

        Instant endTime = Instant.now();
        submit(extractor, newGraph(true, "0", endTime.getMillis(), 0L, applicationID, yarnApplicationId1,
                NO_INPUTS, NO_OUTPUTS));
        forceCommit();

        SparkOperation operation = (SparkOperation) this.em.findById(applicationID).get();
        Assert.assertNotNull(operation);
        SparkOperationExecution execution = (SparkOperationExecution) this.em.findById(yarnApplicationId1).get();
        Assert.assertNotNull(execution);

        assertRelations(operation, execution, 1, 1);
        Assert.assertEquals(endTime, execution.getEnded());
    }

    /**
     * Runs two applications through the extractor and verifies the persisted entities, their
     * metadata, the collected error codes and the relations for each application.
     */
    @Test
    public void testLineageCreation() {
        CDXSparkExtractor extractor = new CDXSparkExtractor(this.context);

        // First application: two Hive inputs, one HDFS output, one Hive output.
        SparkLineageGraph firstMessage = submit(extractor, newGraph(false, "0", Instant.now().getMillis(), 100L,
                applicationID, yarnApplicationId1,
                ImmutableList.of(hiveInput("default.sample_07", "code")), NO_OUTPUTS));
        submit(extractor, newGraph(false, "1", Instant.now().getMillis(), 100L, applicationID, yarnApplicationId1,
                ImmutableList.of(hiveInput("default.sample_08", "id")), NO_OUTPUTS));
        submit(extractor, newGraph(false, "2", Instant.now().getMillis(), 100L, applicationID, yarnApplicationId1,
                new ArrayList<SparkInput>(),
                ImmutableList.of(hdfsOutput(UNUSED_DATA_QUERY, HDFS_URL + "/user/customers/out.txt"))));
        submit(extractor, newGraph(false, "2", 0L, 100L, applicationID, yarnApplicationId1,
                NO_INPUTS,
                ImmutableList.of(new SparkOutput("hive", UNUSED_DATA_QUERY, HIVE_URL, "default.result_table",
                        ImmutableList.of("newColumn")))));

        Instant firstEnd = Instant.now();
        SparkLineageGraph firstEndMessage = submit(extractor, newGraph(true, "1", firstEnd.getMillis(), 0L,
                applicationID, yarnApplicationId1, NO_INPUTS, NO_OUTPUTS));
        forceCommit();

        SparkOperation operation = (SparkOperation) this.em.findById(applicationID).get();
        Assert.assertNotNull(operation);
        SparkOperationExecution execution = (SparkOperationExecution) this.em.findById(yarnApplicationId1).get();
        Assert.assertNotNull(execution);

        Collection<?> errorCodes = execution.getErrorCodes();
        Assert.assertEquals(2, errorCodes.size());
        Assert.assertTrue(errorCodes.contains("1"));
        Assert.assertTrue(errorCodes.contains("2"));

        assertRelations(operation, execution, 2, 2);
        Assert.assertEquals(firstEnd, execution.getEnded());
        Assert.assertEquals(this.sparkSource.getId(), operation.getSourceId());
        Assert.assertEquals(extractorRunId, operation.getExtractorRunId());
        Assert.assertEquals(applicationID, operation.getOriginalName());
        Assert.assertEquals(this.sparkSource.getId(), execution.getSourceId());
        Assert.assertEquals(extractorRunId, execution.getExtractorRunId());
        Assert.assertEquals(user, execution.getPrincipal());
        Assert.assertEquals("yarnApplicationId1-Exec", execution.getOriginalName());
        Assert.assertEquals(firstMessage.getStartTimestamp(), execution.getStarted());
        Assert.assertEquals(firstEndMessage.getStartTimestamp(), execution.getEnded());

        // Second application: one Hive input, one HDFS output (with a null data query).
        submit(extractor, newGraph(false, "0", Instant.now().getMillis(), 100L, applicationID2, yarnApplicationId2,
                ImmutableList.of(hiveInput("default.movies", "actors")), NO_OUTPUTS));
        submit(extractor, newGraph(false, "0", Instant.now().getMillis(), 0L, applicationID2, yarnApplicationId2,
                NO_INPUTS,
                ImmutableList.of(hdfsOutput(null, HDFS_URL + "/user/actors/actors.txt"))));

        Instant secondEnd = Instant.now();
        submit(extractor, newGraph(true, "0", secondEnd.getMillis(), 0L, applicationID2, yarnApplicationId2,
                NO_INPUTS, NO_OUTPUTS));
        forceCommit();

        SparkOperation operation2 = (SparkOperation) this.em.findById(applicationID2).get();
        Assert.assertNotNull(operation2);
        SparkOperationExecution execution2 = (SparkOperationExecution) this.em.findById(yarnApplicationId2).get();
        Assert.assertNotNull(execution2);

        // Each relation query result is asserted on its own; previously the DATA_FLOW results
        // were discarded and the INSTANCE_OF result was checked three times.
        assertRelations(operation2, execution2, 1, 1);
        Assert.assertEquals(secondEnd, execution2.getEnded());
        // Verify the second application's entities (the first application's were checked above).
        Assert.assertEquals(this.sparkSource.getId(), operation2.getSourceId());
        Assert.assertEquals(extractorRunId, operation2.getExtractorRunId());
        Assert.assertEquals(this.sparkSource.getId(), execution2.getSourceId());
        Assert.assertEquals(extractorRunId, execution2.getExtractorRunId());
    }
}
