Alpha ZealPHP is early-stage and under active development. APIs may change between minor versions until v1.0. Feedback and bug reports welcome on GitHub.

Async Patterns

Channels, error handling, task workers, race conditions. Foundations covers the model.

You will learn

  • App::parallel() / App::parallelLimit() — the framework's first-class fan-out helpers
  • Fan-out / fan-in with go() + Channel — the underlying mechanism
  • Buffered vs unbuffered channels and when each one is right
  • Error handling in coroutines — exceptions don't propagate; design for it
  • Task workers vs coroutines — when to reach for each
  • Race conditions and what counts as a yield point

Pattern 1: fan-out, fan-in with go() + Channel

The bread-and-butter parallel-I/O pattern. Three API calls in parallel; collect the results through a buffered channel; total elapsed time is the slowest call, not the sum.

use OpenSwoole\Coroutine as co;
use OpenSwoole\Coroutine\Channel;

$app->route('/api/dashboard', function () {
    $ch = new Channel(3);   // buffered: 3 producers can push without blocking

    go(fn() => $ch->push(['key' => 'users',  'data' => Users::recent()]));
    go(fn() => $ch->push(['key' => 'orders', 'data' => Orders::pending()]));
    go(fn() => $ch->push(['key' => 'stats',  'data' => Stats::today()]));

    $out = [];
    for ($i = 0; $i < 3; $i++) {
        $row = $ch->pop();   // blocks until something arrives
        $out[$row['key']] = $row['data'];
    }
    return $out;
});

Three coroutines, three channel pushes, three pops. The framework swaps coroutines whenever one of them hits a yield point (an I/O call, a co::sleep(), a channel push/pop). The worker isn’t blocked — it can serve other requests in parallel even while this handler waits.

Buffered vs unbuffered channels

A Channel(N) buffers up to N values before producers block. Channel(0) (or just Channel()) is unbuffered — every push waits for a matching pop, like a one-slot handoff. Pick by what you actually need:

ChannelBehaviorUse when
new Channel($n)Producer pushes up to n values without blocking; the n+1th push blocks until a pop frees a slot.Fan-out where producers may finish before the consumer is ready (the dashboard example above)
new Channel(0)Every push waits for a paired pop — rendezvous handoff.Strict synchronization, single-slot pipelines, back-pressure

Pattern 2: error handling across go()

Exceptions thrown inside a go() closure don’t propagate to the handler that spawned it. The coroutine ends, the exception goes to the worker’s log, and the parent waits forever if it was expecting a channel value. Always push something:

function safe_fetch(Channel $ch, string $key, callable $work): void {
    go(function () use ($ch, $key, $work) {
        try {
            $ch->push(['key' => $key, 'ok' => true,  'data' => $work()]);
        } catch (\Throwable $e) {
            $ch->push(['key' => $key, 'ok' => false, 'error' => $e->getMessage()]);
        }
    });
}

// Caller treats both shapes:
$ch = new Channel(3);
safe_fetch($ch, 'users',  fn() => Users::recent());
safe_fetch($ch, 'orders', fn() => Orders::pending());
safe_fetch($ch, 'stats',  fn() => Stats::today());

$out = [];
for ($i = 0; $i < 3; $i++) {
    $row = $ch->pop();
    if ($row['ok']) $out[$row['key']] = $row['data'];
    else            $out[$row['key']] = ['error' => $row['error']];
}

The trick is the channel shape carries success-or-failure. The parent never blocks on a coroutine that crashed silently.

Recommended shortcut: App::parallel() and App::parallelLimit()

The hand-rolled go() + Channel fan-out above is the underlying mechanism — but the framework ships first-class helpers so you don’t have to wire it yourself every time:

// App::parallel() — runs every closure concurrently, returns results in input order.
// Blocks until all finish. Throws the first exception if any coroutine fails.
$results = App::parallel([
    fn() => Users::recent(),
    fn() => Orders::pending(),
    fn() => Stats::today(),
]);
[$users, $orders, $stats] = $results;

// App::parallelLimit() — bounded fan-out, at most $concurrency in-flight at once.
// Results are keyed by the input array keys.
$pages = App::parallelLimit(
    items:       range(1, 20),
    fn:          fn(int $page) => Posts::page($page),
    concurrency: 5,
);

Both helpers auto-wrap in Coroutine::run() when called from outside a coroutine (e.g., a CLI script), so they work in any context. Use the raw Channel form when you need fine-grained control — push-on-error shapes, timeouts via pop($timeout), or a mixed producer/consumer pipeline.

Pattern 3: co::sleep() vs usleep()

Inside a coroutine, co::sleep(1) yields the coroutine to the scheduler for 1 second — the worker can serve other requests during that wait. usleep(1_000_000) blocks the whole worker for 1 second — no other requests get processed.

// Bad — pins the worker for 5 seconds. Other requests queue.
$app->route('/poll', function () {
    while (!Job::isDone()) {
        usleep(500_000);
        $progress = Job::progress();
    }
    return ['done' => true];
});

// Good — yields between checks. Worker handles other requests.
$app->route('/poll', function () {
    while (!Job::isDone()) {
        co::sleep(0.5);
        $progress = Job::progress();
    }
    return ['done' => true];
});

The rule extends to every blocking call. PDO with OpenSwoole’s hook_flags = HOOK_ALL (the default in coroutine mode) automatically becomes non-blocking — the framework rewrites the call to yield. curl requests, file_get_contents(), sleep() — all hooked. If you call a function that isn’t hooked, you block the worker. Common offenders: shell-invoking helpers, forking helpers, calls into C extensions that don’t cooperate. Use a task worker for those (next section).

Pattern 4: task workers for CPU-bound or hostile work

Coroutines win for I/O concurrency — many requests waiting on something external. They don’t help with CPU work (PDF rendering, image transcoding, expensive computation) because the coroutine never yields until the work is done. Worse, if you call a non-hookable blocking function, you pin the worker.

Task workers are separate processes managed by OpenSwoole. Dispatch a task; the calling coroutine yields; OpenSwoole runs the task in a worker pool; the result comes back. Your HTTP worker stays unblocked.

// task/render-pdf.php — runs in a task worker, separate process
$render_pdf = function (array $args) {
    // Heavy CPU work here — won’t block any HTTP worker
    return Pdf::render($args['template'], $args['data']);
};

// route handler — fire-and-forget dispatch
// task() returns a task ID, NOT the result. The result is delivered to the
// framework’s on('finish') callback (logged by default). There is no
// synchronous task-wait helper — design for async completion.
$app->route('/api/invoice/{id}', function ($id) {
    $taskId = App::getServer()->task([
        'handler' => '/task/render-pdf',
        'args'    => ['template' => 'invoice', 'data' => Invoice::find($id)->toArray()],
    ]);
    // $taskId is an integer task ID — queue it and notify the client when done
    return ['queued' => true, 'task_id' => $taskId];
});

Set 'task_worker_num' => 8 on $app->run() to provision the pool. The default is 0 — without it, task() returns false (and OpenSwoole logs a warning) instead of dispatching, so always set this explicitly when using task workers. Task workers run with task_enable_coroutine => true by default, so each task itself can use go().

Pattern 5: per-coroutine context for handler state

Inside any handler running in coroutine mode, Coroutine::getContext() returns a per-coroutine bag — perfect for storing data you want to flow across helper calls without threading parameters everywhere. The framework already uses this for RequestContext::instance(); you can use it for your own scoped data.

function track_query(string $sql, float $duration): void {
    $ctx = OpenSwoole\Coroutine::getContext();
    $ctx['queries'] = $ctx['queries'] ?? [];
    $ctx['queries'][] = ['sql' => $sql, 'duration' => $duration];
}

function dump_queries(): array {
    return OpenSwoole\Coroutine::getContext()['queries'] ?? [];
}

// Use anywhere in the request handler chain — no globals, no leakage.

Race conditions: what counts as a yield point

A coroutine doesn’t get pre-empted mid-statement. It only yields at known points:

  • Hookable blocking calls (PDO, curl, file I/O, sleep() — with HOOK_ALL on)
  • Channel push() / pop()
  • Explicit co::sleep()
  • Any go() spawn or yield inside a generator

So $counter++ on a regular variable is safe between yield points. But this is a race:

// BAD — yields between read and write
$x = Cache::get('count');     // ← yields here (I/O)
$x++;                         // someone else may have incremented in between
Cache::set('count', $x);      // we overwrite their increment

// GOOD — atomic via Store (single operation, no yield window)
Store::incr('counters', 'total', 'n');

// GOOD — atomic via Counter (cross-worker atomic integer)
$counter = new \ZealPHP\Counter(0, 'total_count');
$counter->increment();

The rule of thumb: treat any expression that yields as a possible reentrance point. If a value is touched by other requests, read it after a yield and you might be working with stale data.

Try it live

Inside a coroutine handler, you call usleep(1_000_000). What happens to other requests on the same worker?

Key Takeaways

  • App::parallel([]) is the recommended fan-out primitive — total time = slowest task, not sum; throws on first error.
  • For bounded fan-out, App::parallelLimit($items, $fn, $concurrency) caps in-flight coroutines.
  • Under the hood: go() + a buffered Channel — reach for the raw form when you need push-error shapes or timeouts.
  • Exceptions don't cross go() boundaries; push a success-or-failure shape through the channel.
  • Use co::sleep() not usleep(); the framework hooks PDO/curl/file I/O for you.
  • Task workers are for CPU-bound or non-hookable blocking work — separate process pool, not coroutines.
  • Watch race conditions: any yield point is a re-entrance opportunity. Use atomic primitives for shared state.