diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index cc514f2..e5100db 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -17,4 +17,4 @@ jobs: - name: Run CodeQL run: | docker run --rm -v $PWD:/app composer sh -c \ - "composer install --profile --ignore-platform-reqs && composer analyse" \ No newline at end of file + "composer install --profile --ignore-platform-reqs && composer check" \ No newline at end of file diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8deed9a..8d78fb3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -5,9 +5,6 @@ jobs: lint: name: Tests runs-on: ubuntu-latest - strategy: - matrix: - php-versions: ['8.3'] # add PHP versions as required steps: - name: Checkout repository @@ -24,7 +21,6 @@ jobs: - name: Build run: | - export PHP_VERSION=${{ matrix.php-versions }} docker compose build docker compose up -d sleep 10 diff --git a/composer.json b/composer.json index 3af3ffd..9e1c76e 100755 --- a/composer.json +++ b/composer.json @@ -23,21 +23,21 @@ }, "scripts":{ "test": "phpunit", - "analyse": "vendor/bin/phpstan analyse", + "check": "vendor/bin/phpstan analyse --memory-limit=1G --level=max src", "format": "vendor/bin/pint", "lint": "vendor/bin/pint --test" }, "require": { "php": ">=8.0", - "ext-mongodb": "2.1.1", - "mongodb/mongodb": "2.1.0", - "ramsey/uuid": "^4.9.0" + "ext-mongodb": "2.1.*", + "mongodb/mongodb": "2.1.*", + "ramsey/uuid": "4.9.*" }, "require-dev": { - "fakerphp/faker": "^1.14", - "phpunit/phpunit": "^9.4", - "swoole/ide-helper": "4.8.0", - "laravel/pint": "1.2.*", - "phpstan/phpstan": "2.1.*" + "fakerphp/faker": "1.*", + "phpunit/phpunit": "9.*", + "swoole/ide-helper": "5.1.*", + "laravel/pint": "*", + "phpstan/phpstan": "*" } } diff --git a/composer.lock b/composer.lock index 21f6226..09b34f7 100644 --- a/composer.lock +++ b/composer.lock @@ -4,29 +4,29 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "0edc06298a5d7d385ed1f0e19925baec", + "content-hash": "1d493c50f129de83162ce431a615288c", "packages": [ { "name": "brick/math", - "version": "0.13.1", + "version": "0.14.0", "source": { "type": "git", "url": "https://github.com/brick/math.git", - "reference": "fc7ed316430118cc7836bf45faff18d5dfc8de04" + "reference": "113a8ee2656b882d4c3164fa31aa6e12cbb7aaa2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/brick/math/zipball/fc7ed316430118cc7836bf45faff18d5dfc8de04", - "reference": "fc7ed316430118cc7836bf45faff18d5dfc8de04", + "url": "https://api.github.com/repos/brick/math/zipball/113a8ee2656b882d4c3164fa31aa6e12cbb7aaa2", + "reference": "113a8ee2656b882d4c3164fa31aa6e12cbb7aaa2", "shasum": "" }, "require": { - "php": "^8.1" + "php": "^8.2" }, "require-dev": { "php-coveralls/php-coveralls": "^2.2", - "phpunit/phpunit": "^10.1", - "vimeo/psalm": "6.8.8" + "phpstan/phpstan": "2.1.22", + "phpunit/phpunit": "^11.5" }, "type": "library", "autoload": { @@ -56,7 +56,7 @@ ], "support": { "issues": "https://github.com/brick/math/issues", - "source": "https://github.com/brick/math/tree/0.13.1" + "source": "https://github.com/brick/math/tree/0.14.0" }, "funding": [ { @@ -64,20 +64,20 @@ "type": "github" } ], - "time": "2025-03-29T13:50:30+00:00" + "time": "2025-08-29T12:40:03+00:00" }, { "name": "mongodb/mongodb", - "version": "2.1.0", + "version": "2.1.1", "source": { "type": "git", "url": "https://github.com/mongodb/mongo-php-library.git", - "reference": "3bbe7ba9578724c7e1f47fcd17c881c0995baaad" + "reference": "f399d24905dd42f97dfe0af9706129743ef247ac" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/mongodb/mongo-php-library/zipball/3bbe7ba9578724c7e1f47fcd17c881c0995baaad", - "reference": "3bbe7ba9578724c7e1f47fcd17c881c0995baaad", + "url": "https://api.github.com/repos/mongodb/mongo-php-library/zipball/f399d24905dd42f97dfe0af9706129743ef247ac", + "reference": "f399d24905dd42f97dfe0af9706129743ef247ac", "shasum": "" }, "require": { @@ -139,9 +139,9 @@ ], "support": { "issues": "https://github.com/mongodb/mongo-php-library/issues", - "source": "https://github.com/mongodb/mongo-php-library/tree/2.1.0" + "source": "https://github.com/mongodb/mongo-php-library/tree/2.1.1" }, - "time": "2025-05-23T10:48:05+00:00" + "time": "2025-08-13T20:50:05+00:00" }, { "name": "psr/log", @@ -271,20 +271,20 @@ }, { "name": "ramsey/uuid", - "version": "4.9.0", + "version": "4.9.1", "source": { "type": "git", "url": "https://github.com/ramsey/uuid.git", - "reference": "4e0e23cc785f0724a0e838279a9eb03f28b092a0" + "reference": "81f941f6f729b1e3ceea61d9d014f8b6c6800440" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/ramsey/uuid/zipball/4e0e23cc785f0724a0e838279a9eb03f28b092a0", - "reference": "4e0e23cc785f0724a0e838279a9eb03f28b092a0", + "url": "https://api.github.com/repos/ramsey/uuid/zipball/81f941f6f729b1e3ceea61d9d014f8b6c6800440", + "reference": "81f941f6f729b1e3ceea61d9d014f8b6c6800440", "shasum": "" }, "require": { - "brick/math": "^0.8.8 || ^0.9 || ^0.10 || ^0.11 || ^0.12 || ^0.13", + "brick/math": "^0.8.8 || ^0.9 || ^0.10 || ^0.11 || ^0.12 || ^0.13 || ^0.14", "php": "^8.0", "ramsey/collection": "^1.2 || ^2.0" }, @@ -343,22 +343,22 @@ ], "support": { "issues": "https://github.com/ramsey/uuid/issues", - "source": "https://github.com/ramsey/uuid/tree/4.9.0" + "source": "https://github.com/ramsey/uuid/tree/4.9.1" }, - "time": "2025-06-25T14:20:11+00:00" + "time": "2025-09-04T20:59:21+00:00" }, { "name": "symfony/polyfill-php85", - "version": "v1.32.0", + "version": "v1.33.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-php85.git", - "reference": "6fedf31ce4e3648f4ff5ca58bfd53127d38f05fd" + "reference": "d4e5fcd4ab3d998ab16c0db48e6cbb9a01993f91" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-php85/zipball/6fedf31ce4e3648f4ff5ca58bfd53127d38f05fd", - "reference": "6fedf31ce4e3648f4ff5ca58bfd53127d38f05fd", + "url": "https://api.github.com/repos/symfony/polyfill-php85/zipball/d4e5fcd4ab3d998ab16c0db48e6cbb9a01993f91", + "reference": "d4e5fcd4ab3d998ab16c0db48e6cbb9a01993f91", "shasum": "" }, "require": { @@ -405,7 +405,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-php85/tree/v1.32.0" + "source": "https://github.com/symfony/polyfill-php85/tree/v1.33.0" }, "funding": [ { @@ -416,12 +416,16 @@ "url": "https://github.com/fabpot", "type": "github" }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, { "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "type": "tidelift" } ], - "time": "2025-05-02T08:40:52+00:00" + "time": "2025-06-23T16:12:55+00:00" } ], "packages-dev": [ @@ -560,16 +564,16 @@ }, { "name": "laravel/pint", - "version": "v1.2.1", + "version": "v1.25.1", "source": { "type": "git", "url": "https://github.com/laravel/pint.git", - "reference": "e60e2112ee779ce60f253695b273d1646a17d6f1" + "reference": "5016e263f95d97670d71b9a987bd8996ade6d8d9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/laravel/pint/zipball/e60e2112ee779ce60f253695b273d1646a17d6f1", - "reference": "e60e2112ee779ce60f253695b273d1646a17d6f1", + "url": "https://api.github.com/repos/laravel/pint/zipball/5016e263f95d97670d71b9a987bd8996ade6d8d9", + "reference": "5016e263f95d97670d71b9a987bd8996ade6d8d9", "shasum": "" }, "require": { @@ -577,16 +581,16 @@ "ext-mbstring": "*", "ext-tokenizer": "*", "ext-xml": "*", - "php": "^8.0" + "php": "^8.2.0" }, "require-dev": { - "friendsofphp/php-cs-fixer": "^3.11.0", - "illuminate/view": "^9.32.0", - "laravel-zero/framework": "^9.2.0", - "mockery/mockery": "^1.5.1", - "nunomaduro/larastan": "^2.2.0", - "nunomaduro/termwind": "^1.14.0", - "pestphp/pest": "^1.22.1" + "friendsofphp/php-cs-fixer": "^3.87.2", + "illuminate/view": "^11.46.0", + "larastan/larastan": "^3.7.1", + "laravel-zero/framework": "^11.45.0", + "mockery/mockery": "^1.6.12", + "nunomaduro/termwind": "^2.3.1", + "pestphp/pest": "^2.36.0" }, "bin": [ "builds/pint" @@ -622,7 +626,7 @@ "issues": "https://github.com/laravel/pint/issues", "source": "https://github.com/laravel/pint" }, - "time": "2022-11-29T16:25:20+00:00" + "time": "2025-09-19T02:57:12+00:00" }, { "name": "myclabs/deep-copy", @@ -862,16 +866,16 @@ }, { "name": "phpstan/phpstan", - "version": "2.1.22", + "version": "2.1.29", "source": { "type": "git", - "url": "https://github.com/phpstan/phpstan.git", - "reference": "41600c8379eb5aee63e9413fe9e97273e25d57e4" + "url": "https://github.com/phpstan/phpstan-phar-composer-source.git", + "reference": "git" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/41600c8379eb5aee63e9413fe9e97273e25d57e4", - "reference": "41600c8379eb5aee63e9413fe9e97273e25d57e4", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/d618573eed4a1b6b75e37b2e0b65ac65c885d88e", + "reference": "d618573eed4a1b6b75e37b2e0b65ac65c885d88e", "shasum": "" }, "require": { @@ -916,7 +920,7 @@ "type": "github" } ], - "time": "2025-08-04T19:17:37+00:00" + "time": "2025-09-25T06:58:18+00:00" }, { "name": "phpunit/php-code-coverage", @@ -1239,16 +1243,16 @@ }, { "name": "phpunit/phpunit", - "version": "9.6.24", + "version": "9.6.29", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "ea49afa29aeea25ea7bf9de9fdd7cab163cc0701" + "reference": "9ecfec57835a5581bc888ea7e13b51eb55ab9dd3" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/ea49afa29aeea25ea7bf9de9fdd7cab163cc0701", - "reference": "ea49afa29aeea25ea7bf9de9fdd7cab163cc0701", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/9ecfec57835a5581bc888ea7e13b51eb55ab9dd3", + "reference": "9ecfec57835a5581bc888ea7e13b51eb55ab9dd3", "shasum": "" }, "require": { @@ -1273,7 +1277,7 @@ "sebastian/comparator": "^4.0.9", "sebastian/diff": "^4.0.6", "sebastian/environment": "^5.1.5", - "sebastian/exporter": "^4.0.6", + "sebastian/exporter": "^4.0.8", "sebastian/global-state": "^5.0.8", "sebastian/object-enumerator": "^4.0.4", "sebastian/resource-operations": "^3.0.4", @@ -1322,7 +1326,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.24" + "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.29" }, "funding": [ { @@ -1346,7 +1350,7 @@ "type": "tidelift" } ], - "time": "2025-08-10T08:32:42+00:00" + "time": "2025-09-24T06:29:11+00:00" }, { "name": "psr/container", @@ -1842,16 +1846,16 @@ }, { "name": "sebastian/exporter", - "version": "4.0.6", + "version": "4.0.8", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/exporter.git", - "reference": "78c00df8f170e02473b682df15bfcdacc3d32d72" + "reference": "14c6ba52f95a36c3d27c835d65efc7123c446e8c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/exporter/zipball/78c00df8f170e02473b682df15bfcdacc3d32d72", - "reference": "78c00df8f170e02473b682df15bfcdacc3d32d72", + "url": "https://api.github.com/repos/sebastianbergmann/exporter/zipball/14c6ba52f95a36c3d27c835d65efc7123c446e8c", + "reference": "14c6ba52f95a36c3d27c835d65efc7123c446e8c", "shasum": "" }, "require": { @@ -1907,15 +1911,27 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/exporter/issues", - "source": "https://github.com/sebastianbergmann/exporter/tree/4.0.6" + "source": "https://github.com/sebastianbergmann/exporter/tree/4.0.8" }, "funding": [ { "url": "https://github.com/sebastianbergmann", "type": "github" + }, + { + "url": "https://liberapay.com/sebastianbergmann", + "type": "liberapay" + }, + { + "url": "https://thanks.dev/u/gh/sebastianbergmann", + "type": "thanks_dev" + }, + { + "url": "https://tidelift.com/funding/github/packagist/sebastian/exporter", + "type": "tidelift" } ], - "time": "2024-03-02T06:33:00+00:00" + "time": "2025-09-24T06:03:27+00:00" }, { "name": "sebastian/global-state", @@ -2402,16 +2418,16 @@ }, { "name": "swoole/ide-helper", - "version": "4.8.0", + "version": "5.1.8", "source": { "type": "git", "url": "https://github.com/swoole/ide-helper.git", - "reference": "837a2b20242e3cebf0ba1168e876f0f1ca9a14e3" + "reference": "2806169ec7385e3b5eb484efef1f4764af46d995" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/swoole/ide-helper/zipball/837a2b20242e3cebf0ba1168e876f0f1ca9a14e3", - "reference": "837a2b20242e3cebf0ba1168e876f0f1ca9a14e3", + "url": "https://api.github.com/repos/swoole/ide-helper/zipball/2806169ec7385e3b5eb484efef1f4764af46d995", + "reference": "2806169ec7385e3b5eb484efef1f4764af46d995", "shasum": "" }, "type": "library", @@ -2428,19 +2444,9 @@ "description": "IDE help files for Swoole.", "support": { "issues": "https://github.com/swoole/ide-helper/issues", - "source": "https://github.com/swoole/ide-helper/tree/4.8.0" + "source": "https://github.com/swoole/ide-helper/tree/5.1.8" }, - "funding": [ - { - "url": "https://gitee.com/swoole/swoole?donate=true", - "type": "custom" - }, - { - "url": "https://github.com/swoole", - "type": "github" - } - ], - "time": "2021-10-14T19:39:28+00:00" + "time": "2025-08-04T05:40:28+00:00" }, { "name": "symfony/deprecation-contracts", @@ -2562,13 +2568,13 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": [], + "stability-flags": {}, "prefer-stable": false, "prefer-lowest": false, "platform": { "php": ">=8.0", - "ext-mongodb": "2.1.1" + "ext-mongodb": "2.1.*" }, - "platform-dev": [], - "plugin-api-version": "2.2.0" + "platform-dev": {}, + "plugin-api-version": "2.6.0" } diff --git a/docker-compose.yml b/docker-compose.yml index c654d8a..c02a879 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,12 +1,10 @@ -version: '3.1' - services: tests: container_name: utopia-mongo-test-1 build: context: . - dockerfile: Dockerfile-php-${PHP_VERSION:-8.3} + dockerfile: php${PHP_VERSION:-83}.Dockerfile networks: - database volumes: @@ -15,7 +13,7 @@ services: - "8882:8882" mongo: - image: mongo:5 + image: mongo:8 container_name: utopia-mongo-1 networks: - database diff --git a/Dockerfile-php-8.3 b/php83.Dockerfile similarity index 96% rename from Dockerfile-php-8.3 rename to php83.Dockerfile index d65b1c8..68f04e3 100644 --- a/Dockerfile-php-8.3 +++ b/php83.Dockerfile @@ -1,4 +1,4 @@ -FROM composer:2.0 as composer +FROM composer:2.0 AS composer ARG TESTING=false ENV TESTING=$TESTING @@ -33,7 +33,7 @@ RUN \ && ./configure \ && make && make install -FROM compile as final +FROM compile AS final LABEL maintainer="team@appwrite.io" diff --git a/src/Auth.php b/src/Auth.php index eb24efa..fecc25b 100644 --- a/src/Auth.php +++ b/src/Auth.php @@ -34,7 +34,7 @@ public function start(): array return [ [ "saslStart" => 1, - "mechanism" => "SCRAM-SHA-1", + "mechanism" => "SCRAM-SHA-256", "payload" => $payload, "autoAuthorize" => 1, "options" => ["skipEmptyExchange" => true], @@ -84,9 +84,9 @@ public function createResponse(mixed $challenge = null): string|bool if (empty($challenge)) { return $this->generateInitialResponse($authcid, $authzid); - } else { - return $this->generateResponse($challenge, $this->secret); } + + return $this->generateResponse($challenge, $this->secret); } /** @@ -110,10 +110,10 @@ private function formatName(string $username): string */ private function generateInitialResponse(string $authcid, string $authzid): string { - $gs2CbindFlag = 'n,'; + $gs2CbindFlag = 'n,'; $this->gs2Header = $gs2CbindFlag . (!empty($authzid) ? 'a=' . $authzid : '') . ','; - // I must generate a client nonce and "save" it for later comparison on second response. + // Generate a client nonce and "save" it for later comparison on second response. $this->cnonce = $this->generateCnonce(); $this->firstMessageBare = 'n=' . $authcid . ',r=' . $this->cnonce; @@ -124,8 +124,8 @@ private function generateInitialResponse(string $authcid, string $authzid): stri /** * Parses and verifies a non-empty SCRAM challenge. * - * @param string $challenge The SCRAM challenge - * @param string $password The password challenge + * @param string $challenge The SCRAM challenge + * @param string $password The password challenge * @return string|false The response to send; false in case of wrong challenge or if an initial response has not * been generated first. */ @@ -141,7 +141,7 @@ private function generateResponse(string $challenge, string $password): string|b return false; } $nonce = $matches[1]; - $salt = base64_decode($matches[2]); + $salt = base64_decode($matches[2]); if (!$salt) { return false; @@ -153,17 +153,17 @@ private function generateResponse(string $challenge, string $password): string|b return false; } - $channelBinding = 'c=' . base64_encode($this->gs2Header); - $finalMessage = $channelBinding . ',r=' . $nonce; - $saltedPassword = $this->hi($password, $salt, $i); + $channelBinding = 'c=' . base64_encode($this->gs2Header); + $finalMessage = $channelBinding . ',r=' . $nonce; + $saltedPassword = $this->hi($password, $salt, $i); $this->saltedPassword = $saltedPassword; - $clientKey = $this->hmac($saltedPassword, "Client Key"); - $storedKey = $this->hash($clientKey); - $authMessage = $this->firstMessageBare . ',' . $challenge . ',' . $finalMessage; - $this->authMessage = $authMessage; - $clientSignature = $this->hmac($storedKey, $authMessage); - $clientProof = $clientKey ^ $clientSignature; - $proof = ',p=' . base64_encode($clientProof); + $clientKey = $this->hmac($saltedPassword, "Client Key"); + $storedKey = $this->hash($clientKey); + $authMessage = $this->firstMessageBare . ',' . $challenge . ',' . $finalMessage; + $this->authMessage = $authMessage; + $clientSignature = $this->hmac($storedKey, $authMessage); + $clientProof = $clientKey ^ $clientSignature; + $proof = ',p=' . base64_encode($clientProof); return $finalMessage . $proof; } @@ -171,18 +171,18 @@ private function generateResponse(string $challenge, string $password): string|b /** * Hi() call, which is essentially PBKDF2 (RFC-2898) with HMAC-H() as the pseudorandom function. * - * @param string $str The string to hash. + * @param string $str The string to hash. * @param string $salt The salt value. * @param int $i The iteration count. * @return string The hashed string. */ private function hi(string $str, string $salt, int $i): string { - $int1 = "\0\0\0\1"; - $ui = $this->hmac($str, $salt . $int1); + $int1 = "\0\0\0\1"; + $ui = $this->hmac($str, $salt . $int1); $result = $ui; for ($k = 1; $k < $i; $k++) { - $ui = $this->hmac($str, $ui); + $ui = $this->hmac($str, $ui); $result = $result ^ $ui; } return $result; @@ -207,10 +207,10 @@ public function verify(string $data): bool return false; } - $verifier = $matches[1]; + $verifier = $matches[1]; $proposedServerSignature = base64_decode($verifier); - $serverKey = $this->hmac($this->saltedPassword, "Server Key"); - $serverSignature = $this->hmac($serverKey, $this->authMessage); + $serverKey = $this->hmac($this->saltedPassword, "Server Key"); + $serverSignature = $this->hmac($serverKey, $this->authMessage); return $proposedServerSignature === $serverSignature; } @@ -245,7 +245,7 @@ public function getAuthMessage(): string */ private function hash($data): string { - return hash('sha1', $data, true); + return hash('sha256', $data, true); } /** @@ -255,7 +255,7 @@ private function hash($data): string */ private function hmac($key, $str): string { - return hash_hmac('sha1', $str, $key, true); + return hash_hmac('sha256', $str, $key, true); } /** @@ -279,10 +279,17 @@ private function generateCnonce(): string } /** + * Encode credentials for MongoDB SCRAM-SHA-256 + * For SCRAM-SHA-256, we store the raw password and let the SCRAM mechanism handle it + * + * @param string $username + * @param string $password * @return string */ public static function encodeCredentials($username, $password): string { - return \md5($username . ':mongo:' . $password); + // For SCRAM-SHA-256, return the raw password + // The SCRAM mechanism will handle the proper hashing + return $password; } } diff --git a/src/Client.php b/src/Client.php index e374806..57f99a9 100644 --- a/src/Client.php +++ b/src/Client.php @@ -3,11 +3,13 @@ namespace Utopia\Mongo; use MongoDB\BSON\Document; -use Swoole\Client as SwooleClient; -use Swoole\Coroutine\Client as CoroutineClient; +use MongoDB\BSON\Int64; +use MongoDB\Driver\Exception\InvalidArgumentException; use Ramsey\Uuid\Uuid; use stdClass; +use Swoole\Client as SwooleClient; use Swoole\Coroutine; +use Swoole\Coroutine\Client as CoroutineClient; class Client { @@ -21,6 +23,26 @@ class Client */ private SwooleClient|CoroutineClient $client; + /** + * Active sessions with transaction state tracking. + */ + private array $sessions = []; + + /** + * Current cluster time for causal consistency. + */ + private ?object $clusterTime = null; + + /** + * Current operation time for causal consistency. + */ + private ?object $operationTime = null; + + /** + * Connection status flag. + */ + private bool $isConnected = false; + /** * Defines commands Mongo uses over wire protocol. */ @@ -42,6 +64,38 @@ class Client public const COMMAND_COMMIT_TRANSACTION = "commitTransaction"; public const COMMAND_ABORT_TRANSACTION = "abortTransaction"; public const COMMAND_END_SESSIONS = "endSessions"; + public const COMMAND_LIST_INDEXES = "listIndexes"; + public const COMMAND_COLLMOD = "collMod"; + + // Connection and performance settings + private int $defaultMaxTimeMS = 30000; // 30 seconds default + + // Transaction error codes for retry logic + public const TRANSIENT_TRANSACTION_ERROR = 'TransientTransactionError'; + public const UNKNOWN_TRANSACTION_COMMIT_RESULT = 'UnknownTransactionCommitResult'; + public const TRANSACTION_TIMEOUT_ERROR = 50; + public const TRANSACTION_ABORTED_ERROR = 251; + + // Transaction states + public const TRANSACTION_NONE = 'none'; + public const TRANSACTION_STARTING = 'starting'; + public const TRANSACTION_IN_PROGRESS = 'in_progress'; + public const TRANSACTION_COMMITTED = 'committed'; + public const TRANSACTION_ABORTED = 'aborted'; + + // Read concerns + public const READ_CONCERN_LOCAL = 'local'; + public const READ_CONCERN_AVAILABLE = 'available'; + public const READ_CONCERN_MAJORITY = 'majority'; + public const READ_CONCERN_LINEARIZABLE = 'linearizable'; + public const READ_CONCERN_SNAPSHOT = 'snapshot'; + + // Read preferences + public const READ_PREFERENCE_PRIMARY = 'primary'; + public const READ_PREFERENCE_SECONDARY = 'secondary'; + public const READ_PREFERENCE_PRIMARY_PREFERRED = 'primaryPreferred'; + public const READ_PREFERENCE_SECONDARY_PREFERRED = 'secondaryPreferred'; + public const READ_PREFERENCE_NEAREST = 'nearest'; /** @@ -72,6 +126,7 @@ class Client * @param string $user * @param string $password * @param Boolean $useCoroutine + * @throws \Exception */ public function __construct( string $database, @@ -81,6 +136,22 @@ public function __construct( string $password, bool $useCoroutine = false ) { + if (empty($database)) { + throw new \InvalidArgumentException('Database name cannot be empty'); + } + if (empty($host)) { + throw new \InvalidArgumentException('Host cannot be empty'); + } + if ($port <= 0 || $port > 65535) { + throw new \InvalidArgumentException('Port must be between 1 and 65535'); + } + if (empty($user)) { + throw new \InvalidArgumentException('Username cannot be empty'); + } + if (empty($password)) { + throw new \InvalidArgumentException('Password cannot be empty'); + } + $this->id = uniqid('utopia.mongo.client'); $this->database = $database; $this->host = $host; @@ -89,11 +160,11 @@ public function __construct( // Only use coroutines if explicitly requested and we're in a coroutine context if ($useCoroutine) { try { - $cid = \Swoole\Coroutine::getCid(); - if ($cid === false || $cid < 0) { + $cid = Coroutine::getCid(); + if ($cid <= 0) { $useCoroutine = false; } - } catch (\Throwable $e) { + } catch (\Throwable) { $useCoroutine = false; } } @@ -102,6 +173,15 @@ public function __construct( ? new CoroutineClient(SWOOLE_SOCK_TCP | SWOOLE_KEEP) : new SwooleClient(SWOOLE_SOCK_TCP | SWOOLE_KEEP); + // Set socket options to prevent hanging + $this->client->set([ + 'open_tcp_keepalive' => true, + 'tcp_keepidle' => 4, // Start keepalive after 4s idle + 'tcp_keepinterval' => 3, // Keepalive interval 3s + 'tcp_keepcount' => 2, // Close after 2 failed keepalives + 'timeout' => 30 // 30 second connection timeout + ]); + $this->auth = new Auth([ 'authcid' => $user, 'secret' => Auth::encodeCredentials($user, $password) @@ -109,8 +189,7 @@ public function __construct( } /** - * Connect to Mongo using TCP/IP - * and Wire Protocol. + * Connect to MongoDB using TCP/IP and Wire Protocol. * @throws Exception */ public function connect(): self @@ -118,75 +197,167 @@ public function connect(): self if ($this->client->isConnected()) { return $this; } + + // Validate connection parameters before attempting connection + $validateConnectionParams = function () { + if (empty($this->host)) { + throw new Exception('MongoDB host cannot be empty'); + } + if ($this->port <= 0 || $this->port > 65535) { + throw new Exception('MongoDB port must be between 1 and 65535'); + } + }; + + $validateConnectionParams(); + if (!$this->client->connect($this->host, $this->port)) { throw new Exception("Failed to connect to MongoDB at {$this->host}:{$this->port}"); } + $this->isConnected = true; + [$payload, $db] = $this->auth->start(); $res = $this->query($payload, $db); [$payload, $db] = $this->auth->continue($res); - $res = $this->query($payload, $db); + $this->query($payload, $db); return $this; } /** - * Create a UUID. + * Create a UUID using UUID7 standard for MongoDB _id field. + * * @return string */ public function createUuid(): string { - return Uuid::uuid7()->toString(); + return Uuid::uuid7()->toString(); } /** - * Send a raw string query to connection. - * @param string $qry - * @return mixed - */ - public function raw_query(string $qry): mixed - { - return $this->send($qry); - } - - /** - * Send a BSON packed query to connection. + * Send a BSON packed query to connection with comprehensive session, causal consistency, and transaction support. * - * @param array $command - * @param string|null $db - * @return stdClass|array|int + * @param array $command Command to execute + * @param string|null $db Database name + * @return stdClass|array|int Query result * @throws Exception */ public function query(array $command, ?string $db = null): stdClass|array|int { + // Validate connection state before each operation + $this->validateConnection(); + + $sessionId = null; + + // Extract and process session from options if provided + if (isset($command['session'])) { + $sessionData = $command['session']; + unset($command['session']); + + // Handle different session formats + if (is_array($sessionData) && isset($sessionData['id'])) { + $command['lsid'] = $sessionData['id']; + $rawId = $sessionData['id']->id ?? null; + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + } else { + $command['lsid'] = $sessionData; + $rawId = $sessionData->id ?? null; + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + } + + // Add transaction parameters if session is in transaction + if ($sessionId && isset($this->sessions[$sessionId]) && + $this->sessions[$sessionId]['state'] === self::TRANSACTION_IN_PROGRESS) { + $command['txnNumber'] = new Int64($this->sessions[$sessionId]['txnNumber']); + $command['autocommit'] = false; + + // Check if this is the first operation + $isFirstOperation = !isset($this->sessions[$sessionId]['firstOperationDone']); + + // Add the first operation flag for the first operation in the transaction + if ($isFirstOperation) { + $command['startTransaction'] = true; + $this->sessions[$sessionId]['firstOperationDone'] = true; + + // Add transaction options from startTransaction + if (isset($this->sessions[$sessionId]['transactionOptions'])) { + $txnOpts = $this->sessions[$sessionId]['transactionOptions']; + if (isset($txnOpts['readConcern']) && !isset($command['readConcern'])) { + $command['readConcern'] = $txnOpts['readConcern']; + } + if (isset($txnOpts['writeConcern']) && !isset($command['writeConcern'])) { + $command['writeConcern'] = $txnOpts['writeConcern']; + } + } + } + + // IMPORTANT: Do NOT add causal consistency readConcern for ANY operation in a transaction + // MongoDB transactions provide their own consistency guarantees, and readConcern can only + // be specified on the first operation (which is handled above via transactionOptions) + // Attempting to add readConcern to subsequent operations will cause E72 InvalidOptions error + + // Remove any readConcern that might have been added before this point for non-first operations + if (!$isFirstOperation && isset($command['readConcern'])) { + unset($command['readConcern']); + } + } else { + // Not in a transaction - can add causal consistency readConcern freely + if ($this->operationTime !== null && !isset($command['readConcern']['afterClusterTime'])) { + $command['readConcern'] = $command['readConcern'] ?? []; + $command['readConcern']['afterClusterTime'] = $this->operationTime; + } + } + } else { + // No session - can add causal consistency readConcern freely (unless explicitly skipped) + if (!isset($command['skipReadConcern']) && + $this->operationTime !== null && + !isset($command['readConcern']['afterClusterTime'])) { + $command['readConcern'] = $command['readConcern'] ?? []; + $command['readConcern']['afterClusterTime'] = $this->operationTime; + } + } + + // Remove internal flag before sending to MongoDB + unset($command['skipReadConcern']); + + // CRITICAL: Remove readConcern from any non-first operation in a transaction + // MongoDB will reject commands with readConcern that have txnNumber but not startTransaction + if (isset($command['txnNumber']) && !isset($command['startTransaction']) && isset($command['readConcern'])) { + unset($command['readConcern']); + } + + // Add cluster time for causal consistency + if ($this->clusterTime !== null) { + $command['$clusterTime'] = $this->clusterTime; + } + $params = array_merge($command, [ '$db' => $db ?? $this->database, ]); $sections = Document::fromPHP($params); $message = pack('V*', 21 + strlen($sections), $this->id, 0, 2013, 0) . "\0" . $sections; - return $this->send($message); - } + $result = $this->send($message); - /** - * Send a synchronous command to connection. - */ - public function blocking(string $cmd): stdClass|array|int - { - $this->client->send($cmd . PHP_EOL); - - $result = ''; + // Update causal consistency timestamps from response + $this->updateCausalConsistency($result); - while ($result = $this->client->recv()) { - sleep(1); + // Update session last use time if session was provided + if ($sessionId && isset($this->sessions[$sessionId])) { + $this->sessions[$sessionId]['lastUse'] = time(); } return $result; } + /** * Send a message to connection. * @@ -206,73 +377,69 @@ public function send(mixed $data): stdClass|array|int */ private function receive(): stdClass|array|int { + $chunks = []; $receivedLength = 0; $responseLength = null; - $res = ''; + $attempts = 0; + $maxAttempts = 10000; + $sleepTime = 100; do { - if (($chunk = $this->client->recv()) === false) { - sleep(1); // Prevent excessive CPU Load, test lower. + $chunk = $this->client->recv(); + + if ($chunk === false || $chunk === '') { + $attempts++; + if ($attempts >= $maxAttempts) { + throw new Exception('Receive timeout: no data received within reasonable time'); + } + + // Adaptive backoff: shorter delays for coroutines, longer for sync + if ($this->client instanceof CoroutineClient) { + Coroutine::sleep(0.001); // 1ms for coroutines + } else { + \usleep($sleepTime); // Microsecond precision for sync client + $sleepTime = \min($sleepTime * 1.2, 10000); // Cap at 10ms for faster checking + } continue; } - $receivedLength += strlen($chunk); - $res .= $chunk; + // Reset attempts counter when we receive data + $attempts = 0; + $sleepTime = 100; // Reset to 0.1ms - if ((!isset($responseLength)) && (strlen($res) >= 4)) { - $responseLength = unpack('Vl', substr($res, 0, 4))['l']; - } - } while ( - (!isset($responseLength)) || ($receivedLength < $responseLength) - ); + $chunkLen = \strlen($chunk); + $receivedLength += $chunkLen; + $chunks[] = $chunk; - /* - * The first 21 bytes of the MongoDB wire protocol response consist of: - * - 16 bytes: Standard message header, which includes: - * - messageLength (4 bytes): Total size of the message, including the header. - * - requestID (4 bytes): Identifier for this message. - * - responseTo (4 bytes): The requestID that this message is responding to. - * - opCode (4 bytes): The operation code for the message type (e.g., OP_MSG). - * - 4 bytes: flagBits, which provide additional information about the message. - * - 1 byte: payloadType, indicating the type of the following payload (usually 0 for a BSON document). - * - * These 21 bytes are protocol metadata and precede the actual BSON-encoded document in the response. - */ + // Parse message length from first 4 bytes + if ($responseLength === null && $receivedLength >= 4) { + $firstData = $chunks[0]; - $bsonString = substr($res, 21, $responseLength - 21); - $result = Document::fromBSON($bsonString)->toPHP(); - if (is_array($result)) { - $result = (object) $result; - } - if (property_exists($result, "writeErrors")) { - // Throws Utopia\Mongo\Exception - throw new Exception( - $result->writeErrors[0]->errmsg, - $result->writeErrors[0]->code - ); - } + if (\strlen($firstData) < 4) { + $firstData = \implode('', $chunks); + } - if (property_exists($result, 'errmsg')) { - // Throws Utopia\Mongo\Exception - throw new Exception( - 'E'.$result->code.' '.$result->codeName.': '.$result->errmsg, - $result->code - ); - } + $responseLength = \unpack('Vl', substr($firstData, 0, 4))['l']; - if (property_exists($result, "n") && $result->ok === 1.0) { - return $result->n; - } + // Validate response length before allocating memory to prevent memory exhaustion + if ($responseLength > 16777216) { // 16MB limit + throw new Exception('Response too large: ' . $responseLength . ' bytes'); + } - if (property_exists($result, "nonce") && $result->ok === 1.0) { - return $result; - } + // Validate for negative or tiny values + if ($responseLength < 21) { // Minimum MongoDB message size + throw new Exception('Invalid response length: ' . $responseLength . ' bytes'); + } + } - if ($result->ok === 1.0) { - return $result; - } + if ($responseLength !== null && $receivedLength >= $responseLength) { + break; + } + } while (true); - return $result->cursor->firstBatch; + $res = \implode('', $chunks); + + return $this->parseResponse($res, $responseLength); } /** @@ -340,7 +507,7 @@ public function selectCollection($name): self */ public function createCollection(string $name, array $options = []): bool { - $list = $this->listCollectionNames(["name" => $name]); + $list = $this->listCollectionNames(["name" => $name], $options); if (\count($list->cursor->firstBatch) > 0) { throw new Exception('Collection Exists'); @@ -410,7 +577,7 @@ public function listCollectionNames(array $filter = [], array $options = []): st public function createIndexes(string $collection, array $indexes, array $options = []): bool { foreach ($indexes as $key => $index) { - if (\array_key_exists('unique', $index) && $index['unique'] == true) { + if ($index['unique'] ?? false) { /** * TODO: Unique Indexes are now sparse indexes, which results into incomplete indexes. * However, if partialFilterExpression is present, we can't use sparse. @@ -458,14 +625,20 @@ public function dropIndexes(string $collection, array $indexes, array $options = } /** - * Insert a document/s. + * Insert document with full transaction and session support. * https://docs.mongodb.com/manual/reference/command/insert/#mongodb-dbcommand-dbcmd.insert * - * @param string $collection - * @param array $document - * @param array $options + * @param string $collection Collection name + * @param array $document Document to insert + * @param array $options Options array supporting: + * - session: Session object for transactions + * - writeConcern: Write concern specification + * - readConcern: Read concern specification + * - readPreference: Read preference + * - maxTimeMS: Operation timeout in milliseconds + * - ordered: Whether to stop on first error (default: true) * - * @return array + * @return array Inserted document with _id * @throws Exception */ public function insert(string $collection, array $document, array $options = []): array @@ -480,38 +653,115 @@ public function insert(string $collection, array $document, array $options = []) $docObj->_id = $this->createUuid(); } - $this->query(array_merge([ + // Build command with session and concerns + $command = [ self::COMMAND_INSERT => $collection, 'documents' => [$docObj], - ], $options)); + ]; + + // Add session if provided + if (isset($options['session'])) { + $command['session'] = $options['session']; + } + + // Add write concern if provided with validation + if (isset($options['writeConcern'])) { + $command['writeConcern'] = $this->createWriteConcern($options['writeConcern']); + } + + // Add read concern if provided with validation (skip for non-first transaction operations) + if (isset($options['readConcern']) && !$this->shouldSkipReadConcern($options)) { + $command['readConcern'] = $this->createReadConcern($options['readConcern']); + } + + // Add other options (excluding those we've already handled) + $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern'])); + $command = array_merge($command, $otherOptions); + + $this->query($command); return $this->toArray($docObj); } + /** + * Insert multiple documents with improved batching for MongoDB 8+ performance. + * Automatically handles large datasets by batching operations with full transaction support. + * + * @param string $collection Collection name + * @param array $documents Array of documents to insert + * @param array $options Options array supporting: + * - session: Session object for transactions + * - writeConcern: Write concern specification + * - readConcern: Read concern specification + * - readPreference: Read preference + * - maxTimeMS: Operation timeout in milliseconds + * - ordered: Whether to stop on first error (default: true) + * - batchSize: Number of documents per batch (default: 1000) + * @return array Array of inserted documents with generated _ids + * @throws Exception + */ public function insertMany(string $collection, array $documents, array $options = []): array { - $docObjs = []; + if (empty($documents)) { + return []; + } - foreach ($documents as $document) { - $docObj = new stdClass(); + $batchSize = $options['batchSize'] ?? 1000; + $ordered = $options['ordered'] ?? true; + $insertedDocs = []; - foreach ($document as $key => $value) { - $docObj->{$key} = $value; + // Process documents in batches for better performance + $batches = array_chunk($documents, $batchSize); + + foreach ($batches as $batch) { + $docObjs = []; + + foreach ($batch as $document) { + $docObj = new stdClass(); + + foreach ($document as $key => $value) { + $docObj->{$key} = $value; + } + + if (!isset($docObj->_id) || $docObj->_id === '' || $docObj->_id === null) { + $docObj->_id = $this->createUuid(); + } + + $docObjs[] = $docObj; + } + + // Build command with session and concerns + $command = [ + self::COMMAND_INSERT => $collection, + 'documents' => $docObjs, + 'ordered' => $ordered, + ]; + + // Add session if provided + if (isset($options['session'])) { + $command['session'] = $options['session']; } - if (!isset($docObj->_id) || $docObj->_id === '' || $docObj->_id === null) { - $docObj->_id = $this->createUuid(); + // Add write concern if provided with validation + if (isset($options['writeConcern'])) { + $command['writeConcern'] = $this->createWriteConcern($options['writeConcern']); } - $docObjs[] = $docObj; - } + // Add read concern if provided with validation (skip for non-first transaction operations) + if (isset($options['readConcern']) && !$this->shouldSkipReadConcern($options)) { + $command['readConcern'] = $this->createReadConcern($options['readConcern']); + } - $this->query(array_merge([ - self::COMMAND_INSERT => $collection, - 'documents' => $docObjs, - ], $options)); + // Add other options (excluding those we've already handled) + $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize'])); + $command = array_merge($command, $otherOptions); + + $this->query($command); - return $this->toArray($docObjs); + $insertedDocs = array_merge($insertedDocs, $this->toArray($docObjs)); + } + + return $insertedDocs; } /** @@ -534,33 +784,60 @@ public function lastDocument(string $collection): array } /** - * Update a document/s. + * Update document(s) with full transaction and session support. * https://docs.mongodb.com/manual/reference/command/update/#syntax * - * @param string $collection - * @param array $where - * @param array $updates - * @param array $options - * @param bool $multi + * @param string $collection Collection name + * @param array $where Filter criteria + * @param array $updates Update operations + * @param array $options Options array supporting: + * - session: Session object for transactions + * - writeConcern: Write concern specification + * - readConcern: Read concern specification + * - readPreference: Read preference + * - maxTimeMS: Operation timeout in milliseconds + * - upsert: Whether to insert if no match found + * - arrayFilters: Array filters for updates + * @param bool $multi Whether to update multiple documents * * @return Client * @throws Exception */ public function update(string $collection, array $where = [], array $updates = [], array $options = [], bool $multi = false): self { - $this->query( - array_merge([ - self::COMMAND_UPDATE => $collection, - 'updates' => [ - [ - 'q' => $this->toObject($where), - 'u' => $this->toObject($updates), - 'multi' => $multi, - 'upsert' => false - ] + // Build command with session and concerns + $command = [ + self::COMMAND_UPDATE => $collection, + 'updates' => [ + [ + 'q' => $this->toObject($where), + 'u' => $this->toObject($updates), + 'multi' => $multi, + 'upsert' => $options['upsert'] ?? false ] - ], $options) - ); + ] + ]; + + // Add session if provided + if (isset($options['session'])) { + $command['session'] = $options['session']; + } + + // Add write concern if provided with validation + if (isset($options['writeConcern'])) { + $command['writeConcern'] = $this->createWriteConcern($options['writeConcern']); + } + + // Add read concern if provided with validation (skip for non-first transaction operations) + if (isset($options['readConcern']) && !$this->shouldSkipReadConcern($options)) { + $command['readConcern'] = $this->createReadConcern($options['readConcern']); + } + + // Add other options (excluding those we've already handled) + $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'upsert'])); + $command = array_merge($command, $otherOptions); + + $this->query($command); return $this; } @@ -604,28 +881,105 @@ public function upsert(string $collection, array $operations, array $options = [ } - /** - * Find a document/s. + * Find document(s) with full transaction and session support. * https://docs.mongodb.com/manual/reference/command/find/#mongodb-dbcommand-dbcmd.find * - * @param string $collection - * @param array $filters - * @param array $options - * - * @return stdClass + * @param string $collection Collection name + * @param array $filters Query filters + * @param array $options Options array supporting: + * - session: Session object for transactions + * - readConcern: Read concern specification + * - readPreference: Read preference + * - maxTimeMS: Operation timeout in milliseconds + * - limit: Maximum number of documents to return + * - skip: Number of documents to skip + * - sort: Sort specification + * - projection: Field projection specification + * - hint: Index hint + * - allowPartialResults: Allow partial results from sharded clusters + * @return stdClass Query result * @throws Exception */ public function find(string $collection, array $filters = [], array $options = []): stdClass { $filters = $this->cleanFilters($filters); - return $this->query( - array_merge([ - self::COMMAND_FIND => $collection, - 'filter' => $this->toObject($filters), - ], $options) - ); + // Build command with session and concerns + $command = [ + self::COMMAND_FIND => $collection, + 'filter' => $this->toObject($filters), + ]; + + // Add session if provided + if (isset($options['session'])) { + $command['session'] = $options['session']; + } + + // Add read concern if provided with validation (skip for non-first transaction operations) + if (isset($options['readConcern']) && !$this->shouldSkipReadConcern($options)) { + $command['readConcern'] = $this->createReadConcern($options['readConcern']); + } + + // Add read preference if provided + if (isset($options['readPreference'])) { + $command['$readPreference'] = $options['readPreference']; + } + + // Add other options (excluding those we've already handled) + $otherOptions = array_diff_key($options, array_flip(['session', 'readConcern', 'readPreference'])); + $command = array_merge($command, $otherOptions); + + return $this->query($command); + } + + /** + * Aggregate a collection pipeline with full transaction and session support. + * + * @param string $collection Collection name + * @param array $pipeline Aggregation pipeline + * @param array $options Options array supporting: + * - session: Session object for transactions + * - readConcern: Read concern specification + * - readPreference: Read preference + * - maxTimeMS: Operation timeout in milliseconds + * - allowDiskUse: Allow using disk for large result sets + * - batchSize: Batch size for cursor + * - hint: Index hint + * - explain: Return query execution plan + * + * @return stdClass Aggregation result + * @throws Exception + */ + public function aggregate(string $collection, array $pipeline, array $options = []): stdClass + { + // Build command with session and concerns + $command = [ + self::COMMAND_AGGREGATE => $collection, + 'pipeline' => $pipeline, + 'cursor' => $this->toObject([]), + ]; + + // Add session if provided + if (isset($options['session'])) { + $command['session'] = $options['session']; + } + + // Add read concern if provided with validation (skip for non-first transaction operations) + if (isset($options['readConcern']) && !$this->shouldSkipReadConcern($options)) { + $command['readConcern'] = $this->createReadConcern($options['readConcern']); + } + + // Add read preference if provided + if (isset($options['readPreference'])) { + $command['$readPreference'] = $options['readPreference']; + } + + // Add other options (excluding those we've already handled) + $otherOptions = array_diff_key($options, array_flip(['session', 'readConcern', 'readPreference'])); + $command = array_merge($command, $otherOptions); + + return $this->query($command); } /** @@ -673,61 +1027,106 @@ public function getMore(int $cursorId, string $collection, int $batchSize = 25): } /** - * Delete a document/s. + * Delete document(s) with full transaction and session support. * https://docs.mongodb.com/manual/reference/command/delete/#mongodb-dbcommand-dbcmd.delete * - * @param string $collection - * @param array $filters - * @param int $limit - * @param array $deleteOptions - * @param array $options + * @param string $collection Collection name + * @param array $filters Delete filters + * @param int $limit Maximum number of documents to delete + * @param array $deleteOptions Delete operation options + * @param array $options Options array supporting: + * - session: Session object for transactions + * - writeConcern: Write concern specification + * - readConcern: Read concern specification + * - readPreference: Read preference + * - maxTimeMS: Operation timeout in milliseconds + * - hint: Index hint * - * @return int + * @return int Number of deleted documents * @throws Exception */ public function delete(string $collection, array $filters = [], int $limit = 1, array $deleteOptions = [], array $options = []): int { - return $this->query( - array_merge( - [ - self::COMMAND_DELETE => $collection, - 'deletes' => [ - $this->toObject( - array_merge( - [ - 'q' => $this->toObject($filters), - 'limit' => $limit, - ], - $deleteOptions - ) - ), - ] - ], - $options - ) - ); + // Build command with session and concerns + $command = [ + self::COMMAND_DELETE => $collection, + 'deletes' => [ + $this->toObject( + array_merge( + [ + 'q' => $this->toObject($filters), + 'limit' => $limit, + ], + $deleteOptions + ) + ), + ] + ]; + + // Add session if provided + if (isset($options['session'])) { + $command['session'] = $options['session']; + } + + // Add write concern if provided with validation + if (isset($options['writeConcern'])) { + $command['writeConcern'] = $this->createWriteConcern($options['writeConcern']); + } + + // Add read concern if provided with validation (skip for non-first transaction operations) + if (isset($options['readConcern']) && !$this->shouldSkipReadConcern($options)) { + $command['readConcern'] = $this->createReadConcern($options['readConcern']); + } + + // Add other options (excluding those we've already handled) + $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern'])); + $command = array_merge($command, $otherOptions); + + return $this->query($command); } /** - * Count documents. + * Count documents with full transaction and session support. * - * @param string $collection - * @param array $filters - * @param array $options + * @param string $collection Collection name + * @param array $filters Query filters + * @param array $options Options array supporting: + * - session: Session object for transactions + * - readConcern: Read concern specification + * - readPreference: Read preference + * - maxTimeMS: Operation timeout in milliseconds + * - limit: Maximum number of documents to count + * - skip: Number of documents to skip + * - hint: Index hint * - * @return int + * @return int Number of matching documents * @throws Exception */ public function count(string $collection, array $filters, array $options): int { $filters = $this->cleanFilters($filters); - // Use MongoDB's native count command with the working format instad of running find and count the results + // Use MongoDB's native count command with the working format instead of running find and count the results $command = [ self::COMMAND_COUNT => $collection, 'query' => $this->toObject($filters), ]; + // Add session if provided + if (isset($options['session'])) { + $command['session'] = $options['session']; + } + + // Add read concern if provided with validation (skip for non-first transaction operations) + if (isset($options['readConcern']) && !$this->shouldSkipReadConcern($options)) { + $command['readConcern'] = $this->createReadConcern($options['readConcern']); + } + + // Add read preference if provided + if (isset($options['readPreference'])) { + $command['$readPreference'] = $options['readPreference']; + } + // Add limit if specified if (isset($options['limit'])) { $command['limit'] = (int)$options['limit']; @@ -743,6 +1142,10 @@ public function count(string $collection, array $filters, array $options): int $command['maxTimeMS'] = (int)$options['maxTimeMS']; } + // Add other options (excluding those we've already handled) + $otherOptions = array_diff_key($options, array_flip(['session', 'readConcern', 'readPreference', 'limit', 'skip', 'maxTimeMS'])); + $command = array_merge($command, $otherOptions); + try { $result = $this->query($command); return (int)$result; @@ -751,107 +1154,277 @@ public function count(string $collection, array $filters, array $options): int } } - /** - * Aggregate a collection pipeline. - * - * @param string $collection - * @param array $pipeline - * @param array $options - * - * @return stdClass - * @throws Exception - */ - public function aggregate(string $collection, array $pipeline, array $options = []): stdClass - { - return $this->query(array_merge([ - self::COMMAND_AGGREGATE => $collection, - 'pipeline' => $pipeline, - 'cursor' => $this->toObject([]), - ], $options)); - } /** - * Start a new logical session. Returns the session id object.. + * Start a new logical session with comprehensive state tracking for transactions. * - * @return object + * @param array $options Session options supporting: + * - causalConsistency: Enable causal consistency (default: true) + * - defaultTransactionOptions: Default transaction options + * @return array Session object with comprehensive state tracking * @throws Exception */ - public function startSession(): object + public function startSession(array $options = []): array { - $result = $this->query([ - self::COMMAND_START_SESSION => 1 - ], 'admin'); - - return $result->id->id; - } + $sessionOptions = [ + 'causalConsistency' => $options['causalConsistency'] ?? true + ]; - /** - * Commit a transaction. - * - * @param array $lsid - * @param int $txnNumber - * @param bool $autocommit - * @return mixed - * @throws Exception - */ - public function commitTransaction(array $lsid, int $txnNumber, bool $autocommit = false) - { - $txnNumber = new \MongoDB\BSON\Int64($txnNumber); + if (isset($options['defaultTransactionOptions'])) { + $sessionOptions['defaultTransactionOptions'] = $options['defaultTransactionOptions']; + } $result = $this->query([ - self::COMMAND_COMMIT_TRANSACTION => 1, - 'lsid' => $lsid, - 'txnNumber' => $txnNumber, - 'autocommit' => $autocommit + self::COMMAND_START_SESSION => 1, + 'options' => $sessionOptions ], 'admin'); - // End the session after successful commit - $this->endSessions([$lsid]); + // Convert BSON\Binary to string for use as array key + $sessionId = $result->id->id instanceof \MongoDB\BSON\Binary + ? bin2hex($result->id->id->getData()) + : (string)$result->id->id; + + // Initialize session state tracking + $this->sessions[$sessionId] = [ + 'id' => $result->id, + 'state' => self::TRANSACTION_NONE, + 'txnNumber' => 0, + 'lastUse' => time(), + 'operationTime' => null, + 'clusterTime' => null, + 'options' => $sessionOptions, + 'retryableWriteNumber' => 0 + ]; - return $result; + return ['id' => $result->id, 'sessionId' => $sessionId]; } /** - * Abort (rollback) a transaction. + * Start a new transaction on a session with comprehensive state management. * - * @param array $lsid - * @param int $txnNumber - * @param bool $autocommit - * @return mixed + * @param array $session Session from startSession() + * @param array $options Transaction options supporting: + * - readConcern: Read concern specification + * - writeConcern: Write concern specification + * - readPreference: Read preference + * - maxCommitTimeMS: Maximum time to allow for commit + * @return bool Success status * @throws Exception */ - public function abortTransaction(array $lsid, int $txnNumber, bool $autocommit = false) + public function startTransaction(array $session, array $options = []): bool { - $txnNumber = new \MongoDB\BSON\Int64($txnNumber); + $rawId = $session['sessionId'] ?? ($session['id']->id ?? null); + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; - $result = $this->query([ - self::COMMAND_ABORT_TRANSACTION => 1, - 'lsid' => $lsid, - 'txnNumber' => $txnNumber, - 'autocommit' => $autocommit - ], 'admin'); + if (!$sessionId || !isset($this->sessions[$sessionId])) { + throw new Exception('Invalid session provided to startTransaction'); + } - // End the session after successful rollback - $this->endSessions([$lsid]); + $sessionState = &$this->sessions[$sessionId]; - return $result; - } + // Check current transaction state + if ($sessionState['state'] === self::TRANSACTION_IN_PROGRESS) { + throw new Exception('Session already has a transaction in progress'); + } + + // Increment transaction number for new transaction + $sessionState['txnNumber']++; + + // In MongoDB, transactions are started implicitly with the first operation + // We just need to update the session state and store the options + $sessionState['state'] = self::TRANSACTION_IN_PROGRESS; + $sessionState['lastUse'] = time(); + + // Reset the firstOperationDone flag for the new transaction + unset($sessionState['firstOperationDone']); + + // Store transaction options for use with actual operations + $sessionState['transactionOptions'] = []; + + // Store read/write concerns if provided + if (isset($options['readConcern'])) { + $sessionState['transactionOptions']['readConcern'] = $options['readConcern']; + } + if (isset($options['writeConcern'])) { + $sessionState['transactionOptions']['writeConcern'] = $options['writeConcern']; + } + if (isset($options['readPreference'])) { + $sessionState['transactionOptions']['readPreference'] = $options['readPreference']; + } + if (isset($options['maxCommitTimeMS'])) { + $sessionState['transactionOptions']['maxCommitTimeMS'] = $options['maxCommitTimeMS']; + } + + return true; + } /** - * End sessions. + * Commit a transaction with comprehensive state management and retry support. * - * @param array $lsids - * @param array $options - * @return mixed + * @param array $session Session from startSession() + * @param array $options Commit options supporting: + * - writeConcern: Write concern specification + * - maxTimeMS: Maximum time for commit operation + * @return mixed Commit result * @throws Exception */ - public function endSessions(array $lsids, array $options = []) + public function commitTransaction(array $session, array $options = []) { - // Extract session IDs from the format ['id' => sessionId] and format as objects - $sessionIds = array_map(function ($lsid) { - $sessionId = $lsid['id'] ?? $lsid; - return ['id' => $sessionId]; - }, $lsids); + $rawId = $session['sessionId'] ?? ($session['id']->id ?? null); + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + + if (!$sessionId || !isset($this->sessions[$sessionId])) { + throw new Exception('Invalid session provided to commitTransaction'); + } + + $sessionState = &$this->sessions[$sessionId]; + + // Check current transaction state + if ($sessionState['state'] !== self::TRANSACTION_IN_PROGRESS) { + throw new Exception('No active transaction to commit'); + } + + $command = [ + self::COMMAND_COMMIT_TRANSACTION => 1, + 'lsid' => $sessionState['id'], + 'txnNumber' => new Int64($sessionState['txnNumber']), + 'autocommit' => false, + 'skipReadConcern' => true // Internal flag to prevent adding readConcern + ]; + + // Add write concern if provided + if (isset($options['writeConcern'])) { + $command['writeConcern'] = $options['writeConcern']; + } + + // Add maxTimeMS if provided + if (isset($options['maxTimeMS'])) { + $command['maxTimeMS'] = $options['maxTimeMS']; + } + + try { + $result = $this->query($command, 'admin'); + + if ($result->ok === 1.0) { + $sessionState['state'] = self::TRANSACTION_COMMITTED; + $sessionState['lastUse'] = time(); + unset($sessionState['firstOperationDone']); // Reset for next transaction + } else { + $sessionState['state'] = self::TRANSACTION_ABORTED; + } + + return $result; + } catch (Exception $e) { + // Handle specific commit errors + if ($this->isTransientTransactionError($e) || $this->isUnknownTransactionCommitResult($e)) { + // Keep transaction state for retry + throw $e; + } + + $sessionState['state'] = self::TRANSACTION_ABORTED; + throw $e; + } + } + + /** + * Abort (rollback) a transaction with comprehensive state management. + * + * @param array $session Session from startSession() + * @param array $options Abort options supporting: + * - maxTimeMS: Maximum time for abort operation + * @return mixed Abort result + * @throws Exception + */ + public function abortTransaction(array $session, array $options = []) + { + $rawId = $session['sessionId'] ?? ($session['id']->id ?? null); + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + + if (!$sessionId || !isset($this->sessions[$sessionId])) { + throw new Exception('Invalid session provided to abortTransaction'); + } + + $sessionState = &$this->sessions[$sessionId]; + + // Check current transaction state + if ($sessionState['state'] !== self::TRANSACTION_IN_PROGRESS && + $sessionState['state'] !== self::TRANSACTION_STARTING) { + throw new Exception('No active transaction to abort'); + } + + $command = [ + self::COMMAND_ABORT_TRANSACTION => 1, + 'lsid' => $sessionState['id'], + 'txnNumber' => new Int64($sessionState['txnNumber']), + 'autocommit' => false, + 'skipReadConcern' => true // Internal flag to prevent adding readConcern + ]; + + // Add maxTimeMS if provided + if (isset($options['maxTimeMS'])) { + $command['maxTimeMS'] = $options['maxTimeMS']; + } + + try { + $result = $this->query($command, 'admin'); + $sessionState['state'] = self::TRANSACTION_ABORTED; + $sessionState['lastUse'] = time(); + unset($sessionState['firstOperationDone']); // Reset for next transaction + return $result; + } catch (Exception $e) { + // Even if abort fails, mark transaction as aborted + $sessionState['state'] = self::TRANSACTION_ABORTED; + unset($sessionState['firstOperationDone']); // Reset for next transaction + throw $e; + } + } + + /** + * End sessions with proper state cleanup and validation. + * + * @param array $sessions Array of session objects from startSession() + * @param array $options End session options + * @return mixed Result of end sessions command + * @throws Exception + */ + public function endSessions(array $sessions, array $options = []) + { + $sessionIds = []; + + foreach ($sessions as $session) { + $rawId = $session['sessionId'] ?? ($session['id']->id ?? null); + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + + if ($sessionId && isset($this->sessions[$sessionId])) { + $sessionState = $this->sessions[$sessionId]; + + // Warn about active transactions + if ($sessionState['state'] === self::TRANSACTION_IN_PROGRESS) { + \error_log("Warning: Ending session with active transaction. Transaction will be aborted."); + } + + $sessionIds[] = $sessionState['id']; + + // Clean up local session state + unset($this->sessions[$sessionId]); + } else { + // Handle legacy format + $sessionId = $session['id'] ?? $session; + $sessionIds[] = $sessionId; + } + } + + if (empty($sessionIds)) { + return new \stdClass(); // Return empty result if no valid sessions + } return $this->query( array_merge( @@ -895,7 +1468,7 @@ public function toArray(mixed $obj): ?array } if (is_object($obj) || is_array($obj)) { - $ret = (array) $obj; + $ret = (array)$obj; foreach ($ret as $item) { $item = $this->toArray($item); } @@ -949,4 +1522,545 @@ public function isReplicaSet(): bool $this->replicaSet = property_exists($result, 'setName'); return $this->replicaSet; } + + /** + * Close connection and clean up resources including active sessions. + */ + public function close(): void + { + // End any active sessions before closing connection + if (!empty($this->sessions)) { + try { + $activeSessions = []; + foreach ($this->sessions as $sessionId => $sessionData) { + $activeSessions[] = ['id' => $sessionData['id'], 'sessionId' => $sessionId]; + } + + if (!empty($activeSessions)) { + $this->endSessions($activeSessions); + } + } catch (Exception $e) { + // Silently ignore if connection is already lost during cleanup + if (!str_contains($e->getMessage(), 'Connection to MongoDB has been lost')) { + \error_log("Error ending sessions during close: " . $e->getMessage()); + } + } + } + + if (isset($this->client) && $this->client->isConnected()) { + $this->client->close(); + } + + $this->isConnected = false; + $this->sessions = []; + $this->clusterTime = null; + $this->operationTime = null; + } + + /** + * Parse MongoDB wire protocol response and handle BSON decoding. + * + * @param string $response Raw response data + * @param int $responseLength Expected response length + * @return stdClass|array|int Parsed response + * @throws Exception + */ + private function parseResponse(string $response, int $responseLength): stdClass|array|int + { + /* + * The first 21 bytes of the MongoDB wire protocol response consist of: + * - 16 bytes: Standard message header, which includes: + * - messageLength (4 bytes): Total size of the message, including the header. + * - requestID (4 bytes): Identifier for this message. + * - responseTo (4 bytes): The requestID that this message is responding to. + * - opCode (4 bytes): The operation code for the message type (e.g., OP_MSG). + * - 4 bytes: flagBits, which provide additional information about the message. + * - 1 byte: payloadType, indicating the type of the following payload (usually 0 for a BSON document). + * + * These 21 bytes are protocol metadata and precede the actual BSON-encoded document in the response. + */ + + if (\strlen($response) < 21) { + throw new Exception('Invalid response: too short'); + } + + // Extract message header + $header = \substr($response, 0, 16); + $headerData = \unpack('VmessageLength/VrequestID/VresponseTo/VopCode', $header); + + // Validate message length + if ($headerData['messageLength'] !== $responseLength) { + throw new Exception('Response length mismatch'); + } + + // Extract flag bits and payload type + $flagBits = \unpack('V', \substr($response, 16, 4))[1]; + $payloadType = \ord(\substr($response, 20, 1)); + + // Extract BSON document (skip header + flagBits + payloadType) + $bsonString = \substr($response, 21, $responseLength - 21); + + if (empty($bsonString)) { + return new \stdClass(); + } + + try { + // Parse BSON document + $result = Document::fromBSON($bsonString)->toPHP(); + + // Convert array to stdClass if needed + if (\is_array($result)) { + $result = (object)$result; + } + + // Check for write errors (duplicate key, etc.) + if (\property_exists($result, 'writeErrors') && !empty($result->writeErrors)) { + throw new Exception( + $result->writeErrors[0]->errmsg, + $result->writeErrors[0]->code + ); + } + + // Check for general MongoDB errors + if (\property_exists($result, 'errmsg')) { + throw new Exception( + 'E' . $result->code . ' ' . $result->codeName . ': ' . $result->errmsg, + $result->code + ); + } + + // Check for operation success + if (\property_exists($result, 'n') && $result->ok === 1.0) { + return $result->n; + } + + if (\property_exists($result, 'nonce') && $result->ok === 1.0) { + return $result; + } + + if ($result->ok === 1.0) { + return $result; + } + + return $result->cursor->firstBatch; + } catch (InvalidArgumentException $e) { + throw new Exception('Failed to parse BSON response: ' . $e->getMessage()); + } catch (\Exception $e) { + if ($e instanceof Exception) { + throw $e; + } + throw new Exception('Error parsing response: ' . $e->getMessage()); + } + } + + /** + * Check if an exception represents a transient transaction error. + * + * @param Exception $exception Exception to check + * @return bool True if transient transaction error + */ + public function isTransientTransactionError(Exception $exception): bool + { + $message = $exception->getMessage(); + $code = $exception->getCode(); + + // MongoDB transient error codes + $transientCodes = [ + self::TRANSACTION_TIMEOUT_ERROR, + self::TRANSACTION_ABORTED_ERROR, + 6, // HostUnreachable + 7, // HostNotFound + 89, // NetworkTimeout + 91, // ShutdownInProgress + 189, // PrimarySteppedDown + 262, // ExceededTimeLimit + 9001, // SocketException + 10107, // NotMaster + 11600, // InterruptedAtShutdown + 11602, // InterruptedDueToReplStateChange + 13435, // NotMasterNoSlaveOk + 13436, // NotMasterOrSecondary + ]; + + return \in_array($code, $transientCodes) || + \str_contains($message, self::TRANSIENT_TRANSACTION_ERROR) || + \str_contains($message, 'connection') || + \str_contains($message, 'timeout') || + \str_contains($message, 'network'); + } + + /** + * Check if an exception represents an unknown transaction commit result. + * + * @param Exception $exception Exception to check + * @return bool True if unknown commit result + */ + public function isUnknownTransactionCommitResult(Exception $exception): bool + { + $message = $exception->getMessage(); + $code = $exception->getCode(); + + $unknownCommitCodes = [ + self::TRANSACTION_TIMEOUT_ERROR, + 91, // ShutdownInProgress + 189, // PrimarySteppedDown + 262, // ExceededTimeLimit + 9001, // SocketException + 10107, // NotMaster + 11600, // InterruptedAtShutdown + 11602, // InterruptedDueToReplStateChange + 13435, // NotMasterNoSlaveOk + 13436, // NotMasterOrSecondary + ]; + + return \in_array($code, $unknownCommitCodes) || + \str_contains($message, self::UNKNOWN_TRANSACTION_COMMIT_RESULT); + } + + /** + * Execute a callback within a transaction with automatic retry logic. + * + * @param array $session Session from startSession() + * @param callable $callback Transaction callback that receives the session + * @param array $options Transaction options supporting: + * - readConcern: Read concern specification + * - writeConcern: Write concern specification + * - readPreference: Read preference + * - maxCommitTimeMS: Maximum time to allow for commit + * - maxRetries: Maximum number of retries (default: 3) + * - retryDelayMs: Delay between retries in milliseconds (default: 100) + * @return mixed Result from callback + * @throws Exception + */ + public function withTransaction(array $session, callable $callback, array $options = []) + { + $maxRetries = $options['maxRetries'] ?? 3; + $retryDelayMs = $options['retryDelayMs'] ?? 100; + $attempt = 0; + + while ($attempt <= $maxRetries) { + try { + // Start transaction + $this->startTransaction($session, $options); + + try { + // Execute user callback + $result = $callback($session); + + // Attempt to commit + $commitAttempt = 0; + $maxCommitRetries = 3; + + while ($commitAttempt <= $maxCommitRetries) { + try { + $this->commitTransaction($session, $options); + return $result; + } catch (Exception $e) { + if ($this->isUnknownTransactionCommitResult($e) && $commitAttempt < $maxCommitRetries) { + $commitAttempt++; + if ($retryDelayMs > 0) { + usleep($retryDelayMs * 1000); + } + continue; + } + throw $e; + } + } + } catch (Exception $e) { + // Abort transaction on any error in callback or commit + try { + $this->abortTransaction($session); + } catch (Exception $abortError) { + // Log abort error but don't mask original error + \error_log("Error aborting transaction: " . $abortError->getMessage()); + } + throw $e; + } + } catch (Exception $e) { + if ($this->isTransientTransactionError($e) && $attempt < $maxRetries) { + $attempt++; + if ($retryDelayMs > 0) { + \usleep($retryDelayMs * 1000); + } + continue; + } + throw $e; + } + } + + throw new Exception('Transaction failed after maximum retries'); + } + + /** + * Update causal consistency timestamps from operation results. + * + * @param mixed $result Operation result + */ + private function updateCausalConsistency($result): void + { + if (is_object($result)) { + if (property_exists($result, 'operationTime')) { + $this->operationTime = $result->operationTime; + } + if (property_exists($result, '$clusterTime')) { + $this->clusterTime = $result->{'$clusterTime'}; + } + } + } + + /** + * Get the current operation time for causal consistency. + * + * @return object|null Current operation time + */ + public function getOperationTime(): ?object + { + return $this->operationTime; + } + + /** + * Get the current cluster time for causal consistency. + * + * @return object|null Current cluster time + */ + public function getClusterTime(): ?object + { + return $this->clusterTime; + } + + /** + * Get session state information for debugging. + * + * @param array $session Session to inspect + * @return array Session state information + */ + public function getSessionState(array $session): array + { + $rawId = $session['sessionId'] ?? ($session['id']->id ?? null); + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + + if (!$sessionId || !isset($this->sessions[$sessionId])) { + return ['error' => 'Session not found']; + } + + return [ + 'sessionId' => $sessionId, + 'state' => $this->sessions[$sessionId]['state'], + 'txnNumber' => $this->sessions[$sessionId]['txnNumber'], + 'lastUse' => $this->sessions[$sessionId]['lastUse'], + 'retryableWriteNumber' => $this->sessions[$sessionId]['retryableWriteNumber'] + ]; + } + + /** + * Validate connection before operation with comprehensive checks. + * + * @throws Exception If connection is invalid + */ + private function validateConnection(): void + { + if (!$this->isConnected) { + throw new Exception('Client is not connected to MongoDB'); + } + + if (!isset($this->client)) { + $this->isConnected = false; + throw new Exception('MongoDB client is not initialized'); + } + + if (!$this->client->isConnected()) { + $this->isConnected = false; + throw new Exception('Connection to MongoDB has been lost'); + } + } + + /** + * Create a write concern object with validation. + * + * @param array|string|int $writeConcern Write concern specification + * @return array Validated write concern + * @throws Exception If write concern is invalid + */ + public function createWriteConcern($writeConcern): array + { + if (is_string($writeConcern)) { + return ['w' => $writeConcern]; + } + + if (is_int($writeConcern)) { + if ($writeConcern < 0) { + throw new Exception('Write concern w value must be >= 0'); + } + return ['w' => $writeConcern]; + } + + if (is_array($writeConcern)) { + $concern = []; + + if (isset($writeConcern['w'])) { + if (is_int($writeConcern['w']) && $writeConcern['w'] < 0) { + throw new Exception('Write concern w value must be >= 0'); + } + $concern['w'] = $writeConcern['w']; + } + + if (isset($writeConcern['j'])) { + $concern['j'] = (bool)$writeConcern['j']; + } + + if (isset($writeConcern['wtimeout'])) { + if (!is_int($writeConcern['wtimeout']) || $writeConcern['wtimeout'] < 0) { + throw new Exception('Write concern wtimeout must be a non-negative integer'); + } + $concern['wtimeout'] = $writeConcern['wtimeout']; + } + + return $concern; + } + + throw new Exception('Invalid write concern format'); + } + + /** + * Check if readConcern should be skipped for a transaction operation + * + * @param array $options The options array containing session + * @return bool True if readConcern should be skipped + */ + private function shouldSkipReadConcern(array $options): bool + { + if (!isset($options['session'])) { + return false; + } + + $sessionData = $options['session']; + + // Use the same extraction logic as in query() method + $sessionId = null; + if (is_array($sessionData) && isset($sessionData['id'])) { + $rawId = $sessionData['id']->id ?? null; + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + } else { + $rawId = $sessionData->id ?? null; + $sessionId = $rawId instanceof \MongoDB\BSON\Binary + ? bin2hex($rawId->getData()) + : $rawId; + } + + // If in transaction and not first operation, skip readConcern + if ($sessionId && isset($this->sessions[$sessionId]) && + $this->sessions[$sessionId]['state'] === self::TRANSACTION_IN_PROGRESS && + isset($this->sessions[$sessionId]['firstOperationDone'])) { + return true; + } + + return false; + } + + /** + * Create a read concern object with validation. + * + * @param array|string $readConcern Read concern specification + * @return array Validated read concern + * @throws Exception If read concern is invalid + */ + public function createReadConcern($readConcern): array + { + if (is_string($readConcern)) { + $validLevels = [ + self::READ_CONCERN_LOCAL, + self::READ_CONCERN_AVAILABLE, + self::READ_CONCERN_MAJORITY, + self::READ_CONCERN_LINEARIZABLE, + self::READ_CONCERN_SNAPSHOT + ]; + + if (!in_array($readConcern, $validLevels)) { + throw new Exception('Invalid read concern level: ' . $readConcern); + } + + return ['level' => $readConcern]; + } + + if (is_array($readConcern)) { + $concern = []; + + if (isset($readConcern['level'])) { + $validLevels = [ + self::READ_CONCERN_LOCAL, + self::READ_CONCERN_AVAILABLE, + self::READ_CONCERN_MAJORITY, + self::READ_CONCERN_LINEARIZABLE, + self::READ_CONCERN_SNAPSHOT + ]; + + if (!in_array($readConcern['level'], $validLevels)) { + throw new Exception('Invalid read concern level: ' . $readConcern['level']); + } + + $concern['level'] = $readConcern['level']; + } + + if (isset($readConcern['afterClusterTime'])) { + $concern['afterClusterTime'] = $readConcern['afterClusterTime']; + } + + return $concern; + } + + throw new Exception('Invalid read concern format'); + } + + /** + * Get connection status information. + * + * @return array Connection status details + */ + public function getConnectionInfo(): array + { + return [ + 'connected' => $this->isConnected, + 'host' => $this->host, + 'port' => $this->port, + 'database' => $this->database, + 'activeSessions' => count($this->sessions), + 'clusterTimeSet' => $this->clusterTime !== null, + 'operationTimeSet' => $this->operationTime !== null + ]; + } + + /** + * Clean up stale sessions (older than 30 minutes). + */ + public function cleanupStaleSessions(): void + { + $cutoff = time() - 1800; // 30 minutes + $staleSessions = []; + + foreach ($this->sessions as $sessionId => $sessionData) { + if ($sessionData['lastUse'] < $cutoff) { + $staleSessions[] = ['id' => $sessionData['id'], 'sessionId' => $sessionId]; + } + } + + if (!empty($staleSessions)) { + try { + $this->endSessions($staleSessions); + } catch (Exception $e) { + \error_log("Error cleaning up stale sessions: " . $e->getMessage()); + } + } + } + + /** + * Destructor to ensure proper cleanup. + */ + public function __destruct() + { + $this->close(); + } } diff --git a/src/Exception.php b/src/Exception.php index 7d0cf47..3fe28c2 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -2,6 +2,258 @@ namespace Utopia\Mongo; +/** + * Base MongoDB exception class with enhanced error categorization. + * + * Provides better error handling and categorization for MongoDB 8+ operations. + */ class Exception extends \Exception { + protected array $errorLabels = []; + protected ?array $writeErrors = null; + protected ?array $writeConcernErrors = null; + protected ?string $operationType = null; + + /** + * @param string $message Error message + * @param int $code Error code + * @param \Throwable|null $previous Previous exception + * @param array $errorLabels MongoDB error labels + * @param array|null $writeErrors Write errors if applicable + * @param array|null $writeConcernErrors Write concern errors if applicable + * @param string|null $operationType Operation type that caused the error + */ + public function __construct( + string $message = '', + int $code = 0, + ?\Throwable $previous = null, + array $errorLabels = [], + ?array $writeErrors = null, + ?array $writeConcernErrors = null, + ?string $operationType = null + ) { + parent::__construct($message, $code, $previous); + $this->errorLabels = $errorLabels; + $this->writeErrors = $writeErrors; + $this->writeConcernErrors = $writeConcernErrors; + $this->operationType = $operationType; + } + + /** + * Check if this is a transient error that can be retried. + * + * @return bool + */ + public function isTransientError(): bool + { + return in_array('TransientTransactionError', $this->errorLabels) || + in_array('UnknownTransactionCommitResult', $this->errorLabels) || + $this->isNetworkError() || + $this->isRetryableWrite(); + } + + /** + * Check if this is a network-related error. + * + * @return bool + */ + public function isNetworkError(): bool + { + $networkErrorCodes = [ + 11600, 11601, 11602, // Socket errors + 89, // NetworkTimeout + 9001, // SocketException + 6, // HostUnreachable + 7, // HostNotFound + ]; + + return in_array($this->code, $networkErrorCodes); + } + + /** + * Check if this is a retryable write error. + * + * @return bool + */ + public function isRetryableWrite(): bool + { + return in_array('RetryableWriteError', $this->errorLabels); + } + + /** + * Check if this is a duplicate key error. + * + * @return bool + */ + public function isDuplicateKeyError(): bool + { + return $this->code === 11000 || $this->code === 11001; + } + + /** + * Check if this is a write concern error. + * + * @return bool + */ + public function isWriteConcernError(): bool + { + return !empty($this->writeConcernErrors); + } + + /** + * Check if this is a timeout error. + * + * @return bool + */ + public function isTimeoutError(): bool + { + $timeoutCodes = [ + 50, // MaxTimeMSExpired + 89, // NetworkTimeout + 11601, // SocketTimeout + ]; + + return in_array($this->code, $timeoutCodes); + } + + /** + * Get error labels. + * + * @return array + */ + public function getErrorLabels(): array + { + return $this->errorLabels; + } + + /** + * Get write errors. + * + * @return array|null + */ + public function getWriteErrors(): ?array + { + return $this->writeErrors; + } + + /** + * Get write concern errors. + * + * @return array|null + */ + public function getWriteConcernErrors(): ?array + { + return $this->writeConcernErrors; + } + + /** + * Get the operation type that caused this error. + * + * @return string|null + */ + public function getOperationType(): ?string + { + return $this->operationType; + } + + /** + * Get a human-readable error category. + * + * @return string + */ + public function getErrorCategory(): string + { + if ($this->isNetworkError()) { + return 'Network Error'; + } + + if ($this->isTimeoutError()) { + return 'Timeout Error'; + } + + if ($this->isDuplicateKeyError()) { + return 'Duplicate Key Error'; + } + + if ($this->isWriteConcernError()) { + return 'Write Concern Error'; + } + + if ($this->isTransientError()) { + return 'Transient Error'; + } + + return 'MongoDB Error'; + } + + /** + * Create exception from MongoDB error response. + * + * @param \stdClass $errorResponse MongoDB error response + * @param string|null $operationType Operation type + * @return static + */ + public static function fromResponse(\stdClass $errorResponse, ?string $operationType = null): static + { + $message = $errorResponse->errmsg ?? 'Unknown MongoDB error'; + $code = $errorResponse->code ?? 0; + $errorLabels = $errorResponse->errorLabels ?? []; + $writeErrors = $errorResponse->writeErrors ?? null; + $writeConcernErrors = $errorResponse->writeConcernErrors ?? null; + + return new static( + $message, + $code, + null, + $errorLabels, + $writeErrors, + $writeConcernErrors, + $operationType + ); + } +} + +/** + * Connection-related exception. + */ +class ConnectionException extends Exception +{ +} + +/** + * Authentication-related exception. + */ +class AuthenticationException extends Exception +{ +} + +/** + * Transaction-related exception. + */ +class TransactionException extends Exception +{ +} + +/** + * Bulk write operation exception. + */ +class BulkWriteException extends Exception +{ + private array $result; + + public function __construct(string $message, array $result, int $code = 0, ?\Throwable $previous = null) + { + parent::__construct($message, $code, $previous); + $this->result = $result; + } + + /** + * Get the bulk write result. + * + * @return array + */ + public function getResult(): array + { + return $this->result; + } } diff --git a/tests/MongoTest.php b/tests/MongoTest.php index 80da815..11c0064 100644 --- a/tests/MongoTest.php +++ b/tests/MongoTest.php @@ -13,6 +13,7 @@ class MongoTest extends TestCase /** * @throws Exception + * @throws \Exception */ public static function getDatabase(): Client { @@ -20,7 +21,15 @@ public static function getDatabase(): Client return self::$db; } - $client = new Client('testing', 'mongo', 27017, 'root', 'example', false); + $client = new Client( + database: 'testing', + host: 'mongo', + port: 27017, + user: 'root', + password: 'example', + useCoroutine: false + ); + $client->connect(); self::$db = $client; @@ -29,7 +38,7 @@ public static function getDatabase(): Client public function testDeleteDatabase() { - self::assertTrue($this->getDatabase()->dropDatabase([])); + self::assertTrue($this->getDatabase()->dropDatabase()); } @@ -221,7 +230,7 @@ public function testExceedTimeException() $this->getDatabase()->find( 'movies', ['$where' => 'sleep(1000) || true'], - ['maxTimeMS'=> 1] + ['maxTimeMS' => 1] )->cursor->firstBatch ?? []; } diff --git a/tests/TransactionTest.php b/tests/TransactionTest.php new file mode 100644 index 0000000..1ae7643 --- /dev/null +++ b/tests/TransactionTest.php @@ -0,0 +1,424 @@ +connect(); + } catch (Exception $e) { + self::markTestSkipped('MongoDB connection failed: ' . $e->getMessage()); + } + } + + public static function tearDownAfterClass(): void + { + self::$client?->close(); + } + + private function getClient(): Client + { + if (!self::$client) { + self::markTestSkipped('MongoDB client not available'); + } + return self::$client; + } + + /** + * Check if MongoDB instance supports transactions (requires replica set) + */ + private function isReplicatSet(): bool + { + try { + $client = $this->getClient(); + + // Try to get server status to check if it's a replica set + $result = $client->query(['isMaster' => 1], 'admin'); + + // Check various fields that indicate replica set or sharding + $isReplicaSet = isset($result->setName) || + isset($result->hosts) || + isset($result->ismaster) && isset($result->secondary) || + isset($result->isWritablePrimary) || + isset($result->msg) && $result->msg === 'isdbgrid'; + + if (!$isReplicaSet) { + return false; + } + } catch (\Exception $e) { + try { + $client = $this->getClient(); + $testSession = $client->startSession(); + $client->startTransaction($testSession); + + // Try a simple insert with transaction params + $client->insert( + '_test_tx_check', + ['test' => true], + ['session' => $testSession] + ); + + // If we get here, transactions work - abort and clean up + $client->abortTransaction($testSession); + $client->endSessions([$testSession]); + $client->delete('_test_tx_check', ['test' => true]); + } catch (Exception $txError) { + if (strpos($txError->getMessage(), 'Transaction numbers are only allowed') !== false || + strpos($txError->getMessage(), 'IllegalOperation') !== false) { + return false; + } + } + } + + return true; + } + + /** + * Test basic session creation and cleanup + */ + public function testSessionManagement() + { + $client = $this->getClient(); + + // Create session with options + $session = $client->startSession([ + 'causalConsistency' => true, + 'defaultTransactionOptions' => [ + 'readConcern' => ['level' => 'majority'], + 'writeConcern' => ['w' => 1, 'j' => true] + ] + ]); + + $this->assertArrayHasKey('id', $session); + $this->assertArrayHasKey('sessionId', $session); + + // Get session state + $state = $client->getSessionState($session); + $this->assertEquals('none', $state['state']); + $this->assertEquals(0, $state['txnNumber']); + + // End session + $client->endSessions([$session]); + + // Verify session is cleaned up + $stateAfterEnd = $client->getSessionState($session); + $this->assertArrayHasKey('error', $stateAfterEnd); + } + + /** + * Test transaction with proper state management + */ + public function testTransactionStateManagement() + { + if (!$this->isReplicatSet()) { + $this->expectNotToPerformAssertions(); + return; + } + + $client = $this->getClient(); + $session = $client->startSession(); + + try { + // Start transaction + $result = $client->startTransaction($session, [ + 'readConcern' => ['level' => 'majority'], + 'writeConcern' => ['w' => 1] + ]); + + $this->assertTrue($result); + + // Verify transaction state + $state = $client->getSessionState($session); + $this->assertEquals('in_progress', $state['state']); + $this->assertEquals(1, $state['txnNumber']); + + // Perform operations within transaction + $client->insert( + 'test_collection', + ['name' => 'test_doc', 'value' => 42], + ['session' => $session] + ); + + // Commit transaction + $commitResult = $client->commitTransaction($session); + $this->assertNotNull($commitResult); + + // Verify final state + $finalState = $client->getSessionState($session); + $this->assertEquals('committed', $finalState['state']); + + } finally { + $client->endSessions([$session]); + } + } + + /** + * Test transaction abort functionality + */ + public function testTransactionAbort() + { + if (!$this->isReplicatSet()) { + $this->expectNotToPerformAssertions(); + return; + } + + $client = $this->getClient(); + $session = $client->startSession(); + + try { + $client->startTransaction($session); + + // Insert a document + $client->insert( + 'test_collection', + ['name' => 'abort_test', 'value' => 999], + ['session' => $session] + ); + + // Abort transaction + $abortResult = $client->abortTransaction($session); + $this->assertNotNull($abortResult); + + // Verify transaction was aborted + $state = $client->getSessionState($session); + $this->assertEquals('aborted', $state['state']); + + // Verify document was not inserted (transaction rolled back) + $found = $client->find('test_collection', ['name' => 'abort_test']); + $this->assertEmpty($found->cursor->firstBatch); + + } finally { + $client->endSessions([$session]); + } + } + + /** + * Test withTransaction helper with retry logic + */ + public function testWithTransactionHelper() + { + if (!$this->isReplicatSet()) { + $this->expectNotToPerformAssertions(); + return; + } + + $client = $this->getClient(); + $session = $client->startSession(); + + try { + $result = $client->withTransaction($session, function ($session) use ($client) { + // Insert multiple documents in transaction + $client->insert( + 'test_collection', + ['name' => 'with_transaction_1', 'counter' => 1], + ['session' => $session] + ); + + $client->insert( + 'test_collection', + ['name' => 'with_transaction_2', 'counter' => 2], + ['session' => $session] + ); + + return 'transaction_completed'; + }, [ + 'readConcern' => ['level' => 'majority'], + 'writeConcern' => ['w' => 1], + 'maxRetries' => 3 + ]); + + $this->assertEquals('transaction_completed', $result); + + // Verify both documents were inserted + $found1 = $client->find('test_collection', ['name' => 'with_transaction_1']); + $found2 = $client->find('test_collection', ['name' => 'with_transaction_2']); + + $this->assertNotEmpty($found1->cursor->firstBatch); + $this->assertNotEmpty($found2->cursor->firstBatch); + + } finally { + $client->endSessions([$session]); + } + } + + /** + * Test read and write concerns validation + */ + public function testReadWriteConcerns() + { + $client = $this->getClient(); + + // Test valid write concern + $writeConcern = $client->createWriteConcern(['w' => 'majority', 'j' => true, 'wtimeout' => 5000]); + $this->assertEquals('majority', $writeConcern['w']); + $this->assertTrue($writeConcern['j']); + $this->assertEquals(5000, $writeConcern['wtimeout']); + + // Test valid read concern + $readConcern = $client->createReadConcern(['level' => 'majority']); + $this->assertEquals('majority', $readConcern['level']); + + // Test invalid read concern should throw exception + $this->expectException(Exception::class); + $client->createReadConcern(['level' => 'invalid']); + } + + /** + * Test CRUD operations with session and concerns + */ + public function testCRUDWithSessionAndConcerns() + { + if (!$this->isReplicatSet()) { + $this->expectNotToPerformAssertions(); + return; + } + + $client = $this->getClient(); + $session = $client->startSession(); + + try { + $client->startTransaction($session); + + // Insert with session and write concern + $insertedDoc = $client->insert( + 'test_collection', + ['name' => 'crud_test', 'status' => 'active'], + [ + 'session' => $session, + 'writeConcern' => ['w' => 1, 'j' => true] + ] + ); + + $this->assertArrayHasKey('_id', $insertedDoc); + + // Find with session and read concern + $found = $client->find( + 'test_collection', + ['name' => 'crud_test'], + [ + 'session' => $session, + 'readConcern' => ['level' => 'local'] + ] + ); + + $this->assertNotEmpty($found->cursor->firstBatch); + + // Update with session + $client->update( + 'test_collection', + ['name' => 'crud_test'], + ['$set' => ['status' => 'updated']], + [ + 'session' => $session, + 'writeConcern' => ['w' => 1] + ] + ); + + // Count with session + $count = $client->count( + 'test_collection', + ['name' => 'crud_test'], + [ + 'session' => $session, + 'readConcern' => ['level' => 'local'] + ] + ); + + $this->assertEquals(1, $count); + + $client->commitTransaction($session); + + } finally { + $client->endSessions([$session]); + } + } + + /** + * Test connection validation and health checks + */ + public function testConnectionValidation() + { + $client = $this->getClient(); + + // Get connection info + $info = $client->getConnectionInfo(); + + $this->assertTrue($info['connected']); + $this->assertEquals('mongo', $info['host']); + $this->assertEquals(27017, $info['port']); + $this->assertEquals('testing', $info['database']); + $this->assertIsInt($info['activeSessions']); + } + + /** + * Test causal consistency tracking + */ + public function testCausalConsistency() + { + $client = $this->getClient(); + + // Perform an operation to get operation time + $client->find('test_collection', []); + + // Check if operation time is tracked + $operationTime = $client->getOperationTime(); + $clusterTime = $client->getClusterTime(); + + // These may be null if not in a replica set, which is fine for testing + $this->assertTrue($operationTime === null || is_object($operationTime)); + $this->assertTrue($clusterTime === null || is_object($clusterTime)); + } + + /** + * Test session cleanup + */ + public function testSessionCleanup() + { + $client = $this->getClient(); + + // Create multiple sessions + $sessions = []; + for ($i = 0; $i < 3; $i++) { + $sessions[] = $client->startSession(); + } + + // Verify sessions are active + $info = $client->getConnectionInfo(); + $this->assertGreaterThanOrEqual(3, $info['activeSessions']); + + // End all sessions + $client->endSessions($sessions); + + // Verify sessions are cleaned up + $infoAfter = $client->getConnectionInfo(); + $this->assertLessThan($info['activeSessions'], $infoAfter['activeSessions']); + } + + protected function tearDown(): void + { + // Clean up test data + if (self::$client) { + try { + self::$client->dropCollection('test_collection'); + } catch (Exception $e) { + // Ignore cleanup errors + } + } + } +}