001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import org.apache.oozie.workflow.WorkflowInstance; 021 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 022 import org.apache.oozie.client.rest.JsonWorkflowJob; 023 import org.apache.oozie.client.WorkflowJob; 024 import org.apache.oozie.util.DateUtils; 025 import org.apache.oozie.util.WritableUtils; 026 import org.apache.hadoop.io.Writable; 027 028 import java.io.DataInput; 029 import java.io.IOException; 030 import java.io.DataOutput; 031 import java.util.Date; 032 033 import javax.persistence.Entity; 034 import javax.persistence.Column; 035 import javax.persistence.NamedQueries; 036 import javax.persistence.NamedQuery; 037 import javax.persistence.Basic; 038 import javax.persistence.Lob; 039 040 import java.sql.Timestamp; 041 042 import org.apache.openjpa.persistence.jdbc.Index; 043 044 @Entity 045 @NamedQueries({ 046 047 @NamedQuery(name = "UPDATE_WORKFLOW", query = "update WorkflowJobBean w set w.appName = :appName, w.appPath = :appPath, w.conf = :conf, w.group = :groupName, w.run = :run, w.user = :user, w.createdTimestamp = :createdTime, w.endTimestamp = :endTime, w.externalId = :externalId, w.lastModifiedTimestamp = :lastModTime,w.logToken = :logToken, w.protoActionConf = :protoActionConf, w.slaXml =:slaXml, w.startTimestamp = :startTime, w.status = :status, w.wfInstance = :wfInstance where w.id = :id"), 048 049 @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean w where w.id = :id"), 050 051 @NamedQuery(name = "GET_WORKFLOWS", query = "select OBJECT(w) from WorkflowJobBean w order by w.startTimestamp desc"), 052 053 @NamedQuery(name = "GET_WORKFLOWS_COLUMNS", query = "select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, " 054 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w order by w.createdTimestamp desc"), 055 056 @NamedQuery(name = "GET_WORKFLOWS_COUNT", query = "select count(w) from WorkflowJobBean w"), 057 058 @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_OLDER_THAN", query = "select w from WorkflowJobBean w where w.endTimestamp < :endTime"), 059 060 @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_WITH_NO_PARENT_OLDER_THAN", query = "select w.id from WorkflowJobBean w where w.endTimestamp < :endTime and w.parentId is null"), 061 062 @NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"), 063 064 @NamedQuery(name = "GET_WORKFLOW_FOR_UPDATE", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"), 065 066 @NamedQuery(name = "GET_WORKFLOW_FOR_SLA", query = "select w.id, w.status, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"), 067 068 @NamedQuery(name = "GET_WORKFLOW_ID_FOR_EXTERNAL_ID", query = "select w.id from WorkflowJobBean w where w.externalId = :externalId"), 069 070 @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS", query = "select count(w) from WorkflowJobBean w where w.status = :status"), 071 072 @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS", query = "select count(w) from WorkflowJobBean w where w.status = :status and w.lastModifiedTimestamp > :lastModTime"), 073 074 @NamedQuery(name = "GET_WORKFLOWS_WITH_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId = :parentId"), 075 076 @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >= :endTime)"), 077 078 @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id") 079 }) 080 public class WorkflowJobBean extends JsonWorkflowJob implements Writable { 081 082 @Column(name = "proto_action_conf") 083 @Lob 084 private String protoActionConf = null; 085 086 @Basic 087 @Column(name = "log_token") 088 private String logToken = null; 089 090 @Basic 091 @Index 092 @Column(name = "external_id") 093 private String externalId = null; 094 095 @Basic 096 @Index 097 @Column(name = "status") 098 private String status = WorkflowJob.Status.PREP.toString(); 099 100 @Basic 101 @Column(name = "created_time") 102 private java.sql.Timestamp createdTimestamp = null; 103 104 @Basic 105 @Column(name = "start_time") 106 private java.sql.Timestamp startTimestamp = null; 107 108 @Basic 109 @Index 110 @Column(name = "end_time") 111 private java.sql.Timestamp endTimestamp = null; 112 113 @Basic 114 @Index 115 @Column(name = "last_modified_time") 116 private java.sql.Timestamp lastModifiedTimestamp = null; 117 118 // @Basic(fetch = FetchType.LAZY) 119 // @Column(name="wfinstance",columnDefinition="blob") 120 @Column(name = "wf_instance") 121 @Lob 122 private byte[] wfInstance = null; 123 124 @Column(name = "sla_xml") 125 @Lob 126 private String slaXml = null; 127 128 /** 129 * Default constructor. 130 */ 131 public WorkflowJobBean() { 132 } 133 134 /** 135 * Serialize the workflow bean to a data output. 136 * 137 * @param dataOutput data output. 138 * @throws IOException thrown if the workflow bean could not be serialized. 139 */ 140 public void write(DataOutput dataOutput) throws IOException { 141 WritableUtils.writeStr(dataOutput, getAppPath()); 142 WritableUtils.writeStr(dataOutput, getAppName()); 143 WritableUtils.writeStr(dataOutput, getId()); 144 WritableUtils.writeStr(dataOutput, getParentId()); 145 WritableUtils.writeStr(dataOutput, getConf()); 146 WritableUtils.writeStr(dataOutput, getStatusStr()); 147 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1); 148 dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1); 149 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1); 150 dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1); 151 WritableUtils.writeStr(dataOutput, getUser()); 152 WritableUtils.writeStr(dataOutput, getGroup()); 153 dataOutput.writeInt(getRun()); 154 WritableUtils.writeStr(dataOutput, logToken); 155 WritableUtils.writeStr(dataOutput, protoActionConf); 156 } 157 158 /** 159 * Deserialize a workflow bean from a data input. 160 * 161 * @param dataInput data input. 162 * @throws IOException thrown if the workflow bean could not be deserialized. 163 */ 164 public void readFields(DataInput dataInput) throws IOException { 165 setAppPath(WritableUtils.readStr(dataInput)); 166 setAppName(WritableUtils.readStr(dataInput)); 167 setId(WritableUtils.readStr(dataInput)); 168 setParentId(WritableUtils.readStr(dataInput)); 169 setConf(WritableUtils.readStr(dataInput)); 170 setStatus(WorkflowJob.Status.valueOf(WritableUtils.readStr(dataInput))); 171 // setStatus(WritableUtils.readStr(dataInput)); 172 long d = dataInput.readLong(); 173 if (d != -1) { 174 setCreatedTime(new Date(d)); 175 } 176 d = dataInput.readLong(); 177 if (d != -1) { 178 } 179 setStartTime(new Date(d)); 180 d = dataInput.readLong(); 181 if (d != -1) { 182 setLastModifiedTime(new Date(d)); 183 } 184 d = dataInput.readLong(); 185 if (d != -1) { 186 setEndTime(new Date(d)); 187 } 188 setUser(WritableUtils.readStr(dataInput)); 189 setGroup(WritableUtils.readStr(dataInput)); 190 setRun(dataInput.readInt()); 191 logToken = WritableUtils.readStr(dataInput); 192 protoActionConf = WritableUtils.readStr(dataInput); 193 setExternalId(getExternalId()); 194 setProtoActionConf(protoActionConf); 195 } 196 197 public boolean inTerminalState() { 198 boolean inTerminalState = false; 199 switch (WorkflowJob.Status.valueOf(status)) { 200 case FAILED: 201 case KILLED: 202 case SUCCEEDED: 203 inTerminalState = true; 204 break; 205 default: 206 break; 207 } 208 return inTerminalState; 209 } 210 211 public String getLogToken() { 212 return logToken; 213 } 214 215 public void setLogToken(String logToken) { 216 this.logToken = logToken; 217 } 218 219 public String getSlaXml() { 220 return slaXml; 221 } 222 223 public void setSlaXml(String slaXml) { 224 this.slaXml = slaXml; 225 } 226 227 public WorkflowInstance getWorkflowInstance() { 228 return get(this.wfInstance); 229 } 230 231 public byte[] getWfInstance() { 232 return wfInstance; 233 } 234 235 public void setWorkflowInstance(WorkflowInstance workflowInstance) { 236 setWfInstance(workflowInstance); 237 } 238 239 public void setWfInstance(byte[] wfInstance) { 240 this.wfInstance = wfInstance; 241 } 242 243 public void setWfInstance(WorkflowInstance wfInstance) { 244 this.wfInstance = WritableUtils.toByteArray((LiteWorkflowInstance) wfInstance); 245 } 246 247 public String getProtoActionConf() { 248 return protoActionConf; 249 } 250 251 public void setProtoActionConf(String protoActionConf) { 252 this.protoActionConf = protoActionConf; 253 } 254 255 public String getprotoActionConf() { 256 return protoActionConf; 257 } 258 259 public String getlogToken() { 260 return logToken; 261 } 262 263 public String getStatusStr() { 264 return status; 265 } 266 267 public Timestamp getLastModifiedTimestamp() { 268 return lastModifiedTimestamp; 269 } 270 271 public Timestamp getStartTimestamp() { 272 return startTimestamp; 273 } 274 275 public Timestamp getCreatedTimestamp() { 276 return createdTimestamp; 277 } 278 279 public Timestamp getEndTimestamp() { 280 return endTimestamp; 281 } 282 283 @Override 284 public void setAppName(String val) { 285 super.setAppName(val); 286 } 287 288 @Override 289 public void setAppPath(String val) { 290 super.setAppPath(val); 291 } 292 293 @Override 294 public void setConf(String val) { 295 super.setConf(val); 296 } 297 298 @Override 299 public void setStatus(Status val) { 300 super.setStatus(val); 301 this.status = val.toString(); 302 } 303 304 @Override 305 public Status getStatus() { 306 return Status.valueOf(this.status); 307 } 308 309 @Override 310 public void setExternalId(String externalId) { 311 super.setExternalId(externalId); 312 this.externalId = externalId; 313 } 314 315 @Override 316 public String getExternalId() { 317 return externalId; 318 } 319 320 @Override 321 public void setLastModifiedTime(Date lastModifiedTime) { 322 super.setLastModifiedTime(lastModifiedTime); 323 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 324 } 325 326 @Override 327 public Date getLastModifiedTime() { 328 return DateUtils.toDate(lastModifiedTimestamp); 329 } 330 331 @Override 332 public Date getCreatedTime() { 333 return DateUtils.toDate(createdTimestamp); 334 } 335 336 @Override 337 public void setCreatedTime(Date createdTime) { 338 super.setCreatedTime(createdTime); 339 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime); 340 } 341 342 @Override 343 public Date getStartTime() { 344 return DateUtils.toDate(startTimestamp); 345 } 346 347 @Override 348 public void setStartTime(Date startTime) { 349 super.setStartTime(startTime); 350 this.startTimestamp = DateUtils.convertDateToTimestamp(startTime); 351 } 352 353 @Override 354 public Date getEndTime() { 355 return DateUtils.toDate(endTimestamp); 356 } 357 358 @Override 359 public void setEndTime(Date endTime) { 360 super.setEndTime(endTime); 361 this.endTimestamp = DateUtils.convertDateToTimestamp(endTime); 362 } 363 364 private WorkflowInstance get(byte[] array) { 365 LiteWorkflowInstance pInstance = WritableUtils.fromByteArray(array, LiteWorkflowInstance.class); 366 return pInstance; 367 } 368 369 }