
Middleware

Wayfarer processes tasks through a middleware chain, an ordered pipeline of middleware classes. Each middleware receives a task, can modify it, and yields to the next middleware in the chain. If a middleware doesn't yield, the chain halts.

A middleware is a class that implements a call method. It receives a task argument and must yield to continue the middleware chain.

class MyCustomMiddleware
  def call(task)
    # Pre-processing: optionally set task metadata
    task[:started_at] = Time.now.utc

    # Yield to continue the chain (or not)
    # Downstream middleware can access `task[:started_at]`
    yield if block_given?

    # Post-processing
  end
end
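
To make the yield-or-halt behaviour concrete, here is a minimal sketch of how such a chain could be executed. The `run_chain` helper, the plain Hash standing in for a task, and the three middleware classes are hypothetical and only illustrate the control flow; they are not Wayfarer's implementation.

```ruby
# Hypothetical helper, not part of Wayfarer: runs `task` through `middlewares`
# in order by passing "the rest of the chain" to each middleware as its block.
def run_chain(middlewares, task)
  head, *tail = middlewares
  return if head.nil?

  head.call(task) { run_chain(tail, task) }
end

class Timestamp
  def call(task)
    task[:started_at] = Time.now.utc
    yield if block_given?
    # Post-processing: runs once everything downstream has returned
    task[:log] << :timestamp_done
  end
end

class SkipPdfs
  def call(task)
    # Returning without yielding halts the chain for this task
    return if task[:url].end_with?(".pdf")

    yield if block_given?
  end
end

class Fetch
  def call(task)
    task[:log] << :fetched
    yield if block_given?
  end
end

chain = [Timestamp.new, SkipPdfs.new, Fetch.new]

html = { url: "https://example.com/index.html", log: [] }
run_chain(chain, html)
html[:log] # => [:fetched, :timestamp_done]

pdf = { url: "https://example.com/report.pdf", log: [] }
run_chain(chain, pdf)
pdf[:log] # => [:timestamp_done]
```

In this sketch, halting only skips downstream middleware: the post-processing step of an upstream middleware still runs after the chain returns to it.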

Task metadata (task[]=) is ephemeral

Anything you assign to a task at runtime isn't serialized to the message queue. For example, when a task is retried, its metadata can't be restored.
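
The effect can be illustrated with a toy model. `SketchTask` below is a hypothetical class, and the assumption that only the URL is serialized is made purely for illustration; Wayfarer's actual task class and queue payload may look different.

```ruby
require "json"

# Hypothetical task class for illustration only; not Wayfarer's task.
class SketchTask
  attr_reader :url

  def initialize(url)
    @url = url
    @metadata = {}
  end

  def []=(key, value)
    @metadata[key] = value
  end

  def [](key)
    @metadata[key]
  end

  # Assumption: only the URL goes onto the queue; metadata is left out.
  def serialize
    JSON.generate("url" => url)
  end

  def self.deserialize(payload)
    new(JSON.parse(payload).fetch("url"))
  end
end

task = SketchTask.new("https://example.com")
task[:started_at] = Time.now.utc

# A retry rebuilds the task from the queued payload
retried = SketchTask.deserialize(task.serialize)
retried.url          # => "https://example.com"
retried[:started_at] # => nil
```

If downstream code needs a value to survive a retry, it has to be recomputed by a middleware on each run or kept in external storage.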

A DependencyGraph manages the chain ordering by resolving before: and after: constraints between middlewares.
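
Resolving before: and after: constraints into a linear order is a topological sort. The sketch below shows the idea with Ruby's standard `TSort` module. `OrderingSketch` is a hypothetical stand-in, symbols are used in place of middleware classes, and nothing here is claimed to match how Wayfarer's DependencyGraph works internally.

```ruby
require "tsort"

# Hypothetical stand-in for the ordering step; not Wayfarer's DependencyGraph.
class OrderingSketch
  include TSort

  def initialize
    # middleware => list of middlewares that must run before it
    @deps = {}
  end

  def add(middleware, before: [], after: [])
    @deps[middleware] ||= []
    after.each do |other|
      @deps[other] ||= [] # referenced middlewares are registered implicitly
      @deps[middleware] << other
    end
    before.each do |other|
      # "middleware before other" means other depends on middleware
      (@deps[other] ||= []) << middleware
    end
    self
  end

  # TSort hook: enumerate every node
  def tsort_each_node(&block)
    @deps.each_key(&block)
  end

  # TSort hook: enumerate the nodes that must come before `node`
  def tsort_each_child(node, &block)
    @deps.fetch(node).each(&block)
  end
end

graph = OrderingSketch.new
  .add(:router)
  .add(:dispatch, after: [:router])
  .add(:user_agent, before: [:dispatch], after: [:router])

graph.tsort # => [:router, :user_agent, :dispatch]
```

`TSort#tsort` raises `TSort::Cyclic` when the constraints contradict each other, for example when two middlewares each declare the other in after:.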

Default middleware chain

Jobs that include Wayfarer::Base get a default chain of 10 middlewares:

  1. Redis: connects to Redis
  2. UriParser: parses the task URL
  3. Normalize: normalizes the URL
  4. Dedup: deduplicates tasks
  5. BatchCompletion: tracks batch progress
  6. Stage: collects staged URLs
  7. Router: matches the URL to an action
  8. UserAgent: retrieves the page
  9. ContentType: filters by Content-Type
  10. Dispatch: calls the routed action

Handlers that include Wayfarer::Handler get a shorter chain:

  1. ContentType
  2. Router
  3. Dispatch

Wayfarer.config[:middleware][:base] and Wayfarer.config[:middleware][:handler] declare these defaults.

Adding middleware

Use the use class method to add a middleware to a job's chain. You can declare ordering constraints with the before: and after: options:

class MyJob < ActiveJob::Base
  include Wayfarer::Base

  use MyCustomMiddleware, after: [Wayfarer::Middleware::Router]
end

Auto-registration

If you reference a middleware in before: or after: that hasn't been explicitly added, it's registered automatically with no constraints.

Removing middleware

Use middleware.remove to remove a middleware from the chain. All before: and after: references to the removed middleware are automatically cleaned up.

class NoDedupJob < ActiveJob::Base
  include Wayfarer::Base

  self.middleware = middleware.remove(Wayfarer::Middleware::Dedup)
end

Replacing middleware

Use middleware.replace to swap a middleware. The replacement inherits the exact same before: and after: constraints. The method updates all references to the old middleware in other entries to point to the replacement.

class TestJob < ActiveJob::Base
  include Wayfarer::Base

  self.middleware = middleware.replace(
    Wayfarer::Middleware::Redis,
    MyInMemoryState
  )
end

replace inherits constraints

If you need different constraints for the new middleware, use remove followed by add instead.

The dependency graph

The dependency graph is immutable. Every operation (add, remove, replace) returns a new instance, which is why you always assign the result back:

self.middleware = middleware.add(MyMiddleware, after: [Wayfarer::Middleware::Router])
self.middleware = middleware.remove(Wayfarer::Middleware::Dedup)
self.middleware = middleware.replace(OldMiddleware, NewMiddleware)

Circular dependencies raise a Wayfarer::Middleware::DependencyGraph::CyclicDependencyError.
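
The following sketch illustrates the copy-on-write pattern described above. `ImmutableGraphSketch` and its local `CyclicDependencyError` are hypothetical, only after: constraints are modelled, and symbols stand in for middleware classes; Wayfarer's own DependencyGraph may be implemented quite differently.

```ruby
require "tsort"

# Hypothetical immutable graph; names and internals are assumptions.
class ImmutableGraphSketch
  include TSort

  # Local stand-in for the error class named above
  CyclicDependencyError = Class.new(StandardError)

  def initialize(deps = {})
    # node => nodes that must run before it; frozen so it can't be mutated
    @deps = deps.transform_values { |list| list.dup.freeze }.freeze
  end

  # Returns a new graph containing `node`; raises if that creates a cycle
  def add(node, after: [])
    deps = @deps.transform_values(&:dup)
    after.each { |other| deps[other] ||= [] }
    deps[node] = (deps.fetch(node, []) + after).uniq
    self.class.new(deps).tap(&:order)
  end

  # Returns a new graph without `node` and without references to it
  def remove(node)
    kept = @deps.reject { |key, _| key == node }
    self.class.new(kept.transform_values { |list| list - [node] })
  end

  # Returns a new graph in which `new_node` takes over `old` everywhere
  def replace(old, new_node)
    swap = ->(n) { n == old ? new_node : n }
    self.class.new(@deps.to_h { |key, list| [swap.call(key), list.map(&swap)] })
  end

  def order
    tsort
  rescue TSort::Cyclic => e
    raise CyclicDependencyError, e.message
  end

  def tsort_each_node(&block)
    @deps.each_key(&block)
  end

  def tsort_each_child(node, &block)
    @deps.fetch(node).each(&block)
  end
end

base = ImmutableGraphSketch.new.add(:router).add(:dispatch, after: [:router])
extended = base.add(:metrics, after: [:router])

base.order                                    # => [:router, :dispatch]
extended.order                                # => [:router, :dispatch, :metrics]
extended.replace(:router, :fake_router).order # => [:fake_router, :dispatch, :metrics]
extended.remove(:router).order                # => [:dispatch, :metrics]

begin
  extended.add(:router, after: [:dispatch])
rescue ImmutableGraphSketch::CyclicDependencyError
  # :router would have to run both before and after :dispatch
end
```

Because `base` is never modified, discarding the return value of add, remove, or replace has no effect, which is the reason for the `self.middleware = ...` assignments.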

Modifying the global default

You can modify the default middleware chain globally in an initializer. Any job class defined after this point uses the modified default:

# config/initializers/wayfarer.rb
Wayfarer.config[:middleware][:base] = Wayfarer.config[:middleware][:base]
  .add(MyGlobalMiddleware, after: [Wayfarer::Middleware::Router])