001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie; 020 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.IOException; 024import java.sql.Timestamp; 025import java.text.MessageFormat; 026import java.util.Date; 027import java.util.List; 028 029import javax.persistence.Basic; 030import javax.persistence.Column; 031import javax.persistence.Entity; 032import javax.persistence.Id; 033import javax.persistence.Lob; 034import javax.persistence.NamedQueries; 035import javax.persistence.NamedQuery; 036import javax.persistence.Table; 037import javax.persistence.Transient; 038 039import org.apache.hadoop.io.Writable; 040import org.apache.oozie.client.CoordinatorAction; 041import org.apache.oozie.client.rest.JsonBean; 042import org.apache.oozie.client.rest.JsonTags; 043import org.apache.oozie.client.rest.JsonUtils; 044import org.apache.oozie.coord.input.dependency.CoordInputDependency; 045import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory; 046import org.apache.oozie.util.DateUtils; 047import org.apache.oozie.util.WritableUtils; 048import org.apache.openjpa.persistence.jdbc.Index; 049import org.apache.openjpa.persistence.jdbc.Strategy; 050import org.json.simple.JSONArray; 051import org.json.simple.JSONObject; 052 053 054@Entity 055@NamedQueries({ 056 057 @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.statusStr = :status where w.id = :id"), 058 059 @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.statusStr = :status where w.id = :id"), 060 // Query to update the action status, pending status and last modified time stamp of a Coordinator action 061 @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.statusStr =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), 062 // Update query for InputCheck 063 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"), 064 // Update query for Push-based missing dependency check 065 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"), 066 // Update query for Push-based missing dependency check 067 @NamedQuery(name = "UPDATE_COORD_ACTION_DEPENDENCIES", query = "update CoordinatorActionBean w set w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"), 068 // Update query for Start 069 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.statusStr =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"), 070 071 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), 072 073 @NamedQuery(name = "UPDATE_COORD_ACTION_RERUN", query = "update CoordinatorActionBean w set w.actionXml =:actionXml, w.statusStr = :status, w.externalId = :externalId, w.externalStatus = :externalStatus, w.rerunTimestamp = :rerunTime, w.lastModifiedTimestamp = :lastModifiedTime, w.createdTimestamp = :createdTime, w.createdConf = :createdConf, w.runConf = :runConf, w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"), 074 075 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"), 076 077 @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id IN (:actionId)"), 078 079 @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"), 080 081 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_COORDINATOR", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId"), 082 083 // Query used by XTestcase to setup tables 084 @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"), 085 // Select query used only by test cases 086 @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"), 087 088 // Select query used by SLAService on restart 089 @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"), 090 // Select query used by ActionInfo command 091 @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"), 092 // Select Query used by Timeout and skip commands 093 @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 094 // Select query used by InputCheck command 095 @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.actionNumber, a.jobId, a.statusStr, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut, a.externalId from CoordinatorActionBean a where a.id = :id"), 096 // Select query used by CoordActionUpdate command 097 @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"), 098 // Select query used by Check command 099 @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 100 // Select query used by Start command 101 @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.statusStr, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 102 103 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.actionNumber, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp"), 104 105 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.actionNumber, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp desc"), 106 107 @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'RUNNING' OR a.statusStr='SUBMITTED')"), 108 109 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 110 111 @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'WAITING'"), 112 113 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED')"), 114 115 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.statusStr = :status"), 116 117 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 118 // Query to retrieve Coordinator actions sorted by nominal time 119 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 120 // Query to maintain backward compatibility for coord job info command 121 @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 122 // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions 123 @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'FAILED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'IGNORED'"), 124 125 // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions 126 @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'RUNNING'"), 127 128 // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions 129 @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'SUSPENDED'"), 130 131 // Query to retrieve count of Coordinator actions which are pending 132 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"), 133 134 // Query to retrieve status of Coordinator actions 135 @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"), 136 137 // Query to retrieve status of Coordinator actions 138 @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"), 139 140 @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"), 141 142 @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"), 143 144 //Used by coordinator store only 145 @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'RUNNING'"), 146 147 @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"), 148 149 @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'READY') AND a.lastModifiedTimestamp <= :lastModifiedTime and a.nominalTimestamp <= :currentTime and a.jobId in ( select w.id from CoordinatorJobBean w where w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR')"), 150 151 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 152 // Select query used by rerun, requires almost all columns so select * is used 153 @NamedQuery(name = "GET_TERMINATED_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 154 // Select query used by log 155 @NamedQuery(name = "GET_TERMINATED_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 156 // Select query used by rerun, requires almost all columns so select * is used 157 @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"), 158 159 @NamedQuery(name = "GET_ACTIVE_ACTIONS_FOR_DATES", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'WAITING' OR a.statusStr = 'READY' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'RUNNING' OR a.statusStr = 'SUSPENDED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 160 161 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"), 162 163 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query = "select count(w) from CoordinatorActionBean w where w.statusStr = 'RUNNING' and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"), 164 165 @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"), 166 167 @NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')"), 168 169 @NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')") 170 }) 171 172@Table(name = "COORD_ACTIONS") 173public class CoordinatorActionBean implements 174 Writable,CoordinatorAction,JsonBean { 175 176 @Id 177 private String id; 178 179 @Basic 180 @Index 181 @Column(name = "job_id") 182 private String jobId; 183 184 @Basic 185 @Index 186 @Column(name = "status") 187 private String statusStr = CoordinatorAction.Status.WAITING.toString(); 188 189 @Basic 190 @Index 191 @Column(name = "nominal_time") 192 private java.sql.Timestamp nominalTimestamp = null; 193 194 @Basic 195 @Index 196 @Column(name = "last_modified_time") 197 private java.sql.Timestamp lastModifiedTimestamp = null; 198 199 @Basic 200 @Index 201 @Column(name = "created_time") 202 private java.sql.Timestamp createdTimestamp = null; 203 204 @Basic 205 @Index 206 @Column(name = "rerun_time") 207 private java.sql.Timestamp rerunTimestamp = null; 208 209 @Basic 210 @Index 211 @Column(name = "external_id") 212 private String externalId; 213 214 @Basic 215 @Column(name = "sla_xml") 216 @Lob 217 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 218 private StringBlob slaXml = null; 219 220 @Basic 221 @Column(name = "pending") 222 private int pending = 0; 223 224 @Basic 225 @Column(name = "job_type") 226 private String type; 227 228 @Basic 229 @Column(name = "action_number") 230 private int actionNumber; 231 232 @Basic 233 @Column(name = "created_conf") 234 @Lob 235 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 236 private StringBlob createdConf; 237 238 @Basic 239 @Column(name = "time_out") 240 private int timeOut = 0; 241 242 @Basic 243 @Column(name = "run_conf") 244 @Lob 245 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 246 private StringBlob runConf; 247 248 @Basic 249 @Column(name = "action_xml") 250 @Lob 251 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 252 private StringBlob actionXml; 253 254 @Basic 255 @Column(name = "missing_dependencies") 256 @Lob 257 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 258 private StringBlob missingDependencies; 259 260 @Basic 261 @Column(name = "push_missing_dependencies") 262 @Lob 263 @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler") 264 private StringBlob pushMissingDependencies; 265 266 @Basic 267 @Column(name = "external_status") 268 private String externalStatus; 269 270 @Basic 271 @Column(name = "tracker_uri") 272 private String trackerUri; 273 274 @Basic 275 @Column(name = "console_url") 276 private String consoleUrl; 277 278 @Basic 279 @Column(name = "error_code") 280 private String errorCode; 281 282 @Basic 283 @Column(name = "error_message") 284 private String errorMessage; 285 286 @SuppressWarnings("unchecked") 287 public JSONObject toJSONObject() { 288 return toJSONObject("GMT"); 289 } 290 291 @Transient 292 private CoordInputDependency coordPushInputDependency; 293 294 @Transient 295 private CoordInputDependency coordPullInputDependency; 296 297 298 public CoordinatorActionBean() { 299 } 300 301 /** 302 * Serialize the coordinator bean to a data output. 303 * 304 * @param dataOutput data output. 305 * @throws IOException thrown if the coordinator bean could not be 306 * serialized. 307 */ 308 @Override 309 public void write(DataOutput dataOutput) throws IOException { 310 WritableUtils.writeStr(dataOutput, getJobId()); 311 WritableUtils.writeStr(dataOutput, getType()); 312 WritableUtils.writeStr(dataOutput, getId()); 313 WritableUtils.writeStr(dataOutput, getCreatedConf()); 314 WritableUtils.writeStr(dataOutput, getStatus().toString()); 315 dataOutput.writeInt(getActionNumber()); 316 WritableUtils.writeStr(dataOutput, getRunConf()); 317 WritableUtils.writeStr(dataOutput, getExternalStatus()); 318 WritableUtils.writeStr(dataOutput, getTrackerUri()); 319 WritableUtils.writeStr(dataOutput, getConsoleUrl()); 320 WritableUtils.writeStr(dataOutput, getErrorCode()); 321 WritableUtils.writeStr(dataOutput, getErrorMessage()); 322 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1); 323 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1); 324 } 325 326 /** 327 * Deserialize a coordinator bean from a data input. 328 * 329 * @param dataInput data input. 330 * @throws IOException thrown if the workflow bean could not be 331 * deserialized. 332 */ 333 @Override 334 public void readFields(DataInput dataInput) throws IOException { 335 setJobId(WritableUtils.readStr(dataInput)); 336 setType(WritableUtils.readStr(dataInput)); 337 setId(WritableUtils.readStr(dataInput)); 338 setCreatedConf(WritableUtils.readStr(dataInput)); 339 setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput))); 340 setActionNumber(dataInput.readInt()); 341 setRunConf(WritableUtils.readStr(dataInput)); 342 setExternalStatus(WritableUtils.readStr(dataInput)); 343 setTrackerUri(WritableUtils.readStr(dataInput)); 344 setConsoleUrl(WritableUtils.readStr(dataInput)); 345 setErrorCode(WritableUtils.readStr(dataInput)); 346 setErrorMessage(WritableUtils.readStr(dataInput)); 347 long d = dataInput.readLong(); 348 if (d != -1) { 349 setCreatedTime(new Date(d)); 350 } 351 d = dataInput.readLong(); 352 if (d != -1) { 353 setLastModifiedTime(new Date(d)); 354 } 355 } 356 357 @Override 358 public String getJobId() { 359 return this.jobId; 360 } 361 362 public void setJobId(String id) { 363 this.jobId = id; 364 } 365 366 @Override 367 public Status getStatus() { 368 return Status.valueOf(statusStr); 369 } 370 371 /** 372 * Return the status in string 373 * @return 374 */ 375 public String getStatusStr() { 376 return statusStr; 377 } 378 379 public void setStatus(Status status) { 380 this.statusStr = status.toString(); 381 } 382 383 public void setStatusStr(String statusStr) { 384 this.statusStr = statusStr; 385 } 386 387 public void setCreatedTime(Date createdTime) { 388 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime); 389 } 390 391 public void setRerunTime(Date rerunTime) { 392 this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime); 393 } 394 395 public void setNominalTime(Date nominalTime) { 396 this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime); 397 } 398 399 public void setLastModifiedTime(Date lastModifiedTime) { 400 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 401 } 402 403 public Date getCreatedTime() { 404 return DateUtils.toDate(createdTimestamp); 405 } 406 407 public Timestamp getCreatedTimestamp() { 408 return createdTimestamp; 409 } 410 411 public Date getRerunTime() { 412 return DateUtils.toDate(rerunTimestamp); 413 } 414 415 public Timestamp getRerunTimestamp() { 416 return rerunTimestamp; 417 } 418 419 @Override 420 public Date getLastModifiedTime() { 421 return DateUtils.toDate(lastModifiedTimestamp); 422 } 423 424 public Timestamp getLastModifiedTimestamp() { 425 return lastModifiedTimestamp; 426 } 427 428 @Override 429 public Date getNominalTime() { 430 return DateUtils.toDate(nominalTimestamp); 431 } 432 433 public Timestamp getNominalTimestamp() { 434 return nominalTimestamp; 435 } 436 437 @Override 438 public String getExternalId() { 439 return externalId; 440 } 441 442 public void setExternalId(String externalId) { 443 this.externalId = externalId; 444 } 445 446 public StringBlob getSlaXmlBlob() { 447 return slaXml; 448 } 449 450 public void setSlaXmlBlob(StringBlob slaXml) { 451 this.slaXml = slaXml; 452 } 453 454 public String getSlaXml() { 455 return slaXml == null ? null : slaXml.getString(); 456 } 457 458 public void setSlaXml(String slaXml) { 459 if (this.slaXml == null) { 460 this.slaXml = new StringBlob(slaXml); 461 } 462 else { 463 this.slaXml.setString(slaXml); 464 } 465 } 466 467 /** 468 * @return true if in terminal status 469 */ 470 public boolean isTerminalStatus() { 471 boolean isTerminal = true; 472 switch (getStatus()) { 473 case WAITING: 474 case READY: 475 case SUBMITTED: 476 case RUNNING: 477 case SUSPENDED: 478 isTerminal = false; 479 break; 480 default: 481 isTerminal = true; 482 break; 483 } 484 return isTerminal; 485 } 486 487 /** 488 * Return if the action is complete with failure. 489 * 490 * @return if the action is complete with failure. 491 */ 492 public boolean isTerminalWithFailure() { 493 boolean result = false; 494 switch (getStatus()) { 495 case FAILED: 496 case KILLED: 497 case TIMEDOUT: 498 result = true; 499 } 500 return result; 501 } 502 503 /** 504 * Set some actions are in progress for particular coordinator action. 505 * 506 * @param pending set pending to true 507 */ 508 public void setPending(int pending) { 509 this.pending = pending; 510 } 511 512 /** 513 * increment pending and return it 514 * 515 * @return pending 516 */ 517 public int incrementAndGetPending() { 518 this.pending++; 519 return pending; 520 } 521 522 /** 523 * decrement pending and return it 524 * 525 * @return pending 526 */ 527 public int decrementAndGetPending() { 528 this.pending = Math.max(this.pending - 1, 0); 529 return pending; 530 } 531 532 /** 533 * Get some actions are in progress for particular bundle action. 534 * 535 * @return pending 536 */ 537 public int getPending() { 538 return this.pending; 539 } 540 541 /** 542 * Return if the action is pending. 543 * 544 * @return if the action is pending. 545 */ 546 public boolean isPending() { 547 return pending > 0 ? true : false; 548 } 549 550 @Override 551 public String getId() { 552 return id; 553 } 554 555 public void setId(String id) { 556 this.id = id; 557 } 558 559 public String getType() { 560 return type; 561 } 562 563 public void setType(String type) { 564 this.type = type; 565 } 566 567 public void setActionNumber(int actionNumber) { 568 this.actionNumber = actionNumber; 569 } 570 571 @Override 572 public int getActionNumber() { 573 return actionNumber; 574 } 575 576 @Override 577 public String getCreatedConf() { 578 return createdConf == null ? null : createdConf.getString(); 579 } 580 581 public void setCreatedConf(String createdConf) { 582 if (this.createdConf == null) { 583 this.createdConf = new StringBlob(createdConf); 584 } 585 else { 586 this.createdConf.setString(createdConf); 587 } 588 } 589 590 public void setCreatedConfBlob(StringBlob createdConf) { 591 this.createdConf = createdConf; 592 } 593 594 public StringBlob getCreatedConfBlob() { 595 return createdConf; 596 } 597 598 public void setRunConf(String runConf) { 599 if (this.runConf == null) { 600 this.runConf = new StringBlob(runConf); 601 } 602 else { 603 this.runConf.setString(runConf); 604 } 605 } 606 607 @Override 608 public String getRunConf() { 609 return runConf == null ? null : runConf.getString(); 610 } 611 612 public void setRunConfBlob(StringBlob runConf) { 613 this.runConf = runConf; 614 } 615 616 public StringBlob getRunConfBlob() { 617 return runConf; 618 } 619 620 621 public void setMissingDependencies(String missingDependencies) { 622 if (this.missingDependencies == null) { 623 this.missingDependencies = new StringBlob(missingDependencies); 624 } 625 else { 626 this.missingDependencies.setString(missingDependencies); 627 } 628 } 629 630 @Override 631 public String getMissingDependencies() { 632 return missingDependencies == null ? null : missingDependencies.getString(); 633 } 634 635 public void setMissingDependenciesBlob(StringBlob missingDependencies) { 636 this.missingDependencies = missingDependencies; 637 } 638 639 public StringBlob getMissingDependenciesBlob() { 640 return missingDependencies; 641 } 642 643 @Override 644 public String getPushMissingDependencies() { 645 return pushMissingDependencies == null ? null : pushMissingDependencies.getString(); 646 } 647 648 public void setPushMissingDependencies(String pushMissingDependencies) { 649 if (this.pushMissingDependencies == null) { 650 this.pushMissingDependencies = new StringBlob(pushMissingDependencies); 651 } 652 else { 653 this.pushMissingDependencies.setString(pushMissingDependencies); 654 } 655 } 656 657 public void setPushMissingDependenciesBlob(StringBlob pushMissingDependencies) { 658 this.pushMissingDependencies = pushMissingDependencies; 659 } 660 661 public StringBlob getPushMissingDependenciesBlob() { 662 return pushMissingDependencies; 663 } 664 665 public String getExternalStatus() { 666 return externalStatus; 667 } 668 669 public void setExternalStatus(String externalStatus) { 670 this.externalStatus = externalStatus; 671 } 672 673 @Override 674 public String getTrackerUri() { 675 return trackerUri; 676 } 677 678 public void setTrackerUri(String trackerUri) { 679 this.trackerUri = trackerUri; 680 } 681 682 @Override 683 public String getConsoleUrl() { 684 return consoleUrl; 685 } 686 687 public void setConsoleUrl(String consoleUrl) { 688 this.consoleUrl = consoleUrl; 689 } 690 691 @Override 692 public String getErrorCode() { 693 return errorCode; 694 } 695 696 @Override 697 public String getErrorMessage() { 698 return errorMessage; 699 } 700 701 public void setErrorInfo(String errorCode, String errorMessage) { 702 this.errorCode = errorCode; 703 this.errorMessage = errorMessage; 704 } 705 706 public String getActionXml() { 707 return actionXml == null ? null : actionXml.getString(); 708 } 709 710 public void setActionXml(String actionXml) { 711 if (this.actionXml == null) { 712 this.actionXml = new StringBlob(actionXml); 713 } 714 else { 715 this.actionXml.setString(actionXml); 716 } 717 } 718 719 public void setActionXmlBlob(StringBlob actionXml) { 720 this.actionXml = actionXml; 721 } 722 723 public StringBlob getActionXmlBlob() { 724 return actionXml; 725 } 726 727 @Override 728 public String toString() { 729 return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]", 730 getId(), getStatus()); 731 } 732 733 public int getTimeOut() { 734 return timeOut; 735 } 736 737 public void setTimeOut(int timeOut) { 738 this.timeOut = timeOut; 739 } 740 741 742 public void setErrorCode(String errorCode) { 743 this.errorCode = errorCode; 744 } 745 746 public void setErrorMessage(String errorMessage) { 747 this.errorMessage = errorMessage; 748 } 749 750 @SuppressWarnings("unchecked") 751 public JSONObject toJSONObject(String timeZoneId) { 752 JSONObject json = new JSONObject(); 753 json.put(JsonTags.COORDINATOR_ACTION_ID, id); 754 json.put(JsonTags.COORDINATOR_JOB_ID, jobId); 755 json.put(JsonTags.COORDINATOR_ACTION_TYPE, type); 756 json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber); 757 json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf()); 758 json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId)); 759 json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils.formatDateRfc822(getNominalTime(), timeZoneId)); 760 json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId); 761 // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils 762 // .formatDateRfc822(startTime), timeZoneId); 763 json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr); 764 json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf()); 765 json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, 766 JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId)); 767 // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils 768 // .formatDateRfc822(startTime), timeZoneId); 769 // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils 770 // .formatDateRfc822(endTime), timeZoneId); 771 json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getPullInputDependencies().getMissingDependencies()); 772 json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushInputDependencies().getMissingDependencies()); 773 json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus); 774 json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri); 775 json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl); 776 json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, errorCode); 777 json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, errorMessage); 778 json.put(JsonTags.TO_STRING, toString()); 779 return json; 780 } 781 782 /** 783 * Convert a nodes list into a JSONArray. 784 * 785 * @param actions nodes list. 786 * @param timeZoneId time zone to use for dates in the JSON array. 787 * @return the corresponding JSON array. 788 */ 789 @SuppressWarnings("unchecked") 790 public static JSONArray toJSONArray(List<CoordinatorActionBean> actions, String timeZoneId) { 791 JSONArray array = new JSONArray(); 792 for (CoordinatorActionBean action : actions) { 793 array.add(action.toJSONObject(timeZoneId)); 794 } 795 return array; 796 } 797 798 @Override 799 public int hashCode() { 800 final int prime = 31; 801 int result = 1; 802 result = prime * result + ((id == null) ? 0 : id.hashCode()); 803 return result; 804 } 805 806 @Override 807 public boolean equals(Object obj) { 808 if (this == obj) { 809 return true; 810 } 811 if (obj == null) { 812 return false; 813 } 814 if (getClass() != obj.getClass()) { 815 return false; 816 } 817 CoordinatorActionBean other = (CoordinatorActionBean) obj; 818 if (id == null) { 819 if (other.id != null) { 820 return false; 821 } 822 } 823 else if (!id.equals(other.id)) { 824 return false; 825 } 826 return true; 827 } 828 829 public CoordInputDependency getPullInputDependencies() { 830 if (coordPullInputDependency == null) { 831 coordPullInputDependency = CoordInputDependencyFactory.getPullInputDependencies(missingDependencies); 832 } 833 return coordPullInputDependency; 834 835 } 836 837 public CoordInputDependency getPushInputDependencies() { 838 if (coordPushInputDependency == null) { 839 coordPushInputDependency = CoordInputDependencyFactory.getPushInputDependencies(pushMissingDependencies); 840 } 841 return coordPushInputDependency; 842 } 843 844 public void setPullInputDependencies(CoordInputDependency coordPullInputDependency) { 845 this.coordPullInputDependency = coordPullInputDependency; 846 } 847 848 public void setPushInputDependencies(CoordInputDependency coordPushInputDependency) { 849 this.coordPushInputDependency = coordPushInputDependency; 850 } 851 852}