New array API for PVValueArray using shared_vector<T>

* In PVScalarArray

Add methods assign, getAs/putFrom, and copyOut/copyIn to allow get/put
with implicit convert.

assign() copys on PVScalarArray to another converting as necessary.
If the types do not match then an allocate and convert is done.

getAs/putFrom work with shared_vector<T> and can avoid a allocate
and convert operation if the types match.

copyOut/copyIn use plain C arrays will do either a copy if the types
match, and a convert otherwise.  No allocation is performed.

* In PVValueArray<T>

All array operations re-implemented in terms of
two virtual methods

  virtual const shared_vector<T>& viewUnsafe() const;
  virtual void swap(shared_vector<T>&);

Some convienence methods are also included:

  shared_vector<const T> view() const
  shared_vector<T> take()
  shared_vector<T> reuse()

Deprecate get(...), put(...), and shareData(...)

Remove getVector() and getSharedVector()

Adjust DefaultPVArray accordingly
This commit is contained in:
Michael Davidsaver
2013-05-02 18:09:53 -04:00
parent 5f4ca240b4
commit e843779555
2 changed files with 364 additions and 254 deletions

View File

@@ -196,51 +196,31 @@ public:
typedef const std::vector<T> const_vector;
typedef std::tr1::shared_ptr<vector> shared_vector;
typedef ::epics::pvData::shared_vector<T> svector;
typedef ::epics::pvData::shared_vector<const T> const_svector;
DefaultPVArray(ScalarArrayConstPtr const & scalarArray);
virtual ~DefaultPVArray();
virtual void setCapacity(size_t capacity);
virtual void setLength(size_t length);
virtual size_t get(size_t offset, size_t length, PVArrayData<T> &data) ;
virtual size_t put(size_t offset,size_t length, const_pointer from,
size_t fromOffset);
virtual void shareData(
std::tr1::shared_ptr<std::vector<T> > const & value,
std::size_t capacity,
std::size_t length);
virtual pointer get() ;
virtual pointer get() const ;
virtual vector const & getVector() { return *value.get(); }
virtual shared_vector const & getSharedVector(){return value;};
virtual const svector& viewUnsafe() const;
virtual void swap(svector &other);
// from Serializable
virtual void serialize(ByteBuffer *pbuffer,SerializableControl *pflusher) const;
virtual void deserialize(ByteBuffer *pbuffer,DeserializableControl *pflusher);
virtual void serialize(ByteBuffer *pbuffer,
SerializableControl *pflusher, size_t offset, size_t count) const;
private:
shared_vector value;
svector value;
};
template<typename T>
T *DefaultPVArray<T>::get()
{
std::vector<T> *vec = value.get();
T *praw = &((*vec)[0]);
return praw;
}
template<typename T>
T *DefaultPVArray<T>::get() const
{
std::vector<T> *vec = value.get();
T *praw = &((*vec)[0]);
return praw;
}
template<typename T>
DefaultPVArray<T>::DefaultPVArray(ScalarArrayConstPtr const & scalarArray)
: PVValueArray<T>(scalarArray),
value(std::tr1::shared_ptr<std::vector<T> >(new std::vector<T>()))
value()
{ }
@@ -251,101 +231,41 @@ DefaultPVArray<T>::~DefaultPVArray()
template<typename T>
void DefaultPVArray<T>::setCapacity(size_t capacity)
{
if(PVArray::getCapacity()==capacity) return;
if(!PVArray::isCapacityMutable()) {
std::string message("not capacityMutable");
PVField::message(message, errorMessage);
return;
if(capacity>value.capacity()) {
value.reserve(capacity);
PVArray::setCapacityLength(value.capacity(), value.size());
}
size_t length = PVArray::getLength();
if(length>capacity) length = capacity;
size_t oldCapacity = PVArray::getCapacity();
if(oldCapacity>capacity) {
std::vector<T> array;
array.reserve(capacity);
array.resize(length);
T * from = get();
for (size_t i=0; i<length; i++) array[i] = from[i];
value->swap(array);
} else {
value->reserve(capacity);
}
PVArray::setCapacityLength(capacity,length);
}
template<typename T>
void DefaultPVArray<T>::setLength(size_t length)
{
if(PVArray::getLength()==length) return;
size_t capacity = PVArray::getCapacity();
if(length>capacity) {
if(!PVArray::isCapacityMutable()) {
std::string message("not capacityMutable");
PVField::message(message, errorMessage);
return;
}
setCapacity(length);
}
value->resize(length);
PVArray::setCapacityLength(capacity,length);
if(length == value.size())
return;
else if(length < value.size())
value.slice(0, length);
else
value.resize(length);
PVArray::setCapacityLength(value.capacity(), value.size());
}
template<typename T>
const typename DefaultPVArray<T>::svector& DefaultPVArray<T>::viewUnsafe() const
{
return value;
}
template<typename T>
size_t DefaultPVArray<T>::get(size_t offset, size_t len, PVArrayData<T> &data)
void DefaultPVArray<T>::swap(svector &other)
{
size_t n = len;
size_t length = this->getLength();
if(offset+len > length) {
n = length-offset;
//if(n<0) n = 0;
}
data.data = *value.get();
data.offset = offset;
return n;
if(this->isImmutable())
THROW_EXCEPTION2(std::logic_error,"Immutable");
value.swap(other);
PVArray::setCapacityLength(value.capacity(), value.size());
}
template<typename T>
size_t DefaultPVArray<T>::put(size_t offset,size_t len,
const_pointer from,size_t fromOffset)
{
if(PVField::isImmutable()) {
PVField::message("field is immutable",errorMessage);
return 0;
}
T * pvalue = get();
if(from==pvalue) return len;
if(len<1) return 0;
size_t length = this->getLength();
size_t capacity = this->getCapacity();
if(offset+len > length) {
size_t newlength = offset + len;
if(newlength>capacity) {
setCapacity(newlength);
newlength = this->getCapacity();
len = newlength - offset;
if(len<=0) return 0;
}
length = newlength;
setLength(length);
}
pvalue = get();
for(size_t i=0;i<len;i++) {
pvalue[i+offset] = from[i+fromOffset];
}
this->setLength(length);
this->postPut();
return len;
}
template<typename T>
void DefaultPVArray<T>::shareData(
std::tr1::shared_ptr<std::vector<T> > const & sharedValue,
std::size_t capacity,
std::size_t length)
{
value = sharedValue;
PVArray::setCapacityLength(capacity,length);
}
template<typename T>
void DefaultPVArray<T>::serialize(ByteBuffer *pbuffer,
@@ -357,93 +277,91 @@ template<typename T>
void DefaultPVArray<T>::deserialize(ByteBuffer *pbuffer,
DeserializableControl *pcontrol) {
size_t size = SerializeHelper::readSize(pbuffer, pcontrol);
// alignment if (size>0) { pcontrol->ensureData(sizeof(T)-1); pbuffer->align(sizeof(T)); }
//if(size>=0) {
// prepare array, if necessary
if(size>this->getCapacity()) this->setCapacity(size);
// set new length
this->setLength(size);
// try to avoid deserializing from the buffer
// this is only possible if we do not need to do endian-swapping
if (!pbuffer->reverse<T>())
if (pcontrol->directDeserialize(pbuffer, (char*)(get()), size, sizeof(T)))
{
// inform about the change?
PVField::postPut();
return;
}
value.resize(size); // TODO: avoid copy of stuff we will then overwrite
// retrieve value from the buffer
size_t i = 0;
T * pvalue = get();
while(true) {
/*
size_t maxIndex = min(size-i, (int)(pbuffer->getRemaining()/sizeof(T)))+i;
for(; i<maxIndex; i++)
value[i] = pbuffer->get<T>();
*/
size_t maxCount = min(size-i, (pbuffer->getRemaining()/sizeof(T)));
pbuffer->getArray(pvalue+i, maxCount);
i += maxCount;
if(i<size)
pcontrol->ensureData(sizeof(T)); // this is not OK since can exceen max local buffer (size-i)*sizeof(T));
else
break;
}
PVArray::setCapacityLength(value.capacity(), value.size());
T* cur = value.data();
// try to avoid deserializing from the buffer
// this is only possible if we do not need to do endian-swapping
if (!pbuffer->reverse<T>())
if (pcontrol->directDeserialize(pbuffer, (char*)cur, size, sizeof(T)))
{
// inform about the change?
PVField::postPut();
//}
// TODO null arrays (size == -1) not supported
return;
}
// retrieve value from the buffer
size_t remaining = size;
while(remaining) {
const size_t have_bytes = pbuffer->getRemaining();
// correctly rounds down in an element is partially received
const size_t available = have_bytes/sizeof(T);
if(available == 0) {
size_t want = sizeof(T);
if(remaining==1 && sizeof(T)>1) {
// Need to wait for the last few bytes
// of the final element.
// available==0 implies have_bytes<sizeof(T)
want = sizeof(T) - have_bytes;
}
// recv() at least one element, or remaining buffer
pcontrol->ensureData(want);
continue;
}
const size_t n2read = std::min(remaining, available);
pbuffer->getArray(cur, n2read);
cur += n2read;
remaining -= n2read;
}
// inform about the change?
PVField::postPut();
}
template<typename T>
void DefaultPVArray<T>::serialize(ByteBuffer *pbuffer,
SerializableControl *pflusher, size_t offset, size_t count) const {
// cache
size_t length = this->getLength();
SerializableControl *pflusher, size_t offset, size_t count) const
{
//TODO: avoid incrementing the ref counter...
svector temp(value);
temp.slice(offset, count);
count = temp.size();
// check bounds
/*if(offset<0)
offset = 0;
else*/ if(offset>length) offset = length;
//if(count<0) count = length;
size_t maxCount = length-offset;
if(count>maxCount) count = maxCount;
// write
SerializeHelper::writeSize(count, pbuffer, pflusher);
//if (count == 0) return; pcontrol->ensureData(sizeof(T)-1); pbuffer->align(sizeof(T));
SerializeHelper::writeSize(temp.size(), pbuffer, pflusher);
T* cur = temp.data();
// try to avoid copying into the buffer
// this is only possible if we do not need to do endian-swapping
if (!pbuffer->reverse<T>())
if (pflusher->directSerialize(pbuffer, (const char*)(get()+offset), count, sizeof(T)))
if (pflusher->directSerialize(pbuffer, (const char*)cur, count, sizeof(T)))
return;
size_t end = offset+count;
size_t i = offset;
T * pvalue = const_cast<T *>(get());
while(true) {
/*
size_t maxIndex = min<int>(end-i, (int)(pbuffer->getRemaining()/sizeof(T)))+i;
for(; i<maxIndex; i++)
pbuffer->put<T>(value[i]);
*/
size_t maxCount = min<int>(end-i, (int)(pbuffer->getRemaining()/sizeof(T)));
pbuffer->putArray(pvalue+i, maxCount);
i += maxCount;
if(i<end)
while(count) {
const size_t empty = pbuffer->getRemaining();
const size_t space_for = empty/sizeof(T);
if(space_for==0) {
pflusher->flushSerializeBuffer();
else
break;
// Can we be certain that more space is now free???
// If not then we spinnnnnnnnn
continue;
}
const size_t n2send = std::min(count, space_for);
pbuffer->putArray(cur, n2send);
cur += n2send;
count -= n2send;
}
pflusher->flushSerializeBuffer();
}
// specializations for String
@@ -452,42 +370,36 @@ template<>
void DefaultPVArray<String>::deserialize(ByteBuffer *pbuffer,
DeserializableControl *pcontrol) {
size_t size = SerializeHelper::readSize(pbuffer, pcontrol);
//if(size>=0) {
// prepare array, if necessary
if(size>getCapacity()) setCapacity(size);
// set new length
setLength(size);
// retrieve value from the buffer
String * pvalue = get();
for(size_t i = 0; i<size; i++) {
pvalue[i] = SerializeHelper::deserializeString(pbuffer,
pcontrol);
}
// inform about the change?
postPut();
//}
// TODO null arrays (size == -1) not supported
// Decide if we must re-allocate
if(size > value.size() || !value.unique())
value.resize(size);
else if(size < value.size())
value.slice(0, size);
setCapacityLength(size, size);
String * pvalue = value.data();
for(size_t i = 0; i<size; i++) {
pvalue[i] = SerializeHelper::deserializeString(pbuffer,
pcontrol);
}
// inform about the change?
postPut();
}
template<>
void DefaultPVArray<String>::serialize(ByteBuffer *pbuffer,
SerializableControl *pflusher, size_t offset, size_t count) const {
size_t length = getLength();
// check bounds
/*if(offset<0)
offset = 0;
else*/ if(offset>length) offset = length;
//if(count<0) count = length;
svector temp(value);
temp.slice(offset, count);
size_t maxCount = length-offset;
if(count>maxCount) count = maxCount;
SerializeHelper::writeSize(temp.size(), pbuffer, pflusher);
// write
SerializeHelper::writeSize(count, pbuffer, pflusher);
size_t end = offset+count;
String * pvalue = get();
for(size_t i = offset; i<end; i++) {
String * pvalue = temp.data();
for(size_t i = 0; i<temp.size(); i++) {
SerializeHelper::serializeString(pvalue[i], pbuffer, pflusher);
}
}