Alpha: ZealPHP is early-stage and under active development. APIs may change between minor versions until v1.0. Feedback and bug reports are welcome on GitHub.

Cross-node Pub/Sub & Streams

Two first-class primitives for cross-worker and cross-host messaging, layered on the pluggable Store/Counter backend. Store::publish is built on Redis, and the reliable variant, Store::publishReliable, on Redis Streams; both arrived in v0.2.39.

When you need it

  • Horizontally-scaled WebSocket — you have N OpenSwoole servers behind a load balancer; the user clicked “send” on server A but the recipient is connected to server B. Each $fd is process-local; only the owning server can $server->push(). Redis is the routing fabric that says “hey, owner: here’s something to push.”
  • Cache invalidation — one server writes a record, every other server’s local L1 cache needs to evict that key. Publish to an invalidation channel, peers subscribe + evict.
  • Live broadcasts — chat rooms, presence, leaderboards, anywhere “every connected client sees this” needs to cross process boundaries.
  • Work queues / event sourcing — need at-least-once delivery? Use Store::publishReliable (Redis Streams) instead of Store::publish. Consumer groups distribute work; ACK confirms processing.
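
As a rough illustration of the cache-invalidation case above, the sketch below models two servers that each hold a local L1 cache and evict a key when it is announced on a shared channel. InMemoryBus and LocalCache are hypothetical stand-ins invented for this example so that it runs without Redis; in a real application the bus role would be played by App::subscribe and Store::publish, and the channel name cache:invalidate is an assumption.

```php
<?php
// Hypothetical in-memory stand-in for the Redis channel, so the sketch runs
// without Redis. In a real app, App::subscribe / Store::publish play this role.
final class InMemoryBus
{
    /** @var array<string, list<callable>> */
    private array $subscribers = [];

    public function subscribe(string $channel, callable $handler): void
    {
        $this->subscribers[$channel][] = $handler;
    }

    // Delivers to every subscriber and returns how many received the message.
    public function publish(string $channel, string $payload): int
    {
        $handlers = $this->subscribers[$channel] ?? [];
        foreach ($handlers as $handler) {
            $handler($payload, $channel);
        }
        return count($handlers);
    }
}

// Per-process L1 cache: a plain array that evicts keys announced on the bus.
final class LocalCache
{
    /** @var array<string, mixed> */
    private array $items = [];

    public function __construct(InMemoryBus $bus, string $channel = 'cache:invalidate')
    {
        $bus->subscribe($channel, function (string $key, string $channel): void {
            unset($this->items[$key]);
        });
    }

    public function set(string $key, mixed $value): void { $this->items[$key] = $value; }
    public function get(string $key): mixed { return $this->items[$key] ?? null; }
}

$bus     = new InMemoryBus();
$serverA = new LocalCache($bus);
$serverB = new LocalCache($bus);

$serverA->set('user:42', ['name' => 'old']);
$serverB->set('user:42', ['name' => 'old']);

// Server A updates the source of truth, then tells every peer to evict.
$evictedOn = $bus->publish('cache:invalidate', 'user:42');
```

The publisher evicts its own copy through the same channel, so there is no separate code path for "local" versus "remote" invalidation.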

Quick start

Three steps from zero to working pub/sub.

// app.php — before $app->run()
use ZealPHP\Store;
use ZealPHP\App;

// Step 1: tell Store to use Redis. ZEALPHP_STORE_BACKEND=redis env works too.
Store::defaultBackend(Store::BACKEND_REDIS);

// Step 2: register a subscriber at boot. Runs in EVERY worker.
App::subscribe('hello:world', function (string $payload, string $channel) {
    error_log("[$channel] received: $payload");
});

// Step 3: publish from anywhere — a route, a timer, a CLI script, …
$receivers = Store::publish('hello:world', 'hi from anywhere');
// Returns number of receivers — across ALL workers AND ALL connected servers.
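
Because the subscriber callback receives the payload as a string, structured data has to be serialized by the publisher and parsed by the subscriber. The sketch below is one possible way to do that with a small JSON envelope. The helper names encodeMessage and decodeMessage, and the type/data envelope layout, are assumptions made for this example and are not part of ZealPHP.

```php
<?php
// Hypothetical helpers: wrap structured data in a JSON envelope so it can
// travel over a channel that only carries strings.
function encodeMessage(string $type, array $data): string
{
    return json_encode(['type' => $type, 'data' => $data], JSON_THROW_ON_ERROR);
}

// Returns null for anything that is not a well-formed envelope, so a
// subscriber can skip bad payloads instead of crashing.
function decodeMessage(string $payload): ?array
{
    try {
        $msg = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    } catch (\JsonException $e) {
        return null;
    }
    if (!is_array($msg)
        || !isset($msg['type'], $msg['data'])
        || !is_string($msg['type'])
        || !is_array($msg['data'])) {
        return null;
    }
    return $msg;
}

$wire = encodeMessage('presence.join', ['user' => 'alice']);
$good = decodeMessage($wire);      // the original envelope as an array
$bad  = decodeMessage('not json'); // null: malformed payloads are rejected
```

The string in $wire is what would be handed to Store::publish, and decodeMessage would run at the top of the subscriber callback.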

Reliable variant — Redis Streams

When a message that might be dropped during a subscriber reconnect is not acceptable, switch to the Streams primitive. It has the same shape but adds ACK semantics, and consumer groups distribute work across workers and servers.

App::subscribeReliable('orders', function (string $payload, string $id, string $stream): bool {
    $order = json_decode($payload, true);
    $ok = processOrder($order);
    return $ok;  // true → XACK (done); false/throw → leave pending, retried on reconnect
});

// Anywhere:
$messageId = Store::publishReliable('orders', json_encode($order));
// Returns the Redis message ID, e.g. '1779520329297-0' — durable when AOF/RDB is on.

| Primitive | Latency | Durability | Delivery | When to pick |
|---|---|---|---|---|
| Store::publish | ~0.5 ms loopback | None | Best-effort | Cache invalidation, WS fan-out, presence, leaderboards. |
| Store::publishReliable | ~1–2 ms | AOF/RDB-backed | At-least-once via consumer groups | Orders, payments, work queues, audit events. |
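
At-least-once delivery implies that a handler can see the same message more than once, for example when it applies a side effect and then fails before the ACK. The sketch below models that situation with a hypothetical in-memory PendingQueue class. It is not Redis Streams and not ZealPHP's implementation; it only mimics the "ACK on true, stay pending otherwise" rule described above, and shows one way a handler might stay idempotent by remembering which order IDs it has already applied.

```php
<?php
// Hypothetical in-memory model of a consumer group's pending list. It is NOT
// Redis Streams; it only mimics "ACK on true, stay pending otherwise".
final class PendingQueue
{
    /** @var array<string, string> message ID => payload, not yet ACKed */
    private array $pending = [];
    private int $seq = 0;

    public function publish(string $payload): string
    {
        $id = sprintf('%d-0', ++$this->seq); // stand-in for a Redis message ID
        $this->pending[$id] = $payload;
        return $id;
    }

    // Offer every pending message to the handler; ACK only on a true return.
    public function deliver(callable $handler): void
    {
        foreach ($this->pending as $id => $payload) {
            try {
                $ok = $handler($payload, (string) $id);
            } catch (\Throwable $e) {
                $ok = false; // a throw leaves the message pending
            }
            if ($ok === true) {
                unset($this->pending[$id]);
            }
        }
    }

    public function pendingCount(): int { return count($this->pending); }
}

$queue     = new PendingQueue();
$processed = []; // order IDs already applied (would live in shared storage)
$charges   = 0;  // counts the side effect we must not repeat
$attempt   = 0;

$handler = function (string $payload, string $id) use (&$processed, &$charges, &$attempt): bool {
    $attempt++;
    $order = json_decode($payload, true);
    if (!isset($processed[$order['id']])) {
        $charges++;
        $processed[$order['id']] = true;
    }
    if ($attempt === 1) {
        throw new \RuntimeException('simulated crash before ACK');
    }
    return true;
};

$queue->publish(json_encode(['id' => 'order-1', 'total' => 1999]));
$queue->deliver($handler);                 // first attempt: side effect applied, no ACK
$pendingAfterFirst = $queue->pendingCount();
$queue->deliver($handler);                 // redelivery: duplicate detected, then ACKed
```

In a multi-server deployment the $processed set would need to live somewhere all consumers can see, since a retry may land on a different worker.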

Cross-server WebSocket routing

The pattern you came here for. $fd is process-local; only the owning server can push to it. Store the client_id → server_id mapping in shared Redis; each server subscribes to its identity channel; senders look up + PUBLISH to the owner.

// app.php — boot
Store::defaultBackend(Store::BACKEND_REDIS);
$myServerId = gethostname() . ':' . getmypid();

// Shared mapping: which server owns each connected client.
Store::make('ws_owner', 4096, ['server' => [Store::TYPE_STRING, 64]]);

// Per-worker local: client_id → local fd (only valid in THIS process)
$localFds = [];

App::ws('/ws', function ($server, $frame) use (&$localFds) {
    // Process inbound from local clients normally.
});

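// Each server's subscriber only handles its OWN routed messages.
// NOTE: $server must be the OpenSwoole server instance and must be in scope
// here; the boot code shown above does not define it.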
App::subscribe("ws:server:$myServerId", function (string $payload) use ($server, &$localFds) {
    $msg = json_decode($payload, true);
    $fd  = $localFds[$msg['client_id']] ?? null;
    if ($fd !== null && $server->isEstablished($fd)) {
        $server->push($fd, $msg['data']);
    }
});

// Anywhere: route a message to client X, regardless of which server holds it.
function sendToClient(string $clientId, string $data): void {
    $owner = Store::get('ws_owner', $clientId, 'server');
    if ($owner === null) { return; } // client not connected anywhere
    Store::publish("ws:server:$owner", json_encode([
        'client_id' => $clientId,
        'data'      => $data,
    ]));
}

Latency is sub-millisecond on loopback and on the order of milliseconds cross-region. The design scales symmetrically, with no peer-to-peer state: every routing decision is one Redis lookup plus one PUBLISH. Validated end-to-end in the Phase 3 spike (in-process, cross-process two-server, and cross-host via WireGuard).
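
The snippet above leaves out how the owner mapping gets written and cleared. The sketch below models the whole loop in plain PHP under stated assumptions: the Cluster class stands in for shared Redis (owner map plus pub/sub), each Node stands in for one server with its own fd table, and "pushing" a frame is recorded in an array instead of calling $server->push(). All class, method, and function names here are hypothetical and exist only for this illustration.

```php
<?php
// Hypothetical stand-in for shared Redis: the owner map plus pub/sub channels.
final class Cluster
{
    /** @var array<string, string> client_id => owning server ID */
    public array $owners = [];
    /** @var array<string, callable> channel => subscriber */
    private array $channels = [];

    public function subscribe(string $channel, callable $handler): void
    {
        $this->channels[$channel] = $handler;
    }

    public function publish(string $channel, string $payload): int
    {
        if (!isset($this->channels[$channel])) { return 0; }
        ($this->channels[$channel])($payload);
        return 1;
    }
}

// Hypothetical stand-in for one server process with its own local fd table.
final class Node
{
    /** @var array<string, int> client_id => local fd */
    public array $localFds = [];
    /** @var list<array{fd: int, data: string}> frames this node "pushed" */
    public array $pushed = [];

    public function __construct(public string $id, private Cluster $cluster)
    {
        // Each node listens only on its own identity channel.
        $cluster->subscribe("ws:server:$id", function (string $payload): void {
            $msg = json_decode($payload, true);
            $fd  = $this->localFds[$msg['client_id']] ?? null;
            if ($fd !== null) {
                $this->pushed[] = ['fd' => $fd, 'data' => $msg['data']];
            }
        });
    }

    // On connect: remember the fd locally and claim ownership in shared state.
    public function onOpen(string $clientId, int $fd): void
    {
        $this->localFds[$clientId] = $fd;
        $this->cluster->owners[$clientId] = $this->id;
    }

    // On disconnect: drop both the local fd and the shared ownership record.
    public function onClose(string $clientId): void
    {
        unset($this->localFds[$clientId], $this->cluster->owners[$clientId]);
    }
}

// One lookup plus one publish, mirroring sendToClient() above.
function routeToClient(Cluster $cluster, string $clientId, string $data): bool
{
    $owner = $cluster->owners[$clientId] ?? null;
    if ($owner === null) { return false; }
    $payload = json_encode(['client_id' => $clientId, 'data' => $data]);
    return $cluster->publish("ws:server:$owner", $payload) > 0;
}

$cluster = new Cluster();
$a = new Node('host-a', $cluster);
$b = new Node('host-b', $cluster);
$b->onOpen('alice', 7);

$delivered  = routeToClient($cluster, 'alice', 'hello');
$b->onClose('alice');
$afterClose = routeToClient($cluster, 'alice', 'are you there?');
```

Clearing the ownership record on close matters: without it, a sender would keep publishing to a server that no longer holds the client.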

Live demo

Hit the running server. Most useful with ZEALPHP_STORE_BACKEND=redis set; on the default Table backend the pub/sub buttons surface a clean StoreException.

Try it from this tab

For the multi-tab routing demo, open a second tab on this page and click Read pubsub log while the first tab publishes — you’ll see the same entries because every worker received the broadcast.

The routes are wired in route/demo.php. The 'receivers' field on Publish typically equals your worker count, because each worker has its own subscriber coroutine.

Driver choice (both validated)

Both the phpredis (preferred when ext-redis is loaded) and predis SUBSCRIBE loops yield correctly under HOOK_ALL, the production default. phpredis is ~2× faster on hot CRUD; predis works without the extension. Pick phpredis when available. See /store#phpredis-pubsub-caveat for the comparison.
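
To see which of the two drivers a given PHP runtime could use, a check along the following lines may help. It is an illustrative sketch only: pickRedisDriver is a hypothetical helper, and ZealPHP performs its own driver selection, which may differ in detail. The check relies on the phpredis extension being registered under the name redis and on predis exposing the Predis\Client class.

```php
<?php
// Illustrative check only: reports which Redis client this PHP runtime could
// use. pickRedisDriver() is a hypothetical helper, not a ZealPHP function.
function pickRedisDriver(): string
{
    if (extension_loaded('redis')) {
        return 'phpredis'; // C extension, provides the \Redis class
    }
    if (class_exists(\Predis\Client::class)) {
        return 'predis';   // pure-PHP client, typically installed via Composer
    }
    return 'none';
}

$driver = pickRedisDriver();
```

Running this from the same SAPI and php.ini that serves the application matters, since the CLI and the server can load different extension sets.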

Further reading