001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import org.apache.oozie.CoordinatorActionBean;
021import org.apache.oozie.CoordinatorEngine.FILTER_COMPARATORS;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.StringBlob;
024import org.apache.oozie.client.CoordinatorAction;
025import org.apache.oozie.client.OozieClient;
026import org.apache.oozie.service.Services;
027import org.apache.oozie.util.DateUtils;
028import org.apache.oozie.util.Pair;
029import org.apache.oozie.util.ParamChecker;
030
031import javax.persistence.EntityManager;
032import javax.persistence.Query;
033import java.sql.Timestamp;
034import java.util.ArrayList;
035import java.util.HashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039
040/**
041 * Load coordinator actions by offset and len (a subset) for a coordinator job.
042 */
043public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
044
045    private String coordJobId = null;
046    private int offset = 1;
047    private int len = 50;
048    private boolean desc = false;
049    private Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap;
050
051    public CoordJobGetActionsSubsetJPAExecutor(String coordJobId) {
052        ParamChecker.notNull(coordJobId, "coordJobId");
053        this.coordJobId = coordJobId;
054    }
055
056    public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap,
057            int offset, int len, boolean desc) {
058        this(coordJobId);
059        this.filterMap = filterMap;
060        this.offset = offset;
061        this.len = len;
062        this.desc = desc;
063    }
064
065    @Override
066    public String getName() {
067        return "CoordJobGetActionsSubsetJPAExecutor";
068    }
069
070    @Override
071    @SuppressWarnings("unchecked")
072    public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
073        List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
074        try {
075            if (!Services.get().getConf()
076                    .getBoolean(CoordActionGetForInfoJPAExecutor.COORD_GET_ALL_COLS_FOR_ACTION, false)) {
077                Query q = em.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME");
078                q = setQueryParameters(q, em);
079                List<Object[]> actions = q.getResultList();
080
081                for (Object[] a : actions) {
082                    CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
083                    actionList.add(aa);
084                }
085            } else {
086                Query q = em.createNamedQuery("GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME");
087                q = setQueryParameters(q, em);
088                actionList = q.getResultList();
089            }
090        }
091        catch (Exception e) {
092            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
093        }
094        return actionList;
095    }
096
097    private Query setQueryParameters(Query q, EntityManager em){
098        Map<String, Object> params = null;
099        if (filterMap != null) {
100            // Add the filter clause
101            String query = q.toString();
102            StringBuilder sbTotal = new StringBuilder(query);
103            int offset = query.lastIndexOf("order");
104            // Get the 'where' clause for status filters
105            StringBuilder statusClause = new StringBuilder();
106            params = getWhereClause(statusClause, filterMap);
107            // Insert 'where' before 'order by'
108            sbTotal.insert(offset, statusClause);
109            q = em.createQuery(sbTotal.toString());
110        }
111        if (desc) {
112            q = em.createQuery(q.toString().concat(" desc"));
113        }
114        if (params != null) {
115            for (String pname : params.keySet()) {
116                q.setParameter(pname, params.get(pname));
117            }
118        }
119        q.setParameter("jobId", coordJobId);
120        q.setFirstResult(offset - 1);
121        q.setMaxResults(len);
122        return q;
123    }
124
125    // Form the where clause to filter by status values
126    private Map<String, Object> getWhereClause(StringBuilder sb, Map<Pair<String, FILTER_COMPARATORS>,
127        List<Object>> filterMap) {
128        Map<String, Object> params = new HashMap<String, Object>();
129        int pcnt= 1;
130        for (Entry<Pair<String, FILTER_COMPARATORS>, List<Object>> filter : filterMap.entrySet()) {
131            String field = filter.getKey().getFist();
132            FILTER_COMPARATORS comp = filter.getKey().getSecond();
133            String sqlField;
134            if (field.equals(OozieClient.FILTER_STATUS)) {
135                sqlField = "a.statusStr";
136            } else if (field.equals(OozieClient.FILTER_NOMINAL_TIME)) {
137                sqlField = "a.nominalTimestamp";
138            } else {
139                throw new IllegalArgumentException("Invalid filter key " + field);
140            }
141
142            sb.append(" and ").append(sqlField).append(" ");
143            switch (comp) {
144            case EQUALS:
145                sb.append("IN (");
146                params.putAll(appendParams(sb, filter.getValue(), pcnt));
147                sb.append(")");
148                break;
149
150            case NOT_EQUALS:
151                sb.append("NOT IN (");
152                params.putAll(appendParams(sb, filter.getValue(), pcnt));
153                sb.append(")");
154                break;
155
156            case GREATER:
157            case GREATER_EQUAL:
158            case LESSTHAN:
159            case LESSTHAN_EQUAL:
160                if (filter.getValue().size() != 1) {
161                    throw new IllegalArgumentException(field + comp.getSign() + " can't have more than 1 values");
162                }
163
164                sb.append(comp.getSign()).append(" ");
165                params.putAll(appendParams(sb, filter.getValue(), pcnt));
166                break;
167            }
168
169            pcnt += filter.getValue().size();
170        }
171        sb.append(" ");
172        return params;
173    }
174
175    private Map<String, Object> appendParams(StringBuilder sb, List<Object> value, int sindex) {
176        Map<String, Object> params = new HashMap<String, Object>();
177        boolean first = true;
178        for (Object val : value) {
179            String pname = "p" + sindex++;
180            params.put(pname, val);
181            if (!first) {
182                sb.append(", ");
183            }
184            sb.append(':').append(pname);
185            first = false;
186        }
187        return params;
188    }
189
190    private CoordinatorActionBean getBeanForRunningCoordAction(Object arr[]) {
191        CoordinatorActionBean bean = new CoordinatorActionBean();
192        if (arr[0] != null) {
193            bean.setId((String) arr[0]);
194        }
195        if (arr[1] != null) {
196            bean.setActionNumber((Integer) arr[1]);
197        }
198        if (arr[2] != null) {
199            bean.setConsoleUrl((String) arr[2]);
200        }
201        if (arr[3] != null) {
202            bean.setErrorCode((String) arr[3]);
203        }
204        if (arr[4] != null) {
205            bean.setErrorMessage((String) arr[4]);
206        }
207        if (arr[5] != null) {
208            bean.setExternalId((String) arr[5]);
209        }
210        if (arr[6] != null) {
211            bean.setExternalStatus((String) arr[6]);
212        }
213        if (arr[7] != null) {
214            bean.setJobId((String) arr[7]);
215        }
216        if (arr[8] != null) {
217            bean.setTrackerUri((String) arr[8]);
218        }
219        if (arr[9] != null) {
220            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[9]));
221        }
222        if (arr[10] != null) {
223            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[10]));
224        }
225        if (arr[11] != null) {
226            bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[11]));
227        }
228        if (arr[12] != null) {
229            bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[12]));
230        }
231        if (arr[13] != null) {
232            bean.setMissingDependenciesBlob((StringBlob) arr[13]);
233        }
234        if (arr[14] != null) {
235            bean.setPushMissingDependenciesBlob((StringBlob) arr[14]);
236        }
237        if (arr[15] != null) {
238            bean.setTimeOut((Integer) arr[15]);
239        }
240        return bean;
241
242    }
243
244}