こんにちは、うるるの眞下です
QStashというサービスを使用し始めて、
- 使い勝手が良い
- いろいろなユースケースで使用できる
- qstash以外のUpstash製品もとても良い
- コスパ最強
ということでまずは、QStashについて紹介させて下さい。
QStashとは?
QStashは、Upstashが提供するサーバーレスメッセージングおよびスケジューリングソリューションです
ClientとAPIとの仲介役として動作し、自動リトライなどを提供するサービスです
直接エンドポイントを呼び出すのではなく、QStashを経由することで信頼性の高いシステムを構築できます
この記事の概要
- QStashの主要機能をTypeScript + NestJSで実装する方法を網羅的に解説します
- 遅延処理、キュー、FlowControlなど実務で必須の機能を実装例付きで紹介します
環境構築
今回は筆者が使用しているNestJSを実装例に挙げています
NestJSの環境構築については、以下のリンクを参考に進めてください:
https://docs.nestjs.com/first-steps
@upstash/qstash TypeScript SDKのインストール
npm install @upstash/qstash
Upstashアカウント作成とAPIキーの取得
- Upstash Consoleでアカウント作成
- QStashプロジェクト作成後、環境変数をコピーします
- Consoleから取得した環境変数をセットします
ローカル環境での開発
QStashはWebhookを送信するため、ローカル開発時には公開可能なエンドポイントが必要です。以下の2つの方法で対応できます:
方法1: QStash CLI(推奨)
QStashが提供する開発用CLIを使用します:
npx @upstash/qstash-cli dev
これにより、ローカル環境でQStashサーバーをシミュレートできます。本番環境との差異を最小限に抑えながら開発が可能です
方法2: ngrokを使用したトンネリング
# ngrokのインストール
npm install -g ngrok
# ローカルサーバーを公開(例:3000番ポート)
ngrok http 3000
ngrokで生成されたHTTPS URLを、QStashのWebhook URLとして使用します。
基本的なHTTPサーバーの作成
NestJSでQStashを活用するシンプルなAPIサーバーを構築しましょう
以下のコントローラーがあれば、QStashの全機能を試すことができます
import { Controller, Post, Body, Headers, Get, Query } from '@nestjs/common';
import { Client } from '@upstash/qstash';
const qstash = new Client({
token: process.env.QSTASH_TOKEN!,
});
@Controller('qstash')
export class QstashController {
// QStashからのメッセージを受信するエンドポイント
@Post('webhook')
async receiveMessage(
@Body() body: any,
@Headers() headers: Recordstring, string>
) {
console.log('Received message:', body);
console.log('Headers:', headers);
// 処理完了を示す2xxステータスを返す
return { status: 'processed', timestamp: new Date() };
}
// エラーハンドリング用エンドポイント
@Post('webhook-error')
async receiveErrorMessage(@Body() body: any) {
console.log('Processing with potential error:', body);
// 50%の確率でエラーを発生させてリトライをテスト
if (Math.random() > 0.5) {
throw new Error('Random processing error');
}
return { status: 'success' };
}
}
QStash SDKの導入と基本操作
publishとpublishJSONの違い
QStashには2つの主要なメソッドがあります:
- publish: 生のHTTPリクエストをそのまま送信
- publishJSON: JSONデータを自動的にシリアライズして送信
publishJSONの方がTypeScript環境では使いやすいです!
基本的なメッセージパブリッシュの実装
publish メソッドの場合
// publish:手動でJSONを文字列化
@Get('send-basic')
async sendBasicMessage() {
const response = await qstash.publish({
url: 'https://your-app.com/qstash/webhook',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: 'Hello QStash!',
timestamp: new Date()
}),
});
return { messageId: response.messageId };
}
publishJSON メソッドの場合(推奨)
// publishJSON:自動でJSONを処理
@Get('send-basic-json')
async sendBasicJSONMessage() {
const response = await qstash.publishJSON({
url: 'https://your-app.com/qstash/webhook',
body: {
message: 'Hello QStash!',
timestamp: new Date(),
metadata: {
source: 'api',
version: '1.0'
}
},
// オプションパラメータ
delay: '30s', // 遅延配信
retries: 3, // リトライ回数
callback: 'https://your-app.com/callback', // コールバックURL
});
return { messageId: response.messageId };
}
publishJSONの主要プロパティ
- url: 送信先のWebhook URL(必須)
- body: 送信するJSONオブジェクト(必須)
- headers: カスタムヘッダー
- delay: 配信遅延時間(例:’30s’, ‘5m’, ‘2h’)
- retries: 最大リトライ回数
- callback: 配信完了時のコールバックURL
- failureCallback: 配信失敗時のコールバックURL
- notBefore: 配信開始時刻(Unix timestamp)
この基本実装だけで、自動リトライ付きの確実なメッセージ配信が実現できます。
機能の紹介
遅延メッセージ(Delay)機能
ユースケース
- 決済処理後の確認メールを30分後に送信
- 無料トライアル期間終了の3日前に通知
- 注文キャンセル処理を24時間後に自動実行
実装例
// 相対的な遅延(30分後)
@Get('send-delayed')
async sendDelayedMessage() {
const response = await qstash.publish({
url: 'https://your-app.com/qstash/webhook',
body: JSON.stringify({
action: 'send_reminder',
userId: 12345
}),
delay: '30m', // 30分後に配信
});
return { messageId: response.messageId };
}
// 絶対時刻指定(特定の日時)
@Get('send-scheduled')
async sendScheduledMessage() {
const futureTimestamp = Math.floor(Date.now() / 1000) + (24 * 60 * 60); // 24時間後
const response = await qstash.publish({
url: 'https://your-app.com/qstash/webhook',
body: JSON.stringify({
action: 'process_subscription_renewal',
subscriptionId: 'sub_123'
}),
notBefore: futureTimestamp,
});
return { messageId: response.messageId };
}
リトライ機能
ユースケース
- 外部API呼び出しの一時的な障害を自動復旧
- データベース更新の競合状態を自動リトライ
- メール送信サービスのレート制限を自動回避
QStashのリトライ機能は、指数バックオフアルゴリズムを採用しています。
参考:https://upstash.com/docs/qstash/features/retry
実装例
// カスタムリトライ回数の設定
@Get('send-with-retry')
async sendWithCustomRetry() {
const response = await qstash.publish({
url: 'https://your-app.com/qstash/webhook-error',
body: JSON.stringify({
action: 'critical_process',
data: { important: true }
}),
retries: 3, // 最大3回リトライ
});
return { messageId: response.messageId };
}
// リトライ情報を含むWebhook受信
@Post('webhook-retry-aware')
async receiveWithRetryInfo(
@Body() body: any,
@Headers('upstash-retried') retriedCount: string
) {
const retryCount = parseInt(retriedCount || '0');
console.log(`Processing attempt: ${retryCount + 1}`);
if (retryCount > 0) {
console.log('This is a retry attempt');
// リトライ時の特別な処理
}
return { processed: true, retryCount };
}
キュー機能(enqueue機能)
ユースケース
- 画像処理タスクの順序保証実行
- 決済処理の競合状態防止
- バッチ処理の確実な順序実行
- データ整合性が重要なワークフロー
FIFO(先入先出)が保証されたキューシステムは、データ整合性を重視するシステムやバッチ処理の順序実行に活用できます
キュー操作の実装パターン
QStashでは、キューの作成と操作は以下のように行います:
キュー作成方法
// キューの作成と設定
@Get('setup-queue')
async setupQueue() {
// キューインスタンスを取得してからupsertを呼び出す
const queue = qstash.queue({ queueName: 'image-processing-queue' });
await queue.upsert({ parallelism: 1 }); // 順序保証のため並列度1
return { message: 'Queue created successfully' };
}
// キューへのメッセージ送信
@Get('enqueue-task-v1')
async enqueueTaskV1(@Query('taskId') taskId: string) {
const response = await qstash.publish({
queue: 'image-processing-queue',
url: 'https://your-app.com/qstash/webhook',
body: JSON.stringify({
action: 'process_image',
taskId: taskId,
priority: 'high'
}),
});
return { messageId: response.messageId, queue: 'image-processing-queue' };
}
queue.enqueueJSON()
// より直感的なキュー操作
@Get('enqueue-task-v2')
async enqueueTaskV2(@Query('taskId') taskId: string) {
// キューインスタンスを取得
const queue = qstash.queue({ queueName: "image-processing-queue" });
// JSONメッセージをエンキュー
const response = await queue.enqueueJSON({
url: "https://your-app.com/qstash/webhook",
body: {
action: 'process_image',
taskId: taskId,
metadata: {
enqueuedAt: new Date(),
source: 'api-v2'
}
},
// オプション設定
delay: '10s', // 10秒後に処理開始
retries: 5, // 最大5回リトライ
});
return {
messageId: response.messageId,
queue: 'image-processing-queue',
method: 'enqueueJSON'
};
}
キューの詳細設定パターン
高パフォーマンス用途(並列処理あり)
@Get('setup-high-performance-queue')
async setupHighPerformanceQueue() {
// キューインスタンスを取得してからupsertを呼び出す
const queue = qstash.queue({ queueName: 'bulk-email-queue' });
await queue.upsert({ parallelism: 10 }); // 同時に10個まで処理
return {
message: 'High performance queue created',
parallelism: 10
};
}
// 高スループット用キューの使用例
@Get('bulk-enqueue')
async bulkEnqueue() {
const queue = qstash.queue({ queueName: "bulk-email-queue" });
const tasks = Array.from({ length: 100 }, (_, i) => ({
action: 'send_email',
emailId: `email_${i}`,
recipient: `user${i}@example.com`
}));
const responses = await Promise.all(
tasks.map(task =>
queue.enqueueJSON({
url: "https://your-app.com/qstash/webhook",
body: task
})
)
);
return {
enqueuedCount: responses.length,
messageIds: responses.map(r => r.messageId)
};
}
優先度別キュー管理
@Get('setup-priority-queues')
async setupPriorityQueues() {
// 優先度別に複数のキューを作成
const queueConfigs = [
{ name: 'critical-tasks', parallelism: 5 },
{ name: 'normal-tasks', parallelism: 3 },
{ name: 'low-priority-tasks', parallelism: 1 }
];
for (const config of queueConfigs) {
// 各キューインスタンスを取得してからupsertを呼び出す
const queue = qstash.queue({ queueName: config.name });
await queue.upsert({ parallelism: config.parallelism });
}
return {
message: 'Priority queues created',
queues: queueConfigs
};
}
@Get('enqueue-by-priority')
async enqueueByPriority(
@Query('priority') priority: 'critical' | 'normal' | 'low',
@Query('taskData') taskData: string
) {
const queueName = `${priority}-tasks`;
const queue = qstash.queue({ queueName });
const response = await queue.enqueueJSON({
url: "https://your-app.com/qstash/webhook",
body: {
priority: priority,
data: taskData,
processedAt: new Date()
}
});
return {
messageId: response.messageId,
queue: queueName,
priority: priority
};
}
キューの監視と管理
// キューの状態確認
@Get('queue-status')
async getQueueStatus(@Query('queueName') queueName: string) {
// 実際の実装では、キューの状態を取得するAPIを使用
return {
queueName: queueName,
message: 'Queue status monitoring endpoint',
// 実際にはpending、processing、completedなどの統計情報を返す
};
}
// 順序処理されるWebhook
@Post('webhook-queue')
async processQueuedTask(@Body() body: any) {
const { action, taskId, priority } = body;
console.log(`Processing task ${taskId} with priority ${priority || 'normal'} in order`);
// 重い処理をシミュレート(実際の処理時間に応じて調整)
await new Promise(resolve => setTimeout(resolve, 2000));
return {
status: 'completed',
taskId,
priority: priority || 'normal',
processedAt: new Date()
};
}
FlowControl機能
ユースケース
- 外部APIのレート制限に合わせた送信制御
- データベース負荷を一定以下に保つ
- システムリソースの効率的な活用
- サードパーティサービスとの適切な統合
この機能を使うことで、システム負荷制御が劇的に簡単になります。
FlowControlの3つの重要なプロパティ
1. Flow-Control-Key(制御キー)
FlowControlではキー単位で制御が適用されます。URLごとではなく、キーごとに制限がかかる点が重要です!
- 同じキーを持つメッセージ群に対して制限が適用される
- 異なるキーは独立して処理される
- キーの数に制限はない
使用例:
- ユーザー別制御:
user-123
- サービス別制御:
payment-service
- 地域別制御:
region-asia
2. RatePerSecond(秒間処理制限)
1秒間に送信できるメッセージ数を制限します。
どんな時に使用するのか:
- 外部APIが秒間100リクエストまでの制限がある場合
- データベースのコネクション数を制御したい場合
- サードパーティサービスのレート制限に合わせたい場合
// 秒間10リクエストに制限
@Get('send-rate-limited')
async sendRateLimitedMessage() {
const response = await qstash.publishJSON({
url: 'https://your-app.com/qstash/webhook',
body: {
action: 'process_payment',
amount: 1000
},
flowControl: {
key: 'payment-processor',
rate: 10
},
});
return { messageId: response.messageId };
}
3. Parallelism(並列実行制限)
同時実行可能なメッセージ数を制限します。
どんな時に使用するのか:
- データベースのコネクション数を制御したい場合
- メモリ使用量を制限したい場合
- 重い処理の同時実行数を制御したい場合
// 同時実行5個まで
@Get('send-parallelism-limited')
async sendParallelismLimitedMessage() {
const response = await qstash.publishJSON({
url: 'https://your-app.com/qstash/webhook',
body: {
action: 'process_large_file',
fileId: 'file_123'
},
headers: {
'Upstash-Flow-Control-Key': 'file-processor',
'Upstash-Flow-Control-Parallelism': '5', // 同時実行5個まで
},
});
return { messageId: response.messageId };
}
FlowControlの組み合わせパターン
パターン1: RatePerSecond + Parallelism
最も実用的なパターンです。秒間制限と並列制限を組み合わせることで、きめ細かい制御が可能になります。
@Get('send-combined-flow-control')
async sendCombinedFlowControl(@Query('userId') userId: string) {
const response = await qstash.publishJSON({
url: 'https://your-app.com/qstash/webhook',
body: {
action: 'process_user_data',
userId: userId,
priority: 'high'
},
flowControl: {
key: `user-processing-${userId}`,
rate: 20, // 秒間20リクエスト
parallelism: 10 // 同時実行10個まで
}
});
return {
messageId: response.messageId,
flowControlApplied: true,
limits: {
ratePerSecond: 20,
parallelism: 10
}
};
}
実際の動作例:
- 秒間20リクエスト、並列10個の制限
- 各リクエストが1分かかる場合:
- 最初の1秒で10個が開始される
- 次の1秒で10個が開始される
- 以降は処理完了を待ってから新しいリクエストが開始される
パターン2: 異なるキーでの独立制御
@Get('send-multi-service-control')
async sendMultiServiceControl(@Query('service') service: string) {
const flowControlConfigs = {
'email': { rate: '100', parallelism: '20' }, // メール送信
'sms': { rate: '50', parallelism: '10' }, // SMS送信
'push': { rate: '200', parallelism: '30' }, // Push通知
};
const config = flowControlConfigs[service] || flowControlConfigs['email'];
const response = await qstash.publishJSON({
url: 'https://your-app.com/qstash/webhook',
body: {
action: `send_${service}`,
timestamp: new Date()
},
flowControl: {
key: `${service}`,
rate: config.rate,
parallelism: config.parallelism
}
});
return {
messageId: response.messageId,
service: service,
limits: config
};
}
FlowControl状況の監視
// FlowControl状況を確認するエンドポイント
@Get('flow-control-status')
async getFlowControlStatus(@Query('key') key: string) {
// 実際のAPIを使用してFlowControl状況を取得
// (この機能は公式ドキュメントで言及されている)
return {
key: key,
message: 'FlowControl monitoring endpoint',
// 実際の実装では qstash.flowControl.get(key) のようなAPIを使用
};
}
セキュリティ(署名検証)
- Webhook の正当性確認
- 不正なリクエストの排除
- セキュリティ要件の満たし
本番環境では悪意のあるリクエストなども弾かないといけなく、QStashの正当なリクエストをQStash側で署名検証のメソッドを使用するだけで良いので、とても便利です
実装例
import { Receiver } from '@upstash/qstash';
const receiver = new Receiver({
currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
});
@Post('webhook-secure')
async receiveSecureMessage(
@Body() body: any,
@Headers('upstash-signature') signature: string
) {
try {
// 署名を検証
await receiver.verify({
signature,
body: JSON.stringify(body),
url: 'https://your-app.com/qstash/webhook-secure',
});
console.log('Verified message:', body);
return { status: 'verified and processed' };
} catch (error) {
console.error('Signature verification failed:', error);
throw new Error('Invalid signature');
}
}
URL Groups(ファンアウトパターン)
- ECサイトなどで商品が購入された時に、メールやSlackへの通知、在庫管理システムへ同一のbodyの内容を一斉配信するときなどに使用できるかと思います
- あらかじめ、QStash側にURL Groupを登録しておく必要があります
@Get('send-fanout')
async sendFanoutMessage() {
// URL Groupに送信すると、登録された全エンドポイントに配信
const response = await qstash.publish({
url: 'product-update-group', // URL Group名
body: JSON.stringify({
event: 'product_updated',
productId: 'prod_123'
}),
});
return { messageId: response.messageId };
}
バッチ送信機能
- 複数のメッセージを一括送信できます
- Promise.allなどで、処理できるような同時並行的に処理できるものはバッチ送信機能を活用して処理するのがパフォーマンス的にもいいかと思います
@Get('send-batch')
async sendBatchMessages() {
const messages = [
{
destination: 'https://your-app.com/qstash/webhook',
body: JSON.stringify({ task: 'task1' })
},
{
destination: 'https://your-app.com/qstash/webhook',
body: JSON.stringify({ task: 'task2' })
}
];
const responses = await qstash.batch(messages);
return { batchId: responses.map(r => r.messageId) };
}
料金体系
- QStashの料金は**$1 per 100,000 requests**とコスパ最強
- 使わなければ0円
- Freeプランもあり、月15,000メッセージまで無料です
参考記事
Upstash QStash 公式ドキュメント: https://upstash.com/docs/qstash/overall/getstarted
Views: 0