001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.BundleActionBean;
030import org.apache.oozie.BundleJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.XException;
033import org.apache.oozie.action.hadoop.OozieJobInfo;
034import org.apache.oozie.client.Job;
035import org.apache.oozie.client.OozieClient;
036import org.apache.oozie.client.rest.JsonBean;
037import org.apache.oozie.command.CommandException;
038import org.apache.oozie.command.PreconditionException;
039import org.apache.oozie.command.StartTransitionXCommand;
040import org.apache.oozie.command.coord.CoordSubmitXCommand;
041import org.apache.oozie.executor.jpa.BatchQueryExecutor;
042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
043import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
046import org.apache.oozie.util.JobUtils;
047import org.apache.oozie.util.LogUtils;
048import org.apache.oozie.util.ParamChecker;
049import org.apache.oozie.util.XConfiguration;
050import org.apache.oozie.util.XmlUtils;
051import org.jdom.Attribute;
052import org.jdom.Element;
053import org.jdom.JDOMException;
054
055/**
056 * The command to start Bundle job
057 */
058public class BundleStartXCommand extends StartTransitionXCommand {
059    private final String jobId;
060    private BundleJobBean bundleJob;
061
062    /**
063     * The constructor for class {@link BundleStartXCommand}
064     *
065     * @param jobId the bundle job id
066     */
067    public BundleStartXCommand(String jobId) {
068        super("bundle_start", "bundle_start", 1);
069        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
070    }
071
072    /**
073     * The constructor for class {@link BundleStartXCommand}
074     *
075     * @param jobId the bundle job id
076     * @param dryrun true if dryrun is enable
077     */
078    public BundleStartXCommand(String jobId, boolean dryrun) {
079        super("bundle_start", "bundle_start", 1, dryrun);
080        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
081    }
082
083    /* (non-Javadoc)
084     * @see org.apache.oozie.command.XCommand#getEntityKey()
085     */
086    @Override
087    public String getEntityKey() {
088        return jobId;
089    }
090
091    @Override
092    public String getKey() {
093        return getName() + "_" + jobId;
094    }
095
096    /* (non-Javadoc)
097     * @see org.apache.oozie.command.XCommand#isLockRequired()
098     */
099    @Override
100    protected boolean isLockRequired() {
101        return true;
102    }
103
104    @Override
105    protected void verifyPrecondition() throws CommandException, PreconditionException {
106        if (bundleJob.getStatus() != Job.Status.PREP) {
107            String msg = "Bundle " + bundleJob.getId() + " is not in PREP status. It is in : " + bundleJob.getStatus();
108            LOG.info(msg);
109            throw new PreconditionException(ErrorCode.E1100, msg);
110        }
111    }
112
113    @Override
114    public void loadState() throws CommandException {
115        try {
116            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
117            LogUtils.setLogInfo(bundleJob, logInfo);
118            super.setJob(bundleJob);
119        }
120        catch (XException ex) {
121            throw new CommandException(ex);
122        }
123    }
124
125    /* (non-Javadoc)
126     * @see org.apache.oozie.command.StartTransitionXCommand#StartChildren()
127     */
128    @Override
129    public void StartChildren() throws CommandException {
130        LOG.debug("Started coord jobs for the bundle=[{0}]", jobId);
131        insertBundleActions();
132        startCoordJobs();
133        LOG.debug("Ended coord jobs for the bundle=[{0}]", jobId);
134    }
135
136    /* (non-Javadoc)
137     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
138     */
139    @Override
140    public void notifyParent() {
141    }
142
143    /* (non-Javadoc)
144     * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
145     */
146    @Override
147    public void performWrites() throws CommandException {
148        try {
149            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
150        }
151        catch (JPAExecutorException e) {
152            throw new CommandException(e);
153        }
154    }
155
156    /**
157     * Insert bundle actions
158     *
159     * @throws CommandException thrown if failed to create bundle actions
160     */
161    @SuppressWarnings("unchecked")
162    private void insertBundleActions() throws CommandException {
163        if (bundleJob != null) {
164            Map<String, Boolean> map = new HashMap<String, Boolean>();
165            try {
166                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
167                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
168                for (Element elem : coordElems) {
169                    Attribute name = elem.getAttribute("name");
170                    Attribute critical = elem.getAttribute("critical");
171                    if (name != null) {
172                        if (map.containsKey(name.getValue())) {
173                            throw new CommandException(ErrorCode.E1304, name);
174                        }
175                        boolean isCritical = false;
176                        if (critical != null && Boolean.parseBoolean(critical.getValue())) {
177                            isCritical = true;
178                        }
179                        map.put(name.getValue(), isCritical);
180                    }
181                    else {
182                        throw new CommandException(ErrorCode.E1305);
183                    }
184                }
185            }
186            catch (JDOMException jex) {
187                throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
188            }
189
190            // if there is no coordinator for this bundle, failed it.
191            if (map.isEmpty()) {
192                bundleJob.setStatus(Job.Status.FAILED);
193                bundleJob.resetPending();
194                try {
195                    BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob);
196                }
197                catch (JPAExecutorException jex) {
198                    throw new CommandException(jex);
199                }
200
201                LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
202                throw new CommandException(ErrorCode.E1318, jobId);
203            }
204
205            for (Entry<String, Boolean> coordName : map.entrySet()) {
206                BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
207                insertList.add(action);
208            }
209        }
210        else {
211            throw new CommandException(ErrorCode.E0604, jobId);
212        }
213    }
214
215    private BundleActionBean createBundleAction(String jobId, String coordName, boolean isCritical) {
216        BundleActionBean action = new BundleActionBean();
217        action.setBundleActionId(jobId + "_" + coordName);
218        action.setBundleId(jobId);
219        action.setCoordName(coordName);
220        action.setStatus(Job.Status.PREP);
221        action.setLastModifiedTime(new Date());
222        if (isCritical) {
223            action.setCritical();
224        }
225        else {
226            action.resetCritical();
227        }
228        return action;
229    }
230
231    /**
232     * Start Coord Jobs
233     *
234     * @throws CommandException thrown if failed to start coord jobs
235     */
236    @SuppressWarnings("unchecked")
237    private void startCoordJobs() throws CommandException {
238        if (bundleJob != null) {
239            try {
240                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
241                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
242                for (Element coordElem : coordElems) {
243                    Attribute name = coordElem.getAttribute("name");
244                    Configuration coordConf = mergeConfig(coordElem);
245                    coordConf.set(OozieClient.BUNDLE_ID, jobId);
246                    if (OozieJobInfo.isJobInfoEnabled()) {
247                        coordConf.set(OozieJobInfo.BUNDLE_NAME, bundleJob.getAppName());
248                    }
249
250                    queue(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
251
252                }
253                updateBundleAction();
254            }
255            catch (JDOMException jex) {
256                throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
257            }
258            catch (JPAExecutorException je) {
259                throw new CommandException(je);
260            }
261        }
262        else {
263            throw new CommandException(ErrorCode.E0604, jobId);
264        }
265    }
266
267    private void updateBundleAction() throws JPAExecutorException {
268        for(JsonBean bAction : insertList) {
269            BundleActionBean action = (BundleActionBean) bAction;
270            action.incrementAndGetPending();
271            action.setLastModifiedTime(new Date());
272        }
273    }
274
275    /**
276     * Merge Bundle job config and the configuration from the coord job to pass
277     * to Coord Engine
278     *
279     * @param coordElem the coordinator configuration
280     * @return Configuration merged configuration
281     * @throws CommandException thrown if failed to merge configuration
282     */
283    private Configuration mergeConfig(Element coordElem) throws CommandException {
284        String jobConf = bundleJob.getConf();
285        // Step 1: runConf = jobConf
286        Configuration runConf = null;
287        try {
288            runConf = new XConfiguration(new StringReader(jobConf));
289        }
290        catch (IOException e1) {
291            LOG.warn("Configuration parse error in:" + jobConf);
292            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
293        }
294        // Step 2: Merge local properties into runConf
295        // extract 'property' tags under 'configuration' block in the coordElem
296        // convert Element to XConfiguration
297        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
298
299        if (localConfigElement != null) {
300            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
301            Configuration localConf;
302            try {
303                localConf = new XConfiguration(new StringReader(strConfig));
304            }
305            catch (IOException e1) {
306                LOG.warn("Configuration parse error in:" + strConfig);
307                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
308            }
309
310            // copy configuration properties in the coordElem to the runConf
311            XConfiguration.copy(localConf, runConf);
312        }
313
314        // Step 3: Extract value of 'app-path' in coordElem, save it as a
315        // new property called 'oozie.coord.application.path', and normalize.
316        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
317        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
318        // Normalize coordinator appPath here;
319        try {
320            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
321        }
322        catch (IOException e) {
323            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
324        }
325        return runConf;
326    }
327
328    /* (non-Javadoc)
329     * @see org.apache.oozie.command.TransitionXCommand#getJob()
330     */
331    @Override
332    public Job getJob() {
333        return bundleJob;
334    }
335
336    /* (non-Javadoc)
337     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
338     */
339    @Override
340    public void updateJob() throws CommandException {
341        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob));
342    }
343}