Bedrock Knowledge baseのデータをAWS SDK JS v3で更新する方法

Bedrock Knowledge Baseを使用すると、RAGのベースとなるベクターストアを簡単に準備できます。ただし、CMSやECサイトなどでは定期的なデータ更新が必要です。AWS SDKを使用してデータの更新/同期を行う方法を調査しました。データの取り込みはIngestionJobと呼ばれ、StartIngestionJob APIを使用して実行できます。データ量によって取り込みに時間がかかる場合は、ListIngestionJobs APIを使用してIngestion Jobの履歴を取得できます。また、個別のジョブについて詳細を確認するにはGetIngestionJob APIを使用します。データソースはS3に保存されているため、シンプルにジョブを実行できます。App FlowやEventBridge / Step Functionsと組み合わせることでさまざまなことができます。

広告ここから
広告ここまで

目次

    Bedrock Knowledge Baseを使うことで、簡単にRAGのベースとなるベクターストアを用意できます。が、CMSやECサイトなどでは、定期的なデータ更新が必要です。ということで、AWS SDKでデータの更新 / Syncを行う方法を調べました。

    AWS SDK(v3)のインストール

    Knowledge Baseの操作はclient-bedrock-agentを使います。Agentsを使わず、Knowledge Baseだけの場合でもこちらを使いますので、ライブラリを探す際に少し迷うかもしれません。

    npm i @aws-sdk/client-bedrock-agent

    ジョブを開始する操作を行う

    Knowledge Baseでのデータの取り込みはIngestionJobとAWSでは呼ばれています。APIを呼び出して取り込みする場合は、StartIngestionJob APIを実行しましょう。

        const client = new BedrockAgentClient({
            credentials: {
                accessKeyId: env.aws.credentials.accessKeyId,
                secretAccessKey: env.aws.credentials.secretAccessKey
            },
            region: 'us-east-1'
        });
          const command = new StartIngestionJobCommand({
            knowledgeBaseId: "STRING_VALUE", // required
            dataSourceId: "STRING_VALUE", // required
          });
          const response = await client.send(command);

    実行結果は次のようなJSONになります。

    pdatedAt":"2023-12-25T01:56:50.428Z"}})
    {
      '$metadata': {
        httpStatusCode: 202,
        requestId: '50b1e3fb-3a0c-4091-a258-741bde7c4850',
        attempts: 1,
        totalRetryDelay: 0
      },
      ingestionJob: {
        dataSourceId: 'XXXXX',
        ingestionJobId: 'XXXXX',
        knowledgeBaseId: 'XXXXX',
        startedAt: '2023-12-25T01:56:50.428Z',
        statistics: {
          numberOfDocumentsDeleted: 0,
          numberOfDocumentsFailed: 0,
          numberOfDocumentsScanned: 0,
          numberOfModifiedDocumentsIndexed: 0,
          numberOfNewDocumentsIndexed: 0
        },
        status: 'STARTING',
        updatedAt: '2023-12-25T01:56:50.428Z'
      }
    }

    Ingestion Jobの状態を確認する

    データ量によっては、取り込みに時間がかかることがあります。その場合は、ListIngestionJobs APIでKnowledge Baseに対するIngestion Jobの履歴を取得できます。

          const command = new ListIngestionJobsCommand({
            knowledgeBaseId: "xxxx",
            dataSourceId: "xxxx",
          });
          const response = await client.send(command);

    Listなので配列で返ってきますね。

    {
      "$metadata": {
        "httpStatusCode": 200,
        "requestId": "2272f9f0-b55d-4e35-aee0-040f1009d6f4",
        "attempts": 1,
        "totalRetryDelay": 0
      },
      "ingestionJobSummaries": [
        {
          "dataSourceId": "5KXD5UGI8N",
          "ingestionJobId": "5EDVZDQI61",
          "knowledgeBaseId": "63KJZ7TR6Z",
          "startedAt": "2023-12-25T01:56:50.428Z",
          "statistics": {
            "numberOfDocumentsDeleted": 0,
            "numberOfDocumentsFailed": 0,
            "numberOfDocumentsScanned": 3,
            "numberOfModifiedDocumentsIndexed": 0,
            "numberOfNewDocumentsIndexed": 0
          },
          "status": "COMPLETE",
          "updatedAt": "2023-12-25T01:56:51.286Z"
        },
        {
          "dataSourceId": "xxxx",
          "ingestionJobId": "xxxx",
          "knowledgeBaseId": "xxxx",
          "startedAt": "2023-12-11T14:11:49.212Z",
          "statistics": {
            "numberOfDocumentsDeleted": 0,
            "numberOfDocumentsFailed": 0,
            "numberOfDocumentsScanned": 3,
            "numberOfModifiedDocumentsIndexed": 0,
            "numberOfNewDocumentsIndexed": 1
          },
          "status": "COMPLETE",
          "updatedAt": "2023-12-11T14:12:01.610Z"
        },
        {
          "dataSourceId": "xxxx",
          "ingestionJobId": "xxxx",
          "knowledgeBaseId": "xxxx",
          "startedAt": "2023-12-11T14:01:19.632Z",
          "statistics": {
            "numberOfDocumentsDeleted": 0,
            "numberOfDocumentsFailed": 0,
            "numberOfDocumentsScanned": 2,
            "numberOfModifiedDocumentsIndexed": 0,
            "numberOfNewDocumentsIndexed": 2
          },
          "status": "COMPLETE",
          "updatedAt": "2023-12-11T14:01:30.083Z"
        }
      ]
    }

    Syncの詳細を見る

    もし個別のジョブについてみたい場合は、GetIngestionJob APIを使いましょう。

    const command = new GetIngestionJobCommand({
      knowledgeBaseId: "STRING_VALUE", // required
      dataSourceId: "STRING_VALUE", // required
      ingestionJobId: "STRING_VALUE", // required
    });
    const response = await client.send(command);

    取れるデータ自体はListとあまり変わりない様子ですが、データ量が少ない分速度がはやい可能性がありそうです。

    {
      "$metadata": {
        "httpStatusCode": 200,
        "requestId": "d3192456-633a-4ea5-bd0a-b242a20fb18f",
        "attempts": 1,
        "totalRetryDelay": 0
      },
      "ingestionJob": {
        "dataSourceId": "xxxx",
        "ingestionJobId": "xxxx",
        "knowledgeBaseId": "xxxx",
        "startedAt": "2023-12-25T01:56:50.428Z",
        "statistics": {
          "numberOfDocumentsDeleted": 0,
          "numberOfDocumentsFailed": 0,
          "numberOfDocumentsScanned": 3,
          "numberOfModifiedDocumentsIndexed": 0,
          "numberOfNewDocumentsIndexed": 0
        },
        "status": "COMPLETE",
        "updatedAt": "2023-12-25T01:56:51.286Z"
      }
    }

    試してみて感じたこと

    データソースはS3に保存済みなので、そのデータを取り込むだけでよいシンプルさ故か、かなりシンプルにジョブが実行できます。App FlowやEventBridge / Step Functionsなどを組み合わせると、いろいろできそうですね。

    参考

    広告ここから
    広告ここまで
    Home
    Search
    Bookmark