Proca.Server.Plumbing (proca v3.3.1)

The Plumbing server sets up signature processing in Proca.

How does this work?

Proca uses RabbitMQ queues and Broadway to process signatures, updating each record at every stage to mark that it has been processed.

Stages:

A Proca.Supporter is always connected to one or more Proca.Actions. When confirming, the supporter is confirmed first, and then the action.

  1. Confirmation stage. Each entity is meant to be confirmed by only one mechanism, so it lands in a single queue (a consequence of the routing key design).

    a. Supporter - confirm if needed (double opt-in)
    b. Action - confirm if needed (moderation of types)

  2. Delivery stage. Entities are processed in parallel by many delivery mechanisms, and are copied to many queues as needed (routing keys with wildcards)

    a. Action only, but with supporter info.

Routing:

Routing keys have the following structure in both the confirm and deliver exchanges:

org . custom-or-system . type
       ^                   ^
       |                   `-- supporter or action
       `--- system if proca processes
            custom if some other system reads from custom queue and GETs callbacks
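
As an illustration of the three-segment structure above, here is a minimal sketch that builds and parses such keys. The module `RoutingKeySketch` and its functions are hypothetical and not part of Proca; the sketch also assumes org names contain no dots.

```elixir
defmodule RoutingKeySketch do
  # Hypothetical helper, not part of Proca.
  # Allowed values for the second and third segments of the key.
  @kinds [:system, :custom]
  @types [:supporter, :action]

  # Joins org . custom-or-system . type into one routing key string.
  def build(org_name, kind, type)
      when is_binary(org_name) and kind in @kinds and type in @types do
    Enum.join([org_name, kind, type], ".")
  end

  # Splits a routing key back into its three segments.
  # Returns :error when the key does not have exactly three segments.
  def parse(routing_key) do
    case String.split(routing_key, ".") do
      [org_name, kind, type] -> {:ok, %{org: org_name, kind: kind, type: type}}
      _ -> :error
    end
  end
end
```

For example, `RoutingKeySketch.build("myorg", :system, :action)` produces `"myorg.system.action"`, where `myorg` is a made-up org name.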

Data:

  1. Supporter
    • Contains campaign and action_page data/reference
    • Supporter (encrypted, with personalisation data)
  2. Action
    • Contains campaign and action_page data/reference
    • Action with type, fields

Queues:

             ___ *.system.supporter        -> system.email.confirm (double opt in)
            /
confirm ---*---- ORG_NAME.custom.supporter -> ORG_NAME.confirm
            \___ ORG_NAME.custom.action    -> ORG_NAME.moderate


             ___ *.system.* -> system.email.thankyou
            /
deliver ---*---- ORG_NAME.*.* -> ORG_NAME.crm
            \____ ORG1,ORG2,ORG3.system.* -> system.sqs
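
The diagrams above rely on AMQP topic wildcards, where `*` stands for exactly one dot-separated word and `#` for zero or more. The following sketch reimplements that matching rule in plain Elixir to show which queues would receive a copy of a message. It is only an illustration (RabbitMQ does this matching itself), and the module `TopicMatchSketch` is hypothetical.

```elixir
defmodule TopicMatchSketch do
  # Hypothetical helper, not part of Proca.
  # Returns true when a topic binding pattern matches a routing key.
  def matches?(pattern, routing_key) do
    do_match(String.split(pattern, "."), String.split(routing_key, "."))
  end

  # Both lists exhausted at the same time: the pattern matched.
  defp do_match([], []), do: true

  # "#" matches zero words (skip it) or consumes one word and stays.
  defp do_match(["#" | rest], words) do
    do_match(rest, words) or (words != [] and do_match(["#" | rest], tl(words)))
  end

  # "*" matches exactly one word, whatever it is.
  defp do_match(["*" | rest], [_word | words]), do: do_match(rest, words)

  # A literal segment must be equal to the word at the same position.
  defp do_match([word | rest], [word | words]), do: do_match(rest, words)

  defp do_match(_, _), do: false

  # Names of the queues whose binding pattern matches the routing key.
  # `bindings` is a list of {pattern, queue_name} tuples.
  def queues_for(bindings, routing_key) do
    for {pattern, queue} <- bindings, matches?(pattern, routing_key), do: queue
  end
end
```

With the deliver bindings `[{"*.system.*", "system.email.thankyou"}, {"myorg.*.*", "myorg.crm"}]` (`myorg` standing in for ORG_NAME), the key `myorg.system.action` matches both patterns and is copied to both queues, while `myorg.custom.action` reaches only `myorg.crm`.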

Some queues, namely all the ORG* queues, will be read by external consumers and therefore need the full data. System queues, on the other hand, can work with a simple set of action_ids etc., plus the AP (action page) id and org id to help batch these messages.

XXX maybe system.email.confirm can be merged with system.email.thankyou ?

Retry queues:

We use an exchange loop for implementing retries:

[queue_name] with dlx set to -> (system.fail rk=queue_name)

(system.fail) - # -> [system.retry ttl=30sec] with dlx set to -> (system.retry)
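
The two queue definitions in this loop can be sketched as queue options using RabbitMQ's standard `x-dead-letter-exchange`, `x-dead-letter-routing-key` and `x-message-ttl` arguments. The options are written in the keyword-list and tuple form accepted by `AMQP.Queue.declare/3` from the Elixir `amqp` package. Whether Proca declares its queues exactly this way is an assumption; the module `RetryArgsSketch`, the `durable: true` flag and the `:long` type tag are illustrative choices.

```elixir
defmodule RetryArgsSketch do
  # Hypothetical helper, not part of Proca.
  @fail_exchange "system.fail"
  @retry_exchange "system.retry"
  # 30 seconds, as in the ttl=30sec note above.
  @retry_ttl_ms 30_000

  # Options for a work queue: rejected messages are dead-lettered to the
  # fail exchange, with the queue name as the routing key (rk=queue_name).
  def work_queue_opts(queue_name) do
    [
      durable: true,
      arguments: [
        {"x-dead-letter-exchange", :longstr, @fail_exchange},
        {"x-dead-letter-routing-key", :longstr, queue_name}
      ]
    ]
  end

  # Options for the retry queue: messages wait until the TTL expires and
  # are then dead-lettered to the retry exchange.
  def retry_queue_opts do
    [
      durable: true,
      arguments: [
        {"x-message-ttl", :long, @retry_ttl_ms},
        {"x-dead-letter-exchange", :longstr, @retry_exchange}
      ]
    ]
  end
end
```

Because the retry queue sets no dead-letter routing key of its own, RabbitMQ keeps the routing key the message arrived with, which here is the original queue name. Presumably this is what lets the retry exchange route the message back to the queue it came from.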

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

connection_url()

create_crm_queue(connection, org)

create_org_queue(connection, org, arg)

drop_crm_queue(connection, org)

drop_org_queue(connection, org, arg)

push(exchange, routing_key, data)

Specs

push(String.t(), String.t(), map()) :: :ok | :error

setup_exchanges(connection)

Creates the exchanges for the two stages of processing: a confirm stage, where data is confirmed, followed by a delivery stage, where data can be processed further.

setup_global_queues(connection)

setup_org_queues(connection)

setup_org_queues(connection, org)

Org queues:

  • standard
  • specific: crm

setup_queue(chan, arg)

setup_queues(connection, queue_defs)

start_link(url)

with_chan(connection, f)