Proca.Server.Plumbing (proca v3.3.1)
The Plumbing server sets up signature processing in Proca.
How does this work?
Proca uses RabbitMQ queues and Broadway to process signatures, while updating the record to mark that it was processed in each stage.
Stages:
Proca.Supporter is always connected to one or more Proca.Actions, but in case of confirming we first confirm the supporter, and then we confirm the action.
1. Confirmation stage. Each entity is supposed to be confirmed by only one mechanism, so it lands in exactly one queue (because of the routing key design).
   a. Supporter - confirm if needed (double opt-in)
   b. Action - confirm if needed (moderation of types)
2. Delivery stage. Entities are processed in parallel by many delivery mechanisms, and are copied to as many queues as needed (routing keys with wildcards).
   a. Action only, but with supporter info.
Routing:
Routing keys have the following structure in both the confirm and deliver exchanges:
    org . custom-or-system . type
                 ^            ^
                 |            `-- supporter or action
                 `--- system if proca processes
                      custom if some other system reads from custom queue and GETs callbacks
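The key structure above can be sketched as a small helper. This is only an illustration: the module `RoutingKeySketch` and its functions are hypothetical and not part of Proca.Server.Plumbing, and the sketch assumes org names contain no dots.

```elixir
defmodule RoutingKeySketch do
  # Hypothetical helper, not part of Proca: builds and parses keys of the
  # form "org.custom-or-system.type". Assumes org names contain no dots.

  @consumers [:system, :custom]
  @types [:supporter, :action]

  # Join the three parts into a routing key.
  def build(org_name, consumer, type)
      when is_binary(org_name) and consumer in @consumers and type in @types do
    Enum.join([org_name, consumer, type], ".")
  end

  # Split a routing key back into its parts, or return :error when the
  # key does not have exactly three valid segments.
  def parse(key) when is_binary(key) do
    case String.split(key, ".") do
      [org, consumer, type]
      when consumer in ["system", "custom"] and type in ["supporter", "action"] ->
        {:ok, %{org: org, consumer: consumer, type: type}}

      _ ->
        :error
    end
  end
end
```

For example, `RoutingKeySketch.build("acme", :system, :supporter)` gives a key for a supporter of the (made-up) org `acme` that Proca itself processes.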
Data:
- Supporter
- Contains campaign and action_page data/reference
- Supporter (encrypted, with personalisation data)
- Action
- Contains campaign and action_page data/reference
- Action with type, fields
Queues:
            ___ *.system.supporter -> system.email.confirm (double opt in)
           /
confirm ---*---- ORG_NAME.custom.supporter -> ORG_NAME.confirm
           \___ ORG_NAME.custom.action -> ORG_NAME.moderate

            ___ *.system.* -> system.email.thankyou
           /
deliver ---*---- ORG_NAME.*.* -> ORG_NAME.crm
           \____ ORG1,ORG2,ORG3.system.* -> system.sqs
Some queues will be read by external consumers; in fact, all the ORG* queues will. They need the full data. System queues, on the other hand, can work with a simple set of action_ids etc., with the action page (AP) id and org id to help batch these messages.
XXX maybe system.email.confirm can be merged with system.email.thankyou ?
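To see why one message can be copied to several delivery queues, here is a minimal sketch of AMQP topic matching, where `*` stands for exactly one dot-separated word and `#` for zero or more. RabbitMQ does this matching itself; the module `TopicMatchSketch`, the org name `acme`, and the assumption that `acme` has both a CRM queue and SQS forwarding are all hypothetical.

```elixir
defmodule TopicMatchSketch do
  # Hypothetical illustration of topic-exchange matching; RabbitMQ does
  # this internally, Proca does not implement it.

  # True when `routing_key` matches the binding `pattern`.
  def matches?(pattern, routing_key) do
    do_match(String.split(pattern, "."), String.split(routing_key, "."))
  end

  defp do_match([], []), do: true

  # "#" absorbs zero words, or one word while staying active.
  defp do_match(["#" | rest], words) do
    do_match(rest, words) or (words != [] and do_match(["#" | rest], tl(words)))
  end

  # "*" absorbs exactly one word.
  defp do_match(["*" | rest], [_ | words]), do: do_match(rest, words)

  # A literal word must be equal in pattern and key.
  defp do_match([word | rest], [word | words]), do: do_match(rest, words)
  defp do_match(_, _), do: false

  # Bindings modelled on the deliver diagram, with "acme" as ORG_NAME.
  def deliver_bindings do
    [
      {"*.system.*", "system.email.thankyou"},
      {"acme.*.*", "acme.crm"},
      {"acme.system.*", "system.sqs"}
    ]
  end

  # All queues whose binding pattern matches the routing key.
  def queues_for(routing_key, bindings) do
    for {pattern, queue} <- bindings, matches?(pattern, routing_key), do: queue
  end
end
```

With these bindings, a message published with key `acme.system.action` matches all three patterns, while `acme.custom.action` only matches `acme.*.*` and so reaches the CRM queue alone.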
Retry queues:
We use an exchange loop for implementing retries:
[queue_name] with dlx set to -> (system.fail rk=queue_name)
(system.fail) - # -> [system.retry ttl=30sec] with dlx set to -> (system.retry)
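The retry loop is expressed through queue arguments. The sketch below only builds the option lists, in the shape the Elixir `amqp` library accepts for queue declaration. The module `RetryArgsSketch` is hypothetical, `durable: true` is an assumption, and the final hop (the system.retry exchange routing the message back to the original queue by its name) is inferred from the word "loop" rather than stated above.

```elixir
defmodule RetryArgsSketch do
  # Hypothetical helper: builds queue options for the retry loop.
  # `durable: true` is an assumption, not taken from the Proca source.

  @retry_ttl_ms 30_000

  # Work queue: rejected messages are dead-lettered to the system.fail
  # exchange, with the queue name as routing key.
  def work_queue_opts(queue_name) do
    [
      durable: true,
      arguments: [
        {"x-dead-letter-exchange", :longstr, "system.fail"},
        {"x-dead-letter-routing-key", :longstr, queue_name}
      ]
    ]
  end

  # Holding queue (bound to system.fail with "#"): messages wait for the
  # TTL, then are dead-lettered to the system.retry exchange.
  def retry_queue_opts do
    [
      durable: true,
      arguments: [
        {"x-message-ttl", :long, @retry_ttl_ms},
        {"x-dead-letter-exchange", :longstr, "system.retry"}
      ]
    ]
  end
end
```

Because the routing key is preserved through both dead-letter hops, a message that failed in a given queue can come back to that same queue after the 30 second delay.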
Summary
Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
setup_exchanges(connection)
Create exchanges for two stages of processing: confirm queue where data is confirmed, and then delivery queue where data can be processed further.
setup_org_queues(connection, org)
Org queues
Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
connection()
connection_url()
create_crm_queue(connection, org)
create_org_queue(connection, org, arg)
drop_crm_queue(connection, org)
drop_org_queue(connection, org, arg)
push(exchange, routing_key, data)
Specs
setup()
setup_exchanges(connection)
Create exchanges for two stages of processing: confirm queue where data is confirmed, and then delivery queue where data can be processed further.
setup_global_queues(connection)
setup_org_queues(connection)
setup_org_queues(connection, org)
Org queues:
- standard
- specific: crm