diff --git a/platform_utils.py b/platform_utils.py index a280982a..00c51d9b 100644 --- a/platform_utils.py +++ b/platform_utils.py @@ -15,11 +15,8 @@ import errno import os import platform -from queue import Queue -import select import shutil import stat -from threading import Thread def isWindows(): @@ -31,161 +28,6 @@ def isWindows(): return platform.system() == "Windows" -class FileDescriptorStreams(object): - """ Platform agnostic abstraction enabling non-blocking I/O over a - collection of file descriptors. This abstraction is required because - fctnl(os.O_NONBLOCK) is not supported on Windows. - """ - @classmethod - def create(cls): - """ Factory method: instantiates the concrete class according to the - current platform. - """ - if isWindows(): - return _FileDescriptorStreamsThreads() - else: - return _FileDescriptorStreamsNonBlocking() - - def __init__(self): - self.streams = [] - - def add(self, fd, dest, std_name): - """ Wraps an existing file descriptor as a stream. - """ - self.streams.append(self._create_stream(fd, dest, std_name)) - - def remove(self, stream): - """ Removes a stream, when done with it. - """ - self.streams.remove(stream) - - @property - def is_done(self): - """ Returns True when all streams have been processed. - """ - return len(self.streams) == 0 - - def select(self): - """ Returns the set of streams that have data available to read. - The returned streams each expose a read() and a close() method. - When done with a stream, call the remove(stream) method. - """ - raise NotImplementedError - - def _create_stream(self, fd, dest, std_name): - """ Creates a new stream wrapping an existing file descriptor. - """ - raise NotImplementedError - - -class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams): - """ Implementation of FileDescriptorStreams for platforms that support - non blocking I/O. - """ - def __init__(self): - super(_FileDescriptorStreamsNonBlocking, self).__init__() - self._poll = select.poll() - self._fd_to_stream = {} - - class Stream(object): - """ Encapsulates a file descriptor """ - - def __init__(self, fd, dest, std_name): - self.fd = fd - self.dest = dest - self.std_name = std_name - self.set_non_blocking() - - def set_non_blocking(self): - import fcntl - flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) - fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - def fileno(self): - return self.fd.fileno() - - def read(self): - return self.fd.read(4096) - - def close(self): - self.fd.close() - - def _create_stream(self, fd, dest, std_name): - stream = self.Stream(fd, dest, std_name) - self._fd_to_stream[stream.fileno()] = stream - self._poll.register(stream, select.POLLIN) - return stream - - def remove(self, stream): - self._poll.unregister(stream) - del self._fd_to_stream[stream.fileno()] - super(_FileDescriptorStreamsNonBlocking, self).remove(stream) - - def select(self): - return [self._fd_to_stream[fd] for fd, _ in self._poll.poll()] - - -class _FileDescriptorStreamsThreads(FileDescriptorStreams): - """ Implementation of FileDescriptorStreams for platforms that don't support - non blocking I/O. This implementation requires creating threads issuing - blocking read operations on file descriptors. - """ - - def __init__(self): - super(_FileDescriptorStreamsThreads, self).__init__() - # The queue is shared accross all threads so we can simulate the - # behavior of the select() function - self.queue = Queue(10) # Limit incoming data from streams - - def _create_stream(self, fd, dest, std_name): - return self.Stream(fd, dest, std_name, self.queue) - - def select(self): - # Return only one stream at a time, as it is the most straighforward - # thing to do and it is compatible with the select() function. - item = self.queue.get() - stream = item.stream - stream.data = item.data - return [stream] - - class QueueItem(object): - """ Item put in the shared queue """ - - def __init__(self, stream, data): - self.stream = stream - self.data = data - - class Stream(object): - """ Encapsulates a file descriptor """ - - def __init__(self, fd, dest, std_name, queue): - self.fd = fd - self.dest = dest - self.std_name = std_name - self.queue = queue - self.data = None - self.thread = Thread(target=self.read_to_queue) - self.thread.daemon = True - self.thread.start() - - def close(self): - self.fd.close() - - def read(self): - data = self.data - self.data = None - return data - - def read_to_queue(self): - """ The thread function: reads everything from the file descriptor into - the shared queue and terminates when reaching EOF. - """ - for line in iter(self.fd.readline, b''): - self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line)) - self.fd.close() - self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, b'')) - - def symlink(source, link_name): """Creates a symbolic link pointing to source named link_name. Note: On Windows, source must exist on disk, as the implementation needs