test field access and dset

This commit is contained in:
Michael Davidsaver
2018-11-04 12:33:09 -08:00
parent a3524c5aa0
commit 11a93fce21
5 changed files with 154 additions and 34 deletions

View File

@ -192,10 +192,6 @@ static PyObject* pyField_getval(pyField *self)
return PyErr_Format(PyExc_ValueError, "Error fetching array info for %s.%s",
self->addr.precord->name,
self->addr.pfldDes->name);
else if(noe<1) {
PyErr_SetString(PyExc_IndexError, "zero length array");
return NULL;
}
rawfield = self->addr.pfield;
/* get_array_info can modify pfield in >3.15.0.1 */
@ -388,8 +384,8 @@ static PyObject *pyField_setlen(pyField *self, PyObject *args)
return NULL;
}
if(len<1 || len > self->addr.no_elements) {
PyErr_Format(PyExc_ValueError, "Requested length %ld out of range [1,%lu)",
if(len > self->addr.no_elements) {
PyErr_Format(PyExc_ValueError, "Requested length %ld out of range [0,%lu)",
(long)len, (unsigned long)self->addr.no_elements);
return NULL;
}

View File

@ -57,7 +57,5 @@ device(aao, INST_IO, pydevsupComOut, "Python Device")
_dbapi.dbReadDatabase(F.name)
_dbapi._dbd_setup()
@atexit.register
def _fini():
print("ATEXIT")
def _fini(iocMain=False):
_dbapi._dbd_cleanup()

View File

@ -338,5 +338,8 @@ def processLink(name, lstr):
modname, args = parts[0], parts[1] if len(parts)>1 else None
else:
args = lstr
modname, _sep, attr = modname.partition('|')
mod = importmod(modname)
if attr:
mod = getattr(mod, attr)
return rec, mod.build(rec, args)

View File

@ -3,6 +3,9 @@ import os
import unittest
import tempfile
import numpy
from numpy.testing import assert_array_almost_equal, assert_array_equal
from ..db import getRecord
from .. import _dbapi
from .. import _init
@ -44,25 +47,6 @@ class IOCHelper(unittest.TestCase):
_dbapi._UTest.testIocShutdownOk()
self.running = False
class TestIOC(IOCHelper):
def test_base(self):
pass
def test_start(self):
self.iocInit()
self.iocShutdown()
def test_db(self):
with tempfile.NamedTemporaryFile() as F:
F.write('record(longin, "test") {}\n')
F.flush()
_dbapi.dbReadDatabase(F.name)
rec = getRecord("test")
self.assertEqual(rec.VAL, 0)
rec.VAL = 5
self.assertEqual(rec.VAL, 5)
class TestScan(IOCHelper):
db = """
record(longout, src) {
@ -75,8 +59,140 @@ class TestScan(IOCHelper):
def test_link(self):
src, tgt = getRecord('src'), getRecord('tgt')
src.VAL = 42
self.assertEqual(src.VAL, 42)
self.assertEqual(tgt.VAL, 0)
src.scan(sync=True)
self.assertEqual(tgt.VAL, 42)
with src:
src.VAL = 42
self.assertEqual(src.VAL, 42)
with tgt:
self.assertEqual(tgt.VAL, 0)
src.scan(sync=True) # lock and dbProcess() on this thread
with tgt:
self.assertEqual(tgt.VAL, 42)
class TestField(IOCHelper):
db = """
record(ai, "rec:ai") {
field(VAL , "4.2")
field(RVAL, "42")
}
record(stringin, "rec:si") {
field(VAL, "")
}
record(waveform, "rec:wf:a") {
field(FTVL, "DOUBLE")
field(NELM, "10")
}
record(waveform, "rec:wf:s") {
field(FTVL, "STRING")
field(NELM, "10")
}
"""
autostart = True
def test_ai(self):
rec = getRecord("rec:ai")
with rec:
self.assertEqual(rec.VAL, 4.2)
self.assertEqual(rec.RVAL, 42)
rec.VAL = 5.2
rec.RVAL = 52
self.assertEqual(rec.VAL, 5.2)
self.assertEqual(rec.RVAL, 52)
rec.VAL += 1.0
self.assertEqual(rec.VAL, 6.2)
def test_si(self):
rec = getRecord("rec:si")
with rec:
self.assertEqual(rec.VAL, "")
rec.VAL = "test"
self.assertEqual(rec.VAL, "test")
rec.VAL = ""
self.assertEqual(rec.VAL, "")
# implicitly truncates
rec.VAL = "This is a really long string which should be truncated"
self.assertEqual(rec.VAL, "This is a really long string which shou")
# TODO: test unicode
def test_wf_float(self):
rec = getRecord("rec:wf:a")
with rec:
assert_array_almost_equal(rec.VAL, [])
rec.VAL = numpy.arange(5)
assert_array_almost_equal(rec.VAL, numpy.arange(5))
rec.VAL = numpy.arange(10)
assert_array_almost_equal(rec.VAL, numpy.arange(10))
with self.assertRaises(ValueError):
rec.VAL = numpy.arange(15)
rec.VAL = []
assert_array_almost_equal(rec.VAL, [])
# in-place modification
fld = rec.field('VAL')
fld.putarraylen(5)
arr = fld.getarray()
self.assertEqual(arr.shape, (10,)) # size of NELM
arr[:5] = numpy.arange(5) # we only fill in the part in use
arr[2] = 42
assert_array_almost_equal(rec.VAL, [0, 1, 42, 3, 4])
def test_wf_string(self):
rec = getRecord("rec:wf:s")
with rec:
assert_array_equal(rec.VAL, numpy.asarray([], dtype='S40'))
rec.VAL = ["zero", "", "one", "This is a really long string which should be truncated", "", "last"]
assert_array_equal(rec.VAL, ["zero", "", "one", "This is a really long string which shoul", "", "last"])
class TestDset(IOCHelper):
db = """
record(longin, "rec:li") {
field(DTYP, "Python Device")
field(INP , "@devsup.test.test_db|TestDset foo bar")
}
"""
autostart = True
class Increment(object):
def process(self, rec, reason):
rec.VAL += 1
def detach(self, rec):
pass
@classmethod
def build(klass, rec, args):
if rec.name()=='rec:li':
return klass.Increment()
else:
raise RuntimeError("Unsupported")
def test_increment(self):
rec = getRecord('rec:li')
with rec:
self.assertEqual(rec.VAL, 0)
self.assertEqual(rec.UDF, 1)
rec.scan(sync=True)
with rec:
self.assertEqual(rec.VAL, 1)
self.assertEqual(rec.UDF, 0)

View File

@ -33,6 +33,13 @@ static void cleanupPy(void *junk)
/* special "fake" hook for shutdown */
//pyhook((initHookState)9999);
if(PyRun_SimpleString("import devsup\n"
"devsup._fini(iocMain=True)\n"
)) {
PyErr_Print();
PyErr_Clear();
}
Py_Finalize(); // calls python atexit hooks
}
@ -118,7 +125,7 @@ static void pySetupReg(void)
setupPyPath();
if(PyRun_SimpleString("import devsup\n"
"devsup._init(True)\n"
"devsup._init(iocMain=True)\n"
)) {
PyErr_Print();
PyErr_Clear();