Proca.Pipes.Topology (proca v3.0.2)

Topology of processing queue setup in RabbitMQ.

Each Org has its own Topology server and set of exchanges/queues. Processing load and problems are isolated for each org.

This Topology service reconfigures the exchanges and queues when notified by Proca.Server.Notify of org_updated, org_created and org_deleted. The API layer sends these notifications when the respective event occurs. Operations performed directly on Proca.Org do not send them.

(Previously the responsibility of Proca.Server.Plumbing.)

Properties:

  • 3 exchanges reflect 3 stages of processing (supporter confirms their data, moderator confirms the action, action is delivered)
  • Each exchange has built-in worker queues attached, if workers are enabled. Worker queues are read by Proca workers.
  • Each exchange has a custom queue attached, if enabled on the Org. Custom queues are meant to be read by an external client.
  • Worker and custom queues have a Dead Letter Exchange (DLX) attached, so unprocessed messages are stored there temporarily and do not clog processing. They return to their queue after 30 seconds. [Improvement: change this timeout when the retry queue gets bigger]
  • When another org shares data with your org, you receive the action only on the deliver exchange.
  • Routing key is: ${campaign}.${action_type}, e.g. no_to_gmo.share
  • The action format is v1 or v2, depending on org.action_schema_version. Defaults to 2 for new Orgs.
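The dead-letter behaviour described in the list above could be sketched with standard RabbitMQ queue arguments. This is a minimal sketch, not the module's actual code: the module name RetryCircuitSketch, its function names, and the AMQP type tags (:longstr, :signedint) are assumptions made for illustration. The exchange names org.N.fail and org.N.retry and the 30-second delay come from this page. The functions only build argument lists and do not open a RabbitMQ connection.

```elixir
defmodule RetryCircuitSketch do
  # Hypothetical helper: exchange name in the "org.N.<name>" form used on this page.
  def exchange_name(org_id, name), do: "org.#{org_id}.#{name}"

  # Arguments for a worker or custom queue (assumed layout): rejected messages
  # are dead-lettered to the fail exchange, with the queue name as routing key.
  def retrying_queue_arguments(org_id, queue_name) do
    [
      {"x-dead-letter-exchange", :longstr, exchange_name(org_id, "fail")},
      {"x-dead-letter-routing-key", :longstr, queue_name}
    ]
  end

  # Arguments for the fail queue (assumed layout): messages expire after the
  # TTL and are dead-lettered to the direct retry exchange, which can route
  # them back to the original queue by its name.
  def fail_queue_arguments(org_id, ttl_ms \\ 30_000) do
    [
      {"x-dead-letter-exchange", :longstr, exchange_name(org_id, "retry")},
      {"x-message-ttl", :signedint, ttl_ms}
    ]
  end
end
```

Argument lists of this shape are what the Elixir AMQP client accepts in the arguments option when a queue is declared. Keeping the delay as a parameter would make the bracketed improvement above (a different timeout for a larger retry queue) a one-line change.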

Action Routing Key: campaign.action_type

EXCHANGE                 MATCH  QUEUE

x org.N.confirm.supporter  #  > =wrk.N.email.supporter
                           #  > =cus.N.confirm.supporter

x org.N.confirm.action     #  > =wrk.N.email.confirm [*]
                           #  > =cus.N.confirm.action

x org.N.deliver         *.mtt > =wrk.N.email.mtt [*]
                        #     > =wrk.N.email.supporter
                        #     > =wrk.N.sqs              -> proca-gw
                              > =wrk.N.http  [*]        -> proca-gw
                              > =cus.N.deliver

                                  DLX:x org.N.fail fanout> org.N.fail
                                  DLX:x org.N.retry direct:$qn-> =$qn

Event Routing Key: event_type.sub_type

x org.N.event      # > =wrk.N.event.webhook
                   # > =cus.N.event

[*] - not yet implemented
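
The MATCH column uses RabbitMQ topic patterns, where * matches exactly one dot-separated word and # matches zero or more words. The following is a minimal sketch of how a routing key is built and checked against such a pattern. The module TopicMatchSketch and its functions are hypothetical names for illustration. In a real deployment RabbitMQ does the matching itself.

```elixir
defmodule TopicMatchSketch do
  # Build an action routing key in the "campaign.action_type" form.
  def routing_key(campaign, action_type), do: "#{campaign}.#{action_type}"

  # True when the routing key matches the topic binding pattern.
  def matches?(pattern, key) do
    do_match(String.split(pattern, "."), String.split(key, "."))
  end

  # Both pattern and key are exhausted: match.
  defp do_match([], []), do: true

  # "#" matches zero words (skip it) or consumes one word and stays in place.
  defp do_match(["#" | rest], words) do
    do_match(rest, words) or (words != [] and do_match(["#" | rest], tl(words)))
  end

  # "*" matches exactly one word.
  defp do_match(["*" | rest], [_ | words]), do: do_match(rest, words)

  # A literal word must be equal in pattern and key.
  defp do_match([w | rest], [w | words]), do: do_match(rest, words)

  # Anything else does not match.
  defp do_match(_, _), do: false
end
```

With these definitions, the key for campaign no_to_gmo and action type share matches the # bindings but not *.mtt, so on the deliver exchange it would reach the # queues and skip the mtt worker queue.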

Enabled Queues.

  • Custom queues are enabled by flags on Org (boolean columns):

    • custom_supporter_confirm enables cus.N.confirm.supporter
    • custom_action_confirm enables cus.N.confirm.action
    • custom_action_deliver enables cus.N.deliver
  • Worker queues are enabled by flags on Org (boolean columns):

    • system_sqs_deliver sends to SQS (SQS service must be configured)
    • email.supporter sends a double opt-in email when email_opt_in is TRUE and email_opt_in_template is set on the Org. The Org must have email/template backends set.
    • email.supporter sends thank-you emails when the Org has email/template backends set. The worker sends emails if ActionPage.thank_you_template_ref refers to a template identifier in the backend.
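
The mapping from Org flags to custom queues can be sketched as a small pure function. This is an illustration under assumptions, not the module's implementation. The module EnabledQueuesSketch and the function custom_bindings/1 are hypothetical, the org is modelled as a plain map, and the queue names follow the cus.N.<stage> form shown in the exchange diagram.

```elixir
defmodule EnabledQueuesSketch do
  # Boolean flag on Org => processing stage, in the order listed on this page.
  @custom_flags [
    custom_supporter_confirm: "confirm.supporter",
    custom_action_confirm: "confirm.action",
    custom_action_deliver: "deliver"
  ]

  # Returns {exchange, queue} pairs for every custom queue whose flag is true.
  # A missing flag is treated as disabled.
  def custom_bindings(%{id: id} = org) do
    for {flag, stage} <- @custom_flags, Map.get(org, flag, false) == true do
      {"org.#{id}.#{stage}", "cus.#{id}.#{stage}"}
    end
  end
end
```

For an org with id 7 that enables only custom_action_deliver, the function returns a single pair binding org.7.deliver to cus.7.deliver. An org with no flags set gets an empty list, so no custom queues would be declared.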

Summary

Functions

Returns a specification to start this module under a supervisor.

Name of queue for custom use (usually name is stage name)

Callback implementation for GenServer.init/1.

Name of queue to which a worker is attached (like for email, SQS)

Exchange name for an org, name is exchange name (stage name org fail, retry)

Functions

broadway_producer(o, work_type)

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

Name of queue for custom use (usually name is stage name)

declare_custom_queues(chan, o)

declare_exchanges(chan, o)

declare_retry_circuit(chan, o)

declare_retrying_queue(chan, o, arg)

declare_worker_queues(chan, o)

Callback implementation for GenServer.init/1.

retry_queue_arguments(o, queue_name)

start_link(org)

Name of queue to which a worker is attached (like for email, SQS)

Exchange name for an org, name is exchange name (stage name org fail, retry)