Backend Routes - "pull"

Blog 6 of the 8-blog series

Hey everyone! Let's get started on the pull endpoint. Before writing the endpoint itself, though, let's discuss a topic that is specific to the row versioning backend strategy and that we skipped in the last blog.

CVR (Client View Record)

A client view record (CVR) is a minimal representation of a client view snapshot. In other words, it holds the state of a particular client group at a particular instant in time.

We can write it as follows:

{
    clients: {
        "id1": "lastMutationID1",
        "id2": "lastMutationID2",
    },
    todos: {
        "id1": "rowVersion1",
        "id2": "rowVersion2",
    },
    // ...one map per synced entity
}

As you can see, a CVR contains all the clients (browser tabs) inside a single client group (browser profile), each with its last mutation ID.

It also contains all the entities stored in the IndexedDB of that browser profile, in the form of a Map<id, rowVersion>.

One CVR is generated for each pull response and stored in some temporary storage. The storage doesn’t need to be durable — if the CVR is lost, the server can just send a reset patch. And the storage doesn’t need to be transactional with the database. Redis is fine.

The CVRs are stored keyed under a random unique ID which becomes the cookie sent to Replicache.

During pull, the server uses the cookie to look up the CVR associated with the previous pull response. It then computes a new CVR for the latest server state and diffs the two CVRs to compute the delta to send to the client.
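To make the "storage doesn't need to be durable" point concrete, here is a minimal sketch of the lookup side. It assumes an in-memory Map as a stand-in for Redis and a plain string cookie; `cvrStore`, `lookupPreviousCVR` and `saveCVR` are made-up names for illustration, not part of the actual codebase.

```typescript
// A snapshot here is just id -> rowVersion (hypothetical simplification of a CVR).
type Snapshot = Map<string, number>;

// In-memory stand-in for Redis, keyed by the cookie handed to the client.
const cvrStore = new Map<string, Snapshot>();

function saveCVR(cookie: string, snapshot: Snapshot): void {
  cvrStore.set(cookie, snapshot);
}

function lookupPreviousCVR(cookie: string | null): { base: Snapshot; needsReset: boolean } {
  const previous = cookie === null ? undefined : cvrStore.get(cookie);
  // No CVR (first pull, or the entry was lost): diff against an empty
  // snapshot and tell the client to reset its local state.
  return previous === undefined
    ? { base: new Map<string, number>(), needsReset: true }
    : { base: previous, needsReset: false };
}

// First pull: the client has no cookie yet.
const firstPull = lookupPreviousCVR(null);
saveCVR("cookie-1", new Map<string, number>([["todo-1", 1]]));

// Second pull: the cookie from the first response finds the stored CVR.
const secondPull = lookupPreviousCVR("cookie-1");

// Simulate the cache losing the entry: the server falls back to a reset.
cvrStore.delete("cookie-1");
const afterEviction = lookupPreviousCVR("cookie-1");

console.log(firstPull.needsReset, secondPull.needsReset, afterEviction.needsReset);
// → true false true
```

Losing a CVR costs one full resync for that client group, and nothing worse than that, which is why a cache is good enough here.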

Now let's write the CVR class. You can find it in the cvr.ts file inside the models package.

export type SearchResult = {
  id: string;
  rowVersion: number;
};

export type ClientViewMetadata = { rowVersion: number };
type ClientViewMap = Map<string, ClientViewMetadata>;

/**
 * A Client View Record (CVR) is a minimal representation of a Client View snapshot.
 * In other words, it captures what data a Client Group had at a particular moment in time.
 */
export class CVR {
  public clients: ClientViewMap;
  public todos: ClientViewMap;

  constructor({ clients, todos }: { clients: ClientViewMap; todos: ClientViewMap }) {
    this.clients = clients;
    this.todos = todos;
  }

  static serializeSearchResult(result: SearchResult[]): Map<string, ClientViewMetadata> {
    const data = new Map<string, ClientViewMetadata>();
    result.forEach((row) => data.set(row.id, { rowVersion: row.rowVersion }));

    return data;
  }

  static getPutsSince(nextData: ClientViewMap, prevData: ClientViewMap): string[] {
    const puts: string[] = [];
    nextData.forEach((meta, id) => {
      const prev = prevData.get(id);
      if (prev === undefined || prev.rowVersion < meta.rowVersion) {
        puts.push(id);
      }
    });
    return puts;
  }

  static getDelsSince(nextData: ClientViewMap, prevData: ClientViewMap): string[] {
    const dels: string[] = [];
    prevData.forEach((_, id) => {
      if (!nextData.has(id)) {
        dels.push(id);
      }
    });
    return dels;
  }

  static generateCVR({
    todosMeta,
    clientsMeta,
  }: {
    todosMeta: SearchResult[];
    clientsMeta: SearchResult[];
  }): CVR {
    // Construct a real instance (rather than a plain object) so `instanceof CVR` holds
    return new CVR({
      todos: CVR.serializeSearchResult(todosMeta),
      clients: CVR.serializeSearchResult(clientsMeta),
    });
  }
}

This is the CVR utility class. Its static methods getPutsSince and getDelsSince compute the diff between two CVRs: the entities that were added or updated in the new CVR, and the entities that are missing from the new CVR, respectively.

generateCVR, as the name suggests, generates a new CVR.

serializeSearchResult, again as the name suggests, converts the search results from the DB into the map form that we store in the cache.
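Here is a small worked example of the diff. The two functions are standalone copies of the static methods above so the snippet runs on its own; the ids and versions are made up.

```typescript
type ClientViewMetadata = { rowVersion: number };
type ClientViewMap = Map<string, ClientViewMetadata>;

// Same logic as CVR.getPutsSince: new ids, or ids whose rowVersion went up.
function getPutsSince(nextData: ClientViewMap, prevData: ClientViewMap): string[] {
  const puts: string[] = [];
  nextData.forEach((meta, id) => {
    const prev = prevData.get(id);
    if (prev === undefined || prev.rowVersion < meta.rowVersion) {
      puts.push(id);
    }
  });
  return puts;
}

// Same logic as CVR.getDelsSince: ids the client had that no longer exist.
function getDelsSince(nextData: ClientViewMap, prevData: ClientViewMap): string[] {
  const dels: string[] = [];
  prevData.forEach((_, id) => {
    if (!nextData.has(id)) {
      dels.push(id);
    }
  });
  return dels;
}

// What the client group had after its last pull.
const prevTodos: ClientViewMap = new Map<string, ClientViewMetadata>([
  ["todo-1", { rowVersion: 3 }],
  ["todo-2", { rowVersion: 8 }],
  ["todo-3", { rowVersion: 1 }],
]);

// What the database holds now: todo-2 was edited, todo-3 deleted, todo-4 created.
const nextTodos: ClientViewMap = new Map<string, ClientViewMetadata>([
  ["todo-1", { rowVersion: 3 }],
  ["todo-2", { rowVersion: 10 }],
  ["todo-4", { rowVersion: 1 }],
]);

const examplePuts = getPutsSince(nextTodos, prevTodos);
const exampleDels = getDelsSince(nextTodos, prevTodos);

console.log(examplePuts); // → ["todo-2", "todo-4"]
console.log(exampleDels); // → ["todo-3"]
```

Note that todo-1 appears in neither list: its rowVersion is unchanged, so nothing needs to be sent for it.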

Now let's write another class, CVRCache, which is a helper for storing CVRs in Redis.

Let's move to cvr-cache.ts inside the models package.

import { type Redis, TIME_WINDOWS } from "@repo/lib";

import { type ClientViewMetadata, CVR } from "./cvr";
import { type PullCookie } from "./schemas";

type RedisCVR = {
  todos: Record<string, ClientViewMetadata>[];
  clients: Record<string, ClientViewMetadata>[];
};

export class CVRCache {
  constructor(private redis: Redis) {}

  static makeCVRKey(clientGroupID: string, order: number) {
    return `${clientGroupID}/${order}`;
  }

  static convertRedisObjectToMap(data: Record<string, ClientViewMetadata>[]) {
    return new Map<string, ClientViewMetadata>(
      data.map((item) => Object.entries(item)[0]) as [string, ClientViewMetadata][],
    );
  }

  static convertMapToRedisObject(map: Map<string, ClientViewMetadata>) {
    return Array.from(map).map(([key, value]) => ({ [key]: value }));
  }

  public async getBaseCVR(
    clientGroupID: string,
    cookie: PullCookie,
  ): Promise<{
    baseCVR: CVR;
    previousCVR?: CVR;
  }> {
    let previousCVR: CVR | undefined;

    if (typeof cookie === "object" && cookie !== null && typeof cookie.order === "number") {
      /**
       * The stored CVR may be missing keys if we ship a new schema,
       * which is why we parse it as a Partial.
       *
       * Say we add a new model and the client still has an old CVR:
       * the new model will not be in that CVR and would break the app,
       * so we need a fallback.
       */
      const _redisCVR = await this.redis.get(CVRCache.makeCVRKey(clientGroupID, cookie.order));
      const redisCVR = _redisCVR ? (JSON.parse(_redisCVR) as Partial<RedisCVR>) : undefined;

      if (redisCVR) {
        const cvr = new CVR({
          // fallback to empty map in case we have a new model
          todos: redisCVR.todos ? CVRCache.convertRedisObjectToMap(redisCVR.todos) : new Map(),
          clients: redisCVR.clients
            ? CVRCache.convertRedisObjectToMap(redisCVR.clients)
            : new Map(),
        });

        previousCVR = cvr;
      }
    }

    const baseCVR =
      previousCVR ??
      new CVR({
        todos: new Map<string, ClientViewMetadata>(),
        clients: new Map<string, ClientViewMetadata>(),
      });

    return { baseCVR, previousCVR };
  }

  public async setCVR(clientGroupID: string, order: number, cvr: CVR) {
    const redisCVR: RedisCVR = {
      todos: CVRCache.convertMapToRedisObject(cvr.todos),
      clients: CVRCache.convertMapToRedisObject(cvr.clients),
    };

    await this.redis.set(CVRCache.makeCVRKey(clientGroupID, order), JSON.stringify(redisCVR));
    await this.redis.expire(CVRCache.makeCVRKey(clientGroupID, order), TIME_WINDOWS.ONE_HOUR * 12);
  }

  public async delCVR(clientGroupID: string, order: number) {
    await this.redis.del(CVRCache.makeCVRKey(clientGroupID, order));
  }
}

There are three important methods in this class: getBaseCVR, setCVR, and delCVR. Let's discuss each of them.

getBaseCVR:

Takes a clientGroupID and a cookie as arguments and returns two values: baseCVR (previousCVR if it is defined, otherwise a new CVR with empty maps for every entity) and previousCVR (the CVR fetched from Redis, which can be undefined).

setCVR:

As the name suggests, takes a clientGroupID and an order, builds a key from them, and stores your CVR as the value under that key. The key expires after 12 hours.

delCVR:

Deletes the CVR stored under a key (which is built from clientGroupID and order).
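You might wonder why CVRCache needs the two convert helpers at all. A Map does not survive JSON.stringify, so it has to be turned into a plain structure before it goes into Redis. The sketch below mirrors the two static helpers as standalone functions (`toRedisObject` and `fromRedisObject` are names I made up for this example) and round-trips a map through JSON.

```typescript
type ClientViewMetadata = { rowVersion: number };
type ClientViewMap = Map<string, ClientViewMetadata>;
type RedisEntries = Record<string, ClientViewMetadata>[];

// Mirrors CVRCache.convertMapToRedisObject: one single-key object per entry.
const toRedisObject = (map: ClientViewMap): RedisEntries =>
  Array.from(map).map(([key, value]) => ({ [key]: value }));

// Mirrors CVRCache.convertRedisObjectToMap: take the only entry of each object.
const fromRedisObject = (data: RedisEntries): ClientViewMap =>
  new Map<string, ClientViewMetadata>(
    data.map((item) => Object.entries(item)[0]) as [string, ClientViewMetadata][],
  );

const todosMap: ClientViewMap = new Map<string, ClientViewMetadata>([
  ["todo-1", { rowVersion: 3 }],
  ["todo-2", { rowVersion: 8 }],
]);

// Stringifying the Map directly silently drops every entry.
const naive = JSON.stringify(todosMap);
console.log(naive); // → {}

// Converting first keeps the data intact through the string round trip.
const stored = JSON.stringify(toRedisObject(todosMap));
const restored = fromRedisObject(JSON.parse(stored) as RedisEntries);

console.log(stored); // → [{"todo-1":{"rowVersion":3}},{"todo-2":{"rowVersion":8}}]
console.log(restored.get("todo-2")); // → { rowVersion: 8 }
```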


Now that we have the utility classes out of the way, let's get straight to writing the pull endpoint.

import { type NextFunction, type Request, type RequestHandler, type Response } from "express";

import {
  AppError,
  CVR,
  CVRCache,
  type PullCookie,
  type PullRequestType,
  type PullResponseOKV1,
  type PushRequestType,
  transact,
} from "@repo/models";

import { logger, redis } from "@repo/lib";

import { ClientService } from "../services/client.service";
import { ClientGroupService } from "../services/client-group.service";
import { ReplicacheService } from "../services/replicache.service";
import { TodoService } from "../services/todo.service";

const cvrCache = new CVRCache(redis);

class ReplicacheController {
  pull: RequestHandler = async (
    req: Request<object, object, PullRequestType["body"]>,
    res: Response,
    next: NextFunction,
  ) => {
    try {
      const userId = req.user.id;
      const { cookie, clientGroupID } = req.body;
      // 1. Get the base CVR and the previous CVR from the cache
      const { baseCVR, previousCVR } = await cvrCache.getBaseCVR(clientGroupID, cookie);

      const trxResponse = await transact(async (tx) => {
        // 2. Init services inside the transaction
        //#region  //*=========== init services ===========
        const clientGroupService = new ClientGroupService(tx);
        const clientService = new ClientService(tx);
        const todoService = new TodoService(tx);
        //#endregion  //*======== init services ===========

        // 3. Get the base client group
        const baseClientGroup = await clientGroupService.getById({
          id: clientGroupID,
          userId,
        });

        // 4. Get all todos and clients (just id and rowVersion) from the database
        // this needs to be done for all entities that are part of the sync
        const [todosMeta, clientsMeta] = await Promise.all([
          todoService.findMeta({ userId }),
          clientService.findMeta({ clientGroupId: clientGroupID }),
        ]);

        // 5. Generate the next CVR
        const nextCVR = CVR.generateCVR({
          clientsMeta,
          todosMeta,
        });

        // 6. Get the puts and dels for todos
        // this needs to be done for all entities that are part of the sync
        const todoPuts = CVR.getPutsSince(nextCVR.todos, baseCVR.todos); // puts refers to ones that are new or updated
        const todoDels = CVR.getDelsSince(nextCVR.todos, baseCVR.todos); // dels refers to ones that are deleted

        // 7. Get the actual todos data from the database for all the puts
        const todos = await todoService.findMany({ ids: todoPuts });

        // 8. Get the puts for clients and compute the changes for each client
        const clientPuts = CVR.getPutsSince(nextCVR.clients, baseCVR.clients);
        const clientChanges: Record<string, number> = {}; // {clientid: lastMutationId}
        for (const id of clientPuts) {
          const c = nextCVR.clients.get(id);
          clientChanges[id] = c ? c.rowVersion : 0;
        }

        // 9. Upsert the client group with the new CVR version
        const previousCVRVersion = cookie?.order ?? baseClientGroup.cvrVersion;
        const nextClientGroup = await clientGroupService.upsert({
          id: baseClientGroup.id,
          userId,
          cvrVersion: Math.max(previousCVRVersion, baseClientGroup.cvrVersion) + 1,
        });

        // 10. Generate the new response cookie
        const responseCookie: PullCookie = {
          clientGroupID,
          order: nextClientGroup.cvrVersion,
        };

        // 11. Generate the patch for Replicache to sync the indexDB of the client group
        const patch = ReplicacheService.genPatch({
          previousCVR,
          TODO: {
            data: todos,
            dels: todoDels,
          },
        });

        return {
          nextCVR,
          responseCookie,
          patch,
          clientChanges,
        };
      });

      if (trxResponse === null) {
        return res.status(200).json({
          cookie,
          lastMutationIDChanges: {},
          patch: [],
        });
      }

      const { patch, clientChanges, nextCVR, responseCookie } = trxResponse;
      // 12. Set the new CVR in the cache
      await cvrCache.setCVR(responseCookie.clientGroupID, responseCookie.order, nextCVR);
      // 13. Delete the old CVR from the cache if it existed
      if (cookie) {
        await cvrCache.delCVR(clientGroupID, cookie.order);
      }

      const body: PullResponseOKV1 = {
        cookie: responseCookie,
        lastMutationIDChanges: clientChanges,
        patch,
      };

      return res.status(200).json(body);
    } catch (error) {
      if (error instanceof AppError) {
        return next(error);
      }
      logger.error(error);
      return next(
        new AppError({
          code: "INTERNAL_SERVER_ERROR",
          message:
            "Failed to pull data from the server, due to an internal error. Please try again later.",
        }),
      );
    }
  };
}

export const replicacheController = new ReplicacheController();
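The version bump in step 9 of the controller is easy to skim past, so here it is isolated as a pure function. This is a sketch of the same arithmetic, with `nextCVRVersion` and the `Cookie` type being names invented for the example. My understanding is that Replicache uses the cookie's order to tell newer pull responses from older ones, so the value handed back should never be lower than what the client already holds; treat that as an assumption and check the Replicache docs.

```typescript
// Hypothetical, trimmed-down version of the PullCookie type.
type Cookie = { clientGroupID: string; order: number } | null;

function nextCVRVersion(cookie: Cookie, storedVersion: number): number {
  // No cookie (first pull): fall back to the version stored on the client group.
  const previousVersion = cookie?.order ?? storedVersion;
  // Take the larger of the two so the result is ahead of both the cookie and the DB row.
  return Math.max(previousVersion, storedVersion) + 1;
}

const firstEver = nextCVRVersion(null, 0);
const inSync = nextCVRVersion({ clientGroupID: "cg-1", order: 4 }, 4);
const staleCookie = nextCVRVersion({ clientGroupID: "cg-1", order: 2 }, 5);
const cookieAhead = nextCVRVersion({ clientGroupID: "cg-1", order: 7 }, 5);

console.log(firstEver, inSync, staleCookie, cookieAhead);
// → 1 5 6 8
```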

Let's go through each step in detail and discuss the code.

  1. Get baseCVR and previousCVR from the Redis cache using the clientGroupID and cookie that we receive in the pull request body. We discussed this function in detail above with the CVRCache class; all it does is fetch the CVR stored in Redis for this client group, which represents the state the browser cache currently holds.

  2. Start the transaction and initialise the services inside it; here those are the clientGroup, client, and todo services.

  3. Get the base clientGroup using the getById function

       async getById({ id, userId }: { id: string; userId: string }): Promise<
         Prisma.ClientGroupGetPayload<{
           select: {
             id: true;
             userId: true;
             cvrVersion: true;
           };
         }>
       > {
         const clientGroup = await this.tx.clientGroup.findUnique({
           where: {
             id,
             userId,
           },
           select: {
             id: true,
             userId: true,
             cvrVersion: true,
           },
         });
         if (!clientGroup) {
           return {
             id,
             userId,
             cvrVersion: 0,
           };
         }
         if (clientGroup.userId !== userId) {
           throw new AppError({
             code: "UNAUTHORIZED",
             message: "You are not authorized to access this client group",
           });
         }
         return clientGroup;
       }
    

    We already discussed this method during push, but to recap: we try to fetch the clientGroup by its id and associated userId. If it does not exist, rather than creating it we return an object with the properties of a newly created clientGroup and defer the actual creation to the end, to avoid deadlocks in the transaction (lol, this was for you to figure out in the last blog).

  4. Find all metas, which means fetching the id and rowVersion of every entity we need to sync with the client from the database (the state of our central server, but only the part that the user has access to)

    • todoService.findMeta

          async findMeta({ userId }: { userId: string }): Promise<SearchResult[]> {
            return this.tx.todo.findMany({
              where: {
                userId,
              },
              select: {
                id: true,
                rowVersion: true,
              },
            });
          }
      

      It selects only id and rowVersion; otherwise this DB call would be very expensive

    • Likewise, we have to do the same drill for every entity we want to sync with Replicache (but keep in mind to fetch only data that the user has access to)

    • clientService.findMeta: this is a bit odd, right? We are fetching the lastMutationID for each client but calling it rowVersion. That is just to keep the Redis class and the TypeScript types simple for the sake of the blog

    • Ideally you want to store lastMutationID as lastMutationID rather than as rowVersion just to satisfy TypeScript, but at the end of the day it's your call

          async findMeta({ clientGroupId }: { clientGroupId: string }): Promise<SearchResult[]> {
            const clients = await this.tx.client.findMany({
              where: {
                clientGroupId,
              },
              select: {
                id: true,
                lastMutationId: true,
              },
            });
      
            return clients.map((client) => ({
              id: client.id,
              rowVersion: client.lastMutationId,
            }));
          }
      
  5. Once we have all the metas, generate nextCVR (the new CVR that will eventually be stored in Redis) using the generateCVR method

  6. Get the diffs, in the form of puts and dels, for all entities. Again, this drill has to be done for every entity that you want to sync with Replicache

    1. puts refers to the ones that are new or updated

      • Let me explain how we can tell. Remember the rowVersion field we store in each row of our tables, like todo?

      • If the CVR generated from the current database state has an entry for a todo that does not exist in the old CVR (a newly created todo), that is a put

      • Or say we update a todo (which also bumps its rowVersion): the new CVR will have the updated rowVersion (say 10) while the old CVR has a lower one (say 8), so that is considered a put as well

                static getPutsSince(nextData: ClientViewMap, prevData: ClientViewMap): string[] {
                  const puts: string[] = [];
                  nextData.forEach((meta, id) => {
                    const prev = prevData.get(id);
                    if (prev === undefined || prev.rowVersion < meta.rowVersion) {
                      puts.push(id);
                    }
                  });
                  return puts;
                }
        
    2. dels, as the name suggests, refers to the deleted records

      • simply the entries that are absent from the new CVR but were present in the old CVR
  7. Now we need the data for the patch (all the new/updated entries, plus the deleted ones, so the browser can get in sync)

    • for that we simply fetch the full rows from todoService using the ids we got from puts [todoPuts is just an array of todo IDs]
  8. Now comes a slightly tricky part (but it is boilerplate needed for Replicache), so set it up once and never touch it again 🙂

    • this step gets the puts for the clients part of the CVR, meaning which clients were newly added to the clientGroup and which ones were updated (changed lastMutationID)

    • And finally we create a record called clientChanges, which maps each of those changed client IDs to its lastMutationID

        {
            "clientID": "lastMutationID" 
        }
      
  9. Upsert the client group with the incremented CVR version, since this pull produces a new CVR.

  10. Generate the new response cookie, which consists of the clientGroupID and order, where order is simply the clientGroup's updated cvrVersion

  11. Finally, generate the patch using a utility function called genPatch. You can find this function in the replicache.service.ts file

    //! Make sure key have the same name as IDB_KEY
    type GenPatchArgs = {
      previousCVR?: CVR;
      TODO: {
        data: TodoType[];
        dels: string[];
      };
    };
    
    export class ReplicacheService {
      static genPatch(args: GenPatchArgs): PatchOperation[] {
        const patch: PatchOperation[] = [];
        const { previousCVR, ...models } = args;
    
        // clears the whole indexDB if previousCVR is undefined
        if (previousCVR === undefined) {
          patch.push({ op: "clear" });
        }
    
        Object.entries(models).forEach(([_key, { data, dels }]) => {
          const key = _key as IDBKeys;
    
          dels.forEach((del) =>
            patch.push({
              op: "del", // delete from indexDB if it exists
              key: IDB_KEY[key]({ id: del }),
            }),
          );
          data.forEach((datum) =>
            patch.push({
              op: "put", // put in indexDB if it doesn't exist or update if it does
              key: IDB_KEY[key]({ id: datum.id }),
              value: normalizeToReadonlyJSON(datum),
            }),
          );
        });
    
        return patch;
      }
    }
    

    This function is pretty important, because it generates the diff that Replicache on your client uses to sync IndexedDB with the current state of the database.
    Replicache expects a PatchOperation[], where the op of each operation is one of "put", "del", or "clear".
    A put also carries the value that Replicache will store under that key.

    This is what the IDB_KEY object we wrote in the initiate Replicache blog (blog-4) is all about: there we defined keys for all the entities that we need to store inside Replicache.
    Because Replicache stores the data from our pull response under these keys, we can then update those same keys in mutations or simply subscribe to them to render values.

  12. Finally, we store the new CVR in Redis

  13. And delete the old CVR if it existed, saving space in Redis
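To see what the client actually receives after all thirteen steps, here is a rough sketch of the patch for a first pull and for an incremental one. `PatchOp` is a simplified stand-in for Replicache's patch operation type, and the `todo/<id>` key format, the `Todo` fields and `buildPatch` are assumptions made for this example (the real keys come from IDB_KEY).

```typescript
// Hypothetical todo shape, just enough fields for the example.
type Todo = { id: string; title: string; rowVersion: number };

// Simplified stand-in for Replicache's patch operations.
type PatchOp =
  | { op: "clear" }
  | { op: "del"; key: string }
  | { op: "put"; key: string; value: Todo };

// Assumed key format; in the blog's codebase this is what IDB_KEY provides.
const todoKey = (id: string): string => `todo/${id}`;

function buildPatch(hasPreviousCVR: boolean, puts: Todo[], dels: string[]): PatchOp[] {
  const patch: PatchOp[] = [];
  // Without a previous CVR we cannot diff, so wipe the client and resend everything.
  if (!hasPreviousCVR) {
    patch.push({ op: "clear" });
  }
  dels.forEach((id) => patch.push({ op: "del", key: todoKey(id) }));
  puts.forEach((todo) => patch.push({ op: "put", key: todoKey(todo.id), value: todo }));
  return patch;
}

// First pull: no previous CVR, every todo counts as a put.
const firstPatch = buildPatch(false, [{ id: "t1", title: "Write blog", rowVersion: 1 }], []);

// Later pull: t2 was edited and t3 was deleted since the previous CVR.
const nextPatch = buildPatch(true, [{ id: "t2", title: "Ship it", rowVersion: 10 }], ["t3"]);

console.log(firstPatch.map((p) => p.op)); // → ["clear", "put"]
console.log(nextPatch.map((p) => p.op)); // → ["del", "put"]
```

The incremental patch stays small no matter how many todos the user has, which is the whole payoff of keeping CVRs around.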

Finally, we are done with our pull endpoint. Let's move on to the next blog, where we will set up "poke".