mirror of
https://gerrit.googlesource.com/git-repo
synced 2024-12-21 07:16:21 +00:00
platform_utils: delete unused FileDescriptorStreams APIs
Now that we've converted the few users of this over to subprocess APIs, we don't need this anymore. It's been a bit hairy to maintain across different operating systems, so there's no desire to bring it back. Using multiprocessing Pool to batch things has been working better in general anyways. Change-Id: I10769e96f60ecf27a80d8cc2aa0d1b199085252e Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297682 Reviewed-by: Michael Mortensen <mmortensen@google.com> Tested-by: Mike Frysinger <vapier@google.com>
This commit is contained in:
parent
be24a54d9c
commit
f0925c482f
@ -15,11 +15,8 @@
|
|||||||
import errno
|
import errno
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
from queue import Queue
|
|
||||||
import select
|
|
||||||
import shutil
|
import shutil
|
||||||
import stat
|
import stat
|
||||||
from threading import Thread
|
|
||||||
|
|
||||||
|
|
||||||
def isWindows():
|
def isWindows():
|
||||||
@ -31,161 +28,6 @@ def isWindows():
|
|||||||
return platform.system() == "Windows"
|
return platform.system() == "Windows"
|
||||||
|
|
||||||
|
|
||||||
class FileDescriptorStreams(object):
|
|
||||||
""" Platform agnostic abstraction enabling non-blocking I/O over a
|
|
||||||
collection of file descriptors. This abstraction is required because
|
|
||||||
fctnl(os.O_NONBLOCK) is not supported on Windows.
|
|
||||||
"""
|
|
||||||
@classmethod
|
|
||||||
def create(cls):
|
|
||||||
""" Factory method: instantiates the concrete class according to the
|
|
||||||
current platform.
|
|
||||||
"""
|
|
||||||
if isWindows():
|
|
||||||
return _FileDescriptorStreamsThreads()
|
|
||||||
else:
|
|
||||||
return _FileDescriptorStreamsNonBlocking()
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.streams = []
|
|
||||||
|
|
||||||
def add(self, fd, dest, std_name):
|
|
||||||
""" Wraps an existing file descriptor as a stream.
|
|
||||||
"""
|
|
||||||
self.streams.append(self._create_stream(fd, dest, std_name))
|
|
||||||
|
|
||||||
def remove(self, stream):
|
|
||||||
""" Removes a stream, when done with it.
|
|
||||||
"""
|
|
||||||
self.streams.remove(stream)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_done(self):
|
|
||||||
""" Returns True when all streams have been processed.
|
|
||||||
"""
|
|
||||||
return len(self.streams) == 0
|
|
||||||
|
|
||||||
def select(self):
|
|
||||||
""" Returns the set of streams that have data available to read.
|
|
||||||
The returned streams each expose a read() and a close() method.
|
|
||||||
When done with a stream, call the remove(stream) method.
|
|
||||||
"""
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def _create_stream(self, fd, dest, std_name):
|
|
||||||
""" Creates a new stream wrapping an existing file descriptor.
|
|
||||||
"""
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
|
|
||||||
class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams):
|
|
||||||
""" Implementation of FileDescriptorStreams for platforms that support
|
|
||||||
non blocking I/O.
|
|
||||||
"""
|
|
||||||
def __init__(self):
|
|
||||||
super(_FileDescriptorStreamsNonBlocking, self).__init__()
|
|
||||||
self._poll = select.poll()
|
|
||||||
self._fd_to_stream = {}
|
|
||||||
|
|
||||||
class Stream(object):
|
|
||||||
""" Encapsulates a file descriptor """
|
|
||||||
|
|
||||||
def __init__(self, fd, dest, std_name):
|
|
||||||
self.fd = fd
|
|
||||||
self.dest = dest
|
|
||||||
self.std_name = std_name
|
|
||||||
self.set_non_blocking()
|
|
||||||
|
|
||||||
def set_non_blocking(self):
|
|
||||||
import fcntl
|
|
||||||
flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
|
|
||||||
fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
|
||||||
|
|
||||||
def fileno(self):
|
|
||||||
return self.fd.fileno()
|
|
||||||
|
|
||||||
def read(self):
|
|
||||||
return self.fd.read(4096)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self.fd.close()
|
|
||||||
|
|
||||||
def _create_stream(self, fd, dest, std_name):
|
|
||||||
stream = self.Stream(fd, dest, std_name)
|
|
||||||
self._fd_to_stream[stream.fileno()] = stream
|
|
||||||
self._poll.register(stream, select.POLLIN)
|
|
||||||
return stream
|
|
||||||
|
|
||||||
def remove(self, stream):
|
|
||||||
self._poll.unregister(stream)
|
|
||||||
del self._fd_to_stream[stream.fileno()]
|
|
||||||
super(_FileDescriptorStreamsNonBlocking, self).remove(stream)
|
|
||||||
|
|
||||||
def select(self):
|
|
||||||
return [self._fd_to_stream[fd] for fd, _ in self._poll.poll()]
|
|
||||||
|
|
||||||
|
|
||||||
class _FileDescriptorStreamsThreads(FileDescriptorStreams):
|
|
||||||
""" Implementation of FileDescriptorStreams for platforms that don't support
|
|
||||||
non blocking I/O. This implementation requires creating threads issuing
|
|
||||||
blocking read operations on file descriptors.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super(_FileDescriptorStreamsThreads, self).__init__()
|
|
||||||
# The queue is shared accross all threads so we can simulate the
|
|
||||||
# behavior of the select() function
|
|
||||||
self.queue = Queue(10) # Limit incoming data from streams
|
|
||||||
|
|
||||||
def _create_stream(self, fd, dest, std_name):
|
|
||||||
return self.Stream(fd, dest, std_name, self.queue)
|
|
||||||
|
|
||||||
def select(self):
|
|
||||||
# Return only one stream at a time, as it is the most straighforward
|
|
||||||
# thing to do and it is compatible with the select() function.
|
|
||||||
item = self.queue.get()
|
|
||||||
stream = item.stream
|
|
||||||
stream.data = item.data
|
|
||||||
return [stream]
|
|
||||||
|
|
||||||
class QueueItem(object):
|
|
||||||
""" Item put in the shared queue """
|
|
||||||
|
|
||||||
def __init__(self, stream, data):
|
|
||||||
self.stream = stream
|
|
||||||
self.data = data
|
|
||||||
|
|
||||||
class Stream(object):
|
|
||||||
""" Encapsulates a file descriptor """
|
|
||||||
|
|
||||||
def __init__(self, fd, dest, std_name, queue):
|
|
||||||
self.fd = fd
|
|
||||||
self.dest = dest
|
|
||||||
self.std_name = std_name
|
|
||||||
self.queue = queue
|
|
||||||
self.data = None
|
|
||||||
self.thread = Thread(target=self.read_to_queue)
|
|
||||||
self.thread.daemon = True
|
|
||||||
self.thread.start()
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self.fd.close()
|
|
||||||
|
|
||||||
def read(self):
|
|
||||||
data = self.data
|
|
||||||
self.data = None
|
|
||||||
return data
|
|
||||||
|
|
||||||
def read_to_queue(self):
|
|
||||||
""" The thread function: reads everything from the file descriptor into
|
|
||||||
the shared queue and terminates when reaching EOF.
|
|
||||||
"""
|
|
||||||
for line in iter(self.fd.readline, b''):
|
|
||||||
self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line))
|
|
||||||
self.fd.close()
|
|
||||||
self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, b''))
|
|
||||||
|
|
||||||
|
|
||||||
def symlink(source, link_name):
|
def symlink(source, link_name):
|
||||||
"""Creates a symbolic link pointing to source named link_name.
|
"""Creates a symbolic link pointing to source named link_name.
|
||||||
Note: On Windows, source must exist on disk, as the implementation needs
|
Note: On Windows, source must exist on disk, as the implementation needs
|
||||||
|
Loading…
Reference in New Issue
Block a user