Building a Scalable RAG System with Inngest, Drizzle ORM, and PostgreSQL

How we implemented a robust document processing and retrieval system that handles file uploads, chunking, embedding generation, and semantic search at scale.

The Challenge
Building a Retrieval-Augmented Generation (RAG) system that can handle diverse document types, process them efficiently, and provide fast semantic search capabilities. Our requirements included:
Multi-format support: PDFs, images, text files, Excel spreadsheets
Scalable processing: Handle large documents without blocking the API
Efficient storage: Store embeddings in a vector database for fast similarity search
Reliability: Robust error handling and retry mechanisms
Our Solution Architecture
We built our RAG system using:
Inngest for background job processing and workflow orchestration
Drizzle ORM for type-safe database operations
PostgreSQL with pgvector for vector storage and similarity search
Google Cloud Storage (GCS) for file storage (configurable to S3 or local)
The Implementation
1. Database Schema with Drizzle ORM
First, let’s look at our database schema using Drizzle ORM. We have two main tables: datastores and chunks.
// Using Drizzle ORM with PostgreSQL
import { pgTable, uuid, varchar, text, integer, vector, jsonb } from 'drizzle-orm/pg-core';
import { index } from 'drizzle-orm/pg-core';
export const datastores = pgTable('datastores', {
id: uuid('id').primaryKey().defaultRandom(),
name: varchar('name', { length: 255 }).notNull(),
description: text('description'),
status: varchar('status', { length: 50 }).notNull(),
// ... other fields
});
export const chunks = pgTable('datastore_chunks', {
id: uuid('id').primaryKey().defaultRandom(),
datastoreId: uuid('datastore_id').notNull(),
fileId: uuid('file_id'),
fileName: varchar('file_name', { length: 255 }),
content: text('content').notNull(),
tokenCount: integer('token_count').notNull(),
embedding: vector('embedding', { dimensions: 768 }), // pgvector column
metadata: jsonb('chunk_metadata').default({}),
order: integer('order').notNull()
}, (table) => ({
// HNSW index for fast vector similarity search
vectorIdx: index('chunk_vector_idx').using(
'hnsw',
table.embedding.op('vector_cosine_ops')
),
datastoreIdIdx: index('chunk_datastore_id_idx').on(table.datastoreId)
}));
Explanation: We use Drizzle ORM for type-safe database operations. The chunks table stores document chunks with their vector embeddings (768 dimensions). The HNSW index with cosine similarity operations enables fast vector similarity search.
2. Document Processing with Inngest
When a user uploads a document, we trigger an Inngest function that processes the file asynchronously:
// Inngest function for document processing
import { Inngest } from 'inngest';
export class InngestDatastoreEngine {
public datastoreFunctions(): unknown[] {
return [
this.inngest.createFunction(
{ id: 'process-uploaded-document', retries: 3 },
{ event: 'datastore/file.uploaded' },
async ({ event, step }) => {
const { tenantId, filePath, datastoreId, fileName, fileId } = event.data;
// Step 1: Load and process file content
const storageService = await createStorageService();
const buffer = await storageService.download(filePath);
const fileContent = buffer.toString('base64');
const mimeType = getMimeType(path.extname(fileName));
// Step 2: Convert to text based on file type
let textContent: string | null = null;
if (mimeType === 'application/pdf') {
// Process PDF pages in parallel
const pageCount = await step.run('get-pdf-page-count', () => {
return getPdfPageCount(Buffer.from(fileContent, 'base64'));
});
const model = await ModelFactoryV2.createModel({
tenantId,
provider: ModelProvider.GEMINI,
functionId: 'datastore-processing'
});
const pagePromises = Array.from({ length: pageCount }).map((_, i) =>
step.run(`process-pdf-page-${i}`, async () => {
const { pdfBytes } = await splitPdf(pdfBuffer, [i + 1]);
return await convertDocumentToMarkdown(
Buffer.from(pdfBytes).toString('base64'),
'application/pdf',
model
);
})
);
const pageContents = await Promise.allSettled(pagePromises);
textContent = pageContents
.filter(result => result.status === 'fulfilled')
.map(result => result.value)
.join('\n\n');
}
// ... handle other file types (images, Excel, text files)
return { status: 'success', datastoreId, chunksCreated: totalChunks };
}
)
];
}
}
Explanation: Inngest provides step-by-step execution with automatic retries and observability. We process different file types (PDFs, images, Excel) by converting them to text using AI models. PDFs are split into pages and processed in parallel for efficiency.
3. Intelligent Chunking Strategy
We use different chunking strategies based on file type:
// Chunking factory with file-type specific splitters
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter';
export class ChunkingFactory {
static getSplitter(fileType: string, params: SplitterParams): Splitter {
if (this.isCSV(fileType)) {
return new CSVLineTextSplitter(params);
}
return new RecursiveCharacterTextSplitter(params);
}
private static isCSV(fileType: string): boolean {
return fileType === 'text/csv' ||
fileType === 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet';
}
}
// Usage in the processing pipeline
const splitter = ChunkingFactory.getSplitter(mimeType, {
chunkSize: 1000, // 1000 characters per chunk
chunkOverlap: 200 // 200 character overlap between chunks
});
const chunks = await splitter.splitText(textContent);
Explanation: We use different text splitters based on file type. CSV/Excel files use line-based splitting, while other documents use recursive character splitting. Our default chunk size is 1000 characters with 200-character overlap to maintain context.
4. Batch Embedding Generation
To handle large documents efficiently, we process embeddings in batches:
// Batch processing for embeddings
const DEFAULT_CHUNK_BATCH_SIZE = 16;
let totalProcessedChunks = 0;
let hasMoreChunks = true;
while (hasMoreChunks) {
const { hasMore } = await step.run(`process-chunk-batch-${totalProcessedChunks}`, async () => {
// Get pending chunks for this file
const pendingChunks = await this.datastoreRepository.getPendingChunksByFileId(
fileId,
DEFAULT_CHUNK_BATCH_SIZE
);
if (pendingChunks.length === 0) {
return { hasMore: false };
}
// Create embedding model
const embeddingModel = await ModelFactoryV2.createEmbeddingModel({
tenantId,
provider: ModelProvider.GEMINI,
functionId: 'datastore-embedding'
});
// Generate embeddings for batch
const contents = pendingChunks.map(chunk => chunk.content);
const vectorEmbeddings = await embeddingModel.embedManyDocuments({
values: contents
});
// Update chunks with embeddings
await this.datastoreRepository.updateChunkEmbeddings(
pendingChunks.map((chunk, i) => ({
id: chunk.id,
embedding: vectorEmbeddings[i]
}))
);
return { hasMore: true };
});
hasMoreChunks = hasMore;
}
Explanation: We process embeddings in batches of 16 chunks to balance API rate limits and processing efficiency. Each batch is processed as a separate Inngest step, allowing for granular retry and monitoring.
5. Efficient Database Operations with Drizzle
Our repository layer uses Drizzle ORM for type-safe database operations:
// Repository methods using Drizzle ORM
export class DatastoreRepository {
async createChunks(chunkObjects: DatastoreChunk[], batchSize: number = 1000) {
const { db } = await createDatabaseClient();
const results = [];
for (let i = 0; i < chunkObjects.length; i += batchSize) {
const batch = chunkObjects.slice(i, i + batchSize);
const result = await db.insert(chunks).values(batch).returning();
results.push(...result);
}
return results;
}
async updateChunkEmbeddings(updates: { id: string; embedding: number[] }[]) {
const { db } = await createDatabaseClient();
return db.transaction(async (tx) => {
const results = [];
for (const update of updates) {
const result = await tx
.update(chunks)
.set({ embedding: update.embedding })
.where(eq(chunks.id, update.id))
.returning();
results.push(...result);
}
return results;
});
}
}
Explanation: We use Drizzle ORM’s type-safe query builder for database operations. Batch inserts and updates are wrapped in transactions for consistency. The createChunks method handles large datasets by processing them in configurable batch sizes.
6. Semantic Search Implementation
For querying, we directly call the repository method (no separate Inngest function needed):
// Semantic search using vector similarity
async searchQuery({
datastoreId,
query,
limit = 5,
tenantId
}: {
datastoreId: string;
query: string;
limit: number;
tenantId: string;
}) {
// Generate embedding for the query
const embeddingModel = await ModelFactoryV2.createEmbeddingModel({
tenantId,
provider: ModelProvider.GEMINI,
functionId: 'datastore-search'
});
const embedding = await embeddingModel.embedDocument({ value: query });
// Search for similar chunks using pgvector
const results = await this.searchSimilarChunks(datastoreId, embedding, limit);
return results;
}
async searchSimilarChunks(
datastoreId: string,
queryEmbedding: number[],
limit: number = 5
) {
const { db } = await createDatabaseClient();
return db
.select({
id: chunks.id,
content: chunks.content,
fileName: chunks.fileName,
fileId: chunks.fileId
})
.from(chunks)
.where(eq(chunks.datastoreId, datastoreId))
.orderBy(sql`${chunks.embedding} <=> ${queryEmbedding}`) // Cosine distance
.limit(limit);
}
Explanation: For search, we generate an embedding for the user query using the same model, then use PostgreSQL’s pgvector extension to find the most similar chunks using cosine distance. The <=> operator performs efficient vector similarity search using our HNSW index.
Key Benefits
Scalability: Inngest handles background processing without blocking the API
Reliability: Built-in retry mechanisms and step-by-step execution
Type Safety: Drizzle ORM provides compile-time type checking
Performance: HNSW vector index enables fast similarity search
Flexibility: Configurable storage backends and chunking strategies
Performance Optimizations
Parallel PDF processing: Each page is processed concurrently
Batch embedding generation: Process 16 chunks at a time
Configurable batch sizes: Adjust database batch sizes based on data size
Vector indexing: HNSW index for sub-linear search complexity
Storage optimization: Large chunk arrays stored in GCS when > 500KB
Conclusion
This RAG implementation demonstrates how modern tools like Inngest, Drizzle ORM, and PostgreSQL with pgvector can work together to create a robust, scalable document processing and retrieval system. The architecture handles diverse file types, processes them efficiently in the background, and provides fast semantic search capabilities.
The key to success is leveraging each tool’s strengths: Inngest for workflow orchestration, Drizzle for type-safe database operations, and pgvector for efficient vector similarity search. This combination provides both developer experience and production reliability.




