Proca.Pipes.Topology (proca v3.4.1)

Topology of processing queue setup in RabbitMQ.

Each Org has its own Topology server and set of exchanges/queues. Processing load and problems are isolated for each org.

This Topology service reconfigures the exchanges and queues when Proca.Server.Notify notifies it that an Org was updated, created or deleted. These notifications are sent by the API layer when the respective event occurs. They are not sent by operations on Proca.Org directly.

Properties:

  • 3 exchanges reflect 3 stages of processing (supporter confirms their data, moderator confirms the action, action is delivered)
  • Each exchange has built-in worker queues attached, if workers are enabled. Worker queues are read by Proca workers.
  • Each exchange has a custom queue attached, if enabled on the Org. Custom queues are meant to be read by an external client.
  • Worker and custom queues have a Dead Letter Exchange (DLX) attached, so unprocessed messages are parked there temporarily and do not clog processing. They return to their queue after 30 seconds. [Improvement: change this timeout when the retry queue gets bigger]
  • When data is shared with your org by another org, you only receive the action on the deliver exchange.
  • Routing key is: ${campaign}.${action_type}, e.g. no_to_gmo.share
  • The action format is v1 or v2, depending on org.action_schema_version. Defaults to 2 for new Orgs.
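
The routing key format described above can be sketched as a small helper. This is only an illustration: the module and function names are hypothetical and are not part of Proca.Pipes.Topology.

```elixir
defmodule RoutingKeySketch do
  # Hypothetical helper (not part of Proca): joins the campaign name and
  # the action type with a dot, as in "${campaign}.${action_type}".
  def action_routing_key(campaign_name, action_type) do
    "#{campaign_name}.#{action_type}"
  end
end

IO.puts(RoutingKeySketch.action_routing_key("no_to_gmo", "share"))
```

Since every binding in the diagram matches on `#`, the key does not affect which queue receives the message; it is mainly useful to consumers that want to filter by campaign or action type.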

Action Routing Key: campaign.action_type

EXCHANGE                 MATCH  QUEUE

x org.N.confirm.supporter  #  > =wrk.N.email.supporter
                           #  > =cus.N.confirm.supporter

x org.N.confirm.action     #  > =wrk.N.email.confirm [*]
                           #  > =cus.N.confirm.action

x org.N.deliver            #  > =wrk.N.email.supporter
                           #  > =wrk.N.sqs              -> proca-gw
                           #  > =cus.N.deliver
                           #  > =wrk.N.webhook
                           #  > =wrk.N.sqs

                                  DLX:x org.N.fail fanout> org.N.fail
                                  DLX:x org.N.retry direct:$qn-> =$qn
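
The two DLX rows describe a retry circuit: a rejected message goes to the fanout exchange org.N.fail, waits in the org.N.fail queue, and is then routed by the direct exchange org.N.retry back to the queue it came from ($qn). A minimal sketch of the queue arguments that could produce this behaviour is shown below. The argument names are standard RabbitMQ queue arguments, but the exact arguments Proca declares are an assumption, and the module and function names are hypothetical. The tables are built as plain data in the tuple format used by the Elixir amqp library, so the snippet runs without a broker.

```elixir
defmodule RetryCircuitSketch do
  # Hypothetical module: builds AMQP argument tables as plain data.

  # 30 seconds, the delay stated in the module documentation.
  @retry_delay_ms 30_000

  def exchange_name(org_id, name), do: "org.#{org_id}.#{name}"

  # Arguments for a worker or custom queue: rejected messages are
  # dead-lettered to the fail exchange, with the queue's own name as the
  # routing key so they can find their way back later.
  def retrying_queue_arguments(org_id, queue_name) do
    [
      {"x-dead-letter-exchange", :longstr, exchange_name(org_id, "fail")},
      {"x-dead-letter-routing-key", :longstr, queue_name}
    ]
  end

  # Arguments for the org.N.fail holding queue: once the TTL expires, the
  # message is dead-lettered again, this time to the direct retry exchange,
  # which routes it by the queue name set above.
  def fail_queue_arguments(org_id) do
    [
      {"x-dead-letter-exchange", :longstr, exchange_name(org_id, "retry")},
      {"x-message-ttl", :long, @retry_delay_ms}
    ]
  end
end

IO.inspect(RetryCircuitSketch.retrying_queue_arguments(1, "wrk.1.sqs"))
IO.inspect(RetryCircuitSketch.fail_queue_arguments(1))
```

For the circuit to close, each retrying queue would also need a binding on org.N.retry with its own name as the routing key, which is what `direct:$qn-> =$qn` expresses in the diagram.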

Event Routing Key: event_type.sub_type

x org.N.event      # > =wrk.N.webhook
                   # > =wrk.N.sqs
                   # > =cus.N.deliver

[*] - not yet implemented
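
The MATCH column above uses `#` for every binding. Assuming the stage and event exchanges are AMQP topic exchanges (the diagram suggests this but does not state it), `#` matches any routing key, so every queue receives every message. The matcher below is a simplified sketch of topic matching, written only to illustrate what a narrower pattern would do; it is not code from Proca or RabbitMQ, and the module name is hypothetical.

```elixir
defmodule TopicMatchSketch do
  # Simplified topic matching: "#" matches zero or more dot-separated
  # words, "*" matches exactly one word, anything else must match literally.
  def matches?(pattern, routing_key) do
    do_match(String.split(pattern, "."), String.split(routing_key, "."))
  end

  defp do_match([], []), do: true

  # "#" either matches nothing, or consumes one word and stays active.
  defp do_match(["#" | rest] = pattern, words) do
    do_match(rest, words) or (words != [] and do_match(pattern, tl(words)))
  end

  defp do_match(["*" | rest], [_word | words]), do: do_match(rest, words)
  defp do_match([word | rest], [word | words]), do: do_match(rest, words)
  defp do_match(_pattern, _words), do: false
end

IO.inspect(TopicMatchSketch.matches?("#", "no_to_gmo.share"))
IO.inspect(TopicMatchSketch.matches?("*.share", "no_to_gmo.register"))
```

An external client that declares its own binding could, under the same assumption, use a pattern such as `no_to_gmo.*` to receive actions from a single campaign only.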

Enabled Queues.

  • Custom queues are enabled by flags on Org (boolean columns):

    • custom_supporter_confirm enables cus.N.confirm.supporter
    • custom_action_confirm enables cus.N.confirm.action
    • custom_action_deliver enables cus.N.deliver
  • Worker queues are enabled by flags on Org (boolean columns):

    • email.supporter sends a double-opt-in email when supporter_confirm is true. The Org must have email/template backends set. The email will be sent if email_supporter_template is set on the org or action page.
    • email.supporter sends thank you emails when the Org has email/template backends set. The worker will send emails if ActionPage.thank_you_template refers to a template identifier in the backend.
    • sqs - sends action data to AWS SQS
    • webhook - sends action data to the Webhook backend
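
As an illustration of how the custom queue flags could translate into queue names, here is a minimal sketch. It takes the org as a plain map rather than a Proca.Org struct, uses the queue names shown in the topology diagram (cus.N.confirm.supporter, cus.N.confirm.action, cus.N.deliver), and its module and function names are hypothetical.

```elixir
defmodule EnabledQueuesSketch do
  # Flag on the org => stage part of the custom queue name.
  @flags [
    custom_supporter_confirm: "confirm.supporter",
    custom_action_confirm: "confirm.action",
    custom_action_deliver: "deliver"
  ]

  # Returns the custom queue names whose flag is set to a truthy value.
  # Missing flags are treated as disabled.
  def custom_queues(%{id: id} = org) do
    for {flag, stage} <- @flags, Map.get(org, flag, false) do
      "cus.#{id}.#{stage}"
    end
  end
end

org = %{id: 7, custom_supporter_confirm: true, custom_action_deliver: true}
IO.inspect(EnabledQueuesSketch.custom_queues(org))
```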

Summary

Functions

Returns a specification to start this module under a supervisor.

Name of queue for custom use (usually name is stage name)

Name of queue to which a worker is attached (like for email, SQS)

Exchange name for an org; name is the exchange's name (a stage name, or fail or retry)

Functions

bind_queue(chan, arg)

broadway_producer(o, work_type)

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

configuration(o)

Name of queue for custom use (usually name is stage name)

declare_custom_queues(chan, o, config)

declare_exchanges(chan, o)

declare_retry_circuit(chan, o)

declare_retrying_queue(chan, o, arg)

declare_worker_queues(chan, o, config)

retry_queue_arguments(o, queue_name)

start_link(org)

Name of queue to which a worker is attached (like for email, SQS)

Exchange name for an org; name is the exchange's name (a stage name, or fail or retry)
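
The naming scheme used throughout the topology diagram (org.N.name for exchanges, wrk.N.type for worker queues, cus.N.stage for custom queues) can be sketched as follows. The helpers take a bare org id instead of a Proca.Org struct, and their names are hypothetical; they are not the functions documented on this page.

```elixir
defmodule NamingSketch do
  # Exchange for an org: name is a stage ("confirm.supporter",
  # "confirm.action", "deliver") or one of "fail", "retry", "event".
  def exchange(org_id, name), do: "org.#{org_id}.#{name}"

  # Queue read by a Proca worker, e.g. "email.supporter", "sqs", "webhook".
  def worker_queue(org_id, type), do: "wrk.#{org_id}.#{type}"

  # Queue read by an external client; usually named after the stage.
  def custom_queue(org_id, stage), do: "cus.#{org_id}.#{stage}"
end

IO.puts(NamingSketch.exchange(12, "deliver"))
IO.puts(NamingSketch.worker_queue(12, "email.supporter"))
IO.puts(NamingSketch.custom_queue(12, "deliver"))
```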