Page MenuHomeSealhub

No OneTemporary

diff --git a/src/future/Future.php b/src/future/Future.php
index 8078aacc..b8656bd6 100644
--- a/src/future/Future.php
+++ b/src/future/Future.php
@@ -1,257 +1,277 @@
<?php
/**
* A 'future' or 'promise' is an object which represents the result of some
* pending computation. For a more complete overview of futures, see
* @{article:Using Futures}.
*/
abstract class Future extends Phobject {
private $hasResult = false;
private $hasStarted = false;
private $hasEnded = false;
private $result;
private $exception;
private $futureKey;
private $serviceProfilerCallID;
+ private $raiseExceptionOnStart = true;
private static $nextKey = 1;
/**
* Is this future's process complete? Specifically, can this future be
* resolved without blocking?
*
* @return bool If true, the external process is complete and resolving this
* future will not block.
*/
abstract public function isReady();
/**
* Resolve a future and return its result, blocking until the result is ready
* if necessary.
*
* @return wild Future result.
*/
public function resolve() {
$args = func_get_args();
if (count($args)) {
throw new Exception(
pht(
'Parameter "timeout" to "Future->resolve()" is no longer '.
'supported. Update the caller so it no longer passes a '.
'timeout.'));
}
- if (!$this->hasResult() && !$this->hasException()) {
+ if (!$this->canResolve()) {
$graph = new FutureIterator(array($this));
$graph->resolveAll();
}
if ($this->hasException()) {
throw $this->getException();
}
return $this->getResult();
}
- final public function startFuture() {
- if ($this->hasStarted) {
- throw new Exception(
- pht(
- 'Future has already started; futures can not start more '.
- 'than once.'));
- }
- $this->hasStarted = true;
-
- $this->startServiceProfiler();
- $this->updateFuture();
- }
-
final public function updateFuture() {
- if ($this->hasException()) {
- return;
- }
-
- if ($this->hasResult()) {
+ if ($this->canResolve()) {
return;
}
try {
$this->isReady();
} catch (Exception $ex) {
$this->setException($ex);
} catch (Throwable $ex) {
$this->setException($ex);
}
}
- final public function endFuture() {
- if (!$this->hasException() && !$this->hasResult()) {
- throw new Exception(
- pht(
- 'Trying to end a future which has no exception and no result. '.
- 'Futures must resolve before they can be ended.'));
- }
-
- if ($this->hasEnded) {
- throw new Exception(
- pht(
- 'Future has already ended; futures can not end more '.
- 'than once.'));
- }
- $this->hasEnded = true;
-
- $this->endServiceProfiler();
- }
-
/**
 * Begin a service profiler call for this future, when profiling is possible.
 *
 * The call ID returned by the profiler is remembered so the call can be
 * closed later. Nothing happens if the profiler class is not loaded, or if
 * the future reports `null` start parameters.
 *
 * @return void
 */
private function startServiceProfiler() {
  // NOTE: This is a soft dependency so that we don't need to build the
  // ServiceProfiler into the Phage agent. Normally, this class is always
  // available.
  if (class_exists('PhutilServiceProfiler')) {
    $start_params = $this->getServiceProfilerStartParameters();

    if ($start_params !== null) {
      $this->serviceProfilerCallID = PhutilServiceProfiler::getInstance()
        ->beginServiceCall($start_params);
    }
  }
}
/**
 * Close the service profiler call opened for this future, if there is one.
 *
 * When no call ID was recorded, this method does nothing.
 *
 * @return void
 */
private function endServiceProfiler() {
  $profiler_call = $this->serviceProfilerCallID;

  if ($profiler_call !== null) {
    $result_params = $this->getServiceProfilerResultParameters();

    PhutilServiceProfiler::getInstance()
      ->endServiceCall($profiler_call, $result_params);
  }
}
/**
* Get the parameters reported to the service profiler when this future
* starts.
*
* The base implementation returns an empty array. If an override returns
* `null`, startServiceProfiler() does not begin a profiler call.
*
* @return array|null Parameters for the profiler "begin" call.
*/
protected function getServiceProfilerStartParameters() {
return array();
}
/**
* Get the parameters reported to the service profiler when this future's
* profiler call ends.
*
* The base implementation returns an empty array.
*
* @return array Parameters for the profiler "end" call.
*/
protected function getServiceProfilerResultParameters() {
return array();
}
/**
* Retrieve a list of sockets which we can wait to become readable while
* a future is resolving. If your future has sockets which can be
* `select()`ed, return them here (or in @{method:getWriteSockets}) to make
* the resolve loop do a `select()`. If you do not return sockets in either
* case, you'll get a busy wait.
*
* @return list A list of sockets which we expect to become readable.
*/
public function getReadSockets() {
return array();
}
/**
* Retrieve a list of sockets which we can wait to become writable while a
* future is resolving. See @{method:getReadSockets}.
*
* @return list A list of sockets which we expect to become writable.
*/
public function getWriteSockets() {
return array();
}
/**
* Default amount of time to wait on stream select for this future. Normally
* 1 second is fine, but if the future has a timeout sooner than that it
* should return the amount of time left before the timeout.
*/
public function getDefaultWait() {
return 1;
}
public function start() {
- $this->isReady();
+ if ($this->hasStarted) {
+ throw new Exception(
+ pht(
+ 'Future has already started; futures can not start more '.
+ 'than once.'));
+ }
+ $this->hasStarted = true;
+
+ $this->startServiceProfiler();
+
+ $this->updateFuture();
+
+ if ($this->raiseExceptionOnStart) {
+ if ($this->hasException()) {
+ throw $this->getException();
+ }
+ }
+
return $this;
}
/**
 * Fetch the stored result of this future.
 *
 * @return wild The value previously stored as this future's result.
 * @throws Exception If no result has been stored yet.
 */
final protected function getResult() {
  if ($this->hasResult()) {
    return $this->result;
  }

  throw new Exception(
    pht(
      'Future has not yet resolved. Resolve futures before retrieving '.
      'results.'));
}
final protected function setResult($result) {
if ($this->hasResult()) {
throw new Exception(
pht(
'Future has already resolved. Futures may not resolve more than '.
'once.'));
}
$this->hasResult = true;
$this->result = $result;
+ $this->endFuture();
+
return $this;
}
/**
* Test whether a result has been stored for this future.
*
* @return bool True once setResult() has recorded a result.
*/
final public function hasResult() {
return $this->hasResult;
}
- final private function setException($exception) {
+ private function setException($exception) {
// NOTE: The parameter may be an Exception or a Throwable.
$this->exception = $exception;
+
+ $this->endFuture();
+
return $this;
}
- final private function getException() {
+ private function getException() {
return $this->exception;
}
/**
* Test whether an exception has been recorded for this future.
*
* @return bool True if the stored exception is not null.
*/
final public function hasException() {
return ($this->exception !== null);
}
/**
 * Assign the key which identifies this future. A key may only be assigned
 * once.
 *
 * @param wild Key to assign to this future.
 * @return this
 * @throws Exception If this future already has a key.
 */
final public function setFutureKey($key) {
  if ($this->futureKey !== null) {
    // Report the key which is already assigned (not the rejected new key),
    // so the message matches what it claims to describe.
    throw new Exception(
      pht(
        'Future already has a key ("%s") assigned.',
        $this->futureKey));
  }

  $this->futureKey = $key;

  return $this;
}
/**
 * Get the key which identifies this future, generating one on first use.
 *
 * Generated keys have the form "Future/<n>", where <n> comes from a counter
 * shared by all futures.
 *
 * @return wild The assigned or generated key.
 */
final public function getFutureKey() {
  if ($this->futureKey !== null) {
    return $this->futureKey;
  }

  $generated_key = sprintf('Future/%d', self::$nextKey++);
  $this->futureKey = $generated_key;

  return $generated_key;
}
+ final public function setRaiseExceptionOnStart($raise) {
+ $this->raiseExceptionOnStart = $raise;
+ return $this;
+ }
+
+ final public function getHasFutureStarted() {
+ return $this->hasStarted;
+ }
+
+ final public function canResolve() {
+ if ($this->hasResult()) {
+ return true;
+ }
+
+ if ($this->hasException()) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private function endFuture() {
+ if ($this->hasEnded) {
+ throw new Exception(
+ pht(
+ 'Future has already ended; futures can not end more '.
+ 'than once.'));
+ }
+ $this->hasEnded = true;
+
+ $this->endServiceProfiler();
+ }
+
}
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
index e33282c8..028c02da 100644
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -1,464 +1,463 @@
<?php
/**
* FutureIterator aggregates @{class:Future}s and allows you to respond to them
* in the order they resolve. This is useful because it minimizes the amount of
* time your program spends waiting on parallel processes.
*
* $futures = array(
* 'a.txt' => new ExecFuture('wc -c a.txt'),
* 'b.txt' => new ExecFuture('wc -c b.txt'),
* 'c.txt' => new ExecFuture('wc -c c.txt'),
* );
*
* foreach (new FutureIterator($futures) as $key => $future) {
* // IMPORTANT: keys are preserved but the order of elements is not. This
* // construct iterates over the futures in the order they resolve, so the
* // fastest future is the one you'll get first. This allows you to start
* // doing followup processing as soon as possible.
*
* list($err, $stdout) = $future->resolve();
* do_some_processing($stdout);
* }
*
* For a general overview of futures, see @{article:Using Futures}.
*
* @task basics Basics
* @task config Configuring Iteration
* @task iterator Iterator Interface
* @task internal Internals
*/
final class FutureIterator
extends Phobject
implements Iterator {
private $hold = array();
private $wait = array();
private $work = array();
private $futures = array();
private $key;
private $limit;
private $timeout;
private $isTimeout = false;
private $hasRewound = false;
/* -( Basics )------------------------------------------------------------- */
/**
 * Build an iterator over a map of futures. Each future is keyed with its
 * array key and then added to the iterator.
 *
 * @param list List of @{class:Future}s to resolve.
 * @task basics
 */
public function __construct(array $futures) {
  assert_instances_of($futures, 'Future');

  foreach ($futures as $map_key => $future) {
    // setFutureKey() returns the future, so it can be handed straight to
    // addFuture().
    $this->addFuture($future->setFutureKey($map_key));
  }
}
/**
 * Block until every future in the iterator has resolved.
 *
 * @return void
 * @task basics
 */
public function resolveAll() {
  if (!$this->hasRewound) {
    // Iteration has not begun yet, so run the whole iterator from the start.
    iterator_to_array($this);
    return;
  }

  // If a caller breaks out of a "foreach" and then calls "resolveAll()",
  // interpret it to mean that we should iterate over whatever futures
  // remain.
  while ($this->valid()) {
    $this->next();
  }
}
/**
 * Add another future to the set of futures. This is useful if you have a
 * set of futures to run mostly in parallel, but some futures depend on
 * others.
 *
 * The new future is registered under its future key and placed in the
 * "hold" queue.
 *
 * @param Future @{class:Future} to add to iterator
 * @return this
 * @throws Exception If a future with the same key was already added.
 * @task basics
 */
public function addFuture(Future $future) {
  $future_key = $future->getFutureKey();

  if (!isset($this->futures[$future_key])) {
    $this->futures[$future_key] = $future;
    $this->hold[$future_key] = $future_key;

    return $this;
  }

  throw new Exception(
    pht(
      'This future graph already has a future with key "%s". Each '.
      'future must have a unique key.',
      $future_key));
}
/* -( Configuring Iteration )---------------------------------------------- */
/**
* Set a maximum amount of time you want to wait before the iterator will
* yield a result. If no future has resolved yet, the iterator will yield
* null for key and value. Among other potential uses, you can use this to
* show some busy indicator:
*
* $futures = id(new FutureIterator($futures))
* ->setUpdateInterval(1);
* foreach ($futures as $future) {
* if ($future === null) {
* echo "Still working...\n";
* } else {
* // ...
* }
* }
*
* This will echo "Still working..." once per second as long as futures are
* resolving. By default, FutureIterator never yields null.
*
* @param float Maximum number of seconds to block waiting on futures before
* yielding null.
* @return this
*
* @task config
*/
public function setUpdateInterval($interval) {
$this->timeout = $interval;
return $this;
}
/**
 * Limit the number of simultaneously executing futures.
 *
 *   $futures = id(new FutureIterator($futures))
 *     ->limit(4);
 *   foreach ($futures as $future) {
 *     // Run no more than 4 futures simultaneously.
 *   }
 *
 * This stores the same value as @{method:setMaximumWorkingSetSize}.
 *
 * @param int Maximum number of simultaneous jobs allowed.
 * @return this
 *
 * @task config
 */
public function limit($max) {
  return $this->setMaximumWorkingSetSize($max);
}
/**
* Set the maximum number of futures in the working set. This writes the same
* property as limit().
*
* @param int Maximum working set size.
* @return this
*/
public function setMaximumWorkingSetSize($limit) {
$this->limit = $limit;
return $this;
}
/**
* Get the configured maximum working set size.
*
* @return int|null The stored limit, or null if none has been set.
*/
public function getMaximumWorkingSetSize() {
return $this->limit;
}
/* -( Iterator Interface )------------------------------------------------- */
/**
 * Begin iteration by advancing to the first resolvable future. The iterator
 * can only be rewound once.
 *
 * @return void
 * @throws Exception If the iterator has already been rewound.
 * @task iterator
 */
public function rewind() {
  if (!$this->hasRewound) {
    $this->hasRewound = true;
    $this->next();
    return;
  }

  throw new Exception(
    pht('Future graphs can not be rewound.'));
}
/**
* @task iterator
*/
public function next() {
$this->key = null;
$this->updateWorkingSet();
if (!$this->work) {
return;
}
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
$working_set = array_select_keys($this->futures, $this->work);
while (true) {
// Update every future first. This is a no-op on futures which have
// already resolved or failed, but we want to give futures an
// opportunity to make progress even if we can resolve something.
foreach ($working_set as $future_key => $future) {
$future->updateFuture();
}
// Check if any future has resolved or failed. If we have any such
// futures, we'll return the first one from the iterator.
$resolve_key = null;
foreach ($working_set as $future_key => $future) {
- if ($future->hasException()) {
- $resolve_key = $future_key;
- break;
- }
-
- if ($future->hasResult()) {
+ if ($future->canResolve()) {
$resolve_key = $future_key;
break;
}
}
// We've found a future to resolve, so we're done here for now.
if ($resolve_key !== null) {
$this->moveFutureToDone($resolve_key);
return;
}
// We don't have any futures to resolve yet. Check if we've reached
// an update interval.
$wait_time = 1;
if ($timeout !== null) {
$elapsed = microtime(true) - $start;
if ($elapsed > $timeout) {
$this->isTimeout = true;
return;
}
$wait_time = min($wait_time, $timeout - $elapsed);
}
// We're going to wait. If possible, we'd like to wait with sockets.
// If we can't, we'll just sleep.
$read_sockets = array();
$write_sockets = array();
foreach ($working_set as $future_key => $future) {
$sockets = $future->getReadSockets();
foreach ($sockets as $socket) {
$read_sockets[] = $socket;
}
$sockets = $future->getWriteSockets();
foreach ($sockets as $socket) {
$write_sockets[] = $socket;
}
}
$use_sockets = ($read_sockets || $write_sockets);
if ($use_sockets) {
foreach ($working_set as $future) {
$wait_time = min($wait_time, $future->getDefaultWait());
}
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
} else {
usleep(1000);
}
}
}
/**
 * Get the future at the current position, or null if the last step ended
 * because the update interval elapsed.
 *
 * @return Future|null
 * @task iterator
 */
public function current() {
  return $this->isTimeout
    ? null
    : $this->futures[$this->key];
}
/**
 * Get the key of the current future, or null if the last step ended because
 * the update interval elapsed.
 *
 * @return wild|null
 * @task iterator
 */
public function key() {
  return $this->isTimeout
    ? null
    : $this->key;
}
/**
 * Test whether the iterator has something to yield: either a timeout
 * placeholder or a future key.
 *
 * @return bool
 * @task iterator
 */
public function valid() {
  return ($this->isTimeout || ($this->key !== null));
}
/* -( Internals )---------------------------------------------------------- */
/**
 * Advance futures through the "hold", "wait" and "work" queues, starting as
 * many waiting futures as the working set limit permits.
 *
 * @return void
 * @throws Exception If futures are held but none are waiting or working.
 * @task internal
 */
protected function updateWorkingSet() {
  $limit = $this->getMaximumWorkingSetSize();
  $work_count = count($this->work);

  // If we're already working on the maximum number of futures, we just have
  // to wait for something to resolve. There's no benefit to updating the
  // queue since we can never make any meaningful progress.
  if ($limit) {
    if ($work_count >= $limit) {
      return;
    }
  }

  // If any futures that are currently held are no longer blocked by
  // dependencies, move them from "hold" to "wait".
  foreach ($this->hold as $future_key) {
    if (!$this->canMoveFutureToWait($future_key)) {
      continue;
    }

    $this->moveFutureToWait($future_key);
  }

  $wait_count = count($this->wait);
  $hold_count = count($this->hold);

  if (!$work_count && !$wait_count && $hold_count) {
    throw new Exception(
      pht(
        'Future graph is stalled: some futures are held, but no futures '.
        'are waiting or working. The graph can never resolve.'));
  }

  // Figure out how many futures we can start. If we don't have a limit,
  // we can start every waiting future. If we do have a limit, we can only
  // start as many futures as we have free slots for: futures which are
  // already working occupy slots too, so they must be subtracted from the
  // limit or the working set can grow past it.
  if ($limit) {
    $work_limit = min($limit - $work_count, $wait_count);
  } else {
    $work_limit = $wait_count;
  }

  if ($work_limit <= 0) {
    return;
  }

  // We have free slots and waiting futures, so start them now.
  foreach ($this->wait as $future_key) {
    $this->moveFutureToWork($future_key);

    $work_limit--;
    if ($work_limit <= 0) {
      return;
    }
  }
}
/**
* Decide whether a held future may move to the "wait" queue.
*
* This currently always returns true, and the key is not inspected.
*
* @param wild Key of the held future.
* @return bool Always true.
*/
private function canMoveFutureToWait($future_key) {
return true;
}
/**
* Move a future key from the "hold" queue to the "wait" queue.
*
* @param wild Key of the future to move.
* @return void
*/
private function moveFutureToWait($future_key) {
unset($this->hold[$future_key]);
$this->wait[$future_key] = $future_key;
}
private function moveFutureToWork($future_key) {
unset($this->wait[$future_key]);
$this->work[$future_key] = $future_key;
- $this->futures[$future_key]->startFuture();
+ $future = $this->futures[$future_key];
+
+ if (!$future->getHasFutureStarted()) {
+ $future
+ ->setRaiseExceptionOnStart(false)
+ ->start();
+ }
}
private function moveFutureToDone($future_key) {
$this->key = $future_key;
unset($this->work[$future_key]);
// Before we return, do another working set update so we start any
// futures that are ready to go as soon as we can.
$this->updateWorkingSet();
-
- $this->futures[$future_key]->endFuture();
}
/**
* Wait for activity on one of several sockets.
*
* On first use, this installs a SIGCHLD handler if "pcntl_signal()" is
* available. The "installed" flag is a static local, so installation is
* attempted only once.
*
* @param list List of sockets expected to become readable.
* @param list List of sockets expected to become writable.
* @param float Timeout, in seconds.
* @return void
* @throws Exception If the signal handler can not be installed.
*/
private function waitForSockets(
array $read_list,
array $write_list,
$timeout = 1.0) {
static $handler_installed = false;
if (!$handler_installed) {
// If we're spawning child processes, we need to install a signal handler
// here to catch cases like execing '(sleep 60 &) &' where the child
// exits but a socket is kept open. But we don't actually need to do
// anything because the SIGCHLD will interrupt the stream_select(), as
// long as we have a handler registered.
if (function_exists('pcntl_signal')) {
if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) {
throw new Exception(pht('Failed to install signal handler!'));
}
}
$handler_installed = true;
}
// "stream_select()" takes its timeout as whole seconds plus a separate
// microsecond remainder, so split the float timeout into the two parts.
$timeout_sec = (int)$timeout;
$timeout_usec = (int)(1000000 * ($timeout - $timeout_sec));
// The "except" list is passed by reference, so it has to be a variable.
$exceptfds = array();
// Warnings are suppressed with "@"; a false return is handled below.
$ok = @stream_select(
$read_list,
$write_list,
$exceptfds,
$timeout_sec,
$timeout_usec);
if ($ok === false) {
// Hopefully, this means we received a SIGCHLD. In the worst case, we
// degrade to a busy wait.
}
}
/**
* No-op SIGCHLD handler, registered by waitForSockets().
*
* @param int Signal number (unused).
* @return void
*/
public static function handleSIGCHLD($signo) {
// This function is a dummy, we just need to have some handler registered
// so that PHP will get interrupted during "stream_select()". If we don't
// register a handler, "stream_select()" won't fail.
}
}
diff --git a/src/future/exec/ExecFuture.php b/src/future/exec/ExecFuture.php
index 0264d4d3..ecce090c 100644
--- a/src/future/exec/ExecFuture.php
+++ b/src/future/exec/ExecFuture.php
@@ -1,994 +1,1017 @@
<?php
/**
* Execute system commands in parallel using futures.
*
* ExecFuture is a future, which means it runs asynchronously and represents
* a value which may not exist yet. See @{article:Using Futures} for an
* explanation of futures. When an ExecFuture resolves, it returns the exit
* code, stdout and stderr of the process it executed.
*
* ExecFuture is the core command execution implementation in libphutil, but is
* exposed through a number of APIs. See @{article:Command Execution} for more
* discussion about executing system commands.
*
* @task create Creating ExecFutures
* @task resolve Resolving Execution
* @task config Configuring Execution
* @task info Command Information
* @task interact Interacting With Commands
* @task internal Internals
*/
final class ExecFuture extends PhutilExecutableFuture {
private $pipes = array();
private $proc = null;
private $start = null;
private $procStatus = null;
private $stdout = null;
private $stderr = null;
private $stdin = null;
private $closePipe = true;
private $stdoutPos = 0;
private $stderrPos = 0;
private $readBufferSize;
private $stdoutSizeLimit = PHP_INT_MAX;
private $stderrSizeLimit = PHP_INT_MAX;
private $profilerCallID;
private $killedByTimeout;
private $windowsStdoutTempFile = null;
private $windowsStderrTempFile = null;
private $terminateTimeout;
private $didTerminate;
private $killTimeout;
private static $descriptorSpec = array(
0 => array('pipe', 'r'), // stdin
1 => array('pipe', 'w'), // stdout
2 => array('pipe', 'w'), // stderr
);
/**
* Initialize the stdin buffer with a new, empty PhutilRope.
*
* NOTE(review): presumably invoked by the parent class constructor; confirm
* against PhutilExecutableFuture.
*
* @return void
*/
protected function didConstruct() {
$this->stdin = new PhutilRope();
}
/* -( Command Information )------------------------------------------------ */
/**
* Retrieve the byte limit for the stderr buffer.
*
* @return int Maximum buffer size, in bytes.
* @task info
*/
public function getStderrSizeLimit() {
return $this->stderrSizeLimit;
}
/**
* Retrieve the byte limit for the stdout buffer.
*
* @return int Maximum buffer size, in bytes.
* @task info
*/
public function getStdoutSizeLimit() {
return $this->stdoutSizeLimit;
}
/**
* Get the process's pid. This only works after execution is initiated, e.g.
* by a call to start().
*
* @return int Process ID of the executing process.
* @task info
*/
public function getPID() {
$status = $this->procGetStatus();
return $status['pid'];
}
+ public function hasPID() {
+ if ($this->procStatus) {
+ return true;
+ }
+
+ if ($this->proc) {
+ return true;
+ }
+
+ return false;
+ }
+
/* -( Configuring Execution )---------------------------------------------- */
/**
* Set a maximum size for the stdout read buffer. To limit stderr, see
* @{method:setStderrSizeLimit}. The major use of these methods is to use less
* memory if you are running a command which sometimes produces huge volumes
* of output that you don't really care about.
*
* NOTE: Setting this to 0 means "no buffer", not "unlimited buffer".
*
* @param int Maximum size of the stdout read buffer.
* @return this
* @task config
*/
public function setStdoutSizeLimit($limit) {
$this->stdoutSizeLimit = $limit;
return $this;
}
/**
* Set a maximum size for the stderr read buffer.
* See @{method:setStdoutSizeLimit} for discussion.
*
* @param int Maximum size of the stderr read buffer.
* @return this
* @task config
*/
public function setStderrSizeLimit($limit) {
$this->stderrSizeLimit = $limit;
return $this;
}
/**
* Set the maximum internal read buffer size for this future. The future will
* block reads once the internal stdout or stderr buffer exceeds this size.
*
* NOTE: If you @{method:resolve} a future with a read buffer limit, you may
* block forever!
*
* TODO: We should probably release the read buffer limit during
* @{method:resolve}, or otherwise detect this. For now, be careful.
*
* @param int|null Maximum buffer size, or `null` for unlimited.
* @return this
*/
public function setReadBufferSize($read_buffer_size) {
$this->readBufferSize = $read_buffer_size;
return $this;
}
/* -( Interacting With Commands )------------------------------------------ */
/**
 * Read and return output from stdout and stderr, if any is available. This
 * method keeps a read cursor on each stream, but the entire streams are
 * still returned when the future resolves. You can call read() again after
 * resolving the future to retrieve only the parts of the streams you did not
 * previously read:
 *
 *   $future = new ExecFuture('...');
 *   // ...
 *   list($stdout) = $future->read(); // Returns output so far
 *   list($stdout) = $future->read(); // Returns new output since first call
 *   // ...
 *   list($stdout) = $future->resolvex(); // Returns ALL output
 *   list($stdout) = $future->read(); // Returns unread output
 *
 * NOTE: If you set a limit with @{method:setStdoutSizeLimit} or
 * @{method:setStderrSizeLimit}, this method will not be able to read data
 * past the limit.
 *
 * NOTE: If you call @{method:discardBuffers}, all the stdout/stderr data
 * will be thrown away and the cursors will be reset.
 *
 * @return pair <$stdout, $stderr> pair with new output since the last call
 *              to this method.
 * @task interact
 */
public function read() {
  // Reading stdout first also advances the stdout cursor.
  $new_stdout = $this->readStdout();

  $new_stderr = (string)substr($this->stderr, $this->stderrPos);
  $this->stderrPos = strlen($this->stderr);

  return array($new_stdout, $new_stderr);
}
public function readStdout() {
if ($this->start) {
- $this->isReady(); // Sync
+ $this->updateFuture(); // Sync
}
$result = (string)substr($this->stdout, $this->stdoutPos);
$this->stdoutPos = strlen($this->stdout);
return $result;
}
/**
 * Queue data to be written to stdin of the command.
 *
 * @param string Data to write.
 * @param bool If true, keep the pipe open for writing. By default, the pipe
 *             will be closed as soon as possible so that commands which
 *             listen for EOF will execute. If you want to keep the pipe open
 *             past the start of command execution, do an empty write with
 *             `$keep_pipe = true` first.
 * @return this
 * @throws Exception If data is written after the stdin buffer is gone.
 * @task interact
 */
public function write($data, $keep_pipe = false) {
  $has_data = (bool)strlen($data);

  if ($has_data) {
    if (!$this->stdin) {
      throw new Exception(pht('Writing to a closed pipe!'));
    }

    $this->stdin->append($data);
  }

  // An empty write still updates the "close pipe" preference.
  $this->closePipe = !$keep_pipe;

  return $this;
}
/**
* Permanently discard the stdout and stderr buffers and reset the read
* cursors. This is basically useful only if you are streaming a large amount
* of data from some process.
*
* Conceivably you might also need to do this if you're writing a client using
* @{class:ExecFuture} and `netcat`, but you probably should not do that.
*
* NOTE: This completely discards the data. It won't be available when the
* future resolves. This is almost certainly only useful if you need the
* buffer memory for some reason.
*
* @return this
* @task interact
*/
public function discardBuffers() {
$this->discardStdoutBuffer();
$this->stderr = '';
$this->stderrPos = 0;
return $this;
}
public function discardStdoutBuffer() {
$this->stdout = '';
$this->stdoutPos = 0;
return $this;
}
/**
* Returns true if this future was killed by a timeout configured with
* @{method:setTimeout}.
*
* @return bool True if the future was killed for exceeding its time limit.
*/
public function getWasKilledByTimeout() {
return $this->killedByTimeout;
}
/* -( Configuring Execution )---------------------------------------------- */
/**
* Set a hard limit on execution time. If the command runs longer, it will
* be terminated and the future will resolve with an error code. You can test
* if a future was killed by a timeout with @{method:getWasKilledByTimeout}.
*
* The subprocess will be sent a `TERM` signal, and then a `KILL` signal a
* short while later if it fails to exit.
*
* @param int Maximum number of seconds this command may execute for before
* it is signaled.
* @return this
* @task config
*/
public function setTimeout($seconds) {
$this->terminateTimeout = $seconds;
$this->killTimeout = $seconds + min($seconds, 60);
return $this;
}
/* -( Resolving Execution )------------------------------------------------ */
/**
* Resolve a command you expect to exit with return code 0. Works like
* @{method:resolve}, but throws if $err is nonempty. Returns only
* $stdout and $stderr. See also @{function:execx}.
*
* list($stdout, $stderr) = $future->resolvex();
*
* @return pair <$stdout, $stderr> pair.
* @task resolve
*/
public function resolvex() {
$result = $this->resolve();
return $this->raiseResultError($result);
}
/**
 * Resolve a command you expect to return valid JSON. Works like
 * @{method:resolvex}, but also throws if stderr is nonempty, or stdout is not
 * valid JSON. Returns a PHP array, decoded from the JSON command output.
 *
 * @return array PHP array, decoded from JSON command output.
 * @throws CommandException If the command emits text on stderr or does not
 *                          produce valid JSON on stdout.
 * @task resolve
 */
public function resolveJSON() {
  list($stdout, $stderr) = $this->resolvex();

  if (strlen($stderr)) {
    $cmd = $this->getCommand();

    // The stderr text is a string, so it is rendered with "%s". Rendering
    // it with "%d" would print an integer instead of the actual output.
    throw new CommandException(
      pht(
        "JSON command '%s' emitted text to stderr when none was expected: %s",
        $cmd,
        $stderr),
      $cmd,
      0,
      $stdout,
      $stderr);
  }

  try {
    return phutil_json_decode($stdout);
  } catch (PhutilJSONParserException $ex) {
    $cmd = $this->getCommand();

    throw new CommandException(
      pht(
        "JSON command '%s' did not produce a valid JSON object on stdout: %s",
        $cmd,
        $stdout),
      $cmd,
      0,
      $stdout,
      $stderr);
  }
}
/**
 * Resolve the process by abruptly terminating it.
 *
 * If the future already has a result, that result is returned and no signal
 * is sent. Otherwise the process (if any) is sent signal 9, closed, and a
 * result with exit code 128 + 9 is recorded.
 *
 * @return list List of <err, stdout, stderr> results.
 * @task resolve
 */
public function resolveKill() {
  if ($this->hasResult()) {
    return $this->getResult();
  }

  $signal = 9;

  if ($this->proc) {
    proc_terminate($this->proc, $signal);
  }

  // Close the process before the output buffers are captured below.
  $this->closeProcess();

  $this->recordResult(
    array(
      128 + $signal,
      $this->stdout,
      $this->stderr,
    ));

  return $this->getResult();
}
/**
 * Store the final result of the command.
 *
 * Unless "resolve on error" is enabled, the result is first passed through
 * raiseResultError(), which throws on a nonzero error code and otherwise
 * reduces the result to a <stdout, stderr> pair.
 *
 * @param list <err, stdout, stderr> result.
 * @return void
 */
private function recordResult(array $result) {
  if (!$this->getResolveOnError()) {
    $result = $this->raiseResultError($result);
  }

  $this->setResult($result);
}
/**
 * Throw if a result carries a nonzero error code; otherwise strip the error
 * code from it.
 *
 * @param list <err, stdout, stderr> result.
 * @return pair <$stdout, $stderr> pair.
 * @throws CommandException If the error code is nonzero.
 */
private function raiseResultError($result) {
  list($err, $stdout, $stderr) = $result;

  if (!$err) {
    return array($stdout, $stderr);
  }

  $cmd = $this->getCommand();

  if (!$this->getWasKilledByTimeout()) {
    $message = pht('Command failed with error #%d!', $err);
  } else {
    // NOTE: The timeout can be a float and PhutilNumber only handles
    // integers, so just use "%s" to render it.
    $message = pht(
      'Command killed by timeout after running for more than %s seconds.',
      $this->terminateTimeout);
  }

  throw new CommandException(
    $message,
    $cmd,
    $err,
    $stdout,
    $stderr);
}
/* -( Internals )---------------------------------------------------------- */
/**
 * Provides read sockets to the future core.
 *
 * Returns the stdout and stderr pipes which exist and have not reached EOF.
 * If no pipes have been opened, the list is empty.
 *
 * @return list List of read sockets.
 * @task internal
 */
public function getReadSockets() {
  $sockets = array();

  // Pipes 1 and 2 are stdout and stderr. Look them up individually rather
  // than destructuring with list(), which raises undefined-offset warnings
  // while the pipe list is still empty.
  foreach (array(1, 2) as $pipe_index) {
    if (!isset($this->pipes[$pipe_index])) {
      continue;
    }

    $pipe = $this->pipes[$pipe_index];
    if (!feof($pipe)) {
      $sockets[] = $pipe;
    }
  }

  return $sockets;
}
/**
 * Provides write sockets to the future core.
 *
 * Returns the stdin pipe when it exists, has not reached EOF, and there is
 * buffered data waiting to be written. Otherwise the list is empty.
 *
 * @return list List of write sockets.
 * @task internal
 */
public function getWriteSockets() {
  $sockets = array();

  // Pipe 0 is stdin. Look it up directly rather than destructuring with
  // list(), which raises undefined-offset warnings while the pipe list is
  // still empty.
  if (!isset($this->pipes[0])) {
    return $sockets;
  }

  $stdin = $this->pipes[0];

  // Use getWriteBufferSize() so a missing stdin buffer counts as "nothing
  // to write" instead of causing a call on a non-object.
  if ($this->getWriteBufferSize() && !feof($stdin)) {
    $sockets[] = $stdin;
  }

  return $sockets;
}
/**
* Determine if the read buffer is empty.
*
* @return bool True if the read buffer is empty.
* @task internal
*/
public function isReadBufferEmpty() {
return !strlen($this->stdout);
}
/**
* Determine if the write buffer is empty.
*
* @return bool True if the write buffer is empty.
* @task internal
*/
public function isWriteBufferEmpty() {
return !$this->getWriteBufferSize();
}
/**
* Determine the number of bytes in the write buffer.
*
* @return int Number of bytes in the write buffer.
* @task internal
*/
public function getWriteBufferSize() {
if (!$this->stdin) {
return 0;
}
return $this->stdin->getByteLength();
}
/**
 * Read bytes from a stream, keeping at most a limited number of them and
 * discarding the rest.
 *
 * Reading proceeds in chunks of at most 64KB. It stops when a read returns
 * no bytes, or when the kept output reaches `$length` bytes.
 *
 * @param resource  Stream to read from.
 * @param int       Maximum number of bytes to return from $stream. If
 *                  additional bytes are available, they will be read and
 *                  discarded.
 * @param string    Human-readable description of stream, for exception
 *                  message.
 * @param int       Maximum number of bytes to read.
 * @return string   The data read from the stream.
 * @throws Exception If reading from the stream fails.
 * @task internal
 */
private function readAndDiscard($stream, $limit, $description, $length) {
  if ($length <= 0) {
    return '';
  }

  $kept = '';
  $chunk_size = min($length, 64 * 1024);

  while (true) {
    $chunk = fread($stream, $chunk_size);
    if ($chunk === false) {
      throw new Exception(pht('Failed to read from %s', $description));
    }

    $bytes_read = strlen($chunk);

    // Only keep data while there is room under the limit; anything beyond
    // it has been consumed from the stream but is thrown away.
    if ($bytes_read > 0 && $limit > 0) {
      if ($bytes_read > $limit) {
        $chunk = substr($chunk, 0, $limit);
      }
      $kept .= $chunk;
      $limit -= strlen($chunk);
    }

    if (strlen($kept) >= $length) {
      break;
    }

    if ($bytes_read <= 0) {
      break;
    }
  }

  return $kept;
}
/**
* Begin or continue command execution.
*
* @return bool True if future has resolved.
* @task internal
*/
public function isReady() {
// NOTE: We have a soft dependency on PhutilErrorTrap here, to avoid
// the need to build it into the Phage agent. Under normal circumstances,
// this class is always available.
if (!$this->pipes) {
$is_windows = phutil_is_windows();
if (!$this->start) {
// We might already have started the timer via initiating resolution.
$this->start = microtime(true);
}
$unmasked_command = $this->getCommand();
$unmasked_command = $unmasked_command->getUnmaskedString();
$pipes = array();
if ($this->hasEnv()) {
$env = $this->getEnv();
} else {
$env = null;
}
$cwd = $this->getCWD();
// NOTE: See note above about Phage.
if (class_exists('PhutilErrorTrap')) {
$trap = new PhutilErrorTrap();
} else {
$trap = null;
}
$spec = self::$descriptorSpec;
if ($is_windows) {
$stdout_file = new TempFile();
$stderr_file = new TempFile();
$stdout_handle = fopen($stdout_file, 'wb');
if (!$stdout_handle) {
throw new Exception(
pht(
'Unable to open stdout temporary file ("%s") for writing.',
$stdout_file));
}
$stderr_handle = fopen($stderr_file, 'wb');
if (!$stderr_handle) {
throw new Exception(
pht(
'Unable to open stderr temporary file ("%s") for writing.',
$stderr_file));
}
$spec = array(
0 => self::$descriptorSpec[0],
1 => $stdout_handle,
2 => $stderr_handle,
);
}
$proc = @proc_open(
$unmasked_command,
$spec,
$pipes,
$cwd,
$env,
array(
'bypass_shell' => true,
));
if ($trap) {
$err = $trap->getErrorsAsString();
$trap->destroy();
} else {
$err = error_get_last();
if ($err) {
$err = $err['message'];
}
}
if ($is_windows) {
fclose($stdout_handle);
fclose($stderr_handle);
}
if (!is_resource($proc)) {
// When you run an invalid command on a Linux system, the "proc_open()"
// works and then the process (really a "/bin/sh -c ...") exits after
// it fails to resolve the command.
// When you run an invalid command on a Windows system, we bypass the
// shell and the "proc_open()" itself fails. See also T13504. Fail the
// future immediately, acting as though it exited with an error code
// for consistency with Linux.
$result = array(
1,
'',
pht(
'Call to "proc_open()" to open a subprocess failed: %s',
$err),
);
$this->recordResult($result);
return true;
}
if ($is_windows) {
$stdout_handle = fopen($stdout_file, 'rb');
if (!$stdout_handle) {
throw new Exception(
pht(
'Unable to open stdout temporary file ("%s") for reading.',
$stdout_file));
}
$stderr_handle = fopen($stderr_file, 'rb');
if (!$stderr_handle) {
throw new Exception(
pht(
'Unable to open stderr temporary file ("%s") for reading.',
$stderr_file));
}
$pipes = array(
0 => $pipes[0],
1 => $stdout_handle,
2 => $stderr_handle,
);
$this->windowsStdoutTempFile = $stdout_file;
$this->windowsStderrTempFile = $stderr_file;
}
$this->pipes = $pipes;
$this->proc = $proc;
list($stdin, $stdout, $stderr) = $pipes;
if (!$is_windows) {
// On Windows, we redirect process standard output and standard error
// through temporary files. Files don't block, so we don't need to make
// these streams nonblocking.
if ((!stream_set_blocking($stdout, false)) ||
(!stream_set_blocking($stderr, false)) ||
(!stream_set_blocking($stdin, false))) {
$this->__destruct();
throw new Exception(pht('Failed to set streams nonblocking.'));
}
}
$this->tryToCloseStdin();
return false;
}
if (!$this->proc) {
return true;
}
list($stdin, $stdout, $stderr) = $this->pipes;
while (isset($this->stdin) && $this->stdin->getByteLength()) {
$write_segment = $this->stdin->getAnyPrefix();
try {
$bytes = fwrite($stdin, $write_segment);
} catch (RuntimeException $ex) {
// If the subprocess has exited, we may get a broken pipe error here
// in recent versions of PHP. There does not seem to be any way to
// get the actual error code other than reading the exception string.
// For now, treat this as if writes are blocked.
break;
}
if ($bytes === false) {
throw new Exception(pht('Unable to write to stdin!'));
} else if ($bytes) {
$this->stdin->removeBytesFromHead($bytes);
} else {
// Writes are blocked for now.
break;
}
}
$this->tryToCloseStdin();
// Read status before reading pipes so that we can never miss data that
// arrives between our last read and the process exiting.
$status = $this->procGetStatus();
$read_buffer_size = $this->readBufferSize;
$max_stdout_read_bytes = PHP_INT_MAX;
$max_stderr_read_bytes = PHP_INT_MAX;
if ($read_buffer_size !== null) {
$max_stdout_read_bytes = $read_buffer_size - strlen($this->stdout);
$max_stderr_read_bytes = $read_buffer_size - strlen($this->stderr);
}
if ($max_stdout_read_bytes > 0) {
$this->stdout .= $this->readAndDiscard(
$stdout,
$this->getStdoutSizeLimit() - strlen($this->stdout),
'stdout',
$max_stdout_read_bytes);
}
if ($max_stderr_read_bytes > 0) {
$this->stderr .= $this->readAndDiscard(
$stderr,
$this->getStderrSizeLimit() - strlen($this->stderr),
'stderr',
$max_stderr_read_bytes);
}
$is_done = false;
if (!$status['running']) {
// We may still have unread bytes on stdout or stderr, particularly if
// this future is being buffered and streamed. If we do, we don't want to
// consider the subprocess to have exited until we've read everything.
// See T9724 for context.
if (feof($stdout) && feof($stderr)) {
$is_done = true;
}
}
if ($is_done) {
$signal_info = null;
// If the subprocess got nuked with `kill -9`, we get a -1 exitcode.
// Upgrade this to a slightly more informative value by examining the
// terminating signal code.
$err = $status['exitcode'];
if ($err == -1) {
if ($status['signaled']) {
$signo = $status['termsig'];
$err = 128 + $signo;
$signal_info = pht(
"<Process was terminated by signal %s (%d).>\n\n",
phutil_get_signal_name($signo),
$signo);
}
}
$result = array(
$err,
$this->stdout,
$signal_info.$this->stderr,
);
$this->recordResult($result);
$this->closeProcess();
return true;
}
$elapsed = (microtime(true) - $this->start);
if ($this->terminateTimeout && ($elapsed >= $this->terminateTimeout)) {
if (!$this->didTerminate) {
$this->killedByTimeout = true;
$this->sendTerminateSignal();
return false;
}
}
if ($this->killTimeout && ($elapsed >= $this->killTimeout)) {
$this->killedByTimeout = true;
$this->resolveKill();
return true;
}
}
/**
 * Clean up the subprocess, if one exists, when this future is destroyed.
 *
 * A process which has already stopped is closed directly. A process which
 * is still running is sent a terminate signal and, if it has not exited
 * after a five second wait, is killed.
 *
 * @return void
 * @task internal
 */
public function __destruct() {
  if (!$this->proc) {
    return;
  }

  // NOTE: If we try to proc_close() an open process, we hang indefinitely. To
  // avoid this, kill the process explicitly if it's still running.

  $proc_status = $this->procGetStatus();
  if (!$proc_status['running']) {
    $this->closeProcess();
    return;
  }

  $this->sendTerminateSignal();

  $did_exit = $this->waitForExit(5);
  if (!$did_exit) {
    $this->resolveKill();
  }
}
/**
 * Close and free resources if necessary.
 *
 * Closes every pipe which is still set, closes the process handle if there
 * is one, and drops the stdin buffer and the Windows temporary files.
 *
 * @return void
 * @task internal
 */
private function closeProcess() {
  foreach ($this->pipes as $handle) {
    if ($handle === null) {
      // This pipe was already closed and cleared.
      continue;
    }
    @fclose($handle);
  }
  $this->pipes = array(null, null, null);

  if ($this->proc) {
    @proc_close($this->proc);
    $this->proc = null;
  }

  $this->stdin = null;

  unset(
    $this->windowsStdoutTempFile,
    $this->windowsStderrTempFile);
}
/**
 * Execute `proc_get_status()`, but avoid pitfalls.
 *
 * Once a status whose "running" flag is false has been stored in
 * `$this->procStatus`, that stored status is returned on every later call
 * and `proc_get_status()` is not invoked again.
 *
 * @return dict Process status.
 * @throws Exception If no stored "not running" status exists and
 *   `$this->proc` holds no subprocess.
 * @task internal
 */
private function procGetStatus() {
// After the process exits, we only get one chance to read proc_get_status()
// before it starts returning garbage. Make sure we don't throw away the
// last good read.
if ($this->procStatus) {
if (!$this->procStatus['running']) {
return $this->procStatus;
}
}
+
+ // See T13555. This may occur if you call "getPID()" on a future which
+ // exited immediately without ever creating a valid subprocess.
+
+ if (!$this->proc) {
+ throw new Exception(
+ pht(
+ 'Attempting to get subprocess status in "ExecFuture" with no '.
+ 'valid subprocess.'));
+ }
+
$this->procStatus = proc_get_status($this->proc);
return $this->procStatus;
}
/**
 * Try to close stdin, if we're done using it. This keeps us from hanging if
 * the process on the other end of the pipe is waiting for EOF.
 *
 * Nothing happens while the pipe has been requested to stay open, while
 * buffered input remains unwritten, or once the pipe is already closed.
 *
 * @return void
 * @task internal
 */
private function tryToCloseStdin() {
  // Either we've been told to keep the pipe open by a call to
  // write(..., true), or we still have bytes to write.
  if (!$this->closePipe || $this->stdin->getByteLength()) {
    return;
  }

  list($stdin_pipe) = $this->pipes;
  if ($stdin_pipe) {
    // There's nothing stopping us from closing stdin, so close it.
    @fclose($stdin_pipe);
    $this->pipes[0] = null;
  }

  // Otherwise, we've already closed stdin.
}
/**
 * Compute how long to wait on this future before checking it again.
 *
 * Starts from the parent's default wait. When a timeout is pending, the
 * wait is capped at the time remaining until that timeout and never goes
 * below zero. The start timestamp is initialized here if it is not set yet.
 *
 * @return wild Wait duration, as produced by the parent or by `max()`.
 */
public function getDefaultWait() {
  $wait = parent::getDefaultWait();

  $timeout = $this->getNextTimeout();
  if (!$timeout) {
    return $wait;
  }

  if (!$this->start) {
    $this->start = microtime(true);
  }

  $remaining = $timeout - (microtime(true) - $this->start);

  return max(0, min($remaining, $wait));
}
/**
 * Select the timeout which applies next: the kill timeout once a terminate
 * signal has been sent, and the terminate timeout before that.
 *
 * @return wild The selected timeout property value.
 */
private function getNextTimeout() {
  return $this->didTerminate
    ? $this->killTimeout
    : $this->terminateTimeout;
}
/**
 * Mark this future as terminated and call `proc_terminate()` on the
 * subprocess.
 *
 * @return this
 */
private function sendTerminateSignal() {
  // Record the termination before signalling, so the flag is set even if
  // the signal call itself raises an error.
  $this->didTerminate = true;

  $process = $this->proc;
  proc_terminate($process);

  return $this;
}
/**
 * Poll the subprocess until it stops running or a time budget is used up.
 *
 * @param float|int $duration Maximum time to wait, in seconds.
 * @return bool True if the process stopped running within the budget,
 *   false if the budget ran out first.
 */
private function waitForExit($duration) {
  $start = microtime(true);

  while (true) {
    $status = $this->procGetStatus();
    if (!$status['running']) {
      return true;
    }

    $waited = (microtime(true) - $start);
    if ($waited > $duration) {
      return false;
    }

    // Sleep for a millisecond between polls. Without this the loop spins
    // at full speed and pins a CPU core for up to the whole duration.
    usleep(1000);
  }
}
/**
 * Build the parameters reported to the service profiler when this future
 * starts: the call type "exec" and the command cast to a string.
 *
 * @return dict Map with "type" and "command" keys.
 */
protected function getServiceProfilerStartParameters() {
  $command = phutil_string_cast($this->getCommand());

  $parameters = array(
    'type' => 'exec',
    'command' => $command,
  );

  return $parameters;
}
/**
 * Build the parameters reported to the service profiler when this future
 * ends. The "err" value is element 0 of the result when a result exists,
 * and null otherwise.
 *
 * @return dict Map with an "err" key.
 */
protected function getServiceProfilerResultParameters() {
  $err = null;

  if ($this->hasResult()) {
    $err = idx($this->getResult(), 0);
  }

  return array(
    'err' => $err,
  );
}
}
diff --git a/src/repository/graph/query/ArcanistGitCommitGraphQuery.php b/src/repository/graph/query/ArcanistGitCommitGraphQuery.php
index 2491a549..53587e45 100644
--- a/src/repository/graph/query/ArcanistGitCommitGraphQuery.php
+++ b/src/repository/graph/query/ArcanistGitCommitGraphQuery.php
@@ -1,209 +1,209 @@
<?php
/**
 * Commit graph query which loads commits by running "log" commands through
 * the repository API and parsing their delimited output into graph nodes.
 *
 * NOTE(review): the error text in executeIterator() refers to "git log", so
 * the repository API is presumably a Git API. Confirm against the caller.
 */
final class ArcanistGitCommitGraphQuery
extends ArcanistCommitGraphQuery {
// Map of commit hash to graph node for every commit parsed so far.
private $seen = array();
// Futures which have been built but not started yet.
private $futures = array();
// Active output iterators, keyed by the value from getNextIteratorKey().
private $iterators = array();
private $cursors = array();
// Next key to hand out for an entry in $iterators.
private $iteratorKey = 0;
/**
 * Build the log futures, consume their output, and return the commits
 * which were discovered.
 *
 * @return map Map of commit hash to graph node.
 */
public function execute() {
$this->newFutures();
$this->executeIterators();
return $this->seen;
}
/**
 * Build one "log" future per list of refs and queue it in $this->futures.
 *
 * Head hashes (plus exclusions derived from tail hashes) form a single ref
 * list; each exact hash forms a ref list of its own. Refs are passed to the
 * command on stdin, one per line.
 *
 * @return void
 * @throws Exception If neither head hashes nor exact hashes are set.
 */
private function newFutures() {
$head_hashes = $this->getHeadHashes();
$exact_hashes = $this->getExactHashes();
if (!$head_hashes && !$exact_hashes) {
throw new Exception(pht('Need head hashes or exact hashes!'));
}
$api = $this->getRepositoryAPI();
$ref_lists = array();
if ($head_hashes) {
$refs = array();
if ($head_hashes !== null) {
foreach ($head_hashes as $hash) {
$refs[] = $hash;
}
}
$tail_hashes = $this->getTailHashes();
if ($tail_hashes !== null) {
foreach ($tail_hashes as $tail_hash) {
// In Git revision syntax, "^<rev>^@" excludes all parents of <rev>.
$refs[] = sprintf('^%s^@', $tail_hash);
}
}
$ref_lists[] = $refs;
}
if ($exact_hashes !== null) {
foreach ($exact_hashes as $exact_hash) {
// Select the commit itself while excluding all of its parents.
$ref_list = array();
$ref_list[] = $exact_hash;
$ref_list[] = sprintf('^%s^@', $exact_hash);
$ref_list[] = '--';
$ref_lists[] = $ref_list;
}
}
$flags = array();
$min_epoch = $this->getMinimumEpoch();
if ($min_epoch !== null) {
$flags[] = '--after';
$flags[] = date('c', $min_epoch);
}
$max_epoch = $this->getMaximumEpoch();
if ($max_epoch !== null) {
$flags[] = '--before';
$flags[] = date('c', $max_epoch);
}
foreach ($ref_lists as $ref_list) {
$ref_blob = implode("\n", $ref_list)."\n";
// The order of these fields must match the list() destructuring in
// executeIterator(): encoding, hash, parents, commit epoch, message.
$fields = array(
'%e',
'%H',
'%P',
'%ct',
'%B',
);
// Fields are separated by byte 0x02 and each record ends with byte 0x01;
// the readers in this class split on "\2" and "\1" respectively.
$format = implode('%x02', $fields).'%x01';
$future = $api->newFuture(
'log --format=%s %Ls --stdin',
$format,
$flags);
$future->write($ref_blob);
$future->setResolveOnError(true);
$this->futures[] = $future;
}
}
/**
 * Start queued futures and drain their output until no futures or
 * iterators remain, or until the query limit has been reached.
 *
 * At most eight iterators are kept active at a time.
 *
 * @return void
 */
private function executeIterators() {
while ($this->futures || $this->iterators) {
$iterator_limit = 8;
// Top up the set of active iterators from the queue of pending futures.
while (count($this->iterators) < $iterator_limit) {
if (!$this->futures) {
break;
}
$future = array_pop($this->futures);
- $future->startFuture();
+ $future->start();
$iterator = id(new LinesOfALargeExecFuture($future))
->setDelimiter("\1");
$iterator->rewind();
$iterator_key = $this->getNextIteratorKey();
$this->iterators[$iterator_key] = $iterator;
}
$limit = $this->getLimit();
foreach ($this->iterators as $iterator_key => $iterator) {
$this->executeIterator($iterator_key, $iterator);
if ($limit) {
if (count($this->seen) >= $limit) {
return;
}
}
}
}
}
/**
 * Return the current iterator key and advance the counter.
 *
 * @return int Key for the next entry in $this->iterators.
 */
private function getNextIteratorKey() {
return $this->iteratorKey++;
}
/**
 * Consume records from one output iterator, creating or updating a graph
 * node for each commit and linking it to its parents.
 *
 * Stops when the iterator is exhausted or the query limit is reached. An
 * exhausted iterator is removed from $this->iterators.
 *
 * @param int $iterator_key Key of the iterator in $this->iterators.
 * @param wild $lines Iterator yielding one delimited record at a time.
 * @return void
 * @throws Exception If a record does not split into exactly five fields.
 */
private function executeIterator($iterator_key, $lines) {
$graph = $this->getGraph();
$limit = $this->getLimit();
$is_done = false;
while (true) {
if (!$lines->valid()) {
$is_done = true;
break;
}
$line = $lines->current();
$lines->next();
// Skip records which consist of only a newline.
if ($line === "\n") {
continue;
}
$fields = explode("\2", $line);
if (count($fields) !== 5) {
throw new Exception(
pht(
'Failed to split line "%s" from "git log".',
$line));
}
list($encoding, $hash, $parents, $commit_epoch, $message) = $fields;
// TODO: Handle encoding, see DiffusionLowLevelCommitQuery.
$node = $graph->getNode($hash);
if (!$node) {
$node = $graph->newNode($hash);
}
$this->seen[$hash] = $node;
$node
->setCommitMessage($message)
->setCommitEpoch((int)$commit_epoch);
if (strlen($parents)) {
// Parent hashes arrive as a single space-separated string.
$parents = explode(' ', $parents);
$parent_nodes = array();
foreach ($parents as $parent) {
$parent_node = $graph->getNode($parent);
if (!$parent_node) {
$parent_node = $graph->newNode($parent);
}
$parent_nodes[$parent] = $parent_node;
$parent_node->addChildNode($node);
}
$node->setParentNodes($parent_nodes);
} else {
$parents = array();
}
if ($limit) {
if (count($this->seen) >= $limit) {
break;
}
}
}
if ($is_done) {
unset($this->iterators[$iterator_key]);
}
}
}

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jan 24, 05:28 (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
601542
Default Alt Text
(52 KB)

Event Timeline