Backend Routes - "push"

Blog-5 of the 8-blog series

Backend strategies

Replicache defines "push" and "pull" endpoints that your server must implement for sync to work. There are various ways to write push and pull endpoints. As the heading suggests, we will write the push endpoint in this blog.

But before that, let's understand the different backend strategies Replicache supports so we can proceed with writing the code.

The main difference between the strategies is how they calculate the patch/diff needed by the pull endpoint. Different approaches require different states to be stored in the backend database and different logic in the push and pull endpoints.

Also, some use cases are only supported well by certain strategies. Notably:

  • Read Auth: When not all data is accessible to all users. In an application like Linear, tasks of one organisation should not be synced to users from a different organisation.

  • Partial Sync: When a user only syncs some of the data they have access to. In an application like GitHub, each user has access to many GB of data, but only a small subset should be synced to the client at any given time.

Now let's discuss each of the strategies:

  1. Reset Strategy

    • Basic description: the most basic strategy. It sends the entire client view in every pull response, so there is no diff calculation during pull. This is quite inefficient and not recommended for apps where data changes frequently

    • When to use: For apps with very small amounts of data, or where the data changes infrequently

    • Implementation: easy 🟢

    • Performance: bad 🔴 (each pull computes and returns the entire client view)

    • Read Auth: easy 🟢 (because each client gets only the data they have access to)

    • Partial Sync: easy 🟢

  2. Global Version Strategy

    • Basic description:

      • A single version is stored in the database and incremented on each push. Each entity has a lastModifiedVersion field recording the global version at which the entity was last modified

      • The global version is returned as a cookie with every pull response and sent back by the client on the next pull request, where it is used to calculate the diff (all entities whose lastModifiedVersion is greater than the version from the last pull). See the sketch just after this list

    • When to use: Simple apps with low concurrency, where data is synced to all users (all users have access to the same set of data, e.g. a local auction house)

    • Performance: bad 🔴 Limited to about 50 pushes/second across the entire app, i.e. your server can handle only about 50 mutations per second. (This happens because every push contends for the same global version row, so pushes queue up behind its lock)

    • Implementation: easy 🟢

    • Read Auth: Hard 🔴 (because, as discussed, there is a single global version and every client syncs the same set of data)

    • Partial Sync: Hard 🔴

  3. Per-Space Version Strategy:

    • Basic description:

      • Same as the global version strategy, except there is a concept of a space: a version is maintained per space rather than for the whole server

      • Think of a space as an organisation in Linear

    • When to use: Apps where data can be synced per space, for example a Linear organisation

    • Performance: 🟡 Limited to 50 pushes/second/space

    • Implementation: easy 🟢

    • Read Auth: 🟡 Restricting access across spaces is built in, so a member of organisation 1 cannot view data from organisation 2. But read auth inside organisation 1 (like scoping tasks to teams) cannot be implemented; all users within a space see everything in that space

    • Partial Sync: 🟡 You can choose which spaces to sync to each client, but within a space all data is synced.

  4. Row Version Strategy (🐐)

    • Description:

      • The client view can be computed dynamically for a specific user; it can be any arbitrary query over the database

      • Does not require global locks or concept of spaces

    • Performance: 🟢 close to traditional non-transactional servers

    • Implementation: 🔴 Most difficult

    • Read Auth: 🟢 Fully supported. Each individual data item is authorised before being sent to the client

    • Partial Sync: 🟢 Fully supported; sync any arbitrary subset of data based on whatever logic you like
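
To make the Global Version diff mentioned above concrete, here is a minimal sketch of its pull-side query, assuming a hypothetical globalVersion model and a lastModifiedVersion column (neither is part of the schema we will actually use; this is only to illustrate the idea):

import { prismaClient } from "@repo/models";

// Hypothetical sketch of a Global Version pull diff: the client sends back
// the version it received as its cookie on the previous pull, and the patch
// is every entity modified after that version.
async function computeGlobalVersionDiff(cookieVersion: number) {
  const [global, changedTodos] = await Promise.all([
    // a single-row table holding the global version (hypothetical model)
    prismaClient.globalVersion.findFirstOrThrow(),
    prismaClient.todo.findMany({
      where: { lastModifiedVersion: { gt: cookieVersion } },
    }),
  ]);
  // the new cookie is the current global version; changed rows form the patch
  return { cookie: global.version, patch: changedTodos };
}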

After this basic yet tedious read-through, which strategy do you think your app needs?

We will continue with the Row Version Strategy, the most complex yet most scalable approach, throughout the rest of the blog.

Let's look at our database schema and the state we are going to store inside our database.

Open up the schema.prisma file inside the prisma folder of the models package in our monorepo:

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model User {
  id           String        @id @default(uuid())
  email        String        @unique
  clientGroups ClientGroup[]
  todos        Todo[]

  @@map("users")
}

model ClientGroup {
  id           String   @id
  user         User     @relation(fields: [userId], references: [id], onDelete: Cascade)
  userId       String   @map("user_id")
  cvrVersion   Int      @map("cvr_version")
  lastModified DateTime @default(now()) @updatedAt @map("last_modified")
  clients      Client[]

  @@index([id, userId])
  @@map("client_groups")
}

model Client {
  id             String      @id
  clientGroup    ClientGroup @relation(fields: [clientGroupId], references: [id], onDelete: Cascade)
  clientGroupId  String      @map("client_group_id")
  lastMutationId Int         @default(0) @map("last_mutation_id")
  lastModified   DateTime    @default(now()) @updatedAt @map("last_modified")

  @@index([id, clientGroupId])
  @@map("clients")
}

model Todo {
  id         String  @id
  title      String
  completed  Boolean @default(false)
  userId     String  @map("user_id")
  user       User    @relation(fields: [userId], references: [id])
  rowVersion Int     @default(0) @map("row_version")

  @@map("todos")
}

If understanding what a clientGroup and a client are is an issue, we have already discussed them in detail in the Key terminologies section of Blog-3.

Let's go through the schema a bit

ClientGroup:

  • Refers to a browser profile and is scoped to userId

  • contains a column called cvrVersion, which we will discuss further when we learn about CVRs

  • lastModified is basically an updatedAt column

Clients:

  • A client refers to a browser tab under a browser profile

  • scoped to a single client group

  • When a mutation happens, it is the client that sends the request to our server's push endpoint to sync it with the database. Hence we store the lastMutationId it triggered inside our database; it is used inside both push and pull

  • lastModified, again, is basically an updatedAt column

Todo:

  • Our business requirement table/model

  • The main thing to notice inside the Todo model is the column called rowVersion. For now, just keep in mind that every entity/table you want to sync from the database to Replicache needs this rowVersion column; we will discuss it in detail when we actually learn about CVRs and the pull endpoint

  • Make sure to increment rowVersion by 1 every time an update happens on a row that has a rowVersion column (see the sketch just after this list)

  • Make sure that for every dataset/table you are thinking of syncing with Replicache, there is a way on your server to check whether a particular user has access to a given entity

  • For now, just keep these pointers in your head; we will revisit them while discussing CVRs and the pull endpoint, which will give us crystal-clear clarity on these concepts
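
As promised in the list above, here is a minimal sketch of what "increment rowVersion on every update" looks like with Prisma (the column names match our schema; the todo id and the update itself are illustrative):

import { prismaClient } from "@repo/models";

// Whenever a synced row changes, bump its rowVersion atomically in the same
// write, so the pull endpoint can later tell that this row differs from what
// the client already has.
async function completeTodo(todoId: string) {
  await prismaClient.todo.update({
    where: { id: todoId }, // illustrative id
    data: {
      completed: true,
      rowVersion: { increment: 1 }, // Prisma's atomic number increment
    },
  });
}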

Now, the last piece of setup (I promise) before actually writing the push endpoint: a small disclaimer about transactions.

All endpoints in a Replicache server setup have to be transactional, with a high level of isolation (our setup uses Serializable), and with a high level of isolation come deadlock and serialization errors.

Whenever a push or pull fails on the server, the Replicache client keeps retrying and re-sending the request. But we should not rely on that alone; we should also retry on our own server every time one of our request handlers hits a retryable transaction error.

To see that logic, hop over to the prisma-client.ts file inside the models package:

import { Prisma, PrismaClient } from "@prisma/client";

import { logger } from "@repo/lib";

import { AppError } from "./err";

declare global {
  // eslint-disable-next-line no-var
  var prisma: PrismaClient | undefined;
}

export const prismaClient =
  global.prisma ||
  new PrismaClient({
    log: ["query"],
  });
if (process.env.NODE_ENV !== "production") global.prisma = prismaClient;

export type TransactionalPrismaClient = Parameters<Parameters<PrismaClient["$transaction"]>[0]>[0];

export default prismaClient;
export * from "@prisma/client";

export async function transact<T>(body: (tx: TransactionalPrismaClient) => Promise<T>): Promise<T> {
  for (let i = 0; i < 10; i++) {
    try {
      const r = await prismaClient.$transaction(body, {
        isolationLevel: "Serializable",
      });
      return r;
    } catch (error) {
      if (shouldRetryTxn(error)) {
        logger.error(`Transaction failed, retrying... ${error}`);
        continue;
      }
      // Non-retryable error: rethrow instead of burning the remaining retries
      throw error;
    }
  }
  throw new AppError({
    code: "INTERNAL_SERVER_ERROR",
    message: "Tried 10 times, but transaction still failed",
  });
}

function shouldRetryTxn(error: unknown) {
  // P2034 is Prisma's "transaction failed due to a write conflict or a
  // deadlock" error, i.e. the retryable case under Serializable isolation
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    return error.code === "P2034";
  }
  return false;
}

So, as per this function, whenever we want to start a transaction we use this transact function instead of a plain Prisma transaction, because it retries automatically in case of deadlock errors on the database. [Read more about this topic: https://www.postgresql.org/docs/16/transaction-iso.html]
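
As a quick illustration, here is how transact might be used (the query itself is hypothetical; the point is that every db call goes through the tx handle, so it joins the same serializable, auto-retried transaction):

import { transact } from "@repo/models";

// Count a user's todos inside one serializable transaction. Everything in
// the callback either commits together or rolls back together, and the
// whole block is retried on write conflicts/deadlocks.
async function countTodos(userId: string) {
  return transact(async (tx) => {
    return tx.todo.count({ where: { userId } });
  });
}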

Now let's hop back to the replicache.router.ts file inside the api app:

import { Router } from "express";

import { pullRequestSchema, pushRequestSchema } from "@repo/models";

import { replicacheController } from "../controllers/replicache.controller";
import { authenticate } from "../middlewares/auth.middleware";
import validate from "../middlewares/validate.middleware";

export const replicacheRouter = Router({ mergeParams: true });

/**
 * @method POST @url /replicache/push @desc push mutations to the server
 */
replicacheRouter.post(
  "/push",
  validate(pushRequestSchema),
  authenticate,
  replicacheController.push,
);

/**
 * @method POST @url /replicache/pull @desc pull diff from the server
 */
replicacheRouter.post(
  "/pull",
  validate(pullRequestSchema),
  authenticate,
  replicacheController.pull,
);

Now let's go over this file a bit so I can explain the work I have done:

  1. Here we are creating a replicache router which is later registered to our server

  2. With the router I am mounting two routes "push" and "pull"

  3. On both routes I am attaching 2 middlewares

    1. validate --> if you check out validate, it just takes in a schema and returns a higher-order function that validates the request body, headers, params, query, etc. against the defined schema and throws if anything is invalid (see the sketch just after this list)

    2. authenticate --> makes sure the user is authenticated, and is also responsible for putting the userId on the request object for handy retrieval inside every route that requires the user to be authenticated

  4. Then on to the controllers; in this blog we are only going to discuss the push controller.
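
As referenced in point 3 above, here is a minimal sketch of what a validate middleware like ours could look like; treat it as an illustration of the contract, not the actual file. authenticate follows the same middleware shape, rejecting unauthenticated requests and attaching the user to req.user (which the controller later reads as req.user.id):

import { type NextFunction, type Request, type RequestHandler, type Response } from "express";
import { type AnyZodObject } from "zod";

import { AppError } from "@repo/models";

// Takes a zod schema and returns an Express middleware that validates the
// incoming request against it, forwarding a BAD_REQUEST error on failure.
const validate =
  (schema: AnyZodObject): RequestHandler =>
  (req: Request, _res: Response, next: NextFunction) => {
    const parsed = schema.safeParse({
      body: req.body,
      query: req.query,
      params: req.params,
    });
    if (!parsed.success) {
      return next(
        new AppError({ code: "BAD_REQUEST", message: parsed.error.message }),
      );
    }
    return next();
  };

export default validate;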

Hop on to the replicache.controller.ts file:

import { type NextFunction, type Request, type RequestHandler, type Response } from "express";

import {
  AppError,
  type PushRequestType,
  transact,
} from "@repo/models";

import { logger } from "@repo/lib";

import { ReplicacheService } from "../services/replicache.service";
import { TodoService } from "../services/todo.service";
import { sendPoke } from "../utils/poke";

class ReplicacheController {
  push: RequestHandler = async (
    req: Request<object, object, PushRequestType["body"]>,
    res: Response,
    next: NextFunction,
  ) => {
    const userId = req.user.id;
    try {
      const push = req.body;
      for (const mutation of push.mutations) {
        try {
          await ReplicacheService.processMutation({
            clientGroupID: push.clientGroupID,
            errorMode: false,
            mutation,
            userId,
          });
        } catch (error) {
          await ReplicacheService.processMutation({
            clientGroupID: push.clientGroupID,
            errorMode: true,
            mutation,
            userId,
          });
        }
      }
      return res.status(200).json({
        success: true,
      });
    } catch (error) {
      if (error instanceof AppError) {
        return next(error);
      }
      logger.error(error);
      return next(
        new AppError({
          code: "INTERNAL_SERVER_ERROR",
          message:
            "Failed to push data to the server, due to an internal error. Please try again later.",
        }),
      );
    } finally {
      await sendPoke({ userId });
    }
  };
}

export const replicacheController = new ReplicacheController();

In this file you can see that the push controller does very little; most of the business logic is abstracted even further into the service layer:

  1. Define the push method inside the controller class

  2. Take out the properties that Replicache sends us inside the push body (you can find the schema inside the replicache.schema.ts file in the models package):

    1.    import { z } from "zod";
      
         const mutation = z.object({
           id: z.number(),
           clientID: z.string(),
           name: z.string(),
           args: z.any(),
         });
         export type MutationType = z.infer<typeof mutation>;
      
         export const pushRequestSchema = z.object({
           body: z.object({
             profileID: z.string(),
             clientGroupID: z.string(),
             mutations: z.array(mutation),
             schemaVersion: z.string(),
           }),
         });
         export type PushRequestType = z.infer<typeof pushRequestSchema>;
      
         const cookieSchema = z
           .object({
             order: z.number(),
             clientGroupID: z.string(),
           })
           .optional()
           .nullable();
      
         export type PullCookie = z.infer<typeof cookieSchema>;
      
         export const pullRequestSchema = z.object({
           body: z.object({
             profileID: z.string(),
             clientGroupID: z.string(),
             cookie: cookieSchema,
             schemaVersion: z.string(),
           }),
         });
         export type PullRequestType = z.infer<typeof pullRequestSchema>;
      
  3. As you can see in this schema file, Replicache sends us mutations in a batch, in the form of an array (generally it is one mutation per push, but when the user performs mutations at a high frequency or has internet issues, Replicache queues the mutations up and sends them in batches). Either way, we need to handle all of those mutations

  4. So inside our try block we loop through all the mutations, and start another try/catch block inside each iteration

  5. And we just call ReplicacheService.processMutation in both the try and the catch blocks, with the only difference being that in the try block errorMode is false and in the catch block errorMode is true

  6. To give a brief understanding of how errorMode works inside processMutation: we first call it with errorMode false; internally it uses the transact function (which starts a transaction and retries it up to 10 times on deadlock errors). If it still fails due to some other server error or database failure, we run the same function inside the catch block with errorMode true, and with some logic inside the function we stop Replicache from re-sending this specific mutation over and over, because we already tried it on the server and it did not work (more on this shortly when we write the actual processMutation function, and even more in Blog-8 when we discuss error handling)

  7. After that we simply send a success: true response to the client if everything worked out, and otherwise do some error handling

  8. And in the finally block we send a poke to the concerned clients (as discussed in the intro to Replicache in Blog-3, sendPoke just sends an empty pub/sub event via WebSockets to the concerned clients, telling them that something has changed on the server and they need to pull)

  9. Just remember: every time you update something that is synced with Replicache, update the rowVersion and call sendPoke for the connected clients so they can pull and see the changes immediately

  10. For now, knowing this much is enough; we will set up the poke functionality on both the frontend and the backend in Blog-7. To wrap up this walkthrough, a sample push body is shown below.
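
To make the shape of a push concrete, here is an illustrative push body matching the pushRequestSchema above (all ids and args are made up; the exact args shapes are defined by the todo schemas in the models package):

import { type PushRequestType } from "@repo/models";

// A hypothetical batched push: two queued mutations from the same client.
// Replicache generates profileID, clientGroupID, clientID and the
// monotonically increasing per-client mutation ids.
const examplePush: PushRequestType["body"] = {
  profileID: "p-94fr2w8xx",
  clientGroupID: "cg-8d7a6c2e",
  schemaVersion: "1",
  mutations: [
    {
      id: 42,
      clientID: "c-1f0e9b",
      name: "todoCreate",
      args: { id: "todo-1", title: "Write the pull endpoint" },
    },
    {
      id: 43,
      clientID: "c-1f0e9b",
      name: "todoUpdate",
      args: { id: "todo-1", completed: true },
    },
  ],
};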

Let's dive deep into the processMutation function, finally 🤯

Head over to the replicache.service.ts file and check out the processMutation function:

import {
  AppError,
  type MutationType,
  type TodoType,
  transact,
} from "@repo/models";

import { logger } from "@repo/lib";

import { ClientService } from "./client.service";
import { ClientGroupService } from "./client-group.service";
import { serverMutators } from "../mutators";

export class ReplicacheService {
  static async processMutation({
    clientGroupID,
    errorMode,
    mutation,
    userId,
  }: {
    userId: string;
    clientGroupID: string;
    mutation: MutationType;
    errorMode: boolean;
  }): Promise<void> {
    await transact(async (tx) => {
      logger.info(
        `Processing mutation ${mutation.name} for user ${userId} in client group ${clientGroupID}`,
      );

      // 1. Instantiate client and client group services inside the transaction block
      const clientGroupService = new ClientGroupService(tx);
      const clientService = new ClientService(tx);

      // 2. Fetch the base client group and client
      const [baseClientGroup, baseClient] = await Promise.all([
        clientGroupService.getById({
          id: clientGroupID,
          userId,
        }),
        clientService.getById({
          id: mutation.clientID,
          clientGroupId: clientGroupID,
        }),
      ]);

      // 3. calculate the next mutation id
      const nextMutationId = baseClient.lastMutationId + 1;

      // 4. Check if the mutation id is valid
      //#region  //*=========== Mutation id checks ===========
      // 4.1. Check if the mutation id is less --> means already processed so just return
      if (mutation.id < nextMutationId) {
        logger.debug(`Skipping mutation ${mutation.id} because it has already been applied`);
        return;
      }

      // 4.2. Check if the mutation id is greater --> means future mutation so throw error
      if (mutation.id > nextMutationId) {
        logger.error(
          `Mutation ${mutation.id} is too far in the future, expected ${nextMutationId}`,
        );
        throw new AppError({
          code: "INTERNAL_SERVER_ERROR",
          message: `Mutation ${mutation.id} is too far in the future, expected ${nextMutationId}`,
        });
      }
      //#endregion  //*======== Mutation id checks ===========

      // 5. Apply the mutation if not error mode
      if (!errorMode) {
        try {
          // 5.1. Check if the mutation is valid
          const mutationName = mutation.name as keyof typeof serverMutators;
          const mutator = serverMutators[mutationName];
          if (!mutator) {
            logger.error(`Unknown mutation ${mutation.name}`);
            throw new Error(`Unknown mutation ${mutation.name}`);
          }
          // 5.2. Apply the mutation
          const args = mutation.args;
          await mutator({
            args,
            userId,
            tx,
          });
        } catch (error) {
          if (error instanceof AppError) {
            throw error;
          }
          logger.error(error);
          throw new AppError({
            code: "INTERNAL_SERVER_ERROR",
            message: `Failed to apply mutation: ${mutation.name} to the server, due to an internal error. Please try again later.`,
          });
        }
      }

      // 6. Update the client with the new mutation id
      await Promise.all([
        clientGroupService.upsert({ ...baseClientGroup }),
        clientService.upsert({
          id: baseClient.id,
          clientGroupId: baseClient.clientGroupId,
          lastMutationId: nextMutationId,
        }),
      ]);

      logger.info(`Mutation ${mutation.id} applied successfully`);
    });
  }
}

In the code block above I have already marked the sections of code with numbered steps and a brief description of what each one does. Now let's do a deep dive into each step:

  1. Instantiating the client and client group services inside the transaction block. This is done so that we use the tx object of the current transaction to perform db operations inside our client and client group services. That way all the db operations are part of the same transaction, which makes sure that if any step fails at any point, the whole transaction rolls back

  2. Fetch baseClientGroup and baseClient. These are simple methods which won't need much explanation, so I am pasting the code for both client.service.ts and client-group.service.ts below

    1. client.service.ts

       import {
         AppError,
         type Prisma,
         prismaClient,
         type SearchResult,
         type TransactionalPrismaClient,
       } from "@repo/models";
      
       /**
        * @description this class is to be used for transactional operations on the client table
        */
       export class ClientService {
         constructor(private tx: TransactionalPrismaClient = prismaClient) {}
      
         async getById({ id, clientGroupId }: { id: string; clientGroupId: string }): Promise<
           Prisma.ClientGetPayload<{
             select: {
               id: true;
               clientGroupId: true;
               lastMutationId: true;
             };
           }>
         > {
           const client = await this.tx.client.findUnique({
             where: {
               id,
               clientGroupId,
             },
             select: {
               id: true,
               clientGroupId: true,
               lastMutationId: true,
             },
           });
           if (!client) {
             return {
               id,
               clientGroupId,
               lastMutationId: 0,
             };
           }
           if (client.clientGroupId !== clientGroupId) {
             throw new AppError({
               code: "UNAUTHORIZED",
               message: "You are not authorized to access this client",
             });
           }
           return client;
         }
       }
      
       /**
        * @description this constant is an instance of the ClientService class, and should be used for non transactional operations
        * on the client table
        */
       export const clientService = new ClientService();
      
    2. client-group.service.ts

       import { AppError, type Prisma, prismaClient } from "@repo/models";
       import { type TransactionalPrismaClient } from "@repo/models";
      
       /**
        * @description this class is to be used for transactional operations on the clientGroup table
        */
       export class ClientGroupService {
         constructor(private tx: TransactionalPrismaClient = prismaClient) {}
      
         async getById({ id, userId }: { id: string; userId: string }): Promise<
           Prisma.ClientGroupGetPayload<{
             select: {
               id: true;
               userId: true;
               cvrVersion: true;
             };
           }>
         > {
           const clientGroup = await this.tx.clientGroup.findUnique({
             where: {
               id,
               userId,
             },
             select: {
               id: true,
               userId: true,
               cvrVersion: true,
             },
           });
           if (!clientGroup) {
             return {
               id,
               userId,
               cvrVersion: 0,
             };
           }
           if (clientGroup.userId !== userId) {
             throw new AppError({
               code: "UNAUTHORIZED",
               message: "You are not authorized to access this client group",
             });
           }
           return clientGroup;
         }
       }
      
       /**
        * @description this constant is an instance of the ClientGroupService class, and should be used for non transactional operations
        */
       export const clientGroupService = new ClientGroupService();
      
    3. The interesting part about both getById methods is that they try to find the clientGroup/client, and when they don't find one, rather than creating it they just return an object shaped the way a newly created client/clientGroup would look. There is a very good reason behind that.

    4. Try playing around with the order here on your own and you will realise the issue. Let me give you a hint: deadlock.

  3. This one is simple: we take the baseClient and read its lastMutationId (it will be 0 if the client did not exist before, or some number if it already existed in our db). We simply increment it by 1 and expect the id of the mutation we are currently executing to be exactly this number, since we maintain that state inside the client table in our database (see the worked example after this list)

  4. Step 4 is a checker where we verify:

    1. if the current mutation id is less than the nextMutationId calculated in step 3, it has already been applied, so we just log and return (check how the logger works inside the lib package!!). This rarely happens in production; it can occur when a user has two tabs open with the same account (same client group), or in dev mode

    2. if the current mutation id is somehow greater than the nextMutationId, that's a blunder: it means a mutation is coming from the future that we have no track of. This never happens in production if your setup is right (which in this case it is 😎)

  5. Step 5 simply runs when we are not in errorMode:

    1. 5.1 checks that the mutation is valid by looking it up in the set of server mutators we keep as an object (yet to be written by us), built with the help of the type system we wrote in the last blog (you will see how easy the type system makes writing mutators)

    2. 5.2 applies the mutation by simply calling it with args, userId and the current tx object, so whatever db calls happen inside the mutator are part of the same transaction

  6. Step 6 upserts the client and the client group:

    1. Upsert the clientGroup. In this case we are not really updating anything, as you can see inside step 6 of processMutation, because we are just passing the baseClientGroup through. The purpose of calling this function here is to create the clientGroup if it does not exist, because remember: in the getById function we returned a plain object with default values rather than creating the clientGroup when it was missing

       import { AppError, type Prisma, prismaClient } from "@repo/models";
       import { type TransactionalPrismaClient } from "@repo/models";
      
       /**
        * @description this class is to be used for transactional operations on the clientGroup table
        */
       export class ClientGroupService {
         constructor(private tx: TransactionalPrismaClient = prismaClient) {}
      
         async upsert({ id, userId, cvrVersion }: { id: string; userId: string; cvrVersion: number }) {
           return await this.tx.clientGroup.upsert({
             where: {
               id,
               userId,
             },
             update: {
               lastModified: new Date(),
               cvrVersion,
             },
             create: {
               id,
               userId,
               cvrVersion,
               lastModified: new Date(),
             },
             select: {
               id: true,
               cvrVersion: true,
             },
           });
         }
       }
      
       /**
        * @description this constant is an instance of the ClientGroupService class, and should be used for non transactional operations
        */
       export const clientGroupService = new ClientGroupService();
      
    2. Upsert the client. As you can see in the processMutation function, we are updating the lastMutationID to nextMutationID. If the client does not exist, we create it; otherwise, we just update it.

       import {
         AppError,
         type Prisma,
         prismaClient,
         type SearchResult,
         type TransactionalPrismaClient,
       } from "@repo/models";
      
       /**
        * @description this class is to be used for transactional operations on the client table
        */
       export class ClientService {
         constructor(private tx: TransactionalPrismaClient = prismaClient) {}
      
         async upsert({
           id,
           clientGroupId,
           lastMutationId,
         }: {
           id: string;
           clientGroupId: string;
           lastMutationId: number;
         }) {
           await this.tx.client.upsert({
             where: {
               id,
               clientGroupId,
             },
             create: {
               id,
               clientGroupId,
               lastMutationId,
             },
             update: {
               lastMutationId,
             },
             select: {
               id: true,
             },
           });
         }
       }
      
       /**
        * @description this constant is an instance of the ClientService class, and should be used for non transactional operations
        * on the client table
        */
       export const clientService = new ClientService();
      

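Before moving on, here is the worked example of the mutation-id bookkeeping promised in step 3, with made-up numbers:

// Suppose the db says this client's lastMutationId is 41, so nextMutationId = 42.
//
//   mutation.id === 41 -> already applied on an earlier push: skip (step 4.1)
//   mutation.id === 42 -> exactly what we expect: apply it, then persist
//                         lastMutationId = 42 via the client upsert in step 6
//   mutation.id === 44 -> from the future (43 never reached us): throw (step 4.2)
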
The last piece of the puzzle for push: let's write the serverMutators object itself, which actually executes the business logic and syncs the user's mutations to the backend database.

  1. Go to mutators/index.ts inside api

     import { type M, type MutatorType } from "@repo/models";
    
     import { todoMutators } from "./todo.mutator";
    
     export const serverMutators: M<MutatorType.SERVER> = {
       ...todoMutators,
     };
    

    Here we are creating the actual serverMutators object from the type system we wrote in the last blog. But what about the todoMutators, though?

  2. Now go to todo.mutator.ts inside the same folder

     import {
       AppError,
       type MutatorType,
       todoCreateSchema,
       todoDeleteSchema,
       type TodoMutators,
       todoUpdateSchema,
     } from "@repo/models";
    
     import { TodoService } from "../services/todo.service";
    
     export const todoMutators: TodoMutators<MutatorType.SERVER> = {
       async todoCreate(body) {
         const { args, tx, userId } = body;
    
         const parsed = todoCreateSchema.safeParse(args);
         if (!parsed.success) {
           throw new AppError({
             code: "BAD_REQUEST",
             message: `Invalid request body, ${parsed.error.message}`,
           });
         }
    
         const todoService = new TodoService(tx);
         await todoService.create({ args: parsed.data, userId });
       },
       async todoUpdate(body) {
         const { args, tx, userId } = body;
    
         const parsed = todoUpdateSchema.safeParse(args);
         if (!parsed.success) {
           throw new AppError({
             code: "BAD_REQUEST",
             message: `Invalid request body, ${parsed.error.message}`,
           });
         }
    
         const todoService = new TodoService(tx);
        await todoService.update({ args: parsed.data, userId });
       },
       async todoDelete(body) {
         const { args, tx, userId } = body;
    
         const parsed = todoDeleteSchema.safeParse(args);
         if (!parsed.success) {
           throw new AppError({
             code: "BAD_REQUEST",
             message: `Invalid request body, ${parsed.error.message}`,
           });
         }
    
         const todoService = new TodoService(tx);
        await todoService.delete({ args: parsed.data, userId });
       },
     };
    

    Here, as you can see, we are defining all the mutators that are actually registered with Replicache on the frontend. All of them follow the same kind of template:

    1. get args, tx and userId from the body of the mutation

    2. parse args against the schema you have written

    3. instantiate the service with tx to make its queries part of the same transaction

    4. call the service function's relevant business method like create, update, delete etc

  3. You can actually hop into todo.service.ts and check out the methods. They are quite straightforward, so I am not covering them in depth here; a hedged sketch of one of them follows below.
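
For completeness, here is a minimal sketch of what one method of todo.service.ts might look like; this is an assumption for illustration (with a simplified args shape), not the actual file:

import { prismaClient, type TransactionalPrismaClient } from "@repo/models";

// Like the other services, TodoService takes the tx handle so its queries
// join the caller's transaction.
export class TodoService {
  constructor(private tx: TransactionalPrismaClient = prismaClient) {}

  async create({ args, userId }: { args: { id: string; title: string }; userId: string }) {
    await this.tx.todo.create({
      // rowVersion starts at 0 via the schema default; any later update must
      // increment it so pulls can detect the change
      data: { id: args.id, title: args.title, userId },
    });
  }
}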

So finally we have set up our push endpoint and the backend mutators. Let's proceed with setting up the pull endpoint in the next blog.
