001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import java.sql.Timestamp; 024import java.text.MessageFormat; 025import java.util.Date; 026import java.util.List; 027 028import javax.persistence.Basic; 029import javax.persistence.Column; 030import javax.persistence.ColumnResult; 031import javax.persistence.Entity; 032import javax.persistence.Id; 033import javax.persistence.Lob; 034import javax.persistence.NamedNativeQueries; 035import javax.persistence.NamedNativeQuery; 036import javax.persistence.NamedQueries; 037import javax.persistence.NamedQuery; 038import javax.persistence.SqlResultSetMapping; 039import javax.persistence.Table; 040 041import org.apache.hadoop.io.Writable; 042import org.apache.oozie.client.CoordinatorAction; 043import org.apache.oozie.client.rest.JsonBean; 044import org.apache.oozie.client.rest.JsonTags; 045import org.apache.oozie.client.rest.JsonUtils; 046import org.apache.oozie.util.DateUtils; 047import org.apache.oozie.util.WritableUtils; 048import org.apache.openjpa.persistence.jdbc.Index; 049import org.apache.openjpa.persistence.jdbc.Strategy; 050import org.json.simple.JSONArray; 051import org.json.simple.JSONObject; 052 053@SqlResultSetMapping( 054 name = "CoordActionJobIdLmt", 055 columns = {@ColumnResult(name = "job_id"), 056 @ColumnResult(name = "min_lmt")}) 057 058@Entity 059@NamedQueries({ 060 061 @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.statusStr = :status where w.id = :id"), 062 063 @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.statusStr = :status where w.id = :id"), 064 // Query to update the action status, pending status and last modified time stamp of a Coordinator action 065 @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.statusStr =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), 066 // Update query for InputCheck 067 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"), 068 // Update query for Push-based missing dependency check 069 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"), 070 // Update query for Push-based missing dependency check 071 @NamedQuery(name = "UPDATE_COORD_ACTION_DEPENDENCIES", query = "update CoordinatorActionBean w set w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"), 072 // Update query for Start 073 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.statusStr =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"), 074 075 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), 076 077 @NamedQuery(name = "UPDATE_COORD_ACTION_RERUN", query = "update CoordinatorActionBean w set w.actionXml =:actionXml, w.statusStr = :status, w.externalId = :externalId, w.externalStatus = :externalStatus, w.rerunTimestamp = :rerunTime, w.lastModifiedTimestamp = :lastModifiedTime, w.createdTimestamp = :createdTime, w.createdConf = :createdConf, w.runConf = :runConf, w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"), 078 079 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"), 080 081 @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"), 082 083 @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id = :actionId"), 084 085 @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"), 086 087 // Query used by XTestcase to setup tables 088 @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"), 089 // Select query used only by test cases 090 @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"), 091 092 // Select query used by SLAService on restart 093 @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"), 094 // Select query used by ActionInfo command 095 @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"), 096 // Select Query used by Timeout and skip commands 097 @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 098 // Select query used by InputCheck command 099 @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"), 100 // Select query used by CoordActionUpdate command 101 @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"), 102 // Select query used by Check command 103 @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 104 // Select query used by Start command 105 @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.statusStr, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 106 107 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp"), 108 109 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp desc"), 110 111 @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'RUNNING' OR a.statusStr='SUBMITTED')"), 112 113 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 114 115 @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'WAITING'"), 116 117 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED')"), 118 119 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.statusStr = :status"), 120 121 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 122 // Query to retrieve Coordinator actions sorted by nominal time 123 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 124 // Query to maintain backward compatibility for coord job info command 125 @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 126 // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions 127 @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'FAILED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'IGNORED'"), 128 129 // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions 130 @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'RUNNING'"), 131 132 // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions 133 @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'SUSPENDED'"), 134 135 // Query to retrieve count of Coordinator actions which are pending 136 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"), 137 138 // Query to retrieve status of Coordinator actions 139 @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"), 140 141 // Query to retrieve status of Coordinator actions 142 @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"), 143 144 @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"), 145 146 @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"), 147 148 //Used by coordinator store only 149 @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'RUNNING'"), 150 151 @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"), 152 153 @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 154 155 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 156 // Select query used by rerun, requires almost all columns so select * is used 157 @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 158 // Select query used by log 159 @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 160 // Select query used by rerun, requires almost all columns so select * is used 161 @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"), 162 163 @NamedQuery(name = "GET_ACTIONS_BY_DATES_FOR_KILL", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 164 165 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"), 166 167 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query = "select count(w) from CoordinatorActionBean w where w.statusStr = 'RUNNING' and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"), 168 169 @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction")}) 170 171@NamedNativeQueries({ 172 173 @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt") 174 }) 175@Table(name = "COORD_ACTIONS") 176public class CoordinatorActionBean implements 177 Writable,CoordinatorAction,JsonBean { 178 179 @Id 180 private String id; 181 182 @Basic 183 @Index 184 @Column(name = "job_id") 185 private String jobId; 186 187 @Basic 188 @Index 189 @Column(name = "status") 190 private String statusStr = CoordinatorAction.Status.WAITING.toString(); 191 192 @Basic 193 @Index 194 @Column(name = "nominal_time") 195 private java.sql.Timestamp nominalTimestamp = null; 196 197 @Basic 198 @Index 199 @Column(name = "last_modified_time") 200 private java.sql.Timestamp lastModifiedTimestamp = null; 201 202 @Basic 203 @Index 204 @Column(name = "created_time") 205 private java.sql.Timestamp createdTimestamp = null; 206 207 @Basic 208 @Index 209 @Column(name = "rerun_time") 210 private java.sql.Timestamp rerunTimestamp = null; 211 212 @Basic 213 @Index 214 @Column(name = "external_id") 215 private String externalId; 216 217 @Basic 218 @Column(name = "sla_xml") 219 @Lob 220 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 221 private StringBlob slaXml = null; 222 223 @Basic 224 @Column(name = "pending") 225 private int pending = 0; 226 227 @Basic 228 @Column(name = "job_type") 229 private String type; 230 231 @Basic 232 @Column(name = "action_number") 233 private int actionNumber; 234 235 @Basic 236 @Column(name = "created_conf") 237 @Lob 238 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 239 private StringBlob createdConf; 240 241 @Basic 242 @Column(name = "time_out") 243 private int timeOut = 0; 244 245 @Basic 246 @Column(name = "run_conf") 247 @Lob 248 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 249 private StringBlob runConf; 250 251 @Basic 252 @Column(name = "action_xml") 253 @Lob 254 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 255 private StringBlob actionXml; 256 257 @Basic 258 @Column(name = "missing_dependencies") 259 @Lob 260 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 261 private StringBlob missingDependencies; 262 263 @Basic 264 @Column(name = "push_missing_dependencies") 265 @Lob 266 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 267 private StringBlob pushMissingDependencies; 268 269 @Basic 270 @Column(name = "external_status") 271 private String externalStatus; 272 273 @Basic 274 @Column(name = "tracker_uri") 275 private String trackerUri; 276 277 @Basic 278 @Column(name = "console_url") 279 private String consoleUrl; 280 281 @Basic 282 @Column(name = "error_code") 283 private String errorCode; 284 285 @Basic 286 @Column(name = "error_message") 287 private String errorMessage; 288 289 @SuppressWarnings("unchecked") 290 public JSONObject toJSONObject() { 291 return toJSONObject("GMT"); 292 } 293 294 public CoordinatorActionBean() { 295 } 296 297 /** 298 * Serialize the coordinator bean to a data output. 299 * 300 * @param dataOutput data output. 301 * @throws IOException thrown if the coordinator bean could not be 302 * serialized. 303 */ 304 @Override 305 public void write(DataOutput dataOutput) throws IOException { 306 WritableUtils.writeStr(dataOutput, getJobId()); 307 WritableUtils.writeStr(dataOutput, getType()); 308 WritableUtils.writeStr(dataOutput, getId()); 309 WritableUtils.writeStr(dataOutput, getCreatedConf()); 310 WritableUtils.writeStr(dataOutput, getStatus().toString()); 311 dataOutput.writeInt(getActionNumber()); 312 WritableUtils.writeStr(dataOutput, getRunConf()); 313 WritableUtils.writeStr(dataOutput, getExternalStatus()); 314 WritableUtils.writeStr(dataOutput, getTrackerUri()); 315 WritableUtils.writeStr(dataOutput, getConsoleUrl()); 316 WritableUtils.writeStr(dataOutput, getErrorCode()); 317 WritableUtils.writeStr(dataOutput, getErrorMessage()); 318 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1); 319 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1); 320 } 321 322 /** 323 * Deserialize a coordinator bean from a data input. 324 * 325 * @param dataInput data input. 326 * @throws IOException thrown if the workflow bean could not be 327 * deserialized. 328 */ 329 @Override 330 public void readFields(DataInput dataInput) throws IOException { 331 setJobId(WritableUtils.readStr(dataInput)); 332 setType(WritableUtils.readStr(dataInput)); 333 setId(WritableUtils.readStr(dataInput)); 334 setCreatedConf(WritableUtils.readStr(dataInput)); 335 setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput))); 336 setActionNumber(dataInput.readInt()); 337 setRunConf(WritableUtils.readStr(dataInput)); 338 setExternalStatus(WritableUtils.readStr(dataInput)); 339 setTrackerUri(WritableUtils.readStr(dataInput)); 340 setConsoleUrl(WritableUtils.readStr(dataInput)); 341 setErrorCode(WritableUtils.readStr(dataInput)); 342 setErrorMessage(WritableUtils.readStr(dataInput)); 343 long d = dataInput.readLong(); 344 if (d != -1) { 345 setCreatedTime(new Date(d)); 346 } 347 d = dataInput.readLong(); 348 if (d != -1) { 349 setLastModifiedTime(new Date(d)); 350 } 351 } 352 353 @Override 354 public String getJobId() { 355 return this.jobId; 356 } 357 358 public void setJobId(String id) { 359 this.jobId = id; 360 } 361 362 @Override 363 public Status getStatus() { 364 return Status.valueOf(statusStr); 365 } 366 367 /** 368 * Return the status in string 369 * @return 370 */ 371 public String getStatusStr() { 372 return statusStr; 373 } 374 375 public void setStatus(Status status) { 376 this.statusStr = status.toString(); 377 } 378 379 public void setStatusStr(String statusStr) { 380 this.statusStr = statusStr; 381 } 382 383 public void setCreatedTime(Date createdTime) { 384 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime); 385 } 386 387 public void setRerunTime(Date rerunTime) { 388 this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime); 389 } 390 391 public void setNominalTime(Date nominalTime) { 392 this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime); 393 } 394 395 public void setLastModifiedTime(Date lastModifiedTime) { 396 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 397 } 398 399 public Date getCreatedTime() { 400 return DateUtils.toDate(createdTimestamp); 401 } 402 403 public Timestamp getCreatedTimestamp() { 404 return createdTimestamp; 405 } 406 407 public Date getRerunTime() { 408 return DateUtils.toDate(rerunTimestamp); 409 } 410 411 public Timestamp getRerunTimestamp() { 412 return rerunTimestamp; 413 } 414 415 @Override 416 public Date getLastModifiedTime() { 417 return DateUtils.toDate(lastModifiedTimestamp); 418 } 419 420 public Timestamp getLastModifiedTimestamp() { 421 return lastModifiedTimestamp; 422 } 423 424 @Override 425 public Date getNominalTime() { 426 return DateUtils.toDate(nominalTimestamp); 427 } 428 429 public Timestamp getNominalTimestamp() { 430 return nominalTimestamp; 431 } 432 433 @Override 434 public String getExternalId() { 435 return externalId; 436 } 437 438 public void setExternalId(String externalId) { 439 this.externalId = externalId; 440 } 441 442 public StringBlob getSlaXmlBlob() { 443 return slaXml; 444 } 445 446 public void setSlaXmlBlob(StringBlob slaXml) { 447 this.slaXml = slaXml; 448 } 449 450 public String getSlaXml() { 451 return slaXml == null ? null : slaXml.getString(); 452 } 453 454 public void setSlaXml(String slaXml) { 455 if (this.slaXml == null) { 456 this.slaXml = new StringBlob(slaXml); 457 } 458 else { 459 this.slaXml.setString(slaXml); 460 } 461 } 462 463 /** 464 * @return true if in terminal status 465 */ 466 public boolean isTerminalStatus() { 467 boolean isTerminal = true; 468 switch (getStatus()) { 469 case WAITING: 470 case READY: 471 case SUBMITTED: 472 case RUNNING: 473 case SUSPENDED: 474 isTerminal = false; 475 break; 476 default: 477 isTerminal = true; 478 break; 479 } 480 return isTerminal; 481 } 482 483 /** 484 * Return if the action is complete with failure. 485 * 486 * @return if the action is complete with failure. 487 */ 488 public boolean isTerminalWithFailure() { 489 boolean result = false; 490 switch (getStatus()) { 491 case FAILED: 492 case KILLED: 493 case TIMEDOUT: 494 result = true; 495 } 496 return result; 497 } 498 499 /** 500 * Set some actions are in progress for particular coordinator action. 501 * 502 * @param pending set pending to true 503 */ 504 public void setPending(int pending) { 505 this.pending = pending; 506 } 507 508 /** 509 * increment pending and return it 510 * 511 * @return pending 512 */ 513 public int incrementAndGetPending() { 514 this.pending++; 515 return pending; 516 } 517 518 /** 519 * decrement pending and return it 520 * 521 * @return pending 522 */ 523 public int decrementAndGetPending() { 524 this.pending = Math.max(this.pending - 1, 0); 525 return pending; 526 } 527 528 /** 529 * Get some actions are in progress for particular bundle action. 530 * 531 * @return pending 532 */ 533 public int getPending() { 534 return this.pending; 535 } 536 537 /** 538 * Return if the action is pending. 539 * 540 * @return if the action is pending. 541 */ 542 public boolean isPending() { 543 return pending > 0 ? true : false; 544 } 545 546 @Override 547 public String getId() { 548 return id; 549 } 550 551 public void setId(String id) { 552 this.id = id; 553 } 554 555 public String getType() { 556 return type; 557 } 558 559 public void setType(String type) { 560 this.type = type; 561 } 562 563 public void setActionNumber(int actionNumber) { 564 this.actionNumber = actionNumber; 565 } 566 567 @Override 568 public int getActionNumber() { 569 return actionNumber; 570 } 571 572 @Override 573 public String getCreatedConf() { 574 return createdConf == null ? null : createdConf.getString(); 575 } 576 577 public void setCreatedConf(String createdConf) { 578 if (this.createdConf == null) { 579 this.createdConf = new StringBlob(createdConf); 580 } 581 else { 582 this.createdConf.setString(createdConf); 583 } 584 } 585 586 public void setCreatedConfBlob(StringBlob createdConf) { 587 this.createdConf = createdConf; 588 } 589 590 public StringBlob getCreatedConfBlob() { 591 return createdConf; 592 } 593 594 public void setRunConf(String runConf) { 595 if (this.runConf == null) { 596 this.runConf = new StringBlob(runConf); 597 } 598 else { 599 this.runConf.setString(runConf); 600 } 601 } 602 603 @Override 604 public String getRunConf() { 605 return runConf == null ? null : runConf.getString(); 606 } 607 608 public void setRunConfBlob(StringBlob runConf) { 609 this.runConf = runConf; 610 } 611 612 public StringBlob getRunConfBlob() { 613 return runConf; 614 } 615 616 617 public void setMissingDependencies(String missingDependencies) { 618 if (this.missingDependencies == null) { 619 this.missingDependencies = new StringBlob(missingDependencies); 620 } 621 else { 622 this.missingDependencies.setString(missingDependencies); 623 } 624 } 625 626 @Override 627 public String getMissingDependencies() { 628 return missingDependencies == null ? null : missingDependencies.getString(); 629 } 630 631 public void setMissingDependenciesBlob(StringBlob missingDependencies) { 632 this.missingDependencies = missingDependencies; 633 } 634 635 public StringBlob getMissingDependenciesBlob() { 636 return missingDependencies; 637 } 638 639 @Override 640 public String getPushMissingDependencies() { 641 return pushMissingDependencies == null ? null : pushMissingDependencies.getString(); 642 } 643 644 public void setPushMissingDependencies(String pushMissingDependencies) { 645 if (this.pushMissingDependencies == null) { 646 this.pushMissingDependencies = new StringBlob(pushMissingDependencies); 647 } 648 else { 649 this.pushMissingDependencies.setString(pushMissingDependencies); 650 } 651 } 652 653 public void setPushMissingDependenciesBlob(StringBlob pushMissingDependencies) { 654 this.pushMissingDependencies = pushMissingDependencies; 655 } 656 657 public StringBlob getPushMissingDependenciesBlob() { 658 return pushMissingDependencies; 659 } 660 661 public String getExternalStatus() { 662 return externalStatus; 663 } 664 665 public void setExternalStatus(String externalStatus) { 666 this.externalStatus = externalStatus; 667 } 668 669 @Override 670 public String getTrackerUri() { 671 return trackerUri; 672 } 673 674 public void setTrackerUri(String trackerUri) { 675 this.trackerUri = trackerUri; 676 } 677 678 @Override 679 public String getConsoleUrl() { 680 return consoleUrl; 681 } 682 683 public void setConsoleUrl(String consoleUrl) { 684 this.consoleUrl = consoleUrl; 685 } 686 687 @Override 688 public String getErrorCode() { 689 return errorCode; 690 } 691 692 @Override 693 public String getErrorMessage() { 694 return errorMessage; 695 } 696 697 public void setErrorInfo(String errorCode, String errorMessage) { 698 this.errorCode = errorCode; 699 this.errorMessage = errorMessage; 700 } 701 702 public String getActionXml() { 703 return actionXml == null ? null : actionXml.getString(); 704 } 705 706 public void setActionXml(String actionXml) { 707 if (this.actionXml == null) { 708 this.actionXml = new StringBlob(actionXml); 709 } 710 else { 711 this.actionXml.setString(actionXml); 712 } 713 } 714 715 public void setActionXmlBlob(StringBlob actionXml) { 716 this.actionXml = actionXml; 717 } 718 719 public StringBlob getActionXmlBlob() { 720 return actionXml; 721 } 722 723 @Override 724 public String toString() { 725 return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]", 726 getId(), getStatus()); 727 } 728 729 public int getTimeOut() { 730 return timeOut; 731 } 732 733 public void setTimeOut(int timeOut) { 734 this.timeOut = timeOut; 735 } 736 737 738 public void setErrorCode(String errorCode) { 739 this.errorCode = errorCode; 740 } 741 742 public void setErrorMessage(String errorMessage) { 743 this.errorMessage = errorMessage; 744 } 745 746 @SuppressWarnings("unchecked") 747 public JSONObject toJSONObject(String timeZoneId) { 748 JSONObject json = new JSONObject(); 749 json.put(JsonTags.COORDINATOR_ACTION_ID, id); 750 json.put(JsonTags.COORDINATOR_JOB_ID, jobId); 751 json.put(JsonTags.COORDINATOR_ACTION_TYPE, type); 752 json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber); 753 json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf()); 754 json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils 755 .formatDateRfc822(getCreatedTime(), timeZoneId)); 756 json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils 757 .formatDateRfc822(getNominalTime(), timeZoneId)); 758 json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId); 759 // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils 760 // .formatDateRfc822(startTime), timeZoneId); 761 json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr); 762 json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf()); 763 json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, JsonUtils 764 .formatDateRfc822(getLastModifiedTime(), timeZoneId)); 765 // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils 766 // .formatDateRfc822(startTime), timeZoneId); 767 // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils 768 // .formatDateRfc822(endTime), timeZoneId); 769 json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getMissingDependencies()); 770 json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushMissingDependencies()); 771 json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus); 772 json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri); 773 json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl); 774 json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, errorCode); 775 json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, errorMessage); 776 json.put(JsonTags.TO_STRING, toString()); 777 return json; 778 } 779 780 /** 781 * Convert a nodes list into a JSONArray. 782 * 783 * @param actions nodes list. 784 * @param timeZoneId time zone to use for dates in the JSON array. 785 * @return the corresponding JSON array. 786 */ 787 @SuppressWarnings("unchecked") 788 public static JSONArray toJSONArray(List<CoordinatorActionBean> actions, String timeZoneId) { 789 JSONArray array = new JSONArray(); 790 for (CoordinatorActionBean action : actions) { 791 array.add(action.toJSONObject(timeZoneId)); 792 } 793 return array; 794 } 795 796 @Override 797 public int hashCode() { 798 final int prime = 31; 799 int result = 1; 800 result = prime * result + ((id == null) ? 0 : id.hashCode()); 801 return result; 802 } 803 804 @Override 805 public boolean equals(Object obj) { 806 if (this == obj) { 807 return true; 808 } 809 if (obj == null) { 810 return false; 811 } 812 if (getClass() != obj.getClass()) { 813 return false; 814 } 815 CoordinatorActionBean other = (CoordinatorActionBean) obj; 816 if (id == null) { 817 if (other.id != null) { 818 return false; 819 } 820 } 821 else if (!id.equals(other.id)) { 822 return false; 823 } 824 return true; 825 } 826 827 828}