Bedrock Knowledge baseのデータをAWS SDK JS v3で更新する方法
Bedrock Knowledge Baseを使用すると、RAGのベースとなるベクターストアを簡単に準備できます。ただし、CMSやECサイトなどでは定期的なデータ更新が必要です。AWS SDKを使用してデータの更新/同期を行う方法を調査しました。データの取り込みはIngestionJobと呼ばれ、StartIngestionJob APIを使用して実行できます。データ量によって取り込みに時間がかかる場合は、ListIngestionJobs APIを使用してIngestion Jobの履歴を取得できます。また、個別のジョブについて詳細を確認するにはGetIngestionJob APIを使用します。データソースはS3に保存されているため、シンプルにジョブを実行できます。App FlowやEventBridge / Step Functionsと組み合わせることでさまざまなことができます。
目次
Bedrock Knowledge Baseを使うことで、簡単にRAGのベースとなるベクターストアを用意できます。が、CMSやECサイトなどでは、定期的なデータ更新が必要です。ということで、AWS SDKでデータの更新 / Syncを行う方法を調べました。
AWS SDK(v3)のインストール
Knowledge Baseの操作はclient-bedrock-agent
を使います。Agentsを使わず、Knowledge Baseだけの場合でもこちらを使いますので、ライブラリを探す際に少し迷うかもしれません。
npm i @aws-sdk/client-bedrock-agent
ジョブを開始する操作を行う
Knowledge Baseでのデータの取り込みはIngestionJob
とAWSでは呼ばれています。APIを呼び出して取り込みする場合は、StartIngestionJob
APIを実行しましょう。
const client = new BedrockAgentClient({
credentials: {
accessKeyId: env.aws.credentials.accessKeyId,
secretAccessKey: env.aws.credentials.secretAccessKey
},
region: 'us-east-1'
});
const command = new StartIngestionJobCommand({
knowledgeBaseId: "STRING_VALUE", // required
dataSourceId: "STRING_VALUE", // required
});
const response = await client.send(command);
実行結果は次のようなJSONになります。
pdatedAt":"2023-12-25T01:56:50.428Z"}})
{
'$metadata': {
httpStatusCode: 202,
requestId: '50b1e3fb-3a0c-4091-a258-741bde7c4850',
attempts: 1,
totalRetryDelay: 0
},
ingestionJob: {
dataSourceId: 'XXXXX',
ingestionJobId: 'XXXXX',
knowledgeBaseId: 'XXXXX',
startedAt: '2023-12-25T01:56:50.428Z',
statistics: {
numberOfDocumentsDeleted: 0,
numberOfDocumentsFailed: 0,
numberOfDocumentsScanned: 0,
numberOfModifiedDocumentsIndexed: 0,
numberOfNewDocumentsIndexed: 0
},
status: 'STARTING',
updatedAt: '2023-12-25T01:56:50.428Z'
}
}
Ingestion Jobの状態を確認する
データ量によっては、取り込みに時間がかかることがあります。その場合は、ListIngestionJobs
APIでKnowledge Baseに対するIngestion Jobの履歴を取得できます。
const command = new ListIngestionJobsCommand({
knowledgeBaseId: "xxxx",
dataSourceId: "xxxx",
});
const response = await client.send(command);
Listなので配列で返ってきますね。
{
"$metadata": {
"httpStatusCode": 200,
"requestId": "2272f9f0-b55d-4e35-aee0-040f1009d6f4",
"attempts": 1,
"totalRetryDelay": 0
},
"ingestionJobSummaries": [
{
"dataSourceId": "5KXD5UGI8N",
"ingestionJobId": "5EDVZDQI61",
"knowledgeBaseId": "63KJZ7TR6Z",
"startedAt": "2023-12-25T01:56:50.428Z",
"statistics": {
"numberOfDocumentsDeleted": 0,
"numberOfDocumentsFailed": 0,
"numberOfDocumentsScanned": 3,
"numberOfModifiedDocumentsIndexed": 0,
"numberOfNewDocumentsIndexed": 0
},
"status": "COMPLETE",
"updatedAt": "2023-12-25T01:56:51.286Z"
},
{
"dataSourceId": "xxxx",
"ingestionJobId": "xxxx",
"knowledgeBaseId": "xxxx",
"startedAt": "2023-12-11T14:11:49.212Z",
"statistics": {
"numberOfDocumentsDeleted": 0,
"numberOfDocumentsFailed": 0,
"numberOfDocumentsScanned": 3,
"numberOfModifiedDocumentsIndexed": 0,
"numberOfNewDocumentsIndexed": 1
},
"status": "COMPLETE",
"updatedAt": "2023-12-11T14:12:01.610Z"
},
{
"dataSourceId": "xxxx",
"ingestionJobId": "xxxx",
"knowledgeBaseId": "xxxx",
"startedAt": "2023-12-11T14:01:19.632Z",
"statistics": {
"numberOfDocumentsDeleted": 0,
"numberOfDocumentsFailed": 0,
"numberOfDocumentsScanned": 2,
"numberOfModifiedDocumentsIndexed": 0,
"numberOfNewDocumentsIndexed": 2
},
"status": "COMPLETE",
"updatedAt": "2023-12-11T14:01:30.083Z"
}
]
}
Syncの詳細を見る
もし個別のジョブについてみたい場合は、GetIngestionJob
APIを使いましょう。
const command = new GetIngestionJobCommand({
knowledgeBaseId: "STRING_VALUE", // required
dataSourceId: "STRING_VALUE", // required
ingestionJobId: "STRING_VALUE", // required
});
const response = await client.send(command);
取れるデータ自体はListとあまり変わりない様子ですが、データ量が少ない分速度がはやい可能性がありそうです。
{
"$metadata": {
"httpStatusCode": 200,
"requestId": "d3192456-633a-4ea5-bd0a-b242a20fb18f",
"attempts": 1,
"totalRetryDelay": 0
},
"ingestionJob": {
"dataSourceId": "xxxx",
"ingestionJobId": "xxxx",
"knowledgeBaseId": "xxxx",
"startedAt": "2023-12-25T01:56:50.428Z",
"statistics": {
"numberOfDocumentsDeleted": 0,
"numberOfDocumentsFailed": 0,
"numberOfDocumentsScanned": 3,
"numberOfModifiedDocumentsIndexed": 0,
"numberOfNewDocumentsIndexed": 0
},
"status": "COMPLETE",
"updatedAt": "2023-12-25T01:56:51.286Z"
}
}
試してみて感じたこと
データソースはS3に保存済みなので、そのデータを取り込むだけでよいシンプルさ故か、かなりシンプルにジョブが実行できます。App FlowやEventBridge / Step Functionsなどを組み合わせると、いろいろできそうですね。