CCFish simplifies cross-network ad campaign management with a single CLI command that ingests data from Meta Ads, Google Ads, TikTok Ads, and Unity Ads simultaneously — transforming a 4-hour daily manual workflow into a 5-minute automated pipeline.
## The Problem
Running user acquisition across multiple ad networks means your performance data lives in silos. Every morning, the marketing team at CCFish faced the same ritual:
1. Log into the Meta Ads dashboard — export yesterday's CSV.
2. Log into Google Ads — export CSV.
3. Log into TikTok Ads — export CSV.
4. Log into Unity Ads — export CSV.
5. Manually merge all four CSVs into a spreadsheet.
6. Pivot, deduplicate, cross-reference, and calculate blended ROAS.
This took a dedicated person roughly four hours per day. By the time the consolidated data was ready, it was already stale: optimization decisions were made on day-old (sometimes two-day-old) data. Campaigns burned budget on underperforming creatives while winning combinations went underfunded, because the signal lagged.
Worse, each network exposes its API differently — different auth schemes, different rate limits, different schemas for the same metric. A unified view felt like a pipe dream.
## The Solution
Enter the CCFish CLI — a Node.js/TypeScript command-line tool that abstracts each ad network behind a consistent adapter interface and uses asynchronous queue-based ingestion to process campaigns in parallel.
### Single Command
```bash
ccfish ads ingest --from 2026-01-01 --to 2026-05-10
```
That's it. One command replaces the entire multi-dashboard, multi-CSV workflow. The tool:
- Authenticates against each ad network's API using stored credentials
- Pulls campaign data for the specified date range
- Normalizes everything to a unified schema
- Ships the data to Cloudflare Queues for async processing
- Hands off to Cloudflare Workers, which consume the queue, validate each record, and write to D1
The result appears in the CCFish dashboard within minutes — not hours.
## Architecture
The pipeline has four layers:
| Layer | Technology | Responsibility |
|-------|-----------|----------------|
| CLI | Commander.js + Inquirer | User interaction, argument parsing, credential prompts |
| Adapters | TypeScript classes per network | API communication, auth, rate-limit handling |
| Queue | Cloudflare Queues | Async message transport, retry, backpressure |
| Workers | Cloudflare Workers | Normalize, validate, persist to D1 |
### Why Async Queues?
Ad network APIs are slow and unreliable when called synchronously. A single Meta Ads API call can take 3–8 seconds. Calling four networks in sequence would mean 30+ seconds of wall-clock time per ingestion, with any single failure collapsing the entire operation.
By sending ingestion payloads to Cloudflare Queues, the CLI finishes in under a second — it just publishes messages — while workers process each network's data independently. Failed deliveries are retried automatically, and queue backpressure throttles consumption so each network's rate limits are respected.
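The timing argument can be sketched with a toy latency model (the 8-second figure is the worst case quoted above; one illustrative call per network — real ingestion pages through many calls):

```typescript
// Illustrative latency model: sequential calls sum, queued workers run
// concurrently, so the slowest network bounds end-to-end latency.
type Network = 'meta' | 'google' | 'tiktok' | 'unity';

const worstCaseLatencyMs: Record<Network, number> = {
  meta: 8000,
  google: 8000,
  tiktok: 8000,
  unity: 8000,
};

// Sequential ingestion: wall-clock time is the sum of every call.
const sequentialMs = Object.values(worstCaseLatencyMs).reduce((a, b) => a + b, 0);

// Queue-based ingestion: the CLI only publishes messages, and workers
// consume each network's queue in parallel.
const concurrentMs = Math.max(...Object.values(worstCaseLatencyMs));

console.log(`sequential: ${sequentialMs}ms, concurrent: ${concurrentMs}ms`);
```

The sum-versus-max difference is the whole case for fan-out: sequential cost grows with every network added, while queued processing stays bounded by the slowest one.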
## Implementation
### Adapter Pattern Interface
The core abstraction is a clean TypeScript interface that every ad network adapter implements:
```typescript
type AdNetwork = 'meta' | 'google' | 'tiktok' | 'unity';

// Opaque token returned by each network's auth flow.
type AuthToken = string;

// Metric fields a caller may request.
type CampaignField = 'spend' | 'impressions' | 'clicks' | 'installs';

interface AdNetworkAdapter {
  readonly network: AdNetwork;
  readonly rateLimit: RateLimitConfig;
  authenticate(): Promise<AuthToken>;
  // Streams records as pages arrive instead of buffering the full result set.
  fetchCampaigns(options: FetchOptions): AsyncGenerator<CampaignRecord>;
}

interface FetchOptions {
  fromDate: string; // YYYY-MM-DD
  toDate: string;   // YYYY-MM-DD
  fields: CampaignField[];
}

interface RateLimitConfig {
  maxRequestsPerSecond: number;
  backoffStrategy: 'exponential' | 'linear';
}

interface CampaignRecord {
  campaignId: string;
  network: AdNetwork;
  campaignName: string;
  spend: number;      // in the campaign's currency
  impressions: number;
  clicks: number;
  installs: number;
  date: string;       // YYYY-MM-DD
  currency: string;   // ISO 4217 code, e.g. 'USD'
}
```
Each adapter (MetaAdapter, GoogleAdapter, TikTokAdapter, UnityAdapter) implements this interface. The `fetchCampaigns` method uses `AsyncGenerator` so records stream in as they arrive — no need to hold all results in memory.
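To make the streaming contract concrete, here is a hypothetical mock adapter (stubbed pages, invented data — not one of the real network adapters) that satisfies the `fetchCampaigns` shape:

```typescript
// Hypothetical mock adapter showing the AsyncGenerator streaming contract.
interface CampaignRecord {
  campaignId: string;
  network: string;
  spend: number;
  date: string;
}

class MockAdapter {
  readonly network = 'meta';

  async *fetchCampaigns(): AsyncGenerator<CampaignRecord> {
    const pages: CampaignRecord[][] = [
      [{ campaignId: 'c1', network: 'meta', spend: 120.5, date: '2026-01-01' }],
      [{ campaignId: 'c2', network: 'meta', spend: 80.0, date: '2026-01-01' }],
    ];
    for (const page of pages) {
      // A real adapter awaits the next API page here, then yields its rows,
      // so the consumer sees records without buffering everything in memory.
      yield* page;
    }
  }
}

async function collectIds(): Promise<string[]> {
  const ids: string[] = [];
  for await (const record of new MockAdapter().fetchCampaigns()) {
    ids.push(record.campaignId); // handled as each record streams in
  }
  return ids;
}

collectIds().then((ids) => console.log(ids.join(',')));
```

The `for await...of` consumer works identically against any adapter, which is what lets the CLI treat all four networks uniformly.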
### CLI Command Definition
Built with Commander.js and Inquirer for an interactive yet scriptable experience:
```typescript
import { Command } from 'commander';
import ProgressBar from 'progress';
// createAdapter and QueueProducer live in local modules (paths illustrative).
import { createAdapter, type AdNetwork } from './adapters';
import { QueueProducer } from './queue/producer';

const allNetworks: AdNetwork[] = ['meta', 'google', 'tiktok', 'unity'];
const today = (): string => new Date().toISOString().slice(0, 10);

const program = new Command();

program
  .name('ccfish')
  .description('CCFish ad campaign management CLI')
  .version('1.0.0');

// Commander nests subcommands by chaining off the parent command.
const ads = program
  .command('ads')
  .description('Ad campaign operations');

ads
  .command('ingest')
  .description('Ingest campaign data from all ad networks')
  .requiredOption('--from <date>', 'Start date (YYYY-MM-DD)')
  .option('--to <date>', 'End date (YYYY-MM-DD)', today())
  .option('--networks <networks...>', 'Filter to specific networks', allNetworks)
  .option('--dry-run', 'Validate without ingesting', false)
  .action(async (options) => {
    const networks = options.networks as AdNetwork[];
    const adapters = networks.map((network) => createAdapter(network));
    const progress = new ProgressBar('Ingesting [:bar] :current/:total', {
      total: networks.length,
    });

    for (const adapter of adapters) {
      const producer = new QueueProducer(adapter.network);
      for await (const record of adapter.fetchCampaigns({
        fromDate: options.from,
        toDate: options.to,
        fields: ['spend', 'impressions', 'clicks', 'installs'],
      })) {
        if (!options.dryRun) {
          await producer.send(record);
        }
      }
      // Push any records still sitting in a partial batch.
      await producer.flush();
      progress.tick();
    }

    console.log('Ingestion complete. Records enqueued for processing.');
  });

program.parse(process.argv);
```
### Queue Producer
The producer wraps Cloudflare Queues and handles batching for throughput:
```typescript
// Thin wrapper around the queue transport. From the CLI (outside the
// Workers runtime) messages are published through an endpoint that
// forwards to a Queues producer binding.
class QueueProducer {
  private queue: Queue;
  private batch: CampaignRecord[] = [];
  private readonly BATCH_SIZE = 25;

  constructor(private network: AdNetwork) {
    this.queue = new Queue(`campaign-ingestion-${network}`);
  }

  async send(record: CampaignRecord): Promise<void> {
    this.batch.push(record);
    if (this.batch.length >= this.BATCH_SIZE) {
      await this.flush();
    }
  }

  // Callers must flush() after the last send() to push a partial batch.
  async flush(): Promise<void> {
    if (this.batch.length === 0) return;
    await this.queue.sendBatch(
      this.batch.map((r) => ({ body: JSON.stringify(r) }))
    );
    this.batch = [];
  }
}
```
### Worker Consumer
On the Cloudflare Workers side, a queue consumer picks up messages, normalizes the data to the unified schema, and writes to D1:
```typescript
export default {
  async queue(batch: MessageBatch<string>, env: Env): Promise<void> {
    const db = env.CCFISH_DB;

    for (const message of batch.messages) {
      try {
        const record: CampaignRecord = JSON.parse(message.body);

        // Normalize to the unified schema
        const normalized = {
          campaign_id: record.campaignId,
          network: record.network,
          campaign_name: record.campaignName,
          spend: Math.round(record.spend * 100), // store as cents
          impressions: record.impressions,
          clicks: record.clicks,
          installs: record.installs,
          date: record.date,
          currency: record.currency,
          ingested_at: new Date().toISOString(),
        };

        // Upsert into D1
        await db
          .prepare(`
            INSERT INTO campaign_daily (
              campaign_id, network, campaign_name,
              spend, impressions, clicks, installs,
              date, currency, ingested_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (campaign_id, date) DO UPDATE SET
              spend = excluded.spend,
              impressions = excluded.impressions,
              clicks = excluded.clicks,
              installs = excluded.installs,
              ingested_at = excluded.ingested_at
          `)
          .bind(
            normalized.campaign_id,
            normalized.network,
            normalized.campaign_name,
            normalized.spend,
            normalized.impressions,
            normalized.clicks,
            normalized.installs,
            normalized.date,
            normalized.currency,
            normalized.ingested_at
          )
          .run();

        message.ack();
      } catch (err) {
        // Leave the message on the queue; Queues redelivers with backoff
        // and routes repeat failures to the dead-letter queue.
        message.retry();
      }
    }
  },
};
```
The `ON CONFLICT ... DO UPDATE` pattern means re-running ingestion for the same date range is safe — existing records are updated, not duplicated.
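The idempotency guarantee can be illustrated with a toy in-memory stand-in for D1, where a Map key plays the role of the `UNIQUE(campaign_id, date)` constraint:

```typescript
// Toy stand-in for the D1 upsert: the Map key mirrors UNIQUE(campaign_id, date).
interface Row {
  campaignId: string;
  date: string;
  spend: number;
}

const table = new Map<string, Row>();

// Plays the role of INSERT ... ON CONFLICT (campaign_id, date) DO UPDATE.
function upsert(row: Row): void {
  table.set(`${row.campaignId}:${row.date}`, row);
}

upsert({ campaignId: 'c1', date: '2026-01-01', spend: 100 });
// Re-running ingestion for the same campaign and date updates, not duplicates.
upsert({ campaignId: 'c1', date: '2026-01-01', spend: 120 });

console.log(table.size, table.get('c1:2026-01-01')!.spend);
```

This is why backfills are safe to repeat: the second write lands on the same key and simply overwrites the stale values.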
### D1 Schema
```sql
CREATE TABLE campaign_daily (
  id            INTEGER PRIMARY KEY AUTOINCREMENT,
  campaign_id   TEXT NOT NULL,
  network       TEXT NOT NULL CHECK (network IN ('meta', 'google', 'tiktok', 'unity')),
  campaign_name TEXT NOT NULL,
  spend         INTEGER NOT NULL,            -- stored in cents
  impressions   INTEGER NOT NULL,
  clicks        INTEGER NOT NULL,
  installs      INTEGER NOT NULL,
  date          TEXT NOT NULL,               -- YYYY-MM-DD
  currency      TEXT NOT NULL DEFAULT 'USD',
  ingested_at   TEXT NOT NULL,
  -- Campaign IDs are assumed unique across networks; the upsert keys on this.
  UNIQUE (campaign_id, date)
);

CREATE INDEX idx_campaign_daily_network ON campaign_daily(network);
CREATE INDEX idx_campaign_daily_date ON campaign_daily(date);
```
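Once every network lands in one schema, blended metrics reduce to a single aggregation. A sketch of blended CPI (cost per install) over hypothetical one-day totals, mirroring the cents convention above:

```typescript
// Blended CPI across all four networks (hypothetical one-day totals;
// spend stored in cents, matching the campaign_daily convention).
interface DailyTotal {
  network: 'meta' | 'google' | 'tiktok' | 'unity';
  spend: number; // cents
  installs: number;
}

const totals: DailyTotal[] = [
  { network: 'meta',   spend: 500_00, installs: 250 },
  { network: 'google', spend: 300_00, installs: 100 },
  { network: 'tiktok', spend: 150_00, installs: 150 },
  { network: 'unity',  spend:  50_00, installs:  25 },
];

const totalSpendCents = totals.reduce((sum, t) => sum + t.spend, 0);
const totalInstalls = totals.reduce((sum, t) => sum + t.installs, 0);

// Blended CPI in dollars: every network's spend over every network's installs.
const blendedCpi = totalSpendCents / 100 / totalInstalls;

console.log(`blended CPI: $${blendedCpi.toFixed(2)}`);
```

In production the same aggregation would be a `GROUP BY date` query over `campaign_daily`; the point is that once the data is normalized, cross-network rollups are one expression rather than a morning of spreadsheet work.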
## Results
The impact was immediate and measurable:
| Metric | Before | After |
|--------|--------|-------|
| Time to consolidate | ~4 hours/day | ~5 minutes/run |
| Data freshness | Stale (T+24h) | Near real-time (T+5m) |
| Error rate | ~8% (manual typos, missed exports) | <0.1% (automated validation) |
| Ad spend efficiency | Baseline | +15% ROAS improvement |
| Team satisfaction | Low (rote CSV work) | High (strategic analysis) |
The 15% ROAS improvement came from having real-time cross-network visibility. Previously, if Meta Ads were outperforming Google Ads on installs, the signal took 24+ hours to surface. With the unified dashboard powered by the CLI pipeline, the team could shift budget within the same day — capturing high-performing inventory before auction prices adjusted.
## Key Takeaways
### 1. Adapter Pattern Scales to Any Network
The adapter interface is small — just three methods and a config object. Adding a new network (Snapchat, Pinterest, Apple Search Ads) means writing one new class. The rest of the pipeline stays the same. This took CCFish from 4 networks to 8 in a single sprint.
### 2. Async Queues Decouple Ingestion from Processing
By using Cloudflare Queues as the intermediary, the CLI doesn't wait for API responses or database writes. It publishes messages and exits. Workers handle the heavy lifting independently, with built-in retries and dead-letter queues for failures. This makes the pipeline resilient to transient API errors.
### 3. One Command Replaces a Team-Wide Manual Process
The biggest win is cultural. Before the CLI, the daily ingestion ritual was a known bottleneck that everyone dreaded. After the CLI, onboarding a new team member takes 5 minutes instead of 2 hours. The `--dry-run` flag lets anyone validate their credentials and network config without touching production data. The progress bar gives immediate feedback during long backfills.
## What's Next
The roadmap includes:
- **Real-time webhook ingestion** — subscribe to ad network webhooks and push data through the same pipeline without CLI invocation
- **Budget alerts** — Workers that check spend thresholds and send Slack notifications
- **Cross-network attribution** — Merge install events across networks to detect overlapping user acquisition
The CLI pipeline is open source and available on the CCFish GitHub. Contributions welcome — new network adapters are the easiest place to start.