001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.sql.Timestamp;
025import java.text.MessageFormat;
026import java.util.Date;
027import java.util.List;
028
029import javax.persistence.Basic;
030import javax.persistence.Column;
031import javax.persistence.Entity;
032import javax.persistence.Id;
033import javax.persistence.Lob;
034import javax.persistence.NamedQueries;
035import javax.persistence.NamedQuery;
036import javax.persistence.Table;
037import javax.persistence.Transient;
038
039import org.apache.hadoop.io.Writable;
040import org.apache.oozie.client.CoordinatorAction;
041import org.apache.oozie.client.rest.JsonBean;
042import org.apache.oozie.client.rest.JsonTags;
043import org.apache.oozie.client.rest.JsonUtils;
044import org.apache.oozie.coord.input.dependency.CoordInputDependency;
045import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
046import org.apache.oozie.util.DateUtils;
047import org.apache.oozie.util.WritableUtils;
048import org.apache.openjpa.persistence.jdbc.Index;
049import org.apache.openjpa.persistence.jdbc.Strategy;
050import org.json.simple.JSONArray;
051import org.json.simple.JSONObject;
052
053
054@Entity
055@NamedQueries({
056
057        @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.statusStr = :status where w.id = :id"),
058
059        @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.statusStr = :status where w.id = :id"),
060        // Query to update the action status, pending status and last modified time stamp of a Coordinator action
061        @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.statusStr =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
062        // Update query for InputCheck
063        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
064        // Update query for Push-based missing dependency check
065        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime,  w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
066        // Update query for Push-based missing dependency check
067        @NamedQuery(name = "UPDATE_COORD_ACTION_DEPENDENCIES", query = "update CoordinatorActionBean w set w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
068        // Update query for Start
069        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.statusStr =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage  where w.id = :id"),
070
071        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
072
073        @NamedQuery(name = "UPDATE_COORD_ACTION_RERUN", query = "update CoordinatorActionBean w set w.actionXml =:actionXml, w.statusStr = :status, w.externalId = :externalId, w.externalStatus = :externalStatus, w.rerunTimestamp = :rerunTime, w.lastModifiedTimestamp = :lastModifiedTime, w.createdTimestamp = :createdTime, w.createdConf = :createdConf, w.runConf = :runConf, w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
074
075        @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"),
076
077        @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id IN (:actionId)"),
078
079        @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"),
080
081        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_COORDINATOR", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId"),
082
083        // Query used by XTestcase to setup tables
084        @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
085        // Select query used only by test cases
086        @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
087
088        // Select query used by SLAService on restart
089        @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
090        // Select query used by ActionInfo command
091        @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
092        // Select Query used by Timeout and skip commands
093        @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
094        // Select query used by InputCheck command
095        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.actionNumber, a.jobId, a.statusStr, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut, a.externalId from CoordinatorActionBean a where a.id = :id"),
096        // Select query used by CoordActionUpdate command
097        @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
098        // Select query used by Check command
099        @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
100        // Select query used by Start command
101        @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.statusStr, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
102
103        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.actionNumber, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp"),
104
105        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.actionNumber, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp desc"),
106
107        @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'RUNNING' OR a.statusStr='SUBMITTED')"),
108
109        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
110
111        @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'WAITING'"),
112
113        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED')"),
114
115        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.statusStr = :status"),
116
117        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
118        // Query to retrieve Coordinator actions sorted by nominal time
119        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
120        // Query to maintain backward compatibility for coord job info command
121        @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
122        // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
123        @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'FAILED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'IGNORED'"),
124
125        // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
126        @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'RUNNING'"),
127
128        // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
129        @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'SUSPENDED'"),
130
131        // Query to retrieve count of Coordinator actions which are pending
132        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"),
133
134        // Query to retrieve status of Coordinator actions
135        @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"),
136
137        // Query to retrieve status of Coordinator actions
138        @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"),
139
140        @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
141
142        @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),
143
144        //Used by coordinator store only
145        @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'RUNNING'"),
146
147        @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
148
149        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'READY') AND a.lastModifiedTimestamp <= :lastModifiedTime and a.nominalTimestamp <= :currentTime and a.jobId in ( select w.id from CoordinatorJobBean w where w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR')"),
150
151        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
152        // Select query used by rerun, requires almost all columns so select * is used
153        @NamedQuery(name = "GET_TERMINATED_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
154        // Select query used by log
155        @NamedQuery(name = "GET_TERMINATED_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
156        // Select query used by rerun, requires almost all columns so select * is used
157        @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
158
159        @NamedQuery(name = "GET_ACTIVE_ACTIONS_FOR_DATES", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'WAITING' OR a.statusStr = 'READY' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'RUNNING'  OR a.statusStr = 'SUSPENDED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
160
161        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"),
162
163        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query = "select count(w) from CoordinatorActionBean w where w.statusStr = 'RUNNING' and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
164
165        @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
166
167         @NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml  from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT'  AND a.statusStr <> 'IGNORED')"),
168
169         @NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml  from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT'  AND a.statusStr <> 'IGNORED')")
170 })
171
172@Table(name = "COORD_ACTIONS")
173public class CoordinatorActionBean implements
174        Writable,CoordinatorAction,JsonBean {
175
176    @Id
177    private String id;
178
179    @Basic
180    @Index
181    @Column(name = "job_id")
182    private String jobId;
183
184    @Basic
185    @Index
186    @Column(name = "status")
187    private String statusStr = CoordinatorAction.Status.WAITING.toString();
188
189    @Basic
190    @Index
191    @Column(name = "nominal_time")
192    private java.sql.Timestamp nominalTimestamp = null;
193
194    @Basic
195    @Index
196    @Column(name = "last_modified_time")
197    private java.sql.Timestamp lastModifiedTimestamp = null;
198
199    @Basic
200    @Index
201    @Column(name = "created_time")
202    private java.sql.Timestamp createdTimestamp = null;
203
204    @Basic
205    @Index
206    @Column(name = "rerun_time")
207    private java.sql.Timestamp rerunTimestamp = null;
208
209    @Basic
210    @Index
211    @Column(name = "external_id")
212    private String externalId;
213
214    @Basic
215    @Column(name = "sla_xml")
216    @Lob
217    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
218    private StringBlob slaXml = null;
219
220    @Basic
221    @Column(name = "pending")
222    private int pending = 0;
223
224    @Basic
225    @Column(name = "job_type")
226    private String type;
227
228    @Basic
229    @Column(name = "action_number")
230    private int actionNumber;
231
232    @Basic
233    @Column(name = "created_conf")
234    @Lob
235    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
236    private StringBlob createdConf;
237
238    @Basic
239    @Column(name = "time_out")
240    private int timeOut = 0;
241
242    @Basic
243    @Column(name = "run_conf")
244    @Lob
245    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
246    private StringBlob runConf;
247
248    @Basic
249    @Column(name = "action_xml")
250    @Lob
251    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
252    private StringBlob actionXml;
253
254    @Basic
255    @Column(name = "missing_dependencies")
256    @Lob
257    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
258    private StringBlob missingDependencies;
259
260    @Basic
261    @Column(name = "push_missing_dependencies")
262    @Lob
263    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
264    private StringBlob pushMissingDependencies;
265
266    @Basic
267    @Column(name = "external_status")
268    private String externalStatus;
269
270    @Basic
271    @Column(name = "tracker_uri")
272    private String trackerUri;
273
274    @Basic
275    @Column(name = "console_url")
276    private String consoleUrl;
277
278    @Basic
279    @Column(name = "error_code")
280    private String errorCode;
281
282    @Basic
283    @Column(name = "error_message")
284    private String errorMessage;
285
286    @SuppressWarnings("unchecked")
287    public JSONObject toJSONObject() {
288        return toJSONObject("GMT");
289    }
290
291    @Transient
292    private CoordInputDependency coordPushInputDependency;
293
294    @Transient
295    private CoordInputDependency coordPullInputDependency;
296
297
298    public CoordinatorActionBean() {
299    }
300
301    /**
302     * Serialize the coordinator bean to a data output.
303     *
304     * @param dataOutput data output.
305     * @throws IOException thrown if the coordinator bean could not be
306     *         serialized.
307     */
308    @Override
309    public void write(DataOutput dataOutput) throws IOException {
310        WritableUtils.writeStr(dataOutput, getJobId());
311        WritableUtils.writeStr(dataOutput, getType());
312        WritableUtils.writeStr(dataOutput, getId());
313        WritableUtils.writeStr(dataOutput, getCreatedConf());
314        WritableUtils.writeStr(dataOutput, getStatus().toString());
315        dataOutput.writeInt(getActionNumber());
316        WritableUtils.writeStr(dataOutput, getRunConf());
317        WritableUtils.writeStr(dataOutput, getExternalStatus());
318        WritableUtils.writeStr(dataOutput, getTrackerUri());
319        WritableUtils.writeStr(dataOutput, getConsoleUrl());
320        WritableUtils.writeStr(dataOutput, getErrorCode());
321        WritableUtils.writeStr(dataOutput, getErrorMessage());
322        dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
323        dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
324    }
325
326    /**
327     * Deserialize a coordinator bean from a data input.
328     *
329     * @param dataInput data input.
330     * @throws IOException thrown if the workflow bean could not be
331     *         deserialized.
332     */
333    @Override
334    public void readFields(DataInput dataInput) throws IOException {
335        setJobId(WritableUtils.readStr(dataInput));
336        setType(WritableUtils.readStr(dataInput));
337        setId(WritableUtils.readStr(dataInput));
338        setCreatedConf(WritableUtils.readStr(dataInput));
339        setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
340        setActionNumber(dataInput.readInt());
341        setRunConf(WritableUtils.readStr(dataInput));
342        setExternalStatus(WritableUtils.readStr(dataInput));
343        setTrackerUri(WritableUtils.readStr(dataInput));
344        setConsoleUrl(WritableUtils.readStr(dataInput));
345        setErrorCode(WritableUtils.readStr(dataInput));
346        setErrorMessage(WritableUtils.readStr(dataInput));
347        long d = dataInput.readLong();
348        if (d != -1) {
349            setCreatedTime(new Date(d));
350        }
351        d = dataInput.readLong();
352        if (d != -1) {
353            setLastModifiedTime(new Date(d));
354        }
355    }
356
357    @Override
358    public String getJobId() {
359        return this.jobId;
360    }
361
362    public void setJobId(String id) {
363        this.jobId = id;
364    }
365
366    @Override
367    public Status getStatus() {
368        return Status.valueOf(statusStr);
369    }
370
371    /**
372     * Return the status in string
373     * @return
374     */
375    public String getStatusStr() {
376        return statusStr;
377    }
378
379    public void setStatus(Status status) {
380        this.statusStr = status.toString();
381    }
382
383    public void setStatusStr(String statusStr) {
384        this.statusStr = statusStr;
385    }
386
387    public void setCreatedTime(Date createdTime) {
388        this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
389    }
390
391    public void setRerunTime(Date rerunTime) {
392        this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
393    }
394
395    public void setNominalTime(Date nominalTime) {
396        this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
397    }
398
399    public void setLastModifiedTime(Date lastModifiedTime) {
400        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
401    }
402
403    public Date getCreatedTime() {
404        return DateUtils.toDate(createdTimestamp);
405    }
406
407    public Timestamp getCreatedTimestamp() {
408        return createdTimestamp;
409    }
410
411    public Date getRerunTime() {
412        return DateUtils.toDate(rerunTimestamp);
413    }
414
415    public Timestamp getRerunTimestamp() {
416        return rerunTimestamp;
417    }
418
419    @Override
420    public Date getLastModifiedTime() {
421        return DateUtils.toDate(lastModifiedTimestamp);
422    }
423
424    public Timestamp getLastModifiedTimestamp() {
425        return lastModifiedTimestamp;
426    }
427
428    @Override
429    public Date getNominalTime() {
430        return DateUtils.toDate(nominalTimestamp);
431    }
432
433    public Timestamp getNominalTimestamp() {
434        return nominalTimestamp;
435    }
436
437    @Override
438    public String getExternalId() {
439        return externalId;
440    }
441
442    public void setExternalId(String externalId) {
443        this.externalId = externalId;
444    }
445
446    public StringBlob getSlaXmlBlob() {
447        return slaXml;
448    }
449
450    public void setSlaXmlBlob(StringBlob slaXml) {
451        this.slaXml = slaXml;
452    }
453
454    public String getSlaXml() {
455        return slaXml == null ? null : slaXml.getString();
456    }
457
458    public void setSlaXml(String slaXml) {
459        if (this.slaXml == null) {
460            this.slaXml = new StringBlob(slaXml);
461        }
462        else {
463            this.slaXml.setString(slaXml);
464        }
465    }
466
467    /**
468     * @return true if in terminal status
469     */
470    public boolean isTerminalStatus() {
471        boolean isTerminal = true;
472        switch (getStatus()) {
473            case WAITING:
474            case READY:
475            case SUBMITTED:
476            case RUNNING:
477            case SUSPENDED:
478                isTerminal = false;
479                break;
480            default:
481                isTerminal = true;
482                break;
483        }
484        return isTerminal;
485    }
486
487    /**
488     * Return if the action is complete with failure.
489     *
490     * @return if the action is complete with failure.
491     */
492    public boolean isTerminalWithFailure() {
493        boolean result = false;
494        switch (getStatus()) {
495            case FAILED:
496            case KILLED:
497            case TIMEDOUT:
498                result = true;
499        }
500        return result;
501    }
502
503    /**
504     * Set some actions are in progress for particular coordinator action.
505     *
506     * @param pending set pending to true
507     */
508    public void setPending(int pending) {
509        this.pending = pending;
510    }
511
512    /**
513     * increment pending and return it
514     *
515     * @return pending
516     */
517    public int incrementAndGetPending() {
518        this.pending++;
519        return pending;
520    }
521
522    /**
523     * decrement pending and return it
524     *
525     * @return pending
526     */
527    public int decrementAndGetPending() {
528        this.pending = Math.max(this.pending - 1, 0);
529        return pending;
530    }
531
532    /**
533     * Get some actions are in progress for particular bundle action.
534     *
535     * @return pending
536     */
537    public int getPending() {
538        return this.pending;
539    }
540
541    /**
542     * Return if the action is pending.
543     *
544     * @return if the action is pending.
545     */
546    public boolean isPending() {
547        return pending > 0 ? true : false;
548    }
549
550    @Override
551    public String getId() {
552        return id;
553    }
554
555    public void setId(String id) {
556        this.id = id;
557    }
558
559    public String getType() {
560        return type;
561    }
562
563    public void setType(String type) {
564        this.type = type;
565    }
566
567    public void setActionNumber(int actionNumber) {
568        this.actionNumber = actionNumber;
569    }
570
571    @Override
572    public int getActionNumber() {
573        return actionNumber;
574    }
575
576    @Override
577    public String getCreatedConf() {
578        return createdConf == null ? null : createdConf.getString();
579    }
580
581    public void setCreatedConf(String createdConf) {
582        if (this.createdConf == null) {
583            this.createdConf = new StringBlob(createdConf);
584        }
585        else {
586            this.createdConf.setString(createdConf);
587        }
588    }
589
590    public void setCreatedConfBlob(StringBlob createdConf) {
591        this.createdConf = createdConf;
592    }
593
594    public StringBlob getCreatedConfBlob() {
595        return createdConf;
596    }
597
598    public void setRunConf(String runConf) {
599        if (this.runConf == null) {
600            this.runConf = new StringBlob(runConf);
601        }
602        else {
603            this.runConf.setString(runConf);
604        }
605    }
606
607    @Override
608    public String getRunConf() {
609        return runConf == null ? null : runConf.getString();
610    }
611
612    public void setRunConfBlob(StringBlob runConf) {
613        this.runConf = runConf;
614    }
615
616    public StringBlob getRunConfBlob() {
617        return runConf;
618    }
619
620
621    public void setMissingDependencies(String missingDependencies) {
622        if (this.missingDependencies == null) {
623            this.missingDependencies = new StringBlob(missingDependencies);
624        }
625        else {
626            this.missingDependencies.setString(missingDependencies);
627        }
628    }
629
630    @Override
631    public String getMissingDependencies() {
632        return missingDependencies == null ? null : missingDependencies.getString();
633    }
634
635    public void setMissingDependenciesBlob(StringBlob missingDependencies) {
636        this.missingDependencies = missingDependencies;
637    }
638
639    public StringBlob getMissingDependenciesBlob() {
640        return missingDependencies;
641    }
642
643    @Override
644    public String getPushMissingDependencies() {
645        return pushMissingDependencies == null ? null : pushMissingDependencies.getString();
646    }
647
648    public void setPushMissingDependencies(String pushMissingDependencies) {
649        if (this.pushMissingDependencies == null) {
650            this.pushMissingDependencies = new StringBlob(pushMissingDependencies);
651        }
652        else {
653            this.pushMissingDependencies.setString(pushMissingDependencies);
654        }
655    }
656
657    public void setPushMissingDependenciesBlob(StringBlob pushMissingDependencies) {
658        this.pushMissingDependencies = pushMissingDependencies;
659    }
660
661    public StringBlob getPushMissingDependenciesBlob() {
662        return pushMissingDependencies;
663    }
664
665    public String getExternalStatus() {
666        return externalStatus;
667    }
668
669    public void setExternalStatus(String externalStatus) {
670        this.externalStatus = externalStatus;
671    }
672
673    @Override
674    public String getTrackerUri() {
675        return trackerUri;
676    }
677
678    public void setTrackerUri(String trackerUri) {
679        this.trackerUri = trackerUri;
680    }
681
682    @Override
683    public String getConsoleUrl() {
684        return consoleUrl;
685    }
686
687    public void setConsoleUrl(String consoleUrl) {
688        this.consoleUrl = consoleUrl;
689    }
690
691    @Override
692    public String getErrorCode() {
693        return errorCode;
694    }
695
696    @Override
697    public String getErrorMessage() {
698        return errorMessage;
699    }
700
701    public void setErrorInfo(String errorCode, String errorMessage) {
702        this.errorCode = errorCode;
703        this.errorMessage = errorMessage;
704    }
705
706    public String getActionXml() {
707        return actionXml == null ? null : actionXml.getString();
708    }
709
710    public void setActionXml(String actionXml) {
711        if (this.actionXml == null) {
712            this.actionXml = new StringBlob(actionXml);
713        }
714        else {
715            this.actionXml.setString(actionXml);
716        }
717    }
718
719    public void setActionXmlBlob(StringBlob actionXml) {
720        this.actionXml = actionXml;
721    }
722
723    public StringBlob getActionXmlBlob() {
724        return actionXml;
725    }
726
727    @Override
728    public String toString() {
729        return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]",
730                                    getId(), getStatus());
731    }
732
733    public int getTimeOut() {
734        return timeOut;
735    }
736
737    public void setTimeOut(int timeOut) {
738        this.timeOut = timeOut;
739    }
740
741
742    public void setErrorCode(String errorCode) {
743        this.errorCode = errorCode;
744    }
745
746    public void setErrorMessage(String errorMessage) {
747        this.errorMessage = errorMessage;
748    }
749
750    @SuppressWarnings("unchecked")
751    public JSONObject toJSONObject(String timeZoneId) {
752        JSONObject json = new JSONObject();
753        json.put(JsonTags.COORDINATOR_ACTION_ID, id);
754        json.put(JsonTags.COORDINATOR_JOB_ID, jobId);
755        json.put(JsonTags.COORDINATOR_ACTION_TYPE, type);
756        json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber);
757        json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf());
758        json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
759        json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils.formatDateRfc822(getNominalTime(), timeZoneId));
760        json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId);
761        // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
762        // .formatDateRfc822(startTime), timeZoneId);
763        json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr);
764        json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf());
765        json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME,
766                JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
767        // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
768        // .formatDateRfc822(startTime), timeZoneId);
769        // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
770        // .formatDateRfc822(endTime), timeZoneId);
771        json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getPullInputDependencies().getMissingDependencies());
772        json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushInputDependencies().getMissingDependencies());
773        json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
774        json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
775        json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
776        json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, errorCode);
777        json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, errorMessage);
778        json.put(JsonTags.TO_STRING, toString());
779        return json;
780    }
781
782    /**
783     * Convert a nodes list into a JSONArray.
784     *
785     * @param actions nodes list.
786     * @param timeZoneId time zone to use for dates in the JSON array.
787     * @return the corresponding JSON array.
788     */
789    @SuppressWarnings("unchecked")
790    public static JSONArray toJSONArray(List<CoordinatorActionBean> actions, String timeZoneId) {
791        JSONArray array = new JSONArray();
792        for (CoordinatorActionBean action : actions) {
793            array.add(action.toJSONObject(timeZoneId));
794        }
795        return array;
796    }
797
798    @Override
799    public int hashCode() {
800        final int prime = 31;
801        int result = 1;
802        result = prime * result + ((id == null) ? 0 : id.hashCode());
803        return result;
804    }
805
806    @Override
807    public boolean equals(Object obj) {
808        if (this == obj) {
809            return true;
810        }
811        if (obj == null) {
812            return false;
813        }
814        if (getClass() != obj.getClass()) {
815            return false;
816        }
817        CoordinatorActionBean other = (CoordinatorActionBean) obj;
818        if (id == null) {
819            if (other.id != null) {
820                return false;
821            }
822        }
823        else if (!id.equals(other.id)) {
824            return false;
825        }
826        return true;
827    }
828
829    public CoordInputDependency getPullInputDependencies() {
830        if (coordPullInputDependency == null) {
831            coordPullInputDependency = CoordInputDependencyFactory.getPullInputDependencies(missingDependencies);
832        }
833        return coordPullInputDependency;
834
835    }
836
837    public CoordInputDependency getPushInputDependencies() {
838        if (coordPushInputDependency == null) {
839            coordPushInputDependency = CoordInputDependencyFactory.getPushInputDependencies(pushMissingDependencies);
840        }
841        return coordPushInputDependency;
842    }
843
844    public void setPullInputDependencies(CoordInputDependency coordPullInputDependency) {
845        this.coordPullInputDependency = coordPullInputDependency;
846    }
847
848    public void setPushInputDependencies(CoordInputDependency coordPushInputDependency) {
849        this.coordPushInputDependency = coordPushInputDependency;
850    }
851
852}