This commit is contained in:
2025-04-01 00:38:54 +02:00
parent d4d4c0c09d
commit 87da06293a
22351 changed files with 5168854 additions and 7538 deletions

View File

@@ -0,0 +1,74 @@
<?php
namespace GuzzleHttp\Ring\Client;
/**
* Client specific utility functions.
*/
class ClientUtils
{
/**
* Returns the default cacert bundle for the current system.
*
* First, the openssl.cafile and curl.cainfo php.ini settings are checked.
* If those settings are not configured, then the common locations for
* bundles found on Red Hat, CentOS, Fedora, Ubuntu, Debian, FreeBSD, OS X
* and Windows are checked. If any of these file locations are found on
* disk, they will be utilized.
*
* Note: the result of this function is cached for subsequent calls.
*
* @return string
* @throws \RuntimeException if no bundle can be found.
*/
public static function getDefaultCaBundle()
{
static $cached = null;
static $cafiles = [
// Red Hat, CentOS, Fedora (provided by the ca-certificates package)
'/etc/pki/tls/certs/ca-bundle.crt',
// Ubuntu, Debian (provided by the ca-certificates package)
'/etc/ssl/certs/ca-certificates.crt',
// FreeBSD (provided by the ca_root_nss package)
'/usr/local/share/certs/ca-root-nss.crt',
// OS X provided by homebrew (using the default path)
'/usr/local/etc/openssl/cert.pem',
// Windows?
'C:\\windows\\system32\\curl-ca-bundle.crt',
'C:\\windows\\curl-ca-bundle.crt',
];
if ($cached) {
return $cached;
}
if ($ca = ini_get('openssl.cafile')) {
return $cached = $ca;
}
if ($ca = ini_get('curl.cainfo')) {
return $cached = $ca;
}
foreach ($cafiles as $filename) {
if (file_exists($filename)) {
return $cached = $filename;
}
}
throw new \RuntimeException(self::CA_ERR);
}
const CA_ERR = "
No system CA bundle could be found in any of the the common system locations.
PHP versions earlier than 5.6 are not properly configured to use the system's
CA bundle by default. In order to verify peer certificates, you will need to
supply the path on disk to a certificate bundle to the 'verify' request
option: http://docs.guzzlephp.org/en/5.3/clients.html#verify. If you do not
need a specific certificate bundle, then Mozilla provides a commonly used CA
bundle which can be downloaded here (provided by the maintainer of cURL):
https://raw.githubusercontent.com/bagder/ca-bundle/master/ca-bundle.crt. Once
you have a CA bundle available on disk, you can set the 'openssl.cafile' PHP
ini setting to point to the path to the file, allowing you to omit the 'verify'
request option. See http://curl.haxx.se/docs/sslcerts.html for more
information.";
}

View File

@@ -0,0 +1,560 @@
<?php
namespace GuzzleHttp\Ring\Client;
use GuzzleHttp\Ring\Core;
use GuzzleHttp\Ring\Exception\ConnectException;
use GuzzleHttp\Ring\Exception\RingException;
use GuzzleHttp\Stream\LazyOpenStream;
use GuzzleHttp\Stream\StreamInterface;
/**
* Creates curl resources from a request
*/
class CurlFactory
{
/**
* Creates a cURL handle, header resource, and body resource based on a
* transaction.
*
* @param array $request Request hash
* @param null|resource $handle Optionally provide a curl handle to modify
*
* @return array Returns an array of the curl handle, headers array, and
* response body handle.
* @throws \RuntimeException when an option cannot be applied
*/
public function __invoke(array $request, $handle = null)
{
$headers = [];
$options = $this->getDefaultOptions($request, $headers);
$this->applyMethod($request, $options);
if (isset($request['client'])) {
$this->applyHandlerOptions($request, $options);
}
$this->applyHeaders($request, $options);
unset($options['_headers']);
// Add handler options from the request's configuration options
if (isset($request['client']['curl'])) {
$options = $this->applyCustomCurlOptions(
$request['client']['curl'],
$options
);
}
if (!$handle) {
$handle = curl_init();
}
$body = $this->getOutputBody($request, $options);
curl_setopt_array($handle, $options);
return [$handle, &$headers, $body];
}
/**
* Creates a response hash from a cURL result.
*
* @param callable $handler Handler that was used.
* @param array $request Request that sent.
* @param array $response Response hash to update.
* @param array $headers Headers received during transfer.
* @param resource $body Body fopen response.
*
* @return array
*/
public static function createResponse(
callable $handler,
array $request,
array $response,
array $headers,
$body
) {
if (isset($response['transfer_stats']['url'])) {
$response['effective_url'] = $response['transfer_stats']['url'];
}
if (!empty($headers)) {
$startLine = explode(' ', array_shift($headers), 3);
$headerList = Core::headersFromLines($headers);
$response['headers'] = $headerList;
$response['version'] = isset($startLine[0]) ? substr($startLine[0], 5) : null;
$response['status'] = isset($startLine[1]) ? (int) $startLine[1] : null;
$response['reason'] = isset($startLine[2]) ? $startLine[2] : null;
$response['body'] = $body;
Core::rewindBody($response);
}
return !empty($response['curl']['errno']) || !isset($response['status'])
? self::createErrorResponse($handler, $request, $response)
: $response;
}
private static function createErrorResponse(
callable $handler,
array $request,
array $response
) {
static $connectionErrors = [
CURLE_OPERATION_TIMEOUTED => true,
CURLE_COULDNT_RESOLVE_HOST => true,
CURLE_COULDNT_CONNECT => true,
CURLE_SSL_CONNECT_ERROR => true,
CURLE_GOT_NOTHING => true,
];
// Retry when nothing is present or when curl failed to rewind.
if (!isset($response['err_message'])
&& (empty($response['curl']['errno'])
|| $response['curl']['errno'] == 65)
) {
return self::retryFailedRewind($handler, $request, $response);
}
$message = isset($response['err_message'])
? $response['err_message']
: sprintf('cURL error %s: %s',
$response['curl']['errno'],
isset($response['curl']['error'])
? $response['curl']['error']
: 'See http://curl.haxx.se/libcurl/c/libcurl-errors.html');
$error = isset($response['curl']['errno'])
&& isset($connectionErrors[$response['curl']['errno']])
? new ConnectException($message)
: new RingException($message);
return $response + [
'status' => null,
'reason' => null,
'body' => null,
'headers' => [],
'error' => $error,
];
}
private function getOutputBody(array $request, array &$options)
{
// Determine where the body of the response (if any) will be streamed.
if (isset($options[CURLOPT_WRITEFUNCTION])) {
return $request['client']['save_to'];
}
if (isset($options[CURLOPT_FILE])) {
return $options[CURLOPT_FILE];
}
if ($request['http_method'] != 'HEAD') {
// Create a default body if one was not provided
return $options[CURLOPT_FILE] = fopen('php://temp', 'w+');
}
return null;
}
private function getDefaultOptions(array $request, array &$headers)
{
$url = Core::url($request);
$startingResponse = false;
$options = [
'_headers' => $request['headers'],
CURLOPT_CUSTOMREQUEST => $request['http_method'],
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => false,
CURLOPT_HEADER => false,
CURLOPT_CONNECTTIMEOUT => 150,
CURLOPT_HEADERFUNCTION => function ($ch, $h) use (&$headers, &$startingResponse) {
$value = trim($h);
if ($value === '') {
$startingResponse = true;
} elseif ($startingResponse) {
$startingResponse = false;
$headers = [$value];
} else {
$headers[] = $value;
}
return strlen($h);
},
];
if (isset($request['version'])) {
if ($request['version'] == 2.0) {
$options[CURLOPT_HTTP_VERSION] = CURL_HTTP_VERSION_2_0;
} else if ($request['version'] == 1.1) {
$options[CURLOPT_HTTP_VERSION] = CURL_HTTP_VERSION_1_1;
} else {
$options[CURLOPT_HTTP_VERSION] = CURL_HTTP_VERSION_1_0;
}
}
if (defined('CURLOPT_PROTOCOLS')) {
$options[CURLOPT_PROTOCOLS] = CURLPROTO_HTTP | CURLPROTO_HTTPS;
}
return $options;
}
private function applyMethod(array $request, array &$options)
{
if (isset($request['body'])) {
$this->applyBody($request, $options);
return;
}
switch ($request['http_method']) {
case 'PUT':
case 'POST':
// See http://tools.ietf.org/html/rfc7230#section-3.3.2
if (!Core::hasHeader($request, 'Content-Length')) {
$options[CURLOPT_HTTPHEADER][] = 'Content-Length: 0';
}
break;
case 'HEAD':
$options[CURLOPT_NOBODY] = true;
unset(
$options[CURLOPT_WRITEFUNCTION],
$options[CURLOPT_READFUNCTION],
$options[CURLOPT_FILE],
$options[CURLOPT_INFILE]
);
}
}
private function applyBody(array $request, array &$options)
{
$contentLength = Core::firstHeader($request, 'Content-Length');
$size = $contentLength !== null ? (int) $contentLength : null;
// Send the body as a string if the size is less than 1MB OR if the
// [client][curl][body_as_string] request value is set.
if (($size !== null && $size < 1000000) ||
isset($request['client']['curl']['body_as_string']) ||
is_string($request['body'])
) {
$options[CURLOPT_POSTFIELDS] = Core::body($request);
// Don't duplicate the Content-Length header
$this->removeHeader('Content-Length', $options);
$this->removeHeader('Transfer-Encoding', $options);
} else {
$options[CURLOPT_UPLOAD] = true;
if ($size !== null) {
// Let cURL handle setting the Content-Length header
$options[CURLOPT_INFILESIZE] = $size;
$this->removeHeader('Content-Length', $options);
}
$this->addStreamingBody($request, $options);
}
// If the Expect header is not present, prevent curl from adding it
if (!Core::hasHeader($request, 'Expect')) {
$options[CURLOPT_HTTPHEADER][] = 'Expect:';
}
// cURL sometimes adds a content-type by default. Prevent this.
if (!Core::hasHeader($request, 'Content-Type')) {
$options[CURLOPT_HTTPHEADER][] = 'Content-Type:';
}
}
private function addStreamingBody(array $request, array &$options)
{
$body = $request['body'];
if ($body instanceof StreamInterface) {
$options[CURLOPT_READFUNCTION] = function ($ch, $fd, $length) use ($body) {
return (string) $body->read($length);
};
if (!isset($options[CURLOPT_INFILESIZE])) {
if ($size = $body->getSize()) {
$options[CURLOPT_INFILESIZE] = $size;
}
}
} elseif (is_resource($body)) {
$options[CURLOPT_INFILE] = $body;
} elseif ($body instanceof \Iterator) {
$buf = '';
$options[CURLOPT_READFUNCTION] = function ($ch, $fd, $length) use ($body, &$buf) {
if ($body->valid()) {
$buf .= $body->current();
$body->next();
}
$result = (string) substr($buf, 0, $length);
$buf = substr($buf, $length);
return $result;
};
} else {
throw new \InvalidArgumentException('Invalid request body provided');
}
}
private function applyHeaders(array $request, array &$options)
{
foreach ($options['_headers'] as $name => $values) {
foreach ($values as $value) {
$options[CURLOPT_HTTPHEADER][] = "$name: $value";
}
}
// Remove the Accept header if one was not set
if (!Core::hasHeader($request, 'Accept')) {
$options[CURLOPT_HTTPHEADER][] = 'Accept:';
}
}
/**
* Takes an array of curl options specified in the 'curl' option of a
* request's configuration array and maps them to CURLOPT_* options.
*
* This method is only called when a request has a 'curl' config setting.
*
* @param array $config Configuration array of custom curl option
* @param array $options Array of existing curl options
*
* @return array Returns a new array of curl options
*/
private function applyCustomCurlOptions(array $config, array $options)
{
$curlOptions = [];
foreach ($config as $key => $value) {
if (is_int($key)) {
$curlOptions[$key] = $value;
}
}
return $curlOptions + $options;
}
/**
* Remove a header from the options array.
*
* @param string $name Case-insensitive header to remove
* @param array $options Array of options to modify
*/
private function removeHeader($name, array &$options)
{
foreach (array_keys($options['_headers']) as $key) {
if (!strcasecmp($key, $name)) {
unset($options['_headers'][$key]);
return;
}
}
}
/**
* Applies an array of request client options to a the options array.
*
* This method uses a large switch rather than double-dispatch to save on
* high overhead of calling functions in PHP.
*/
private function applyHandlerOptions(array $request, array &$options)
{
foreach ($request['client'] as $key => $value) {
switch ($key) {
// Violating PSR-4 to provide more room.
case 'verify':
if ($value === false) {
unset($options[CURLOPT_CAINFO]);
$options[CURLOPT_SSL_VERIFYHOST] = 0;
$options[CURLOPT_SSL_VERIFYPEER] = false;
continue 2;
}
$options[CURLOPT_SSL_VERIFYHOST] = 2;
$options[CURLOPT_SSL_VERIFYPEER] = true;
if (is_string($value)) {
$options[CURLOPT_CAINFO] = $value;
if (!file_exists($value)) {
throw new \InvalidArgumentException(
"SSL CA bundle not found: $value"
);
}
}
break;
case 'decode_content':
if ($value === false) {
continue 2;
}
$accept = Core::firstHeader($request, 'Accept-Encoding');
if ($accept) {
$options[CURLOPT_ENCODING] = $accept;
} else {
$options[CURLOPT_ENCODING] = '';
// Don't let curl send the header over the wire
$options[CURLOPT_HTTPHEADER][] = 'Accept-Encoding:';
}
break;
case 'save_to':
if (is_string($value)) {
if (!is_dir(dirname($value))) {
throw new \RuntimeException(sprintf(
'Directory %s does not exist for save_to value of %s',
dirname($value),
$value
));
}
$value = new LazyOpenStream($value, 'w+');
}
if ($value instanceof StreamInterface) {
$options[CURLOPT_WRITEFUNCTION] =
function ($ch, $write) use ($value) {
return $value->write($write);
};
} elseif (is_resource($value)) {
$options[CURLOPT_FILE] = $value;
} else {
throw new \InvalidArgumentException('save_to must be a '
. 'GuzzleHttp\Stream\StreamInterface or resource');
}
break;
case 'timeout':
if (defined('CURLOPT_TIMEOUT_MS')) {
$options[CURLOPT_TIMEOUT_MS] = $value * 1000;
} else {
$options[CURLOPT_TIMEOUT] = $value;
}
break;
case 'connect_timeout':
if (defined('CURLOPT_CONNECTTIMEOUT_MS')) {
$options[CURLOPT_CONNECTTIMEOUT_MS] = $value * 1000;
} else {
$options[CURLOPT_CONNECTTIMEOUT] = $value;
}
break;
case 'proxy':
if (!is_array($value)) {
$options[CURLOPT_PROXY] = $value;
} elseif (isset($request['scheme'])) {
$scheme = $request['scheme'];
if (isset($value[$scheme])) {
$options[CURLOPT_PROXY] = $value[$scheme];
}
}
break;
case 'cert':
if (is_array($value)) {
$options[CURLOPT_SSLCERTPASSWD] = $value[1];
$value = $value[0];
}
if (!file_exists($value)) {
throw new \InvalidArgumentException(
"SSL certificate not found: {$value}"
);
}
$options[CURLOPT_SSLCERT] = $value;
break;
case 'ssl_key':
if (is_array($value)) {
$options[CURLOPT_SSLKEYPASSWD] = $value[1];
$value = $value[0];
}
if (!file_exists($value)) {
throw new \InvalidArgumentException(
"SSL private key not found: {$value}"
);
}
$options[CURLOPT_SSLKEY] = $value;
break;
case 'progress':
if (!is_callable($value)) {
throw new \InvalidArgumentException(
'progress client option must be callable'
);
}
$options[CURLOPT_NOPROGRESS] = false;
$options[CURLOPT_PROGRESSFUNCTION] =
function () use ($value) {
$args = func_get_args();
// PHP 5.5 pushed the handle onto the start of the args
if (is_resource($args[0])) {
array_shift($args);
}
call_user_func_array($value, $args);
};
break;
case 'debug':
if ($value) {
$options[CURLOPT_STDERR] = Core::getDebugResource($value);
$options[CURLOPT_VERBOSE] = true;
}
break;
}
}
}
/**
* This function ensures that a response was set on a transaction. If one
* was not set, then the request is retried if possible. This error
* typically means you are sending a payload, curl encountered a
* "Connection died, retrying a fresh connect" error, tried to rewind the
* stream, and then encountered a "necessary data rewind wasn't possible"
* error, causing the request to be sent through curl_multi_info_read()
* without an error status.
*/
private static function retryFailedRewind(
callable $handler,
array $request,
array $response
) {
// If there is no body, then there is some other kind of issue. This
// is weird and should probably never happen.
if (!isset($request['body'])) {
$response['err_message'] = 'No response was received for a request '
. 'with no body. This could mean that you are saturating your '
. 'network.';
return self::createErrorResponse($handler, $request, $response);
}
if (!Core::rewindBody($request)) {
$response['err_message'] = 'The connection unexpectedly failed '
. 'without providing an error. The request would have been '
. 'retried, but attempting to rewind the request body failed.';
return self::createErrorResponse($handler, $request, $response);
}
// Retry no more than 3 times before giving up.
if (!isset($request['curl']['retries'])) {
$request['curl']['retries'] = 1;
} elseif ($request['curl']['retries'] == 2) {
$response['err_message'] = 'The cURL request was retried 3 times '
. 'and did no succeed. cURL was unable to rewind the body of '
. 'the request and subsequent retries resulted in the same '
. 'error. Turn on the debug option to see what went wrong. '
. 'See https://bugs.php.net/bug.php?id=47204 for more information.';
return self::createErrorResponse($handler, $request, $response);
} else {
$request['curl']['retries']++;
}
return $handler($request);
}
}

View File

@@ -0,0 +1,135 @@
<?php
namespace GuzzleHttp\Ring\Client;
use GuzzleHttp\Ring\Future\CompletedFutureArray;
use GuzzleHttp\Ring\Core;
/**
* HTTP handler that uses cURL easy handles as a transport layer.
*
* Requires PHP 5.5+
*
* When using the CurlHandler, custom curl options can be specified as an
* associative array of curl option constants mapping to values in the
* **curl** key of the "client" key of the request.
*/
class CurlHandler
{
/** @var callable */
private $factory;
/** @var array Array of curl easy handles */
private $handles = [];
/** @var array Array of owned curl easy handles */
private $ownedHandles = [];
/** @var int Total number of idle handles to keep in cache */
private $maxHandles;
/**
* Accepts an associative array of options:
*
* - factory: Optional callable factory used to create cURL handles.
* The callable is passed a request hash when invoked, and returns an
* array of the curl handle, headers resource, and body resource.
* - max_handles: Maximum number of idle handles (defaults to 5).
*
* @param array $options Array of options to use with the handler
*/
public function __construct(array $options = [])
{
$this->handles = $this->ownedHandles = [];
$this->factory = isset($options['handle_factory'])
? $options['handle_factory']
: new CurlFactory();
$this->maxHandles = isset($options['max_handles'])
? $options['max_handles']
: 5;
}
public function __destruct()
{
foreach ($this->handles as $handle) {
if (is_resource($handle)) {
curl_close($handle);
}
}
}
/**
* @param array $request
*
* @return CompletedFutureArray
*/
public function __invoke(array $request)
{
return new CompletedFutureArray(
$this->_invokeAsArray($request)
);
}
/**
* @internal
*
* @param array $request
*
* @return array
*/
public function _invokeAsArray(array $request)
{
$factory = $this->factory;
// Ensure headers are by reference. They're updated elsewhere.
$result = $factory($request, $this->checkoutEasyHandle());
$h = $result[0];
$hd =& $result[1];
$bd = $result[2];
Core::doSleep($request);
curl_exec($h);
$response = ['transfer_stats' => curl_getinfo($h)];
$response['curl']['error'] = curl_error($h);
$response['curl']['errno'] = curl_errno($h);
$response['transfer_stats'] = array_merge($response['transfer_stats'], $response['curl']);
$this->releaseEasyHandle($h);
return CurlFactory::createResponse([$this, '_invokeAsArray'], $request, $response, $hd, $bd);
}
private function checkoutEasyHandle()
{
// Find an unused handle in the cache
if (false !== ($key = array_search(false, $this->ownedHandles, true))) {
$this->ownedHandles[$key] = true;
return $this->handles[$key];
}
// Add a new handle
$handle = curl_init();
$id = (int) $handle;
$this->handles[$id] = $handle;
$this->ownedHandles[$id] = true;
return $handle;
}
private function releaseEasyHandle($handle)
{
$id = (int) $handle;
if (count($this->ownedHandles) > $this->maxHandles) {
curl_close($this->handles[$id]);
unset($this->handles[$id], $this->ownedHandles[$id]);
} else {
// curl_reset doesn't clear these out for some reason
static $unsetValues = [
CURLOPT_HEADERFUNCTION => null,
CURLOPT_WRITEFUNCTION => null,
CURLOPT_READFUNCTION => null,
CURLOPT_PROGRESSFUNCTION => null,
];
curl_setopt_array($handle, $unsetValues);
curl_reset($handle);
$this->ownedHandles[$id] = false;
}
}
}

View File

@@ -0,0 +1,248 @@
<?php
namespace GuzzleHttp\Ring\Client;
use GuzzleHttp\Ring\Future\FutureArray;
use React\Promise\Deferred;
/**
* Returns an asynchronous response using curl_multi_* functions.
*
* This handler supports future responses and the "delay" request client
* option that can be used to delay before sending a request.
*
* When using the CurlMultiHandler, custom curl options can be specified as an
* associative array of curl option constants mapping to values in the
* **curl** key of the "client" key of the request.
*
* @property resource $_mh Internal use only. Lazy loaded multi-handle.
*/
class CurlMultiHandler
{
/** @var callable */
private $factory;
private $selectTimeout;
private $active;
private $handles = [];
private $delays = [];
private $maxHandles;
/**
* This handler accepts the following options:
*
* - mh: An optional curl_multi resource
* - handle_factory: An optional callable used to generate curl handle
* resources. the callable accepts a request hash and returns an array
* of the handle, headers file resource, and the body resource.
* - select_timeout: Optional timeout (in seconds) to block before timing
* out while selecting curl handles. Defaults to 1 second.
* - max_handles: Optional integer representing the maximum number of
* open requests. When this number is reached, the queued futures are
* flushed.
*
* @param array $options
*/
public function __construct(array $options = [])
{
if (isset($options['mh'])) {
$this->_mh = $options['mh'];
}
$this->factory = isset($options['handle_factory'])
? $options['handle_factory'] : new CurlFactory();
$this->selectTimeout = isset($options['select_timeout'])
? $options['select_timeout'] : 1;
$this->maxHandles = isset($options['max_handles'])
? $options['max_handles'] : 100;
}
public function __get($name)
{
if ($name === '_mh') {
return $this->_mh = curl_multi_init();
}
throw new \BadMethodCallException();
}
public function __destruct()
{
// Finish any open connections before terminating the script.
if ($this->handles) {
$this->execute();
}
if (isset($this->_mh)) {
curl_multi_close($this->_mh);
unset($this->_mh);
}
}
public function __invoke(array $request)
{
$factory = $this->factory;
$result = $factory($request);
$entry = [
'request' => $request,
'response' => [],
'handle' => $result[0],
'headers' => &$result[1],
'body' => $result[2],
'deferred' => new Deferred(),
];
$id = (int) $result[0];
$future = new FutureArray(
$entry['deferred']->promise(),
[$this, 'execute'],
function () use ($id) {
return $this->cancel($id);
}
);
$this->addRequest($entry);
// Transfer outstanding requests if there are too many open handles.
if (count($this->handles) >= $this->maxHandles) {
$this->execute();
}
return $future;
}
/**
* Runs until all outstanding connections have completed.
*/
public function execute()
{
do {
if ($this->active &&
curl_multi_select($this->_mh, $this->selectTimeout) === -1
) {
// Perform a usleep if a select returns -1.
// See: https://bugs.php.net/bug.php?id=61141
usleep(250);
}
// Add any delayed futures if needed.
if ($this->delays) {
$this->addDelays();
}
do {
$mrc = curl_multi_exec($this->_mh, $this->active);
} while ($mrc === CURLM_CALL_MULTI_PERFORM);
$this->processMessages();
// If there are delays but no transfers, then sleep for a bit.
if (!$this->active && $this->delays) {
usleep(500);
}
} while ($this->active || $this->handles);
}
private function addRequest(array &$entry)
{
$id = (int) $entry['handle'];
$this->handles[$id] = $entry;
// If the request is a delay, then add the reques to the curl multi
// pool only after the specified delay.
if (isset($entry['request']['client']['delay'])) {
$this->delays[$id] = microtime(true) + ($entry['request']['client']['delay'] / 1000);
} elseif (empty($entry['request']['future'])) {
curl_multi_add_handle($this->_mh, $entry['handle']);
} else {
curl_multi_add_handle($this->_mh, $entry['handle']);
// "lazy" futures are only sent once the pool has many requests.
if ($entry['request']['future'] !== 'lazy') {
do {
$mrc = curl_multi_exec($this->_mh, $this->active);
} while ($mrc === CURLM_CALL_MULTI_PERFORM);
$this->processMessages();
}
}
}
private function removeProcessed($id)
{
if (isset($this->handles[$id])) {
curl_multi_remove_handle(
$this->_mh,
$this->handles[$id]['handle']
);
curl_close($this->handles[$id]['handle']);
unset($this->handles[$id], $this->delays[$id]);
}
}
/**
* Cancels a handle from sending and removes references to it.
*
* @param int $id Handle ID to cancel and remove.
*
* @return bool True on success, false on failure.
*/
private function cancel($id)
{
// Cannot cancel if it has been processed.
if (!isset($this->handles[$id])) {
return false;
}
$handle = $this->handles[$id]['handle'];
unset($this->delays[$id], $this->handles[$id]);
curl_multi_remove_handle($this->_mh, $handle);
curl_close($handle);
return true;
}
private function addDelays()
{
$currentTime = microtime(true);
foreach ($this->delays as $id => $delay) {
if ($currentTime >= $delay) {
unset($this->delays[$id]);
curl_multi_add_handle(
$this->_mh,
$this->handles[$id]['handle']
);
}
}
}
private function processMessages()
{
while ($done = curl_multi_info_read($this->_mh)) {
$id = (int) $done['handle'];
if (!isset($this->handles[$id])) {
// Probably was cancelled.
continue;
}
$entry = $this->handles[$id];
$entry['response']['transfer_stats'] = curl_getinfo($done['handle']);
if ($done['result'] !== CURLM_OK) {
$entry['response']['curl']['errno'] = $done['result'];
$entry['response']['curl']['error'] = curl_error($done['handle']);
}
$result = CurlFactory::createResponse(
$this,
$entry['request'],
$entry['response'],
$entry['headers'],
$entry['body']
);
$this->removeProcessed($id);
$entry['deferred']->resolve($result);
}
}
}

View File

@@ -0,0 +1,58 @@
<?php
namespace GuzzleHttp\Ring\Client;
/**
* Provides basic middleware wrappers.
*
* If a middleware is more complex than a few lines of code, then it should
* be implemented in a class rather than a static method.
*/
class Middleware
{
/**
* Sends future requests to a future compatible handler while sending all
* other requests to a default handler.
*
* When the "future" option is not provided on a request, any future responses
* are automatically converted to synchronous responses and block.
*
* @param callable $default Handler used for non-streaming responses
* @param callable $future Handler used for future responses
*
* @return callable Returns the composed handler.
*/
public static function wrapFuture(
callable $default,
callable $future
) {
return function (array $request) use ($default, $future) {
return empty($request['client']['future'])
? $default($request)
: $future($request);
};
}
/**
* Sends streaming requests to a streaming compatible handler while sendin
* all other requests to a default handler.
*
* This, for example, could be useful for taking advantage of the
* performance benefits of curl while still supporting true streaming
* through the StreamHandler.
*
* @param callable $default Handler used for non-streaming responses
* @param callable $streaming Handler used for streaming responses
*
* @return callable Returns the composed handler.
*/
public static function wrapStreaming(
callable $default,
callable $streaming
) {
return function (array $request) use ($default, $streaming) {
return empty($request['client']['stream'])
? $default($request)
: $streaming($request);
};
}
}

View File

@@ -0,0 +1,52 @@
<?php
namespace GuzzleHttp\Ring\Client;
use GuzzleHttp\Ring\Core;
use GuzzleHttp\Ring\Future\CompletedFutureArray;
use GuzzleHttp\Ring\Future\FutureArrayInterface;
/**
* Ring handler that returns a canned response or evaluated function result.
*/
class MockHandler
{
/** @var callable|array|FutureArrayInterface */
private $result;
/**
* Provide an array or future to always return the same value. Provide a
* callable that accepts a request object and returns an array or future
* to dynamically create a response.
*
* @param array|FutureArrayInterface|callable $result Mock return value.
*/
public function __construct($result)
{
$this->result = $result;
}
public function __invoke(array $request)
{
Core::doSleep($request);
$response = is_callable($this->result)
? call_user_func($this->result, $request)
: $this->result;
if (is_array($response)) {
$response = new CompletedFutureArray($response + [
'status' => null,
'body' => null,
'headers' => [],
'reason' => null,
'effective_url' => null,
]);
} elseif (!$response instanceof FutureArrayInterface) {
throw new \InvalidArgumentException(
'Response must be an array or FutureArrayInterface. Found '
. Core::describeType($request)
);
}
return $response;
}
}

View File

@@ -0,0 +1,414 @@
<?php
namespace GuzzleHttp\Ring\Client;
use GuzzleHttp\Ring\Core;
use GuzzleHttp\Ring\Exception\ConnectException;
use GuzzleHttp\Ring\Exception\RingException;
use GuzzleHttp\Ring\Future\CompletedFutureArray;
use GuzzleHttp\Stream\InflateStream;
use GuzzleHttp\Stream\StreamInterface;
use GuzzleHttp\Stream\Stream;
use GuzzleHttp\Stream\Utils;
/**
* RingPHP client handler that uses PHP's HTTP stream wrapper.
*/
class StreamHandler
{
private $options;
private $lastHeaders;
public function __construct(array $options = [])
{
$this->options = $options;
}
public function __invoke(array $request)
{
$url = Core::url($request);
Core::doSleep($request);
try {
// Does not support the expect header.
$request = Core::removeHeader($request, 'Expect');
$stream = $this->createStream($url, $request);
return $this->createResponse($request, $url, $stream);
} catch (RingException $e) {
return $this->createErrorResponse($url, $e);
}
}
private function createResponse(array $request, $url, $stream)
{
$hdrs = $this->lastHeaders;
$this->lastHeaders = null;
$parts = explode(' ', array_shift($hdrs), 3);
$response = [
'version' => substr($parts[0], 5),
'status' => $parts[1],
'reason' => isset($parts[2]) ? $parts[2] : null,
'headers' => Core::headersFromLines($hdrs),
'effective_url' => $url,
];
$stream = $this->checkDecode($request, $response, $stream);
// If not streaming, then drain the response into a stream.
if (empty($request['client']['stream'])) {
$dest = isset($request['client']['save_to'])
? $request['client']['save_to']
: fopen('php://temp', 'r+');
$stream = $this->drain($stream, $dest);
}
$response['body'] = $stream;
return new CompletedFutureArray($response);
}
private function checkDecode(array $request, array $response, $stream)
{
// Automatically decode responses when instructed.
if (!empty($request['client']['decode_content'])) {
switch (Core::firstHeader($response, 'Content-Encoding', true)) {
case 'gzip':
case 'deflate':
$stream = new InflateStream(Stream::factory($stream));
break;
}
}
return $stream;
}
/**
* Drains the stream into the "save_to" client option.
*
* @param resource $stream
* @param string|resource|StreamInterface $dest
*
* @return Stream
* @throws \RuntimeException when the save_to option is invalid.
*/
private function drain($stream, $dest)
{
if (is_resource($stream)) {
if (!is_resource($dest)) {
$stream = Stream::factory($stream);
} else {
stream_copy_to_stream($stream, $dest);
fclose($stream);
rewind($dest);
return $dest;
}
}
// Stream the response into the destination stream
$dest = is_string($dest)
? new Stream(Utils::open($dest, 'r+'))
: Stream::factory($dest);
Utils::copyToStream($stream, $dest);
$dest->seek(0);
$stream->close();
return $dest;
}
/**
* Creates an error response for the given stream.
*
* @param string $url
* @param RingException $e
*
* @return array
*/
private function createErrorResponse($url, RingException $e)
{
// Determine if the error was a networking error.
$message = $e->getMessage();
// This list can probably get more comprehensive.
if (strpos($message, 'getaddrinfo') // DNS lookup failed
|| strpos($message, 'Connection refused')
) {
$e = new ConnectException($e->getMessage(), 0, $e);
}
return new CompletedFutureArray([
'status' => null,
'body' => null,
'headers' => [],
'effective_url' => $url,
'error' => $e
]);
}
/**
* Create a resource and check to ensure it was created successfully
*
* @param callable $callback Callable that returns stream resource
*
* @return resource
* @throws \RuntimeException on error
*/
private function createResource(callable $callback)
{
$errors = null;
set_error_handler(function ($_, $msg, $file, $line) use (&$errors) {
$errors[] = [
'message' => $msg,
'file' => $file,
'line' => $line
];
return true;
});
$resource = $callback();
restore_error_handler();
if (!$resource) {
$message = 'Error creating resource: ';
foreach ($errors as $err) {
foreach ($err as $key => $value) {
$message .= "[$key] $value" . PHP_EOL;
}
}
throw new RingException(trim($message));
}
return $resource;
}
private function createStream($url, array $request)
{
static $methods;
if (!$methods) {
$methods = array_flip(get_class_methods(__CLASS__));
}
// HTTP/1.1 streams using the PHP stream wrapper require a
// Connection: close header
if ((!isset($request['version']) || $request['version'] == '1.1')
&& !Core::hasHeader($request, 'Connection')
) {
$request['headers']['Connection'] = ['close'];
}
// Ensure SSL is verified by default
if (!isset($request['client']['verify'])) {
$request['client']['verify'] = true;
}
$params = [];
$options = $this->getDefaultOptions($request);
if (isset($request['client'])) {
foreach ($request['client'] as $key => $value) {
$method = "add_{$key}";
if (isset($methods[$method])) {
$this->{$method}($request, $options, $value, $params);
}
}
}
return $this->createStreamResource(
$url,
$request,
$options,
$this->createContext($request, $options, $params)
);
}
private function getDefaultOptions(array $request)
{
$headers = "";
foreach ($request['headers'] as $name => $value) {
foreach ((array) $value as $val) {
$headers .= "$name: $val\r\n";
}
}
$context = [
'http' => [
'method' => $request['http_method'],
'header' => $headers,
'protocol_version' => isset($request['version']) ? $request['version'] : 1.1,
'ignore_errors' => true,
'follow_location' => 0,
],
];
$body = Core::body($request);
if (isset($body)) {
$context['http']['content'] = $body;
// Prevent the HTTP handler from adding a Content-Type header.
if (!Core::hasHeader($request, 'Content-Type')) {
$context['http']['header'] .= "Content-Type:\r\n";
}
}
$context['http']['header'] = rtrim($context['http']['header']);
return $context;
}
private function add_proxy(array $request, &$options, $value, &$params)
{
if (!is_array($value)) {
$options['http']['proxy'] = $value;
} else {
$scheme = isset($request['scheme']) ? $request['scheme'] : 'http';
if (isset($value[$scheme])) {
$options['http']['proxy'] = $value[$scheme];
}
}
}
private function add_timeout(array $request, &$options, $value, &$params)
{
$options['http']['timeout'] = $value;
}
private function add_verify(array $request, &$options, $value, &$params)
{
if ($value === true) {
// PHP 5.6 or greater will find the system cert by default. When
// < 5.6, use the Guzzle bundled cacert.
if (PHP_VERSION_ID < 50600) {
$options['ssl']['cafile'] = ClientUtils::getDefaultCaBundle();
}
} elseif (is_string($value)) {
$options['ssl']['cafile'] = $value;
if (!file_exists($value)) {
throw new RingException("SSL CA bundle not found: $value");
}
} elseif ($value === false) {
$options['ssl']['verify_peer'] = false;
$options['ssl']['allow_self_signed'] = true;
return;
} else {
throw new RingException('Invalid verify request option');
}
$options['ssl']['verify_peer'] = true;
$options['ssl']['allow_self_signed'] = false;
}
private function add_cert(array $request, &$options, $value, &$params)
{
if (is_array($value)) {
$options['ssl']['passphrase'] = $value[1];
$value = $value[0];
}
if (!file_exists($value)) {
throw new RingException("SSL certificate not found: {$value}");
}
$options['ssl']['local_cert'] = $value;
}
private function add_progress(array $request, &$options, $value, &$params)
{
$fn = function ($code, $_1, $_2, $_3, $transferred, $total) use ($value) {
if ($code == STREAM_NOTIFY_PROGRESS) {
$value($total, $transferred, null, null);
}
};
// Wrap the existing function if needed.
$params['notification'] = isset($params['notification'])
? Core::callArray([$params['notification'], $fn])
: $fn;
}
private function add_debug(array $request, &$options, $value, &$params)
{
if ($value === false) {
return;
}
static $map = [
STREAM_NOTIFY_CONNECT => 'CONNECT',
STREAM_NOTIFY_AUTH_REQUIRED => 'AUTH_REQUIRED',
STREAM_NOTIFY_AUTH_RESULT => 'AUTH_RESULT',
STREAM_NOTIFY_MIME_TYPE_IS => 'MIME_TYPE_IS',
STREAM_NOTIFY_FILE_SIZE_IS => 'FILE_SIZE_IS',
STREAM_NOTIFY_REDIRECTED => 'REDIRECTED',
STREAM_NOTIFY_PROGRESS => 'PROGRESS',
STREAM_NOTIFY_FAILURE => 'FAILURE',
STREAM_NOTIFY_COMPLETED => 'COMPLETED',
STREAM_NOTIFY_RESOLVE => 'RESOLVE',
];
static $args = ['severity', 'message', 'message_code',
'bytes_transferred', 'bytes_max'];
$value = Core::getDebugResource($value);
$ident = $request['http_method'] . ' ' . Core::url($request);
$fn = function () use ($ident, $value, $map, $args) {
$passed = func_get_args();
$code = array_shift($passed);
fprintf($value, '<%s> [%s] ', $ident, $map[$code]);
foreach (array_filter($passed) as $i => $v) {
fwrite($value, $args[$i] . ': "' . $v . '" ');
}
fwrite($value, "\n");
};
// Wrap the existing function if needed.
$params['notification'] = isset($params['notification'])
? Core::callArray([$params['notification'], $fn])
: $fn;
}
private function applyCustomOptions(array $request, array &$options)
{
if (!isset($request['client']['stream_context'])) {
return;
}
if (!is_array($request['client']['stream_context'])) {
throw new RingException('stream_context must be an array');
}
$options = array_replace_recursive(
$options,
$request['client']['stream_context']
);
}
private function createContext(array $request, array $options, array $params)
{
$this->applyCustomOptions($request, $options);
return $this->createResource(
function () use ($request, $options, $params) {
return stream_context_create($options, $params);
},
$request,
$options
);
}
private function createStreamResource(
$url,
array $request,
array $options,
$context
) {
return $this->createResource(
function () use ($url, $context) {
if (false === strpos($url, 'http')) {
trigger_error("URL is invalid: {$url}", E_USER_WARNING);
return null;
}
$resource = fopen($url, 'r', null, $context);
$this->lastHeaders = $http_response_header;
return $resource;
},
$request,
$options
);
}
}