Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
allworldit
python
forkedsubprocess
Commits
c4f5cf7b
Commit
c4f5cf7b
authored
Aug 31, 2019
by
Nigel Kukard
Browse files
Initial commit
parents
Pipeline
#4379
passed with stage
in 27 seconds
Changes
7
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
.gitlab-ci.yml
0 → 100644
View file @
c4f5cf7b
image
:
idmslinux/rolling
# Stages we need to progress through
stages
:
-
test
test_job
:
stage
:
test
script
:
# Create environment
-
pacman -Syu --noconfirm
-
pacman -S --noconfirm grep python
-
pacman -S --noconfirm python-pythondialog python-pytest python-pytest-runner python-pytest-cov python-pylint python-isort mypy pylama python-mccabe python-requests
# Run tests
-
python setup.py test
# Artifacts
artifacts
:
expire_in
:
1 day
paths
:
-
build/
LICENSE
0 → 100644
View file @
c4f5cf7b
Copyright (C) 2019, AllWorldIT.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
README.md
0 → 100644
View file @
c4f5cf7b
# Forked subprocess support for Python
This package allows for the running of a subprocess in the background, including the sending of data to the subprocess and
receiving of data from it. Threads are used to achieve the reading/writing of data.
## Basic usage
```
python
from
forkedsubprocess
import
ForkedSubprocess
process
=
ForkedSubprocess
([
'cat'
])
process
.
run
()
process
.
send
(
'some string'
)
returncode
=
process
.
wait
()
stdout
=
process
.
stdout
stderr
=
process
.
stderr
output
=
process
.
output
```
## Writing output to console
Output can be written to the console at the same time using
`enable_output=True`
.
```
python
from
forkedsubprocess
import
ForkedSubprocess
process
=
ForkedSubprocess
([
'cat'
],
enable_output
=
True
)
process
.
run
()
process
.
send
(
'some string'
)
returncode
=
process
.
wait
()
stdout
=
process
.
stdout
stderr
=
process
.
stderr
output
=
process
.
output
```
## Using a callback
A callback can be used for each line of output received.
```
python
from
forkedsubprocess
import
ForkedSubprocess
def
my_callback
(
line
:
str
):
print
(
f
'LINE:
{
line
}
'
)
process
=
ForkedSubprocess
([
'cat'
],
output_callback
=
my_callback
)
process
.
run
()
process
.
send
(
'some string'
)
returncode
=
process
.
wait
()
stdout
=
process
.
stdout
stderr
=
process
.
stderr
output
=
process
.
output
```
# License
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
setup.cfg
0 → 100644
View file @
c4f5cf7b
[aliases]
test = pytest
[tool:pytest]
addopts = --pylama --cov=src/ tests/ src/
[coverage:run]
branch = true
parallel = true
[coverage:report]
show_missing = true
exclude_lines =
pragma: no cover
def __repr__
if self.debug:
raise AssertionError
raise NotImplementedError
if __name__ == ['"]__main__['"]:$
[pylama]
linters = pep257,pycodestyle,pyflakes,pylint,mccabe,mypy,isort
# D202: No blank lines allowed after function docstring
# D203: 1 blank line required before class docstring
# D213: Multi-line docstring summary should start at the second line
# R0201: Method could be a function
# R0903: Too few public methods
ignore = D202,D203,D213,R0201,R0903
[pylama:pycodestyle]
max_line_length = 132
[pylama:pylint]
max_line_length = 132
[mypy]
ignore_missing_imports = true
setup.py
0 → 100644
View file @
c4f5cf7b
"""Forked Subprocess Support."""
import
re
from
setuptools
import
find_packages
,
setup
main_py
=
open
(
'src/forkedsubprocess/__init__.py'
).
read
()
metadata
=
dict
(
re
.
findall
(
"__([a-z]+)__ = '([^']+)'"
,
main_py
))
NAME
=
'forkedsubprocess'
VERSION
=
metadata
[
'version'
]
with
open
(
"README.md"
,
"r"
)
as
fh
:
LONG_DESCRIPTION
=
fh
.
read
()
setup
(
name
=
NAME
,
version
=
VERSION
,
author
=
"Nigel Kukard"
,
author_email
=
"nkukard@lbsd.net"
,
description
=
"Forked subprocess support for Python"
,
long_description
=
LONG_DESCRIPTION
,
long_description_content_type
=
"text/markdown"
,
url
=
"https://gitlab.devlabs.linuxassist.net/allworldit/python/forkedsubprocess"
,
classifiers
=
[
"Development Status :: 5 - Production/Stable"
,
"Intended Audience :: Developers"
,
"License :: OSI Approved :: MIT License"
,
"Operating System :: POSIX :: Linux"
,
"Programming Language :: Python :: 3 :: Only"
,
"Topic :: Software Development :: Libraries :: Python Modules"
,
],
python_requires
=
'>=3.6'
,
packages
=
find_packages
(
'src'
,
exclude
=
[
'tests'
]),
package_dir
=
{
''
:
'src'
},
package_data
=
{
''
:
[
'LICENSE'
]}
)
src/forkedsubprocess/__init__.py
0 → 100644
View file @
c4f5cf7b
"""Forked subprocesses support for Python."""
import
subprocess
import
threading
from
typing
import
IO
,
Any
,
Callable
,
Dict
,
List
,
Optional
__version__
=
'1.0.2'
# Define our callback types
InputCallback
=
Callable
[[],
str
]
OutputCallback
=
Callable
[[
str
],
None
]
# Only export these 3 items
__all__
=
[
'ForkedSubprocess'
,
'ForkedSubprocessNotRunningException'
,
'InputCallback'
,
'OutputCallback'
]
class
ForkedSubprocessNotRunningException
(
RuntimeError
):
"""Exception raised when a method is called on a subprocess which is supposed to be running."""
class
ForkedSubprocessIOBase
:
"""The ForkedSubprocessIOBase class handles the basics of readers and writers."""
# Pipe to read from
_pipe
:
IO
[
Any
]
# Thread
_thread
:
threading
.
Thread
def
__init__
(
self
,
pipe
:
IO
[
Any
]):
"""Initialize our class.
The lines_lists argument is a list of lists to add lines to.
This is so we can add to stdouterr and stdout/stderr at the same time.
"""
# Setup pipe we're going to be using
self
.
_pipe
=
pipe
# Setup thread
self
.
_thread
=
threading
.
Thread
(
target
=
self
.
_thread_target
)
def
start
(
self
):
"""Start the thread."""
self
.
_thread
.
start
()
def
wait
(
self
):
"""Wait until the thread exits."""
self
.
_thread
.
join
()
def
_thread_target
(
self
):
"""Responsible for reading data."""
raise
RuntimeError
(
'The method "_thread_target" should of been overridden'
)
class
ForkedSubprocessReader
(
ForkedSubprocessIOBase
):
"""The ForkedSubprocessReader class handles reading data from a stream."""
# Lines we've collected
_lines_lists
:
List
[
List
[
str
]]
# Output to terminal?
_output
:
bool
# Output callback to use
_output_callback
:
Optional
[
OutputCallback
]
def
__init__
(
self
,
pipe
:
IO
[
Any
],
lines_lists
:
List
[
List
[
str
]],
**
kwargs
):
"""Initialize our class.
The lines_lists argument is a list of lists to add lines to.
This is so we can add to stdouterr and stdout/stderr at the same time.
"""
# Initialize base class
super
().
__init__
(
pipe
)
# Setup lists we're going to output to
self
.
_lines_lists
=
lines_lists
# Are we going to output to the terminal?
self
.
_output
=
kwargs
.
get
(
'output'
,
False
)
# Setup the output callback if we have one
self
.
_output_callback
=
kwargs
.
get
(
'output_callback'
,
None
)
def
_thread_target
(
self
):
"""Responsible for reading data."""
# Loop with raw lines received
for
raw_line
in
iter
(
self
.
_pipe
.
readline
,
''
):
# Strip off newline
line
=
raw_line
.
rstrip
(
'
\n
'
)
# Add to all the lines lists
for
lines
in
self
.
_lines_lists
:
lines
.
append
(
line
)
# Are we outputting to terminal aswell?
if
self
.
_output
:
print
(
f
'
{
line
}
\n
'
)
# Are we going to send the line to the output callback
if
self
.
_output_callback
:
self
.
_output_callback
(
line
)
class
ForkedSubprocessWriter
(
ForkedSubprocessIOBase
):
"""The ForkedSubprocessWriter class handles writing an list to a stream."""
# Writer condition
_writer_cond
:
threading
.
Condition
# Lines we're going to be sending
_lines
:
List
[
str
]
# Trigger to stop thread and exit
_stop
:
bool
# Are we closed?
_closed
:
bool
def
__init__
(
self
,
pipe
:
IO
[
Any
]):
"""Initialize our class."""
# Initialize base class
super
().
__init__
(
pipe
)
# Create the thread and its condition
self
.
_writer_cond
=
threading
.
Condition
()
# Setup the lines list we're going to be sending
self
.
_lines
=
[]
# Setup stop condition
self
.
_stop
=
False
# Closed status
self
.
_closed
=
False
def
send
(
self
,
text
:
str
):
"""Send text to the pipe."""
self
.
_lines
.
append
(
text
)
# Use lock
with
self
.
_writer_cond
:
self
.
_writer_cond
.
notify_all
()
def
stop
(
self
):
"""Send stop signal."""
# Notify threads to process, and in this case exit
self
.
_stop
=
True
# Use lock
with
self
.
_writer_cond
:
self
.
_writer_cond
.
notify_all
()
def
_thread_target
(
self
):
"""Worker of this class."""
# Carry on looping
while
True
:
# If we're stopping break
if
self
.
_stop
:
break
# Use lock
with
self
.
_writer_cond
:
# If there is no lines, wait
if
not
self
.
_lines
:
self
.
_writer_cond
.
wait
()
# Loop with lines...
while
self
.
_lines
:
line
=
self
.
_lines
.
pop
(
0
)
self
.
_pipe
.
write
(
f
'
{
line
}
\n
'
)
# Close pipe as we're done
self
.
_pipe
.
close
()
# pylama: ignore=R0902
class
ForkedSubprocess
:
"""The ForkedSubprocess class handles curating a subprocess to send and receive data."""
# Command and args to create the subprocess
_args
:
List
[
str
]
# Environment to pass to subprocess
_env
:
Optional
[
Dict
[
str
,
str
]]
# This is the process we created
_process
:
subprocess
.
Popen
# STDIO readers and writers
_stdout_reader
:
Optional
[
ForkedSubprocessReader
]
_stderr_reader
:
Optional
[
ForkedSubprocessReader
]
_stdin_writer
:
Optional
[
ForkedSubprocessWriter
]
# Output
_stdout
:
List
[
str
]
_stderr
:
List
[
str
]
_output
:
List
[
str
]
# Are we going to output lines we received
_enable_output
:
bool
# Callback for output lines we received
_output_callback
:
Optional
[
OutputCallback
]
def
__init__
(
self
,
args
:
List
[
str
],
**
kwargs
):
"""Initialize our class."""
self
.
_args
=
args
# Check if we have an ENV to pass to the subprocess
self
.
_env
=
kwargs
.
get
(
'env'
,
None
)
# Enable output to terminal
self
.
_enable_output
=
kwargs
.
get
(
'enable_output'
,
False
)
# Check if we have an output callback
self
.
_output_callback
=
kwargs
.
get
(
'output_callback'
,
None
)
# Clear internal data
self
.
_clear_data
()
def
run
(
self
):
"""Run the subprocess."""
# Clear internal data
self
.
_clear_data
()
# Run process
self
.
_process
=
subprocess
.
Popen
(
self
.
_args
,
env
=
self
.
_env
,
stdin
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
text
=
True
)
# Check what we're going to output to the terminal
enable_stdout
=
False
enable_stderr
=
False
if
self
.
_enable_output
:
enable_stdout
=
True
enable_stderr
=
True
# Create the readers and writers
self
.
_stdout_reader
=
ForkedSubprocessReader
(
self
.
_process
.
stdout
,
[
self
.
_stdout
,
self
.
_output
],
output
=
enable_stdout
,
output_callback
=
self
.
_output_callback
)
self
.
_stderr_reader
=
ForkedSubprocessReader
(
self
.
_process
.
stderr
,
[
self
.
_stderr
,
self
.
_output
],
output
=
enable_stderr
,
output_callback
=
self
.
_output_callback
)
self
.
_stdin_writer
=
ForkedSubprocessWriter
(
self
.
_process
.
stdin
)
# Start the readers and writers
self
.
_stdout_reader
.
start
()
self
.
_stderr_reader
.
start
()
self
.
_stdin_writer
.
start
()
def
send
(
self
,
text
:
str
):
"""Send the subprocess text over stdin."""
# Raise an error
if
not
self
.
_process
:
raise
ForkedSubprocessNotRunningException
(
'send() was called on a subprocess which is not running'
)
# We use the IF so we don't trigger a typing error on the None possibility
if
self
.
_stdin_writer
:
self
.
_stdin_writer
.
send
(
text
)
def
wait
(
self
)
->
int
:
"""Wait for subprocess to exit."""
# Raise an error
if
not
self
.
_process
:
raise
ForkedSubprocessNotRunningException
(
'wait() was called on a subprocess which is not running'
)
# Stop writer
self
.
_stdin_writer
.
stop
()
# Wait for process exit
self
.
_process
.
wait
()
# Wait for threads to read data and exit
self
.
_stdin_writer
.
wait
()
self
.
_stdout_reader
.
wait
()
self
.
_stderr_reader
.
wait
()
return
self
.
_process
.
returncode
@
property
def
output
(
self
):
"""Return the output we got from the process."""
return
self
.
_output
@
property
def
stderr
(
self
):
"""Return the stderr we got from the process."""
return
self
.
_stderr
@
property
def
stdout
(
self
):
"""Return the stdout we got from the process."""
return
self
.
_stdout
def
_clear_data
(
self
):
"""Clear internal data."""
self
.
_process
=
None
self
.
_stdout_reader
=
None
self
.
_stderr_reader
=
None
self
.
_stdin_writer
=
None
self
.
_stdout
=
[]
self
.
_stderr
=
[]
self
.
_output
=
[]
tests/test_forkedsubprocess.py
0 → 100644
View file @
c4f5cf7b
"""Tests for forkedsubprocess."""
import
pytest
from
forkedsubprocess
import
(
ForkedSubprocess
,
ForkedSubprocessNotRunningException
)
class
TestForkedSubprocess
():
"""Test the ForkedSubprocess class."""
def
test_wait_exception_without_running_process
(
self
):
"""Test ForkedSubprocess wait() failure with no running process."""
process
=
ForkedSubprocess
([
'echo'
,
'hello world'
])
with
pytest
.
raises
(
ForkedSubprocessNotRunningException
):
process
.
wait
()
def
test_send_exception_without_running_process
(
self
):
"""Test ForkedSubprocess send() failure with no running process."""
process
=
ForkedSubprocess
([
'cat'
])
with
pytest
.
raises
(
ForkedSubprocessNotRunningException
):
process
.
send
(
'line'
)
def
test_stdout
(
self
):
"""Test stdout."""
process
=
ForkedSubprocess
([
'echo'
,
'hello world'
])
process
.
run
()
process
.
wait
()
assert
process
.
output
==
[
'hello world'
],
'Our test string "hello world" should be returned in the .output property'
assert
process
.
stdout
==
[
'hello world'
],
'Our test string "hello world" should be returned in the .stdout property'
assert
process
.
stderr
==
[],
'Nothing should be returned in the .stderr property'
def
test_stderr
(
self
):
"""Test stderr."""
process
=
ForkedSubprocess
([
'sh'
,
'-c'
,
'echo "test stderr" >&2'
])
process
.
run
()
process
.
wait
()
assert
process
.
output
==
[
'test stderr'
],
'Our test string "test stderr" should be returned in the .output property'
assert
process
.
stdout
==
[],
'Nothing should of been returned in the .stdout property'
assert
process
.
stderr
==
[
'test stderr'
],
'Our test string "test stderr" should of been returned in the .stderr property'
def
test_stdout_stderr
(
self
):
"""Test stdout and stderr."""
process
=
ForkedSubprocess
([
'sh'
,
'-c'
,
'echo "test stdout"; echo "test stderr" >&2'
])
process
.
run
()
process
.
wait
()
assert
len
(
process
.
output
)
==
2
,
'We should have two lines returned'
assert
'test stdout'
in
process
.
output
,
'Our test strings "test stdout" should be returned in the .output property'
assert
'test stderr'
in
process
.
output
,
'Our test strings "test stderr" should be returned in the .output property'
assert
process
.
stdout
==
[
'test stdout'
],
'Our test string "test stdout" should of been returned in the .stdout property'
assert
process
.
stderr
==
[
'test stderr'
],
'Our test string "test stderr" should of been returned in the .stderr property'
def
test_console_output
(
self
):
"""Test stdout and stderr."""
process
=
ForkedSubprocess
([
'sh'
,
'-c'
,
'echo "test stdout"; echo "test stderr" >&2'
],
enable_output
=
True
)
process
.
run
()
process
.
wait
()
assert
len
(
process
.
output
)
==
2
,
'We should have two lines returned'
assert
'test stdout'
in
process
.
output
,
'Our test strings "test stdout" should be returned in the .output property'
assert
'test stderr'
in
process
.
output
,
'Our test strings "test stderr" should be returned in the .output property'
assert
process
.
stdout
==
[
'test stdout'
],
'Our test string "test stdout" should of been returned in the .stdout property'
assert
process
.
stderr
==
[
'test stderr'
],
'Our test string "test stderr" should of been returned in the .stderr property'
def
test_callback_output
(
self
):
"""Test callback output."""
output_lines
=
[]
def
output_callback
(
line
:
str
):
"""Output callback."""
output_lines
.
append
(
line
)
process
=
ForkedSubprocess
([
'sh'
,
'-c'
,
'echo "test stdout"; echo "test stderr" >&2'
],
output_callback
=
output_callback
)
process
.
run
()
process
.
wait
()
assert
process
.
output
==
[
'test stdout'
,
'test stderr'
],
\
'Our test strings ["test stdout", "test stderr"] should be returned in the .output property'
assert
process
.
stdout
==
[
'test stdout'
],
'Our test string "test stdout" should of been returned in the .stdout property'
assert
process
.
stderr
==
[
'test stderr'
],
'Our test string "test stderr" should of been returned in the .stderr property'
assert
process
.
stderr
==
[
'test stderr'
],
'Our test string "test stderr" should of been returned in the .stderr property'
assert
len
(
output_lines
)
==
2
,
'The output_callback should of gotten two lnes'
assert
'test stdout'
in
output_lines
,
'Our test string "test stdout" should of been sent via callback'
assert
'test stderr'
in
output_lines
,
'Our test string "test stderr" should of been sent via callback'
def
test_failed_process
(
self
):
"""Test failed process."""
process
=
ForkedSubprocess
([
'false'
])
process
.
run
()
returncode
=
process
.
wait
()
assert
process
.
output
==
[],
'Nothing should of been output'
assert
returncode
==
1
,
'The returncode we got should be 1'
def
test_piped_stdout
(
self
):