001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.sql.Timestamp;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.commons.lang.StringUtils;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.action.email.EmailActionExecutor;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.command.coord.CoordKillXCommand;
029import org.apache.oozie.command.wf.JobXCommand;
030import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
031import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
032import org.apache.oozie.executor.jpa.JPAExecutorException;
033import org.apache.oozie.util.DateUtils;
034import org.apache.oozie.util.XLog;
035
036/**
037 * The Abandoned Coord Checker Service check finds out the abandoned coord jobs in system and kills it. A job is
038 * considered to be abandoned/faulty if total number of actions in failed/timedout/suspended >= limit and there are no
039 * succeeded action and job start time < job.older.than. Email will not be sent if
040 * oozie.service.AbandonedCoordCheckerService.email.address is not configured.
041 */
042public class AbandonedCoordCheckerService implements Service {
043
044    private static final String CONF_PREFIX = Service.CONF_PREFIX + "AbandonedCoordCheckerService.";
045    public static final String TO_ADDRESS = CONF_PREFIX + "email.address";
046    private static final String CONTENT_TYPE = "text/html";
047    private static final String SUBJECT = "Abandoned Coordinators report";
048    public static final String CONF_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
049    public static final String CONF_CHECK_DELAY = CONF_PREFIX + "check.delay";
050    public static final String CONF_FAILURE_LEN = CONF_PREFIX + "failure.limit";
051    public static final String CONF_JOB_OLDER_THAN = CONF_PREFIX + "job.older.than";
052
053    public static final String CONF_JOB_KILL = CONF_PREFIX + "kill.jobs";
054    public static final String OOZIE_BASE_URL = "oozie.base.url";
055    private static String[] to;
056    private static String serverURL;
057
058    public static class AbandonedCoordCheckerRunnable implements Runnable {
059        final int failureLimit;
060        XLog LOG = XLog.getLog(getClass());
061        private boolean shouldKill = false;
062
063        public AbandonedCoordCheckerRunnable(int failureLimit) {
064            this(failureLimit, false);
065        }
066
067        public AbandonedCoordCheckerRunnable(int failureLimit, boolean shouldKill) {
068            this.failureLimit = failureLimit;
069            this.shouldKill = shouldKill;
070        }
071
072        public void run() {
073            if (!Services.get().get(JobsConcurrencyService.class).isLeader()) {
074                LOG.info("Server is not primary server. Skipping run");
075                return;
076            }
077            XLog.Info.get().clear();
078            try {
079                checkCoordJobs();
080            }
081            catch (Exception e) {
082                LOG.error("Error running AbandonedCoordChecker", e);
083            }
084        }
085
086        /**
087         * Check coordinator
088         * @throws Exception
089         */
090        private void checkCoordJobs() throws Exception {
091            StringBuilder msg = new StringBuilder();
092
093            addTableHeader(msg);
094            List<CoordinatorJobBean> jobs;
095            try {
096                Timestamp createdTS = new Timestamp(System.currentTimeMillis()
097                        - (ConfigurationService.getInt(CONF_JOB_OLDER_THAN) * 60 * 1000));
098
099                jobs = CoordJobQueryExecutor.getInstance().getList(CoordJobQuery.GET_COORD_FOR_ABANDONEDCHECK,
100                        failureLimit, createdTS);
101
102                for (CoordinatorJobBean job : jobs) {
103                    processJob(job, msg);
104                }
105                if (jobs.size() > 0) {
106                    addTableTail(msg);
107                    sendMail(msg.toString());
108                }
109
110            }
111            catch (JPAExecutorException je) {
112                throw new CommandException(je);
113            }
114        }
115
116        private void processJob(CoordinatorJobBean job, StringBuilder msg){
117            String killStatus = "Coord kill is disabled";
118            LOG.info("Abandoned Coord found : " + job.getId());
119            if (shouldKill) {
120                try {
121                    new CoordKillXCommand(job.getId()).call();
122                    LOG.info("Killed abandoned coord :  " + job.getId());
123                    killStatus = "Successful";
124                }
125                catch (Exception e) {
126                    LOG.error("Can't kill abandoned coord :  " + job.getId(), e);
127                    killStatus = " Failed : " + e.getMessage();
128                }
129            }
130            addCoordToMessage(job, killStatus, msg);
131        }
132
133        public void addCoordToMessage(CoordinatorJobBean job, String killStatus, StringBuilder msg) {
134            msg.append("<tr>");
135            msg.append("<td><a href=\"").append(JobXCommand.getJobConsoleUrl(job.getId())).append("\">")
136                    .append(job.getId()).append("</a></td>");
137            msg.append("<td>").append(job.getAppName()).append("</td>");
138            msg.append("<td>").append(job.getUser()).append("</td>");
139            msg.append("<td>").append(job.getGroup()).append("</td>");
140            msg.append("<td>").append(killStatus).append("</td>");
141            msg.append("</tr>");
142        }
143
144        public void addTableHeader(StringBuilder msg) {
145            msg.append("<!DOCTYPE html><html><head><style>table,th,td{border:1px solid black;border-collapse:collapse;}</style>"
146                    + "</head><body><table>");
147            msg.append("<tr>");
148            msg.append("<td>").append("Coordinator id").append("</td>");
149            msg.append("<td>").append("Coordinator name").append("</td>");
150            msg.append("<td>").append("User name").append("</td>");
151            msg.append("<td>").append("Group").append("</td>");
152            msg.append("<td>").append("Kill Status").append("</td>");
153            msg.append("</tr>");
154        }
155
156        public void addTableTail(StringBuilder msg) {
157            msg.append("</table></body></html>");
158        }
159
160
161        public void sendMail(String body) throws Exception {
162            if (to == null || to.length == 0 || (to.length == 1 && StringUtils.isEmpty(to[0]))) {
163                LOG.info(TO_ADDRESS + " is not configured. Not sending email");
164                return;
165            }
166            EmailActionExecutor email = new EmailActionExecutor();
167            String subject = SUBJECT + " for " + serverURL + " at " + DateUtils.formatDateOozieTZ(new Date());
168            email.email(to, new String[0], new String[0], subject, body, null, CONTENT_TYPE, null);
169        }
170    }
171
172    @Override
173    public void init(Services services) {
174        to = ConfigurationService.getStrings(TO_ADDRESS);
175        int failureLen = ConfigurationService.getInt(CONF_FAILURE_LEN);
176        boolean shouldKill = ConfigurationService.getBoolean(CONF_JOB_KILL);
177        serverURL = ConfigurationService.get(OOZIE_BASE_URL);
178
179        int delay = ConfigurationService.getInt(CONF_CHECK_DELAY);
180
181        Runnable actionCheckRunnable = new AbandonedCoordCheckerRunnable(failureLen, shouldKill);
182        services.get(SchedulerService.class).schedule(actionCheckRunnable, delay,
183                ConfigurationService.getInt(CONF_CHECK_INTERVAL), SchedulerService.Unit.MIN);
184
185    }
186
187    @Override
188    public void destroy() {
189    }
190
191    @Override
192    public Class<? extends Service> getInterface() {
193        return AbandonedCoordCheckerService.class;
194    }
195}