Audience Member Sync
The audience member sync system gives packages a standard way to pull external data (CRM membership status, donor records, email lists, etc.) into Allegro and keep it fresh. Providers register once; the platform handles scheduling, queueing, login-path freshness, and failure isolation.
How It Works
Each external integration is a sync provider — a class implementing
Allegro\Contracts\SyncProvider. Providers declare:
- What to sync: a membersToSync() query scoping which audience members are in scope for scheduled runs.
- How fresh: a refreshInterval() for scheduled cadence, and an optional maxAgeOnLogin() for how stale data can be before a login triggers a synchronous sync.
- When to run: the platform runs a single global 5-minute heartbeat (allegro-audience:sync:tick) that fans out a per-tenant evaluation job. Per provider, the evaluator checks the audience_sync_runs row for the last dispatch time against refreshInterval() before dispatching a new set of sync jobs.
At JWT generation time, SyncProfilesOrchestrator::ensureFreshForLogin() runs
synchronously before the JsonWebTokenGenerating event fires. Listeners that
add membership claims to the JWT therefore see the freshest available data;
a failed sync is logged and does not block the token.
Scheduled runs dispatch SyncAudienceMemberJob per member onto the
audience-sync queue.
Quick Start
Extend Allegro\Services\AudienceSync\AbstractSyncProvider — it provides
sensible defaults so you only override what you need.
namespace Acme\Sync;

use Allegro\Models\AudienceMember;
use Allegro\Services\AudienceSync\AbstractSyncProvider;
use Carbon\CarbonInterval;
use Illuminate\Database\Eloquent\Builder;

class AcmeMembershipSync extends AbstractSyncProvider
{
    public function name(): string
    {
        return 'acme_membership'; // matches the audience_member_external_profiles.provider key
    }

    public function refreshInterval(): CarbonInterval
    {
        return CarbonInterval::hours(6);
    }

    public function maxAgeOnLogin(): ?CarbonInterval
    {
        // Sync synchronously on login when the row is older than 30 minutes.
        // Return null to make this a background-only provider.
        return CarbonInterval::minutes(30);
    }

    public function membersToSync(): Builder
    {
        // Only queue jobs for members active in the last 30 days.
        return $this->membersLoggedInWithin(30);
    }

    public function sync(AudienceMember $member): void
    {
        $data = app(AcmeApiClient::class)->fetchMember($member->email);

        $member->externalProfiles()->updateOrCreate(
            ['provider' => $this->name()],
            ['data' => $data],
        );

        // Grant or revoke entitlements based on $data...
    }
}
AbstractSyncProvider defaults:
| Method | Default |
|---|---|
| maxAgeOnLogin() | null (background-only) |
| isAvailableForTenant() | true |
Registering a Provider
Call SyncProviderRegistry::register() once in your package's
ServiceProvider::boot():
use Allegro\Services\AudienceSync\SyncProviderRegistry;

public function boot(SyncProviderRegistry $registry): void
{
    $registry->register(AcmeMembershipSync::class);
}
register() is idempotent — calling it twice with the same class is safe (e.g.,
under Octane). Registering two different classes with the same name() throws
DuplicateSyncProviderException at boot time.
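The two registration rules above can be modelled in a few lines. This is a standalone sketch, not the platform's SyncProviderRegistry: the SketchRegistry class, its internal map, the explicit name argument, and the plain RuntimeException standing in for DuplicateSyncProviderException are all assumptions made for illustration.

```php
<?php

// Hypothetical stand-in for the real registry; every name here is illustrative.
final class SketchRegistry
{
    /** @var array<string, string> provider name => provider class */
    private array $providers = [];

    public function register(string $name, string $class): void
    {
        $existing = $this->providers[$name] ?? null;

        if ($existing === $class) {
            return; // same class registered again (e.g. a repeated boot): no-op
        }

        if ($existing !== null) {
            // The platform is documented to throw DuplicateSyncProviderException here.
            throw new RuntimeException("Sync provider name '{$name}' is already used by {$existing}");
        }

        $this->providers[$name] = $class;
    }

    public function count(): int
    {
        return count($this->providers);
    }
}

$registry = new SketchRegistry();
$registry->register('acme_membership', 'Acme\Sync\AcmeMembershipSync');
$registry->register('acme_membership', 'Acme\Sync\AcmeMembershipSync'); // idempotent

try {
    // A different class claiming the same name is rejected.
    $registry->register('acme_membership', 'Other\Package\ClashingSync');
    $clashDetected = false;
} catch (RuntimeException $e) {
    $clashDetected = true;
}
```

Keying on the provider name rather than the class is what makes the second call harmless while still catching two packages that pick the same name.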
Scoping Members
membersLoggedInWithin(int $days)
AbstractSyncProvider provides this helper for the common case:
public function membersToSync(): Builder
{
    return $this->membersLoggedInWithin(30); // members who logged in within 30 days
}
It queries audience_members.last_active_at, a denormalized column
updated whenever a member authenticates or refreshes their JWT. Use this to avoid dispatching jobs for
inactive members.
Custom queries
Return any Builder<AudienceMember> from membersToSync(). Return a query that
matches zero rows (e.g., AudienceMember::query()->whereRaw('0 = 1')) to
disable scheduled sync entirely while still allowing on-login sync.
Staleness Configuration
Two knobs control freshness:
| Method | Type | Purpose |
|---|---|---|
| refreshInterval() | CarbonInterval | How old a row can be before the scheduled job re-syncs it |
| maxAgeOnLogin() | ?CarbonInterval | How old a row can be before a login triggers a synchronous sync; null = never sync on login |
Background-only provider
Return null from maxAgeOnLogin() (the default). The provider runs on the
scheduler cadence only. Use when upstream latency is too high for the login
path, or when data does not affect the JWT.
public function maxAgeOnLogin(): ?CarbonInterval
{
    return null; // background only; login never blocks on this provider
}
Combined (most common)
Use both: schedule a background refresh and also sync on login when data is
stale relative to maxAgeOnLogin(). The login threshold is typically tighter
than refreshInterval() so a member always gets fresh data even if the last
scheduled run was hours ago.
Tenant Availability
Override isAvailableForTenant() to gate a provider on tenant-level
configuration:
public function isAvailableForTenant(): bool
{
    // Cast so a non-boolean config value still satisfies the bool return type.
    return (bool) config('acme.enabled', false);
}
The platform evaluates this predicate at every entry point: the login
orchestrator, the Artisan command, and inside the queued job. A provider that
returns false here is silently skipped — no error, no job dispatched.
Side Effects in sync()
sync() may do more than write the primary audience_member_external_profiles
row. Common patterns:
Write member meta:
$member->meta()->updateOrCreate(
    ['key' => 'total_donations'],
    ['value' => $data['total_donations']],
);
Grant or revoke entitlements:
if ($data['is_active_member']) {
    $member->entitlements()->updateOrCreate(
        ['source_provider' => $this->name(), 'resource_id' => $resourceId],
        ['expires_at' => null],
    );
} else {
    $member->entitlements()
        ->where('source_provider', $this->name())
        ->delete();
}
sync() may be called multiple times for the same member. Use upserts keyed on
source_provider for entitlement writes and updateOrCreate for profile rows.
Wrap writes in a DB::transaction() so a throw mid-sync rolls back partial
state.
Login Path
JsonWebToken::generate() calls
SyncProfilesOrchestrator::ensureFreshForLogin($member) before dispatching
JsonWebTokenGenerating. The orchestrator:
- Iterates providers registered for the current tenant where maxAgeOnLogin() !== null.
- For each, checks whether the primary audience_member_external_profiles row is younger than maxAgeOnLogin().
- If stale or missing, calls sync() synchronously, taking the same Cache::lock used by SyncAudienceMemberJob to prevent concurrent writes.
- Catches and logs any exceptions (provider failures as warning, unexpected infrastructure failures as error). The JWT is issued regardless of sync failures.
The JsonWebTokenGenerating event fires only after all login-path providers have
run, so JWT claim listeners always see the freshest available data.
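The per-provider decision the orchestrator makes can be sketched as a pure function. This is an illustration only: needsLoginSync() is a hypothetical helper, not part of Allegro, and treating a row whose age equals maxAgeOnLogin() exactly as stale is an assumption. It uses plain DateInterval and DateTimeImmutable so it runs without the framework; CarbonInterval extends DateInterval, so the same comparison applies.

```php
<?php

// Hypothetical helper, not part of Allegro: decides whether the login path must sync.
function needsLoginSync(
    ?DateInterval $maxAgeOnLogin,
    ?DateTimeImmutable $rowUpdatedAt,
    DateTimeImmutable $now
): bool {
    if ($maxAgeOnLogin === null) {
        return false; // background-only provider: login never blocks on it
    }

    if ($rowUpdatedAt === null) {
        return true; // no profile row yet counts as stale
    }

    // Fresh only while the row is younger than the allowed maximum
    // (the exact boundary behaviour is an assumption).
    return $rowUpdatedAt->add($maxAgeOnLogin) <= $now;
}

$now = new DateTimeImmutable('2024-01-01 12:00:00');
$thirtyMinutes = new DateInterval('PT30M');

$fresh   = needsLoginSync($thirtyMinutes, $now->modify('-10 minutes'), $now); // false
$stale   = needsLoginSync($thirtyMinutes, $now->modify('-45 minutes'), $now); // true
$missing = needsLoginSync($thirtyMinutes, null, $now);                        // true
$skipped = needsLoginSync(null, $now->modify('-2 days'), $now);               // false
```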
Scheduled Runs
Scheduling is driven by a single global entry — there is no per-provider cron
registration. The platform registers one scheduler entry in
routes/console.php:
allegro-audience:sync:tick → every 5 minutes
Provider authors implement refreshInterval() as the single source of truth
for cadence. Do not implement a scheduleCron() method — it was removed
from the contract. The scheduler does not look at your provider individually.
How the tick works
- allegro-audience:sync:tick iterates every registered tenant and dispatches an EvaluateTenantSyncsJob for each via $tenant->execute(...), so the evaluation runs bound to that tenant's context.
- Inside the job, for each provider returned by SyncProviderRegistry::availableForTenant():
  - The job loads (or creates) the audience_sync_runs row keyed on provider. The table is tenant-scoped via UsesTenantConnection, so each tenant has its own row per provider.
  - Due-check guard: if last_dispatched_at + refreshInterval() is still in the future, the provider isn't due yet; skip.
  - Otherwise the job dispatches a SyncAudienceMemberJob per member from membersToSync() and updates last_dispatched_at to now().
- Workers on the audience-sync queue then process each SyncAudienceMemberJob independently. Each job re-checks freshness before calling sync(), so a synchronous login sync between dispatch and execution is respected and nothing is synced twice.
The evaluator does not track batch completion. Gating on
last_dispatched_at + refreshInterval() alone means the worst case is a
re-dispatch before the prior run has fully drained — harmless because
SyncAudienceMemberJob is idempotent per (member, provider) and
deduplicated via ShouldBeUnique. The trade-off avoids the stuck-forever
failure mode where a missing "finished" signal would permanently block
re-evaluation.
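To see why an early re-dispatch is harmless, here is a small in-memory model of deduplication per (member, provider). It is a sketch under stated assumptions, not Laravel's ShouldBeUnique implementation: UniqueDispatchSketch is a hypothetical class, and it models only the expiry window (600 seconds, matching the documented $uniqueFor), whereas the real lock is normally released earlier, once the job finishes.

```php
<?php

// Hypothetical in-memory stand-in for the uniqueness lock; illustrative only.
final class UniqueDispatchSketch
{
    /** @var array<string, int> unique key => unix time at which the lock expires */
    private array $locks = [];

    /** @var array<int, string> keys of jobs that were actually queued */
    public array $queued = [];

    public function __construct(private int $uniqueForSeconds = 600)
    {
    }

    public function dispatch(int $memberId, string $provider, int $now): bool
    {
        $key = "{$provider}:{$memberId}";

        if (($this->locks[$key] ?? 0) > $now) {
            return false; // an equivalent job is still pending: drop the duplicate
        }

        $this->locks[$key] = $now + $this->uniqueForSeconds;
        $this->queued[] = $key;

        return true;
    }
}

$queue  = new UniqueDispatchSketch();
$first  = $queue->dispatch(7, 'acme_membership', 1000); // queued
$second = $queue->dispatch(7, 'acme_membership', 1300); // dropped: inside the 600 s window
$third  = $queue->dispatch(7, 'acme_membership', 1600); // queued: window has elapsed
```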
Because cadence is just refreshInterval() + the row-based guards, a provider
that wants "daily" simply returns CarbonInterval::day(); one that wants
"every 15 minutes" returns CarbonInterval::minutes(15). The next tick
(within 5 minutes) will pick it up.
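The due-check guard and its interaction with the 5-minute tick can be sketched without the framework. isDue() below is a hypothetical helper, not part of Allegro, and treating a provider with no previous dispatch as due is an assumption; the comparison itself mirrors the documented rule that a provider is skipped while last_dispatched_at + refreshInterval() is still in the future.

```php
<?php

// Hypothetical helper, not part of Allegro: mirrors the documented due-check guard.
function isDue(
    ?DateTimeImmutable $lastDispatchedAt,
    DateInterval $refreshInterval,
    DateTimeImmutable $now
): bool {
    if ($lastDispatchedAt === null) {
        return true; // never dispatched (assumed to count as due)
    }

    // Not due while last_dispatched_at + refreshInterval() is still in the future.
    return $lastDispatchedAt->add($refreshInterval) <= $now;
}

$sixHours = new DateInterval('PT6H');
$last     = new DateTimeImmutable('2024-01-01 00:00:00');

// Ticks arrive every 5 minutes; find the first tick at which the provider is due again.
$tick = $last;
$ticksWaited = 0;
do {
    $tick = $tick->add(new DateInterval('PT5M'));
    $ticksWaited++;
} while (!isDue($last, $sixHours, $tick));

echo $tick->format('H:i'), " after {$ticksWaited} ticks\n"; // 06:00 after 72 ticks
```

Because evaluation only happens on ticks, the effective cadence is refreshInterval() rounded up to the next 5-minute boundary.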
Running manually
The manual CLI still works for backfills and debugging. It writes the same
audience_sync_runs row and dispatches the same SyncAudienceMemberJob
per member, so a manual run immediately suppresses the scheduler from
double-dispatching the same provider for that tenant:
# Sync all available providers for one tenant
php artisan allegro-audience:sync --tenant=42
# Sync one provider only for one tenant
php artisan allegro-audience:sync acme_membership --tenant=42
Queue Configuration
All SyncAudienceMemberJob instances are dispatched to the audience-sync
queue. Add this queue to your Horizon or supervisor configuration so workers
process it:
# supervisord — example worker group
[program:allegro-sync-worker]
command=php artisan queue:work --queue=audience-sync
The dedicated queue isolates sync work from auth-critical queues. A CRM outage that backs up retries does not starve the default queue.
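For Horizon, a dedicated supervisor for this queue might look like the fragment below. Treat it as a sketch to merge into your existing config/horizon.php rather than a drop-in file: the supervisor name, the redis connection name, and the process count are placeholders, not values prescribed by Allegro.

```php
<?php

// config/horizon.php (fragment). Supervisor name and process count are placeholders.
return [
    'environments' => [
        'production' => [
            'audience-sync-supervisor' => [
                'connection' => 'redis',          // assumes a Redis queue connection named "redis"
                'queue' => ['audience-sync'],     // only the sync queue, keeping it isolated
                'balance' => 'auto',
                'maxProcesses' => 3,
                'tries' => 3,
            ],
        ],
    ],
];
```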
SyncAudienceMemberJob properties:
| Property | Value | Description |
|---|---|---|
| $tries | 3 | Attempted up to 3 times in total before landing in failed_jobs |
| $backoff | 60s | Seconds between retries |
| $uniqueFor | 600s | ShouldBeUnique window; deduplicates queued jobs per (member, provider) |
Testing
Use Allegro\Testing\FakeSyncProvider in feature tests:
use Allegro\Services\AudienceSync\SyncProviderRegistry;
use Allegro\Testing\FakeSyncProvider;
use Carbon\CarbonInterval;
beforeEach(function () {
    // Reset the singleton so registrations from real boot don't bleed in.
    app()->forgetInstance(SyncProviderRegistry::class);
});

it('dispatches a sync job for active members', function () {
    $provider = new FakeSyncProvider(
        providerName: 'fake',
        refresh: CarbonInterval::hour(),
        loginAge: CarbonInterval::minutes(30),
    );

    app()->instance(FakeSyncProvider::class, $provider);
    app(SyncProviderRegistry::class)->register(FakeSyncProvider::class);

    // ...assert jobs dispatched, sync call count, etc.
});
SyncProviderRegistry::register() takes a class string and resolves the
provider from the container. Bind your configured instance with
app()->instance(...) first so the registry resolves your instance instead of
constructing a fresh one with defaults.
FakeSyncProvider constructor parameters (in order):
| Parameter | Default | Description |
|---|---|---|
| $providerName | 'fake' | Return value of name() |
| $refresh | null | refreshInterval(); when null, falls through to CarbonInterval::hour() via refreshInterval() |
| $loginAge | null | maxAgeOnLogin() |
| $availableForTenant | true | isAvailableForTenant() |
| $membersQuery | null | Closure returning a Builder; defaults to empty |
$onSync is a public property (not a constructor argument) — assign directly:
$fake->onSync = fn ($member) => .... FakeSyncProvider::$syncCallCount
tracks how many times sync() was called.
Simulating staleness
Use Carbon::setTestNow() to move time forward past an interval:
use Allegro\Models\AudienceMember;
use Carbon\Carbon;

it('re-syncs stale data on login', function () {
    $member = AudienceMember::factory()->create();

    $member->externalProfiles()->create([
        'provider' => 'fake',
        'data' => [],
        'updated_at' => now()->subHour(),
    ]);

    // Move the clock forward so the row is older than the login threshold.
    Carbon::setTestNow(now()->addHours(2));

    // trigger login path...
});
Asserting dispatched jobs
use Allegro\Jobs\SyncAudienceMemberJob;
use Illuminate\Support\Facades\Queue;

it('dispatches jobs on scheduled run', function () {
    Queue::fake();

    $this->artisan('allegro-audience:sync', ['provider' => 'fake']);

    Queue::assertPushed(SyncAudienceMemberJob::class);
});
End-to-End Flow
Login path
----------
JsonWebToken::generate($member)
  └─ SyncProfilesOrchestrator::ensureFreshForLogin($member)
       └─ foreach provider where maxAgeOnLogin() !== null
            └─ EnsureFreshProfile::for($member, $providerName)
                 ├─ row fresh? → return true (no sync)
                 └─ stale/missing → Cache::lock → provider->sync($member)
  └─ event(JsonWebTokenGenerating) ← listeners see fresh data
Scheduled path
--------------
scheduler fires allegro-audience:sync:tick (every 5 minutes)
  └─ foreach tenant
       └─ EvaluateTenantSyncsJob (dispatched via $tenant->execute(...))
            └─ foreach provider in registry->availableForTenant()
                 ├─ load audience_sync_runs row
                 ├─ not due yet (last_dispatched_at + refreshInterval() > now)? skip
                 └─ SyncAudienceMemberJob::dispatch per member
                      ├─ update last_dispatched_at = now()
                      └─ worker picks up each job
                           └─ Cache::lock → freshness re-check → provider->sync($member)
The Cache::lock is shared between the login path and the queued job path. If a
login sync is in progress when a worker picks up a job (or vice versa), one path
blocks for up to 5 seconds then yields.
Operations
Force-resyncing a member
From the audience member detail page in the admin UI, the Sync Status panel
lists every registered provider with its last-synced timestamp and a
Re-sync now button. Clicking it dispatches SyncAudienceMemberJob with the
$force = true flag, bypassing the freshness check.
Verifying freshness
Query audience_member_external_profiles for the member and provider:
SELECT provider, updated_at
FROM audience_member_external_profiles
WHERE audience_member_id = ?
ORDER BY provider;
The updated_at column is the staleness anchor. Compare it against the
provider's refreshInterval() to determine whether a job will re-sync on the
next scheduled run.
Rolling out a new provider
- Implement SyncProvider (or extend AbstractSyncProvider) in your package.
- Register in your ServiceProvider::boot().
- Gate the provider with isAvailableForTenant() if rollout should be gradual.
- Deploy. The next tick (within 5 minutes) will pick up the provider and evaluate it per tenant.
- To backfill immediately: php artisan allegro-audience:sync your_provider_name --tenant={id}.
Recovering from failed jobs
Failed jobs land in failed_jobs after 3 attempts. Once the upstream service
recovers, push them back onto the queue:
php artisan queue:retry all
Or let the next scheduled run naturally re-queue jobs for members still in
scope; in that case the failed_jobs entries can safely be flushed after
upstream recovery.