001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.regex.Matcher; 025import java.util.regex.Pattern; 026 027import org.apache.curator.framework.recipes.leader.LeaderLatch; 028import org.apache.curator.x.discovery.ServiceInstance; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.client.rest.RestConstants; 031import org.apache.oozie.util.IOUtils; 032import org.apache.oozie.util.Instrumentable; 033import org.apache.oozie.util.Instrumentation; 034import org.apache.oozie.util.ZKUtils; 035 036/** 037 * This Service helps coordinate other Services to prevent duplicate processing of Jobs if there are multiple Oozie Servers. This 038 * implementation uses ZooKeeper and is designed to correctly deal with multiple Oozie Servers. 039 * <p> 040 * The distributed locks provided by {@link ZKLocksService} will prevent any concurrency issues from occurring if multiple Oozie 041 * Servers try to process the same job at the same time. However, this will make Oozie slower (more waiting on locks) and will 042 * place additional stress on ZooKeeper and the Database. By "assigning" different Oozie servers to process different jobs, we can 043 * improve this situation. This is particularly necessary for Services like the {@link RecoveryService}, which could duplicate jobs 044 * otherwise. We can assign jobs to servers by doing a mod of the jobs' id and the number of servers. 045 * <p> 046 * The leader server is elected by all of the Oozie servers, so there can only be one at a time. This is useful for tasks that 047 * require (or are better off) being done by only one server (e.g. database purging). Note that the leader server isn't a 048 * "traditional leader" in the sense that it doesn't command or have authority over the other servers. This leader election uses 049 * a znode under /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ZK_LEADER_PATH (default is /oozie/services/concurrencyleader). 050 */ 051public class ZKJobsConcurrencyService extends JobsConcurrencyService implements Service, Instrumentable { 052 053 private ZKUtils zk; 054 055 // This pattern gives us the id number without the extra stuff 056 private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*"); 057 058 private static final String ZK_LEADER_PATH = "concurrencyleader"; 059 private static LeaderLatch leaderLatch = null; 060 061 /** 062 * Initialize the zookeeper jobs concurrency service 063 * 064 * @param services services instance. 065 */ 066 @Override 067 public void init(Services services) throws ServiceException { 068 super.init(services); 069 try { 070 zk = ZKUtils.register(this); 071 leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId()); 072 leaderLatch.start(); 073 } 074 catch (Exception ex) { 075 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 076 } 077 } 078 079 /** 080 * Destroy the zookeeper jobs concurrency service. 081 */ 082 @Override 083 public void destroy() { 084 if (leaderLatch != null) { 085 IOUtils.closeSafely(leaderLatch); 086 } 087 if (zk != null) { 088 zk.unregister(this); 089 } 090 zk = null; 091 super.destroy(); 092 } 093 094 /** 095 * Instruments the memory locks service. 096 * 097 * @param instr instance to instrument the zookeeper jobs concurrency service to. 098 */ 099 @Override 100 public void instrument(Instrumentation instr) { 101 super.instrument(instr); 102 } 103 104 /** 105 * Check to see if this server is the leader server. This implementation only returns true if this server has been elected by 106 * all of the servers as the leader server. 107 * 108 * @return true if this server is the leader; false if not 109 */ 110 @Override 111 public boolean isLeader() { 112 return leaderLatch.hasLeadership(); 113 } 114 115 /** 116 * Check to see if jobId should be processed by this server. This implementation only returns true if the index of this server 117 * in ZooKeeper's list of servers is equal to the id of the job mod the number of servers. 118 * 119 * @param jobId The jobId to check 120 * @return true if this server should process this jobId; false if not 121 */ 122 @Override 123 public boolean isJobIdForThisServer(String jobId) { 124 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 125 int numOozies = oozies.size(); 126 int myIndex = zk.getZKIdIndex(oozies); 127 return checkJobIdForServer(jobId, numOozies, myIndex); 128 } 129 130 /** 131 * Filter out any job ids that should not be processed by this server. This implementation only preserves jobs such that the 132 * index of this server in ZooKeeper's list of servers is equal to the id of the job mod the number of servers. 133 * 134 * @param ids The list of job ids to check 135 * @return a filtered list of job ids that this server should process 136 */ 137 @Override 138 public List<String> getJobIdsForThisServer(List<String> ids) { 139 List<String> filteredIds = new ArrayList<String>(); 140 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 141 int numOozies = oozies.size(); 142 int myIndex = zk.getZKIdIndex(oozies); 143 for(String id : ids) { 144 if (checkJobIdForServer(id, numOozies, myIndex)) { 145 filteredIds.add(id); 146 } 147 } 148 return filteredIds; 149 } 150 151 /** 152 * Check if the jobId should be processed by the server with index myIndex when there are numOozies servers. 153 * 154 * @param jobId The jobId to check 155 * @param numOozies The number of Oozie servers 156 * @param myIndex The index of the Oozie server 157 * @return true if the jobId should be processed by the server, false if not 158 */ 159 private boolean checkJobIdForServer(String jobId, int numOozies, int myIndex) { 160 boolean belongs = true; 161 Matcher m = ID_PATTERN.matcher(jobId); 162 if (m.matches() && m.groupCount() == 1) { 163 String idNumStr = m.group(1); 164 int idNum = Integer.parseInt(idNumStr); 165 belongs = (idNum % numOozies == myIndex); 166 } 167 return belongs; 168 } 169 170 /** 171 * Return a map of instance id to Oozie server URL. This implementation always returns a map with where the key is the instance 172 * id and the value is the URL of each Oozie server that we can see in the service discovery in ZooKeeper. 173 * 174 * @return A map of Oozie instance ids and URLs 175 */ 176 @Override 177 public Map<String, String> getServerUrls() { 178 Map<String, String> urls = new HashMap<String, String>(); 179 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 180 for (ServiceInstance<Map> oozie : oozies) { 181 Map<String, String> metadata = oozie.getPayload(); 182 String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 183 String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 184 urls.put(id, url); 185 } 186 return urls; 187 } 188 189 /** 190 * Return a map of instance id to Oozie server URL of other servers. This implementation always returns a map with 191 * where the key is the instance id and the value is the URL of each Oozie server that we can see in the service 192 * discovery in ZooKeeper. 193 * 194 * @return A map of Oozie instance ids and URLs 195 */ 196 @Override 197 public Map<String, String> getOtherServerUrls() { 198 Map<String, String> urls = new HashMap<String, String>(); 199 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 200 for (ServiceInstance<Map> oozie : oozies) { 201 Map<String, String> metadata = oozie.getPayload(); 202 String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 203 204 if (id.equals(zk.getZKId())) { 205 continue; 206 } 207 String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 208 urls.put(id, url); 209 } 210 return urls; 211 } 212 213 /** 214 * Checks if rest request is for all server. By default it's true. 215 * 216 * @param params the HttpRequest param 217 * @return false if allservers=false, else true; 218 */ 219 @Override 220 public boolean isAllServerRequest(Map<String, String[]> params) { 221 return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty() 222 || !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false"); 223 } 224 225 /** 226 * Return if it is running in HA mode 227 * 228 * @return 229 */ 230 @Override 231 public boolean isHighlyAvailableMode() { 232 return true; 233 } 234}