This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.DataInput;
021 import java.io.DataOutput;
022 import java.io.IOException;
023 import java.sql.Timestamp;
024 import java.util.Date;
025
026 import javax.persistence.Basic;
027 import javax.persistence.Column;
028 import javax.persistence.ColumnResult;
029 import javax.persistence.Entity;
030 import javax.persistence.Lob;
031 import javax.persistence.NamedNativeQueries;
032 import javax.persistence.NamedNativeQuery;
033 import javax.persistence.NamedQueries;
034 import javax.persistence.NamedQuery;
035 import javax.persistence.SqlResultSetMapping;
036
037 import org.apache.hadoop.io.Writable;
038 import org.apache.oozie.client.CoordinatorAction;
039 import org.apache.oozie.client.rest.JsonCoordinatorAction;
040 import org.apache.oozie.util.DateUtils;
041 import org.apache.oozie.util.WritableUtils;
042 import org.apache.openjpa.persistence.jdbc.Index;
043
044 @SqlResultSetMapping(
045 name = "CoordActionJobIdLmt",
046 columns = {@ColumnResult(name = "job_id"),
047 @ColumnResult(name = "min_lmt")})
048
049 @Entity
050 @NamedQueries({
051
052 @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"),
053
054 @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"),
055 // Query to update the action status, pending status and last modified time stamp of a Coordinator action
056 @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
057 // Update query for InputCheck
058 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
059 // Update query for Push-based missing dependency check
060 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
061 // Update query for Start
062 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
063
064 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
065
066 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
067
068 @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"),
069
070 @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.status = 'WAITING' OR a.status = 'READY')"),
071
072 // Query used by XTestcase to setup tables
073 @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
074 // Select query used only by test cases
075 @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
076
077 // Select query used by SLAService on restart
078 @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.status, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
079 // Select query used by ActionInfo command
080 @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
081 // Select Query used by Timeout command
082 @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
083 // Select query used by InputCheck command
084 @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
085 // Select query used by CoordActionUpdate command
086 @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
087 // Select query used by Check command
088 @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
089 // Select query used by Start command
090 @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
091
092 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
093
094 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
095
096 @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"),
097
098 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
099
100 @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"),
101
102 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"),
103
104 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"),
105
106 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
107 // Query to retrieve Coordinator actions sorted by nominal time
108 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
109 // Query to maintain backward compatibility for coord job info command
110 @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
111 // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
112 @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.status, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
113
114 // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
115 @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"),
116
117 // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
118 @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'SUSPENDED'"),
119
120 // Query to retrieve count of Coordinator actions which are pending
121 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"),
122
123 // Query to retrieve status of Coordinator actions
124 @NamedQuery(name = "GET_COORD_ACTIONS_STATUS", query = "select a.status from CoordinatorActionBean a where a.jobId = :jobId"),
125
126 @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
127
128 @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),
129
130 //Used by coordinator store only
131 @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"),
132
133 @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
134
135 @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
136
137 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
138 // Select query used by rerun, requires almost all columns so select * is used
139 @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
140 // Select query used by log
141 @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
142 // Select query used by rerun, requires almost all columns so select * is used
143 @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
144
145 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")})
146
147 @NamedNativeQueries({
148
149 @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt")
150 })
151 public class CoordinatorActionBean extends JsonCoordinatorAction implements
152 Writable {
153 @Basic
154 @Index
155 @Column(name = "job_id")
156 private String jobId;
157
158 @Basic
159 @Index
160 @Column(name = "status")
161 private String status = null;
162
163 @Basic
164 @Index
165 @Column(name = "nominal_time")
166 private java.sql.Timestamp nominalTimestamp = null;
167
168 @Basic
169 @Index
170 @Column(name = "last_modified_time")
171 private java.sql.Timestamp lastModifiedTimestamp = null;
172
173 @Basic
174 @Index
175 @Column(name = "created_time")
176 private java.sql.Timestamp createdTimestamp = null;
177
178 @Basic
179 @Index
180 @Column(name = "rerun_time")
181 private java.sql.Timestamp rerunTimestamp = null;
182
183 @Basic
184 @Index
185 @Column(name = "external_id")
186 private String externalId;
187
188 @Column(name = "sla_xml")
189 @Lob
190 private String slaXml = null;
191
192 @Basic
193 @Column(name = "pending")
194 private int pending = 0;
195
196 public CoordinatorActionBean() {
197 }
198
199 /**
200 * Serialize the coordinator bean to a data output.
201 *
202 * @param dataOutput data output.
203 * @throws IOException thrown if the coordinator bean could not be
204 * serialized.
205 */
206 @Override
207 public void write(DataOutput dataOutput) throws IOException {
208 WritableUtils.writeStr(dataOutput, getJobId());
209 WritableUtils.writeStr(dataOutput, getType());
210 WritableUtils.writeStr(dataOutput, getId());
211 WritableUtils.writeStr(dataOutput, getCreatedConf());
212 WritableUtils.writeStr(dataOutput, getStatus().toString());
213 dataOutput.writeInt(getActionNumber());
214 WritableUtils.writeStr(dataOutput, getRunConf());
215 WritableUtils.writeStr(dataOutput, getExternalStatus());
216 WritableUtils.writeStr(dataOutput, getTrackerUri());
217 WritableUtils.writeStr(dataOutput, getConsoleUrl());
218 WritableUtils.writeStr(dataOutput, getErrorCode());
219 WritableUtils.writeStr(dataOutput, getErrorMessage());
220 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
221 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
222 }
223
224 /**
225 * Deserialize a coordinator bean from a data input.
226 *
227 * @param dataInput data input.
228 * @throws IOException thrown if the workflow bean could not be
229 * deserialized.
230 */
231 @Override
232 public void readFields(DataInput dataInput) throws IOException {
233 setJobId(WritableUtils.readStr(dataInput));
234 setType(WritableUtils.readStr(dataInput));
235 setId(WritableUtils.readStr(dataInput));
236 setCreatedConf(WritableUtils.readStr(dataInput));
237 setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
238 setActionNumber(dataInput.readInt());
239 setRunConf(WritableUtils.readStr(dataInput));
240 setExternalStatus(WritableUtils.readStr(dataInput));
241 setTrackerUri(WritableUtils.readStr(dataInput));
242 setConsoleUrl(WritableUtils.readStr(dataInput));
243 setErrorCode(WritableUtils.readStr(dataInput));
244 setErrorMessage(WritableUtils.readStr(dataInput));
245 long d = dataInput.readLong();
246 if (d != -1) {
247 setCreatedTime(new Date(d));
248 }
249 d = dataInput.readLong();
250 if (d != -1) {
251 setLastModifiedTime(new Date(d));
252 }
253 }
254
255 @Override
256 public String getJobId() {
257 return this.jobId;
258 }
259
260 @Override
261 public void setJobId(String id) {
262 super.setJobId(id);
263 this.jobId = id;
264 }
265
266 @Override
267 public Status getStatus() {
268 return Status.valueOf(status);
269 }
270
271 /**
272 * Return the status in string
273 * @return
274 */
275 public String getStatusStr() {
276 return status;
277 }
278
279 @Override
280 public void setStatus(Status status) {
281 super.setStatus(status);
282 this.status = status.toString();
283 }
284
285 @Override
286 public void setCreatedTime(Date createdTime) {
287 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
288 super.setCreatedTime(createdTime);
289 }
290
291 public void setRerunTime(Date rerunTime) {
292 this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
293 }
294
295 @Override
296 public void setNominalTime(Date nominalTime) {
297 this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
298 super.setNominalTime(nominalTime);
299 }
300
301 @Override
302 public void setLastModifiedTime(Date lastModifiedTime) {
303 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
304 super.setLastModifiedTime(lastModifiedTime);
305 }
306
307 @Override
308 public Date getCreatedTime() {
309 return DateUtils.toDate(createdTimestamp);
310 }
311
312 public Timestamp getCreatedTimestamp() {
313 return createdTimestamp;
314 }
315
316 public Date getRerunTime() {
317 return DateUtils.toDate(rerunTimestamp);
318 }
319
320 public Timestamp getRerunTimestamp() {
321 return rerunTimestamp;
322 }
323
324 @Override
325 public Date getLastModifiedTime() {
326 return DateUtils.toDate(lastModifiedTimestamp);
327 }
328
329 public Timestamp getLastModifiedTimestamp() {
330 return lastModifiedTimestamp;
331 }
332
333 @Override
334 public Date getNominalTime() {
335 return DateUtils.toDate(nominalTimestamp);
336 }
337
338 public Timestamp getNominalTimestamp() {
339 return nominalTimestamp;
340 }
341
342 @Override
343 public String getExternalId() {
344 return externalId;
345 }
346
347 @Override
348 public void setExternalId(String externalId) {
349 super.setExternalId(externalId);
350 this.externalId = externalId;
351 }
352
353 public String getSlaXml() {
354 return slaXml;
355 }
356
357 public void setSlaXml(String slaXml) {
358 this.slaXml = slaXml;
359 }
360
361 /**
362 * @return true if in terminal status
363 */
364 public boolean isTerminalStatus() {
365 boolean isTerminal = true;
366 switch (getStatus()) {
367 case WAITING:
368 case READY:
369 case SUBMITTED:
370 case RUNNING:
371 case SUSPENDED:
372 isTerminal = false;
373 break;
374 default:
375 isTerminal = true;
376 break;
377 }
378 return isTerminal;
379 }
380
381 /**
382 * Return if the action is complete with failure.
383 *
384 * @return if the action is complete with failure.
385 */
386 public boolean isTerminalWithFailure() {
387 boolean result = false;
388 switch (getStatus()) {
389 case FAILED:
390 case KILLED:
391 case TIMEDOUT:
392 result = true;
393 }
394 return result;
395 }
396
397 /**
398 * Set some actions are in progress for particular coordinator action.
399 *
400 * @param pending set pending to true
401 */
402 public void setPending(int pending) {
403 this.pending = pending;
404 }
405
406 /**
407 * increment pending and return it
408 *
409 * @return pending
410 */
411 public int incrementAndGetPending() {
412 this.pending++;
413 return pending;
414 }
415
416 /**
417 * decrement pending and return it
418 *
419 * @return pending
420 */
421 public int decrementAndGetPending() {
422 this.pending = Math.max(this.pending - 1, 0);
423 return pending;
424 }
425
426 /**
427 * Get some actions are in progress for particular bundle action.
428 *
429 * @return pending
430 */
431 public int getPending() {
432 return this.pending;
433 }
434
435 /**
436 * Return if the action is pending.
437 *
438 * @return if the action is pending.
439 */
440 public boolean isPending() {
441 return pending > 0 ? true : false;
442 }
443 }