001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.BundleActionBean;
031import org.apache.oozie.BundleJobBean;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.XException;
034import org.apache.oozie.action.hadoop.OozieJobInfo;
035import org.apache.oozie.client.Job;
036import org.apache.oozie.client.OozieClient;
037import org.apache.oozie.client.rest.JsonBean;
038import org.apache.oozie.command.CommandException;
039import org.apache.oozie.command.PreconditionException;
040import org.apache.oozie.command.StartTransitionXCommand;
041import org.apache.oozie.executor.jpa.BatchQueryExecutor;
042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
043import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
046import org.apache.oozie.util.ELUtils;
047import org.apache.oozie.util.JobUtils;
048import org.apache.oozie.util.LogUtils;
049import org.apache.oozie.util.ParamChecker;
050import org.apache.oozie.util.XConfiguration;
051import org.apache.oozie.util.XmlUtils;
052import org.jdom.Attribute;
053import org.jdom.Element;
054import org.jdom.JDOMException;
055
056/**
057 * The command to start Bundle job
058 */
059public class BundleStartXCommand extends StartTransitionXCommand {
060    private final String jobId;
061    private BundleJobBean bundleJob;
062
063    /**
064     * The constructor for class {@link BundleStartXCommand}
065     *
066     * @param jobId the bundle job id
067     */
068    public BundleStartXCommand(String jobId) {
069        super("bundle_start", "bundle_start", 1);
070        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
071    }
072
073    /**
074     * The constructor for class {@link BundleStartXCommand}
075     *
076     * @param jobId the bundle job id
077     * @param dryrun true if dryrun is enable
078     */
079    public BundleStartXCommand(String jobId, boolean dryrun) {
080        super("bundle_start", "bundle_start", 1, dryrun);
081        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
082    }
083
084    /* (non-Javadoc)
085     * @see org.apache.oozie.command.XCommand#getEntityKey()
086     */
087    @Override
088    public String getEntityKey() {
089        return jobId;
090    }
091
092    @Override
093    public String getKey() {
094        return getName() + "_" + jobId;
095    }
096
097    /* (non-Javadoc)
098     * @see org.apache.oozie.command.XCommand#isLockRequired()
099     */
100    @Override
101    protected boolean isLockRequired() {
102        return true;
103    }
104
105    @Override
106    protected void verifyPrecondition() throws CommandException, PreconditionException {
107        if (bundleJob.getStatus() != Job.Status.PREP) {
108            String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
109            LOG.info(msg);
110            throw new PreconditionException(ErrorCode.E1100, msg);
111        }
112    }
113
114    @Override
115    public void loadState() throws CommandException {
116        try {
117            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
118            LogUtils.setLogInfo(bundleJob);
119            super.setJob(bundleJob);
120        }
121        catch (XException ex) {
122            throw new CommandException(ex);
123        }
124    }
125
126    /* (non-Javadoc)
127     * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
128     */
129    @Override
130    public void StartChildren() throws CommandException {
131        LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
132        insertBundleActions();
133        startCoordJobs();
134        LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
135    }
136
137    /* (non-Javadoc)
138     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
139     */
140    @Override
141    public void notifyParent() {
142    }
143
144    /* (non-Javadoc)
145     * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
146     */
147    @Override
148    public void performWrites() throws CommandException {
149        try {
150            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
151        }
152        catch (JPAExecutorException e) {
153            throw new CommandException(e);
154        }
155    }
156
157    /**
158     * Insert bundle actions
159     *
160     * @throws CommandException thrown if failed to create bundle actions
161     */
162    @SuppressWarnings("unchecked")
163    private void insertBundleActions() throws CommandException {
164        if (bundleJob != null) {
165            Map<String, Boolean> map = new HashMap<String, Boolean>();
166            try {
167                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
168                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
169                for (Element elem : coordElems) {
170                    Attribute name = elem.getAttribute("name");
171                    Attribute critical = elem.getAttribute("critical");
172                    if (name != null) {
173                        if (map.containsKey(name.getValue())) {
174                            throw new CommandException(ErrorCode.E1304, name);
175                        }
176                        Configuration coordConf = mergeConfig(elem);
177                        // skip coord job if it is not enabled
178                        if (!isEnabled(elem, coordConf)) {
179                            continue;
180                        }
181                        boolean isCritical = false;
182                        if (critical != null && Boolean.parseBoolean(critical.getValue())) {
183                            isCritical = true;
184                        }
185                        map.put(name.getValue(), isCritical);
186                    }
187                    else {
188                        throw new CommandException(ErrorCode.E1305);
189                    }
190                }
191            }
192            catch (JDOMException jex) {
193                throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
194            }
195
196            // if there is no coordinator for this bundle, failed it.
197            if (map.isEmpty()) {
198                bundleJob.setStatus(Job.Status.FAILED);
199                bundleJob.resetPending();
200                try {
201                    BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob);
202                }
203                catch (JPAExecutorException jex) {
204                    throw new CommandException(jex);
205                }
206
207                LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
208                throw new CommandException(ErrorCode.E1318, jobId);
209            }
210
211            for (Entry<String, Boolean> coordName : map.entrySet()) {
212                BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
213                insertList.add(action);
214            }
215        }
216        else {
217            throw new CommandException(ErrorCode.E0604, jobId);
218        }
219    }
220
221    private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
222        BundleActionBean action = new BundleActionBean();
223        action.setBundleActionId(jobId + "_" + coordName);
224        action.setBundleId(jobId);
225        action.setCoordName(coordName);
226        action.setStatus(Job.Status.PREP);
227        action.setLastModifiedTime(new Date());
228        if (isCritical) {
229            action.setCritical();
230        }
231        else {
232            action.resetCritical();
233        }
234        return action;
235    }
236
237    /**
238     * Start Coord Jobs
239     *
240     * @throws CommandException thrown if failed to start coord jobs
241     */
242    @SuppressWarnings("unchecked")
243    private void startCoordJobs() throws CommandException {
244        if (bundleJob != null) {
245            try {
246                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
247                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
248                for (Element coordElem : coordElems) {
249                    Attribute name = coordElem.getAttribute("name");
250
251                    Configuration coordConf = mergeConfig(coordElem);
252                    coordConf.set(OozieClient.BUNDLE_ID, jobId);
253                    if (OozieJobInfo.isJobInfoEnabled()) {
254                        coordConf.set(OozieJobInfo.BUNDLE_NAME, bundleJob.getAppName());
255                    }
256                    // skip coord job if it is not enabled
257                    if (!isEnabled(coordElem, coordConf)) {
258                        continue;
259                    }
260                    String coordName=name.getValue();
261                    try {
262                        coordName = ELUtils.resolveAppName(coordName, coordConf);
263                    }
264                    catch (Exception e) {
265                        throw new CommandException(ErrorCode.E1321, e.getMessage(), e);
266
267                    }
268                    queue(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
269
270                }
271                updateBundleAction();
272            }
273            catch (JDOMException jex) {
274                throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
275            }
276            catch (JPAExecutorException je) {
277                throw new CommandException(je);
278            }
279        }
280        else {
281            throw new CommandException(ErrorCode.E0604, jobId);
282        }
283    }
284
285    private void updateBundleAction() throws JPAExecutorException {
286        for(JsonBean bAction : insertList) {
287            BundleActionBean action = (BundleActionBean) bAction;
288            action.incrementAndGetPending();
289            action.setLastModifiedTime(new Date());
290        }
291    }
292
293    /**
294     * Merge Bundle job config and the configuration from the coord job to pass
295     * to Coord Engine
296     *
297     * @param coordElem the coordinator configuration
298     * @return Configuration merged configuration
299     * @throws CommandException thrown if failed to merge configuration
300     */
301    private Configuration mergeConfig(Element coordElem) throws CommandException {
302        String jobConf = bundleJob.getConf();
303        // Step 1: runConf = jobConf
304        Configuration runConf = null;
305        try {
306            runConf = new XConfiguration(new StringReader(jobConf));
307        }
308        catch (IOException e1) {
309            LOG.warn("Configuration parse error in:" + jobConf);
310            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
311        }
312        // Step 2: Merge local properties into runConf
313        // extract 'property' tags under 'configuration' block in the coordElem
314        // convert Element to XConfiguration
315        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
316
317        if (localConfigElement != null) {
318            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
319            Configuration localConf;
320            try {
321                localConf = new XConfiguration(new StringReader(strConfig));
322            }
323            catch (IOException e1) {
324                LOG.warn("Configuration parse error in:" + strConfig);
325                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
326            }
327
328            // copy configuration properties in the coordElem to the runConf
329            XConfiguration.copy(localConf, runConf);
330        }
331
332        // Step 3: Extract value of 'app-path' in coordElem, save it as a
333        // new property called 'oozie.coord.application.path', and normalize.
334        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
335        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
336        // Normalize coordinator appPath here;
337        try {
338            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
339        }
340        catch (IOException e) {
341            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
342        }
343        return runConf;
344    }
345
346    /* (non-Javadoc)
347     * @see org.apache.oozie.command.TransitionXCommand#getJob()
348     */
349    @Override
350    public Job getJob() {
351        return bundleJob;
352    }
353
354    /* (non-Javadoc)
355     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
356     */
357    @Override
358    public void updateJob() throws CommandException {
359        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob));
360    }
361
362    /**
363     * Checks whether the coordinator is enabled
364     *
365     * @param coordElem
366     * @param coordConf
367     * @return true if coordinator is enabled, otherwise false.
368     * @throws CommandException
369     */
370    private boolean isEnabled(Element coordElem, Configuration coordConf) throws CommandException {
371        Attribute enabled = coordElem.getAttribute("enabled");
372        if (enabled == null) {
373            // default is true
374            return true;
375        }
376        String isEnabled = enabled.getValue();
377        try {
378            isEnabled = ELUtils.resolveAppName(isEnabled, coordConf);
379        }
380        catch (Exception e) {
381            throw new CommandException(ErrorCode.E1321, e.getMessage(), e);
382        }
383        return Boolean.parseBoolean(isEnabled);
384    }
385
386}