Syncing orders from Shopify to an ERP (like NetSuite) is critical for fulfillment.
A high-throughput, idempotent workflow that uses a database as a buffer and handles ERP rate limits gracefully.
import { defineWorkflow } from '@dataflows/core'
import { shopify } from '@dataflows/shopify'
import { postgres } from '@dataflows/postgres'
import { netsuite } from '@dataflows/netsuite'
export const orderSync = defineWorkflow({
id: 'order-sync',
trigger: shopify.onOrderCreated(),
async run({ event, step }) {
const order = event.payload
// 1. Idempotency Check & Persistence
const isNew = await step.run('persist-order', async () => {
const exists = await postgres.query(
'SELECT id FROM orders WHERE shopify_id = $1',
[order.id]
)
if (exists.length > 0) return false
await postgres.query(
'INSERT INTO orders (shopify_id, total, status) VALUES ($1, $2, $3)',
[order.id, order.total_price, 'pending']
)
return true
})
if (!isNew) return { status: 'skipped', reason: 'duplicate' }
// 2. Sync to NetSuite (with retries)
await step.run('sync-netsuite', async () => {
try {
await netsuite.createSalesOrder({
externalId: order.id,
items: order.line_items.map(item => ({
sku: item.sku,
quantity: item.quantity
}))
})
} catch (error) {
if (error.code === 'RATE_LIMIT') {
throw new Error('Rate limit hit, retrying...') // Triggers auto-retry
}
throw error
}
})
// 3. Update Status
await step.run('update-status', async () => {
await postgres.query(
'UPDATE orders SET status = $1 WHERE shopify_id = $2',
['synced', order.id]
)
})
}
})
Ensures orders are processed exactly once, even with retries.
Sophisticated retry logic with exponential backoff for ERP downtime.