虎の穴ラボ技術ブログ

虎の穴ラボ株式会社所属のエンジニアが書く技術ブログです

MENU

Deno で掲示板サイトを作ろう! with upstash & supabase その 5 ( QStash を使ったスケジュール実行)

皆さん、こんにちは。 冷蔵庫を買い換えました。以前より容量を倍にしたら部屋での存在感がすごいです。おっくんです。

今回は、「Deno で掲示板サイトを作ろう! with upstash & supabase」企画の 5回目として、upstash QStash を使用したスケジュール実行処理を組み立てます。

前回記事はこちら

toranoana-lab.hatenablog.com

upstash QStash の導入

前回記事では、upstash redis を導入しました。 こちらは upstash.inc が展開している「サーバーレスで」「お安く」「上限有り(MAX160$)の従量課金」で使える Redis でした。

upstash では、Redis 以外にサーバーレスおよびエッジランタイム向けのメッセージング/スケジューリングソリューションのQStash も展開しています。

この QStash のスケジューリング機能を使い、匿名掲示板の古い書き込みを削除するバッチ処理を組み立てます。

実装方針

30 分に 1 回 upstash QStash から、deno deploy に立てた API を呼び出します。 deno deploy に用意した API は、supabase Edge Functions に立てた APIを呼び出し、条件に基づいて RDB に保存している書き込みを削除します。

削除ルール

RDB のデータ削除するにあたり、ルールを決める必要があります。
このルールは、そのまま supabase Edge Functions で実装する処理になります。

その前に現在の掲示板のデータの構造をおさらいしておきます。

  • 個別の掲示板「topic」と書き込み「post」の 2 つがある。
  • post には、親になる topic が必ず 1 つある。

この構造に対して、次のルールで削除をすることにします。

  1. post は、データ登録後 1 時間が経過したら削除
  2. topic は、データ登録後 1 時間の経過 且つ 紐づいた post が無い場合に削除

時間の経過については、topic、 post ともに、データ登録時の時刻が記録されているcreated_at カラムを持っているのでこちらのカラムを基準に判断します。

実装

第 1 段階 QStash -> deno deploy の疎通

手始めに、QStash から届いた通信を deno deploy で受け取ることを試みてみます。

anonymous-board/anonymous-board/routes/api/posts_remove.ts を作成します。

[posts_remove.ts(新規)]

import { HandlerContext } from "$fresh/server.ts";

export const handler = (_req: Request, _ctx: HandlerContext): Response => {
  console.log(req);

  return new Response("receive");
};

これが追加できたら一旦デプロイしましょう。

これまでも解説していますが Fresh は、ディレクトリベースのルーティングが効いているので /api/posts_remove と、作成したハンドラが紐づいています。

アクセスすると、receive の文字列が返ってくることが確認できるはずです。

またリクエストの内容をコンソール出力しているので、deno deploy の Logs 画面 でも応答している様子がわかるはずです。

次は、upstash の設定です。

upstash にサインインし、 QStash の管理画面を開きます。

QStash の設定を行うためには、URL や Body など各種設定を行いますが、今回扱うのは、URL、Type、 Every(Type を Scheduled に変えると表示される)です。

設定できたら、「Scheduled」で保存します。

先ほどの様に、deno deploy の Logs 画面を見ると通信が入ってきているのがわかるはずです。

見ると "user-agent": "Upstash-QStash" となっているのが確認できます。

これで QStash -> deno deploy の通信の疎通が取れました。 設定した QStash の設定は外しておきましょう。すぐに無料利用の上限に到達してしまうかもしれません。

第 2 段階 deno deploy -> supabase Edge Functions の受信のルール

疎通はできましたが、アプリケーションとして通信が QStash から届いたものであるかを判定することができていません。 upstash では、通信が QStash から送られたものであるのか、検証できるモジュールを公開しています。

github.com

こちらを使用して、検証を行います。また、通信は POST メソッドで送られてきていることも確認できるので、これも検証することにします。

upstash は deno deploy で先のモジュールを導入した処理をサンプルとして公開しています。

こちらを参考にします。

docs.upstash.com

[posts_remove.ts(更新)]

import { HandlerContext } from "$fresh/server.ts";
import { envConfig } from "../../util/config.ts";
import { Receiver } from "@upstash/qstash/mod.ts";

const receiver = new Receiver({
  currentSigningKey: envConfig.UPSTASH_QSTASH_CURRENT_SIGNING_KEY,
  nextSigningKey: envConfig.UPSTASH_QSTASH_NEXT_SIGNING_KEY,
});

export const handler = async (
  req: Request,
  _ctx: HandlerContext
): Promise<Response> => {
  if (req.headers.get("method") !== "POST") {
    console.error("not method POST");
    return new Response("", { status: 404 });
  }

  const isValid = await receiver
    .verify({
      signature: req.headers.get("Upstash-Signature"),
      body: await req.text(),
    })
    .catch((err: Error) => {
      console.error(err);
      return new Response("", { status: 404 });
    });

  if (!isValid) {
    return new Response("Invalid signature", { status: 401 });
  }

  console.log(req);
  // supabase Edge Functions を呼び出す

  return new Response("", { status: 200 });
};

QStash のモジュールは、currentSigningKeynextSigningKey の 2 つのキーを要求します。 これらは環境変数を介して引き渡すので、

[util/config.ts(更新)]

import "dotenv/load.ts";

const envConfig = {
  SUPABASE_EDGE_FUNCTION_END_POINT: Deno.env.get(
    "SUPABASE_EDGE_FUNCTION_END_POINT",
  )!,
  SUPABASE_ANON_KEY: Deno.env.get("SUPABASE_ANON_KEY")!,
  SESSION_SECONDS: Deno.env.get("SESSION_SECONDS")!,
  SECRET: Deno.env.get("SECRET")!,
  SALT: Deno.env.get("SALT")!,
  DENO_ENV: Deno.env.get("DENO_ENV")!,
  UPSTASH_REDIS_REST_URL: Deno.env.get("UPSTASH_REDIS_REST_URL")!,
  UPSTASH_REDIS_REST_TOKEN: Deno.env.get("UPSTASH_REDIS_REST_TOKEN")!,
  // 以下2つの環境変数を追記
  UPSTASH_QSTASH_CURRENT_SIGNING_KEY: Deno.env.get("UPSTASH_QSTASH_CURRENT_SIGNING_KEY"),
  UPSTASH_QSTASH_NEXT_SIGNING_KEY: Deno.env.get("UPSTASH_QSTASH_NEXT_SIGNING_KEY"),
};

export { envConfig };

これらの環境変数は、QStash のコンソールで確認できるので控えておき、deno deploy で設定します。(詳しい設定方法は、前回連載記事に掲載していますのでそちらを参照ください。)

新しくモジュールを追加しているので、import_map.json も更新しています。

[import_map.json(更新)]

{
  "imports": {
    "$fresh/": "https://deno.land/x/fresh@1.1.1/",
    "preact": "https://esm.sh/preact@10.11.0",
    "preact/": "https://esm.sh/preact@10.11.0/",
    "preact-render-to-string": "https://esm.sh/*preact-render-to-string@5.2.4",
    "@preact/signals": "https://esm.sh/*@preact/signals@1.0.3",
    "@preact/signals-core": "https://esm.sh/*@preact/signals-core@1.0.1",
    "twind": "https://esm.sh/twind@0.16.17",
    "twind/": "https://esm.sh/twind@0.16.17/",
    "dotenv/": "https://deno.land/std@0.157.0/dotenv/",
    "redis/": "https://deno.land/x/redis@v0.25.0/",
    "fresh_session/": "https://deno.land/x/fresh_session@0.2.0/",
    "deno_csrf/": "https://deno.land/x/deno_csrf@0.0.5/",
    "std_cookie": "https://deno.land/std@0.159.0/http/cookie.ts",
    "zod": "https://deno.land/x/zod@v3.20.0/mod.ts",
    "ammonia": "https://deno.land/x/ammonia@0.3.1/mod.ts",
    "upstash_redis/": "https://deno.land/x/upstash_redis/",
    "@upstash/qstash/": "https://deno.land/x/upstash_qstash@v0.3.6/" // <= 追記
  }
}

ここまでできたらデプロイします。

先の確認の様にブラウザでアクセスすると、404 エラーが返却されるのがわかるはずです。 その時には、deno deploy の Logs 画面には、not method POST と表示されています。 再度 QStash でスケジュール設定すると、応答していることも確認できます。

第 3 段階 deno deploy -> supabase Edge Functions の呼び出し処理

QStash から届いた通知であることを検証できるようになったので、最後に古い投稿の削除処理本体を作ります。 これまで作ってきた処理に、追加で API を足します。

[supabase/functions/board_api/index.ts(変更 )]

import { createClient } from "https://esm.sh/@supabase/supabase-js@^1.33.2";
import { serve } from "https://deno.land/std@0.131.0/http/server.ts";
import { Router } from "https://deno.land/x/acorn/mod.ts";
import { datetime } from "https://deno.land/x/ptera/mod.ts";

const router = new Router();

// 他API 省略

router.post("/database-access/remove_posts", async () => {
  const now = new Date();
  const dateTime = datetime(now.setHours(now.getHours() - 1));
  const limit = dateTime.toUTC().format("YYYY-MM-dd HH:mm:ss"); // 時間が経過した post を削除

  const postDeleteResult = await supabaseClient
    .from("posts")
    .delete()
    .lt("created_at", limit);

  if (postDeleteResult.statusText !== "OK") return { success: false };

  // 作成から時間が経過し、関連した post が無い topics を削除
  const topicsDeleteTargetResult = await supabaseClient
    .from("topics")
    .select("id, posts(id)", { count: "exact" })
    .lt("created_at", limit);

  if (
    topicsDeleteTargetResult.statusText !== "OK" ||
    !topicsDeleteTargetResult ||
    !topicsDeleteTargetResult.data
  )
    return { success: false };

  const topicsDeleteTargets = topicsDeleteTargetResult.data
    .filter((p: { id: number, posts: any[] }) => p.posts.length === 0)
    .map((p: { id: number, posts: any[] }) => p.id);

  if (topicsDeleteTargets.length === 0) return { success: true };

  const topicsDeleteResult = await supabaseClient
    .from("topics")
    .delete()
    .filter("id", "in", `(${topicsDeleteTargets.join(",")})`);

  if (topicsDeleteResult.statusText !== "OK") return { success: false };

  return { success: true };
});

await serve((req) => {
  console.info(req);
  return router.handle(req);
});

時刻の処理には、Ptera を使用しています。

github.com

toranoana.deno #0 で開発者自ら LT で紹介頂いたモジュールです。

toranoana-lab.hatenablog.com

supabase Edge Functions に新たに作成した処理を呼び出すために、先に QStash で呼び出すように設定した deno deploy に立っている API から呼び出すようにします。

[posts_remove.ts(更新)]

import { HandlerContext } from "$fresh/server.ts";
import { envConfig } from "../../util/config.ts";
import { Receiver } from "@upstash/qstash/mod.ts";

const receiver = new Receiver({
  currentSigningKey: envConfig.UPSTASH_QSTASH_CURRENT_SIGNING_KEY,
  nextSigningKey: envConfig.UPSTASH_QSTASH_NEXT_SIGNING_KEY,
});

export const handler = async (
  req: Request,
  _ctx: HandlerContext
): Promise<Response> => {
  if (req.method !== "POST") {
    console.error("not method POST");
    return new Response("", { status: 404 });
  }

  const isValid = await receiver
    .verify({
      signature: req.headers.get("Upstash-Signature"),
      body: await req.text(),
    })
    .catch((err: Error) => {
      console.error(err);
      return new Response("", { status: 404 });
    });

  if (!isValid) {
    return new Response("Invalid signature", { status: 401 });
  }

  // supabase Edge Functions を呼び出す
  const result = await fetch(
    `${envConfig.SUPABASE_EDGE_FUNCTION_END_POINT}/remove_posts`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${envConfig.SUPABASE_ANON_KEY}`,
        "Content-Type": "application/json",
      },
    }
  );

  console.log(result);

  return new Response("", { status: 200 });
};

実装できたら、デプロイし改めて QStash の設定を行います。 ここでは確認のため、再度 1 分毎の呼び出しとしておきます。

supabase Edge Functions の Logs を 確認すると、リクエストが到達していることがわかります。

適当に投稿して、時間を待つか supabase の table editor で直接 created_at を1時間経過扱いになるように書き換えると、topic と post が削除されるのが確認できるはずです。

QStash からの通知をきっかけにして、deno deploy を中継し、supabase Edge Functions を呼び出し処理ができました。

QStash は、30 分に 1 回程度の通知間隔にして、改めて設定をしておきましょう。


QStash のスケジューリング機能を使い、匿名掲示板の古い書き込みを削除するバッチ処理を組み立てました。 「スケジュール実行に QStash を使う」というのが今回の主題になりますが、エッジサービスとしての競合 CloudFlare Workers だと「Cron Triggers」という機能があります。 この機能で、CloudFlare Workers として、スケジュール実行をサポートしています。 deno deploy にもこの機能が欲しいと切に願う次第です。

では、仮に、deno deploy にスケジュール機能が入れば QStash と縁をきれるか?というとそうでもありません。 今回取り扱っていない機能として、冒頭の紹介にも記載した メッセージング の機能があります。 pub/sub 的な topic や、確実に送信先に届けるためのリトライ機能、少し変わったところではコールバックなどを備えています。 本来は、「スケジュールされたメッセージ送信」であるところの「スケジュール」の部分だけ切り出したのが今回の記事です。

deno deploy だけの構成に限らず、サーバレスなインフラの一翼として担える機能を数多く備えていると思うので、これからも活用していきたいと思っています。

今回進めた内容は、以下のリポジトリに上げていますので全体感を確認したい場合はこちらをご参考ください。

github.com

P.S.

採用

虎の穴では一緒に働く仲間を募集中です!
この記事を読んで、興味を持っていただけた方はぜひ弊社の採用情報をご覧下さい。
yumenosora.co.jp