001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 020package org.apache.oozie.util; 021 022import java.io.ByteArrayInputStream; 023import java.io.ByteArrayOutputStream; 024import java.io.IOException; 025 026import org.codehaus.jackson.JsonNode; 027import org.codehaus.jackson.JsonParseException; 028import org.codehaus.jackson.map.DeserializationConfig; 029import org.codehaus.jackson.map.JsonMappingException; 030import org.codehaus.jackson.map.ObjectMapper; 031 032import com.google.common.base.Preconditions; 033import org.apache.curator.x.discovery.ServiceInstance; 034import org.apache.curator.x.discovery.ServiceInstanceBuilder; 035import org.apache.curator.x.discovery.ServiceType; 036import org.apache.curator.x.discovery.UriSpec; 037import org.apache.curator.x.discovery.details.InstanceSerializer; 038 039// TODO: Workaround for CURATOR-5 (https://issues.apache.org/jira/browse/CURATOR-5) 040// Remove this class (code from pull request listed on JIRA) and use regular JsonInstanceSerializer once fixed 041// (Otherwise we can't properly serialize objects for the ZK Service Discovery) 042public class FixedJsonInstanceSerializer<T> implements InstanceSerializer<T> 043{ 044 045 private final ObjectMapper mMapper; 046 private final Class<T> mPayloadClass; 047 048 /** 049 * @param payloadClass 050 * used to validate payloads when deserializing 051 */ 052 public FixedJsonInstanceSerializer(final Class<T> payloadClass) { 053 this(payloadClass, new ObjectMapper()); 054 } 055 056 public FixedJsonInstanceSerializer(final Class<T> pPayloadClass, final ObjectMapper pMapper) { 057 mPayloadClass = pPayloadClass; 058 mMapper = pMapper; 059 mMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); 060 } 061 062 @Override 063 public byte[] serialize(final ServiceInstance<T> pInstance) throws Exception { 064 final ByteArrayOutputStream out = new ByteArrayOutputStream(); 065 mMapper.writeValue(out, pInstance); 066 return out.toByteArray(); 067 068 } 069 070 private String getTextField(final JsonNode pNode, final String pFieldName) { 071 Preconditions.checkNotNull(pNode); 072 Preconditions.checkNotNull(pFieldName); 073 return pNode.get(pFieldName) != null ? pNode.get(pFieldName).getTextValue() : null; 074 } 075 076 private Integer getIntegerField(final JsonNode pNode, final String pFieldName) { 077 Preconditions.checkNotNull(pNode); 078 Preconditions.checkNotNull(pFieldName); 079 return (pNode.get(pFieldName) != null && pNode.get(pFieldName).isNumber()) ? pNode.get(pFieldName) 080 .getIntValue() : null; 081 } 082 083 private Long getLongField(final JsonNode pNode, final String pFieldName) { 084 Preconditions.checkNotNull(pNode); 085 Preconditions.checkNotNull(pFieldName); 086 return (pNode.get(pFieldName) != null && pNode.get(pFieldName).isLong()) ? pNode.get(pFieldName).getLongValue() 087 : null; 088 } 089 090 private <O> O getObject(final JsonNode pNode, final String pFieldName, final Class<O> pObjectClass) 091 throws JsonParseException, JsonMappingException, IOException { 092 Preconditions.checkNotNull(pNode); 093 Preconditions.checkNotNull(pFieldName); 094 Preconditions.checkNotNull(pObjectClass); 095 if (pNode.get(pFieldName) != null && pNode.get(pFieldName).isObject()) { 096 return mMapper.readValue(pNode.get(pFieldName), pObjectClass); 097 } else { 098 return null; 099 } 100 } 101 102 @Override 103 public ServiceInstance<T> deserialize(final byte[] pBytes) throws Exception { 104 final ByteArrayInputStream bais = new ByteArrayInputStream(pBytes); 105 final JsonNode rootNode = mMapper.readTree(bais); 106 final ServiceInstanceBuilder<T> builder = ServiceInstance.builder(); 107 { 108 final String address = getTextField(rootNode, "address"); 109 if (address != null) { 110 builder.address(address); 111 } 112 } 113 { 114 final String id = getTextField(rootNode, "id"); 115 if (id != null) { 116 builder.id(id); 117 } 118 } 119 { 120 final String name = getTextField(rootNode, "name"); 121 if (name != null) { 122 builder.name(name); 123 } 124 } 125 { 126 final Integer port = getIntegerField(rootNode, "port"); 127 if (port != null) { 128 builder.port(port); 129 } 130 } 131 { 132 final Integer sslPort = getIntegerField(rootNode, "sslPort"); 133 if (sslPort != null) { 134 builder.sslPort(sslPort); 135 } 136 } 137 { 138 final Long registrationTimeUTC = getLongField(rootNode, "registrationTimeUTC"); 139 if (registrationTimeUTC != null) { 140 builder.registrationTimeUTC(registrationTimeUTC); 141 } 142 } 143 { 144 final T payload = getObject(rootNode, "payload", mPayloadClass); 145 if (payload != null) { 146 builder.payload(payload); 147 } 148 } 149 { 150 final ServiceType serviceType = getObject(rootNode, "serviceType", ServiceType.class); 151 if (serviceType != null) { 152 builder.serviceType(serviceType); 153 } 154 } 155 { 156 final UriSpec uriSpec = getObject(rootNode, "uriSpec", UriSpec.class); 157 if (uriSpec != null) { 158 builder.uriSpec(uriSpec); 159 } 160 } 161 return builder.build(); 162 } 163 164}