QueueReaper.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:6k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.administration;
  19. import java.util.HashSet;
  20. import java.util.Iterator;
  21. import java.util.Properties;
  22. import javax.jms.Destination;
  23. import javax.jms.Queue;
  24. import javax.management.MBeanServerConnection;
  25. import javax.management.ObjectName;
  26. import org.apache.log4j.LogManager;
  27. import org.apache.log4j.Logger;
  28. import org.jboss.blacktie.jatmibroker.core.conf.XMLEnvHandler;
  29. import org.jboss.blacktie.jatmibroker.core.conf.XMLParser;
  30. public class QueueReaper implements Runnable {
  31. /** logger */
  32. private static final Logger log = LogManager.getLogger(QueueReaper.class);
  33. /** Interval at which to run */
  34. private long interval = 30 * 1000; // TODO make this configurable
  35. /** Thread to run */
  36. private Thread thread;
  37. /** Whether the thread is executing */
  38. private boolean run;
  39. private MBeanServerConnection beanServerConnection;
  40. public QueueReaper(MBeanServerConnection conn) {
  41. this.thread = new Thread(this);
  42. this.thread.setDaemon(true);
  43. this.thread.setPriority(Thread.MIN_PRIORITY);
  44. this.beanServerConnection = conn;
  45. }
  46. public void startThread() {
  47. if (!this.run) {
  48. this.run = true;
  49. if (this.thread.isInterrupted()) {
  50. Thread.interrupted();
  51. }
  52. this.thread.start();
  53. } else {
  54. log.debug("Thread already running");
  55. }
  56. }
  57. public void stopThread() {
  58. if (!this.thread.isInterrupted()) {
  59. this.thread.interrupt();
  60. }
  61. this.run = false;
  62. try {
  63. this.thread.join();
  64. } catch (InterruptedException e) {
  65. log.warn("Could not join with reaper: " + e.getMessage());
  66. }
  67. }
  68. public boolean isRunning() {
  69. return this.run;
  70. }
  71. public void run() {
  72. while (this.run) {
  73. try {
  74. ObjectName objName = new ObjectName(
  75. "jboss.messaging:service=ServerPeer");
  76. HashSet<Destination> dests = (HashSet<Destination>) beanServerConnection
  77. .getAttribute(objName, "Destinations");
  78. Properties prop = new Properties();
  79. XMLEnvHandler handler = new XMLEnvHandler(prop);
  80. XMLParser xmlenv = new XMLParser(handler, "btconfig.xsd");
  81. xmlenv.parse("btconfig.xml");
  82. Iterator<Destination> it = dests.iterator();
  83. while (it.hasNext()) {
  84. Destination dest = it.next();
  85. if (dest instanceof Queue) {
  86. String serviceName = ((Queue) dest).getQueueName();
  87. String server = (String) prop.get("blacktie."
  88. + serviceName + ".server");
  89. long queueReapCheck = System.currentTimeMillis();
  90. if ((server != null || serviceName.contains("_ADMIN_"))
  91. && isCreatedProgrammatically(serviceName)
  92. && consumerCount(serviceName) == 0) {
  93. log
  94. .warn("undeploy service pending for "
  95. + serviceName
  96. + " as consumer count is 0, will check again in 30 seconds");
  97. this.interval = Integer.parseInt(prop.getProperty(
  98. "QueueReaperInterval", "30")) * 1000;
  99. Thread.sleep(this.interval);
  100. // double check consumer is 0
  101. if (isOlderThanReapCheck(serviceName,
  102. queueReapCheck)
  103. && consumerCount(serviceName) == 0) {
  104. undeployQueue(serviceName);
  105. log.warn("undeploy service " + serviceName
  106. + " for consumer is 0");
  107. } else {
  108. log.info("undeploy service not required for "
  109. + serviceName);
  110. }
  111. } else {
  112. log.debug("Could not determine the server for: "
  113. + serviceName + " at: " + server);
  114. }
  115. }
  116. }
  117. log.debug("Sleeping for " + this.interval + " ms");
  118. Thread.sleep(this.interval);
  119. } catch (InterruptedException e) {
  120. log.debug("Sleeping interrupted");
  121. this.run = false;
  122. } catch (Exception e) {
  123. log.error("run ping thread failed with " + e);
  124. this.run = false;
  125. }
  126. }
  127. }
  128. int consumerCount(String serviceName) throws Exception {
  129. ObjectName objName = new ObjectName(
  130. "jboss.messaging.destination:service=Queue,name=" + serviceName);
  131. Integer count = (Integer) beanServerConnection.getAttribute(objName,
  132. "ConsumerCount");
  133. return count.intValue();
  134. }
  135. Boolean isCreatedProgrammatically(String serviceName) throws Exception {
  136. ObjectName objName = new ObjectName(
  137. "jboss.messaging.destination:service=Queue,name=" + serviceName);
  138. return (Boolean) beanServerConnection.getAttribute(objName,
  139. "CreatedProgrammatically");
  140. }
  141. private boolean isOlderThanReapCheck(String serviceName, long queueReapCheck) {
  142. // TODO THIS WILL NOT CLUSTER AS IT ASSUMES THE QUEUE WAS CREATED BY
  143. // THIS SERVER
  144. synchronized (BlacktieStompAdministrationService.QUEUE_CREATION_TIMES) {
  145. boolean toReturn = false;
  146. Long creationTime = BlacktieStompAdministrationService.QUEUE_CREATION_TIMES
  147. .get(serviceName);
  148. if (creationTime != null) {
  149. toReturn = creationTime < queueReapCheck;
  150. if (!toReturn) {
  151. log.warn("New queue will be ignored: " + serviceName);
  152. }
  153. }
  154. return toReturn;
  155. }
  156. }
  157. int undeployQueue(String serviceName) {
  158. int result = 0;
  159. try {
  160. ObjectName objName = new ObjectName(
  161. "jboss.messaging:service=ServerPeer");
  162. beanServerConnection.invoke(objName, "undeployQueue",
  163. new Object[] { serviceName },
  164. new String[] { "java.lang.String" });
  165. result = 1;
  166. } catch (Throwable t) {
  167. log.error("Could not undeploy queue of " + serviceName, t);
  168. }
  169. return result;
  170. }
  171. }