Middleware
Wayfarer processes tasks through a middleware chain, an ordered pipeline of middleware classes. Each middleware receives a task, can modify it, and yields to the next middleware in the chain. If a middleware doesn't yield, the chain halts.
A middleware is a class that implements a call method. It receives a task
argument and must yield to continue the middleware chain.
class MyCustomMiddleware
  def call(task)
    # Pre-processing: optionally set task metadata
    task[:started_at] = Time.now.utc
    # Yield to continue the chain (or not)
    # Downstream middleware can access `task[:started_at]`
    yield if block_given?
    # Post-processing
  end
end
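To make the yield semantics concrete, here is a minimal sketch of a chain runner. It is not Wayfarer's implementation: `run_chain`, `Stamp`, `Halt`, and `Finish` are hypothetical names, and a plain Hash stands in for the task.

```ruby
# Hypothetical stand-ins: none of these classes are part of Wayfarer.
class Stamp
  def call(task)
    task[:trace] << :stamp_before
    yield if block_given?
    task[:trace] << :stamp_after
  end
end

class Halt
  def call(task)
    task[:trace] << :halt
    # No yield here, so downstream middleware never runs.
  end
end

class Finish
  def call(task)
    task[:trace] << :finish
    yield if block_given?
  end
end

# Runs the middlewares in order by nesting the rest of the chain
# inside the block passed to the current middleware.
def run_chain(middlewares, task)
  head, *rest = middlewares
  return task if head.nil?

  head.call(task) { run_chain(rest, task) }
  task
end

full = run_chain([Stamp.new, Finish.new], { trace: [] })
halted = run_chain([Stamp.new, Halt.new, Finish.new], { trace: [] })

p full[:trace]   # => [:stamp_before, :finish, :stamp_after]
p halted[:trace] # => [:stamp_before, :halt, :stamp_after]
```

In the second run, `Finish` never executes because `Halt` does not yield, but the post-processing step of `Stamp` still runs once control returns to it.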
Task metadata #task[]= is ephemeral
Anything you assign to a task at runtime isn't serialized to the message queue. When a task is retried, for example, none of its metadata can be restored.
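The effect can be illustrated with a toy task. This is a sketch under assumptions, not Wayfarer's task class: `ToyTask`, its `serialize`/`deserialize` methods, and the choice to persist only the URL are all hypothetical.

```ruby
require "json"

# Hypothetical stand-in for a task; not Wayfarer's task class.
class ToyTask
  attr_reader :url

  def initialize(url)
    @url = url
    @metadata = {} # runtime-only state, never serialized
  end

  def [](key)
    @metadata[key]
  end

  def []=(key, value)
    @metadata[key] = value
  end

  # Assumption for this sketch: only the URL is written to the queue.
  def serialize
    JSON.generate("url" => url)
  end

  def self.deserialize(payload)
    new(JSON.parse(payload).fetch("url"))
  end
end

task = ToyTask.new("https://example.com")
task[:started_at] = Time.now.utc

# Simulate a retry: the task is rebuilt from its serialized form.
retried = ToyTask.deserialize(task.serialize)
p retried.url          # => "https://example.com"
p retried[:started_at] # => nil
```

If a value has to survive a retry, it needs to be recomputed by a middleware on each run or stored somewhere outside the task.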
A DependencyGraph manages the chain ordering by resolving before: and
after: constraints between middlewares.
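Resolving before: and after: constraints amounts to a topological sort. The sketch below uses Ruby's standard TSort module to show the idea. `ToyGraph` is a hypothetical class, not Wayfarer's DependencyGraph, and unlike the real graph it mutates in place for brevity.

```ruby
require "tsort"

# Hypothetical toy graph; not Wayfarer's DependencyGraph implementation.
class ToyGraph
  include TSort

  def initialize
    # Maps each node to the list of nodes that must run before it.
    @prerequisites = {}
  end

  # Registers a node. Nodes named in before:/after: are registered too.
  def add(node, before: [], after: [])
    [node, *before, *after].each { |n| @prerequisites[n] ||= [] }
    after.each { |other| @prerequisites[node] << other }
    before.each { |other| @prerequisites[other] << node }
    self
  end

  # TSort hooks: enumerate all nodes, and each node's prerequisites.
  def tsort_each_node(&block)
    @prerequisites.each_key(&block)
  end

  def tsort_each_child(node, &block)
    @prerequisites.fetch(node).each(&block)
  end
end

graph = ToyGraph.new
  .add(:router)
  .add(:dispatch, after: [:router])
  .add(:dedup, before: [:router])

p graph.tsort # => [:dedup, :router, :dispatch]

# Contradictory constraints form a cycle, which TSort reports.
cyclic = ToyGraph.new.add(:a, after: [:b]).add(:b, after: [:a])
begin
  cyclic.tsort
rescue TSort::Cyclic => e
  puts "cycle detected: #{e.message}"
end
```

TSort emits prerequisites before the nodes that depend on them, so the sorted array can be read directly as execution order.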
Default middleware chain
Jobs that include Wayfarer::Base get a default chain of 10 middlewares:
1. Redis: connects to Redis
2. UriParser: parses the task URL
3. Normalize: normalizes the URL
4. Dedup: deduplicates tasks
5. BatchCompletion: tracks batch progress
6. Stage: collects staged URLs
7. Router: matches the URL to an action
8. UserAgent: retrieves the page
9. ContentType: filters by Content-Type
10. Dispatch: calls the routed action
Handlers that include Wayfarer::Handler get a shorter chain:
1. ContentType
2. Router
3. Dispatch
Wayfarer.config[:middleware][:base] and Wayfarer.config[:middleware][:handler] declare these defaults.
Adding middleware
Use the use class method to add a middleware to a job's chain. You can
declare ordering constraints with the before: and after: options:
class MyJob < ActiveJob::Base
  include Wayfarer::Base
  use MyCustomMiddleware, after: [Wayfarer::Middleware::Router]
end
Auto-registration
If you reference a middleware in before: or after: that hasn't been
explicitly added, it's registered automatically with no constraints.
Removing middleware
Use middleware.remove to remove a middleware from the chain. All
before: and after: references to the removed middleware are automatically
cleaned up.
class NoDedupJob < ActiveJob::Base
  include Wayfarer::Base
  self.middleware = middleware.remove(Wayfarer::Middleware::Dedup)
end
Replacing middleware
Use middleware.replace to swap a middleware. The replacement inherits
the exact same before: and after: constraints. The method updates all
references to the old middleware in other entries to point to the replacement.
class TestJob < ActiveJob::Base
  include Wayfarer::Base
  self.middleware = middleware.replace(
    Wayfarer::Middleware::Redis,
    MyInMemoryState
  )
end
replace inherits constraints
If you need different constraints for the new middleware, use remove
followed by add instead.
The dependency graph
The dependency graph is immutable. Every operation (add, remove, replace)
returns a new instance, which is why you always assign the result back:
self.middleware = middleware.add(MyMiddleware, after: [Wayfarer::Middleware::Router])
self.middleware = middleware.remove(Wayfarer::Middleware::Dedup)
self.middleware = middleware.replace(OldMiddleware, NewMiddleware)
Circular dependencies raise a Wayfarer::Middleware::DependencyGraph::CyclicDependencyError.
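The need to assign the result back follows from immutability. A minimal sketch of the pattern, using a hypothetical `FrozenChain` class that is not part of Wayfarer and ignores ordering constraints:

```ruby
# Hypothetical immutable collection; not Wayfarer's DependencyGraph.
class FrozenChain
  attr_reader :entries

  def initialize(entries = [])
    @entries = entries.dup.freeze
    freeze # the instance itself can no longer be modified
  end

  # Each operation builds and returns a new instance.
  def add(entry)
    self.class.new(entries + [entry])
  end

  def remove(entry)
    self.class.new(entries - [entry])
  end
end

base = FrozenChain.new([:router, :dispatch])

base.add(:my_middleware)            # result discarded: no effect on base
extended = base.add(:my_middleware) # result kept

p base.entries     # => [:router, :dispatch]
p extended.entries # => [:router, :dispatch, :my_middleware]
```

One consequence of this design is that a chain derived from another never alters the original, so modifying one job's chain cannot leak into another job that started from the same default.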
Modifying the global default
You can modify the default middleware chain globally in an initializer. Any job class defined after this point uses the modified default:
# config/initializers/wayfarer.rb
Wayfarer.config[:middleware][:base] = Wayfarer.config[:middleware][:base]
  .add(MyGlobalMiddleware, after: [Wayfarer::Middleware::Router])