Setup "Poke"

Blog 7 of the 8-blog series

Hey everyone! After setting up push and pull, we are mostly done, but one vital piece of the puzzle is still pending.

Right now you can push mutations to our server, and every 60 seconds the client pulls and gets back in sync with the DB. But we don't want that 60-second delay; we want it to be instant.

You might not notice this delay in our app, because we are using row versioning scoped to a user and no one else can update or change your todo list. But imagine a Linear-like application where teams are collaborating: in that environment you want to see your co-workers' changes instantly, without reloading.

This is what poke is going to enable us to achieve.

How?

This is done by sending the client a hint that it should pull soon. This hint message is called a poke. The poke doesn’t contain any actual data. All the poke does is tell the client that it should pull again soon.
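To make the idea concrete, here is a minimal in-memory sketch of the poke flow. Everything in it (PokeChannel, FakeServer, FakeClient) is a hypothetical stand-in and not part of Replicache or Ably; it only shows that the poke carries no data and that the data travels through the pull.

```typescript
// Hypothetical in-memory stand-ins for the pub/sub service, the server, and a client.
type Listener = () => void;

class PokeChannel {
  private listeners: Listener[] = [];

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  // A poke has no payload: it only signals "something changed, pull soon".
  poke(): void {
    this.listeners.forEach((listener) => listener());
  }
}

class FakeServer {
  readonly channel = new PokeChannel();
  private version = 0;

  // Apply a mutation, then hint every subscribed client to pull.
  mutate(): void {
    this.version += 1;
    this.channel.poke();
  }

  // The pull is where the actual data travels.
  pull(): number {
    return this.version;
  }
}

class FakeClient {
  seenVersion = 0;
  private server: FakeServer;

  constructor(server: FakeServer) {
    this.server = server;
    // React to a poke by pulling; the poke itself tells us nothing else.
    server.channel.subscribe(() => this.pull());
  }

  pull(): void {
    this.seenVersion = this.server.pull();
  }
}

const server = new FakeServer();
const tabA = new FakeClient(server);
const tabB = new FakeClient(server);

server.mutate();
console.log(tabA.seenVersion, tabB.seenVersion); // prints "1 1"
```

Both clients end up on the latest version without waiting for a polling interval, which is exactly what we want from the real setup below.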

We will use Ably pub/sub and send an empty event to our clients on a channel named after the current user's ID.

So let's get started on setting up poke.

Hop on to utils/poke.ts inside api/src in apps.

import { ably } from "@repo/lib";

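export async function sendPoke({ userId }: { userId: string }) {
  // The poke carries no data; it only tells this user's clients to pull.
  // Await the publish so callers that await sendPoke really wait for it.
  await ably.channels.get(`replicache:${userId}`).publish({});
}

As you can see, it's very simple: we publish to a channel formed using the user's ID.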

import Ably from "ably";

import { env } from "../env";

export const ably = new Ably.Rest(env.ABLY_API_KEY);

This is the initialisation of Ably inside the lib package.

Calling sendPoke is easy as well. Go to replicache.controller.ts and check out the push method.

  push: RequestHandler = async (
    req: Request<object, object, PushRequestType["body"]>,
    res: Response,
    next: NextFunction,
  ) => {
    const userId = req.user.id;
    try {
      const push = req.body;
      for (const mutation of push.mutations) {
        try {
          await ReplicacheService.processMutation({
            clientGroupID: push.clientGroupID,
            errorMode: false,
            mutation,
            userId,
          });
        } catch (error) {
          await ReplicacheService.processMutation({
            clientGroupID: push.clientGroupID,
            errorMode: true,
            mutation,
            userId,
          });
        }
      }
      return res.status(200).json({
        success: true,
      });
    } catch (error) {
      if (error instanceof AppError) {
        return next(error);
      }
      logger.error(error);
      return next(
        new AppError({
          code: "INTERNAL_SERVER_ERROR",
          message:
            "Failed to push data to the server, due to an internal error. Please try again later.",
        }),
      );
    } finally {
      await sendPoke({ userId });
    }
  };

As you can see, inside the finally block we call the sendPoke function with the current user's ID once the mutations are done processing.
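If the role of finally here is not obvious, the following simplified sketch may help. It is a synchronous stand-in for the handler above, with hypothetical names (handlePush, processMutations, poke), and it only demonstrates that the poke runs on both the success path and the error path.

```typescript
// Simplified, synchronous stand-in for the push handler; all names are hypothetical.
function handlePush(processMutations: () => void, poke: () => void): "ok" | "error" {
  try {
    processMutations();
    return "ok";
  } catch {
    return "error";
  } finally {
    // Runs on both paths, after the return value has been decided.
    poke();
  }
}

let pokes = 0;
const countPoke = () => {
  pokes += 1;
};

const okResult = handlePush(() => {}, countPoke);
const errorResult = handlePush(() => {
  throw new Error("mutation failed");
}, countPoke);

console.log(okResult, errorResult, pokes); // prints "ok error 2"
```

The trade-off of this design is that clients can get poked even when a push failed and nothing changed, which costs them one extra pull.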

The server-side setup is now done. On the client we need to listen to this channel and actually pull whenever an event is published to this user's channel, replicache:${userId}.
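Since the server and the client must agree on the exact channel name, one option is to build it in a single place. This is only a sketch: pokeChannelName and POKE_CHANNEL_PREFIX are hypothetical names that do not exist in the repo, and the code in this post simply inlines the template string on both sides.

```typescript
// Hypothetical shared helper; in a monorepo it could live in a package both apps import.
const POKE_CHANNEL_PREFIX = "replicache";

function pokeChannelName(userId: string): string {
  if (userId.length === 0) {
    // An empty ID would produce "replicache:", a channel no client listens on.
    throw new Error("userId is required to build a poke channel name");
  }
  return `${POKE_CHANNEL_PREFIX}:${userId}`;
}

console.log(pokeChannelName("user_123")); // prints "replicache:user_123"
```

With a helper like this, a typo in the prefix can no longer make the server publish to one channel while the client listens on another.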

Setting up an Ably client on the server was really easy because we just instantiated the Ably class with the Ably API key. That can't be done on the frontend, because it would expose the API key on the web, which is not safe at all.

So Ably recommends building a backend route that authenticates your frontend Ably client.

So move to socket.router.ts inside api.

import { Router } from "express";

import { socketController } from "../controllers/socket.controller";
import { authenticate } from "../middlewares/auth.middleware";

export const socketRouter = Router({ mergeParams: true });

/**
 * @method GET @url /socket/token @desc gets the Ably token request for the user
 */
socketRouter.get("/token", authenticate, socketController.getToken);

Let's check the getToken function.

import { type NextFunction, type Request, type RequestHandler, type Response } from "express";

import { ably } from "@repo/lib";

class SocketController {
  public getToken: RequestHandler = async (req: Request, res: Response, next: NextFunction) => {
    try {
      const tokenRes = await ably.auth.createTokenRequest({
        clientId: req.user.id, // available because the authenticate middleware runs before this handler
      });

      res.status(200).json(tokenRes);
    } catch (error) {
      next(error);
    }
  };
}

export const socketController = new SocketController();

Now that we have the token route in place, let's set up the Ably client inside our web app.

Move to the create-ably-provider.tsx file inside web/src/providers/.

"use client";

import Ably from "ably";
import { AblyProvider } from "ably/react";

import { api } from "~/lib/api";

const ablyClient = new Ably.Realtime({
  authCallback: (_tokenParams, callback) => {
    api
      .getSocketAuthToken()
      .then((tokenRequest) => callback(null, tokenRequest))
      .catch((error) => callback(error, null));
  },
  /**
   * Only auto-connect in the browser: the Next.js server has no cookies,
   * and we don't want Ably to fire authCallback without them
   * @see https://github.com/ably/ably-js/issues/1742
   */
  autoConnect: typeof window !== "undefined",
  closeOnUnload: false,
});

export const AblyContextProvider = ({ children }: { children: React.ReactNode }) => {
  return <AblyProvider client={ablyClient}>{children}</AblyProvider>;
};

For those of you who are wondering what the heck api is:

import { type TokenRequest } from "ably";
import axios from "axios";

import {
  type PullRequest,
  type PushRequest,
  type UserCreateOutputType,
  type UserGetOutputType,
} from "@repo/models";

import { env } from "../env";

const _axios = axios.create({
  baseURL: `${env.NEXT_PUBLIC_API_URL}`,
  withCredentials: true,
  headers: {
    "Content-Type": "application/json",
  },
});

export class API {
  createUser = async (data: { email: string }) => {
    await _axios.post<UserCreateOutputType>("/users", data);
  };

  getUser = async () => {
    const response = await _axios.get<UserGetOutputType>("/users");
    return response.data;
  };

  deleteUser = async () => {
    const response = await _axios.delete("/users");
    return response.data;
  };

  getSocketAuthToken = async () => {
    const response = await _axios.get<TokenRequest>("/socket/token");
    return response.data;
  };

  replicachePull = async (data: PullRequest, instanceId: string) => {
    const response = await _axios.post(`/replicache/pull?instance=${instanceId}`, data);
    return response;
  };

  replicachePush = async (data: PushRequest, instanceId: string) => {
    const response = await _axios.post(`/replicache/push?instance=${instanceId}`, data);
    return response;
  };
}

export const api = new API();

api is a consolidation of all API calls in the frontend app, which can be used on its own or inside react-query or SWR.

Now add the AblyContextProvider inside the root provider.tsx file in web/src/app.

"use client";

import { NextUIProvider } from "@nextui-org/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import * as React from "react";

import { AblyContextProvider } from "~/providers/create-ably-provider";
import { ReplicacheProvider } from "~/providers/create-replicache-provider";

export function Providers({ children }: { children: React.ReactNode }) {
  const [queryClient] = React.useState(() => new QueryClient());

  return (
    <AblyContextProvider>
      <NextUIProvider>
        <QueryClientProvider client={queryClient}>
          <ReplicacheProvider>{children}</ReplicacheProvider>
        </QueryClientProvider>
      </NextUIProvider>
    </AblyContextProvider>
  );
}

Now we use Ably to listen to the channel where the poke will be sent, so move to use-replicache.tsx inside web.

import { useAbly } from "ably/react";
import { AxiosError } from "axios";
import { nanoid } from "nanoid";
import * as React from "react";
import { create } from "zustand";
import { immer } from "zustand/middleware/immer";

import { type M, type MutatorType, type PullResponseOKV1, Replicache } from "@repo/models";

import { api } from "~/lib/api";

import { env } from "~/env";
import { useUser } from "~/hook/user-user";
import { clientMutators } from "~/mutators";

type State = {
  // M is a type for all mutators in the app (client and server)
  rep: Replicache<M<MutatorType.CLIENT>> | null;
};

type Actions = {
  setRep: (rep: Replicache<M<MutatorType.CLIENT>>) => void;
};

const useReplicacheStore = create<State & Actions>()(
  immer((set) => ({
    rep: null,
    setRep: (rep) => set({ rep }),
  })),
);

export const useReplicache = () => {
  return { rep: useReplicacheStore((state) => state.rep) };
};

export const useLoadReplicache = () => {
  const { data } = useUser();
  const user = data?.user;
  const { rep, setRep } = useReplicacheStore((state) => state);
  const ably = useAbly();

  React.useEffect(() => {
    if (!user?.id) return;
    const iid = nanoid();

    const r = new Replicache({
      name: user.id,
      licenseKey: env.NEXT_PUBLIC_REPLICACHE_LICENSE_KEY,
      mutators: clientMutators(user.id),
      schemaVersion: env.NEXT_PUBLIC_SCHEMA_VERSION ?? "1",
    });

    r.pusher = async (opts) => {
      try {
        const response = await api.replicachePush(opts, iid);
        return {
          httpRequestInfo: {
            httpStatusCode: response.status,
            errorMessage: "",
          },
        };
      } catch (error) {
        if (error instanceof AxiosError)
          return {
            httpRequestInfo: {
              httpStatusCode: error.status ?? 500,
              errorMessage: error.message,
            },
          };
        return {
          httpRequestInfo: {
            httpStatusCode: 500,
            errorMessage: "Unknown error",
          },
        };
      }
    };

    r.puller = async (opts) => {
      try {
        const response = await api.replicachePull(opts, iid);
        return {
          response: response.data as PullResponseOKV1,
          httpRequestInfo: {
            errorMessage: "",
            httpStatusCode: response.status,
          },
        };
      } catch (error) {
        if (error instanceof AxiosError)
          return {
            httpRequestInfo: {
              httpStatusCode: error.status ?? 500,
              errorMessage: error.message,
            },
          };
        return {
          httpRequestInfo: {
            httpStatusCode: 500,
            errorMessage: "Unknown error",
          },
        };
      }
    };

    setRep(r);

    return () => {
      void r.close();
    };
  }, [setRep, user?.id]);

  // This is the new code that we added
  React.useEffect(() => {
    if (!rep || !user?.id) return;
    const channel = ably.channels.get(`replicache:${user.id}`);
    // A poke carries no data; it only tells us to pull.
    const onPoke = () => {
      void rep.pull();
    };
    void channel.subscribe(onPoke);

    return () => {
      // Remove only this listener so other subscribers on the channel are unaffected.
      channel.unsubscribe(onPoke);
    };
  }, [rep, ably.channels, user?.id]);
};

If you look at the useEffect at the bottom, it's very clear how we respond to a poke.

We get the Ably channel where we published our empty poke event, replicache:${userId}, and subscribe to it with channel.subscribe().

Whenever a message arrives on that subscription, we use Replicache to pull with rep.pull().

And don't forget to unsubscribe inside the cleanup function of the useEffect.
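One detail worth knowing about the cleanup: as far as I know, Ably's channel.unsubscribe() with no arguments removes every listener on that channel, while passing the listener removes only that one. The sketch below mimics those semantics with a hypothetical in-memory MiniChannel (it is not the Ably client) to show the difference.

```typescript
// Hypothetical in-memory channel that mimics listener-based subscribe/unsubscribe.
type PokeListener = () => void;

class MiniChannel {
  private listeners: PokeListener[] = [];

  subscribe(listener: PokeListener): void {
    this.listeners.push(listener);
  }

  // With a listener: remove just that one. Without: remove all of them.
  unsubscribe(listener?: PokeListener): void {
    this.listeners = listener ? this.listeners.filter((l) => l !== listener) : [];
  }

  publish(): void {
    // Copy first so a listener can unsubscribe while we iterate.
    this.listeners.slice().forEach((l) => l());
  }
}

const channel = new MiniChannel();
let pulls = 0;
let otherEvents = 0;

const onPoke = () => {
  pulls += 1;
};
const onOther = () => {
  otherEvents += 1;
};

channel.subscribe(onPoke);
channel.subscribe(onOther);
channel.publish(); // both listeners run

channel.unsubscribe(onPoke); // what an effect cleanup would do
channel.publish(); // only onOther runs now

console.log(pulls, otherEvents); // prints "1 2"
```

In our app only one hook listens on the channel, so either form works, but passing the listener is the safer habit once more than one component subscribes.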

Now we are all set.

As soon as a mutation happens --> the server pokes connected clients to pull --> clients pull the latest server state.

Voilà! We are mostly done with coding the app. Tune in to the next blog, where we will do the final error handling and discuss some pointers to keep in mind while building such apps.