Client.pm.svn-base
上传用户:market2
上传日期:2018-11-18
资源大小:18786k
文件大小:11k
源码类别:

外挂编程

开发平台:

Windows_Unix

  1. ##########################################################
  2. #  OpenKore - Bus System
  3. #  Bus client fascade
  4. #
  5. #  This software is open source, licensed under the GNU General Public
  6. #  License, version 2.
  7. #  Basically, this means that you're allowed to modify and distribute
  8. #  this software. However, if you distribute modified versions, you MUST
  9. #  also distribute the source code.
  10. #  See http://www.gnu.org/licenses/gpl.html for the full license.
  11. #
  12. #  $Revision$
  13. #  $Id$
  14. #
  15. #########################################################################
  16. ##
  17. # MODULE DESCRIPTION: High level interface to the bus system
  18. #
  19. # <h2>About the bus system</h2>
  20. # The bus system's goal is to allow different OpenKore instances to easily
  21. # communicate with each other, and to allow external tools to easily
  22. # communicate with a running OpenKore instance.
  23. #
  24. # The bus is a communication channel which supports broadcast communication
  25. # as well as private communication. One can compare it to an open street:
  26. # anyone can shout a message to everybody (broadcast communication) or
  27. # whisper a message into someone else's ears (private communication).
  28. #
  29. # Furthermore, the bus system is based on discrete <em>messages</em> instead
  30. # of byte streams. See @MODULE(Bus::Messages) for information the message
  31. # format.
  32. #
  33. # This module provides an easy to use interface to interact with the bus
  34. # system.
  35. package Bus::Client;
  36. use strict;
  37. use Time::HiRes qw(time);
  38. use Modules 'register';
  39. use Bus::SimpleClient;
  40. use base qw(Bus::SimpleClient);
  41. use Bus::Server::Starter;
  42. use Bus::Query;
  43. use Bus::DialogMaster;
  44. use Bus::DialogSlave;
  45. use Utils::Exceptions;
  46. use Utils::CallbackList;
  47. # State constants.
  48. use constant {
  49. NOT_CONNECTED   => 1,
  50. STARTING_SERVER => 2,
  51. HANDSHAKING     => 3,
  52. CONNECTED       => 4
  53. };
  54. # Time constants.
  55. use constant {
  56. RECONNECT_INTERVAL => 5,
  57. RESTART_INTERVAL   => 5
  58. };
  59. sub new {
  60. my $class = shift;
  61. my %args = @_;
  62. my $self = bless {}, $class;
  63. $self->{host} = $args{host};
  64. $self->{port} = $args{port};
  65. $self->{userAgent}   = $args{userAgent} || "OpenKore";
  66. $self->{privateOnly} = defined($args{privateOnly}) ? $args{privateOnly} : 0;
  67. # A queue containing messages to be sent next time we're
  68. # connected to the bus.
  69. $self->{sendQueue} = [];
  70. $self->{seq} = 0;
  71. $self->{onMessageReceived} = new CallbackList();
  72. $self->{onDialogRequested} = new CallbackList();
  73. if (!$args{host} && !$args{port}) {
  74. $self->{starter} = new Bus::Server::Starter();
  75. $self->{state} = STARTING_SERVER;
  76. } else {
  77. $self->reconnect();
  78. }
  79. return $self;
  80. }
  81. sub iterate {
  82. my ($self) = @_;
  83. my $state = $self->{state};
  84. if ($state == NOT_CONNECTED) {
  85. if (time - $self->{connectTime} > RECONNECT_INTERVAL) {
  86. $self->reconnect();
  87. }
  88. } elsif ($state == STARTING_SERVER) {
  89. if (time - $self->{startTime} > RESTART_INTERVAL) {
  90. #print "Startingn";
  91. my $starter = $self->{starter};
  92. my $state = $starter->iterate();
  93. if ($state == Bus::Server::Starter::STARTED) {
  94. $self->{state} = HANDSHAKING;
  95. $self->{host}  = $starter->getHost();
  96. $self->{port}  = $starter->getPort();
  97. #print "Bus server started at $self->{host}:$self->{port}n";
  98. $self->reconnect();
  99. $self->{startTime} = time;
  100. } elsif ($state == Bus::Server::Starter::FAILED) {
  101. # Cannot start; try again.
  102. #print "Start failed.n";
  103. $self->{starter} = new Bus::Server::Starter();
  104. $self->{startTime} = time;
  105. }
  106. }
  107. } elsif ($state == HANDSHAKING) {
  108. #print "Handshakingn";
  109. my $ID;
  110. my $args = $self->readNext($ID);
  111. if ($args) {
  112. #print "Sending HELLOn";
  113. $self->{ID} = $args->{yourID};
  114. $self->{client}->send("HELLO", {
  115. userAgent   => $self->{userAgent},
  116. privateOnly => $self->{privateOnly}
  117. });
  118. $self->{state} = CONNECTED;
  119. #print "Connectedn";
  120. }
  121. } elsif ($state == CONNECTED) {
  122. # Send queued messages.
  123. while (@{$self->{sendQueue}} > 0) {
  124. my $message = shift @{$self->{sendQueue}};
  125. last if (!$self->send($message->[0], $message->[1]));
  126. }
  127. if ($self->{state} == CONNECTED) {
  128. my $onMessageReceived = $self->{onMessageReceived};
  129. my $onDialogRequested = $self->{onDialogRequested};
  130. my $mrEmpty = $onMessageReceived->empty();
  131. my $drEmpty = $onDialogRequested->empty();
  132. my $MID;
  133. while (my $args = $self->readNext($MID)) {
  134. if (!$mrEmpty) {
  135. $onMessageReceived->call($self, {
  136. messageID => $MID,
  137. args => $args
  138. });
  139. }
  140. if ($MID eq 'REQUEST_DIALOG') {
  141. my $slave = new Bus::DialogSlave({
  142. bus          => $self,
  143. peerID       => $args->{FROM},
  144. peerDialogID => $args->{dialogID},
  145. seq          => $args->{SEQ}
  146. });
  147. $onDialogRequested->call($self, {
  148. reason => $args->{reason},
  149. args   => $args,
  150. dialog => $slave
  151. }) if (!$drEmpty);
  152. # If there are no callbacks registered for the
  153. # onDialogRequested event, then the DialogSlave
  154. # object will reply with a REFUSED message because
  155. # it's destroyed outside of this scope.
  156. }
  157. }
  158. }
  159. }
  160. return $self->{state};
  161. }
  162. sub getState {
  163. return $_[0]->{state};
  164. }
  165. sub serverHost {
  166. return $_[0]->{host};
  167. }
  168. sub serverPort {
  169. return $_[0]->{port};
  170. }
  171. sub ID {
  172. return $_[0]->{ID};
  173. }
  174. sub reconnect {
  175. my ($self) = @_;
  176. eval {
  177. #print "(Re)connectingn";
  178. $self->{client} = new Bus::SimpleClient($self->{host}, $self->{port});
  179. $self->{state} = HANDSHAKING;
  180. };
  181. if (caught('SocketException')) {
  182. #print "Cannot connect: $@n";
  183. $self->{state} = NOT_CONNECTED;
  184. $self->{connectTime} = time;
  185. } elsif ($@) {
  186. die $@;
  187. }
  188. }
  189. # Handle an I/O exception by reconnecting to the bus or restarting the
  190. # bus server.
  191. sub handleIOException {
  192. my ($self) = @_;
  193. if ($self->{starter}) {
  194. $self->{starter} = new Bus::Server::Starter();
  195. $self->{state} = STARTING_SERVER;
  196. # We add a random delay to prevent clients from starting
  197. # the server at the same time.
  198. $self->{startTime} = time + rand(3);
  199. } else {
  200. $self->{state} = NOT_CONNECTED;
  201. $self->{connectTime} = time + rand(3);
  202. }
  203. }
  204. # Read the next message from the bus, if any. This method returns undef immediately
  205. # when there are no messages.
  206. #
  207. # If the connection with the bus broke while reading the message, then
  208. # undef is returned, and we'll attempt to reconnect (or restart the bus
  209. # server) on the next iteration.
  210. sub readNext {
  211. my ($self, $MID) = @_;
  212. my $args;
  213. eval {
  214. $args = $self->{client}->readNext($MID);
  215. };
  216. if (caught('IOException')) {
  217. #print "Disconnected from IPC server.n";
  218. $self->handleIOException();
  219. return undef;
  220. } elsif ($@) {
  221. die $@;
  222. } else {
  223. return $args;
  224. }
  225. }
  226. ##
  227. # boolean $Bus_Client->send(String messageID, args)
  228. # Returns: Whether the message was successfully sent.
  229. #
  230. # Send a message over the bus.
  231. #
  232. # If the connection with the bus broke while sending the message, then
  233. # the message is placed in a queue, and we'll attempt to reconnect (or
  234. # restart the bus server) on the next iteration. Once reconnected,
  235. # all queued messages will be sent.
  236. #
  237. # If you expect a reply for this message then you should use
  238. # $Bus_Client->query() instead.
  239. sub send {
  240. my ($self, $MID, $args) = @_;
  241. if ($self->{state} == CONNECTED) {
  242. eval {
  243. $self->{client}->send($MID, $args);
  244. };
  245. if (caught('IOException')) {
  246. $self->handleIOException();
  247. push @{$self->{sendQueue}}, [$MID, $args];
  248. return 0;
  249. } elsif ($@) {
  250. die $@;
  251. } else {
  252. return 1;
  253. }
  254. } else {
  255. push @{$self->{sendQueue}}, [$MID, $args];
  256. return 0;
  257. }
  258. }
  259. ##
  260. # Bus::Query $Bus_Client->query(String messageID, [Hash args], [Hash options])
  261. # messageID: The message ID of the message to send.
  262. # args: The arguments for the message.
  263. # options: Extra options for this query.
  264. #
  265. # Send a query message over the bus. The returned Bus::Query object allows you to
  266. # asynchronously check for replies for this message, and to fetch replies.
  267. #
  268. # So sending a query over the bus involves these steps:
  269. # `l
  270. # - Send the query.
  271. # - Use the returned Bus::Query object to periodically check whether replies have
  272. #   been received for this query.
  273. # - Fetch the replies.
  274. # `l`
  275. #
  276. # Here is a simple example:
  277. # <pre class="code">
  278. # # Send the query.
  279. # my $query = $Bus_Client->query("hello", { name => "Joe" },
  280. #                 { timeout => 10, collectAll => 1 });
  281. #
  282. # # Wait until the query is done or has timed out.
  283. # while ($query->getState() == Bus::Query::WAITING) {
  284. #     sleep 1;
  285. # }
  286. #
  287. # if ($query->getState() == Bus::Query::DONE) {
  288. #     while (my ($messageID, $args) = $query->getReply()) {
  289. #         print "We have received a reply!n";
  290. #         # Do something with $messageID and $args...
  291. #     }
  292. #
  293. # } else { # The stat is Bus::Query::TIMEOUT
  294. #     print "10 seconds passed and we still don't have a reply!n";
  295. # }
  296. # </pre>
  297. #
  298. # The following options are allowed:
  299. # `l
  300. # - timeout (float) - The maximum number of seconds to wait for clients to respond to
  301. #       this query. If this reply has been reached, and not a single reply has been
  302. #       received, then the query object's state will be set to Bus::Query::TIMEOUT.
  303. #       But if at least one reply has been received by the time the timeout is reached,
  304. #       then the state will be set to Bus::Query::DONE.<br>
  305. #       The default timeout is 5 seconds.
  306. # - collectAll (boolean) - Set to false if you only want to receive one reply for this query,
  307. #       set to true if you want to receive multiple replies for this query.<br>
  308. #       If collectAll is false, and a reply has been received (within the timeout), then
  309. #       the Bus::Query object's state is immediately set to Bus::Query::DONE.<br>
  310. #       If collectAll is true, then the query's state will stay at Bus::Query::WAITING
  311. #       until the timeout has been reached. Once the timeout has been reached, the
  312. #       state will be set to Bus::Query::DONE (if there are replies) or
  313. #       Bus::Query::TIMEOUT (if there are no replies).
  314. # `l`
  315. #
  316. # If the connection with the bus broke while sending the message, then
  317. # the message is placed in a queue, and we'll attempt to reconnect (or
  318. # restart the bus server) on the next iteration. Once reconnected,
  319. # all queued messages will be sent.
  320. sub query {
  321. my ($self, $MID, $args, $options) = @_;
  322. my %params = (
  323. bus  => $self,
  324. seq  => $self->{seq},
  325. messageID => $MID,
  326. args => $args
  327. );
  328. if ($options) {
  329. while (my ($key, $value) = each %{$options}) {
  330. $params{$key} = $value;
  331. }
  332. }
  333. my %params2 = ($args) ? (%{$args}) : ();
  334. $params2{SEQ} = $self->{seq};
  335. $self->send($MID, %params2);
  336. $self->{seq} = ($self->{seq} + 1) % 4294967295;
  337. return new Bus::Query(%params);
  338. }
  339. # requestDialog(Bytes clientID, String reason, args, Hash options)
  340. sub requestDialog {
  341. my ($self, $clientID, $reason, $args, $options) = @_;
  342. $options ||= {};
  343. return new Bus::DialogMaster({
  344. bus => $self,
  345. peerID => $clientID,
  346. reason => $reason,
  347. args   => $args || {},
  348. timeout => $options->{timeout}
  349. });
  350. }
  351. ##
  352. # CallbackList $Bus_Client->onMessageReceived()
  353. #
  354. # This event is triggered when a message has been received from the bus.
  355. # The event argument is a hash, containing these two items:
  356. # `l
  357. # - messageID (String): The message ID.
  358. # - args (Hash): The message arguments.
  359. # `l`
  360. sub onMessageReceived {
  361. return $_[0]->{onMessageReceived};
  362. }
  363. sub onDialogRequested {
  364. return $_[0]->{onDialogRequested};
  365. }
  366. 1;