Custom Queries
Escape hatches for custom server logic — pq/q/m builders (Convex), procedures, SQL API, and scheduled reducers (SpacetimeDB).
When generated CRUD isn't enough, drop down to the lower-level builders for custom server logic.
pq, q, m — escape hatches
setup() returns pq, q, and m — thin wrappers around Convex's query/mutation builders that inject auth context and helpers.
| Builder | Auth | Context provides |
|---|---|---|
pq | Optional | viewerId (null if anon), withAuthor |
q | Required | user, viewerId, withAuthor, get (ownership-checked) |
m | Required | user, get, create, patch (with conflict detection), delete |
pq — Public Query (No Auth Required)
const bySlug = pq({
args: { slug: z.string() },
handler: async (c, { slug }) => {
const doc = await c.db
.query('blog')
.withIndex('by_slug', q => q.eq('slug', slug))
.unique()
return doc ? (await c.withAuthor([doc]))[0] : null
}
})q — Authenticated Query
const listDeleted = q({
args: { orgId: zid('org') },
handler: async (c, { orgId }) => {
await requireOrgMember({ db: c.db, orgId, userId: c.user._id })
const docs = await c.db
.query('wiki')
.filter(f => f.eq(f.field('orgId'), orgId))
.order('desc')
.collect()
const deleted: typeof docs = []
for (const d of docs) if (d.deletedAt !== undefined) deleted.push(d)
return deleted
}
})m — Authenticated Mutation
const archive = m({
args: { id: z.string() },
handler: async (c, { id }) => c.patch(id, { archived: true })
})c.patch includes conflict detection — pass expectedUpdatedAt as the third argument.
Mixing custom and generated endpoints
Custom endpoints live in the same file as generated CRUD:
import { orgCrud, q, uniqueCheck } from '../lazy'
import { orgScoped } from '../s'
export const { addEditor, create, list, read, rm, update } = orgCrud(
'wiki',
orgScoped.wiki,
{ acl: true, softDelete: true }
),
listDeleted = q({
args: { orgId: zid('org') },
handler: async (c, { orgId }) => {
/* ... */
}
}),
isSlugAvailable = uniqueCheck(orgScoped.wiki, 'wiki', 'slug')Drop to raw Convex action/mutation/query when you don't need auth context:
import { action } from './_generated/server'
export const search = action({
args: { query: v.string() },
handler: async (_, { query }) => {
/* call external API */
}
}),
{ all, get, load, refresh } = cacheCrud({
/* ... */
})Outgrowing crud() — migration to custom queries
The generated where clauses use runtime .filter() after fetching documents. This works well for tables under ~1,000 documents. When a table grows past that, you'll see the RUNTIME_FILTER_WARN_THRESHOLD warning in logs.
Strict filter mode
Pass strictFilter: true to setup() to throw instead of warn:
const { crud } = setup({
query, mutation, action, internalQuery, internalMutation,
getAuthUserId,
strictFilter: true,
})Step 1: Add a Convex index
blog: ownedTable(owned.blog)
.index('by_category', ['category'])
.index('by_published_date', ['published', '_creationTime'])Step 2: Write a custom query
export const listByCategory = pq({
args: {
category: z.string(),
paginationOpts: z.object({
cursor: z.string().nullable(),
numItems: z.number()
})
},
handler: async (c, { category, paginationOpts }) => {
const results = await c.db
.query('blog')
.withIndex('by_category', q => q.eq('category', category))
.order('desc')
.paginate(paginationOpts)
return { ...results, page: await c.withAuthor(results.page) }
}
})Step 3: Replace the frontend call
const results = usePaginatedQuery(
api.blog.listByCategory,
{ category: 'tech' },
{ initialNumItems: 20 }
)What stays, what changes
| Concern | Generated crud() | Custom pq/q/m |
|---|---|---|
| Auth + ownership | Automatic | c.user, c.get(id) |
| File cleanup | Automatic | Manual (call storage.delete) |
| Where clauses | Runtime .filter() | Convex .withIndex() |
| Conflict detection | expectedUpdatedAt | c.patch(id, data, expectedUpdatedAt) |
| Author enrichment | Automatic | c.withAuthor(docs) |
| Rate limiting | rateLimit option | Manual (checkRateLimit from noboil/convex/server) |
Decision tree
Need to read data?
├─ No auth required → pq
└─ Auth required → q
Need to write data?
└─ Always → m (gives conflict detection via c.patch)
Need to call an external API?
└─ Use raw Convex action (not an noboil builder)
Can generated crud() handle it?
├─ Yes → Keep crud(). Don't write custom code.
└─ No → Add custom alongside crud() in the same fileType safety in custom handlers
pq context
handler: async (c, args) => {
c.db // full Convex DatabaseReader
c.viewerId // string | null (authenticated user ID, null for anonymous)
c.withAuthor // (docs: Doc[]) => Promise<EnrichedDoc[]>
}q context
handler: async (c, args) => {
c.db // full Convex DatabaseReader
c.user // Doc<'users'> — guaranteed non-null (throws if not authenticated)
c.viewerId // string — always present
c.get // (id: Id<T>) => Doc<T> — ownership-checked, throws if not owner
c.withAuthor // same as pq
}m context
handler: async (c, args) => {
c.db // full Convex DatabaseWriter
c.user // Doc<'users'>
c.get // ownership-checked get
c.create // (table, data) => Id — sets userId + updatedAt automatically
c.patch // (id, data, expectedUpdatedAt?) => void — conflict detection built-in
c.delete // (id) => void — ownership-checked, cleans up files
}Coexistence patterns
Pattern 1: Custom query alongside generated CRUD
export const { create, list, read, rm, update } = crud('blog', owned.blog),
bySlug = pq({
args: { slug: z.string() },
handler: async (c, { slug }) => {
return c.db
.query('blog')
.withIndex('by_slug', q => q.eq('slug', slug))
.unique()
}
}),
trending = pq({
args: {},
handler: async c => {
return c.db.query('blog').withIndex('by_views').order('desc').take(10)
}
})All endpoints — generated and custom — export from the same file:
api.blog.list // generated
api.blog.bySlug // custom
api.blog.trending // custom
api.blog.create // generatedPattern 2: Custom mutation extending generated CRUD
export const { create, list, read, rm, update } = crud('blog', owned.blog),
publish = m({
args: { id: z.string() },
handler: async (c, { id }) => {
const doc = await c.get(id)
await c.patch(id, { published: true })
}
})Pattern 3: Gradual replacement
Replace a single generated endpoint while keeping the rest:
export const {
create,
read,
rm,
update
} = crud('blog', owned.blog),
list = pq({
args: { category: z.string().optional() },
handler: async (c, { category }) => {
if (category)
return c.db
.query('blog')
.withIndex('by_category', q => q.eq('category', category))
.collect()
return c.db.query('blog').order('desc').collect()
}
})The frontend code doesn't change — it still imports api.blog.list.
Procedures
Procedures are server-side functions that can return values, run transactions, and call external services. Unlike reducers (which are fire-and-forget), procedures return data to the caller.
export const createPostAndReturn = spacetimedb.procedure(
{ name: 'create_post_and_return' },
{
title: t.string(),
content: t.string()
},
async (ctx, { title, content }) => {
const row = ctx.db.post.insert({
id: 0,
title,
content,
published: false,
updatedAt: ctx.timestamp,
userId: ctx.sender
})
return row.id
}
)Call it from the client:
import { useSpacetimeDB } from 'spacetimedb/react'
const { getConnection } = useSpacetimeDB()
const conn = getConnection()
const id = await conn.reducers.create_post_and_return({
title: 'Hello',
content: 'World'
})Transactions with ctx.withTx
ctx.withTx wraps multiple operations in a transaction. If any operation throws, all changes are rolled back:
export const transferPost = spacetimedb.procedure(
{ name: 'transfer_post' },
{ postId: t.u32(), newOwnerId: t.identity() },
async (ctx, { postId, newOwnerId }) => {
return ctx.withTx(tx => {
const post = tx.db.post.id.find(postId)
if (!post) throw new SenderError('NOT_FOUND: post:transfer')
tx.db.post.id.update({
...post,
userId: newOwnerId,
updatedAt: tx.timestamp
})
tx.db.audit_log.insert({
action: 'transfer',
postId,
fromUser: ctx.sender,
toUser: newOwnerId,
at: tx.timestamp
})
})
}
)If tx.db.audit_log.insert throws, the post.id.update is also rolled back. No partial writes.
Subscription patterns beyond basic CRUD
Subscribing to multiple tables
'use client'
import { useTable } from 'spacetimedb/react'
import { tables } from '@/generated/module_bindings'
const Dashboard = () => {
const [posts, postsReady] = useTable(tables.post)
const [comments, commentsReady] = useTable(tables.comment)
const [presence, presenceReady] = useTable(tables.presence)
const isReady = postsReady && commentsReady && presenceReady
}Conditional subscriptions
SpacetimeDB doesn't have a built-in skip option. Conditionally render the subscribing component instead:
const ConditionalData = ({ enabled }: { enabled: boolean }) => {
if (!enabled) return null
return <DataConsumer />
}
const DataConsumer = () => {
const [data] = useTable(tables.post)
return null
}Subscribing to views
Views are subscribable like tables. Define a view in your module SQL and subscribe to it:
const [postsWithAuthors, isReady] = useTable(tables.post_with_author)Views update automatically when the underlying tables change. Delta updates are pushed to subscribers.
SQL API for complex queries
The HTTP SQL API supports arbitrary SQL queries. Use it for complex joins, aggregations, SSR data fetching in Next.js Server Components, or one-off data exploration.
const STDB_URL = 'http://localhost:4200'
const MODULE = 'my-app'
const query = async (sql: string) => {
const res = await fetch(`${STDB_URI}/v1/database/${MODULE}/sql`, {
method: 'POST',
headers: { 'Content-Type': 'text/plain' },
body: sql
})
if (!res.ok) throw new Error(`SQL error: ${res.status}`)
const [result] = (await res.json()) as [{ rows: unknown[][] }]
return result.rows
}
const counts = await query(`
SELECT category, COUNT(*) as count
FROM post
WHERE published = true
GROUP BY category
ORDER BY count DESC
`)
const postsWithAuthors = await query(`
SELECT p.id, p.title, pr.display_name as author
FROM post p
LEFT JOIN profile pr ON pr.user_id = p.user_id
WHERE p.published = true
LIMIT 20
`)Typed SQL helper
const STDB_URI = process.env.SPACETIMEDB_URI ?? 'http://localhost:4200'
const MODULE = process.env.MODULE_NAME ?? 'my-app'
interface SqlResult<T> {
rows: T[]
durationMicros: number
}
const sql = async <T>(query: string): Promise<SqlResult<T>> => {
const res = await fetch(`${STDB_URI}/v1/database/${MODULE}/sql`, {
method: 'POST',
headers: { 'Content-Type': 'text/plain' },
body: query,
cache: 'no-store'
})
if (!res.ok) throw new Error(`SQL query failed: ${res.status}`)
const [result] = (await res.json()) as [
{
rows: unknown[][]
schema: { elements: { name: string }[] }
total_duration_micros: number
}
]
const keys = result.schema.elements.map(e => e.name)
const rows = result.rows.map(row => {
const obj: Record<string, unknown> = {}
for (let i = 0; i < keys.length; i++) obj[keys[i]!] = row[i]
return obj as T
})
return { rows, durationMicros: result.total_duration_micros }
}
export { sql }Usage in a Next.js Server Component:
import { sql } from '@/lib/sql'
const PostsPage = async () => {
const { rows } = await sql<{ id: number; title: string }>(
'SELECT id, title FROM post WHERE published = true ORDER BY id DESC LIMIT 20'
)
return <ul>{rows.map(p => <li key={p.id}>{p.title}</li>)}</ul>
}Latency is ~0.27ms for simple queries on local Docker.
Scheduled reducers
Use scheduled reducers for deferred or recurring work:
import { ScheduleAt } from 'spacetimedb/server'
const jobSchedule = table(
{ public: false },
{
id: t.u32().autoInc().primaryKey(),
scheduledAt: t.scheduleAt(),
payload: t.string().optional()
}
)
export const processJob = spacetimedb.reducer(
{ name: 'process_job', scheduledReducer: true },
{},
ctx => {
console.log('Job fired at', ctx.timestamp)
}
)
export const scheduleJob = spacetimedb.reducer(
{ name: 'schedule_job' },
{ delayMs: t.u64() },
(ctx, { delayMs }) => {
const fireAt = ctx.timestamp.addMilliseconds(delayMs)
ctx.db.jobSchedule.insert({
id: 0,
scheduledAt: ScheduleAt.time(fireAt),
payload: null
})
}
)Cancel a scheduled job by deleting its row:
ctx.db.jobSchedule.id.delete(jobId)Scheduled reducers are useful for cache TTL expiration, delayed notifications, and retry logic with backoff.
Rate limiting
SpacetimeDB doesn't have built-in rate limiting. Implement it with a tracking table:
const rateLimit = table(
{ public: false },
{
id: t.u32().autoInc().primaryKey(),
userId: t.identity().index(),
action: t.string().index(),
windowStart: t.timestamp(),
count: t.u32()
}
)
const checkRateLimit = (
ctx: ReducerContext,
action: string,
maxPerWindow: number,
windowMs: number
) => {
const now = ctx.timestamp
const existing = ctx.db.rateLimit.userId
.filter(ctx.sender)
.find(r => r.action === action)
if (!existing) {
ctx.db.rateLimit.insert({
id: 0,
userId: ctx.sender,
action,
windowStart: now,
count: 1
})
return
}
const windowExpired =
now.toMillis() - existing.windowStart.toMillis() >= windowMs
if (windowExpired) {
ctx.db.rateLimit.id.update({ ...existing, count: 1, windowStart: now })
return
}
if (existing.count >= maxPerWindow) {
throw new SenderError('RATE_LIMITED: too many requests')
}
ctx.db.rateLimit.id.update({ ...existing, count: existing.count + 1 })
}
export const createPost = spacetimedb.reducer(
{ name: 'create_post' },
{ title: t.string(), content: t.string() },
(ctx, args) => {
checkRateLimit(ctx, 'create_post', 10, 60_000)
}
)