
Audience Member Sync

The audience member sync system gives packages a standard way to pull external data (CRM membership status, donor records, email lists, etc.) into Allegro and keep it fresh. Providers register once; the platform handles scheduling, queueing, login-path freshness, and failure isolation.

How It Works

Each external integration is a sync provider — a class implementing Allegro\Contracts\SyncProvider. Providers declare:

  • What to sync — a membersToSync() query scoping which audience members are in scope for scheduled runs.
  • How fresh — a refreshInterval() for scheduled cadence, and an optional maxAgeOnLogin() for how stale data can be before a login triggers a synchronous sync.
  • When to run — the platform runs a single global 5-minute heartbeat (allegro-audience:sync:tick) that fans out a per-tenant evaluation job. Per provider, the evaluator checks the audience_sync_runs row for the last dispatch time against refreshInterval() before dispatching a new set of sync jobs.

At JWT generation time, SyncProfilesOrchestrator::ensureFreshForLogin() runs synchronously before the JsonWebTokenGenerating event fires, so listeners that add membership claims to the JWT see the freshest available data (a failed sync is logged and does not block the token). Scheduled runs dispatch one SyncAudienceMemberJob per member onto the audience-sync queue.


Quick Start

Extend Allegro\Services\AudienceSync\AbstractSyncProvider — it provides sensible defaults so you only override what you need.

namespace Acme\Sync;

use Allegro\Models\AudienceMember;
use Allegro\Services\AudienceSync\AbstractSyncProvider;
use Carbon\CarbonInterval;
use Illuminate\Database\Eloquent\Builder;

class AcmeMembershipSync extends AbstractSyncProvider
{
    public function name(): string
    {
        return 'acme_membership'; // matches the audience_member_external_profiles.provider key
    }

    public function refreshInterval(): CarbonInterval
    {
        return CarbonInterval::hours(6);
    }

    public function maxAgeOnLogin(): ?CarbonInterval
    {
        // Sync synchronously on login when the row is older than 30 minutes.
        // Return null to make this a background-only provider.
        return CarbonInterval::minutes(30);
    }

    public function membersToSync(): Builder
    {
        // Only queue jobs for members active in the last 30 days.
        return $this->membersLoggedInWithin(30);
    }

    public function sync(AudienceMember $member): void
    {
        $data = app(AcmeApiClient::class)->fetchMember($member->email);

        $member->externalProfiles()->updateOrCreate(
            ['provider' => $this->name()],
            ['data' => $data],
        );

        // Grant or revoke entitlements based on $data...
    }
}

AbstractSyncProvider defaults:

| Method | Default |
| --- | --- |
| maxAgeOnLogin() | null (background-only) |
| isAvailableForTenant() | true |

Registering a Provider

Call SyncProviderRegistry::register() once in your package's ServiceProvider::boot():

use Allegro\Services\AudienceSync\SyncProviderRegistry;

public function boot(SyncProviderRegistry $registry): void
{
    $registry->register(AcmeMembershipSync::class);
}

register() is idempotent — calling it twice with the same class is safe (e.g., under Octane). Registering two different classes with the same name() throws DuplicateSyncProviderException at boot time.
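The two registration rules above can be illustrated with a small framework-free sketch. It is not Allegro's implementation: ToyRegistry, ToyProvider, DuplicateNameException, and both provider classes are hypothetical stand-ins, and the toy instantiates providers directly where the real registry resolves them from the container.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins for illustration only; none of these are Allegro classes.
interface ToyProvider
{
    public function name(): string;
}

final class DuplicateNameException extends RuntimeException {}

final class ToyRegistry
{
    /** @var array<string, class-string<ToyProvider>> provider name => class */
    private array $classesByName = [];

    /** @param class-string<ToyProvider> $class */
    public function register(string $class): void
    {
        $name = (new $class())->name();
        $existing = $this->classesByName[$name] ?? null;

        if ($existing === $class) {
            return; // same class again: no-op, so repeated boots are safe
        }
        if ($existing !== null) {
            throw new DuplicateNameException(
                "Provider name '{$name}' is already registered by {$existing}"
            );
        }
        $this->classesByName[$name] = $class;
    }

    /** @return list<string> */
    public function names(): array
    {
        return array_keys($this->classesByName);
    }
}

final class MembershipSync implements ToyProvider
{
    public function name(): string { return 'acme_membership'; }
}

final class LegacyMembershipSync implements ToyProvider
{
    public function name(): string { return 'acme_membership'; }
}

$registry = new ToyRegistry();
$registry->register(MembershipSync::class);
$registry->register(MembershipSync::class); // idempotent: still one entry

$conflict = false;
try {
    // A different class claiming the same name is rejected.
    $registry->register(LegacyMembershipSync::class);
} catch (DuplicateNameException $e) {
    $conflict = true;
}
```

Keying the map on name() rather than on the class is what lets the same class register repeatedly while a second class with a colliding name fails fast.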


Scoping Members

membersLoggedInWithin(int $days)

AbstractSyncProvider provides this helper for the common case:

public function membersToSync(): Builder
{
    return $this->membersLoggedInWithin(30); // members who logged in within 30 days
}

It queries audience_members.last_active_at, a denormalized column updated whenever a member authenticates or refreshes their JWT. Use this to avoid dispatching jobs for inactive members.

Custom queries

Return any Builder<AudienceMember> from membersToSync(). Return a query that matches zero rows (e.g., AudienceMember::query()->whereRaw('0 = 1')) to disable scheduled sync entirely while still allowing on-login sync.


Staleness Configuration

Two knobs control freshness:

| Method | Type | Purpose |
| --- | --- | --- |
| refreshInterval() | CarbonInterval | How old a row can be before the scheduled job re-syncs it |
| maxAgeOnLogin() | ?CarbonInterval | How old a row can be before a login triggers a synchronous sync; null = never sync on login |

Background-only provider

Return null from maxAgeOnLogin() (the default). The provider runs on the scheduler cadence only. Use when upstream latency is too high for the login path, or when data does not affect the JWT.

public function maxAgeOnLogin(): ?CarbonInterval
{
    return null; // background only; login never blocks on this provider
}

Combined (most common)

Use both: schedule a background refresh and also sync on login when data is stale relative to maxAgeOnLogin(). The login threshold is typically tighter than refreshInterval() so a member always gets fresh data even if the last scheduled run was hours ago.
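To make the interaction of the two thresholds concrete, here is a minimal sketch using PHP's built-in date types (CarbonInterval extends DateInterval, so the comparison has the same shape). The isStale() helper is hypothetical and only illustrates the comparison; it is not part of the Allegro API.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: a row is stale once its age exceeds the allowed maximum.
function isStale(DateTimeImmutable $updatedAt, DateInterval $maxAge, DateTimeImmutable $now): bool
{
    return $updatedAt->add($maxAge) < $now;
}

$now = new DateTimeImmutable('2024-01-01 12:00:00', new DateTimeZone('UTC'));
$updatedAt = $now->sub(new DateInterval('PT2H')); // last synced two hours ago

$refreshInterval = new DateInterval('PT6H');  // scheduled cadence
$maxAgeOnLogin   = new DateInterval('PT30M'); // tighter login threshold

// Two hours old: not yet due for a scheduled re-sync...
$staleForSchedule = isStale($updatedAt, $refreshInterval, $now);
// ...but already too old for the login path, so a login would sync it.
$staleForLogin = isStale($updatedAt, $maxAgeOnLogin, $now);
```

With a 6-hour refresh and a 30-minute login threshold, the same two-hour-old row is left alone by the scheduler but refreshed synchronously when the member logs in.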


Tenant Availability

Override isAvailableForTenant() to gate a provider on tenant-level configuration:

public function isAvailableForTenant(): bool
{
    // Cast because config() returns mixed and this method declares a bool return type.
    return (bool) config('acme.enabled', false);
}

The platform evaluates this predicate at every entry point: the login orchestrator, the Artisan command, and inside the queued job. A provider that returns false here is silently skipped — no error, no job dispatched.


Side Effects in sync()

sync() may do more than write the primary audience_member_external_profiles row. Common patterns:

Write member meta:

$member->meta()->updateOrCreate(
    ['key' => 'total_donations'],
    ['value' => $data['total_donations']],
);

Grant or revoke entitlements:

if ($data['is_active_member']) {
    $member->entitlements()->updateOrCreate(
        ['source_provider' => $this->name(), 'resource_id' => $resourceId],
        ['expires_at' => null],
    );
} else {
    $member->entitlements()
        ->where('source_provider', $this->name())
        ->delete();
}

Idempotency is required

sync() may be called multiple times for the same member. Use upserts keyed on source_provider for entitlement writes and updateOrCreate for profile rows. Wrap writes in a DB::transaction() so a throw mid-sync rolls back partial state.
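The sketch below illustrates both requirements without a database. ToyStore is a hypothetical in-memory stand-in: its transaction() method mimics rollback by restoring a snapshot, and the resource names are invented for the example. In a real provider the same shape would use updateOrCreate inside DB::transaction().

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory store standing in for the database.
final class ToyStore
{
    /** @var array<string, array<string, mixed>> rows keyed by "provider|resource" */
    public array $entitlements = [];

    /** Run $work atomically: restore the previous rows if it throws. */
    public function transaction(callable $work): void
    {
        $snapshot = $this->entitlements;
        try {
            $work($this);
        } catch (Throwable $e) {
            $this->entitlements = $snapshot; // roll back partial writes
            throw $e;
        }
    }

    /** Upsert keyed on (provider, resource): repeating the call changes nothing. */
    public function upsertEntitlement(string $provider, string $resourceId, ?string $expiresAt): void
    {
        $this->entitlements["{$provider}|{$resourceId}"] = [
            'source_provider' => $provider,
            'resource_id' => $resourceId,
            'expires_at' => $expiresAt,
        ];
    }
}

function syncMember(ToyStore $store, bool $failMidway = false): void
{
    $store->transaction(function (ToyStore $tx) use ($failMidway): void {
        $tx->upsertEntitlement('acme_membership', 'premium-articles', null);
        if ($failMidway) {
            throw new RuntimeException('upstream error after first write');
        }
        $tx->upsertEntitlement('acme_membership', 'events', null);
    });
}

$store = new ToyStore();
syncMember($store);
syncMember($store); // second call rewrites the same keys, so no duplicates
$rowsAfterTwoSyncs = count($store->entitlements);

$empty = new ToyStore();
try {
    syncMember($empty, failMidway: true);
} catch (RuntimeException $e) {
    // Swallowed for the demo; a queued job would be retried instead.
}
$rowsAfterFailure = count($empty->entitlements);
```

Running the sync twice leaves two rows, and the failed run leaves none, which is the behaviour a retried or repeated sync() needs.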


Login Path

JsonWebToken::generate() calls SyncProfilesOrchestrator::ensureFreshForLogin($member) before dispatching JsonWebTokenGenerating. The orchestrator:

  1. Iterates providers registered for the current tenant where maxAgeOnLogin() !== null.
  2. For each, checks whether the primary audience_member_external_profiles row is younger than maxAgeOnLogin().
  3. If stale or missing, calls sync() synchronously — taking the same Cache::lock used by SyncAudienceMemberJob to prevent concurrent writes.
  4. Catches and logs any exceptions (provider failures as warning, unexpected infrastructure failures as error). The JWT is issued regardless of sync failures.

The JsonWebTokenGenerating event fires only after all login-path providers have run, so JWT claim listeners always see the freshest available data.
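The four orchestrator steps can be sketched in plain PHP as follows. This is an illustration under stated assumptions, not the orchestrator's source: ToyLoginProvider, the free function ensureFreshForLogin(), and the provider names are hypothetical, the lock from step 3 is omitted, and warnings are collected in an array where a real system would log them.

```php
<?php
declare(strict_types=1);

// Hypothetical, framework-free provider used only for this sketch.
final class ToyLoginProvider
{
    public int $syncCalls = 0;

    public function __construct(
        public readonly string $name,
        public readonly ?DateInterval $maxAgeOnLogin,
        private readonly bool $failsOnSync = false,
    ) {}

    public function sync(): void
    {
        $this->syncCalls++;
        if ($this->failsOnSync) {
            throw new RuntimeException("{$this->name}: upstream unavailable");
        }
    }
}

/**
 * @param list<ToyLoginProvider> $providers
 * @param array<string, DateTimeImmutable> $lastSyncedAt provider name => profile updated_at
 * @return list<string> messages a real system would send to its logger
 */
function ensureFreshForLogin(array $providers, array $lastSyncedAt, DateTimeImmutable $now): array
{
    $warnings = [];
    foreach ($providers as $provider) {
        if ($provider->maxAgeOnLogin === null) {
            continue; // step 1: background-only providers never run on login
        }
        $syncedAt = $lastSyncedAt[$provider->name] ?? null;
        if ($syncedAt !== null && $syncedAt->add($provider->maxAgeOnLogin) >= $now) {
            continue; // step 2: row is still fresh
        }
        try {
            $provider->sync(); // step 3: stale or missing (locking omitted here)
        } catch (Throwable $e) {
            $warnings[] = $e->getMessage(); // step 4: record it, never block the login
        }
    }
    return $warnings;
}

$now = new DateTimeImmutable('2024-01-01 12:00:00', new DateTimeZone('UTC'));
$background = new ToyLoginProvider('newsletter', null);
$fresh      = new ToyLoginProvider('crm', new DateInterval('PT30M'));
$stale      = new ToyLoginProvider('donors', new DateInterval('PT30M'));
$failing    = new ToyLoginProvider('billing', new DateInterval('PT30M'), failsOnSync: true);

$warnings = ensureFreshForLogin(
    [$background, $fresh, $stale, $failing],
    [
        'crm'    => $now->sub(new DateInterval('PT10M')), // within 30 minutes
        'donors' => $now->sub(new DateInterval('PT2H')),  // older than 30 minutes
        // 'billing' has no row yet, so it counts as missing
    ],
    $now,
);
// The token would be issued here regardless of $warnings.
```

Only the stale and missing rows trigger a sync, and the failing provider produces a warning without preventing the remaining providers or the token from proceeding.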


Scheduled Runs

Scheduling is driven by a single global entry — there is no per-provider cron registration. The platform registers one scheduler entry in routes/console.php:

allegro-audience:sync:tick   →   every 5 minutes

Provider authors implement refreshInterval() as the single source of truth for cadence. Do not implement a scheduleCron() method — it was removed from the contract. The scheduler does not look at your provider individually.

How the tick works

  1. allegro-audience:sync:tick iterates every registered tenant and dispatches an EvaluateTenantSyncsJob for each via $tenant->execute(...), so the evaluation runs bound to that tenant's context.
  2. Inside the job, for each provider returned by SyncProviderRegistry::availableForTenant():
    • The job loads (or creates) the audience_sync_runs row keyed on provider. The table is tenant-scoped via UsesTenantConnection, so each tenant has its own row per provider.
    • Due-check guard — if last_dispatched_at + refreshInterval() is still in the future, the provider isn't due yet; skip.
    • Otherwise the job dispatches a SyncAudienceMemberJob per member from membersToSync() and updates last_dispatched_at to now().
  3. Workers on the audience-sync queue then process each SyncAudienceMemberJob independently. Each job re-checks freshness before calling sync(), so a synchronous login sync between dispatch and execution is respected — no double-sync.

The evaluator does not track batch completion. Gating on last_dispatched_at + refreshInterval() alone means the worst case is a re-dispatch before the prior run has fully drained — harmless because SyncAudienceMemberJob is idempotent per (member, provider) and deduplicated via ShouldBeUnique. The trade-off avoids the stuck-forever failure mode where a missing "finished" signal would permanently block re-evaluation.

Because cadence is just refreshInterval() + the row-based guards, a provider that wants "daily" simply returns CarbonInterval::day(); one that wants "every 15 minutes" returns CarbonInterval::minutes(15). The next tick (within 5 minutes) will pick it up.
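As a rough illustration of how a 5-minute tick and refreshInterval() combine, the following sketch simulates one hour of ticks for a provider with a 15-minute interval. The isDue() function is a hypothetical rendering of the due-check guard described above, and the simulation assumes ticks land exactly on 5-minute marks.

```php
<?php
declare(strict_types=1);

// Hypothetical due-check: a provider is due when it has never been dispatched,
// or when last dispatch + refresh interval is no longer in the future.
function isDue(?DateTimeImmutable $lastDispatchedAt, DateInterval $refreshInterval, DateTimeImmutable $now): bool
{
    return $lastDispatchedAt === null
        || $lastDispatchedAt->add($refreshInterval) <= $now;
}

$start = new DateTimeImmutable('2024-01-01 00:00:00', new DateTimeZone('UTC'));
$refreshInterval = new DateInterval('PT15M');
$lastDispatchedAt = null;
$dispatchedAtMinutes = [];

// Simulate one hour of 5-minute ticks (minute 0 through minute 60).
for ($minute = 0; $minute <= 60; $minute += 5) {
    $now = $start->add(new DateInterval("PT{$minute}M"));
    if (isDue($lastDispatchedAt, $refreshInterval, $now)) {
        $dispatchedAtMinutes[] = $minute;
        $lastDispatchedAt = $now; // the evaluator records the dispatch time
    }
}
// $dispatchedAtMinutes is [0, 15, 30, 45, 60]
```

Under these assumptions an interval that is not a multiple of five minutes is effectively rounded up to the next tick. In a real deployment the recorded dispatch time is likely to be slightly later than the tick itself, so a dispatch may slip to the following tick.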

Running manually

The manual CLI still works for backfills and debugging. It writes the same audience_sync_runs row and dispatches the same SyncAudienceMemberJob per member, so a manual run immediately suppresses the scheduler from double-dispatching the same provider for that tenant:

# Sync all available providers for one tenant
php artisan allegro-audience:sync --tenant=42

# Sync one provider only for one tenant
php artisan allegro-audience:sync acme_membership --tenant=42

Queue Configuration

All SyncAudienceMemberJob instances are dispatched to the audience-sync queue. Add this queue to your Horizon or supervisor configuration so workers process it:

# supervisord — example worker group
[program:allegro-sync-worker]
command=php artisan queue:work --queue=audience-sync

The dedicated queue isolates sync work from auth-critical queues. A CRM outage that backs up retries does not starve the default queue.

SyncAudienceMemberJob properties:

| Property | Value | Description |
| --- | --- | --- |
| $tries | 3 | Attempted up to 3 times in total before landing in failed_jobs |
| $backoff | 60s | Delay between attempts |
| $uniqueFor | 600s | ShouldBeUnique window; deduplicates queued jobs per (member, provider) |

Testing

Use Allegro\Testing\FakeSyncProvider in feature tests:

use Allegro\Services\AudienceSync\SyncProviderRegistry;
use Allegro\Testing\FakeSyncProvider;
use Carbon\CarbonInterval;

beforeEach(function () {
    // Reset the singleton so registrations from real boot don't bleed in.
    app()->forgetInstance(SyncProviderRegistry::class);
});

it('dispatches a sync job for active members', function () {
    $provider = new FakeSyncProvider(
        providerName: 'fake',
        refresh: CarbonInterval::hour(),
        loginAge: CarbonInterval::minutes(30),
    );

    app()->instance(FakeSyncProvider::class, $provider);
    app(SyncProviderRegistry::class)->register(FakeSyncProvider::class);

    // ...assert jobs dispatched, sync call count, etc.
});

SyncProviderRegistry::register() takes a class string and resolves the provider from the container. Bind your configured instance with app()->instance(...) first so the registry resolves your instance instead of constructing a fresh one with defaults.

FakeSyncProvider constructor parameters (in order):

| Parameter | Default | Description |
| --- | --- | --- |
| $providerName | 'fake' | Return value of name() |
| $refresh | null | Return value of refreshInterval(); when null, refreshInterval() falls back to CarbonInterval::hour() |
| $loginAge | null | Return value of maxAgeOnLogin() |
| $availableForTenant | true | Return value of isAvailableForTenant() |
| $membersQuery | null | Closure returning a Builder; defaults to empty |

$onSync is a public property (not a constructor argument) — assign directly: $fake->onSync = fn ($member) => .... FakeSyncProvider::$syncCallCount tracks how many times sync() was called.

Simulating staleness

Use Carbon::setTestNow() to move time forward past an interval:

use Allegro\Models\AudienceMember;
use Carbon\Carbon;

it('re-syncs stale data on login', function () {
    $member = AudienceMember::factory()->create();
    $member->externalProfiles()->create([
        'provider' => 'fake',
        'data' => [],
        'updated_at' => now()->subHour(),
    ]);

    // Jump two hours ahead so the row is now three hours old.
    Carbon::setTestNow(now()->addHours(2));

    // trigger login path...
});

Asserting dispatched jobs

use Allegro\Jobs\SyncAudienceMemberJob;
use Illuminate\Support\Facades\Queue;

it('dispatches jobs on scheduled run', function () {
    Queue::fake();

    $this->artisan('allegro-audience:sync', ['provider' => 'fake']);

    Queue::assertPushed(SyncAudienceMemberJob::class);
});

End-to-End Flow

Login path
----------
JsonWebToken::generate($member)
├─ SyncProfilesOrchestrator::ensureFreshForLogin($member)
│   └─ foreach provider where maxAgeOnLogin() !== null
│       └─ EnsureFreshProfile::for($member, $providerName)
│           ├─ row fresh? → return true (no sync)
│           └─ stale/missing → Cache::lock → provider->sync($member)
└─ event(JsonWebTokenGenerating) ← listeners see fresh data

Scheduled path
--------------
scheduler fires allegro-audience:sync:tick (every 5 minutes)
└─ foreach tenant
    └─ EvaluateTenantSyncsJob (dispatched via $tenant->execute(...))
        └─ foreach provider in registry->availableForTenant()
            ├─ load audience_sync_runs row
            ├─ not due yet (last_dispatched_at + refreshInterval() > now)? skip
            └─ SyncAudienceMemberJob::dispatch per member
                ├─ update last_dispatched_at = now()
                └─ worker picks up each job
                    └─ Cache::lock → freshness re-check → provider->sync($member)

The Cache::lock is shared between the login path and the queued job path. If a login sync is in progress when a worker picks up a job (or vice versa), one path blocks for up to 5 seconds then yields.


Operations

Force-resyncing a member

From the audience member detail page in the admin UI, the Sync Status panel lists every registered provider with its last-synced timestamp and a Re-sync now button. Clicking it dispatches SyncAudienceMemberJob with the $force = true flag, bypassing the freshness check.

Verifying freshness

Query audience_member_external_profiles for the member and provider:

SELECT provider, updated_at
FROM audience_member_external_profiles
WHERE audience_member_id = ?
ORDER BY provider;

The updated_at column is the staleness anchor. Compare it against the provider's refreshInterval() to determine whether a job will re-sync on the next scheduled run.

Rolling out a new provider

  1. Implement SyncProvider (or extend AbstractSyncProvider) in your package.
  2. Register in your ServiceProvider::boot().
  3. Gate the provider with isAvailableForTenant() if rollout should be gradual.
  4. Deploy. The next tick (within 5 minutes) will pick up the provider and evaluate it per tenant.
  5. To backfill immediately: php artisan allegro-audience:sync your_provider_name --tenant={id}.

Recovering from failed jobs

Jobs land in failed_jobs once all 3 attempts are exhausted. After the upstream service recovers, re-queue them:

php artisan queue:retry all

Alternatively, let the next scheduled run re-queue jobs for members still in scope. In that case the failed_jobs entries can safely be flushed after upstream recovery.