001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie; 020 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.IOException; 024import java.sql.Timestamp; 025import java.util.Date; 026 027import javax.persistence.Basic; 028import javax.persistence.Column; 029import javax.persistence.Entity; 030import javax.persistence.Id; 031import javax.persistence.NamedQueries; 032import javax.persistence.NamedQuery; 033import javax.persistence.Table; 034 035import org.apache.hadoop.io.Writable; 036import org.apache.oozie.client.Job.Status; 037import org.apache.oozie.client.rest.JsonBean; 038import org.apache.oozie.util.DateUtils; 039import org.apache.oozie.util.WritableUtils; 040import org.apache.openjpa.persistence.jdbc.Index; 041import org.json.simple.JSONObject; 042 043@Entity 044@Table(name = "BUNDLE_ACTIONS") 045@NamedQueries( { 046 @NamedQuery(name = "DELETE_BUNDLE_ACTION", query = "delete from BundleActionBean w where w.bundleActionId = :bundleActionId"), 047 048 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_PENDING_MODTIME", query = "update BundleActionBean w set w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"), 049 050 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"), 051 052 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending, w.coordId = :coordId where w.bundleActionId = :bundleActionId"), 053 054 @NamedQuery(name = "GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE", query = "select OBJECT(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"), 055 056 @NamedQuery(name = "GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE", query = "select w.coordId, w.statusStr, w.pending from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"), 057 058 @NamedQuery(name = "GET_BUNDLE_ACTIONS", query = "select OBJECT(w) from BundleActionBean w"), 059 060 @NamedQuery(name = "GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN", query = "select w.bundleActionId, w.bundleId, w.statusStr, w.coordId, w.coordName from BundleActionBean w where w.pending > 0 AND w.lastModifiedTimestamp <= :lastModifiedTime"), 061 062 @NamedQuery(name = "GET_BUNDLE_ACTION", query = "select OBJECT(w) from BundleActionBean w where w.bundleActionId = :bundleActionId"), 063 064 @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT", query = "select count(w) from BundleActionBean w"), 065 066 @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT_BY_JOB", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId"), 067 068 @NamedQuery(name = "GET_BUNDLE_ACTIONS_PENDING_TRUE_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.pending > 0"), 069 070 @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_EQUAL_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> :status"), 071 072 @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_TERMINATE_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'SUSPENDED' OR w.statusStr = 'SUSPENDEDWITHERROR' OR w.statusStr = 'PREPSUSPENDED' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'PREPPAUSED')"), 073 074 @NamedQuery(name = "GET_BUNDLE_ACTIONS_FAILED_NULL_COORD_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr = 'FAILED' AND w.coordId IS NULL"), 075 076 @NamedQuery(name = "GET_BUNDLE_ACTIONS_OLDER_THAN", query = "select OBJECT(w) from BundleActionBean w order by w.lastModifiedTimestamp"), 077 078 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED' OR a.statusStr = 'DONEWITHERROR')"), 079 080 @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId IN (:bundleId)")}) 081public class BundleActionBean implements Writable, JsonBean { 082 083 @Id 084 @Column(name = "bundle_action_id") 085 private String bundleActionId = null; 086 087 @Index 088 @Column(name = "bundle_id") 089 private String bundleId = null; 090 091 @Column(name = "coord_name") 092 private String coordName = null; 093 094 @Basic 095 @Column(name = "coord_id") 096 private String coordId = null; 097 098 @Basic 099 @Column(name = "status") 100 private String statusStr = null; 101 102 @Basic 103 @Column(name = "critical") 104 private int critical = 0; 105 106 @Basic 107 @Column(name = "pending") 108 private int pending = 0; 109 110 @Basic 111 @Column(name = "last_modified_time") 112 private java.sql.Timestamp lastModifiedTimestamp = null; 113 114 /** 115 * bundleActionId to set 116 * 117 * @param bundleActionId the bundleActionId to set 118 */ 119 public void setBundleActionId(String bundleActionId) { 120 this.bundleActionId = bundleActionId; 121 } 122 123 /** 124 * Get the Bundle Action Id. 125 * 126 * @return the bundleActionId 127 */ 128 public String getBundleActionId() { 129 return bundleActionId; 130 } 131 132 /** 133 * Get the BundleId 134 * 135 * @return bundleId 136 */ 137 public String getBundleId() { 138 return bundleId; 139 } 140 141 /** 142 * Set the Bundle Id. 143 * 144 * @param bundleId 145 */ 146 public void setBundleId(String bundleId) { 147 this.bundleId = bundleId; 148 } 149 150 /** 151 * Get the Coordinator name. 152 * 153 * @return coordName 154 */ 155 public String getCoordName() { 156 return coordName; 157 } 158 159 /** 160 * Set the Coordinator name. 161 * 162 * @param coordName 163 */ 164 public void setCoordName(String coordName) { 165 this.coordName = coordName; 166 } 167 168 /** 169 * Get the coordinator Id. 170 * 171 * @return the coordId 172 */ 173 public String getCoordId() { 174 return coordId; 175 } 176 177 /** 178 * Set the coordinator Id. 179 * 180 * @param coordId 181 */ 182 public void setCoordId(String coordId) { 183 this.coordId = coordId; 184 } 185 186 /** 187 * Get the Status of the Bundle Action 188 * 189 * @return status object 190 */ 191 public Status getStatus() { 192 return Status.valueOf(this.statusStr); 193 } 194 195 /** 196 * Get the Status of the Bundle Action 197 * 198 * @return status string 199 */ 200 public String getStatusStr() { 201 return statusStr; 202 } 203 204 /** 205 * Set the Status of the Bundle Action 206 * 207 * @return status string 208 */ 209 public void setStatusStr(String statusStr) { 210 this.statusStr = statusStr; 211 } 212 213 /** 214 * Set the Status of the Bundle Action 215 * 216 * @param val 217 */ 218 public void setStatus(Status val) { 219 this.statusStr = val.toString(); 220 } 221 222 /** 223 * Set Whether this bundle action is critical or not. 224 * 225 * @param critical set critical to true 226 */ 227 public void setCritical() { 228 this.critical = 1; 229 } 230 231 /** 232 * Reseset Whether this bundle action is critical or not. 233 * 234 * @param critical set critical to false 235 */ 236 public void resetCritical() { 237 this.critical = 0; 238 } 239 240 /** 241 * Return if the action is critical. 242 * 243 * @return if the action is critical. 244 */ 245 public boolean isCritical() { 246 return critical == 1 ? true : false; 247 } 248 249 /** 250 * Set some actions are in progress for particular bundle action. 251 * 252 * @param pending set pending to true 253 */ 254 public void setPending(int pending) { 255 this.pending = pending; 256 } 257 258 /** 259 * increment pending and return it 260 * 261 * @return pending 262 */ 263 public int incrementAndGetPending() { 264 this.pending++; 265 return pending; 266 } 267 268 /** 269 * decrement pending and return it 270 * 271 * @return pending 272 */ 273 public int decrementAndGetPending() { 274 this.pending = Math.max(this.pending-1, 0); 275 return pending; 276 } 277 278 /** 279 * Get some actions are in progress for particular bundle action. 280 * 281 * @return pending 282 */ 283 public int getPending() { 284 return this.pending; 285 } 286 287 /** 288 * Return if the action is pending. 289 * 290 * @return if the action is pending. 291 */ 292 public boolean isPending() { 293 return pending > 0 ? true : false; 294 } 295 296 /** 297 * @return true if in terminal status 298 */ 299 public boolean isTerminalStatus() { 300 boolean isTerminal = false; 301 switch (getStatus()) { 302 case SUCCEEDED: 303 case FAILED: 304 case KILLED: 305 case DONEWITHERROR: 306 isTerminal = true; 307 break; 308 default: 309 isTerminal = false; 310 break; 311 } 312 return isTerminal; 313 } 314 315 /** 316 * Set Last modified time. 317 * 318 * @param lastModifiedTimestamp the lastModifiedTimestamp to set 319 */ 320 public void setLastModifiedTimestamp(java.sql.Timestamp lastModifiedTimestamp) { 321 this.lastModifiedTimestamp = lastModifiedTimestamp; 322 } 323 324 /** 325 * Set Last modified time. 326 * 327 * @param lastModifiedTime the lastModifiedTime to set 328 */ 329 public void setLastModifiedTime(Date lastModifiedTime) { 330 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 331 } 332 333 /** 334 * Get Last modified time. 335 * 336 * @return lastModifiedTime 337 */ 338 public Date getLastModifiedTime() { 339 return DateUtils.toDate(lastModifiedTimestamp); 340 } 341 342 /** 343 * Get Last modified time. 344 * 345 * @return lastModifiedTimestamp 346 */ 347 public Timestamp getLastModifiedTimestamp() { 348 return lastModifiedTimestamp; 349 } 350 351 @Override 352 public void write(DataOutput dataOutput) throws IOException { 353 WritableUtils.writeStr(dataOutput, getBundleActionId()); 354 WritableUtils.writeStr(dataOutput, getBundleId()); 355 WritableUtils.writeStr(dataOutput, getCoordName()); 356 WritableUtils.writeStr(dataOutput, getCoordId()); 357 WritableUtils.writeStr(dataOutput, getStatusStr()); 358 dataOutput.writeInt(critical); 359 dataOutput.writeInt(pending); 360 dataOutput.writeLong((getLastModifiedTimestamp() != null) ? getLastModifiedTimestamp().getTime() : -1); 361 } 362 363 @Override 364 public void readFields(DataInput dataInput) throws IOException { 365 setBundleActionId(WritableUtils.readStr(dataInput)); 366 setBundleId(WritableUtils.readStr(dataInput)); 367 setCoordName(WritableUtils.readStr(dataInput)); 368 setCoordId(WritableUtils.readStr(dataInput)); 369 setStatus(Status.valueOf(WritableUtils.readStr(dataInput))); 370 critical = dataInput.readInt(); 371 pending = dataInput.readInt(); 372 long d = dataInput.readLong(); 373 if (d != -1) { 374 setLastModifiedTime(new Date(d)); 375 } 376 } 377 378 @Override 379 public JSONObject toJSONObject() { 380 return null; 381 } 382 383 @Override 384 public JSONObject toJSONObject(String timeZoneId) { 385 return null; 386 } 387}