001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.List;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.BinaryBlob;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.StringBlob;
031import org.apache.oozie.WorkflowJobBean;
032import org.apache.oozie.service.JPAService;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.util.DateUtils;
035
036/**
037 * Query Executor that provides API to run query for Workflow Job
038 */
039public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> {
040
041    public enum WorkflowJobQuery {
042        UPDATE_WORKFLOW,
043        UPDATE_WORKFLOW_MODTIME,
044        UPDATE_WORKFLOW_STATUS_MODTIME,
045        UPDATE_WORKFLOW_PARENT_MODIFIED,
046        UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
047        UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END,
048        UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END,
049        UPDATE_WORKFLOW_RERUN,
050        GET_WORKFLOW,
051        GET_WORKFLOW_STARTTIME,
052        GET_WORKFLOW_START_END_TIME,
053        GET_WORKFLOW_USER_GROUP,
054        GET_WORKFLOW_SUSPEND,
055        GET_WORKFLOW_ACTION_OP,
056        GET_WORKFLOW_RERUN,
057        GET_WORKFLOW_DEFINITION,
058        GET_WORKFLOW_KILL,
059        GET_WORKFLOW_RESUME,
060        GET_WORKFLOW_STATUS,
061        GET_WORKFLOWS_PARENT_COORD_RERUN,
062        GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN,
063        GET_WORKFLOW_FOR_SLA
064    };
065
066    private static WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor();
067
068    private WorkflowJobQueryExecutor() {
069    }
070
071    public static QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> getInstance() {
072        return WorkflowJobQueryExecutor.instance;
073    }
074
075    @Override
076    public Query getUpdateQuery(WorkflowJobQuery namedQuery, WorkflowJobBean wfBean, EntityManager em)
077            throws JPAExecutorException {
078
079        Query query = em.createNamedQuery(namedQuery.name());
080        switch (namedQuery) {
081            case UPDATE_WORKFLOW:
082                query.setParameter("appName", wfBean.getAppName());
083                query.setParameter("appPath", wfBean.getAppPath());
084                query.setParameter("conf", wfBean.getConfBlob());
085                query.setParameter("groupName", wfBean.getGroup());
086                query.setParameter("run", wfBean.getRun());
087                query.setParameter("user", wfBean.getUser());
088                query.setParameter("createdTime", wfBean.getCreatedTimestamp());
089                query.setParameter("endTime", wfBean.getEndTimestamp());
090                query.setParameter("externalId", wfBean.getExternalId());
091                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
092                query.setParameter("logToken", wfBean.getLogToken());
093                query.setParameter("protoActionConf", wfBean.getProtoActionConfBlob());
094                query.setParameter("slaXml", wfBean.getSlaXmlBlob());
095                query.setParameter("startTime", wfBean.getStartTimestamp());
096                query.setParameter("status", wfBean.getStatusStr());
097                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
098                query.setParameter("id", wfBean.getId());
099                break;
100            case UPDATE_WORKFLOW_MODTIME:
101                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
102                query.setParameter("id", wfBean.getId());
103                break;
104            case UPDATE_WORKFLOW_STATUS_MODTIME:
105                query.setParameter("status", wfBean.getStatus().toString());
106                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
107                query.setParameter("id", wfBean.getId());
108                break;
109            case UPDATE_WORKFLOW_PARENT_MODIFIED:
110                query.setParameter("parentId", wfBean.getParentId());
111                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
112                query.setParameter("id", wfBean.getId());
113                break;
114            case UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED:
115                query.setParameter("status", wfBean.getStatus().toString());
116                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
117                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
118                query.setParameter("id", wfBean.getId());
119                break;
120            case UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END:
121                query.setParameter("status", wfBean.getStatus().toString());
122                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
123                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
124                query.setParameter("endTime", wfBean.getEndTimestamp());
125                query.setParameter("id", wfBean.getId());
126                break;
127            case UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END:
128                query.setParameter("status", wfBean.getStatus().toString());
129                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
130                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
131                query.setParameter("startTime", wfBean.getStartTimestamp());
132                query.setParameter("endTime", wfBean.getEndTimestamp());
133                query.setParameter("id", wfBean.getId());
134                break;
135            case UPDATE_WORKFLOW_RERUN:
136                query.setParameter("appName", wfBean.getAppName());
137                query.setParameter("protoActionConf", wfBean.getProtoActionConfBlob());
138                query.setParameter("appPath", wfBean.getAppPath());
139                query.setParameter("conf", wfBean.getConfBlob());
140                query.setParameter("logToken", wfBean.getLogToken());
141                query.setParameter("user", wfBean.getUser());
142                query.setParameter("group", wfBean.getGroup());
143                query.setParameter("externalId", wfBean.getExternalId());
144                query.setParameter("endTime", wfBean.getEndTimestamp());
145                query.setParameter("run", wfBean.getRun());
146                query.setParameter("status", wfBean.getStatus().toString());
147                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
148                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
149                query.setParameter("id", wfBean.getId());
150                break;
151            default:
152                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
153                        + namedQuery.name());
154        }
155        return query;
156    }
157
158    @Override
159    public Query getSelectQuery(WorkflowJobQuery namedQuery, EntityManager em, Object... parameters)
160            throws JPAExecutorException {
161        Query query = em.createNamedQuery(namedQuery.name());
162        switch (namedQuery) {
163            case GET_WORKFLOW:
164            case GET_WORKFLOW_STARTTIME:
165            case GET_WORKFLOW_START_END_TIME:
166            case GET_WORKFLOW_USER_GROUP:
167            case GET_WORKFLOW_SUSPEND:
168            case GET_WORKFLOW_ACTION_OP:
169            case GET_WORKFLOW_RERUN:
170            case GET_WORKFLOW_DEFINITION:
171            case GET_WORKFLOW_KILL:
172            case GET_WORKFLOW_RESUME:
173            case GET_WORKFLOW_STATUS:
174            case GET_WORKFLOW_FOR_SLA:
175                query.setParameter("id", parameters[0]);
176                break;
177            case GET_WORKFLOWS_PARENT_COORD_RERUN:
178                query.setParameter("parentId", parameters[0]);
179                break;
180            case GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN:
181                long dayInMs = 24 * 60 * 60 * 1000;
182                long olderThanDays = (Long) parameters[0];
183                Timestamp maxEndtime = new Timestamp(System.currentTimeMillis() - (olderThanDays * dayInMs));
184                query.setParameter("endTime", maxEndtime);
185                query.setFirstResult((Integer) parameters[1]);
186                query.setMaxResults((Integer) parameters[2]);
187                break;
188            default:
189                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
190                        + namedQuery.name());
191        }
192        return query;
193    }
194
195    @Override
196    public int executeUpdate(WorkflowJobQuery namedQuery, WorkflowJobBean jobBean) throws JPAExecutorException {
197        JPAService jpaService = Services.get().get(JPAService.class);
198        EntityManager em = jpaService.getEntityManager();
199        Query query = getUpdateQuery(namedQuery, jobBean, em);
200        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
201        return ret;
202    }
203
204    private WorkflowJobBean constructBean(WorkflowJobQuery namedQuery, Object ret, Object... parameters)
205            throws JPAExecutorException {
206        WorkflowJobBean bean;
207        Object[] arr;
208        switch (namedQuery) {
209            case GET_WORKFLOW:
210                bean = (WorkflowJobBean) ret;
211                break;
212            case GET_WORKFLOW_STARTTIME:
213                bean = new WorkflowJobBean();
214                arr = (Object[]) ret;
215                bean.setId((String) arr[0]);
216                bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
217                break;
218            case GET_WORKFLOW_START_END_TIME:
219                bean = new WorkflowJobBean();
220                arr = (Object[]) ret;
221                bean.setId((String) arr[0]);
222                bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
223                bean.setEndTime(DateUtils.toDate((Timestamp) arr[2]));
224                break;
225            case GET_WORKFLOW_USER_GROUP:
226                bean = new WorkflowJobBean();
227                arr = (Object[]) ret;
228                bean.setUser((String) arr[0]);
229                bean.setGroup((String) arr[1]);
230                break;
231            case GET_WORKFLOW_SUSPEND:
232                bean = new WorkflowJobBean();
233                arr = (Object[]) ret;
234                bean.setId((String) arr[0]);
235                bean.setUser((String) arr[1]);
236                bean.setGroup((String) arr[2]);
237                bean.setAppName((String) arr[3]);
238                bean.setStatusStr((String) arr[4]);
239                bean.setParentId((String) arr[5]);
240                bean.setStartTime(DateUtils.toDate((Timestamp) arr[6]));
241                bean.setEndTime(DateUtils.toDate((Timestamp) arr[7]));
242                bean.setLogToken((String) arr[8]);
243                bean.setWfInstanceBlob((BinaryBlob) (arr[9]));
244                break;
245            case GET_WORKFLOW_ACTION_OP:
246                bean = new WorkflowJobBean();
247                arr = (Object[]) ret;
248                bean.setId((String) arr[0]);
249                bean.setUser((String) arr[1]);
250                bean.setGroup((String) arr[2]);
251                bean.setAppName((String) arr[3]);
252                bean.setAppPath((String) arr[4]);
253                bean.setStatusStr((String) arr[5]);
254                bean.setRun((Integer) arr[6]);
255                bean.setParentId((String) arr[7]);
256                bean.setLogToken((String) arr[8]);
257                bean.setWfInstanceBlob((BinaryBlob) (arr[9]));
258                bean.setProtoActionConfBlob((StringBlob) arr[10]);
259                break;
260            case GET_WORKFLOW_RERUN:
261                bean = new WorkflowJobBean();
262                arr = (Object[]) ret;
263                bean.setId((String) arr[0]);
264                bean.setUser((String) arr[1]);
265                bean.setGroup((String) arr[2]);
266                bean.setAppName((String) arr[3]);
267                bean.setStatusStr((String) arr[4]);
268                bean.setRun((Integer) arr[5]);
269                bean.setLogToken((String) arr[6]);
270                bean.setWfInstanceBlob((BinaryBlob) (arr[7]));
271                bean.setParentId((String)arr[8]);
272                break;
273            case GET_WORKFLOW_DEFINITION:
274                bean = new WorkflowJobBean();
275                arr = (Object[]) ret;
276                bean.setId((String) arr[0]);
277                bean.setUser((String) arr[1]);
278                bean.setGroup((String) arr[2]);
279                bean.setAppName((String) arr[3]);
280                bean.setLogToken((String) arr[4]);
281                bean.setWfInstanceBlob((BinaryBlob) (arr[5]));
282                break;
283            case GET_WORKFLOW_KILL:
284                bean = new WorkflowJobBean();
285                arr = (Object[]) ret;
286                bean.setId((String) arr[0]);
287                bean.setUser((String) arr[1]);
288                bean.setGroup((String) arr[2]);
289                bean.setAppName((String) arr[3]);
290                bean.setAppPath((String) arr[4]);
291                bean.setStatusStr((String) arr[5]);
292                bean.setParentId((String) arr[6]);
293                bean.setStartTime(DateUtils.toDate((Timestamp) arr[7]));
294                bean.setEndTime(DateUtils.toDate((Timestamp) arr[8]));
295                bean.setLogToken((String) arr[9]);
296                bean.setWfInstanceBlob((BinaryBlob) (arr[10]));
297                bean.setSlaXmlBlob((StringBlob) arr[11]);
298                break;
299            case GET_WORKFLOW_RESUME:
300                bean = new WorkflowJobBean();
301                arr = (Object[]) ret;
302                bean.setId((String) arr[0]);
303                bean.setUser((String) arr[1]);
304                bean.setGroup((String) arr[2]);
305                bean.setAppName((String) arr[3]);
306                bean.setAppPath((String) arr[4]);
307                bean.setStatusStr((String) arr[5]);
308                bean.setParentId((String) arr[6]);
309                bean.setStartTime(DateUtils.toDate((Timestamp) arr[7]));
310                bean.setEndTime(DateUtils.toDate((Timestamp) arr[8]));
311                bean.setLogToken((String) arr[9]);
312                bean.setWfInstanceBlob((BinaryBlob) (arr[10]));
313                bean.setProtoActionConfBlob((StringBlob) arr[11]);
314                break;
315            case GET_WORKFLOW_STATUS:
316                bean = new WorkflowJobBean();
317                bean.setId((String) parameters[0]);
318                bean.setStatusStr((String) ret);
319                break;
320            case GET_WORKFLOWS_PARENT_COORD_RERUN:
321                bean = new WorkflowJobBean();
322                arr = (Object[]) ret;
323                bean.setId((String) arr[0]);
324                bean.setStatusStr((String) arr[1]);
325                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
326                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
327                break;
328            case GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN:
329                bean = new WorkflowJobBean();
330                arr = (Object[]) ret;
331                bean.setId((String) arr[0]);
332                bean.setParentId((String) arr[1]);
333                break;
334            case GET_WORKFLOW_FOR_SLA:
335                bean = new WorkflowJobBean();
336                arr = (Object[]) ret;
337                bean.setId((String) arr[0]);
338                bean.setStatusStr((String) arr[1]);
339                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
340                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
341                break;
342            default:
343                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
344                        + namedQuery.name());
345        }
346        return bean;
347    }
348
349    @Override
350    public WorkflowJobBean get(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
351        WorkflowJobBean bean = getIfExist(namedQuery, parameters);
352        if (bean == null) {
353            throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery,
354                    Services.get().get(JPAService.class).getEntityManager(), parameters).toString());
355        }
356        return bean;
357    }
358
359    @Override
360    public WorkflowJobBean getIfExist(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
361        JPAService jpaService = Services.get().get(JPAService.class);
362        EntityManager em = jpaService.getEntityManager();
363        Query query = getSelectQuery(namedQuery, em, parameters);
364        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
365        if (ret == null) {
366            return null;
367        }
368        WorkflowJobBean bean = constructBean(namedQuery, ret, parameters);
369        return bean;
370    }
371
372    @Override
373    public List<WorkflowJobBean> getList(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
374        JPAService jpaService = Services.get().get(JPAService.class);
375        EntityManager em = jpaService.getEntityManager();
376        Query query = getSelectQuery(namedQuery, em, parameters);
377        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
378        List<WorkflowJobBean> beanList = new ArrayList<WorkflowJobBean>();
379        if (retList != null) {
380            for (Object ret : retList) {
381                beanList.add(constructBean(namedQuery, ret, parameters));
382            }
383        }
384        return beanList;
385    }
386
387    @Override
388    public Object getSingleValue(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
389        throw new UnsupportedOperationException();
390    }
391}