package com.cloudera.cmon.tree;

import com.cloudera.cmon.MetricEnum;
import com.cloudera.cmon.MetricSchema;
import com.cloudera.cmon.domain.ActivityStatus;
import com.cloudera.cmon.firehose.CMONConfiguration;
import com.cloudera.cmon.firehose.CmonAvroUtil;
import com.cloudera.cmon.firehose.CounterValueAggregator;
import com.cloudera.cmon.firehose.GaugeValueAggregator2;
import com.cloudera.cmon.firehose.ValueAggregator;
import com.cloudera.cmon.firehose.event.MetricValue;
import com.cloudera.cmon.kaiser.KaiserTestBase;
import com.cloudera.cmon.tree.BaseTreeData;
import com.cloudera.cmon.tree.Persisters;
import com.cloudera.cmon.tree.db.ActivityAndAttemptStore;
import com.cloudera.cmon.tree.db.DbActivity;
import com.cloudera.cmon.tree.db.DbActivityAttribute;
import com.cloudera.cmon.tree.db.DbActivityLastMetricValue;
import com.cloudera.cmon.tree.db.DbAttempt;
import com.cloudera.cmon.tree.db.DbAttemptLastMetricValue;
import com.cloudera.cmon.tree.db.TreeEntityManager;
import com.cloudera.cmon.tstore.TimeSeriesStore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:com/cloudera/cmon/tree/TestActivityMonitoringTree.class */
public class TestActivityMonitoringTree extends BaseActivityTreeTest {
    private static final String MAPREDUCE11 = "mapreduce11_TestActivityMonitoringTree";
    private static final String MAPREDUCE10 = "mapreduce10_TestActivityMonitoringTree";
    private static final String MAPREDUCE9 = "mapreduce9_TestActivityMonitoringTree";
    private static final String MAPREDUCE8 = "mapreduce8_TestActivityMonitoringTree";
    private static final String MAPREDUCE7 = "mapreduce7_TestActivityMonitoringTree";
    private static final String MAPREDUCE6 = "mapreduce6_TestActivityMonitoringTree";
    private static final String MAPREDUCE5 = "mapreduce5_TestActivityMonitoringTree";
    private static final String MAPREDUCE4 = "mapreduce4_TestActivityMonitoringTree";
    private static final String MAPREDUCE3 = "mapreduce3_TestActivityMonitoringTree";
    private static final String MAPREDUCE2 = "mapreduce2_TestActivityMonitoringTree";
    private static final String MAPREDUCE1 = "mapreduce1_TestActivityMonitoringTree";
    private static final double EPSILON = 1.0E-6d;

    @Test
    public void testAttemptMetricAggregation() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job1tama", "tama_attempt1");
        MetricEnum metricEnum = MetricEnum.TOTAL_CPU_USER;
        MetricEnum metricEnum2 = MetricEnum.MEM_RSS;
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum, 0.1d)));
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum2, 0.1d)));
        AttemptTreeData attemptData2 = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job1tama", "tama_attempt2");
        attemptData2.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum, 0.2d)));
        attemptData2.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum2, 0.2d)));
        Assert.assertEquals(0.3d, ((ValueAggregator) activityMonitoringTree.getJobData(MAPREDUCE1, "job1tama").getAggregatorsForTest().get(metricEnum)).getAggregate(new Instant()), EPSILON);
        Assert.assertEquals(0.3d, ((ValueAggregator) activityMonitoringTree.getJobData(MAPREDUCE1, "job1tama").getAggregatorsForTest().get(metricEnum2)).getAggregate(new Instant()), EPSILON);
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.ATTEMPT_STATUS, new Instant(), MetricSchema.AttemptStatus.SUCCEEDED.ordinal())));
        Assert.assertEquals(0.3d, ((ValueAggregator) activityMonitoringTree.getJobData(MAPREDUCE1, "job1tama").getAggregatorsForTest().get(metricEnum)).getAggregate(new Instant()), EPSILON);
        Assert.assertEquals(0.2d, ((ValueAggregator) activityMonitoringTree.getJobData(MAPREDUCE1, "job1tama").getAggregatorsForTest().get(metricEnum2)).getAggregate(new Instant()), EPSILON);
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum, 300.0d)));
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum2, 100.0d)));
        Assert.assertEquals(0.3d, ((ValueAggregator) activityMonitoringTree.getJobData(MAPREDUCE1, "job1tama").getAggregatorsForTest().get(metricEnum)).getAggregate(new Instant()), EPSILON);
        Assert.assertEquals(0.2d, ((ValueAggregator) activityMonitoringTree.getJobData(MAPREDUCE1, "job1tama").getAggregatorsForTest().get(metricEnum2)).getAggregate(new Instant()), EPSILON);
    }

    @Test
    public void testJobPersistence() {
        DbActivity findActivity;
        Instant instant = new Instant();
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job1jp", "jp_job1_attempt1");
        MetricEnum metricEnum = MetricEnum.MEM_RSS;
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum, 0.1d)));
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE1, "job1jp");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant.getMillis())));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData);
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (jobData) {
                findActivity = treeEntityManager.findActivity(jobData.getDatabaseId(), true);
            }
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.NUM_DESIRED_MAPS.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(metricEnum.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.SUBMIT_TIME.getUniqueMetricId())));
            Assert.assertEquals(1.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.NUM_DESIRED_MAPS.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(instant, findActivity.getBegin());
            Assert.assertEquals((Object) null, findActivity.getEnd());
            treeEntityManager.close();
            jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal())));
            Instant instant2 = new Instant();
            new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData);
            Assert.assertTrue(testJobFinalStatus(jobData, instant, null, ActivityStatus.SUCCEEDED).getModTime().getMillis() >= instant2.getMillis());
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    @Test
    public void testJobPersistenceWithInvalidMetrics() {
        DbActivity findActivity;
        Instant instant = new Instant();
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job1jpNaN", "jpNaN_job1_attempt1");
        MetricEnum metricEnum = MetricEnum.MEM_RSS;
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum, 0.1d)));
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE1, "job1jpNaN");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_REDUCES, Double.NaN), CmonAvroUtil.mv(MetricEnum.HDFS_READ, Double.MAX_VALUE), CmonAvroUtil.mv(MetricEnum.HDFS_WRITE, Double.MIN_VALUE), CmonAvroUtil.mv(MetricEnum.DISK_READ, Double.MIN_NORMAL), CmonAvroUtil.mv(MetricEnum.DISK_WRITE, Double.NEGATIVE_INFINITY), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, Double.POSITIVE_INFINITY), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant.getMillis())));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData);
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (jobData) {
                findActivity = treeEntityManager.findActivity(jobData.getDatabaseId(), true);
            }
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.NUM_DESIRED_MAPS.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.NUM_DESIRED_REDUCES.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.HDFS_READ.getUniqueMetricId())));
            if (!dbType.isMySQL()) {
                Assert.assertEquals(0L, Double.compare(((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.HDFS_READ.getUniqueMetricId()))).getValue(), 3.4028234663852886E38d));
            }
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.HDFS_WRITE.getUniqueMetricId())));
            if (!dbType.isMySQL()) {
                Assert.assertEquals(0L, Double.compare(((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.HDFS_WRITE.getUniqueMetricId()))).getValue(), 1.401298464324817E-45d));
            }
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.DISK_READ.getUniqueMetricId())));
            if (!dbType.isMySQL()) {
                Assert.assertEquals(0L, Double.compare(((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.DISK_READ.getUniqueMetricId()))).getValue(), 1.401298464324817E-45d));
            }
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.DISK_WRITE.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.MAP_INPUT_BYTES.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(metricEnum.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.SUBMIT_TIME.getUniqueMetricId())));
            Assert.assertEquals(1.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.NUM_DESIRED_MAPS.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(instant, findActivity.getBegin());
            Assert.assertEquals((Object) null, findActivity.getEnd());
            treeEntityManager.close();
            jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal())));
            Instant instant2 = new Instant();
            new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData);
            Assert.assertTrue(testJobFinalStatus(jobData, instant, null, ActivityStatus.SUCCEEDED).getModTime().getMillis() >= instant2.getMillis());
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    @Test
    public void testJobsStartedBeforeRetentionPeriod() {
        DbActivity findActivityByName;
        DbActivity findActivityByName2;
        CMONConfiguration cMONConfiguration = (CMONConfiguration) Mockito.spy(CMONConfiguration.getSingleton());
        Duration standardDays = Duration.standardDays(1L);
        ((CMONConfiguration) Mockito.doReturn(standardDays).when(cMONConfiguration)).getActivityPurgeDuration();
        ((CMONConfiguration) Mockito.doReturn(standardDays).when(cMONConfiguration)).getAttemptPurgeDuration();
        ActivityAndAttemptStore activityAndAttemptStore = new ActivityAndAttemptStore(emf, cMONConfiguration);
        Instant instant = new Instant();
        Instant minus = instant.minus(standardDays).minus(1L);
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job1_retention_test", "jp_job1_attempt1_retention_test");
        MetricEnum metricEnum = MetricEnum.MEM_RSS;
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum, 0.1d), CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal())));
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE1, "job1_retention_test");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus.getMillis())));
        AttemptTreeData attemptData2 = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job2_retention_test", "jp_job2_attempt1_retention_test");
        attemptData2.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(metricEnum, 0.1d), CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal())));
        JobTreeData jobData2 = activityMonitoringTree.getJobData(MAPREDUCE1, "job2_retention_test");
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus.getMillis())));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData);
        Assert.assertEquals(Persisters.PersistAttemptResult.ATTEMPT_TOO_OLD, new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptDataInternal(attemptData));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData2);
        Assert.assertEquals(Persisters.PersistAttemptResult.ATTEMPT_TOO_OLD, new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptDataInternal(attemptData2));
        synchronized (jobData) {
            Assert.assertEquals((Object) null, jobData.getDatabaseId());
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.RUNNING);
        }
        synchronized (jobData2) {
            Assert.assertEquals((Object) null, jobData2.getDatabaseId());
            Assert.assertTrue(jobData2.getState() == BaseTreeData.TreeState.RUNNING);
        }
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (jobData) {
                findActivityByName = treeEntityManager.findActivityByName(jobData.getJobId(), false);
            }
            Assert.assertEquals((Object) null, findActivityByName);
            synchronized (jobData2) {
                findActivityByName2 = treeEntityManager.findActivityByName(jobData2.getJobId(), false);
            }
            Assert.assertEquals((Object) null, findActivityByName2);
            treeEntityManager.close();
            jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal())));
            new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData);
            activityTreeWalkerAndPersister.run();
            synchronized (jobData) {
                Assert.assertEquals(BaseTreeData.TreeState.FINISHED_AND_CLOSED, jobData.getState());
            }
            synchronized (jobData2) {
                Assert.assertEquals(BaseTreeData.TreeState.RUNNING, jobData2.getState());
            }
            activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(6L)));
            synchronized (jobData) {
                Assert.assertEquals(BaseTreeData.TreeState.FINISHED_AND_CLOSED, jobData.getState());
            }
            synchronized (jobData2) {
                Assert.assertEquals(BaseTreeData.TreeState.RUNNING, jobData2.getState());
            }
            activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(20L)));
            MrServiceTreeData mrServiceData = activityMonitoringTree.getMrServiceData(MAPREDUCE1);
            Assert.assertNull(mrServiceData.getByJob().get(jobData));
            Assert.assertNull(mrServiceData.getByJob().get(jobData2));
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    @Test
    public void testImmediateRetirement() {
        DbActivity findActivityByName;
        DbActivity findActivityByName2;
        CMONConfiguration cMONConfiguration = (CMONConfiguration) Mockito.spy(CMONConfiguration.getSingleton());
        Duration standardDays = Duration.standardDays(1L);
        ((CMONConfiguration) Mockito.doReturn(standardDays).when(cMONConfiguration)).getActivityPurgeDuration();
        ((CMONConfiguration) Mockito.doReturn(standardDays).when(cMONConfiguration)).getAttemptPurgeDuration();
        ActivityAndAttemptStore activityAndAttemptStore = new ActivityAndAttemptStore(emf, cMONConfiguration);
        Instant minus = new Instant().minus(standardDays).minus(1L);
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE1, "job1_immediate_retention_test");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus.getMillis())));
        JobTreeData jobData2 = activityMonitoringTree.getJobData(MAPREDUCE1, "job2_immediate_retention_test");
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus.getMillis())));
        activityTreeWalkerAndPersister.run();
        synchronized (jobData) {
            Assert.assertEquals(BaseTreeData.TreeState.RUNNING, jobData.getState());
        }
        synchronized (jobData2) {
            Assert.assertEquals(BaseTreeData.TreeState.RUNNING, jobData2.getState());
        }
        MrServiceTreeData mrServiceData = activityMonitoringTree.getMrServiceData(MAPREDUCE1);
        Assert.assertNull(mrServiceData.getByJob().get(jobData));
        Assert.assertNull(mrServiceData.getByJob().get(jobData2));
        synchronized (jobData) {
            Assert.assertEquals((Object) null, jobData.getDatabaseId());
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.RUNNING);
        }
        synchronized (jobData2) {
            Assert.assertEquals((Object) null, jobData2.getDatabaseId());
            Assert.assertTrue(jobData2.getState() == BaseTreeData.TreeState.RUNNING);
        }
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (jobData) {
                findActivityByName = treeEntityManager.findActivityByName(jobData.getJobId(), false);
            }
            Assert.assertEquals((Object) null, findActivityByName);
            synchronized (jobData2) {
                findActivityByName2 = treeEntityManager.findActivityByName(jobData2.getJobId(), false);
            }
            Assert.assertEquals((Object) null, findActivityByName2);
            treeEntityManager.close();
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    private DbActivity testJobFinalStatus(JobTreeData jobTreeData, Instant instant, Instant instant2, ActivityStatus activityStatus) {
        DbActivity findActivity;
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (jobTreeData) {
                findActivity = null != jobTreeData.getDatabaseId() ? treeEntityManager.findActivity(jobTreeData.getDatabaseId(), true) : treeEntityManager.findActivityByName(jobTreeData.getJobId(), true);
            }
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.CPU_USER.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.CPU_SYSTEM.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.SUBMIT_TIME.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.FINISH_TIME.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.MOD_TIME.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.MAPS_RUNNING.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.REDUCES_RUNNING.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.MAP_PROGRESS.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.REDUCE_PROGRESS.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.ACTIVITY_STATUS.getUniqueMetricId())));
            Assert.assertEquals(0.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.CPU_USER.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(0.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.CPU_SYSTEM.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(0.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.MAPS_RUNNING.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(0.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.REDUCES_RUNNING.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(100.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.MAP_PROGRESS.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(100.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.REDUCE_PROGRESS.getUniqueMetricId()))).getValue(), 0.001d);
            ActivityStatus byOrdinal = ActivityStatus.getByOrdinal(Double.valueOf(((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.ACTIVITY_STATUS.getUniqueMetricId()))).getValue()).intValue());
            Assert.assertTrue(byOrdinal.isCompleted());
            Assert.assertEquals(activityStatus, byOrdinal);
            Assert.assertTrue(null != findActivity.getBegin());
            Assert.assertTrue(null != findActivity.getEnd());
            Assert.assertNotNull(findActivity.getModTime());
            Assert.assertTrue(findActivity.getBegin().getMillis() > 0);
            Assert.assertTrue(findActivity.getEnd().getMillis() > 0);
            Assert.assertTrue(findActivity.getModTime().getMillis() >= findActivity.getEnd().getMillis());
            if (null != instant) {
                Assert.assertEquals(instant, findActivity.getBegin());
            }
            if (null != instant2) {
                Assert.assertEquals(instant2, findActivity.getEnd());
            }
            Assert.assertTrue(findActivity.getBegin().getMillis() == findActivity.getEnd().getMillis() || findActivity.getBegin().isBefore(findActivity.getEnd().getMillis()));
            DbActivity dbActivity = findActivity;
            treeEntityManager.close();
            return dbActivity;
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    private DbActivity testSynJobFinalStatus(SyntheticTreeData syntheticTreeData, Instant instant) {
        DbActivity findActivity;
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (syntheticTreeData) {
                findActivity = treeEntityManager.findActivity(syntheticTreeData.getDatabaseId(), true);
            }
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.CPU_USER.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.CPU_SYSTEM.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.SUBMIT_TIME.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.FINISH_TIME.getUniqueMetricId())));
            Assert.assertFalse(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.MOD_TIME.getUniqueMetricId())));
            Assert.assertTrue(findActivity.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.ACTIVITY_STATUS.getUniqueMetricId())));
            Assert.assertEquals(0.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.CPU_USER.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(0.0d, ((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.CPU_SYSTEM.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertTrue(ActivityStatus.getByOrdinal(Double.valueOf(((DbActivityLastMetricValue) findActivity.getLastMetricValues().get(Integer.valueOf(MetricEnum.ACTIVITY_STATUS.getUniqueMetricId()))).getValue()).intValue()).isCompleted());
            Assert.assertEquals(instant, findActivity.getBegin());
            Assert.assertTrue(null != findActivity.getEnd());
            Assert.assertTrue(findActivity.getBegin().getMillis() == findActivity.getEnd().getMillis() || findActivity.getBegin().isBefore(findActivity.getEnd().getMillis()));
            Assert.assertNotNull(findActivity.getModTime());
            Assert.assertTrue(findActivity.getModTime().getMillis() >= findActivity.getEnd().getMillis());
            treeEntityManager.close();
            return findActivity;
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    @Test
    public void testJobPersistenceWithNewTree() {
        Instant minus = new Instant().minus(Duration.standardMinutes(1L));
        JobTreeData jobData = new ActivityMonitoringTree().getJobData(MAPREDUCE1, "job_newtree");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus.getMillis()), CmonAvroUtil.mv(MetricEnum.USER, "user1")));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData);
        Instant plus = minus.plus(Duration.standardMinutes(1L));
        JobTreeData jobData2 = new ActivityMonitoringTree().getJobData(MAPREDUCE1, "job_newtree");
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 2.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, plus.getMillis()), CmonAvroUtil.mv(MetricEnum.USER, "user2")));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData2);
        EntityManager createEntityManager = emf.createEntityManager();
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            List resultList = createEntityManager.createQuery("SELECT x FROM DbActivityAttribute x WHERE x.activity.name = :name", DbActivityAttribute.class).setParameter("name", "job_newtree").getResultList();
            Assert.assertEquals(1L, resultList.size());
            Assert.assertEquals("user2", ((DbActivityAttribute) Iterables.getOnlyElement(resultList)).getStringAttribute());
            treeEntityManager.beginForRollbackAndReadonly();
            DbActivity findActivityByName = treeEntityManager.findActivityByName("job_newtree", true);
            Assert.assertTrue(null != findActivityByName);
            Assert.assertTrue(findActivityByName.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.NUM_DESIRED_MAPS.getUniqueMetricId())));
            Assert.assertEquals(2.0d, ((DbActivityLastMetricValue) findActivityByName.getLastMetricValues().get(Integer.valueOf(MetricEnum.NUM_DESIRED_MAPS.getUniqueMetricId()))).getValue(), 0.001d);
            treeEntityManager.close();
            createEntityManager.close();
            jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal())));
            new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistJobData(jobData2);
            testJobFinalStatus(jobData2, minus, null, ActivityStatus.SUCCEEDED);
        } catch (Throwable th) {
            treeEntityManager.close();
            createEntityManager.close();
            throw th;
        }
    }

    @Test
    public void testAttemptPersistence() {
        DbAttempt findAttempt;
        DbAttempt findAttempt2;
        Instant instant = new Instant();
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job2", "job2_attempt1");
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        activityMonitoringTree.getJobData(MAPREDUCE1, "job1").receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        synchronized (attemptData) {
            Assert.assertEquals((Object) null, attemptData.getDatabaseId());
        }
        activityMonitoringTree.getJobData(MAPREDUCE1, "job2").receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant.getMillis())));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            String partition = partitionDesignator.getPartition(DbAttempt.class, instant);
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (attemptData) {
                findAttempt = treeEntityManager.findAttempt(attemptData.getDatabaseId(), partition, true, partitionDesignator.getPartition(DbAttemptLastMetricValue.class, instant));
            }
            Assert.assertEquals(1L, treeEntityManager.countAttemptsWithName(findAttempt.getName(), partition));
            Assert.assertEquals(1L, findAttempt.getLastMetricValues().size());
            Assert.assertEquals(instant, findAttempt.getBegin());
            Assert.assertEquals((Object) null, findAttempt.getEnd());
            Assert.assertEquals((Object) null, findAttempt.getHost());
            treeEntityManager.close();
            attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mv(MetricEnum.HOST, KaiserTestBase.HOST_ID_HOST1)));
            new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
            treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
            try {
                String partition2 = partitionDesignator.getPartition(DbAttempt.class, instant);
                treeEntityManager.beginForRollbackAndReadonly();
                synchronized (attemptData) {
                    findAttempt2 = treeEntityManager.findAttempt(attemptData.getDatabaseId(), partitionDesignator.getPartition(DbAttempt.class, instant), true, partitionDesignator.getPartition(DbAttemptLastMetricValue.class, instant));
                }
                Assert.assertEquals(1L, treeEntityManager.countAttemptsWithName(findAttempt2.getName(), partition2));
                Assert.assertEquals(1L, findAttempt2.getLastMetricValues().size());
                Assert.assertEquals(instant, findAttempt2.getBegin());
                Assert.assertEquals((Object) null, findAttempt2.getEnd());
                Assert.assertEquals(KaiserTestBase.HOST_ID_HOST1, findAttempt2.getHost());
                treeEntityManager.close();
                attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.ATTEMPT_STATUS, instant, MetricSchema.AttemptStatus.SUCCEEDED.ordinal())));
                new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
                testAttemptFinalStatus(attemptData, instant, MetricSchema.AttemptStatus.SUCCEEDED, instant);
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testAttemptPersistenceWithInvalidMetrics() {
        DbAttempt findAttempt;
        Instant instant = new Instant();
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job2_invalid_metrics", "job2_attempt1_invalid_metrics");
        attemptData.receiveMetrics(new Instant(), ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_REDUCES, Double.NaN), CmonAvroUtil.mv(MetricEnum.HDFS_READ, Double.MAX_VALUE), CmonAvroUtil.mv(MetricEnum.HDFS_WRITE, Double.MIN_VALUE), CmonAvroUtil.mv(MetricEnum.DISK_READ, Double.MIN_NORMAL), CmonAvroUtil.mv(MetricEnum.DISK_WRITE, Double.NEGATIVE_INFINITY), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, Double.POSITIVE_INFINITY), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        activityMonitoringTree.getJobData(MAPREDUCE1, "job2_invalid_metrics").receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.NUM_DESIRED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant.getMillis())));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            String partition = partitionDesignator.getPartition(DbAttempt.class, instant);
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (attemptData) {
                findAttempt = treeEntityManager.findAttempt(attemptData.getDatabaseId(), partition, true, partitionDesignator.getPartition(DbAttemptLastMetricValue.class, instant));
            }
            Assert.assertEquals(1L, treeEntityManager.countAttemptsWithName(findAttempt.getName(), partition));
            Assert.assertEquals(4L, findAttempt.getLastMetricValues().size());
            Assert.assertFalse(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.NUM_DESIRED_REDUCES.getUniqueMetricId())));
            Assert.assertTrue(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.HDFS_READ.getUniqueMetricId())));
            if (!dbType.isMySQL()) {
                Assert.assertEquals(0L, Double.compare(((DbAttemptLastMetricValue) findAttempt.getLastMetricValues().get(Integer.valueOf(MetricEnum.HDFS_READ.getUniqueMetricId()))).getValue(), 3.4028234663852886E38d));
            }
            Assert.assertTrue(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.HDFS_WRITE.getUniqueMetricId())));
            if (!dbType.isMySQL()) {
                Assert.assertEquals(0L, Double.compare(((DbAttemptLastMetricValue) findAttempt.getLastMetricValues().get(Integer.valueOf(MetricEnum.HDFS_WRITE.getUniqueMetricId()))).getValue(), 1.401298464324817E-45d));
            }
            Assert.assertTrue(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.DISK_READ.getUniqueMetricId())));
            if (!dbType.isMySQL()) {
                Assert.assertEquals(0L, Double.compare(((DbAttemptLastMetricValue) findAttempt.getLastMetricValues().get(Integer.valueOf(MetricEnum.DISK_READ.getUniqueMetricId()))).getValue(), 1.401298464324817E-45d));
            }
            Assert.assertFalse(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.DISK_WRITE.getUniqueMetricId())));
            Assert.assertFalse(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.MAP_INPUT_BYTES.getUniqueMetricId())));
            Assert.assertEquals(instant, findAttempt.getBegin());
            Assert.assertEquals((Object) null, findAttempt.getEnd());
            Assert.assertEquals((Object) null, findAttempt.getHost());
            treeEntityManager.close();
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    private void testAttemptFinalStatus(AttemptTreeData attemptTreeData, Instant instant, MetricSchema.AttemptStatus attemptStatus, Instant instant2) {
        DbAttempt findAttempt;
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (attemptTreeData) {
                String partition = partitionDesignator.getPartition(DbAttempt.class, instant2);
                findAttempt = treeEntityManager.findAttempt(attemptTreeData.getDatabaseId(), partition, true, partitionDesignator.getPartition(DbAttemptLastMetricValue.class, instant2));
                Assert.assertTrue(null != findAttempt);
                Assert.assertEquals(1L, treeEntityManager.countAttemptsWithName(findAttempt.getName(), partition));
            }
            Assert.assertTrue(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.CPU_USER.getUniqueMetricId())));
            Assert.assertTrue(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.CPU_SYSTEM.getUniqueMetricId())));
            Assert.assertFalse(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.SUBMIT_TIME.getUniqueMetricId())));
            Assert.assertFalse(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.FINISH_TIME.getUniqueMetricId())));
            Assert.assertTrue(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.PROGRESS.getUniqueMetricId())));
            Assert.assertTrue(findAttempt.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.ATTEMPT_STATUS.getUniqueMetricId())));
            Assert.assertEquals(0.0d, ((DbAttemptLastMetricValue) findAttempt.getLastMetricValues().get(Integer.valueOf(MetricEnum.CPU_USER.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(0.0d, ((DbAttemptLastMetricValue) findAttempt.getLastMetricValues().get(Integer.valueOf(MetricEnum.CPU_SYSTEM.getUniqueMetricId()))).getValue(), 0.001d);
            Assert.assertEquals(100.0d, ((DbAttemptLastMetricValue) findAttempt.getLastMetricValues().get(Integer.valueOf(MetricEnum.PROGRESS.getUniqueMetricId()))).getValue(), 0.001d);
            MetricSchema.AttemptStatus byOrdinal = MetricSchema.AttemptStatus.getByOrdinal(Double.valueOf(((DbAttemptLastMetricValue) findAttempt.getLastMetricValues().get(Integer.valueOf(MetricEnum.ATTEMPT_STATUS.getUniqueMetricId()))).getValue()).intValue());
            Assert.assertTrue(byOrdinal.isCompleted());
            Assert.assertEquals(attemptStatus, byOrdinal);
            Assert.assertTrue(null != findAttempt.getEnd());
            Assert.assertTrue(findAttempt.getBegin().getMillis() == findAttempt.getEnd().getMillis() || findAttempt.getBegin().isBefore(findAttempt.getEnd().getMillis()));
            Assert.assertTrue(findAttempt.getEnd().getMillis() > 0);
            Assert.assertTrue(findAttempt.getBegin().getMillis() > 0);
            if (null != instant) {
                Assert.assertEquals(instant, findAttempt.getBegin());
            }
        } finally {
            treeEntityManager.close();
        }
    }

    @Test
    public void testAttemptLifecycle() {
        Instant instant = new Instant();
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE1, "job3", "job3_attempt1");
        JobTreeData jobData = attemptData.getJobData();
        Instant minus = instant.minus(20L);
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus)));
        MetricEnum metricEnum = MetricEnum.MEM_RSS;
        attemptData.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(metricEnum, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis()), CmonAvroUtil.mv(MetricEnum.TOTAL_CPU_USER, 5.0d), CmonAvroUtil.mvState(MetricEnum.ACTIVITY_STATUS, instant, MetricSchema.AttemptStatus.RUNNING.ordinal())));
        CounterValueAggregator counterValueAggregator = (CounterValueAggregator) jobData.getAggregatorsForTest().get(MetricEnum.TOTAL_CPU_USER);
        Assert.assertEquals(5.0d, counterValueAggregator.getAggregate(), 0.001d);
        Assert.assertEquals(1L, counterValueAggregator.size());
        Instant plus = instant.plus(Duration.standardMinutes(1L));
        attemptData.receiveMetrics(plus, ImmutableList.of(CmonAvroUtil.mv(metricEnum, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis()), CmonAvroUtil.mv(MetricEnum.TOTAL_CPU_USER, 10.0d), CmonAvroUtil.mvState(MetricEnum.ATTEMPT_STATUS, plus, MetricSchema.AttemptStatus.FAILED.ordinal())));
        synchronized (attemptData) {
            Assert.assertEquals(BaseTreeData.TreeState.FINISHED, attemptData.getState());
        }
        Assert.assertEquals((Object) null, ((GaugeValueAggregator2) jobData.getAggregatorsForTest().get(metricEnum)).getLastTimestamp(attemptData));
        Assert.assertEquals(0L, r0.size());
        Assert.assertEquals(0L, r0.size());
        Assert.assertEquals(10.0d, counterValueAggregator.getAggregate(), 0.001d);
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(activityMonitoringTree.getAttemptData(MAPREDUCE1, "job3", "job3_attempt1"));
        synchronized (attemptData) {
            Assert.assertEquals(BaseTreeData.TreeState.FINISHED_AND_CLOSED, attemptData.getState());
        }
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        synchronized (attemptData) {
            Assert.assertTrue(plus.isBefore(attemptData.getExpirationDate()));
            activityTreeWalkerAndPersister.run(attemptData.getExpirationDate().minus(Duration.standardSeconds(1L)));
            Assert.assertTrue(jobData.getByAttempt().containsKey(attemptData.getAttemptId()));
            activityTreeWalkerAndPersister.run(attemptData.getExpirationDate().plus(Duration.standardSeconds(1L)));
            Assert.assertFalse(jobData.getByAttempt().containsKey(attemptData.getAttemptId()));
        }
        testAttemptFinalStatus(attemptData, instant, MetricSchema.AttemptStatus.FAILED, minus);
    }

    @Test
    public void testClusterAggregates() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        for (int i = 0; i < 10; i++) {
            activityMonitoringTree.getAttemptData(MAPREDUCE2, "job1", "job1_attempt" + i).receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, minute(0).getMillis()), CmonAvroUtil.mv(MetricEnum.TOTAL_CPU_USER, 5.0d), CmonAvroUtil.mv(MetricEnum.MAPS_RUNNING, 1.0d), CmonAvroUtil.mvState(MetricEnum.ACTIVITY_STATUS, instant, MetricSchema.AttemptStatus.RUNNING.ordinal())));
        }
        Assert.assertEquals(50.0d, ((ValueAggregator) activityMonitoringTree.getMrServiceData(MAPREDUCE2).getAggregators().get(MetricEnum.TOTAL_CPU_USER)).getAggregate(instant), 0.001d);
        activityMonitoringTree.getJobData(MAPREDUCE2, "job1").receiveMetricValues(Lists.newArrayList(new MetricValue[]{CmonAvroUtil.mvState(MetricEnum.ACTIVITY_STATUS, minute(2), ActivityStatus.SUCCEEDED.ordinal())}));
        for (int i2 = 0; i2 < 10; i2++) {
            activityMonitoringTree.getJobData(MAPREDUCE2, "job" + (i2 + 2)).receiveMetricValues(Lists.newArrayList(new MetricValue[]{CmonAvroUtil.mv(MetricEnum.HDFS_READ, 1.0d), CmonAvroUtil.mv(MetricEnum.HDFS_WRITE, 2.0d), CmonAvroUtil.mv(MetricEnum.DISK_READ, 1.0d), CmonAvroUtil.mv(MetricEnum.DISK_WRITE, 2.0d)}));
        }
        Instant plus = instant.plus(Duration.standardMinutes(2L));
        Assert.assertEquals("Counter value is still the same", 50.0d, ((ValueAggregator) activityMonitoringTree.getMrServiceData(MAPREDUCE2).getAggregators().get(MetricEnum.TOTAL_CPU_USER)).getAggregate(plus), 0.001d);
        Assert.assertEquals("Gauge value is reset", 0.0d, ((ValueAggregator) activityMonitoringTree.getMrServiceData(MAPREDUCE2).getAggregators().get(MetricEnum.MAPS_RUNNING)).getAggregate(plus), 0.001d);
        Assert.assertEquals(10.0d, ((ValueAggregator) activityMonitoringTree.getMrServiceData(MAPREDUCE2).getAggregators().get(MetricEnum.HDFS_READ)).getAggregate(instant), 0.001d);
        Assert.assertEquals(20.0d, ((ValueAggregator) activityMonitoringTree.getMrServiceData(MAPREDUCE2).getAggregators().get(MetricEnum.HDFS_WRITE)).getAggregate(instant), 0.001d);
        Assert.assertEquals(10.0d, ((ValueAggregator) activityMonitoringTree.getMrServiceData(MAPREDUCE2).getAggregators().get(MetricEnum.DISK_READ)).getAggregate(instant), 0.001d);
        Assert.assertEquals(20.0d, ((ValueAggregator) activityMonitoringTree.getMrServiceData(MAPREDUCE2).getAggregators().get(MetricEnum.DISK_WRITE)).getAggregate(instant), 0.001d);
    }

    @Test
    public void testSyntheticActivity() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        int i = 0;
        while (i < 2) {
            String str = "job" + i;
            activityMonitoringTree.getJobData(MAPREDUCE3, str).receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.PIG_JOB_ID, "pig_job_xyz"), CmonAvroUtil.mv(MetricEnum.ACTIVITY_NAME, "pig_job_xyz_name"), CmonAvroUtil.mv(MetricEnum.USER, "somebody"), CmonAvroUtil.mv(MetricEnum.GROUP, "somegroup"), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.TOTAL_LAUNCHED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minute(i == 0 ? 1 : 0))));
            activityMonitoringTree.getAttemptData(MAPREDUCE3, str, str + "attempt1").receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mv(MetricEnum.TOTAL_CPU_USER, 5.0d)));
            i++;
        }
        Assert.assertTrue(((MrServiceTreeData) activityMonitoringTree.getByService().get(MAPREDUCE3)).getBySynthetic().containsKey("pig_job_xyz"));
        SyntheticTreeData syntheticTreeData = (SyntheticTreeData) ((MrServiceTreeData) activityMonitoringTree.getByService().get(MAPREDUCE3)).getBySynthetic().get("pig_job_xyz");
        Assert.assertEquals("Check aggregate coming from jobs.", 200.0d, ((ValueAggregator) syntheticTreeData.getAggregators().get(MetricEnum.MAP_INPUT_BYTES)).getAggregate(instant), 0.001d);
        Assert.assertEquals("Check aggregate coming all the way from attempts.", 10.0d, ((ValueAggregator) syntheticTreeData.getAggregators().get(MetricEnum.TOTAL_CPU_USER)).getAggregate(instant), 0.001d);
        Assert.assertEquals("Check start time was the min of the two possibilities.", minute(0), syntheticTreeData.getStartTime());
        persistTree(activityMonitoringTree);
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            DbActivity findActivityByName = treeEntityManager.findActivityByName("pig_job_xyz", true);
            Assert.assertNotNull(findActivityByName);
            Assert.assertTrue(findActivityByName.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.MAP_INPUT_BYTES.getUniqueMetricId())));
            Assert.assertTrue(findActivityByName.getLastMetricValues().containsKey(Integer.valueOf(MetricEnum.TOTAL_LAUNCHED_MAPS.getUniqueMetricId())));
            Assert.assertTrue(((DbActivityLastMetricValue) findActivityByName.getLastMetricValues().get(Integer.valueOf(MetricEnum.TOTAL_LAUNCHED_MAPS.getUniqueMetricId()))).getValue() == 2.0d);
            Assert.assertTrue(findActivityByName.getActivityAttributes().containsKey(Integer.valueOf(MetricEnum.USER.getUniqueMetricId())));
            Assert.assertTrue(findActivityByName.getActivityAttributes().containsKey(Integer.valueOf(MetricEnum.GROUP.getUniqueMetricId())));
            Assert.assertEquals("somebody", ((DbActivityAttribute) findActivityByName.getActivityAttributes().get(Integer.valueOf(MetricEnum.USER.getUniqueMetricId()))).getStringAttribute());
            Assert.assertEquals("somegroup", ((DbActivityAttribute) findActivityByName.getActivityAttributes().get(Integer.valueOf(MetricEnum.GROUP.getUniqueMetricId()))).getStringAttribute());
            treeEntityManager.close();
            for (int i2 = 0; i2 < 2; i2++) {
                activityMonitoringTree.getJobData(MAPREDUCE3, "job" + i2).receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.TOTAL_LAUNCHED_MAPS, 2.0d)));
            }
            persistTree(activityMonitoringTree);
            TreeEntityManager treeEntityManager2 = new TreeEntityManager(emf, partitionDesignator);
            try {
                treeEntityManager2.beginForRollbackAndReadonly();
                DbActivity findActivityByName2 = treeEntityManager2.findActivityByName("pig_job_xyz", true);
                Assert.assertNotNull(findActivityByName2);
                Assert.assertTrue(((DbActivityLastMetricValue) findActivityByName2.getLastMetricValues().get(Integer.valueOf(MetricEnum.TOTAL_LAUNCHED_MAPS.getUniqueMetricId()))).getValue() == 4.0d);
                treeEntityManager2.close();
                for (int i3 = 0; i3 < 2; i3++) {
                    activityMonitoringTree.getJobData(MAPREDUCE3, "job" + i3).receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.TOTAL_LAUNCHED_MAPS, 3.0d)));
                }
                persistTree(activityMonitoringTree);
                treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
                try {
                    treeEntityManager.beginForRollbackAndReadonly();
                    DbActivity findActivityByName3 = treeEntityManager.findActivityByName("pig_job_xyz", true);
                    Assert.assertNotNull(findActivityByName3);
                    Assert.assertTrue(((DbActivityLastMetricValue) findActivityByName3.getLastMetricValues().get(Integer.valueOf(MetricEnum.TOTAL_LAUNCHED_MAPS.getUniqueMetricId()))).getValue() == 6.0d);
                    treeEntityManager.close();
                    for (int i4 = 0; i4 < 2; i4++) {
                        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE3, "job" + i4);
                        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal())));
                        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.TOTAL_LAUNCHED_MAPS, 4.0d)));
                    }
                    persistTree(activityMonitoringTree);
                    treeEntityManager2 = new TreeEntityManager(emf, partitionDesignator);
                    try {
                        treeEntityManager2.beginForRollbackAndReadonly();
                        DbActivity findActivityByName4 = treeEntityManager2.findActivityByName("pig_job_xyz", true);
                        Assert.assertNotNull(findActivityByName4);
                        Assert.assertTrue(((DbActivityLastMetricValue) findActivityByName4.getLastMetricValues().get(Integer.valueOf(MetricEnum.TOTAL_LAUNCHED_MAPS.getUniqueMetricId()))).getValue() == 6.0d);
                        treeEntityManager2.close();
                    } finally {
                        treeEntityManager2.close();
                    }
                } finally {
                    treeEntityManager.close();
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testSynActMultipleExpirationRemainSame() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        activityMonitoringTree.getJobData(MAPREDUCE3, "pig_mr_job").receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.PIG_JOB_ID, "pig_job_to_expire_twice"), CmonAvroUtil.mv(MetricEnum.ACTIVITY_NAME, "pig_job_xyz_name"), CmonAvroUtil.mv(MetricEnum.USER, "somebody"), CmonAvroUtil.mv(MetricEnum.GROUP, "somegroup"), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.TOTAL_LAUNCHED_MAPS, 1.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minute(0))));
        activityMonitoringTree.getAttemptData(MAPREDUCE3, "pig_mr_job", "pig_mr_jobattempt1").receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mv(MetricEnum.TOTAL_CPU_USER, 5.0d)));
        SyntheticTreeData syntheticTreeData = (SyntheticTreeData) ((MrServiceTreeData) activityMonitoringTree.getByService().get(MAPREDUCE3)).getBySynthetic().get("pig_job_to_expire_twice");
        Assert.assertNotNull(syntheticTreeData);
        synchronized (syntheticTreeData) {
            syntheticTreeData.expire();
        }
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistSyntheticActivityData(syntheticTreeData);
        synchronized (syntheticTreeData) {
            syntheticTreeData.getMetricMap();
            for (Map.Entry entry : syntheticTreeData.getMetricMap().entrySet()) {
                if (!ActivityAndAttemptStore.ACTIVITY_METRICS_TO_IGNORE_FOR_ATTR_TABLES.contains(entry.getKey())) {
                    Assert.assertFalse(((DirtyMarker) entry.getValue()).isDirty());
                }
            }
        }
        synchronized (syntheticTreeData) {
            syntheticTreeData.expire();
            syntheticTreeData.getMetricMap();
            for (Map.Entry entry2 : syntheticTreeData.getMetricMap().entrySet()) {
                if (!ActivityAndAttemptStore.ACTIVITY_METRICS_TO_IGNORE_FOR_ATTR_TABLES.contains(entry2.getKey())) {
                    Assert.assertFalse(((DirtyMarker) entry2.getValue()).isDirty());
                }
            }
        }
    }

    @Test
    public void testOozie() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE3, "mrjob");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant)));
        JobTreeData jobData2 = activityMonitoringTree.getJobData(MAPREDUCE3, "mrpigjob");
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.PIG_JOB_ID, "pig_job3"), CmonAvroUtil.mv(MetricEnum.ACTIVITY_NAME, "pig_job3_name"), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant)));
        activityMonitoringTree.getSyntheticData(MAPREDUCE3, MetricSchema.ActivityType.OOZIE, "ooziejob").receiveMetricsFromOozieJob(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal())));
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.OOZIE_JOB_ID, "ooziejob")));
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.OOZIE_JOB_ID, "ooziejob")));
        persistTree(activityMonitoringTree);
        persistTree(activityMonitoringTree);
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            DbActivity findActivityByName = treeEntityManager.findActivityByName("mrjob", true);
            DbActivity findActivityByName2 = treeEntityManager.findActivityByName("mrpigjob", true);
            DbActivity findActivityByName3 = treeEntityManager.findActivityByName("ooziejob", true);
            Assert.assertEquals("MR's parent is oozie", findActivityByName.getParent(), findActivityByName3);
            Assert.assertEquals("MrPig's grand parent is oozie", findActivityByName3, findActivityByName2.getParent().getParent());
            Assert.assertEquals("MrPig's parent is reasonable", findActivityByName2.getParent().getName(), "pig_job3");
            Assert.assertNull("Oozie has no parent", findActivityByName3.getParent());
            Assert.assertEquals(MetricSchema.ActivityType.OOZIE.ordinal(), findActivityByName3.getActivityType());
            Assert.assertEquals(MetricSchema.ActivityType.MR.ordinal(), findActivityByName2.getActivityType());
            Assert.assertEquals(MetricSchema.ActivityType.MR.ordinal(), findActivityByName.getActivityType());
            Assert.assertEquals(MetricSchema.ActivityType.PIG.ordinal(), findActivityByName2.getParent().getActivityType());
            treeEntityManager.close();
        } catch (Throwable th) {
            treeEntityManager.close();
            throw th;
        }
    }

    @Test
    public void testModTimeUpdate() throws InterruptedException {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE4, "mrjob_tmtu");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant)));
        persistTree(activityMonitoringTree);
        Instant modTime = getModTime(jobData);
        Assert.assertNotNull(modTime);
        Thread.sleep(1L);
        synchronized (jobData) {
            jobData.setLastUpdate(new Instant());
        }
        persistTree(activityMonitoringTree);
        Instant modTime2 = getModTime(jobData);
        Assert.assertTrue("modTime should advance", modTime2.isAfter(modTime));
        Thread.sleep(1L);
        persistTree(activityMonitoringTree);
        Assert.assertTrue("modTime should not advance", getModTime(jobData).equals(modTime2));
    }

    private Instant getModTime(BaseTreeData baseTreeData) {
        Instant modTime;
        TreeEntityManager treeEntityManager = new TreeEntityManager(emf, partitionDesignator);
        try {
            treeEntityManager.beginForRollbackAndReadonly();
            synchronized (baseTreeData) {
                modTime = treeEntityManager.findActivity(baseTreeData.getDatabaseId(), true).getModTime();
            }
            return modTime;
        } finally {
            treeEntityManager.rollback();
        }
    }

    @Test
    public void testJobExpiration() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE4, "mrjobtje", "mrjobtje_attempt1");
        attemptData.receiveMetrics(new Instant().plus(Duration.standardMinutes(10L)), ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        AttemptTreeData attemptData2 = activityMonitoringTree.getAttemptData(MAPREDUCE4, "mrjobtje", "mrjobtje_attempt2");
        attemptData2.receiveMetrics(new Instant().plus(Duration.standardMinutes(10L)), ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis()), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant.plus(Duration.standardMinutes(10L))), CmonAvroUtil.mvState(MetricEnum.ATTEMPT_STATUS, instant, MetricSchema.AttemptStatus.SUCCEEDED.ordinal())));
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE4, "mrjobtje");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant)));
        JobTreeData jobData2 = activityMonitoringTree.getJobData(MAPREDUCE4, "mrjobtje_nostatus");
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant)));
        JobTreeData jobData3 = activityMonitoringTree.getJobData(MAPREDUCE4, "mrjobtje_nostatus_with_end_time");
        jobData3.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant)));
        JobTreeData jobData4 = activityMonitoringTree.getJobData(MAPREDUCE4, "mrjobtje_finale_tatus_no_end_time");
        jobData4.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.FAILED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant)));
        JobTreeData jobData5 = activityMonitoringTree.getJobData(MAPREDUCE4, "mrpigjobtje");
        jobData5.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.PIG_JOB_ID, "pig_jobtje"), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant), CmonAvroUtil.mv(MetricEnum.ACTIVITY_NAME, "pig_jobtje_name")));
        SyntheticTreeData syntheticData = activityMonitoringTree.getSyntheticData(MAPREDUCE4, MetricSchema.ActivityType.OOZIE, "ooziejobtje");
        syntheticData.receiveMetricsFromOozieJob(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal())));
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.OOZIE_JOB_ID, "ooziejobtje")));
        jobData5.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.OOZIE_JOB_ID, "ooziejobtje")));
        activityTreeWalkerAndPersister.run();
        MrServiceTreeData mrServiceData = activityMonitoringTree.getMrServiceData(MAPREDUCE4);
        Assert.assertNotNull(mrServiceData.getByJob().get("mrjobtje"));
        Assert.assertNotNull(mrServiceData.getByJob().get("mrpigjobtje"));
        Assert.assertNotNull(mrServiceData.getBySynthetic().get("pig_jobtje"));
        Assert.assertNotNull(mrServiceData.getBySynthetic().get("ooziejobtje"));
        SyntheticTreeData syntheticTreeData = (SyntheticTreeData) mrServiceData.getBySynthetic().get("pig_jobtje");
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(10L)));
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        synchronized (jobData5) {
            Assert.assertTrue(jobData5.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        synchronized (jobData2) {
            Assert.assertTrue(jobData2.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        synchronized (jobData3) {
            Assert.assertTrue(jobData3.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        synchronized (jobData4) {
            Assert.assertTrue(jobData4.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData2);
        testAttemptFinalStatus(attemptData, instant, MetricSchema.AttemptStatus.CMF_UNKNOWN, instant);
        testAttemptFinalStatus(attemptData2, instant, MetricSchema.AttemptStatus.SUCCEEDED, instant);
        testJobFinalStatus(jobData5, instant, null, ActivityStatus.CMF_UNKNOWN);
        testJobFinalStatus(jobData2, instant, null, ActivityStatus.CMF_UNKNOWN);
        testJobFinalStatus(jobData3, instant, null, ActivityStatus.CMF_UNKNOWN);
        testJobFinalStatus(jobData4, instant, null, ActivityStatus.FAILED);
        synchronized (syntheticTreeData) {
            Assert.assertEquals(BaseTreeData.TreeState.RUNNING, syntheticTreeData.getState());
        }
        synchronized (syntheticData) {
            Assert.assertEquals(BaseTreeData.TreeState.RUNNING, syntheticData.getState());
        }
        Instant instant2 = new Instant();
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(11L)));
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(11L)));
        synchronized (syntheticTreeData) {
            Assert.assertEquals(BaseTreeData.TreeState.FINISHED_AND_CLOSED, syntheticTreeData.getState());
        }
        synchronized (syntheticData) {
            Assert.assertEquals(BaseTreeData.TreeState.FINISHED_AND_CLOSED, syntheticData.getState());
        }
        DbActivity testSynJobFinalStatus = testSynJobFinalStatus(syntheticTreeData, instant);
        DbActivity testSynJobFinalStatus2 = testSynJobFinalStatus(syntheticData, instant);
        Assert.assertTrue(testSynJobFinalStatus.getModTime().getMillis() >= instant2.getMillis());
        Assert.assertTrue(testSynJobFinalStatus2.getModTime().getMillis() >= instant2.getMillis());
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(20L)));
        Assert.assertNull(mrServiceData.getByJob().get("mrjobtje"));
        Assert.assertNull(mrServiceData.getByJob().get("mrpigjobtje"));
        Assert.assertNull(mrServiceData.getBySynthetic().get("pig_jobtje"));
        Assert.assertNull(mrServiceData.getBySynthetic().get("ooziejobtje"));
    }

    void persistTree(ActivityMonitoringTree activityMonitoringTree) {
        new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore).run();
    }

    @Test
    public void testTreeDataTimeSeriesInterval() {
        JobTreeData jobData = new ActivityMonitoringTree().getJobData("x", "j");
        Duration standardMinutes = Duration.standardMinutes(1L);
        synchronized (jobData) {
            Assert.assertTrue(jobData.shouldWriteTimeSeries(minute(0), standardMinutes));
            Assert.assertFalse(jobData.shouldWriteTimeSeries(minute(0), standardMinutes));
            Assert.assertTrue(jobData.shouldWriteTimeSeries(minute(1), standardMinutes));
            Assert.assertFalse(jobData.shouldWriteTimeSeries(minute(1).plus(Duration.standardSeconds(20L)), standardMinutes));
            Assert.assertTrue(jobData.shouldWriteTimeSeries(minute(2), standardMinutes));
        }
    }

    @Test
    public void testNoExpirationForRunningJobs() throws InterruptedException {
        Instant lastUpdate;
        Instant lastUpdate2;
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        Instant minus = instant.minus(Duration.standardMinutes(10L));
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE5, "tnefrj_mrjob", "tnefrj_mrjob_attempt1");
        attemptData.receiveMetrics(minus, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, minus, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, minus.getMillis())));
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test1", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE5, "tnefrj_mrjob");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus)));
        SyntheticTreeData syntheticData = activityMonitoringTree.getSyntheticData(MAPREDUCE5, MetricSchema.ActivityType.OOZIE, "tnefr_ooziejobj");
        syntheticData.receiveMetricsFromOozieJob(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal())));
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.OOZIE_JOB_ID, "tnefr_ooziejobj")));
        activityTreeWalkerAndPersister.run(minus);
        MrServiceTreeData mrServiceData = activityMonitoringTree.getMrServiceData(MAPREDUCE5);
        Assert.assertNotNull(mrServiceData.getByJob().get("tnefrj_mrjob"));
        Assert.assertNotNull(mrServiceData.getBySynthetic().get("tnefr_ooziejobj"));
        synchronized (syntheticData) {
            Assert.assertTrue(syntheticData.getLastUpdate().isAfter(instant) || syntheticData.getLastUpdate().isEqual(instant));
            lastUpdate = syntheticData.getLastUpdate();
        }
        Thread.sleep(10L);
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.MAP_OUTPUT_BYTES, 200.0d)));
        synchronized (syntheticData) {
            Assert.assertTrue(syntheticData.getLastUpdate().isAfter(lastUpdate));
            lastUpdate2 = syntheticData.getLastUpdate();
        }
        Thread.sleep(10L);
        syntheticData.receiveMetricsFromOozieJob(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.DISK_WRITE, 100.0d)));
        synchronized (syntheticData) {
            Assert.assertTrue(syntheticData.getLastUpdate().isAfter(lastUpdate2));
            syntheticData.getLastUpdate();
        }
        activityTreeWalkerAndPersister.run(instant);
        synchronized (syntheticData) {
            Assert.assertTrue(syntheticData.getState() == BaseTreeData.TreeState.RUNNING);
        }
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.RUNNING);
        }
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(12L)));
        synchronized (syntheticData) {
            Assert.assertEquals(BaseTreeData.TreeState.RUNNING, syntheticData.getState());
        }
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        testAttemptFinalStatus(attemptData, minus, MetricSchema.AttemptStatus.CMF_UNKNOWN, minus);
        testJobFinalStatus(jobData, minus, null, ActivityStatus.CMF_UNKNOWN);
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(13L)));
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        Assert.assertNull(mrServiceData.getByJob().get("tnefrj_mrjob"));
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(20L)));
        Assert.assertNull(mrServiceData.getBySynthetic().get("tnefr_ooziejobj"));
        testSynJobFinalStatus(syntheticData, minus);
    }

    @Test
    public void testRetireOnly() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE6, "mrjobtro", "mrjobtro_attempt1");
        attemptData.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE6, "mrjobtro");
        Instant minus = instant.minus(Duration.standardMinutes(1L));
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant)));
        activityTreeWalkerAndPersister.run();
        MrServiceTreeData mrServiceData = activityMonitoringTree.getMrServiceData(MAPREDUCE6);
        Assert.assertNotNull(mrServiceData.getByJob().get("mrjobtro"));
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(10L)));
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        testAttemptFinalStatus(attemptData, instant, MetricSchema.AttemptStatus.CMF_UNKNOWN, minus);
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(11L)));
        testJobFinalStatus(jobData, minus, instant, ActivityStatus.SUCCEEDED);
        Assert.assertNull(mrServiceData.getByJob().get("mrjobtro"));
        AttemptTreeData attemptData2 = activityMonitoringTree.getAttemptData(MAPREDUCE6, "mrjobtro", "mrjobtro_attempt2");
        attemptData2.receiveMetrics(instant.plus(Duration.standardMinutes(11L)), ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        JobTreeData jobTreeData = (JobTreeData) mrServiceData.getByJob().get("mrjobtro");
        Assert.assertNotNull(jobTreeData);
        synchronized (jobTreeData) {
            Assert.assertTrue(jobTreeData.getState() == BaseTreeData.TreeState.RUNNING);
        }
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(20L)));
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData2);
        synchronized (attemptData2) {
            Assert.assertEquals((Object) null, attemptData2.getDatabaseId());
        }
        synchronized (jobTreeData) {
            Assert.assertTrue(jobTreeData.getState() == BaseTreeData.TreeState.RUNNING);
        }
        Assert.assertNull(mrServiceData.getByJob().get("mrjobtro"));
        synchronized (attemptData2) {
            Assert.assertEquals(BaseTreeData.TreeState.FINISHED_AND_CLOSED, attemptData2.getState());
        }
        testJobFinalStatus(jobTreeData, minus, instant, ActivityStatus.SUCCEEDED);
    }

    @Test
    public void testAttemptInvalidTimeMetric() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE7, "mrjobtaitm", "mrjobtaitm_attempt1");
        AttemptTreeData attemptData2 = activityMonitoringTree.getAttemptData(MAPREDUCE7, "mrjobtaitm", "mrjobtaitm_attempt2");
        AttemptTreeData attemptData3 = activityMonitoringTree.getAttemptData(MAPREDUCE7, "mrjobtaitm", "mrjobtaitm_attempt3");
        AttemptTreeData attemptData4 = activityMonitoringTree.getAttemptData(MAPREDUCE7, "mrjobtaitm", "mrjobtaitm_attempt4");
        attemptData.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        attemptData2.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, -1.0d)));
        attemptData3.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, -1.0d), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, -1.0d)));
        attemptData4.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, 0.0d), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, 0.0d)));
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE7, "mrjobtaitm");
        Instant minus = instant.minus(Duration.standardMinutes(1L));
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, minus), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant)));
        activityTreeWalkerAndPersister.run();
        Assert.assertNotNull(activityMonitoringTree.getMrServiceData(MAPREDUCE7).getByJob().get("mrjobtaitm"));
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(10L)));
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData2);
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData3);
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData4);
        testAttemptFinalStatus(attemptData, instant, MetricSchema.AttemptStatus.CMF_UNKNOWN, minus);
        testAttemptFinalStatus(attemptData2, null, MetricSchema.AttemptStatus.CMF_UNKNOWN, minus);
        testAttemptFinalStatus(attemptData3, null, MetricSchema.AttemptStatus.CMF_UNKNOWN, minus);
        testAttemptFinalStatus(attemptData4, null, MetricSchema.AttemptStatus.CMF_UNKNOWN, minus);
    }

    @Test
    public void testActivityInvalidTimeMetric() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE10, "mrjobtaitm1");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant.minus(Duration.standardMinutes(1L))), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant)));
        JobTreeData jobData2 = activityMonitoringTree.getJobData(MAPREDUCE10, "mrjobtaitm2");
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, -1.0d)));
        JobTreeData jobData3 = activityMonitoringTree.getJobData(MAPREDUCE10, "mrjobtaitm3");
        jobData3.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, -1.0d), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, -1.0d)));
        JobTreeData jobData4 = activityMonitoringTree.getJobData(MAPREDUCE10, "mrjobtaitm4");
        jobData4.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, 0.0d), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, 0.0d)));
        activityTreeWalkerAndPersister.run();
        MrServiceTreeData mrServiceData = activityMonitoringTree.getMrServiceData(MAPREDUCE10);
        Assert.assertNotNull(mrServiceData.getByJob().get("mrjobtaitm1"));
        Assert.assertNotNull(mrServiceData.getByJob().get("mrjobtaitm2"));
        Assert.assertNotNull(mrServiceData.getByJob().get("mrjobtaitm3"));
        Assert.assertNotNull(mrServiceData.getByJob().get("mrjobtaitm4"));
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(10L)));
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        synchronized (jobData2) {
            Assert.assertTrue(jobData2.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        synchronized (jobData3) {
            Assert.assertTrue(jobData3.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        synchronized (jobData4) {
            Assert.assertTrue(jobData4.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        testJobFinalStatus(jobData, instant.minus(Duration.standardMinutes(1L)), instant, ActivityStatus.SUCCEEDED);
        testJobFinalStatus(jobData2, null, null, ActivityStatus.SUCCEEDED);
        testJobFinalStatus(jobData3, null, null, ActivityStatus.SUCCEEDED);
        testJobFinalStatus(jobData4, null, null, ActivityStatus.SUCCEEDED);
    }

    @Test
    public void testEarlyExpiration() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister("test", emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE11, "mrjobtee");
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant.minus(Duration.standardMinutes(1L))), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant)));
        JobTreeData jobData2 = activityMonitoringTree.getJobData(MAPREDUCE11, "mrpigjobtee");
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.PIG_JOB_ID, "pig_jobtee"), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, instant), CmonAvroUtil.mv(MetricEnum.ACTIVITY_NAME, "pig_jobtje_name")));
        SyntheticTreeData syntheticData = activityMonitoringTree.getSyntheticData(MAPREDUCE11, MetricSchema.ActivityType.OOZIE, "ooziejobtee");
        syntheticData.receiveMetricsFromOozieJob(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal())));
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.OOZIE_JOB_ID, "ooziejobtee")));
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.OOZIE_JOB_ID, "ooziejobtee")));
        activityTreeWalkerAndPersister.run();
        MrServiceTreeData mrServiceData = activityMonitoringTree.getMrServiceData(MAPREDUCE11);
        Assert.assertNotNull(mrServiceData.getByJob().get("mrjobtee"));
        Assert.assertNotNull(mrServiceData.getByJob().get("mrpigjobtee"));
        Assert.assertNotNull(mrServiceData.getBySynthetic().get("pig_jobtee"));
        Assert.assertNotNull(mrServiceData.getBySynthetic().get("ooziejobtee"));
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        testJobFinalStatus(jobData, instant.minus(Duration.standardMinutes(1L)), null, ActivityStatus.SUCCEEDED);
        SyntheticTreeData syntheticTreeData = (SyntheticTreeData) mrServiceData.getBySynthetic().get("pig_jobtee");
        syntheticData.receiveMetricsFromOozieJob(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal())));
        activityTreeWalkerAndPersister.run();
        synchronized (syntheticData) {
            Assert.assertTrue(syntheticData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
            Assert.assertTrue(syntheticData.isCompletedAndNotExpired());
        }
        synchronized (syntheticTreeData) {
            Assert.assertTrue(syntheticTreeData.getState() == BaseTreeData.TreeState.RUNNING);
        }
        jobData2.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant)));
        activityTreeWalkerAndPersister.run();
        testJobFinalStatus(jobData2, instant, null, ActivityStatus.SUCCEEDED);
        activityTreeWalkerAndPersister.run();
        synchronized (syntheticTreeData) {
            Assert.assertTrue(syntheticTreeData.getState() == BaseTreeData.TreeState.FINISHED_AND_CLOSED);
        }
        testSynJobFinalStatus(syntheticTreeData, instant);
        testSynJobFinalStatus(syntheticData, instant.minus(Duration.standardMinutes(1L)));
    }

    @Test
    public void testJobTimeMetrics() {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE8, "mrjobtjst", "mrjobtjst_attempt1");
        AttemptTreeData attemptData2 = activityMonitoringTree.getAttemptData(MAPREDUCE8, "mrjobtjst", "mrjobtjst_attempt2");
        AttemptTreeData attemptData3 = activityMonitoringTree.getAttemptData(MAPREDUCE8, "mrjobtjst", "mrjobtjst_attempt3");
        AttemptTreeData attemptData4 = activityMonitoringTree.getAttemptData(MAPREDUCE8, "mrjobtjst", "mrjobtjst_attempt4");
        attemptData.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.getMillis())));
        attemptData2.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, -1.0d)));
        Instant minus = instant.minus(10L);
        attemptData3.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, minus)));
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE8, "mrjobtjst");
        synchronized (jobData) {
            Assert.assertEquals((Object) null, jobData.getStartTime());
        }
        Instant plus = instant.plus(Duration.standardMinutes(1L));
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.STARTED.ordinal()), CmonAvroUtil.mv(MetricEnum.MAP_INPUT_BYTES, 100.0d), CmonAvroUtil.mv(MetricEnum.SUBMIT_TIME, plus)));
        attemptData4.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, instant, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, instant.minus(20L))));
        synchronized (jobData) {
            Assert.assertEquals(plus, jobData.getStartTime());
            Assert.assertEquals((Object) null, jobData.getEndTime());
        }
        attemptData.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.ATTEMPT_STATUS, instant, MetricSchema.AttemptStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, instant.plus(10L).getMillis())));
        attemptData2.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.ATTEMPT_STATUS, instant, MetricSchema.AttemptStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, -1.0d)));
        Instant plus2 = instant.plus(20L);
        attemptData3.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.ATTEMPT_STATUS, instant, MetricSchema.AttemptStatus.SUCCEEDED.ordinal()), CmonAvroUtil.mv(MetricEnum.FINISH_TIME, plus2.getMillis())));
        synchronized (jobData) {
            Assert.assertEquals((Object) null, jobData.getEndTime());
        }
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.ACTIVITY_STATUS, ActivityStatus.FAILED.ordinal())));
        synchronized (jobData) {
            Assert.assertNotNull(jobData.getEndTime());
        }
        jobData.receiveMetricValues(ImmutableList.of(CmonAvroUtil.mv(MetricEnum.FINISH_TIME, minus.getMillis())));
        attemptData4.receiveMetrics(instant, ImmutableList.of(CmonAvroUtil.mv(MetricEnum.FINISH_TIME, plus2.plus(100000L))));
        synchronized (jobData) {
            Assert.assertEquals(minus, jobData.getEndTime());
        }
    }

    @Test
    public void testJobsCreatedFromAttempts() throws IOException, InterruptedException {
        ActivityMonitoringTree activityMonitoringTree = new ActivityMonitoringTree();
        Instant instant = new Instant();
        Instant minus = instant.minus(Duration.standardMinutes(10L));
        AttemptTreeData attemptData = activityMonitoringTree.getAttemptData(MAPREDUCE9, "tjcfa_mrjob", "tnefrj_tjcfa_attempt1");
        attemptData.receiveMetrics(minus, ImmutableList.of(CmonAvroUtil.mvState(MetricEnum.TASK_TYPE, minus, MetricSchema.TaskType.MAP.ordinal()), CmonAvroUtil.mv(MetricEnum.CPU_USER, 0.1d), CmonAvroUtil.mv(MetricEnum.START_TIME, minus.getMillis())));
        ActivityTreeWalkerAndPersister activityTreeWalkerAndPersister = new ActivityTreeWalkerAndPersister(MAPREDUCE9, emf, activityMonitoringTree, (TimeSeriesStore) null, activityAndAttemptStore);
        activityTreeWalkerAndPersister.run(instant);
        JobTreeData jobData = activityMonitoringTree.getJobData(MAPREDUCE9, "tjcfa_mrjob");
        synchronized (jobData) {
            Assert.assertTrue(jobData.getState() == BaseTreeData.TreeState.RUNNING);
            Assert.assertFalse(jobData.hasReceivedJobTrackerUpdates());
            Assert.assertFalse(jobData.getMetricMap().containsKey(MetricEnum.ACTIVITY_STATUS));
        }
        new Persisters(emf, (TimeSeriesStore) null, activityAndAttemptStore).persistAttemptData(attemptData);
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(12L)));
        activityTreeWalkerAndPersister.run(instant.plus(Duration.standardMinutes(13L)));
        synchronized (attemptData) {
            Assert.assertEquals((Object) null, attemptData.getDatabaseId());
        }
        synchronized (jobData) {
            Assert.assertEquals((Object) null, jobData.getDatabaseId());
            Assert.assertEquals(BaseTreeData.TreeState.RUNNING, jobData.getState());
        }
        Assert.assertEquals((Object) null, ((MrServiceTreeData) activityMonitoringTree.getByService().get(MAPREDUCE9)).getByJob().get("tjcfa_mrjob"));
    }
}
