Proca.Server.Plumbing (proca v3.0.2)

Plumbing server is responsible for setting up the signature processing in Proca.

How does this work?

Proca uses RabbitMQ queues and Broadway to process signatures, while updating the record to mark that it was processed in each stage.


Proca.Supporter is always connected to one or more Proca.Actions, but in case of confirming we first confirm the supporter, and then we confirm action.

  1. Confirmation stage. Entities are supposed only to be confirmed by one mechanism, so they land in one queue (because of routing key design)

    a. Supporter - confirm if needed(double-opt-in) b. Action - confirm if needed(moderation of types)

  2. Delivery stage. Entities are processed in parallel by many delivery mechanisms, and are copied to many queues as needed (routing keys with wildcards)

    a. Action only, but with supporter info.


The routing keys have such structure in both confirm and deliver exchange:

org . custom-or-system . type
       ^                   ^
       |                   `-- supporter or action
       --- system if proca processes
            custom if some other system reads from custom queue and GETs callbacks


  1. Supporter
  • Contains campaign and action_page data/reference
  • Supporter (encrypted, with personalisation data)
  1. Action
  • Contains campaign and action_page data/reference
  • Action with type, fields


             ___ *.system.supporter        -> (double opt in)
confirm ---*---- ORG_NAME.custom.supporter -> ORG_NAME.confirm
            \___ ORG_NAME.custom.action    -> ORG_NAME.moderate

             ___ *.system.* ->
deliver ---*---- ORG_NAME.*.* -> ORG_NAME.crm
            \____ ORG1,ORG2,ORG3.system.* -> system.sqs

Some queues will by read by external consumer, actually all the ORG* queues. They need full data. OTOH, system queues will be able to deal with a simple set of action_ids etc, with AP id and Org id to help batching these messages.

XXX maybe can be merged with ?

Retry queues:

We use an exchange loop for implementing retries:

[queue_name] with dlx set to -> ( rk=queue_name)

( - # -> [system.retry ttl=30sec] with dlx set to -> (system.retry)

Link to this section Summary

Link to this section Functions

Link to this function


Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function


Link to this function

create_crm_queue(connection, org)

Link to this function

create_org_queue(connection, org, arg)

Link to this function

drop_crm_queue(connection, org)

Link to this function

drop_org_queue(connection, org, arg)

Link to this function

push(exchange, routing_key, data)


push(String.t(), String.t(), map()) :: :ok | :error
Link to this function


Create exchanges for two stages of processing: confirm queue where data is confirmed, and then delivery queue where data can be processed further.

Link to this function


Link to this function


Link to this function

setup_org_queues(connection, org)

Org queues:

  • standard
  • specific: crm
Link to this function

setup_queue(chan, arg)

Link to this function

setup_queues(connection, queue_defs)

Link to this function


Link to this function

with_chan(connection, f)