Differences

This shows you the differences between two versions of the page.

en:devel:corebos_mqtm [2017/04/27 23:42]
joebordes
en:devel:corebos_mqtm [2020/12/13 14:08] (current)
joebordes [Task Manager]
Line 33:
 Let's explain how coreBOS implements the Message Queue in the database.

+==== sendMessage ====
 In order to send a message we need to define these parameters:
- 1. channel
- 2. Producer
- 3. Consumer
- 4. Type
- 5. Share
- 6. Sequence
- 7. Expires
- 8. Deliver after
- 9. User
-10. Message

-Channel
-The channel can be any string that both producer and consumer agree upon. The producer will write messages on the channel and the consumer will read (and delete) messages from the channel
+ 1.- Channel\\
+ 2.- Producer\\
+ 3.- Consumer\\
+ 4.- Type\\
+ 5.- Share\\
+ 6.- Sequence\\
+ 7.- Expires\\
+ 8.- Deliver after\\
+ 9.- User\\
+10.- Message\\
+
+=== Channel ===
+
+The channel can be any string that both producer and consumer agree upon. The producer will write messages on the channel and the consumer will read (and delete) messages from the channel.
 There are some special reserved channels by coreBOS which cannot be used freely.
  
-Producer
-The producer is a string uniquely identifying the creator of the message
+=== Producer ===
+
+The producer is a string uniquely identifying the creator of the message.
+
+=== Consumer ===
+
+The consumer is a string uniquely identifying the desired receptor of the message.

-Consumer
-The consumer is a string uniwuely identifying the desired receptor of the message
+=== Type ===
+
  
-Type
 Type is a string describing the contents or goal of the message.
-The MQ itself does not use this value for anything. Some values that are currently being used for some implementations are
-Command, Data, Event, Request,...

-Share
+The Message Queue itself does not use this value for anything. Some values that are currently being used for some implementations are:
+
+Command, Data, Event, Request, ...
+
+=== Share ===
 Is a string which accepts two values
+
 1:M and PS
+
 When a message is sent to the queue with share set to 1:M it will be written only once on the queue so it is consumed only by the intended consumer.
-When a message is sent to the queue with share set to PS, the message will be written once per subscriber effectively producing a mass distribution message system. For example, if two or more processes want to listen to the messages of another process they can subscribe to the channel. When the producer emits a message with share PS all subscribers will get the message.

-Sequence
+When a message is sent to the queue with share set to PS, the message will be written once per subscriber, effectively producing a mass distribution messaging system. For example, if two or more processes want to listen to the messages of another process they can subscribe to the channel. When that producer emits a message with share PS all subscribers will get the message.
+
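The difference between the two share modes can be sketched with a small in-memory model. This is illustrative Python, not coreBOS code: the `ToyQueue` class, its `subscribe` and `send` methods and the tuple layout are assumptions made for the example. Only the 1:M and PS semantics come from the description above.

```python
from collections import defaultdict


class ToyQueue:
    """Hypothetical in-memory stand-in for the message queue (not the coreBOS API)."""

    def __init__(self):
        self.rows = []                        # one entry per stored copy of a message
        self.subscribers = defaultdict(list)  # (channel, producer) -> consumer names

    def subscribe(self, channel, producer, consumer):
        self.subscribers[(channel, producer)].append(consumer)

    def send(self, channel, producer, consumer, share, message):
        if share == "1:M":
            # Written once, addressed to the intended consumer only.
            self.rows.append((channel, producer, consumer, message))
        elif share == "PS":
            # Written once per subscriber of this channel and producer.
            for sub in self.subscribers[(channel, producer)]:
                self.rows.append((channel, producer, sub, message))
        else:
            raise ValueError("share must be '1:M' or 'PS'")


queue = ToyQueue()
queue.subscribe("orders", "shop", "mailer")
queue.subscribe("orders", "shop", "auditor")

queue.send("orders", "shop", "mailer", "1:M", "order 1 created")
queue.send("orders", "shop", "", "PS", "order 2 created")

for row in queue.rows:
    print(row)
```

In this sketch the 1:M message produces a single row for `mailer`, while the PS message is copied once for each of the two subscribers.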
+=== Sequence ===
 This is an integer that represents the sequence of this message from the producer.
-The only use the MQ has of this value is to sort the messages by it if there are more than one message waiting, but the MQ does not guarantee order in the message delivery so any real sequencing of messages must be done by the consumer.

-Expires
-This is the amount of seconds that the message can stay on the queue. Part this time the message will be moved to the cbInvalid channel and marked as invalid.
+The only use the message queue has of this value is to sort the messages by it if there is more than one message waiting, but the message queue does not guarantee order in the message delivery so any real sequencing of messages must be done by the consumer.
+
+=== Expires ===
+
+This is the number of seconds that the message can stay on the queue. After this time the message will be moved to the **cbInvalid channel** and marked as **invalid**.
+
+=== Deliver After ===
+
+Messages can be deferred to be sent in the future with this value. The message will be accepted and put on the queue, the expire time will be set to the number of seconds given added to the deliver after time.

-Deliver after
-Messages can be deferred to be sent in the future with this value. The message will be accepted and put on he queue, the expire time will be set to the number of seconds given added to the deliver time
 The message will be totally ignored until the established time to deliver is reached.
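
How Expires and Deliver After interact can be illustrated with a little arithmetic. The sketch below is an assumption-laden Python model, not the coreBOS implementation: `ToyMessage`, `make_message` and `state` are hypothetical names, times are plain numbers of seconds, and Deliver After is treated as an absolute timestamp.

```python
from dataclasses import dataclass


@dataclass
class ToyMessage:
    """Hypothetical record; field names are illustrative, not the coreBOS schema."""
    body: str
    deliver_at: float  # earliest moment the message may be delivered
    expire_at: float   # moment after which the message is no longer valid


def make_message(body, now, expires, deliver_after=None):
    # Assumption: deliver_after is an absolute timestamp; None means "right away".
    deliver_at = now if deliver_after is None else deliver_after
    # The expiry window starts at the delivery time, not at the send time.
    return ToyMessage(body, deliver_at, deliver_at + expires)


def state(msg, now):
    if now < msg.deliver_at:
        return "ignored"      # not yet visible to consumers
    if now >= msg.expire_at:
        return "invalid"      # would be moved to the cbInvalid channel
    return "deliverable"


msg = make_message("sync record 42", now=1000, expires=60, deliver_after=1300)
for t in (1100, 1300, 1359, 1360):
    print(t, state(msg, t))
```

With these numbers the message is ignored until second 1300, can be delivered during the following 60 seconds, and counts as invalid from second 1360 on.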
  
-User
-The user that is sending the message
+=== User ===
+

-Message
-The message itself is stored and returned ass string,so you can serialized json encode any object
+The user that is sending the message. Currently not being used.
+

+=== Message ===

-Getmessage
-This method will ask the queue manager for a message on a channel by a given producer and identifying itself as a unique consumerIf a message exists it is returned and eliminated from the queue. If no message exists false is returned
+The message itself is stored and returned as a string, so you can serialize or json encode any object.
+
  
-Rejectmessage
-Will out a message on the cbInvalid channel marked as invalid
+==== getMessage ====
+

-Ismessagewaiting
-Returns true if there is a message waiting on the given channel by the producer for the consumer
+This method will ask the queue manager for a message on a channel by a given producer and identifying itself as a unique consumer. If a message exists it is returned and eliminated from the queue. If no message exists false is returned.
+
+==== rejectMessage ====
+
+Will put a message on the cbInvalid channel marked as invalid.
+
+==== isMessageWaiting ====
+
+Returns true if there is a message waiting on the given channel by the producer for the consumer.
+
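The behaviour of these three calls can be modelled in a few lines. The following is a hypothetical Python sketch, not the coreBOS API: the class, the snake_case method names and the parameter lists (in particular the arguments of `reject_message`) are assumptions. What it takes from the text is that reading a message deletes it, that an empty queue returns false, and that rejected messages land on the cbInvalid channel.

```python
class ToyQueue:
    """Hypothetical in-memory model of the three calls (not the coreBOS API)."""

    INVALID_CHANNEL = "cbInvalid"

    def __init__(self):
        self.rows = []  # each row: dict with channel, producer, consumer, body, invalid

    def send_message(self, channel, producer, consumer, body):
        self.rows.append({"channel": channel, "producer": producer,
                          "consumer": consumer, "body": body, "invalid": False})

    def _find(self, channel, producer, consumer):
        wanted = (channel, producer, consumer)
        for row in self.rows:
            if (row["channel"], row["producer"], row["consumer"]) == wanted:
                return row
        return None

    def is_message_waiting(self, channel, producer, consumer):
        return self._find(channel, producer, consumer) is not None

    def get_message(self, channel, producer, consumer):
        row = self._find(channel, producer, consumer)
        if row is None:
            return False            # mirrors "false is returned"
        self.rows.remove(row)       # reading a message also deletes it
        return row["body"]

    def reject_message(self, producer, consumer, body):
        # Park the message on the invalid channel instead of dropping it.
        self.rows.append({"channel": self.INVALID_CHANNEL, "producer": producer,
                          "consumer": consumer, "body": body, "invalid": True})


queue = ToyQueue()
queue.send_message("orders", "shop", "mailer", '{"id": 7}')
print(queue.is_message_waiting("orders", "shop", "mailer"))
body = queue.get_message("orders", "shop", "mailer")
print(body)
print(queue.get_message("orders", "shop", "mailer"))
queue.reject_message("shop", "mailer", body)
print(queue.is_message_waiting("cbInvalid", "shop", "mailer"))
```

A consumer would typically call `is_message_waiting` first, then `get_message`, and fall back to `reject_message` when it cannot process what it received.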
+==== Subscribe and Unsubscribe ====

-Subscribe and unsubscribe
 Will register or unregister a listener on a channel for a producer and will define which task must be executed when a message arrives. This task can be defined in one of two ways: as a file name and a function or as a file name, a class name and a method.
  
-Task Manager
-In conjunction with the MQ, the task manager is capable of processing the queue and launching tasks.
-I will describe how this works for the database implementation natively included in coreBOS but note, once again, that this could be implemented usingany of the many existing message queue solutions out there like RabbitMQ, Celery,....
+=====Task Manager=====
+
+In conjunction with the Message Queue, the task manager is capable of processing the queue and launching tasks.
+
+I will describe how this works for the database implementation natively included in coreBOS but note, once again, that this could be implemented using any of the many existing solutions out there like [[http://www.celeryproject.org/|Celery]], [[http://gearman.org/|Gearman]], ...
  
 The default task manager in coreBOS is incredibly simple, consisting of an infinite loop that looks for messages on the queue for all subscribers and launches the defined task when one arrives.
-On every iteration of the loop it checks and expires messages accordingly.
-This is implemented using the LAS PHPDaemon (thanks!!)

-Examples
-Integrations
-The messaging system is ideal for this type of work. We created an integration with HubSpot and ActOn using this philosophy. The main idea is to implement an event handler in coreBOS that catches creationz, modifications and  deletes on records in the system and sends a message to the queue with the event and all the relevant information.
+The task manager is an independent process that must be launched or initiated manually with the "run" command like this:
+
+<code>
+cd your_corebos_install_top_directory
+php include/cbmqtm/run.php -d
+</code>
+
+This will run forever in an infinite loop that, on every iteration, checks and expires messages accordingly. If a message must be delivered it will launch the indicated tasks for all subscribers so they can consume the messages.
+
+This is implemented using the OSS [[https://github.com/shaneharter/PHP-Daemon|PHP-Daemon]] project (thanks!!)
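
The loop described above can be approximated as follows. This is a minimal Python sketch under stated assumptions, not the coreBOS task manager: `run_task_manager`, the dictionary keys and the injected `clock` are all hypothetical, and the loop is bounded so that the example terminates, whereas the real process runs forever.

```python
def run_task_manager(queue, subscribers, clock, iterations):
    """Toy loop: expire old messages, then hand waiting ones to subscriber tasks.

    queue:       list of dicts with keys channel, body, deliver_at, expire_at
    subscribers: dict mapping channel -> callable(body)
    clock:       callable returning the current time in seconds
    iterations:  bounded here so the sketch terminates
    """
    invalid = []
    for _ in range(iterations):
        now = clock()
        # 1. Move expired messages out of the way.
        for msg in [m for m in queue if now >= m["expire_at"]]:
            queue.remove(msg)
            invalid.append(msg)
        # 2. Launch the task for every message that is due and has a listener.
        for msg in [m for m in queue if m["deliver_at"] <= now]:
            task = subscribers.get(msg["channel"])
            if task is not None:
                queue.remove(msg)
                task(msg["body"])
    return invalid


ticks = iter([0, 10, 20])
handled = []
queue = [
    {"channel": "orders", "body": "late", "deliver_at": 10, "expire_at": 100},
    {"channel": "orders", "body": "stale", "deliver_at": 0, "expire_at": 0},
    {"channel": "nobody", "body": "unheard", "deliver_at": 0, "expire_at": 15},
]
invalid = run_task_manager(queue, {"orders": handled.append}, lambda: next(ticks), 3)
print(handled)
print([m["body"] for m in invalid])
```

Passing the clock in as a function keeps the sketch deterministic. A long-running daemon would read the system time and sleep between iterations instead.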
 + 
+=====Examples and Ideas=====
+
+====Integrations====
+
+The messaging system is ideal for this type of work. We created an integration with [[https://www.hubspot.com/|HubSpot]] and [[https://www.act-on.com/|Act-On]] using this philosophy. The main idea is to implement an event handler in coreBOS that catches creations, modifications and deletes on records in the system and sends a message to the queue with the event and all the relevant information.
 The task manager sees the message and launches, in background, a process, to actually send the information and keep it in synchronization.
  
-Polling
-Polling is not a very optimal way of solving business requirements, specially in  the current fab of eventing appearing everywhere, but sometimes it is needed.
+====Polling====
+
+Polling is not a very optimal way of solving business requirements, especially in the current trend of eventing appearing everywhere, but sometimes it is needed.

 It turns out that accomplishing this with coreBOS message queue and task manager is a trivial setup.

 It is as simple as sending a message to be delivered at the next poll beat. The task manager will wake up the process which will do its work and then send itself another message to be delivered at the next launch time (poll frequency).
-This is really powerful because it makes it extremely easy to implement different wake up scheduled depending on the business needs . For example, if we get the request to not poll on the weekend we simply set the deliver after date to Monday morning on the last Friday launch.

-Feedback for long running processes
+This is really powerful because it makes it extremely easy to implement different wake up schedules depending on the business needs. For example, if we get the request to not poll on the weekend we simply set the deliver after date to Monday morning on the last Friday launch.
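
The self-rescheduling poll, including the weekend rule, might look like the sketch below. It is illustrative Python only: `next_poll`, `poll_task` and the `send_to_self` callback are hypothetical names, and the hourly frequency and the Monday 08:00 restart are assumptions chosen for the example.

```python
from datetime import datetime, timedelta


def next_poll(now, every=timedelta(hours=1)):
    """Return the moment the next poll message should be delivered.

    Assumption for this sketch: polls run every `every` on weekdays, and
    anything that would land on a weekend is pushed to Monday 08:00.
    """
    candidate = now + every
    if candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        days_to_monday = 7 - candidate.weekday()
        monday = candidate + timedelta(days=days_to_monday)
        candidate = monday.replace(hour=8, minute=0, second=0, microsecond=0)
    return candidate


def poll_task(now, send_to_self):
    # ... do the actual polling work here ...
    # Then re-arm: the deliver-after value is the next beat.
    send_to_self(deliver_after=next_poll(now))


scheduled = []
poll_task(datetime(2020, 12, 11, 23, 30),
          lambda deliver_after: scheduled.append(deliver_after))
print(scheduled[0])
```

Because the schedule lives in one small function, changing the business rule (for example skipping holidays) does not touch the polling work itself.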
 + 
+====Feedback for long running processes====
+
+<WRAP center round info 60%>
 This is a future development which I personally want to implement.
-Mass editing can be a very long running process. Sending a request to modify a couple of fields on 1000 records can take a long time. Currently, coreBOS sends this request and waits for the process to end. This can produce timeouts in the browser or,worse on the webserver.
-So,the idea is to modify mass edit to send a message with the work to be done. Then the task manager will do the work in background so there is no timeout issues and  no blocking in the browser. The background task will report its progress as messages on the queue which will be consumed by a browser process that reads the messages and informs of the progress on screen. This will probably be implemented using server side events and service workers.
+</WRAP>
+
+Mass editing can be a very long running process. Sending a request to modify a couple of fields on 1000 records can take a long time. Currently, coreBOS sends this request and waits for the process to end. This can produce timeouts in the browser or, worse, on the webserver.
+
+So, the idea is to modify mass edit to send a message with the work to be done. Then the task manager will do the work in background so there are no timeout issues and no blocking in the browser. The background task will report its progress as messages on the queue which will be consumed by a browser process that reads the messages and informs of the progress on screen. This will probably be implemented using [[en:devel:corebos_sse|server side events]] and service workers.
 + 
+====Logging====
+
+Send log messages to the queue and have some process read them and send them to a specialized platform like [[https://www.elastic.co/|Elastic]]
+
+=====Comparing with Martin Fowler's proposal=====
+
+I would like to end this description with a quick comparison to the global view defined by Martin Fowler:
+
+{{ :en:devel:mfmq.png |Martin Fowler's Message Queue}}
+
+The **Message Channel** and the **Message** itself are the parameters on each of the calls of each method.
  
-Logging
-Send log messages to the queue and have some process read them and send them to aspecialized platform like Elastic
+**Pipes, Filters, Routers and Translators** do not exist in the default implementation but they all could be easily created as tasks to push the message along the queue to its destination.
+

-Comparing with Martin Fowelr proposal
+The **Message Endpoint** is simply the getMessage method.

-I would like to end this description with a quick comparison to the global view defined by Martin Fowler
+<WRAP center round tip 60%>
+I look forward to seeing what you construct with this powerful new feature.
+</WRAP>
  
-Image

-The channel is the parameter definition on EA h of the calls as is the message itself.
-Pipes, filters, router, translations do not exist in the default implementation but they all could be easily created to push the message along the queue to it's destination.
-Theendpoint is simply the getmessage method
+=====References=====
+
+

-I look forward to seeing what you condtruct with this powerful new feature.
+  * [[http://queues.io/|Queues]]
+  * [[https://martinfowler.com/|Martin Fowler]]
+  * [[https://martinfowler.com/books/eip.html|Enterprise Integration Patterns]]