CorbaReceiverImpl.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:9k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.jatmibroker.core.transport.hybrid;
  19. import java.util.ArrayList;
  20. import java.util.List;
  21. import java.util.Properties;
  22. import org.apache.log4j.LogManager;
  23. import org.apache.log4j.Logger;
  24. import org.jboss.blacktie.jatmibroker.core.transport.EventListener;
  25. import org.jboss.blacktie.jatmibroker.core.transport.Message;
  26. import org.jboss.blacktie.jatmibroker.core.transport.OrbManagement;
  27. import org.jboss.blacktie.jatmibroker.core.transport.Receiver;
  28. import org.jboss.blacktie.jatmibroker.jab.JABException;
  29. import org.jboss.blacktie.jatmibroker.jab.JABTransaction;
  30. import org.jboss.blacktie.jatmibroker.xatmi.Connection;
  31. import org.jboss.blacktie.jatmibroker.xatmi.ConnectionException;
  32. import org.omg.CORBA.ORB;
  33. import org.omg.CORBA.Object;
  34. import org.omg.CORBA.Policy;
  35. import org.omg.CosNaming.NameComponent;
  36. import org.omg.PortableServer.POA;
  37. import org.omg.PortableServer.POAPackage.AdapterAlreadyExists;
  38. import org.omg.PortableServer.POAPackage.AdapterNonExistent;
  39. import AtmiBroker.EndpointQueuePOA;
  40. public class CorbaReceiverImpl extends EndpointQueuePOA implements Receiver {
  41. private static final Logger log = LogManager
  42. .getLogger(CorbaReceiverImpl.class);
  43. private POA m_default_poa;
  44. private String callbackIOR;
  45. private List<Message> returnData = new ArrayList<Message>();
  46. private byte[] activate_object;
  47. private String queueName;
  48. private OrbManagement orbManagement;
  49. private int timeout = 0;
  50. private EventListener eventListener;
  51. private int pad = 0;
  52. CorbaReceiverImpl(OrbManagement orbManagement, String queueName)
  53. throws ConnectionException {
  54. this.queueName = queueName;
  55. try {
  56. Policy[] policies = new Policy[0];
  57. this.m_default_poa = orbManagement.getRootPoa().create_POA(
  58. queueName, orbManagement.getRootPoa().the_POAManager(),
  59. policies);
  60. } catch (Throwable t) {
  61. try {
  62. this.m_default_poa = orbManagement.getRootPoa().find_POA(
  63. queueName, true);
  64. } catch (AdapterNonExistent e) {
  65. throw new ConnectionException(-1, "Could not find POA:"
  66. + queueName, e);
  67. }
  68. }
  69. try {
  70. activate_object = m_default_poa.activate_object(this);
  71. Object servant_to_reference = m_default_poa
  72. .servant_to_reference(this);
  73. NameComponent[] name = orbManagement.getNamingContextExt().to_name(
  74. queueName);
  75. orbManagement.getNamingContext().bind(name, servant_to_reference);
  76. } catch (Throwable t) {
  77. throw new ConnectionException(-1, "Could not bind service factory"
  78. + queueName, t);
  79. }
  80. this.orbManagement = orbManagement;
  81. }
  82. CorbaReceiverImpl(EventListener eventListener, OrbManagement orbManagement,
  83. Properties properties) throws ConnectionException {
  84. log.debug("ClientCallbackImpl constructor");
  85. ORB orb = orbManagement.getOrb();
  86. POA poa = orbManagement.getRootPoa();
  87. this.eventListener = eventListener;
  88. try {
  89. try {
  90. Policy[] policies = new Policy[0];
  91. m_default_poa = poa.create_POA("TODO", poa.the_POAManager(),
  92. policies);
  93. } catch (AdapterAlreadyExists e) {
  94. m_default_poa = poa.find_POA("TODO", true);
  95. }
  96. log.debug("JABSession createCallbackObject");
  97. activate_object = m_default_poa.activate_object(this);
  98. log.debug("activated this " + this);
  99. org.omg.CORBA.Object tmp_ref = m_default_poa
  100. .servant_to_reference(this);
  101. log.debug("created reference " + tmp_ref);
  102. AtmiBroker.EndpointQueue clientCallback = AtmiBroker.EndpointQueueHelper
  103. .narrow(tmp_ref);
  104. log.debug("narrowed reference " + clientCallback);
  105. callbackIOR = orb.object_to_string(clientCallback);
  106. log.debug("Created:" + callbackIOR);
  107. } catch (Throwable t) {
  108. throw new ConnectionException(-1, "Cannot create the receiver", t);
  109. }
  110. timeout = Integer.parseInt(properties.getProperty("RequestTimeout"))
  111. * 1000 + Integer.parseInt(properties.getProperty("TimeToLive"))
  112. * 1000;
  113. log.debug("Timeout set as: " + timeout);
  114. }
  115. public POA _default_POA() {
  116. log.debug("ClientCallbackImpl _default_POA");
  117. return m_default_poa;
  118. }
  119. public synchronized void send(String replyto_ior, short rval, int rcode,
  120. byte[] idata, int ilen, int cd, int flags, String type,
  121. String subtype) {
  122. log.debug("Received: " + callbackIOR);
  123. Message message = new Message();
  124. message.cd = cd;
  125. message.replyTo = replyto_ior;
  126. message.flags = flags;
  127. message.control = null;// TODO
  128. message.rval = rval;
  129. message.rcode = rcode;
  130. message.type = type;
  131. message.subtype = subtype;
  132. message.len = ilen - pad;
  133. if (message.len == 0 && message.type == "") {
  134. message.data = null;
  135. } else {
  136. message.data = new byte[message.len];
  137. System.arraycopy(idata, 0, message.data, 0, message.len);
  138. }
  139. if (eventListener != null) {
  140. log.debug("Event listener will be called back");
  141. if (message.rval == EventListener.DISCON_CODE) {
  142. eventListener.setLastEvent(Connection.TPEV_DISCONIMM);
  143. } else if (message.rcode == Connection.TPESVCERR) {
  144. eventListener.setLastEvent(Connection.TPEV_SVCERR);
  145. } else if (message.rval == Connection.TPFAIL) {
  146. eventListener.setLastEvent(Connection.TPEV_SVCFAIL);
  147. eventListener.setLastRCode(message.rcode);
  148. }
  149. }
  150. returnData.add(message);
  151. log.trace("notifying");
  152. notify();
  153. log.trace("notifed");
  154. }
  155. public java.lang.Object getReplyTo() {
  156. return callbackIOR;
  157. }
  158. public Message receive(long flags) throws ConnectionException {
  159. log.debug("Receiving");
  160. synchronized (this) {
  161. if ((flags & Connection.TPNOBLOCK) != Connection.TPNOBLOCK) {
  162. if (returnData.isEmpty()) {
  163. try {
  164. if ((flags & Connection.TPNOTIME) == Connection.TPNOTIME) {
  165. log.debug("blocking");
  166. wait();
  167. log.debug("woke up");
  168. } else {
  169. log.debug("Waiting: " + callbackIOR);
  170. wait(timeout);
  171. log.debug("Waited: " + callbackIOR);
  172. }
  173. } catch (InterruptedException e) {
  174. log.error("Caught exception", e);
  175. }
  176. }
  177. } else {
  178. log.debug("Not waiting for the response, hope its there!");
  179. }
  180. if (returnData.isEmpty()) {
  181. log.debug("Empty return data: " + callbackIOR);
  182. if (JABTransaction.current() != null) {
  183. try {
  184. log.debug("Marking rollbackOnly");
  185. JABTransaction.current().rollback_only();
  186. } catch (JABException e) {
  187. throw new ConnectionException(Connection.TPESYSTEM,
  188. "Could not mark transaction for rollback only");
  189. }
  190. }
  191. throw new ConnectionException(Connection.TPETIME,
  192. "Did not receive a message");
  193. } else {
  194. Message message = returnData.remove(0);
  195. if (message != null) {
  196. log.debug("Message was available");
  197. if (message.rval == EventListener.DISCON_CODE) {
  198. if (JABTransaction.current() != null) {
  199. try {
  200. log
  201. .debug("Marking rollbackOnly as disconnection");
  202. JABTransaction.current().rollback_only();
  203. } catch (JABException e) {
  204. throw new ConnectionException(
  205. Connection.TPESYSTEM,
  206. "Could not mark transaction for rollback only");
  207. }
  208. }
  209. } else if (message.rcode == Connection.TPESVCERR) {
  210. if (JABTransaction.current() != null) {
  211. try {
  212. log.debug("Marking rollbackOnly as svc err");
  213. JABTransaction.current().rollback_only();
  214. } catch (JABException e) {
  215. throw new ConnectionException(
  216. Connection.TPESYSTEM,
  217. "Could not mark transaction for rollback only");
  218. }
  219. }
  220. } else if (message.rval == Connection.TPFAIL) {
  221. if (JABTransaction.current() != null) {
  222. try {
  223. JABTransaction.current().rollback_only();
  224. } catch (JABException e) {
  225. throw new ConnectionException(
  226. Connection.TPESYSTEM,
  227. "Could not mark transaction for rollback only");
  228. }
  229. }
  230. }
  231. } else {
  232. log.debug("message was null");
  233. }
  234. return message;
  235. }
  236. }
  237. }
  238. public void disconnect() {
  239. log.debug("disconnect");
  240. if (queueName != null) {
  241. log.debug("queue name: " + queueName);
  242. try {
  243. NameComponent[] name = orbManagement.getNamingContextExt()
  244. .to_name(queueName);
  245. orbManagement.getNamingContext().unbind(name);
  246. queueName = null;
  247. log.debug("unbound");
  248. } catch (Throwable t) {
  249. log.error("Could not unbind service factory" + queueName, t);
  250. }
  251. }
  252. try {
  253. log.debug("deactivating");
  254. m_default_poa.deactivate_object(activate_object);
  255. log.debug("deactivated");
  256. } catch (Throwable t) {
  257. log.error("Could not unbind service factory" + queueName, t);
  258. }
  259. log.trace("synchronizing");
  260. synchronized (this) {
  261. log.trace("notifying");
  262. notify();
  263. log.trace("notified");
  264. }
  265. }
  266. public void close() {
  267. log.debug("close");
  268. disconnect();
  269. }
  270. }