001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.List;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.StringBlob;
030import org.apache.oozie.WorkflowActionBean;
031import org.apache.oozie.service.JPAService;
032import org.apache.oozie.service.Services;
033import org.apache.oozie.util.DateUtils;
034
/**
 * Query executor that provides an API for running named JPA queries against workflow actions.
 */
038public class WorkflowActionQueryExecutor extends
039        QueryExecutor<WorkflowActionBean, WorkflowActionQueryExecutor.WorkflowActionQuery> {
040
041    public enum WorkflowActionQuery {
042        UPDATE_ACTION,
043        UPDATE_ACTION_FOR_LAST_CHECKED_TIME,
044        UPDATE_ACTION_START,
045        UPDATE_ACTION_CHECK,
046        UPDATE_ACTION_END,
047        UPDATE_ACTION_PENDING,
048        UPDATE_ACTION_STATUS_PENDING,
049        UPDATE_ACTION_PENDING_TRANS,
050        UPDATE_ACTION_PENDING_TRANS_ERROR,
051        GET_ACTION,
052        GET_ACTION_ID_TYPE_LASTCHECK,
053        GET_ACTION_FAIL,
054        GET_ACTION_SIGNAL,
055        GET_ACTION_CHECK,
056        GET_ACTION_END,
057        GET_ACTION_COMPLETED,
058        GET_RUNNING_ACTIONS,
059        GET_PENDING_ACTIONS,
060        GET_ACTIONS_FOR_WORKFLOW_RERUN,
061        GET_ACTION_FOR_SLA
062    };
063
064    private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor();
065
066    private WorkflowActionQueryExecutor() {
067    }
068
069    public static QueryExecutor<WorkflowActionBean, WorkflowActionQuery> getInstance() {
070        return WorkflowActionQueryExecutor.instance;
071    }
072
073    @Override
074    public Query getUpdateQuery(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean, EntityManager em)
075            throws JPAExecutorException {
076        Query query = em.createNamedQuery(namedQuery.name());
077        switch (namedQuery) {
078            case UPDATE_ACTION:
079                query.setParameter("conf", actionBean.getConfBlob());
080                query.setParameter("consoleUrl", actionBean.getConsoleUrl());
081                query.setParameter("data", actionBean.getDataBlob());
082                query.setParameter("stats", actionBean.getStatsBlob());
083                query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob());
084                query.setParameter("errorCode", actionBean.getErrorCode());
085                query.setParameter("errorMessage", actionBean.getErrorMessage());
086                query.setParameter("externalId", actionBean.getExternalId());
087                query.setParameter("externalStatus", actionBean.getExternalStatus());
088                query.setParameter("name", actionBean.getName());
089                query.setParameter("cred", actionBean.getCred());
090                query.setParameter("retries", actionBean.getRetries());
091                query.setParameter("trackerUri", actionBean.getTrackerUri());
092                query.setParameter("transition", actionBean.getTransition());
093                query.setParameter("type", actionBean.getType());
094                query.setParameter("endTime", actionBean.getEndTimestamp());
095                query.setParameter("executionPath", actionBean.getExecutionPath());
096                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
097                query.setParameter("logToken", actionBean.getLogToken());
098                query.setParameter("pending", actionBean.getPending());
099                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
100                query.setParameter("signalValue", actionBean.getSignalValue());
101                query.setParameter("slaXml", actionBean.getSlaXmlBlob());
102                query.setParameter("startTime", actionBean.getStartTimestamp());
103                query.setParameter("status", actionBean.getStatusStr());
104                query.setParameter("wfId", actionBean.getWfId());
105                query.setParameter("id", actionBean.getId());
106                break;
107            case UPDATE_ACTION_FOR_LAST_CHECKED_TIME:
108                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
109                query.setParameter("id", actionBean.getId());
110                break;
111            case UPDATE_ACTION_PENDING:
112                query.setParameter("pending", actionBean.getPending());
113                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
114                query.setParameter("executionPath", actionBean.getExecutionPath());
115                query.setParameter("id", actionBean.getId());
116                break;
117            case UPDATE_ACTION_STATUS_PENDING:
118                query.setParameter("status", actionBean.getStatus().toString());
119                query.setParameter("pending", actionBean.getPending());
120                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
121                query.setParameter("id", actionBean.getId());
122                break;
123            case UPDATE_ACTION_PENDING_TRANS:
124                query.setParameter("transition", actionBean.getTransition());
125                query.setParameter("pending", actionBean.getPending());
126                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
127                query.setParameter("id", actionBean.getId());
128                break;
129            case UPDATE_ACTION_PENDING_TRANS_ERROR:
130                query.setParameter("transition", actionBean.getTransition());
131                query.setParameter("pending", actionBean.getPending());
132                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
133                query.setParameter("errorCode", actionBean.getErrorCode());
134                query.setParameter("errorMessage", actionBean.getErrorMessage());
135                query.setParameter("status", actionBean.getStatusStr());
136                query.setParameter("id", actionBean.getId());
137                break;
138            case UPDATE_ACTION_START:
139                query.setParameter("startTime", actionBean.getStartTimestamp());
140                query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob());
141                query.setParameter("conf", actionBean.getConfBlob());
142                query.setParameter("errorCode", actionBean.getErrorCode());
143                query.setParameter("errorMessage", actionBean.getErrorMessage());
144                query.setParameter("externalId", actionBean.getExternalId());
145                query.setParameter("trackerUri", actionBean.getTrackerUri());
146                query.setParameter("consoleUrl", actionBean.getConsoleUrl());
147                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
148                query.setParameter("status", actionBean.getStatus().toString());
149                query.setParameter("externalStatus", actionBean.getExternalStatus());
150                query.setParameter("data", actionBean.getDataBlob());
151                query.setParameter("retries", actionBean.getRetries());
152                query.setParameter("pending", actionBean.getPending());
153                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
154                query.setParameter("userRetryCount", actionBean.getUserRetryCount());
155                query.setParameter("id", actionBean.getId());
156                break;
157            case UPDATE_ACTION_CHECK:
158                query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob());
159                query.setParameter("externalStatus", actionBean.getExternalStatus());
160                query.setParameter("status", actionBean.getStatus().toString());
161                query.setParameter("data", actionBean.getDataBlob());
162                query.setParameter("pending", actionBean.getPending());
163                query.setParameter("errorCode", actionBean.getErrorCode());
164                query.setParameter("errorMessage", actionBean.getErrorMessage());
165                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
166                query.setParameter("retries", actionBean.getRetries());
167                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
168                query.setParameter("startTime", actionBean.getStartTimestamp());
169                query.setParameter("stats", actionBean.getStatsBlob());
170                query.setParameter("userRetryCount", actionBean.getUserRetryCount());
171                query.setParameter("id", actionBean.getId());
172                break;
173            case UPDATE_ACTION_END:
174                query.setParameter("errorCode", actionBean.getErrorCode());
175                query.setParameter("errorMessage", actionBean.getErrorMessage());
176                query.setParameter("retries", actionBean.getRetries());
177                query.setParameter("status", actionBean.getStatus().toString());
178                query.setParameter("endTime", actionBean.getEndTimestamp());
179                query.setParameter("pending", actionBean.getPending());
180                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
181                query.setParameter("signalValue", actionBean.getSignalValue());
182                query.setParameter("userRetryCount", actionBean.getUserRetryCount());
183                query.setParameter("externalStatus", actionBean.getExternalStatus());
184                query.setParameter("stats", actionBean.getStatsBlob());
185                query.setParameter("id", actionBean.getId());
186                break;
187            default:
188                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
189                        + namedQuery.name());
190        }
191        return query;
192    }
193
194    @Override
195    public Query getSelectQuery(WorkflowActionQuery namedQuery, EntityManager em, Object... parameters)
196            throws JPAExecutorException {
197        Query query = em.createNamedQuery(namedQuery.name());
198        switch (namedQuery) {
199            case GET_ACTION:
200            case GET_ACTION_ID_TYPE_LASTCHECK:
201            case GET_ACTION_FAIL:
202            case GET_ACTION_SIGNAL:
203            case GET_ACTION_CHECK:
204            case GET_ACTION_END:
205            case GET_ACTION_COMPLETED:
206            case GET_ACTION_FOR_SLA:
207                query.setParameter("id", parameters[0]);
208                break;
209            case GET_RUNNING_ACTIONS:
210                Timestamp ts = new Timestamp(System.currentTimeMillis() - (Integer) parameters[0] * 1000);
211                query.setParameter("lastCheckTime", ts);
212                break;
213            case GET_PENDING_ACTIONS:
214                Long minimumPendingAgeSecs = (Long) parameters[0];
215                Timestamp pts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
216                Timestamp createdTimeInterval = new Timestamp((Long) parameters[1]);
217                query.setParameter("pendingAge", pts);
218                query.setParameter("createdTime", createdTimeInterval);
219                break;
220            case GET_ACTIONS_FOR_WORKFLOW_RERUN:
221                query.setParameter("wfId", parameters[0]);
222                break;
223            default:
224                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
225                        + namedQuery.name());
226        }
227        return query;
228    }
229
230    @Override
231    public int executeUpdate(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean) throws JPAExecutorException {
232        JPAService jpaService = Services.get().get(JPAService.class);
233        EntityManager em = jpaService.getEntityManager();
234        Query query = getUpdateQuery(namedQuery, actionBean, em);
235        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
236        return ret;
237    }
238
239    private WorkflowActionBean constructBean(WorkflowActionQuery namedQuery, Object ret) throws JPAExecutorException {
240        WorkflowActionBean bean;
241        Object[] arr;
242        switch (namedQuery) {
243            case GET_ACTION:
244                bean = (WorkflowActionBean) ret;
245                break;
246            case GET_ACTION_ID_TYPE_LASTCHECK:
247                bean = new WorkflowActionBean();
248                arr = (Object[]) ret;
249                bean.setId((String) arr[0]);
250                bean.setType((String) arr[1]);
251                bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[2]));
252                break;
253            case GET_ACTION_FAIL:
254                bean = new WorkflowActionBean();
255                arr = (Object[]) ret;
256                bean.setId((String) arr[0]);
257                bean.setJobId((String) arr[1]);
258                bean.setName((String) arr[2]);
259                bean.setStatusStr((String) arr[3]);
260                bean.setPending((Integer) arr[4]);
261                bean.setType((String) arr[5]);
262                bean.setLogToken((String) arr[6]);
263                bean.setTransition((String) arr[7]);
264                bean.setErrorInfo((String) arr[8], (String) arr[9]);
265                break;
266            case GET_ACTION_SIGNAL:
267                bean = new WorkflowActionBean();
268                arr = (Object[]) ret;
269                bean.setId((String) arr[0]);
270                bean.setJobId((String) arr[1]);
271                bean.setName((String) arr[2]);
272                bean.setStatusStr((String) arr[3]);
273                bean.setPending((Integer) arr[4]);
274                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5]));
275                bean.setType((String) arr[6]);
276                bean.setLogToken((String) arr[7]);
277                bean.setTransition((String) arr[8]);
278                bean.setErrorInfo((String) arr[9], (String) arr[10]);
279                bean.setExecutionPath((String) arr[11]);
280                bean.setSignalValue((String) arr[12]);
281                bean.setSlaXmlBlob((StringBlob) arr[13]);
282                bean.setExternalId((String) arr[14]);
283                break;
284            case GET_ACTION_CHECK:
285                bean = new WorkflowActionBean();
286                arr = (Object[]) ret;
287                bean.setId((String) arr[0]);
288                bean.setJobId((String) arr[1]);
289                bean.setName((String) arr[2]);
290                bean.setStatusStr((String) arr[3]);
291                bean.setPending((Integer) arr[4]);
292                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5]));
293                bean.setType((String) arr[6]);
294                bean.setLogToken((String) arr[7]);
295                bean.setTransition((String) arr[8]);
296                bean.setRetries((Integer) arr[9]);
297                bean.setUserRetryCount((Integer) arr[10]);
298                bean.setUserRetryMax((Integer) arr[11]);
299                bean.setUserRetryInterval((Integer) arr[12]);
300                bean.setTrackerUri((String) arr[13]);
301                bean.setStartTime(DateUtils.toDate((Timestamp) arr[14]));
302                bean.setEndTime(DateUtils.toDate((Timestamp) arr[15]));
303                bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[16]));
304                bean.setErrorInfo((String) arr[17], (String) arr[18]);
305                bean.setExternalId((String) arr[19]);
306                bean.setExternalStatus((String) arr[20]);
307                bean.setExternalChildIDsBlob((StringBlob) arr[21]);
308                bean.setConfBlob((StringBlob) arr[22]);
309                break;
310            case GET_ACTION_END:
311                bean = new WorkflowActionBean();
312                arr = (Object[]) ret;
313                bean.setId((String) arr[0]);
314                bean.setJobId((String) arr[1]);
315                bean.setName((String) arr[2]);
316                bean.setStatusStr((String) arr[3]);
317                bean.setPending((Integer) arr[4]);
318                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5]));
319                bean.setType((String) arr[6]);
320                bean.setLogToken((String) arr[7]);
321                bean.setTransition((String) arr[8]);
322                bean.setRetries((Integer) arr[9]);
323                bean.setTrackerUri((String) arr[10]);
324                bean.setUserRetryCount((Integer) arr[11]);
325                bean.setUserRetryMax((Integer) arr[12]);
326                bean.setUserRetryInterval((Integer) arr[13]);
327                bean.setStartTime(DateUtils.toDate((Timestamp) arr[14]));
328                bean.setEndTime(DateUtils.toDate((Timestamp) arr[15]));
329                bean.setErrorInfo((String) arr[16], (String) arr[17]);
330                bean.setExternalId((String) arr[18]);
331                bean.setExternalStatus((String) arr[19]);
332                bean.setExternalChildIDsBlob((StringBlob) arr[20]);
333                bean.setConfBlob((StringBlob) arr[21]);
334                bean.setDataBlob((StringBlob) arr[22]);
335                bean.setStatsBlob((StringBlob) arr[23]);
336                break;
337            case GET_ACTION_COMPLETED:
338                bean = new WorkflowActionBean();
339                arr = (Object[]) ret;
340                bean.setId((String) arr[0]);
341                bean.setJobId((String) arr[1]);
342                bean.setStatusStr((String) arr[2]);
343                bean.setType((String) arr[3]);
344                bean.setLogToken((String) arr[4]);
345                break;
346            case GET_RUNNING_ACTIONS:
347                bean = new WorkflowActionBean();
348                bean.setId((String)ret);
349                break;
350            case GET_PENDING_ACTIONS:
351                bean = new WorkflowActionBean();
352                arr = (Object[]) ret;
353                bean.setId((String) arr[0]);
354                bean.setJobId((String) arr[1]);
355                bean.setStatusStr((String) arr[2]);
356                bean.setType((String) arr[3]);
357                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[4]));
358                break;
359            case GET_ACTIONS_FOR_WORKFLOW_RERUN:
360                bean = new WorkflowActionBean();
361                arr = (Object[]) ret;
362                bean.setId((String) arr[0]);
363                bean.setName((String) arr[1]);
364                bean.setStatusStr((String) arr[2]);
365                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
366                bean.setType((String) arr[4]);
367                break;
368            case GET_ACTION_FOR_SLA:
369                bean = new WorkflowActionBean();
370                arr = (Object[]) ret;
371                bean.setId((String) arr[0]);
372                bean.setStatusStr((String) arr[1]);
373                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
374                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
375                break;
376
377            default:
378                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
379                        + namedQuery.name());
380        }
381        return bean;
382    }
383
384    @Override
385    public WorkflowActionBean get(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
386        WorkflowActionBean bean = getIfExist(namedQuery, parameters);
387        if (bean == null) {
388            throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery,
389                    Services.get().get(JPAService.class).getEntityManager(), parameters).toString());
390        }
391        return bean;
392    }
393
394    @Override
395    public WorkflowActionBean getIfExist(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
396        JPAService jpaService = Services.get().get(JPAService.class);
397        EntityManager em = jpaService.getEntityManager();
398        Query query = getSelectQuery(namedQuery, em, parameters);
399        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
400        if (ret == null) {
401            return null;
402        }
403        WorkflowActionBean bean = constructBean(namedQuery, ret);
404        return bean;
405    }
406
407    @Override
408    public List<WorkflowActionBean> getList(WorkflowActionQuery namedQuery, Object... parameters)
409            throws JPAExecutorException {
410        JPAService jpaService = Services.get().get(JPAService.class);
411        EntityManager em = jpaService.getEntityManager();
412        Query query = getSelectQuery(namedQuery, em, parameters);
413        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
414        List<WorkflowActionBean> beanList = new ArrayList<WorkflowActionBean>();
415        if (retList != null) {
416            for (Object ret : retList) {
417                beanList.add(constructBean(namedQuery, ret));
418            }
419        }
420        return beanList;
421    }
422
423    @Override
424    public Object getSingleValue(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
425        throw new UnsupportedOperationException();
426    }
427}