# Audience Member Sync

The audience member sync system gives packages a standard way to pull external data (CRM membership status, donor records, email lists, etc.) into Allegro and keep it fresh. Providers register once; the platform handles scheduling, queueing, login-path freshness, and failure isolation.

## How It Works[​](#how-it-works "Direct link to How It Works")

Each external integration is a **sync provider** — a class implementing `Allegro\Contracts\SyncProvider`. Providers declare:

* **What to sync** — a `membersToSync()` query scoping which audience members are in scope for scheduled runs.
* **How fresh** — a `refreshInterval()` for scheduled cadence, and an optional `maxAgeOnLogin()` for how stale data can be before a login triggers a synchronous sync.
* **When to run** — the platform runs a single global 5-minute heartbeat (`allegro-audience:sync:tick`) that fans out a per-tenant evaluation job. Per provider, the evaluator checks the `audience_sync_runs` row for the last dispatch time against `refreshInterval()` before dispatching a new set of sync jobs.

At JWT generation time, `SyncProfilesOrchestrator::ensureFreshForLogin()` runs synchronously before the `JsonWebTokenGenerating` event fires. Listeners that add membership claims to the JWT are therefore guaranteed up-to-date data. Scheduled runs dispatch `SyncAudienceMemberJob` per member onto the `audience-sync` queue.

***

## Quick Start[​](#quick-start "Direct link to Quick Start")

Extend `Allegro\Services\AudienceSync\AbstractSyncProvider` — it provides sensible defaults so you only override what you need.

```php
namespace Acme\Sync;

use Allegro\Models\AudienceMember;
use Allegro\Services\AudienceSync\AbstractSyncProvider;
use Carbon\CarbonInterval;
use Illuminate\Database\Eloquent\Builder;

class AcmeMembershipSync extends AbstractSyncProvider
{
    public function name(): string
    {
        return 'acme_membership'; // matches the audience_member_external_profiles.provider key
    }

    public function refreshInterval(): CarbonInterval
    {
        return CarbonInterval::hours(6);
    }

    public function maxAgeOnLogin(): ?CarbonInterval
    {
        // Sync synchronously on login when the row is older than 30 minutes.
        // Return null to make this a background-only provider.
        return CarbonInterval::minutes(30);
    }

    public function membersToSync(): Builder
    {
        // Only queue jobs for members active in the last 30 days.
        return $this->membersLoggedInWithin(30);
    }

    public function sync(AudienceMember $member): void
    {
        $data = app(AcmeApiClient::class)->fetchMember($member->email);

        $member->externalProfiles()->updateOrCreate(
            ['provider' => $this->name()],
            ['data' => $data],
        );

        // Grant or revoke entitlements based on $data...
    }
}

```

`AbstractSyncProvider` defaults:

| Method                   | Default                  |
| ------------------------ | ------------------------ |
| `maxAgeOnLogin()`        | `null` (background-only) |
| `isAvailableForTenant()` | `true`                   |

***

## Registering a Provider[​](#registering-a-provider "Direct link to Registering a Provider")

Call `SyncProviderRegistry::register()` once in your package's `ServiceProvider::boot()`:

```php
use Allegro\Services\AudienceSync\SyncProviderRegistry;

public function boot(SyncProviderRegistry $registry): void
{
    $registry->register(AcmeMembershipSync::class);
}

```

`register()` is idempotent — calling it twice with the same class is safe (e.g., under Octane). Registering two *different* classes with the same `name()` throws `DuplicateSyncProviderException` at boot time.

***

## Scoping Members[​](#scoping-members "Direct link to Scoping Members")

### `membersLoggedInWithin(int $days)`[​](#membersloggedinwithinint-days "Direct link to membersloggedinwithinint-days")

`AbstractSyncProvider` provides this helper for the common case:

```php
public function membersToSync(): Builder
{
    return $this->membersLoggedInWithin(30); // members who logged in within 30 days
}

```

It queries `audience_members.last_active_at`, a denormalized column updated whenever a member authenticates or refreshes their JWT. Use this to avoid dispatching jobs for inactive members.

### Custom queries[​](#custom-queries "Direct link to Custom queries")

Return any `Builder<AudienceMember>` from `membersToSync()`. Return a query that matches zero rows (e.g., `AudienceMember::query()->whereRaw('0 = 1')`) to disable scheduled sync entirely while still allowing on-login sync.

***

## Staleness Configuration[​](#staleness-configuration "Direct link to Staleness Configuration")

Two knobs control freshness:

| Method              | Type              | Purpose                                                                                       |
| ------------------- | ----------------- | --------------------------------------------------------------------------------------------- |
| `refreshInterval()` | `CarbonInterval`  | How old a row can be before the scheduled job re-syncs it                                     |
| `maxAgeOnLogin()`   | `?CarbonInterval` | How old a row can be before a login triggers a synchronous sync; `null` = never sync on login |

### Background-only provider[​](#background-only-provider "Direct link to Background-only provider")

Return `null` from `maxAgeOnLogin()` (the default). The provider runs on the scheduler cadence only. Use when upstream latency is too high for the login path, or when data does not affect the JWT.

```php
public function maxAgeOnLogin(): ?CarbonInterval
{
    return null; // background only; login never blocks on this provider
}

```

### Combined (most common)[​](#combined-most-common "Direct link to Combined (most common)")

Use both: schedule a background refresh and also sync on login when data is stale relative to `maxAgeOnLogin()`. The login threshold is typically tighter than `refreshInterval()` so a member always gets fresh data even if the last scheduled run was hours ago.

***

## Tenant Availability[​](#tenant-availability "Direct link to Tenant Availability")

Override `isAvailableForTenant()` to gate a provider on tenant-level configuration:

```php
public function isAvailableForTenant(): bool
{
    return config('acme.enabled', false);
}

```

The platform evaluates this predicate at every entry point: the login orchestrator, the Artisan command, and inside the queued job. A provider that returns `false` here is silently skipped — no error, no job dispatched.

***

## Side Effects in `sync()`[​](#side-effects-in-sync "Direct link to side-effects-in-sync")

`sync()` may do more than write the primary `audience_member_external_profiles` row. Common patterns:

**Write member meta:**

```php
$member->meta()->updateOrCreate(
    ['key' => 'total_donations'],
    ['value' => $data['total_donations']],
);

```

**Grant or revoke entitlements:**

```php
if ($data['is_active_member']) {
    $member->entitlements()->updateOrCreate(
        ['source_provider' => $this->name(), 'resource_id' => $resourceId],
        ['expires_at' => null],
    );
} else {
    $member->entitlements()
        ->where('source_provider', $this->name())
        ->delete();
}

```

Idempotency is required

`sync()` may be called multiple times for the same member. Use upserts keyed on `source_provider` for entitlement writes and `updateOrCreate` for profile rows. Wrap writes in a `DB::transaction()` so a throw mid-sync rolls back partial state.

***

## Login Path[​](#login-path "Direct link to Login Path")

`JsonWebToken::generate()` calls `SyncProfilesOrchestrator::ensureFreshForLogin($member)` before dispatching `JsonWebTokenGenerating`. The orchestrator:

1. Iterates providers registered for the current tenant where `maxAgeOnLogin() !== null`.
2. For each, checks whether the primary `audience_member_external_profiles` row is younger than `maxAgeOnLogin()`.
3. If stale or missing, calls `sync()` synchronously — taking the same `Cache::lock` used by `SyncAudienceMemberJob` to prevent concurrent writes.
4. Catches and logs any exceptions (provider failures as `warning`, unexpected infrastructure failures as `error`). The JWT is issued regardless of sync failures.

The `JsonWebTokenGenerating` event fires only after all login-path providers have run, so JWT claim listeners always see the freshest available data.

***

## Scheduled Runs[​](#scheduled-runs "Direct link to Scheduled Runs")

Scheduling is driven by a single global entry — there is no per-provider cron registration. The platform registers **one** scheduler entry in `routes/console.php`:

```text
allegro-audience:sync:tick   →   every 5 minutes

```

Provider authors implement `refreshInterval()` as the single source of truth for cadence. Do **not** implement a `scheduleCron()` method — it was removed from the contract. The scheduler does not look at your provider individually.

### How the tick works[​](#how-the-tick-works "Direct link to How the tick works")

1. **`allegro-audience:sync:tick`** iterates every registered tenant and dispatches an `EvaluateTenantSyncsJob` for each via `$tenant->execute(...)`, so the evaluation runs bound to that tenant's context.

2. Inside the job, for each provider returned by `SyncProviderRegistry::availableForTenant()`:

   * The job loads (or creates) the `audience_sync_runs` row keyed on `provider`. The table is tenant-scoped via `UsesTenantConnection`, so each tenant has its own row per provider.
   * **Due-check guard** — if `last_dispatched_at + refreshInterval()` is still in the future, the provider isn't due yet; skip.
   * Otherwise the job dispatches a `SyncAudienceMemberJob` per member from `membersToSync()` and updates `last_dispatched_at` to `now()`.

3. Workers on the `audience-sync` queue then process each `SyncAudienceMemberJob` independently. Each job re-checks freshness before calling `sync()`, so a synchronous login sync between dispatch and execution is respected — no double-sync.

The evaluator does not track batch completion. Gating on `last_dispatched_at + refreshInterval()` alone means the worst case is a re-dispatch before the prior run has fully drained — harmless because `SyncAudienceMemberJob` is idempotent per `(member, provider)` and deduplicated via `ShouldBeUnique`. The trade-off avoids the stuck-forever failure mode where a missing "finished" signal would permanently block re-evaluation.

Because cadence is just `refreshInterval()` + the row-based guards, a provider that wants "daily" simply returns `CarbonInterval::day()`; one that wants "every 15 minutes" returns `CarbonInterval::minutes(15)`. The next tick (within 5 minutes) will pick it up.

### Running manually[​](#running-manually "Direct link to Running manually")

The manual CLI still works for backfills and debugging. It writes the same `audience_sync_runs` row and dispatches the same `SyncAudienceMemberJob` per member, so a manual run immediately suppresses the scheduler from double-dispatching the same provider for that tenant:

```bash
# Sync all available providers for one tenant
php artisan allegro-audience:sync --tenant=42

# Sync one provider only for one tenant
php artisan allegro-audience:sync acme_membership --tenant=42

```

***

## Queue Configuration[​](#queue-configuration "Direct link to Queue Configuration")

All `SyncAudienceMemberJob` instances are dispatched to the `audience-sync` queue. Add this queue to your Horizon or supervisor configuration so workers process it:

```ini
# supervisord — example worker group
[program:allegro-sync-worker]
command=php artisan queue:work --queue=audience-sync

```

The dedicated queue isolates sync work from auth-critical queues. A CRM outage that backs up retries does not starve the default queue.

`SyncAudienceMemberJob` properties:

| Property     | Value | Description                                                                 |
| ------------ | ----- | --------------------------------------------------------------------------- |
| `$tries`     | 3     | Retried up to 3 times before landing in `failed_jobs`                       |
| `$backoff`   | 60s   | Seconds between retries                                                     |
| `$uniqueFor` | 600s  | `ShouldBeUnique` window — deduplicates queued jobs per `(member, provider)` |

***

## Testing[​](#testing "Direct link to Testing")

Use `Allegro\Testing\FakeSyncProvider` in feature tests:

```php
use Allegro\Services\AudienceSync\SyncProviderRegistry;
use Allegro\Testing\FakeSyncProvider;
use Carbon\CarbonInterval;

beforeEach(function () {
    // Reset the singleton so registrations from real boot don't bleed in.
    app()->forgetInstance(SyncProviderRegistry::class);
});

it('dispatches a sync job for active members', function () {
    $provider = new FakeSyncProvider(
        providerName: 'fake',
        refresh: CarbonInterval::hour(),
        loginAge: CarbonInterval::minutes(30),
    );

    app()->instance(FakeSyncProvider::class, $provider);
    app(SyncProviderRegistry::class)->register(FakeSyncProvider::class);

    // ...assert jobs dispatched, sync call count, etc.
});

```

`SyncProviderRegistry::register()` takes a class string and resolves the provider from the container. Bind your configured instance with `app()->instance(...)` first so the registry resolves your instance instead of constructing a fresh one with defaults.

`FakeSyncProvider` constructor parameters (in order):

| Parameter             | Default  | Description                                                                                         |
| --------------------- | -------- | --------------------------------------------------------------------------------------------------- |
| `$providerName`       | `'fake'` | Return value of `name()`                                                                            |
| `$refresh`            | `null`   | `refreshInterval()`; when `null`, falls through to `CarbonInterval::hour()` via `refreshInterval()` |
| `$loginAge`           | `null`   | `maxAgeOnLogin()`                                                                                   |
| `$availableForTenant` | `true`   | `isAvailableForTenant()`                                                                            |
| `$membersQuery`       | `null`   | Closure returning a `Builder`; defaults to empty                                                    |

`$onSync` is a public property (not a constructor argument) — assign directly: `$fake->onSync = fn ($member) => ...`. `FakeSyncProvider::$syncCallCount` tracks how many times `sync()` was called.

### Simulating staleness[​](#simulating-staleness "Direct link to Simulating staleness")

Use `Carbon::setTestNow()` to move time forward past an interval:

```php
use Carbon\Carbon;

it('re-syncs stale data on login', function () {
    $member = AudienceMember::factory()->create();
    $member->externalProfiles()->create([
        'provider' => 'fake',
        'data' => [],
        'updated_at' => now()->subHour(),
    ]);

    Carbon::setTestNow(now()->addHours(2));

    // trigger login path...
});

```

### Asserting dispatched jobs[​](#asserting-dispatched-jobs "Direct link to Asserting dispatched jobs")

```php
use Allegro\Jobs\SyncAudienceMemberJob;
use Illuminate\Support\Facades\Queue;

it('dispatches jobs on scheduled run', function () {
    Queue::fake();

    $this->artisan('allegro-audience:sync', ['provider' => 'fake']);

    Queue::assertPushed(SyncAudienceMemberJob::class);
});

```

***

## End-to-End Flow[​](#end-to-end-flow "Direct link to End-to-End Flow")

```text
Login path
----------
JsonWebToken::generate($member)
  └─ SyncProfilesOrchestrator::ensureFreshForLogin($member)
       └─ foreach provider where maxAgeOnLogin() !== null
            └─ EnsureFreshProfile::for($member, $providerName)
                 ├─ row fresh? → return true (no sync)
                 └─ stale/missing → Cache::lock → provider->sync($member)
  └─ event(JsonWebTokenGenerating)  ← listeners see fresh data

Scheduled path
--------------
scheduler fires allegro-audience:sync:tick  (every 5 minutes)
  └─ foreach tenant
       └─ EvaluateTenantSyncsJob (dispatched via $tenant->execute(...))
            └─ foreach provider in registry->availableForTenant()
                 ├─ load audience_sync_runs row
                 ├─ due-check (last_dispatched_at + refreshInterval() <= now)? skip
                 └─ SyncAudienceMemberJob::dispatch per member
                      ├─ update last_dispatched_at = now()
                      └─ worker picks up each job
                           └─ Cache::lock → freshness re-check → provider->sync($member)

```

The `Cache::lock` is shared between the login path and the queued job path. If a login sync is in progress when a worker picks up a job (or vice versa), one path blocks for up to 5 seconds then yields.

***

## Operations[​](#operations "Direct link to Operations")

### Force-resyncing a member[​](#force-resyncing-a-member "Direct link to Force-resyncing a member")

From the audience member detail page in the admin UI, the **Sync Status** panel lists every registered provider with its last-synced timestamp and a **Re-sync now** button. Clicking it dispatches `SyncAudienceMemberJob` with the `$force = true` flag, bypassing the freshness check.

### Verifying freshness[​](#verifying-freshness "Direct link to Verifying freshness")

Query `audience_member_external_profiles` for the member and provider:

```sql
SELECT provider, updated_at
FROM audience_member_external_profiles
WHERE audience_member_id = ?
ORDER BY provider;

```

The `updated_at` column is the staleness anchor. Compare it against the provider's `refreshInterval()` to determine whether a job will re-sync on the next scheduled run.

### Rolling out a new provider[​](#rolling-out-a-new-provider "Direct link to Rolling out a new provider")

1. Implement `SyncProvider` (or extend `AbstractSyncProvider`) in your package.
2. Register in your `ServiceProvider::boot()`.
3. Gate the provider with `isAvailableForTenant()` if rollout should be gradual.
4. Deploy. The next tick (within 5 minutes) will pick up the provider and evaluate it per tenant.
5. To backfill immediately: `php artisan allegro-audience:sync your_provider_name --tenant={id}`.

### Recovering from failed jobs[​](#recovering-from-failed-jobs "Direct link to Recovering from failed jobs")

Failed jobs land in `failed_jobs` after 3 retries. Once the upstream service recovers, flush and re-dispatch:

```bash
php artisan queue:retry all

```

Or let the next scheduled run naturally re-queue jobs for members still in scope — `failed_jobs` can safely be flushed after upstream recovery.
