Content-Length: 532303 | pFad | http://github.com/python/cpython/pull/18817/commits/e9b56b8b954004ae9eb66b34d1019855c58ad0dd

6B gh-76785: Multiple Interpreters in the Stdlib (PEP 554) by nanjekyejoannah · Pull Request #18817 · python/cpython · GitHub
Skip to content

gh-76785: Multiple Interpreters in the Stdlib (PEP 554) #18817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add send buffer functionality
  • Loading branch information
nanjekyejoannah committed Mar 4, 2020
commit e9b56b8b954004ae9eb66b34d1019855c58ad0dd
104 changes: 54 additions & 50 deletions Lib/interpreters.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@ def get_current():
class Interpreter:

def __init__(self, id):
self.id = id
self._id = id

@property
def id(self):
return self._id

def is_running(self):
"""is_running() -> bool

Return whether or not the identified interpreter is running.
"""
return _interpreters.is_running(self.id)
return _interpreters.is_running(self._id)

def destroy(self):
"""destroy()
Expand All @@ -52,7 +56,7 @@ def destroy(self):
Attempting to destroy the current
interpreter results in a RuntimeError. So does an unrecognized ID
"""
return _interpreters.destroy(self.id)
return _interpreters.destroy(self._id)

def run(self, src_str, /, *, channels=None):
"""run(src_str, /, *, channels=None)
Expand All @@ -61,7 +65,7 @@ def run(self, src_str, /, *, channels=None):
This blocks the current thread until done.
"""
try:
_interpreters.run_string(self.id, src_str)
_interpreters.run_string(self._id, src_str, channels)
except RunFailedError as err:
logger.error(err)
raise
Expand All @@ -71,7 +75,7 @@ def is_shareable(obj):
""" is_shareable(obj) -> Bool

Return `True` if the object's data can be shared between
interpreters.
interpreters and `False` otherwise.
"""
return _interpreters.is_shareable(obj)

Expand All @@ -89,25 +93,21 @@ def list_all_channels():

Return all open channels.
"""
cid = _interpreters.channel_list_all()
return (RecvChannel(cid), SendChannel(cid))
return [(RecvChannel(cid), SendChannel(cid)) for cid in _interpreters.channel_list_all()]

def wait(self, timeout):
def wait(timeout):
#The implementation for wait
# will be non trivial to be useful
import time
time.sleep(timeout)

def associate_interp_to_channel(id, cid):
pass

class RecvChannel:

def __init__(self, id):
self.id = id
self.interpreters = _interpreters.channel_list_interpreters(cid, send=False)
self.interpreters = _interpreters.channel_list_interpreters(self.id, send=False)

def recv(self, timeout=2):
def recv(self):
""" channel_recv() -> obj

Get the next object from the channel,
Expand All @@ -117,94 +117,98 @@ def recv(self, timeout=2):
obj = _interpreters.channel_recv(self.id)
if obj == None:
wait(timeout)
obj = obj = _interpreters.channel_recv(self.id)

# Pending: See issue 52 on multi-core python project
associate_interp_to_channel(interpId, Cid)

obj = _interpreters.channel_recv(self.id)
return obj

def recv_nowait(self, default=None):
"""recv_nowait(default=None) -> object

Like recv(), but return the default instead of waiting.
"""
return _interpreters.channel_recv(self.id)

def send_buffer(self, obj):
""" send_buffer(obj)

Send the object's buffer to the receiving end of the channel
and wait. Associate the interpreter with the channel.
"""
pass

def send_buffer_nowait(self, obj):
""" send_buffer_nowait(obj)

Like send_buffer(), but return False if not received.
"""
pass
obj = _interpreters.channel_recv(self.id)
if obj == None:
obj = default
return obj

def release(self):
""" release()

No longer associate the current interpreterwith the channel
(on the sending end).
(on the receiving end).
"""
return _interpreters(self.id)
return _interpreters.channel_release(self.id, recv=True)

def close(self, force=False):
"""close(force=False)

Close the channel in all interpreters..
"""
return _interpreters.channel_close(self.id, force)
return _interpreters.channel_close(self.id, recv=force)


class SendChannel:

def __init__(self, id):
self.id = id
self.interpreters = _interpreters.list_all()
self.interpreters = _interpreters.channel_list_interpreters(self.id, send=True)

def send(self, obj):
""" send(obj)

Send the object (i.e. its data) to the receiving end of the channel
and wait. Associate the interpreter with the channel.
"""
obj = _interpreters.channel_send(self.id, obj)
_interpreters.channel_send(self.id, obj)
wait(2)
associate_interp_to_channel(interpId, Cid)

def send_nowait(self, obj):
""" send_nowait(obj)

Like send(), but return False if not received.
"""
try:
obj = _interpreters.channel_send(self.id, obj)
except:
_interpreters.channel_send(self.id, obj)
recv_obj = _interpreters.channel_recv(self.id)
if recv_obj:
return obj
else:
return False

return obj
def send_buffer(self, obj):
""" ssend_buffer(obj)

Send the object's buffer to the receiving
end of the channel and wait. Associate the interpreter
with the channel.
"""
_interpreters.channel_send_buffer(self.id, obj)
wait(2)

def send_buffer_nowait(self, obj):
""" send_buffer_nowait(obj)

Like send(), but return False if not received.
"""
_interpreters.channel_send_buffer(self.id, obj)
recv_obj = _interpreters.channel_recv(self.id)
if recv_obj:
return obj
else:
return False

def release(self):
""" release()

No longer associate the current interpreter with the channel
No longer associate the current interpreterwith the channel
(on the sending end).
"""
return _interpreters.channel_release(self.id)
return _interpreters.channel_release(self.id, send=True)

def close(self, force=False):
""" close(force=False)
"""close(force=False)

No longer associate the current interpreterwith the channel
(on the sending end).
Close the channel in all interpreters..
"""
return _interpreters.channel_close(self.id, force)
return _interpreters.channel_close(self.id, send=force)


class ChannelError(Exception):
Expand Down
93 changes: 93 additions & 0 deletions Modules/_xxsubinterpretersmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,71 @@ _channel_destroy(_channels *channels, int64_t id)
return 0;
}

static int
_channel_send_buffer(_channels *channels, int64_t id, PyObject *obj)
{
const char *s = NULL;
Py_buffer view = {NULL, NULL};
if (PyObject_GetBuffer(obj, &view, PyBUF_SIMPLE) != 0){
return -1;
}

s = view.buf;
if (s == NULL) {
PyBuffer_Release(&view);
return -1;
}

PyInterpreterState *interp = _get_current();
if (interp == NULL) {
PyBuffer_Release(&view);
return -1;
}

// Look up the channel.
PyThread_type_lock mutex = NULL;
_PyChannelState *chan = _channels_lookup(channels, id, &mutex);
if (chan == NULL) {
PyBuffer_Release(&view);
return -1;
}
// Past this point we are responsible for releasing the mutex.

if (chan->closing != NULL) {
PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
PyThread_release_lock(mutex);
PyBuffer_Release(&view);
return -1;
}

// Convert the buffer to cross-interpreter data.
_PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
if (data == NULL) {
PyThread_release_lock(mutex);
PyBuffer_Release(&view);
return -1;
}
if (_PyObject_GetCrossInterpreterData((PyObject *)s, data) != 0) {
PyThread_release_lock(mutex);
PyMem_Free(data);
PyBuffer_Release(&view);
return -1;
}

// Add the data to the channel.
int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
PyThread_release_lock(mutex);
if (res != 0) {
_PyCrossInterpreterData_Release(data);
PyMem_Free(data);
PyBuffer_Release(&view);
return -1;
}

PyBuffer_Release(&view);
return 0;
}

static int
_channel_send(_channels *channels, int64_t id, PyObject *obj)
{
Expand Down Expand Up @@ -2462,6 +2527,32 @@ PyDoc_STRVAR(channel_send_doc,
\n\
Add the object's data to the channel's queue.");

static PyObject *
channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "obj", NULL};
PyObject *id;
PyObject *obj;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"OO:channel_send_buffer", kwlist, &id, &obj)) {
return NULL;
}
int64_t cid = _Py_CoerceID(id);
if (cid < 0) {
return NULL;
}

if (_channel_send_buffer(&_globals.channels, cid, obj) != 0) {
return NULL;
}
Py_RETURN_NONE;
}

PyDoc_STRVAR(channel_send_buffer_doc,
"channel_send_buffer(cid, obj)\n\
\n\
Add the object's buffer to the channel's queue.");

static PyObject *
channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
{
Expand Down Expand Up @@ -2609,6 +2700,8 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
{"channel_send", (PyCFunction)(void(*)(void))channel_send,
METH_VARARGS | METH_KEYWORDS, channel_send_doc},
{"channel_send_buffer", (PyCFunction)(void(*)(void))channel_send_buffer,
METH_VARARGS | METH_KEYWORDS, channel_send_buffer_doc},
{"channel_recv", (PyCFunction)(void(*)(void))channel_recv,
METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
{"channel_close", (PyCFunction)(void(*)(void))channel_close,
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/python/cpython/pull/18817/commits/e9b56b8b954004ae9eb66b34d1019855c58ad0dd

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy