001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.sql.Timestamp; 021import java.util.Date; 022import java.util.List; 023 024import org.apache.commons.lang.StringUtils; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.action.email.EmailActionExecutor; 027import org.apache.oozie.command.CommandException; 028import org.apache.oozie.command.coord.CoordKillXCommand; 029import org.apache.oozie.command.wf.JobXCommand; 030import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 031import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 032import org.apache.oozie.executor.jpa.JPAExecutorException; 033import org.apache.oozie.util.DateUtils; 034import org.apache.oozie.util.XLog; 035 036/** 037 * The Abandoned Coord Checker Service check finds out the abandoned coord jobs in system and kills it. A job is 038 * considered to be abandoned/faulty if total number of actions in failed/timedout/suspended >= limit and there are no 039 * succeeded action and job start time < job.older.than. Email will not be sent if 040 * oozie.service.AbandonedCoordCheckerService.email.address is not configured. 041 */ 042public class AbandonedCoordCheckerService implements Service { 043 044 private static final String CONF_PREFIX = Service.CONF_PREFIX + "AbandonedCoordCheckerService."; 045 public static final String TO_ADDRESS = CONF_PREFIX + "email.address"; 046 private static final String CONTENT_TYPE = "text/html"; 047 private static final String SUBJECT = "Abandoned Coordinators report"; 048 public static final String CONF_CHECK_INTERVAL = CONF_PREFIX + "check.interval"; 049 public static final String CONF_CHECK_DELAY = CONF_PREFIX + "check.delay"; 050 public static final String CONF_FAILURE_LEN = CONF_PREFIX + "failure.limit"; 051 public static final String CONF_JOB_OLDER_THAN = CONF_PREFIX + "job.older.than"; 052 053 public static final String CONF_JOB_KILL = CONF_PREFIX + "kill.jobs"; 054 public static final String OOZIE_BASE_URL = "oozie.base.url"; 055 private static String[] to; 056 private static String serverURL; 057 058 public static class AbandonedCoordCheckerRunnable implements Runnable { 059 final int failureLimit; 060 XLog LOG = XLog.getLog(getClass()); 061 private boolean shouldKill = false; 062 063 public AbandonedCoordCheckerRunnable(int failureLimit) { 064 this(failureLimit, false); 065 } 066 067 public AbandonedCoordCheckerRunnable(int failureLimit, boolean shouldKill) { 068 this.failureLimit = failureLimit; 069 this.shouldKill = shouldKill; 070 } 071 072 public void run() { 073 if (!Services.get().get(JobsConcurrencyService.class).isLeader()) { 074 LOG.info("Server is not primary server. Skipping run"); 075 return; 076 } 077 XLog.Info.get().clear(); 078 try { 079 checkCoordJobs(); 080 } 081 catch (Exception e) { 082 LOG.error("Error running AbandonedCoordChecker", e); 083 } 084 } 085 086 /** 087 * Check coordinator 088 * @throws Exception 089 */ 090 private void checkCoordJobs() throws Exception { 091 StringBuilder msg = new StringBuilder(); 092 093 addTableHeader(msg); 094 List<CoordinatorJobBean> jobs; 095 try { 096 Timestamp createdTS = new Timestamp(System.currentTimeMillis() 097 - (ConfigurationService.getInt(CONF_JOB_OLDER_THAN) * 60 * 1000)); 098 099 jobs = CoordJobQueryExecutor.getInstance().getList(CoordJobQuery.GET_COORD_FOR_ABANDONEDCHECK, 100 failureLimit, createdTS); 101 102 for (CoordinatorJobBean job : jobs) { 103 processJob(job, msg); 104 } 105 if (jobs.size() > 0) { 106 addTableTail(msg); 107 sendMail(msg.toString()); 108 } 109 110 } 111 catch (JPAExecutorException je) { 112 throw new CommandException(je); 113 } 114 } 115 116 private void processJob(CoordinatorJobBean job, StringBuilder msg){ 117 String killStatus = "Coord kill is disabled"; 118 LOG.info("Abandoned Coord found : " + job.getId()); 119 if (shouldKill) { 120 try { 121 new CoordKillXCommand(job.getId()).call(); 122 LOG.info("Killed abandoned coord : " + job.getId()); 123 killStatus = "Successful"; 124 } 125 catch (Exception e) { 126 LOG.error("Can't kill abandoned coord : " + job.getId(), e); 127 killStatus = " Failed : " + e.getMessage(); 128 } 129 } 130 addCoordToMessage(job, killStatus, msg); 131 } 132 133 public void addCoordToMessage(CoordinatorJobBean job, String killStatus, StringBuilder msg) { 134 msg.append("<tr>"); 135 msg.append("<td><a href=\"").append(JobXCommand.getJobConsoleUrl(job.getId())).append("\">") 136 .append(job.getId()).append("</a></td>"); 137 msg.append("<td>").append(job.getAppName()).append("</td>"); 138 msg.append("<td>").append(job.getUser()).append("</td>"); 139 msg.append("<td>").append(job.getGroup()).append("</td>"); 140 msg.append("<td>").append(killStatus).append("</td>"); 141 msg.append("</tr>"); 142 } 143 144 public void addTableHeader(StringBuilder msg) { 145 msg.append("<!DOCTYPE html><html><head><style>table,th,td{border:1px solid black;border-collapse:collapse;}</style>" 146 + "</head><body><table>"); 147 msg.append("<tr>"); 148 msg.append("<td>").append("Coordinator id").append("</td>"); 149 msg.append("<td>").append("Coordinator name").append("</td>"); 150 msg.append("<td>").append("User name").append("</td>"); 151 msg.append("<td>").append("Group").append("</td>"); 152 msg.append("<td>").append("Kill Status").append("</td>"); 153 msg.append("</tr>"); 154 } 155 156 public void addTableTail(StringBuilder msg) { 157 msg.append("</table></body></html>"); 158 } 159 160 161 public void sendMail(String body) throws Exception { 162 if (to == null || to.length == 0 || (to.length == 1 && StringUtils.isEmpty(to[0]))) { 163 LOG.info(TO_ADDRESS + " is not configured. Not sending email"); 164 return; 165 } 166 EmailActionExecutor email = new EmailActionExecutor(); 167 String subject = SUBJECT + " for " + serverURL + " at " + DateUtils.formatDateOozieTZ(new Date()); 168 email.email(to, new String[0], new String[0], subject, body, null, CONTENT_TYPE, null); 169 } 170 } 171 172 @Override 173 public void init(Services services) { 174 to = ConfigurationService.getStrings(TO_ADDRESS); 175 int failureLen = ConfigurationService.getInt(CONF_FAILURE_LEN); 176 boolean shouldKill = ConfigurationService.getBoolean(CONF_JOB_KILL); 177 serverURL = ConfigurationService.get(OOZIE_BASE_URL); 178 179 int delay = ConfigurationService.getInt(CONF_CHECK_DELAY); 180 181 Runnable actionCheckRunnable = new AbandonedCoordCheckerRunnable(failureLen, shouldKill); 182 services.get(SchedulerService.class).schedule(actionCheckRunnable, delay, 183 ConfigurationService.getInt(CONF_CHECK_INTERVAL), SchedulerService.Unit.MIN); 184 185 } 186 187 @Override 188 public void destroy() { 189 } 190 191 @Override 192 public Class<? extends Service> getInterface() { 193 return AbandonedCoordCheckerService.class; 194 } 195}