
Building a Scalable RAG System with Inngest, Drizzle ORM, and PostgreSQL


How we implemented a robust document processing and retrieval system that handles file uploads, chunking, embedding generation, and semantic search at scale.



The Challenge

We needed a Retrieval-Augmented Generation (RAG) system that could handle diverse document types, process them efficiently, and provide fast semantic search. Our requirements included:

  • Multi-format support: PDFs, images, text files, Excel spreadsheets

  • Scalable processing: Handle large documents without blocking the API

  • Efficient storage: Store embeddings in a vector database for fast similarity search

  • Reliability: Robust error handling and retry mechanisms


Our Solution Architecture

We built our RAG system using:

  • Inngest for background job processing and workflow orchestration

  • Drizzle ORM for type-safe database operations

  • PostgreSQL with pgvector for vector storage and similarity search

  • Google Cloud Storage (GCS) for file storage (configurable to S3 or local)
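An event connects the API to the background worker. The payload shape below is inferred from the fields that the processing function in section 2 reads from event.data (tenantId, datastoreId, fileId, fileName, filePath). The type names and the guard function are hypothetical. This is a minimal sketch of validating the payload before the event is sent.

```typescript
// Hypothetical shape of the 'datastore/file.uploaded' payload, inferred from the
// fields the processing function destructures from event.data.
interface FileUploadedData {
  tenantId: string;
  datastoreId: string;
  fileId: string;
  fileName: string;
  filePath: string; // storage key of the uploaded file
}

interface FileUploadedEvent {
  name: 'datastore/file.uploaded';
  data: FileUploadedData;
}

const REQUIRED_FIELDS = ['tenantId', 'datastoreId', 'fileId', 'fileName', 'filePath'] as const;

// Runtime guard: every required field must be a non-empty string.
function isFileUploadedData(value: unknown): value is FileUploadedData {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as Record<string, unknown>;
  return REQUIRED_FIELDS.every(
    (key) => typeof record[key] === 'string' && (record[key] as string).length > 0
  );
}

// Builds the event an upload handler would send once the file is in storage.
function buildFileUploadedEvent(data: unknown): FileUploadedEvent {
  if (!isFileUploadedData(data)) {
    throw new Error('Invalid datastore/file.uploaded payload');
  }
  return { name: 'datastore/file.uploaded', data };
}
```

In an upload handler, the returned object could be passed to the Inngest client's send method (inngest.send(event)). A malformed payload then fails in the API request instead of inside the background function.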


The Implementation

1. Database Schema with Drizzle ORM

First, let’s look at our database schema using Drizzle ORM. We have two main tables: datastores and chunks.

// Using Drizzle ORM with PostgreSQL
import { pgTable, uuid, varchar, text, integer, vector, jsonb, index } from 'drizzle-orm/pg-core';

export const datastores = pgTable('datastores', {
  id: uuid('id').primaryKey().defaultRandom(),
  name: varchar('name', { length: 255 }).notNull(),
  description: text('description'),
  status: varchar('status', { length: 50 }).notNull(),
  // ... other fields
});

export const chunks = pgTable('datastore_chunks', {
  id: uuid('id').primaryKey().defaultRandom(),
  datastoreId: uuid('datastore_id').notNull(),
  fileId: uuid('file_id'),
  fileName: varchar('file_name', { length: 255 }),
  content: text('content').notNull(),
  tokenCount: integer('token_count').notNull(),
  embedding: vector('embedding', { dimensions: 768 }), // pgvector column
  metadata: jsonb('chunk_metadata').default({}),
  order: integer('order').notNull()
}, (table) => ({
  // HNSW index for fast vector similarity search
  vectorIdx: index('chunk_vector_idx').using(
    'hnsw',
    table.embedding.op('vector_cosine_ops')
  ),
  datastoreIdIdx: index('chunk_datastore_id_idx').on(table.datastoreId)
}));

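Explanation: Drizzle gives us typed table definitions. The chunks table stores each document chunk together with its 768-dimensional embedding. The HNSW index is built with vector_cosine_ops, so pgvector can use it for approximate nearest-neighbor queries ordered by cosine distance (the <=> operator). The datastore_id index speeds up filtering chunks by datastore. The pgvector extension must be enabled in the database (CREATE EXTENSION vector) before this schema can be migrated.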
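The column is declared with 768 dimensions, so pgvector rejects vectors of any other length, as well as vectors containing NaN or infinite values. A small guard can catch a misconfigured embedding model before the write. The sketch below uses hypothetical helper names and assumes embeddings arrive as number[]. The literal helper shows pgvector's text format, which matters only when writing raw SQL by hand.

```typescript
// Must match vector('embedding', { dimensions: 768 }) in the schema.
const EMBEDDING_DIMENSIONS = 768;

// Fails fast on a wrong length or non-finite values instead of letting the
// INSERT/UPDATE fail inside PostgreSQL.
function assertValidEmbedding(embedding: number[], dims: number = EMBEDDING_DIMENSIONS): void {
  if (embedding.length !== dims) {
    throw new Error(`expected ${dims} dimensions, got ${embedding.length}`);
  }
  if (!embedding.every(Number.isFinite)) {
    throw new Error('embedding contains NaN or Infinity');
  }
}

// pgvector's text input format: square brackets around comma-separated values.
function toVectorLiteral(embedding: number[]): string {
  return `[${embedding.join(',')}]`;
}
```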

2. Document Processing with Inngest

When a user uploads a document, we send a datastore/file.uploaded event. An Inngest function subscribed to that event processes the file asynchronously:

// Inngest function for document processing
import path from 'node:path';
import { Inngest } from 'inngest';
// createStorageService, getMimeType, getPdfPageCount, splitPdf, convertDocumentToMarkdown,
// ModelFactoryV2 and ModelProvider are our own helpers (imports omitted)

export class InngestDatastoreEngine {
  constructor(
    private readonly inngest: Inngest,
    private readonly datastoreRepository: DatastoreRepository
  ) {}

  public datastoreFunctions(): unknown[] {
    return [
      this.inngest.createFunction(
        { id: 'process-uploaded-document', retries: 3 },
        { event: 'datastore/file.uploaded' },
        async ({ event, step }) => {
          const { tenantId, filePath, datastoreId, fileName, fileId } = event.data;
          let totalChunks = 0;

          // 1. Load the file. This code is outside step.run, so Inngest re-executes it
          //    each time it re-invokes the handler to run the next step.
          const storageService = await createStorageService();
          const buffer = await storageService.download(filePath);
          const fileContent = buffer.toString('base64');
          const mimeType = getMimeType(path.extname(fileName));

          // 2. Convert to text based on file type
          let textContent: string | null = null;

          if (mimeType === 'application/pdf') {
            const pageCount = await step.run('get-pdf-page-count', () => {
              return getPdfPageCount(buffer);
            });

            const model = await ModelFactoryV2.createModel({
              tenantId,
              provider: ModelProvider.GEMINI,
              functionId: 'datastore-processing'
            });

            // One step per page; awaiting the step promises together lets Inngest run them in parallel
            const pagePromises = Array.from({ length: pageCount }).map((_, i) =>
              step.run(`process-pdf-page-${i}`, async () => {
                const { pdfBytes } = await splitPdf(buffer, [i + 1]); // page numbers are 1-based
                return await convertDocumentToMarkdown(
                  Buffer.from(pdfBytes).toString('base64'),
                  'application/pdf',
                  model
                );
              })
            );

            // Keep the pages that converted; a page that still fails after its retries is dropped
            const pageContents = await Promise.allSettled(pagePromises);
            textContent = pageContents
              .filter((result): result is PromiseFulfilledResult<string> => result.status === 'fulfilled')
              .map(result => result.value)
              .join('\n\n');
          }
          // ... handle other file types (images, Excel, text files)

          // ... chunk textContent and generate embeddings (sections 3 and 4), updating totalChunks

          return { status: 'success', datastoreId, chunksCreated: totalChunks };
        }
      )
    ];
  }
}

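Explanation: Inngest runs the function as a series of steps. Each step.run call is retried independently under the function's retries setting, and its result is memoized. A failure on one PDF page therefore does not reprocess pages that already succeeded. Code outside step.run, such as the file download, re-executes whenever the handler is re-invoked for the next step.

We convert each file type (PDFs, images, Excel) to text using AI models. PDFs are split into single pages that run as parallel steps. The results are collected with Promise.allSettled, so a page that exhausts its retries is skipped rather than failing the whole document.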
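Promise.allSettled drops failed pages silently, so the merged text can be shorter than the PDF with no signal that anything went wrong. The sketch below keeps page order and also reports which pages failed. It uses a hypothetical mergePageResults helper, and a stand-in converter takes the place of convertDocumentToMarkdown.

```typescript
interface MergedPages {
  text: string;
  failedPages: number[]; // 1-based page numbers whose conversion rejected
}

// Keeps fulfilled pages in their original order and records which pages failed.
function mergePageResults(results: PromiseSettledResult<string>[]): MergedPages {
  const pages: string[] = [];
  const failedPages: number[] = [];
  results.forEach((result, index) => {
    if (result.status === 'fulfilled') {
      pages.push(result.value);
    } else {
      failedPages.push(index + 1);
    }
  });
  return { text: pages.join('\n\n'), failedPages };
}

// Demo with a stand-in converter that fails on page 2.
async function demo(): Promise<MergedPages> {
  const convert = async (page: number): Promise<string> => {
    if (page === 2) throw new Error(`conversion failed for page ${page}`);
    return `# Page ${page}`;
  };
  const settled = await Promise.allSettled([1, 2, 3].map(convert));
  return mergePageResults(settled);
}
```

The failedPages list could be logged or stored with the file, so that a partially converted document can be flagged instead of being silently truncated.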

3. Intelligent Chunking Strategy

We use different chunking strategies based on file type:

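// Chunking factory with file-type specific splitters
// (newer LangChain releases export this class from '@langchain/textsplitters')
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter';
// CSVLineTextSplitter, Splitter and SplitterParams are our own line-based splitter and types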

export class ChunkingFactory {
  static getSplitter(fileType: string, params: SplitterParams): Splitter {
    if (this.isCSV(fileType)) {
      return new CSVLineTextSplitter(params);
    }
    return new RecursiveCharacterTextSplitter(params);
  }

  private static isCSV(fileType: string): boolean {
    return fileType === 'text/csv' || 
           fileType === 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet';
  }
}

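// Usage in the processing pipeline (textContent is the text produced in step 2)
const splitter = ChunkingFactory.getSplitter(mimeType, {
  chunkSize: 1000,      // 1000 characters per chunk
  chunkOverlap: 200     // 200 character overlap between chunks
});

if (!textContent) {
  throw new Error(`No text could be extracted from ${fileName}`);
}

// Named textChunks so it does not shadow the `chunks` table from the schema
const textChunks = await splitter.splitText(textContent);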

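Explanation: We pick the splitter from the file's MIME type. CSV and Excel content goes through a line-based splitter. Everything else goes through LangChain's RecursiveCharacterTextSplitter, which tries to break on paragraph, line and word boundaries before falling back to individual characters. The default chunk size is 1000 characters with a 200-character overlap, so text near a chunk boundary appears in both neighboring chunks.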
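Simplified stand-ins for the two strategies make the parameters concrete. slidingWindowSplit is a plain fixed-window splitter, not LangChain's recursive algorithm. It ignores separators and only shows that each chunk starts chunkSize - chunkOverlap = 800 characters after the previous one. lineSplit is a hypothetical line-packing splitter in the spirit of CSVLineTextSplitter, whose real implementation is not shown here. It never cuts a row in half.

```typescript
// Fixed-window splitter (illustration only, not RecursiveCharacterTextSplitter).
function slidingWindowSplit(text: string, chunkSize = 1000, chunkOverlap = 200): string[] {
  if (chunkOverlap >= chunkSize) {
    throw new Error('chunkOverlap must be smaller than chunkSize');
  }
  const chunks: string[] = [];
  for (let start = 0; start < text.length; start += chunkSize - chunkOverlap) {
    chunks.push(text.slice(start, start + chunkSize));
    if (start + chunkSize >= text.length) break; // this window already reached the end
  }
  return chunks;
}

// Hypothetical line-based splitter: packs whole rows into a chunk until the next
// row would push it past chunkSize. A single oversized row becomes its own chunk.
function lineSplit(text: string, chunkSize = 1000): string[] {
  const chunks: string[] = [];
  let current = '';
  for (const line of text.split('\n')) {
    const candidate = current === '' ? line : `${current}\n${line}`;
    if (candidate.length > chunkSize && current !== '') {
      chunks.push(current);
      current = line;
    } else {
      current = candidate;
    }
  }
  if (current !== '') chunks.push(current);
  return chunks;
}
```

For a 2000-character input, slidingWindowSplit returns three chunks of 1000, 1000 and 400 characters. The last 200 characters of each chunk are repeated at the start of the next.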

4. Batch Embedding Generation

To handle large documents efficiently, we process embeddings in batches:

// Batch processing for embeddings (runs inside the Inngest handler from section 2)
const DEFAULT_CHUNK_BATCH_SIZE = 16;
let totalProcessedChunks = 0;
let hasMoreChunks = true;

while (hasMoreChunks) {
  // The running total gives each batch a unique step ID that is the same on every replay
  const { processed, hasMore } = await step.run(`process-chunk-batch-${totalProcessedChunks}`, async () => {
    // Get up to one batch of this file's chunks that have no embedding yet
    const pendingChunks = await this.datastoreRepository.getPendingChunksByFileId(
      fileId,
      DEFAULT_CHUNK_BATCH_SIZE
    );

    if (pendingChunks.length === 0) {
      return { processed: 0, hasMore: false };
    }

    // Create embedding model
    const embeddingModel = await ModelFactoryV2.createEmbeddingModel({
      tenantId,
      provider: ModelProvider.GEMINI,
      functionId: 'datastore-embedding'
    });

    // Generate embeddings for the batch (assumes one vector per input, in input order)
    const contents = pendingChunks.map(chunk => chunk.content);
    const vectorEmbeddings = await embeddingModel.embedManyDocuments({
      values: contents
    });

    // Update chunks with embeddings
    await this.datastoreRepository.updateChunkEmbeddings(
      pendingChunks.map((chunk, i) => ({
        id: chunk.id,
        embedding: vectorEmbeddings[i]
      }))
    );

    // A batch smaller than the batch size means nothing is left to embed
    return {
      processed: pendingChunks.length,
      hasMore: pendingChunks.length === DEFAULT_CHUNK_BATCH_SIZE
    };
  });

  totalProcessedChunks += processed;
  hasMoreChunks = hasMore;
}

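Explanation: We embed chunks in batches of 16 to stay within the embedding API's rate limits while keeping the number of calls low. Each batch runs as a separate Inngest step, so a failed embedding call retries only that batch, and batches that already completed are not embedded again. The step ID includes the running count of processed chunks, which makes it unique per batch.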
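The control flow of the batch loop can be checked without Inngest or an embedding API. The sketch below replaces the repository and the model with in-memory stand-ins (getPendingChunks and fakeEmbed are hypothetical) and records the step IDs the loop would use. It assumes the repository returns only chunks whose embedding is still null.

```typescript
interface Chunk {
  id: string;
  content: string;
  embedding: number[] | null;
}

// Hypothetical stand-in for datastoreRepository.getPendingChunksByFileId.
function getPendingChunks(store: Chunk[], limit: number): Chunk[] {
  return store.filter((chunk) => chunk.embedding === null).slice(0, limit);
}

// Hypothetical stand-in for the embedding model: a one-dimensional placeholder vector.
function fakeEmbed(contents: string[]): number[][] {
  return contents.map((text) => [text.length]);
}

// Mirrors the loop above and returns the step IDs it would have used.
function embedAllPending(store: Chunk[], batchSize = 16): string[] {
  const stepIds: string[] = [];
  let totalProcessed = 0;
  let hasMore = true;
  while (hasMore) {
    stepIds.push(`process-chunk-batch-${totalProcessed}`);
    const pending = getPendingChunks(store, batchSize);
    const vectors = fakeEmbed(pending.map((chunk) => chunk.content));
    pending.forEach((chunk, i) => {
      chunk.embedding = vectors[i]; // mutates the shared object in `store`
    });
    totalProcessed += pending.length;
    hasMore = pending.length === batchSize; // a short batch ends the loop
  }
  return stepIds;
}
```

With 40 pending chunks and a batch size of 16, the loop uses process-chunk-batch-0, process-chunk-batch-16 and process-chunk-batch-32, then stops after the short third batch.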

5. Efficient Database Operations with Drizzle

Our repository layer uses Drizzle ORM for type-safe database operations:

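// Repository methods using Drizzle ORM
import { eq } from 'drizzle-orm';
import { chunks } from './schema'; // module defining the table from section 1 (path is illustrative)

// Insert shape inferred from the Drizzle table definition
type DatastoreChunk = typeof chunks.$inferInsert;
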
export class DatastoreRepository {
  async createChunks(chunkObjects: DatastoreChunk[], batchSize: number = 1000) {
    const { db } = await createDatabaseClient();

    const results = [];
    for (let i = 0; i < chunkObjects.length; i += batchSize) {
      const batch = chunkObjects.slice(i, i + batchSize);
      const result = await db.insert(chunks).values(batch).returning();
      results.push(...result);
    }
    return results;
  }

  async updateChunkEmbeddings(updates: { id: string; embedding: number[] }[]) {
    const { db } = await createDatabaseClient();

    return db.transaction(async (tx) => {
      const results = [];
      for (const update of updates) {
        const result = await tx
          .update(chunks)
          .set({ embedding: update.embedding })
          .where(eq(chunks.id, update.id))
          .returning();
        results.push(...result);
      }
      return results;
    });
  }
}

Explanation: Drizzle's query builder gives us typed inserts and updates. createChunks inserts rows in configurable batches (1000 by default), so a large document never becomes one oversized INSERT. Each batch commits on its own. updateChunkEmbeddings writes one embedding batch inside a single transaction, so either all of that batch's chunks are updated or none are.
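The 1000-row default for createChunks is comfortably safe. The upper bound is set by PostgreSQL's bind-parameter limit: the extended query protocol sends the parameter count as a 16-bit integer, so one statement can bind at most 65,535 values. The sketch below derives a safe batch size from the column count. The helper names are illustrative.

```typescript
// The Bind message encodes the parameter count in 16 bits.
const PG_MAX_BIND_PARAMS = 65535;

// Largest multi-row INSERT that stays under the limit when each row binds
// `columnsPerRow` parameters.
function maxRowsPerInsert(columnsPerRow: number): number {
  if (columnsPerRow <= 0) throw new Error('columnsPerRow must be positive');
  return Math.floor(PG_MAX_BIND_PARAMS / columnsPerRow);
}

// Splits rows into consecutive batches of at most `size` items.
function toBatches<T>(rows: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}
```

A datastore_chunks row binds at most nine columns, so maxRowsPerInsert(9) returns 7281. A batch of 1000 rows therefore binds at most 9000 parameters.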

6. Semantic Search Implementation

Search runs synchronously in the request path, so we call the repository method directly instead of going through an Inngest function:

// Semantic search using vector similarity (methods of our datastore service)
import { cosineDistance, eq } from 'drizzle-orm';

async searchQuery({
  datastoreId,
  query,
  limit = 5,
  tenantId
}: {
  datastoreId: string;
  query: string;
  limit?: number; // optional because it has a default
  tenantId: string;
}) {
  // Embed the query with the same model used for the documents
  const embeddingModel = await ModelFactoryV2.createEmbeddingModel({
    tenantId,
    provider: ModelProvider.GEMINI,
    functionId: 'datastore-search'
  });

  const embedding = await embeddingModel.embedDocument({ value: query });

  // Search for similar chunks using pgvector
  const results = await this.searchSimilarChunks(datastoreId, embedding, limit);
  return results;
}

async searchSimilarChunks(
  datastoreId: string,
  queryEmbedding: number[],
  limit: number = 5
) {
  const { db } = await createDatabaseClient();

  return db
    .select({
      id: chunks.id,
      content: chunks.content,
      fileName: chunks.fileName,
      fileId: chunks.fileId
    })
    .from(chunks)
    .where(eq(chunks.datastoreId, datastoreId))
    // cosineDistance compiles to pgvector's <=> and serializes the vector; smallest distance first
    .orderBy(cosineDistance(chunks.embedding, queryEmbedding))
    .limit(limit);
}

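Explanation: For search, we embed the user query with the same model used for the documents. We then ask pgvector for the chunks with the smallest cosine distance. Drizzle's cosineDistance helper compiles to pgvector's <=> operator and serializes the query vector into pgvector's format; a plain number[] interpolated into an sql template is not sent in that format.

Ordering by <=> ascending with a LIMIT lets PostgreSQL use the HNSW index built with vector_cosine_ops. HNSW search is approximate, and when the planner uses that index the datastore filter is applied to the candidates it returns. A very selective filter can therefore yield fewer than limit rows. pgvector's hnsw.ef_search setting (default 40) controls how many candidates are considered.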
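pgvector's <=> returns cosine distance, which is 1 minus cosine similarity: 0 means the vectors point the same way and 2 means they point in opposite directions. HNSW results are approximate, so it can be useful to compare them against an exact brute-force ranking on a sample of queries. The sketch below computes the exact ranking and a recall@k measure. The function names are illustrative.

```typescript
// Cosine distance as pgvector's <=> defines it: 1 - cosine similarity.
function cosineDistance(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error('dimension mismatch');
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return 1 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

interface StoredChunk {
  id: string;
  embedding: number[];
}

// Exact top-k by ascending cosine distance: what an HNSW scan approximates.
function exactTopK(rows: StoredChunk[], query: number[], k: number): string[] {
  return rows
    .map((row) => ({ id: row.id, distance: cosineDistance(row.embedding, query) }))
    .sort((x, y) => x.distance - y.distance)
    .slice(0, k)
    .map((row) => row.id);
}

// Fraction of the exact top-k that the approximate result also returned.
function recallAtK(approximateIds: string[], exactIds: string[]): number {
  if (exactIds.length === 0) return 1;
  const approx = new Set(approximateIds);
  const hits = exactIds.filter((id) => approx.has(id)).length;
  return hits / exactIds.length;
}
```

If recall is too low for your data, raising hnsw.ef_search trades query latency for accuracy.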


Key Benefits

  1. Scalability: Inngest handles background processing without blocking the API

  2. Reliability: Built-in retry mechanisms and step-by-step execution

  3. Type Safety: Drizzle ORM provides compile-time type checking

  4. Performance: HNSW vector index enables fast similarity search

  5. Flexibility: Configurable storage backends and chunking strategies


Performance Optimizations

  • Parallel PDF processing: Each page is processed concurrently

  • Batch embedding generation: Process 16 chunks at a time

  • Configurable batch sizes: Adjust database batch sizes based on data size

  • Vector indexing: HNSW index for approximate nearest-neighbor search with sub-linear complexity

  • Storage optimization: Large chunk arrays stored in GCS when > 500KB
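The article does not say where the 500 KB check happens or how the size is measured. The sketch below is one plausible shape, under three assumptions: the array is measured as UTF-8 JSON, 1 KB means 1024 bytes, and upload stands in for a write through the storage service. The helper returns the chunks inline when they are small and a storage reference otherwise.

```typescript
// Assumed threshold: 500 KB with 1 KB = 1024 bytes.
const INLINE_LIMIT_BYTES = 500 * 1024;

type ChunkPayload =
  | { kind: 'inline'; chunks: string[] }
  | { kind: 'stored'; storageKey: string; byteSize: number };

// Returns the array inline when its UTF-8 JSON form is small enough; otherwise
// calls `upload` (hypothetical storage write) and returns only a reference.
async function preparePayload(
  chunks: string[],
  storageKey: string,
  upload: (key: string, body: string) => Promise<void>
): Promise<ChunkPayload> {
  const body = JSON.stringify(chunks);
  const byteSize = new TextEncoder().encode(body).length;
  if (byteSize <= INLINE_LIMIT_BYTES) {
    return { kind: 'inline', chunks };
  }
  await upload(storageKey, body);
  return { kind: 'stored', storageKey, byteSize };
}
```

Downstream code then branches on kind and downloads the array only when it was offloaded.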


Conclusion

This RAG implementation demonstrates how modern tools like Inngest, Drizzle ORM, and PostgreSQL with pgvector can work together to create a robust, scalable document processing and retrieval system. The architecture handles diverse file types, processes them efficiently in the background, and provides fast semantic search capabilities.

The key to success is leveraging each tool’s strengths: Inngest for workflow orchestration, Drizzle for type-safe database operations, and pgvector for efficient vector similarity search. Together they give us both a good developer experience and production reliability.