BlacktieStompAdministrationService.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:12k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.administration;
  19. import java.io.IOException;
  20. import java.io.StringReader;
  21. import java.io.StringWriter;
  22. import java.util.HashSet;
  23. import java.util.Hashtable;
  24. import java.util.Iterator;
  25. import java.util.List;
  26. import java.util.Properties;
  27. import java.util.StringTokenizer;
  28. import javax.ejb.ActivationConfigProperty;
  29. import javax.ejb.MessageDriven;
  30. import javax.jms.Destination;
  31. import javax.jms.Queue;
  32. import javax.management.Attribute;
  33. import javax.management.MBeanServerConnection;
  34. import javax.management.ObjectName;
  35. import javax.management.remote.JMXConnector;
  36. import javax.management.remote.JMXConnectorFactory;
  37. import javax.management.remote.JMXServiceURL;
  38. import javax.xml.parsers.DocumentBuilder;
  39. import javax.xml.parsers.DocumentBuilderFactory;
  40. import javax.xml.transform.OutputKeys;
  41. import javax.xml.transform.Transformer;
  42. import javax.xml.transform.TransformerException;
  43. import javax.xml.transform.TransformerFactory;
  44. import javax.xml.transform.dom.DOMSource;
  45. import javax.xml.transform.stream.StreamResult;
  46. import org.apache.log4j.LogManager;
  47. import org.apache.log4j.Logger;
  48. import org.jboss.blacktie.administration.core.AdministrationProxy;
  49. import org.jboss.blacktie.jatmibroker.core.conf.ConfigurationException;
  50. import org.jboss.blacktie.jatmibroker.core.conf.XMLEnvHandler;
  51. import org.jboss.blacktie.jatmibroker.core.conf.XMLParser;
  52. import org.jboss.blacktie.jatmibroker.xatmi.Connection;
  53. import org.jboss.blacktie.jatmibroker.xatmi.ConnectionException;
  54. import org.jboss.blacktie.jatmibroker.xatmi.Response;
  55. import org.jboss.blacktie.jatmibroker.xatmi.TPSVCINFO;
  56. import org.jboss.blacktie.jatmibroker.xatmi.X_OCTET;
  57. import org.jboss.blacktie.jatmibroker.xatmi.mdb.MDBBlacktieService;
  58. import org.jboss.ejb3.annotation.Depends;
  59. import org.w3c.dom.Document;
  60. import org.w3c.dom.Element;
  61. import org.w3c.dom.Node;
  62. import org.xml.sax.InputSource;
  63. @MessageDriven(activationConfig = {
  64. @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  65. @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/BTStompAdmin") })
  66. @Depends("jboss.messaging.destination:service=Queue,name=BTStompAdmin")
  67. @javax.ejb.TransactionAttribute(javax.ejb.TransactionAttributeType.NOT_SUPPORTED)
  68. public class BlacktieStompAdministrationService extends MDBBlacktieService
  69. implements javax.jms.MessageListener {
  70. private static final Logger log = LogManager
  71. .getLogger(BlacktieStompAdministrationService.class);
  72. private MBeanServerConnection beanServerConnection;
  73. private Properties prop = new Properties();
  74. public static Hashtable<String, Long> QUEUE_CREATION_TIMES = new Hashtable<String, Long>();
  75. public BlacktieStompAdministrationService() throws IOException,
  76. ConfigurationException, ConnectionException {
  77. super("BTStompAdmin");
  78. XMLEnvHandler handler = new XMLEnvHandler(prop);
  79. XMLParser xmlenv = new XMLParser(handler, "btconfig.xsd");
  80. xmlenv.parse("btconfig.xml");
  81. JMXServiceURL u = new JMXServiceURL((String) prop.get("JMXURL"));
  82. JMXConnector c = JMXConnectorFactory.connect(u);
  83. beanServerConnection = c.getMBeanServerConnection();
  84. }
  85. boolean isDeployQueue(ObjectName objName, String serviceName)
  86. throws Exception {
  87. HashSet dests = (HashSet) beanServerConnection.getAttribute(objName,
  88. "Destinations");
  89. Iterator<Destination> it = dests.iterator();
  90. while (it.hasNext()) {
  91. Destination dest = it.next();
  92. if (dest instanceof Queue) {
  93. String qname = ((Queue) dest).getQueueName();
  94. log.debug("destination is " + qname);
  95. if (qname.equals(serviceName)) {
  96. log.trace("find serviceName " + serviceName);
  97. return true;
  98. }
  99. }
  100. }
  101. log.trace("did not find serviceName " + serviceName);
  102. return false;
  103. }
  104. int consumerCount(String serviceName) throws Exception {
  105. ObjectName objName = new ObjectName(
  106. "jboss.messaging.destination:service=Queue,name=" + serviceName);
  107. Integer count = (Integer) beanServerConnection.getAttribute(objName,
  108. "ConsumerCount");
  109. Element security = (Element) beanServerConnection.getAttribute(objName,
  110. "SecurityConfig");
  111. log.debug(serviceName + " security config is " + printNode(security));
  112. return count.intValue();
  113. }
  114. Element stringToElement(String s) throws Exception {
  115. StringReader sreader = new StringReader(s);
  116. DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
  117. DocumentBuilder parser = factory.newDocumentBuilder();
  118. Document doc = parser.parse(new InputSource(sreader));
  119. return doc.getDocumentElement();
  120. }
  121. String printNode(Node node) {
  122. try {
  123. // Set up the output transformer
  124. TransformerFactory transfac = TransformerFactory.newInstance();
  125. Transformer trans = transfac.newTransformer();
  126. trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
  127. trans.setOutputProperty(OutputKeys.INDENT, "yes");
  128. // Print the DOM node
  129. StringWriter sw = new StringWriter();
  130. StreamResult result = new StreamResult(sw);
  131. DOMSource source = new DOMSource(node);
  132. trans.transform(source, result);
  133. String xmlString = sw.toString();
  134. return xmlString;
  135. } catch (TransformerException e) {
  136. log.error(e);
  137. }
  138. return null;
  139. }
  140. void setSecurityConfig(ObjectName objName, String serviceName) {
  141. log.debug("Get security configuration from xml");
  142. String roleList = (String) prop.getProperty("blacktie." + serviceName
  143. + ".security");
  144. if (roleList == null) {
  145. log.debug("Will use servers default security if present");
  146. String server = (String) prop.getProperty("blacktie." + serviceName
  147. + ".server");
  148. if (server == null && serviceName.indexOf("_ADMIN_") > 0) {
  149. server = serviceName.substring(0, serviceName
  150. .indexOf("_ADMIN_"));
  151. log.trace("Using server name of: " + server);
  152. }
  153. roleList = (String) prop.getProperty("blacktie." + server
  154. + ".security");
  155. if (roleList == null) {
  156. log.warn("No security set for service: " + serviceName);
  157. roleList = "";
  158. }
  159. }
  160. if (roleList.length() > 0) {
  161. String security = "<security>n";
  162. String[] roles = roleList.split(",");
  163. for (int i = 0; i < roles.length; i++) {
  164. String[] details = roles[i].split(":");
  165. security += "<role name="" + details[0] + "" read=""
  166. + details[1] + "" write="" + details[2] + ""/>n";
  167. }
  168. security += "</security>";
  169. log.trace("access security is " + security);
  170. try {
  171. Element element = stringToElement(security);
  172. Attribute attr = new Attribute("SecurityConfig", element);
  173. beanServerConnection.setAttribute(objName, attr);
  174. } catch (Throwable t) {
  175. log.error("Could not set security config " + t);
  176. }
  177. }
  178. }
  179. int deployQueue(String serviceName, String version) {
  180. log.trace("deployQueue: " + serviceName + " version: " + version);
  181. if (version == null
  182. || !version.equals(prop.getProperty("blacktie.domain.version"))) {
  183. log.warn("Blacktie Domain version "
  184. + prop.getProperty("blacktie.domain.version")
  185. + " not match server " + version);
  186. return 4;
  187. }
  188. int result = 0;
  189. Long currentTime = QUEUE_CREATION_TIMES.get(serviceName);
  190. try {
  191. ObjectName objName = new ObjectName(
  192. "jboss.messaging:service=ServerPeer");
  193. boolean queue = false;
  194. queue = isDeployQueue(objName, serviceName);
  195. if (queue == false) {
  196. synchronized (QUEUE_CREATION_TIMES) {
  197. QUEUE_CREATION_TIMES.put(serviceName, System
  198. .currentTimeMillis());
  199. beanServerConnection.invoke(objName, "deployQueue",
  200. new Object[] { serviceName, null }, new String[] {
  201. "java.lang.String", "java.lang.String" });
  202. ObjectName queueName = new ObjectName(
  203. "jboss.messaging.destination:service=Queue,name="
  204. + serviceName);
  205. setSecurityConfig(queueName, serviceName);
  206. }
  207. }
  208. if (queue == false || !serviceName.contains("_ADMIN_")) {
  209. result = 1;
  210. if (AdministrationProxy.isDomainPause
  211. && serviceName.contains("_ADMIN_")) {
  212. log.info("Domain is pause");
  213. result = 3;
  214. }
  215. } else if (consumerCount(serviceName) > 0) {
  216. log.warn("can not advertise ADMIN with same id: " + serviceName);
  217. result = 2;
  218. } else if (AdministrationProxy.isDomainPause) {
  219. log.info("Domain is pause");
  220. result = 3;
  221. } else {
  222. result = 1;
  223. }
  224. } catch (Throwable t) {
  225. log.error("Could not deploy queue of " + serviceName, t);
  226. QUEUE_CREATION_TIMES.put(serviceName, currentTime);
  227. }
  228. return result;
  229. }
  230. int undeployQueue(String serviceName) {
  231. int result = 0;
  232. try {
  233. ObjectName objName = new ObjectName(
  234. "jboss.messaging:service=ServerPeer");
  235. if (isDeployQueue(objName, serviceName)) {
  236. beanServerConnection.invoke(objName, "undeployQueue",
  237. new Object[] { serviceName },
  238. new String[] { "java.lang.String" });
  239. }
  240. result = 1;
  241. } catch (Throwable t) {
  242. log.error("Could not undeploy queue of " + serviceName, t);
  243. }
  244. return result;
  245. }
  246. int decrementConsumer(String serviceName) {
  247. int consumerCounts;
  248. int result = 0;
  249. try {
  250. consumerCounts = consumerCount(serviceName);
  251. if (consumerCounts <= 1) {
  252. result = undeployQueue(serviceName);
  253. log.debug(serviceName + " undeployed");
  254. } else {
  255. // THERE ARE OTHER SERVERS STILL ALIVE
  256. result = 1;
  257. log.info(serviceName + " still has " + consumerCounts
  258. + " consumers");
  259. }
  260. } catch (Throwable t) {
  261. log.error("Could not get consumer counts of " + serviceName, t);
  262. }
  263. return result;
  264. }
  265. public Response tpservice(TPSVCINFO svcinfo) {
  266. log.debug("Message received");
  267. X_OCTET recv = (X_OCTET) svcinfo.getBuffer();
  268. String string = new String(recv.getByteArray());
  269. StringTokenizer st = new StringTokenizer(string, ",", false);
  270. String operation = st.nextToken();
  271. String serverName = st.nextToken();
  272. String serviceName = st.nextToken();
  273. byte[] success = new byte[1];
  274. String server = null;
  275. int k = -1;
  276. try {
  277. Properties prop = new Properties();
  278. XMLEnvHandler handler = new XMLEnvHandler(prop);
  279. XMLParser xmlenv = new XMLParser(handler, "btconfig.xsd");
  280. xmlenv.parse("btconfig.xml");
  281. if ((k = serviceName.indexOf("_ADMIN_")) > 0) {
  282. server = serviceName.substring(0, k);
  283. List<String> servers = (List<String>) prop
  284. .get("blacktie.domain.servers");
  285. if (servers.contains(server) == false) {
  286. log.warn("Could not find the server to advertise for: "
  287. + server);
  288. server = null;
  289. } else {
  290. log.trace("Located server: " + server);
  291. }
  292. } else {
  293. server = (String) prop.get("blacktie." + serviceName
  294. + ".server");
  295. }
  296. if (server != null && server.equals(serverName)) {
  297. log.trace("Service " + serviceName + " exists for server: "
  298. + server);
  299. if (operation.equals("tpunadvertise")) {
  300. log.trace("Unadvertising: " + serviceName);
  301. success[0] = (byte) undeployQueue(serviceName);
  302. } else if (operation.equals("tpadvertise")) {
  303. log.trace("Advertising: " + serviceName);
  304. String version = st.nextToken();
  305. success[0] = (byte) deployQueue(serviceName, version);
  306. } else if (operation.equals("decrementconsumer")) {
  307. log.trace("Decrement consumer: " + serviceName);
  308. success[0] = (byte) decrementConsumer(serviceName);
  309. } else {
  310. log.error("Unknow operation " + operation);
  311. success[0] = 0;
  312. }
  313. } else {
  314. log.error("Service " + serviceName
  315. + " cannot be located for server");
  316. success[0] = 0;
  317. }
  318. X_OCTET buffer = (X_OCTET) svcinfo.tpalloc("X_OCTET", null);
  319. buffer.setByteArray(success);
  320. log.debug("Responding");
  321. return new Response(Connection.TPSUCCESS, 0, buffer, 1, 0);
  322. } catch (ConnectionException e) {
  323. return new Response(Connection.TPFAIL, 0, null, 0, 0);
  324. } catch (ConfigurationException e) {
  325. return new Response(Connection.TPFAIL, 0, null, 0, 0);
  326. }
  327. }
  328. }