TestJobStatusPersistency.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.OutputStream;
  20. import java.io.OutputStreamWriter;
  21. import java.io.Writer;
  22. import java.util.Properties;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.io.LongWritable;
  25. import org.apache.hadoop.io.Text;
  26. public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
  27.   private JobID runJob() throws Exception {
  28.     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
  29.     Writer wr = new OutputStreamWriter(os);
  30.     wr.write("hello1n");
  31.     wr.write("hello2n");
  32.     wr.write("hello3n");
  33.     wr.write("hello4n");
  34.     wr.close();
  35.     JobConf conf = createJobConf();
  36.     conf.setJobName("mr");
  37.     conf.setInputFormat(TextInputFormat.class);
  38.     conf.setMapOutputKeyClass(LongWritable.class);
  39.     conf.setMapOutputValueClass(Text.class);
  40.     conf.setOutputFormat(TextOutputFormat.class);
  41.     conf.setOutputKeyClass(LongWritable.class);
  42.     conf.setOutputValueClass(Text.class);
  43.     conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  44.     conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
  45.     FileInputFormat.setInputPaths(conf, getInputDir());
  46.     FileOutputFormat.setOutputPath(conf, getOutputDir());
  47.     return JobClient.runJob(conf).getID();
  48.   }
  49.   public void testNonPersistency() throws Exception {
  50.     JobID jobId = runJob();
  51.     JobClient jc = new JobClient(createJobConf());
  52.     RunningJob rj = jc.getJob(jobId);
  53.     assertNotNull(rj);
  54.     stopCluster();
  55.     startCluster(false, null);
  56.     jc = new JobClient(createJobConf());
  57.     rj = jc.getJob(jobId);
  58.     assertNull(rj);
  59.   }
  60.   public void testPersistency() throws Exception {
  61.     Properties config = new Properties();
  62.     config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
  63.     config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
  64.     stopCluster();
  65.     startCluster(false, config);
  66.     JobID jobId = runJob();
  67.     JobClient jc = new JobClient(createJobConf());
  68.     RunningJob rj0 = jc.getJob(jobId);
  69.     assertNotNull(rj0);
  70.     boolean sucessfull0 = rj0.isSuccessful();
  71.     String jobName0 = rj0.getJobName();
  72.     Counters counters0 = rj0.getCounters();
  73.     TaskCompletionEvent[] events0 = rj0.getTaskCompletionEvents(0);
  74.     stopCluster();
  75.     startCluster(false, config);
  76.      
  77.     jc = new JobClient(createJobConf());
  78.     RunningJob rj1 = jc.getJob(jobId);
  79.     assertNotNull(rj1);
  80.     assertEquals(sucessfull0, rj1.isSuccessful());
  81.     assertEquals(jobName0, rj0.getJobName());
  82.     assertEquals(counters0.size(), rj1.getCounters().size());
  83.     TaskCompletionEvent[] events1 = rj1.getTaskCompletionEvents(0);
  84.     assertEquals(events0.length, events1.length);    
  85.     for (int i = 0; i < events0.length; i++) {
  86.       assertEquals(events0[i].getTaskAttemptId(), events1[i].getTaskAttemptId());
  87.       assertEquals(events0[i].getTaskStatus(), events1[i].getTaskStatus());
  88.     }
  89.   }
  90. }