Pipeline.h

Go to the documentation of this file.
00001 /*
00002  * $Id: Pipeline.h 562 2007-12-02 08:04:16Z mindstorm2600 $
00003  *
00004  * X-VR2 
00005  * 
00006  * Copyright (C) Juan V. Guerrero 2007
00007  * 
00008  * Juan V. Guerrero <mindstorm2600@users.sourceforge.net>
00009  * 
00010  * This program is free software, distributed under the terms of
00011  * the GNU General Public License Version 2. See the LICENSE file
00012  * at the top of the source tree.
00013  */
00014 #ifndef __XVR2_PIPELINE_H__
00015 #define __XVR2_PIPELINE_H__
00016 #include<xvr2/CoreExceptions.h>
00017 #include<xvr2/SharedVar.h>
00018 #include<xvr2/PipelineFilter.h>
00019 #include<xvr2/String.h>
00020 #include<xvr2/Vector.h>
00021 #include<xvr2/Thread.h>
00022 #include<xvr2/Map.h>
00023 #include<xvr2/Queue.h>
00024 #include<xvr2/Deque.h>
00025 #include<xvr2/String.h>
00026 
00027 namespace xvr2 {
00028 
00029         template<typename _Tp = xvr2::String >
00030         class Pipeline : public xvr2::Thread {
00031                 public:
00032                         typedef PipelineFilter<_Tp> PipelineFilterT;
00033                 private:
00034                         typedef Pipeline<_Tp> MyPipelineT;
00035                         Int64 id_counter;
00036                 protected:
00037                         struct task {
00038                                 _Tp val;
00039                                 Int64 id;
00040                                 task(){
00041                                         id = 0;
00042                                         val = _Tp();
00043                                 }
00044                                 task(Int64 _id, const _Tp &v){
00045                                         id = _id;
00046                                         val = v;
00047                                 }
00048                                 task(const task &t){
00049                                         val = t.val;
00050                                         id = t.id;
00051                                 }
00052                                 task &operator=(const task &t){
00053                                         val = t.val;
00054                                         id = t.id;
00055                                         return *this;
00056                                 }
00057                         };
00058                         xvr2::Deque<task> pqueue;
00059                         SharedVar<bool> is_active;
00060                         xvr2::Vector< PipelineFilterT*  > filter;
00061                         struct lt {
00062                                 bool operator()(int a, int b){
00063                                         return (a < b)?true:false;
00064                                 }
00065                         };
00067                         xvr2::Map<Int64, bool, lt> fones;
00069                         xvr2::Map<Int64, _Tp, lt> dones;
00070                 public:
00071                         class Reader {
00072                                 private:
00073                                         MyPipelineT *pipeline;
00074                                 protected:
00075                                         Int64 id;
00076                                 public:
00077                                         Reader(){ }
00078                                         Reader(Int64 pid, MyPipelineT *_pipe){
00079                                                 id = pid;
00080                                                 pipeline = _pipe;
00081                                         }
00082                                         Reader(const Reader &t){
00083                                                 id = t.id;
00084                                                 pipeline = t.pipeline;
00085                                         }
00086                                         bool finished(){
00087                                                 bool ret = false;
00088                                                 pipeline->fones.lock();
00089                                                 if(pipeline->fones[id]){
00090                                                         ret = true;
00091                                                 }
00092                                                 pipeline->fones.unlock();
00093                                                 return ret;
00094                                         }
00095                                         _Tp read(bool wait = false){
00096                                                 if(!finished() && wait){
00097                                                         while(!finished()) usleep(100);
00098                                                 }
00099                                                 pipeline->dones.lock();
00100                                                 _Tp v = pipeline->dones[id];
00101                                                 pipeline->dones.unlock();
00102                                                 return _Tp(v);
00103                                         }
00104                         };
00106                         Pipeline() : xvr2::Thread() {
00107                                 id_counter = time(0);
00108                         }
00109                         
00110                         ~Pipeline(){
00111                                 
00112                         }
00113                         
00115                         void shutdown(){
00116                                 is_active = false;
00117                         }
00118                         
00119                         Reader write(const _Tp &val){
00120                                 pqueue.lock();
00121                                 pqueue.push_back(task(id_counter, val));
00122                                 Reader t(id_counter++, this);
00123                                 pqueue.unlock();
00124                                 return Reader(t);
00125                         }
00126                         
00128                         void addFilter(PipelineFilterT *f){
00129                                 filter.lock();
00130                                 filter.push_back(f);
00131                                 filter.unlock();
00132                         }
00133                         
00134                         void operator()(){
00135                                 while(is_active.getValue()){
00136                                         //Get a value from the input queue;
00137                                         pqueue.lock();
00138                                         if(pqueue.size() > 0){
00139                                                 task ts = pqueue.front();
00140                                                 pqueue.pop_front();
00141                                                 pqueue.unlock();
00142                                                 filter.lock();
00143                                                 _Tp tmp;
00144                                                 for(size_t i = 0; i < filter.size(); i++){
00145                                                         if(i == 0){
00146                                                                 tmp = filter[i]->operator()(ts.val);
00147                                                         }
00148                                                         else{
00149                                                                 tmp = filter[i]->operator()(tmp);
00150                                                         }
00151                                                 }
00152                                                 ts.val = tmp;
00153                                                 filter.unlock();
00154                                                 dones.lock();
00155                                                 dones[ts.id] = ts.val;
00156                                                 dones.unlock();
00157                                                 fones.lock();
00158                                                 fones[ts.id] = true;
00159                                                 fones.unlock();
00160                                         }
00161                                         else{
00162                                                 pqueue.unlock();
00163                                                 usleep(100);
00164                                         }
00165                                 }
00166                         }
00167                         
00168                         void run(){
00169                                 this->operator()();
00170                         }
00171         };
00172 }
00173 
00174 #endif

Generated on Fri Jun 20 22:55:47 2008 for X-VR2 SDK by  doxygen 1.5.5