001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.regex.Matcher;
025import java.util.regex.Pattern;
026
027import org.apache.curator.framework.recipes.leader.LeaderLatch;
028import org.apache.curator.x.discovery.ServiceInstance;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.client.rest.RestConstants;
031import org.apache.oozie.util.IOUtils;
032import org.apache.oozie.util.Instrumentable;
033import org.apache.oozie.util.Instrumentation;
034import org.apache.oozie.util.ZKUtils;
035
036/**
037 * This Service helps coordinate other Services to prevent duplicate processing of Jobs if there are multiple Oozie Servers.  This
038 * implementation uses ZooKeeper and is designed to correctly deal with multiple Oozie Servers.
039 * <p>
040 * The distributed locks provided by {@link ZKLocksService} will prevent any concurrency issues from occurring if multiple Oozie
041 * Servers try to process the same job at the same time.  However, this will make Oozie slower (more waiting on locks) and will
042 * place additional stress on ZooKeeper and the Database.  By "assigning" different Oozie servers to process different jobs, we can
043 * improve this situation.  This is particularly necessary for Services like the {@link RecoveryService}, which could duplicate jobs
044 * otherwise.  We can assign jobs to servers by doing a mod of the jobs' id and the number of servers.
045 * <p>
046 * The leader server is elected by all of the Oozie servers, so there can only be one at a time.  This is useful for tasks that
047 * require (or are better off) being done by only one server (e.g. database purging).  Note that the leader server isn't a
048 * "traditional leader" in the sense that it doesn't command or have authority over the other servers.  This leader election uses
049 * a znode under /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ZK_LEADER_PATH (default is /oozie/services/concurrencyleader).
050 */
051public class ZKJobsConcurrencyService extends JobsConcurrencyService implements Service, Instrumentable {
052
053    private ZKUtils zk;
054
055    // This pattern gives us the id number without the extra stuff
056    private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*");
057
058    private static final String ZK_LEADER_PATH = "concurrencyleader";
059    private static LeaderLatch leaderLatch = null;
060
061    /**
062     * Initialize the zookeeper jobs concurrency service
063     *
064     * @param services services instance.
065     */
066    @Override
067    public void init(Services services) throws ServiceException {
068        super.init(services);
069        try {
070            zk = ZKUtils.register(this);
071            leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId());
072            leaderLatch.start();
073        }
074        catch (Exception ex) {
075            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
076        }
077    }
078
079    /**
080     * Destroy the zookeeper jobs concurrency service.
081     */
082    @Override
083    public void destroy() {
084        if (leaderLatch != null) {
085            IOUtils.closeSafely(leaderLatch);
086        }
087        if (zk != null) {
088            zk.unregister(this);
089        }
090        zk = null;
091        super.destroy();
092    }
093
094    /**
095     * Instruments the memory locks service.
096     *
097     * @param instr instance to instrument the zookeeper jobs concurrency service to.
098     */
099    @Override
100    public void instrument(Instrumentation instr) {
101        super.instrument(instr);
102    }
103
104    /**
105     * Check to see if this server is the leader server.  This implementation only returns true if this server has been elected by
106     * all of the servers as the leader server.
107     *
108     * @return true if this server is the leader; false if not
109     */
110    @Override
111    public boolean isLeader() {
112        return leaderLatch.hasLeadership();
113    }
114
115    /**
116     * Check to see if jobId should be processed by this server.  This implementation only returns true if the index of this server
117     * in ZooKeeper's list of servers is equal to the id of the job mod the number of servers.
118     *
119     * @param jobId The jobId to check
120     * @return true if this server should process this jobId; false if not
121     */
122    @Override
123    public boolean isJobIdForThisServer(String jobId) {
124        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
125        int numOozies = oozies.size();
126        int myIndex = zk.getZKIdIndex(oozies);
127        return checkJobIdForServer(jobId, numOozies, myIndex);
128    }
129
130    /**
131     * Filter out any job ids that should not be processed by this server.  This implementation only preserves jobs such that the
132     * index of this server in ZooKeeper's list of servers is equal to the id of the job mod the number of servers.
133     *
134     * @param ids The list of job ids to check
135     * @return a filtered list of job ids that this server should process
136     */
137    @Override
138    public List<String> getJobIdsForThisServer(List<String> ids) {
139        List<String> filteredIds = new ArrayList<String>();
140        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
141        int numOozies = oozies.size();
142        int myIndex = zk.getZKIdIndex(oozies);
143        for(String id : ids) {
144            if (checkJobIdForServer(id, numOozies, myIndex)) {
145                filteredIds.add(id);
146            }
147        }
148        return filteredIds;
149    }
150
151    /**
152     * Check if the jobId should be processed by the server with index myIndex when there are numOozies servers.
153     *
154     * @param jobId The jobId to check
155     * @param numOozies The number of Oozie servers
156     * @param myIndex The index of the Oozie server
157     * @return true if the jobId should be processed by the server, false if not
158     */
159    private boolean checkJobIdForServer(String jobId, int numOozies, int myIndex) {
160        boolean belongs = true;
161        Matcher m = ID_PATTERN.matcher(jobId);
162        if (m.matches() && m.groupCount() == 1) {
163            String idNumStr = m.group(1);
164            int idNum = Integer.parseInt(idNumStr);
165            belongs = (idNum % numOozies == myIndex);
166        }
167        return belongs;
168    }
169
170    /**
171     * Return a map of instance id to Oozie server URL.  This implementation always returns a map with where the key is the instance
172     * id and the value is the URL of each Oozie server that we can see in the service discovery in ZooKeeper.
173     *
174     * @return A map of Oozie instance ids and URLs
175     */
176    @Override
177    public Map<String, String> getServerUrls() {
178        Map<String, String> urls = new HashMap<String, String>();
179        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
180        for (ServiceInstance<Map> oozie : oozies) {
181            Map<String, String> metadata = oozie.getPayload();
182            String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
183            String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
184            urls.put(id, url);
185        }
186        return urls;
187    }
188
189    /**
190     * Return a map of instance id to Oozie server URL of other servers.  This implementation always returns a map with
191     * where the key is the instance id and the value is the URL of each Oozie server that we can see in the service
192     * discovery in ZooKeeper.
193     *
194     * @return A map of Oozie instance ids and URLs
195     */
196    @Override
197    public Map<String, String> getOtherServerUrls() {
198        Map<String, String> urls = new HashMap<String, String>();
199        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
200        for (ServiceInstance<Map> oozie : oozies) {
201            Map<String, String> metadata = oozie.getPayload();
202            String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
203
204            if (id.equals(zk.getZKId())) {
205                continue;
206            }
207            String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
208            urls.put(id, url);
209        }
210        return urls;
211    }
212
213    /**
214     * Checks if rest request is for all server. By default it's true.
215     *
216     * @param params the HttpRequest param
217     * @return false if allservers=false, else true;
218     */
219    @Override
220    public boolean isAllServerRequest(Map<String, String[]> params) {
221        return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty()
222                || !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false");
223    }
224
225    /**
226     * Return if it is running in HA mode
227     *
228     * @return
229     */
230    @Override
231    public boolean isHighlyAvailableMode() {
232        return true;
233    }
234}