Cloudflare Workers / R2 / Hono / LangChain.jsを使ってRAGを作ってみる

この記事は、特定のアドベントカレンダーに関連しており、CloudflareとHonoの利用方法について説明されています。記事では、ベクターストアの作成や定期的なデータの取り込み処理などが紹介されています。また、自分のブログを参照したり、HonoでAPIを実装する方法やGUIアプリケーションの作成についても説明されています。しかしながら、Cloudflare Vectorizeの使用やFAISSの利用も検討する必要があると述べられています。

広告ここから
広告ここまで

目次

    この記事は、以下のアドベントカレンダーへのクロスポストです。

    RAGを作成するために、ベクターストアが必要です。が、どうも調べてみるとJSONファイルのexport / importでもできなくない様子でしたので、試してみました。

    定期的にEmbeddingを作成・更新する処理をWorkersで書く

    RAGを動かすには参照するデータソースが必要です。そこでCloudflare Workersのscheduledを使って定期的な取り込み処理を作りましょう。AbordSignal.timeoutがないとエラーになる部分に注意が必要です。また、インデックスデータはバインド済みのR2バケットにJSONでputさせています。

    import { JSONLoader } from "langchain/document_loaders/fs/json";
    import { OpenAIEmbeddings } from "langchain/embeddings/openai";
    import { MemoryVectorStore } from "langchain/vectorstores/memory";
    type Bindings = {
      LANGCHAIN_VC_STORE: R2Bucket;
      OPENAI_API_KEY: string;
    }
    
    const scheduled: ExportedHandlerScheduledHandler<Bindings> = async (event, env, ctx) => {
      const embeddings = new OpenAIEmbeddings({
        openAIApiKey: env.OPENAI_API_KEY
      })
      if (!AbortSignal.timeout) {
        AbortSignal.timeout = (ms) => {
          const controller = new AbortController();
          setTimeout(() => controller.abort(new DOMException("TimeoutError")), ms);
          return controller.signal;
        };
      }
    
      const response = await fetch('https://example.com/wp-json/wp/v2/posts?per_page=100')
      const jsonResponse = await response.json()
      const indexData = (jsonResponse as any).map((data: any) => ({
        id: data.id,
        content: data.content.rendered,
      }))
      const jsonString = JSON.stringify(indexData)
      const bytes = new TextEncoder().encode(jsonString)
      const jsonBlobData = new Blob([bytes], {
        type: 'application/json:charset=utf-8'
      })
      const jsonLoader = new JSONLoader(jsonBlobData as any as globalThis.Blob)
      const docs = await jsonLoader.loadAndSplit()
    
      const vectorStoreData = await MemoryVectorStore.fromDocuments(docs, embeddings)
      await env.LANGCHAIN_VC_STORE.put('wp-kyoto.json', JSON.stringify(vectorStoreData.memoryVectors))
    }
    export default {
        scheduled
    }
    

    検索側のAPIを追加する

    続いて作成したインデックスから検索と回答文章生成を行うAPIを作ります。Workersのfetchを使っても良いのですが、軽量かつAPIを複数生やすのに便利なので、Honoを入れました。

    
    type PromiseType<T extends PromiseLike<any>> = T extends PromiseLike<infer P>
      ? P
      : never
    type MemoryVectorStoreType = PromiseType<ReturnType<typeof MemoryVectorStore.fromDocuments>>
    type MemoryVectors = MemoryVectorStoreType['memoryVectors']
    
    const app = new Hono()
    /**
     * Answer by the data
     */
    app.get('/ask', async (c) => {
      const model = new ChatOpenAI({
        temperature: 0,
        openAIApiKey: c.env.OPENAI_API_KEY,
      });
      const embeddings = new OpenAIEmbeddings({
        openAIApiKey: c.env.OPENAI_API_KEY
      })
      const result = await c.env.LANGCHAIN_VC_STORE.get('wp-kyoto.json')
      /**
       * @See https://github.com/langchain-ai/langchainjs/issues/1468#issuecomment-1666549817
       */
      const vectors = await result?.json<MemoryVectors>()
      if (!vectors) {
        return c.text('not')
      }
      const memoryVectorStore = new MemoryVectorStore(embeddings);    
      memoryVectorStore.addVectors(
        vectors.map(x => x.embedding),
        vectors.map(x => ({
          pageContent: x.content,
          metadata: { id: (x as any).id }
        }))
      );
    
      const chain = RetrievalQAChain.fromLLM(model, memoryVectorStore.asRetriever())
      const answerStream = await chain.call({
        query: "Honoで実装する方法を教えて"
      })
      console.log(answerStream)
      return c.json(answerStream)
    })
    
    ...
    
    export default {
        fetch: app.fetch,
        scheduled
    }
    

    注意点としては、「読み込んだJSONデータを使って、毎回ベクターストアをメモリ上に作成する」仕組みをとっていることでしょうか。そのため、R2からGETしたデータをMemoryVectorStoreaddVectorsで追加する処理を挟んでいます。

      const memoryVectorStore = new MemoryVectorStore(embeddings);    
      memoryVectorStore.addVectors(
        vectors.map(x => x.embedding),
        vectors.map(x => ({
          pageContent: x.content,
          metadata: { id: (x as any).id }
        }))
      );
    

    レスポンス比較

    検索APIが用意できたので、動きをみてみましょう。まずはEmbeddingしたJSONデータが空の時のレスポンスをみてみます。このコードを書いていた時期のGPT-3.5は少し前の情報しかラーニングしていなかったため、Honoについては回答できるものがありませんでした。

    % curl http://localhost:8787/ask | jq .
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100   253  100   253    0     0     22      0  0:00:11  0:00:11 --:--:--    52
    {
      "text": "申し訳ありませんが、Honoという具体的なプログラミング言語やフレームワークについては知りません。Honoに関する情報を提供していただければ、お手伝いできるかもしれません。"
    }

    続いて自分のブログをEmbeddingした後の動作を見てみます。

    % curl http://localhost:8787/ask | jq .
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100  1046  100  1046    0     0     27      0  0:00:38  0:00:38 --:--:--   236
    {
      "text": "Honoを使ってAPIを実装する方法は、以下の手順に従って行います。\n\n1. ディレクトリとファイルを用意します。\n```\n% mkdir lambda\n% touch lambda/main.ts\n```\n\n2. `lambda/main.ts`に、HonoでAPIを実装するコードを書きます。以下は、HonoとAWS Lambdaを組み合わせた例です。\n```typescript\nimport { Hono } from \"hono\";\nimport { handle } from \"hono/aws-lambda\";\n\nconst app = new Hono();\n\napp.get(\"/\", (req, res) => {\n  res.send(\"Hello, Hono!\");\n});\n\nexport const handler = handle(app);\n```\n\n3. 必要なパッケージをインストールします。\n```\n% npm install hono\n```\n\n4. AWS Lambda関数としてデプロイします。AWS CDKを使用する場合は、CDKスタックにLambda関数を追加するコードを書きます。\n\n以上の手順で、Honoを使ったAPIの実装が完了します。詳細な手順や設定については、Honoの公式ドキュメントやサンプルコードを参考にしてください。"
    }

    Cloudflare WorkersではなくAWS Lambdaを使わせようとするなど、すこし癖のある回答が生成されているため、自分のブログを参照したなということが伺えます。

    HonoでRAGのGUIアプリケーションも用意する

    デモなどをするには、やはりGUIが欲しいところです。そこで簡単なフォームを持つアプリをHonoに追加しました。

    
    
    app.get('/', async c => {
    
      return c.html(`
    <html>
    <head>
    </head>
    <body>
    <form id="input-form" autocomplete="off" method="post">
      <input
        type="text"
        name="query"
        style={{
          width: '100%'
        }}
      />
      <button type="submit">Send</button>
    </form>
    <h2>AI</h2>
    <pre
      id="ai-content"
      style={{
        'white-space': 'pre-wrap'
      }}
    ></pre>
    <script>
    let target
    let message = ''
    document.addEventListener('DOMContentLoaded', function () {
      target = document.getElementById('ai-content')
      fetchChunked(target)
      console.log('aaa')
      document.getElementById('input-form').addEventListener('submit', function (event) {
        event.preventDefault()
        const formData = new FormData(event.target)
        message = formData.get('query')
        console.log(message)
        fetchChunked(target)
      })
    })
    
    function fetchChunked(target) {
      target.innerHTML = 'loading...'
      fetch('/ask', {
        method: 'post',
        headers: {
          'content-type': 'application/json'
        },
        body: JSON.stringify({ question: message })
      }).then((response) => {
        const reader = response.body.getReader()
        let decoder = new TextDecoder()
        target.innerHTML = ''
        reader.read().then(function processText({ done, value }) {
          console.log({done, value})
          if (done) {
            return
          }
          target.innerHTML += decoder.decode(value)
          return reader.read().then(processText)
        })
      })
    }
    </script>
    </body>
    `)
    })

    ベクターストアを毎回作っている部分なども含め、レスポンスが遅いのでStreamで少しずつ表示する仕組みにしておきました。

    APIレスポンスをStreamで返す

    API側もStreamでレスポンスを返させましょう。

    
    app.post('/ask', async (c) => {
      const { question } = await c.req.json<{question: string}>()
      if (!question) {
        return c.text('')
      }
      const model = new ChatOpenAI({
        temperature: 0,
        openAIApiKey: c.env.OPENAI_API_KEY,
      });
      const embeddings = new OpenAIEmbeddings({
        openAIApiKey: c.env.OPENAI_API_KEY
      })
      const result = await c.env.LANGCHAIN_VC_STORE.get('wp-kyoto.json')
      /**
       * @See https://github.com/langchain-ai/langchainjs/issues/1468#issuecomment-1666549817
       */
      const vectors = await result?.json<MemoryVectors>()
      if (!vectors) {
        return c.text('not')
      }
      const memoryVectorStore = new MemoryVectorStore(embeddings);    
      memoryVectorStore.addVectors(
        vectors.map(x => x.embedding),
        vectors.map(x => ({
          pageContent: x.content,
          metadata: { id: (x as any).id }
        }))
      );
      const chain = RetrievalQAChain.fromLLM(model, memoryVectorStore.asRetriever())
      const answerStream = await chain.stream({
        query: question
      })
      return c.streamText(async stream => {
        stream.write("loading...")
        for await (const chunk of answerStream) {
          stream.write(chunk.text)
          await stream.sleep(10)
        }
      })
    })

    Cloudflare Workersにデプロイして動作を確認する

    実装が終わったら、wrangler deployなどでCloudflareにデプロイします。wrangler devでローカル実行している場合、特定の条件だけかもですが、Streamでレスポンスが出ないケースがありました。そのため面倒でもデモはデプロイしたものを使う方が良さそうです。

    こちらもAWS LambdaにHonoをデプロイすることを推してくるので、自分のブログに影響を受けたことが伺えますね。

    やってみた感想

    このデモを作っていた時期は、まだCloudflare Vectorizeも試していない時期でした。そのためJSONでデータを保存する方法をとっています。今CloudflareでRAGを作るならば、Vectroizeを使うのが確実ですが、これはこれで月5ドルとはいえWorkersへの課金が必要なので、稟議等の手続きが多い会社では悩ましいかもしれません。

    あとは、FAISSなど、ベクターストアのインデックスそのものをexport / importできるものもありますので、ある程度動きが把握できたタイミングでこちらに切り替えることも検討したほうがよいかなと思います。

    広告ここから
    広告ここまで
    Home
    Search
    Bookmark