メインコンテンツにスキップ
バージョン: v6 - 安定版

AWS Lambda での Sequelize の使用

AWS Lambda は、お客様が基盤となるサーバーを気にすることなくコードを実行できるサーバーレスコンピューティングサービスです。特定の概念が正しく理解されておらず、適切な構成が使用されていない場合、AWS Lambda で sequelize を使用するのは難しい場合があります。このガイドでは、これらの概念のいくつかを明確にすることで、ライブラリのユーザーが AWS Lambda 用に sequelize を適切に構成し、問題をトラブルシューティングできるようにすることを目的としています。

TL;DR

AWS Lambda 用に sequelize コネクションプーリング を適切に構成する方法だけを知りたい場合は、sequelize コネクションプーリングは AWS Lambda の Node.js ランタイムとうまく連携せず、解決するよりも多くの問題を引き起こすことを知っておく必要があります. したがって、最も適切な構成は、**同じ呼び出し内でプーリングを使用** し、**呼び出し間でプーリングしない** ことです (つまり、最後にすべてのコネクションを閉じる)。

const { Sequelize } = require("sequelize");

let sequelize = null;

async function loadSequelize() {
const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: {
/*
* Lambda functions process one request at a time but your code may issue multiple queries
* concurrently. Be wary that `sequelize` has methods that issue 2 queries concurrently
* (e.g. `Model.findAndCountAll()`). Using a value higher than 1 allows concurrent queries to
* be executed in parallel rather than serialized. Careful with executing too many queries in
* parallel per Lambda function execution since that can bring down your database with an
* excessive number of connections.
*
* Ideally you want to choose a `max` number where this holds true:
* max * EXPECTED_MAX_CONCURRENT_LAMBDA_INVOCATIONS < MAX_ALLOWED_DATABASE_CONNECTIONS * 0.8
*/
max: 2,
/*
* Set this value to 0 so connection pool eviction logic eventually cleans up all connections
* in the event of a Lambda function timeout.
*/
min: 0,
/*
* Set this value to 0 so connections are eligible for cleanup immediately after they're
* returned to the pool.
*/
idle: 0,
// Choose a small enough value that fails fast if a connection takes too long to be established.
acquire: 3000,
/*
* Ensures the connection pool attempts to be cleaned up automatically on the next Lambda
* function invocation, if the previous invocation timed out.
*/
evict: CURRENT_LAMBDA_FUNCTION_TIMEOUT
}
});

// or `sequelize.sync()`
await sequelize.authenticate();

return sequelize;
}

module.exports.handler = async function (event, callback) {
// re-use the sequelize instance across invocations to improve performance
if (!sequelize) {
sequelize = await loadSequelize();
} else {
// restart connection pool to ensure connections are not re-used across invocations
sequelize.connectionManager.initPools();

// restore `getConnection()` if it has been overwritten by `close()`
if (sequelize.connectionManager.hasOwnProperty("getConnection")) {
delete sequelize.connectionManager.getConnection;
}
}

try {
return await doSomethingWithSequelize(sequelize);
} finally {
// close any opened connections during the invocation
// this will wait for any in-progress queries to finish before closing the connections
await sequelize.connectionManager.close();
}
};

AWS RDS Proxy の使用

AWS RDS を使用しており、Aurora または サポートされているデータベースエンジン を使用している場合は、AWS RDS Proxy を使用してデータベースに接続してください。これにより、各呼び出しでコネクションを開いたり閉じたりすることが、基盤となるデータベースサーバーにとって負荷の高い操作にならないようにします。


AWS Lambda でこのように sequelize を使用する必要がある理由を理解したい場合は、このドキュメントの残りの部分を読み続けてください。

Node.js イベントループ

Node.js イベントループ

JavaScript がシングルスレッドであるという事実にもかかわらず、Node.js がノンブロッキング I/O 操作を実行できるようにするものです —

イベントループの実装は C++ ですが、Node.js が index.js という名前のスクリプトを実行する方法を示す、簡略化された JavaScript 疑似実装を次に示します。

// see: https://node.dokyumento.jp/en/docs/guides/event-loop-timers-and-nexttick/
// see: https://www.youtube.com/watch?v=P9csgxBgaZ8
// see: https://www.youtube.com/watch?v=PNa9OMajw9w
const process = require('process');

/*
* counter of pending events
*
* reference counter is increased for every:
*
* 1. scheduled timer: `setTimeout()`, `setInterval()`, etc.
* 2. scheduled immediate: `setImmediate()`.
* 3. syscall of non-blocking IO: `require('net').Server.listen()`, etc.
* 4. scheduled task to the thread pool: `require('fs').WriteStream.write()`, etc.
*
* reference counter is decreased for every:
*
* 1. elapsed timer
* 2. executed immediate
* 3. completed non-blocking IO
* 4. completed thread pool task
*
* references can be explicitly decreased by invoking `.unref()` on some
* objects like: `require('net').Socket.unref()`
*/
let refs = 0;

/*
* a heap of timers, sorted by next ocurrence
*
* whenever `setTimeout()` or `setInterval()` is invoked, a timer gets added here
*/
const timersHeap = /* (...) */;

/*
* a FIFO queue of immediates
*
* whenever `setImmediate()` is invoked, it gets added here
*/
const immediates = /* (...) */;

/*
* a FIFO queue of next tick callbacks
*
* whenever `require('process').nextTick()` is invoked, the callback gets added here
*/
const nextTickCallbacks = [];

/*
* a heap of Promise-related callbacks, sorted by promise constructors callbacks first,
* and then resolved/rejected callbacks
*
* whenever a new Promise instance is created via `new Promise` or a promise resolves/rejects
* the appropriate callback (if any) gets added here
*/
const promiseCallbacksHeap = /* ... */;

function execTicksAndPromises() {
while (nextTickCallbacks.length || promiseCallbacksHeap.size()) {
// execute all callbacks scheduled with `process.nextTick()`
while (nextTickCallbacks.length) {
const callback = nextTickCallbacks.shift();
callback();
}

// execute all promise-related callbacks
while (promiseCallbacksHeap.size()) {
const callback = promiseCallbacksHeap.pop();
callback();
}
}
}

try {
// execute index.js
require('./index');
execTicksAndPromises();

do {
// timers phase: executes all elapsed timers
getElapsedTimerCallbacks(timersHeap).forEach(callback => {
callback();
execTicksAndPromises();
});

// pending callbacks phase: executes some system operations (like `TCP errors`) that are not
// executed in the poll phase
getPendingCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
})

// poll phase: gets completed non-blocking I/O events or thread pool tasks and invokes the
// corresponding callbacks; if there are none and there's no pending immediates,
// it blocks waiting for events/completed tasks for a maximum of `maxWait`
const maxWait = computeWhenNextTimerElapses(timersHeap);
pollForEventsFromKernelOrThreadPool(maxWait, immediates).forEach(callback => {
callback();
execTicksAndPromises();
});

// check phase: execute available immediates; if an immediate callback invokes `setImmediate()`
// it will be invoked on the next event loop iteration
getImmediateCallbacks(immediates).forEach(callback => {
callback();
execTicksAndPromises();
});

// close callbacks phase: execute special `.on('close')` callbacks
getCloseCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
});

if (refs === 0) {
// listeners of this event may execute code that increments `refs`
process.emit('beforeExit');
}
} while (refs > 0);
} catch (err) {
if (!process.listenerCount('uncaughtException')) {
// default behavior: print stack and exit with status code 1
console.error(err.stack);
process.exit(1);
} else {
// there are listeners: emit the event and exit using `process.exitCode || 0`
process.emit('uncaughtException');
process.exit();
}
}

Node.js における AWS Lambda 関数ハンドラーのタイプ

AWS Lambda ハンドラーには、Node.js で 2 つの種類があります。

非同期ハンドラー (つまり、callback)

module.exports.handler = function (event, context, callback) {
try {
doSomething();
callback(null, 'Hello World!'); // Lambda returns "Hello World!"
} catch (err) {
// try/catch is not required, uncaught exceptions invoke `callback(err)` implicitly
callback(err); // Lambda fails with `err`
}
};

同期ハンドラー (つまり、async/await または Promise を使用)

// async/await
module.exports.handler = async function (event, context) {
try {
await doSomethingAsync();
return 'Hello World!'; // equivalent of: callback(null, "Hello World!");
} catch (err) {
// try/cath is not required, async functions always return a Promise
throw err; // equivalent of: callback(err);
}
};

// Promise
module.exports.handler = function (event, context) {
/*
* must return a `Promise` to be considered an async handler
*
* an uncaught exception that prevents a `Promise` to be returned
* by the handler will "downgrade" the handler to non-async
*/
return Promise.resolve()
.then(() => doSomethingAsync())
.then(() => 'Hello World!');
};

一見すると、非同期ハンドラーと同期ハンドラーは単なるコードスタイルの選択のように見えますが、2 つの間に根本的な違いがあります。

  • 同期ハンドラーでは、イベントループが空かどうか regardless of whether the event loop is empty or not.に関わらず、ハンドラーによって返された Promise が解決または拒否されたときに Lambda 関数の実行が終了します。
  • 非同期ハンドラーでは、次のいずれかの条件が発生したときに Lambda 関数の実行が終了します。

この根本的な違いは、sequelize がどのように影響を受けるかを合理化するために理解することが非常に重要です。違いを説明するために、いくつかの例を次に示します。

// no callback invoked
module.exports.handler = function () {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
};

// callback invoked
module.exports.handler = function (event, context, callback) {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
callback(null, 'Hello World!');
};

// callback invoked, context.callbackWaitsForEmptyEventLoop = false
module.exports.handler = function (event, context, callback) {
// Lambda finishes BEFORE `doSomething()` is invoked
context.callbackWaitsForEmptyEventLoop = false;
setTimeout(() => doSomething(), 2000);
setTimeout(() => callback(null, 'Hello World!'), 1000);
};

// async/await
module.exports.handler = async function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return 'Hello World!';
};

// Promise
module.exports.handler = function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return Promise.resolve('Hello World!');
};

AWS Lambda 実行環境 (つまり、コンテナー)

AWS Lambda 関数ハンドラーは、組み込みまたはカスタムの ランタイム によって呼び出されます。ランタイムは、再利用される場合とされない場合がある 実行環境 (つまり、コンテナー) で実行されます。コンテナーは 一度に 1 つの要求 しか処理できません。Lambda 関数の同時呼び出しとは、同時要求ごとにコンテナーインスタンスが作成されることを意味します。

実際には、これは Lambda 関数はステートレスに設計する必要があることを意味しますが、開発者はキャッシュの目的で状態を使用できます。

let sequelize = null;

module.exports.handler = async function () {
/*
* sequelize will already be loaded if the container is re-used
*
* containers are never re-used when a Lambda function's code change
*
* while the time elapsed between Lambda invocations is used as a factor to determine whether
* a container is re-used, no assumptions should be made of when a container is actually re-used
*
* AWS does not publicly document the rules of container re-use "by design" since containers
* can be recycled in response to internal AWS Lambda events (e.g. a Lambda function container
* may be recycled even if the function is constanly invoked)
*/
if (!sequelize) {
sequelize = await loadSequelize();
}

return await doSomethingWithSequelize(sequelize);
};

Lambda 関数がイベントループが空になるのを待たず、コンテナーが再利用される場合、イベントループは次の呼び出しが発生するまで「一時停止」されます。例えば

let counter = 0;

module.exports.handler = function (event, context, callback) {
/*
* The first invocation (i.e. container initialized) will:
* - log:
* - Fast timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 5XX
* - return: 1
*
* Wait 3 seconds and invoke the Lambda again. The invocation (i.e. container re-used) will:
* - log:
* - Slow timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 3XXX
* - Fast timeout invoked. Request id: 11111111-1111-1111-1111-111111111111 | Elapsed ms: 5XX
* - return: 3
*/
const now = Date.now();

context.callbackWaitsForEmptyEventLoop = false;

setTimeout(() => {
console.log(
'Slow timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
}, 1000);

setTimeout(() => {
console.log(
'Fast timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
callback(null, counter);
}, 500);
};

AWS Lambda における Sequelize コネクションプーリング

sequelize は、データベースコネクションの使用を最適化するためにコネクションプーリングを使用します。 sequelize によって使用されるコネクションプールは、setTimeout() コールバック (Node.js イベントループによって処理されます) を使用して実装されます。

AWS Lambda コンテナーは一度に 1 つの要求を処理するという事実を考えると、sequelize を次のように構成したくなります。

const { Sequelize } = require('sequelize');

const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: { min: 1, max: 1 }
});

この構成により、Lambda コンテナーが過剰な数の接続でデータベースサーバーを圧倒することを防ぎます (各コンテナーは最大 1 つの接続を取得するため)。また、アイドル状態のときにコンテナーのコネクションがガベージコレクションされないようにするため、Lambda コンテナーが再利用されるときにコネクションを再確立する必要がありません。残念ながら、この構成にはいくつかの問題があります。

  1. イベントループが空になるのを待つ Lambda は常にタイムアウトします。 sequelize コネクションプールは、options.pool.evict ミリ秒ごとに setTimeout をスケジュールし、**すべてのアイドル状態のコネクションが閉じられる** までこれを続けます。ただし、min1 に設定されているため、プールには常に少なくとも 1 つのアイドル状態のコネクションが存在し、結果としてイベントループが無限になります。
  2. Model.findAndCountAll() のような一部の操作は、複数のクエリを非同期に実行します (例: Model.count()Model.findAll())。最大 1 つのコネクションを使用すると、クエリが (2 つのコネクションを使用して並列に実行されるのではなく) シリアルに実行されるようになります。これは、管理しやすい数のデータベース接続を維持するための許容できるパフォーマンスの妥協点かもしれませんが、クエリの実行にデフォルトまたは構成された options.pool.acquire タイムアウトよりも時間がかかる場合、長時間実行されるクエリは ConnectionAcquireTimeoutError になる可能性があります。これは、シリアル化されたクエリが、他のクエリによって使用されているコネクションが解放されるまでプールで待機したままになるためです。
  3. AWS Lambda関数がタイムアウトした場合(つまり、設定されたAWS Lambdaタイムアウトを超えた場合)、Node.jsイベントループはその状態に関係なく「一時停止」されます。これにより、接続エラーが発生する可能性のある競合状態が発生する可能性があります。たとえば、非常に負荷の高いクエリが原因でLambda関数がタイムアウトし、負荷の高いクエリが完了して接続がプールに解放される前にイベントループが「一時停止」され、コンテナが再利用され、`options.pool.acquire`ミリ秒後に接続が返されない場合、後続のLambda呼び出しが`ConnectionAcquireTimeoutError`で失敗するといった状況が発生する可能性があります。

`{ min: 1, max: 2 }`を使用することで、問題**#2**の軽減を試みることができます。ただし、これは依然として問題**#1**と**#3**の影響を受け、さらに追加の問題が発生します。

  1. 接続プールのエビクションコールバックが実行される前にイベントループが「一時停止」したり、Lambda呼び出し間で`options.pool.evict`時間以上経過したりすると、競合状態が発生する可能性があります。これにより、タイムアウトエラー、ハンドシェイクエラー、およびその他の接続関連エラーが発生する可能性があります。
  2. `Model.findAndCountAll()`のような操作を使用し、基になる`Model.count()`または`Model.findAll()`クエリのいずれかが失敗した場合、Lambda関数の実行が完了してイベントループが「一時停止」される前に、他のクエリの実行が完了していること(および接続がプールに戻されていること)を保証できません。これにより、接続が古い状態のままになり、TCP接続が早期に閉じられたり、その他の接続関連エラーが発生する可能性があります。

`{ min: 2, max: 2 }`を使用すると、追加の問題**#1**が軽減されます。ただし、この構成では、他のすべての問題(元の**#1**、**#3**、および追加の**#2**)が依然として発生します。

競合状態の例の詳細

この例を理解するには、Lambdaと`sequelize`の特定の部分がどのように実装されているかについて、もう少しコンテキストが必要です。

`nodejs.12x`用の組み込みAWS Lambdaランタイムは、Node.jsで実装されています。Node.js Lambda関数内で`/var/runtime/`の内容を読み取ることで、ランタイムのソースコード全体にアクセスできます。関連するコードのサブセットは次のとおりです

runtime/Runtime.js

class Runtime {
// (...)

// each iteration is executed in the event loop `check` phase
scheduleIteration() {
setImmediate(() => this.handleOnce().then(/* (...) */));
}

async handleOnce() {
// get next invocation. see: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next
let { bodyJson, headers } = await this.client.nextInvocation();

// prepare `context` handler parameter
let invokeContext = new InvokeContext(headers);
invokeContext.updateLoggingContext();

// prepare `callback` handler parameter
let [callback, callbackContext] = CallbackContext.build(
this.client,
invokeContext.invokeId,
this.scheduleIteration.bind(this),
);

try {
// this listener is subscribed to process.on('beforeExit')
// so that when when `context.callbackWaitsForEmptyEventLoop === true`
// the Lambda execution finishes after the event loop is empty
this._setDefaultExitListener(invokeContext.invokeId);

// execute handler
const result = this.handler(
JSON.parse(bodyJson),
invokeContext.attachEnvironmentData(callbackContext),
callback,
);

// finish the execution if the handler is async
if (_isPromise(result)) {
result.then(callbackContext.succeed, callbackContext.fail).catch(callbackContext.fail);
}
} catch (err) {
callback(err);
}
}
}

ランタイムは、初期化コードの最後に反復をスケジュールします

runtime/index.js

// (...)

new Runtime(client, handler, errorCallbacks).scheduleIteration();

`sequelize`を使用してLambdaハンドラによって呼び出されるすべてのSQLクエリは、最終的にSequelize.prototype.query()を使用して実行されます。このメソッドは、プールから接続を取得し、クエリを実行し、クエリが完了したら接続をプールに解放する役割を担います。次のスニペットは、トランザクションのないクエリのメソッドロジックを簡略化したものです

sequelize.js

class Sequelize {
// (...)

query(sql, options) {
// (...)

const connection = await this.connectionManager.getConnection(options);
const query = new this.dialect.Query(connection, this, options);

try {
return await query.run(sql, bindParameters);
} finally {
await this.connectionManager.releaseConnection(connection);
}
}
}

フィールド`this.connectionManager`は、ダイアレクト固有の`ConnectionManager`クラスのインスタンスです。すべてのダイアレクト固有のマネージャーは、抽象`ConnectionManager`クラスから継承します。このクラスは、接続プールを初期化し、新しい接続を作成する必要があるたびにダイアレクト固有のクラスの`connect()`メソッドを呼び出すように構成します。次のスニペットは、`mysql`ダイアレクトの`connect()`メソッドを簡略化したものです

mysql/connection-manager.js

class ConnectionManager {
// (...)

async connect(config) {
// (...)
return await new Promise((resolve, reject) => {
// uses mysql2's `new Connection()`
const connection = this.lib.createConnection(connectionConfig);

const errorHandler = e => {
connection.removeListener('connect', connectHandler);
connection.removeListener('error', connectHandler);
reject(e);
};

const connectHandler = () => {
connection.removeListener('error', errorHandler);
resolve(connection);
};

connection.on('error', errorHandler);
connection.once('connect', connectHandler);
});
}
}

フィールド`this.lib`は`mysql2`を参照し、関数`createConnection()`は`Connection`クラスのインスタンスを作成することで接続を作成します。このクラスの関連するサブセットは次のとおりです

mysql2/connection.js

class Connection extends EventEmitter {
constructor(opts) {
// (...)

// create Socket
this.stream = /* (...) */;

// when data is received, clear timeout
this.stream.on('data', data => {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.packetParser.execute(data);
});

// (...)

// when handshake is completed, emit the 'connect' event
handshakeCommand.on('end', () => {
this.emit('connect', handshakeCommand.handshake);
});

// set a timeout to trigger if no data is received on the socket
if (this.config.connectTimeout) {
const timeoutHandler = this._handleTimeoutError.bind(this);
this.connectTimeout = Timers.setTimeout(
timeoutHandler,
this.config.connectTimeout
);
}
}

// (...)

_handleTimeoutError() {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.stream.destroy && this.stream.destroy();
const err = new Error('connect ETIMEDOUT');
err.errorno = 'ETIMEDOUT';
err.code = 'ETIMEDOUT';
err.syscall = 'connect';

// this will emit the 'error' event
this._handleNetworkError(err);
}
}

上記のコードに基づいて、次のイベントシーケンスは、`{ min: 1, max: 1 }`での接続プーリングの競合状態が`ETIMEDOUT`エラーを引き起こす可能性があることを示しています

  1. Lambda呼び出しが受信される(新しいコンテナ)
    1. イベントループが`check`フェーズに入り、`runtime/Runtime.js`の`handleOnce()`メソッドが呼び出されます。
      1. `handleOnce()`メソッドは`await this.client.nextInvocation()`を呼び出して待機します。
    2. 保留中のタイマーがないため、イベントループは`timers`フェーズをスキップします。
    3. イベントループが`poll`フェーズに入り、`handleOnce()`メソッドが続行されます。
    4. Lambdaハンドラが呼び出されます。
    5. Lambdaハンドラは`Model.count()`を呼び出し、`sequelize.js`の`query()`を呼び出し、`connectionManager.getConnection()`を呼び出します。
    6. 接続プールは`Model.count()`の`setTimeout(..., config.pool.acquire)`を初期化し、新しい接続を作成するために`mysql/connection-manager.js`の`connect()`を呼び出します。
    7. `mysql2/connection.js`はTCPソケットを作成し、`ETIMEDOUT`で接続を失敗させるための`setTimeout()`を初期化します。
    8. ハンドラによって返されたPromiseが拒否される(ここでは詳細な理由は説明しません)ため、Lambda関数の実行が完了し、Node.jsイベントループが「一時停止」されます。
  2. 呼び出し間に十分な時間が経過し、以下のようになります。
    1. `config.pool.acquire`タイマーが経過します。
    2. `mysql2`接続タイマーはまだ経過していませんが、ほぼ経過しています(つまり、競合状態)。
  3. 2番目のLambda呼び出しが受信される(コンテナが再利用される)
    1. イベントループが「再開」されます。
    2. イベントループが`check`フェーズに入り、`runtime/Runtime.js`の`handleOnce()`メソッドが呼び出されます。
    3. イベントループが`timers`フェーズに入り、`config.pool.acquire`タイマーが経過すると、前の呼び出しの`Model.count()` Promiseが`ConnectionAcquireTimeoutError`で拒否されます。
    4. イベントループが`poll`フェーズに入り、`handleOnce()`メソッドが続行されます。
    5. Lambdaハンドラが呼び出されます。
    6. Lambdaハンドラは`Model.count()`を呼び出し、`sequelize.js`の`query()`を呼び出し、`connectionManager.getConnection()`を呼び出します。
    7. 接続プールは`Model.count()`の`setTimeout(..., config.pool.acquire)`を初期化し、`{ max : 1 }`であるため、保留中の`connect()` Promiseが完了するのを待ちます。
    8. 保留中のイミディエイトがないため、イベントループは`check`フェーズをスキップします。
    9. **競合状態:**イベントループが`timers`フェーズに入り、`mysql2`接続タイムアウトが経過すると、`connection.emit('error')`を使用して`ETIMEDOUT`エラーが発行されます。
    10. 発行されたイベントは、`mysql/connection-manager.js`の`connect()`でPromiseを拒否し、拒否されたPromiseを`Model.count()`クエリのPromiseに転送します。
    11. ラムダ関数は`ETIMEDOUT`エラーで失敗します。