Proca.Stage.Action (proca v3.3.1)

Processing

Producers:

  • Main action memory queue
    • added by process_async()

  • Process old:
    • blocks
    • returns batch of oldest actions to process
    • when no actions, sleep (is this necessary?)

Processing:

  • do first part of processing,

  • if this is normal processing, batch: just process and batch :default

  • if this is lookup processing, batch :detail_lookup

  • partition by org_id

Batcher:

  • default Enum.each process
  • detail_lookup: Enum.each lookup and send over to main action memory queue

Link to this section Summary

Link to this section Functions

Link to this function

ack(atom, successful, failed)

Store!

Link to this function

action_page_id(arg1)

Specs

action_page_id(%{
  :__struct__ => Proca.Action | Proca.Stage.Processing,
  optional(any()) => any()
}) :: any()

Can fail due to some AMQP publish failure

Link to this function

lookup_all(msgs)

Specs

lookup_all([
  %Broadway.Message{
    acknowledger: term(),
    batch_key: term(),
    batch_mode: term(),
    batcher: term(),
    data: term(),
    metadata: term(),
    status: term()
  }
]) :: [
  %Broadway.Message{
    acknowledger: term(),
    batch_key: term(),
    batch_mode: term(),
    batcher: term(),
    data: term(),
    metadata: term(),
    status: term()
  }
]

Can fail due to lookup failure

Link to this function

process_all(msgs)

Link to this function

start_link(opts)

Link to this function

supporter_id(arg1)

Specs

supporter_id(%{
  :__struct__ => Proca.Action | Proca.Stage.Processing,
  optional(any()) => any()
}) :: any()
Link to this function

to_message(action, opt)