00001 /* 00002 * $Id: Pipeline.h 562 2007-12-02 08:04:16Z mindstorm2600 $ 00003 * 00004 * X-VR2 00005 * 00006 * Copyright (C) Juan V. Guerrero 2007 00007 * 00008 * Juan V. Guerrero <mindstorm2600@users.sourceforge.net> 00009 * 00010 * This program is free software, distributed under the terms of 00011 * the GNU General Public License Version 2. See the LICENSE file 00012 * at the top of the source tree. 00013 */ 00014 #ifndef __XVR2_PIPELINE_H__ 00015 #define __XVR2_PIPELINE_H__ 00016 #include<xvr2/CoreExceptions.h> 00017 #include<xvr2/SharedVar.h> 00018 #include<xvr2/PipelineFilter.h> 00019 #include<xvr2/String.h> 00020 #include<xvr2/Vector.h> 00021 #include<xvr2/Thread.h> 00022 #include<xvr2/Map.h> 00023 #include<xvr2/Queue.h> 00024 #include<xvr2/Deque.h> 00025 #include<xvr2/String.h> 00026 00027 namespace xvr2 { 00028 00029 template<typename _Tp = xvr2::String > 00030 class Pipeline : public xvr2::Thread { 00031 public: 00032 typedef PipelineFilter<_Tp> PipelineFilterT; 00033 private: 00034 typedef Pipeline<_Tp> MyPipelineT; 00035 Int64 id_counter; 00036 protected: 00037 struct task { 00038 _Tp val; 00039 Int64 id; 00040 task(){ 00041 id = 0; 00042 val = _Tp(); 00043 } 00044 task(Int64 _id, const _Tp &v){ 00045 id = _id; 00046 val = v; 00047 } 00048 task(const task &t){ 00049 val = t.val; 00050 id = t.id; 00051 } 00052 task &operator=(const task &t){ 00053 val = t.val; 00054 id = t.id; 00055 return *this; 00056 } 00057 }; 00058 xvr2::Deque<task> pqueue; 00059 SharedVar<bool> is_active; 00060 xvr2::Vector< PipelineFilterT* > filter; 00061 struct lt { 00062 bool operator()(int a, int b){ 00063 return (a < b)?true:false; 00064 } 00065 }; 00067 xvr2::Map<Int64, bool, lt> fones; 00069 xvr2::Map<Int64, _Tp, lt> dones; 00070 public: 00071 class Reader { 00072 private: 00073 MyPipelineT *pipeline; 00074 protected: 00075 Int64 id; 00076 public: 00077 Reader(){ } 00078 Reader(Int64 pid, MyPipelineT *_pipe){ 00079 id = pid; 00080 pipeline = _pipe; 00081 } 00082 Reader(const Reader &t){ 00083 id = t.id; 00084 pipeline = t.pipeline; 00085 } 00086 bool finished(){ 00087 bool ret = false; 00088 pipeline->fones.lock(); 00089 if(pipeline->fones[id]){ 00090 ret = true; 00091 } 00092 pipeline->fones.unlock(); 00093 return ret; 00094 } 00095 _Tp read(bool wait = false){ 00096 if(!finished() && wait){ 00097 while(!finished()) usleep(100); 00098 } 00099 pipeline->dones.lock(); 00100 _Tp v = pipeline->dones[id]; 00101 pipeline->dones.unlock(); 00102 return _Tp(v); 00103 } 00104 }; 00106 Pipeline() : xvr2::Thread() { 00107 id_counter = time(0); 00108 } 00109 00110 ~Pipeline(){ 00111 00112 } 00113 00115 void shutdown(){ 00116 is_active = false; 00117 } 00118 00119 Reader write(const _Tp &val){ 00120 pqueue.lock(); 00121 pqueue.push_back(task(id_counter, val)); 00122 Reader t(id_counter++, this); 00123 pqueue.unlock(); 00124 return Reader(t); 00125 } 00126 00128 void addFilter(PipelineFilterT *f){ 00129 filter.lock(); 00130 filter.push_back(f); 00131 filter.unlock(); 00132 } 00133 00134 void operator()(){ 00135 while(is_active.getValue()){ 00136 //Get a value from the input queue; 00137 pqueue.lock(); 00138 if(pqueue.size() > 0){ 00139 task ts = pqueue.front(); 00140 pqueue.pop_front(); 00141 pqueue.unlock(); 00142 filter.lock(); 00143 _Tp tmp; 00144 for(size_t i = 0; i < filter.size(); i++){ 00145 if(i == 0){ 00146 tmp = filter[i]->operator()(ts.val); 00147 } 00148 else{ 00149 tmp = filter[i]->operator()(tmp); 00150 } 00151 } 00152 ts.val = tmp; 00153 filter.unlock(); 00154 dones.lock(); 00155 dones[ts.id] = ts.val; 00156 dones.unlock(); 00157 fones.lock(); 00158 fones[ts.id] = true; 00159 fones.unlock(); 00160 } 00161 else{ 00162 pqueue.unlock(); 00163 usleep(100); 00164 } 00165 } 00166 } 00167 00168 void run(){ 00169 this->operator()(); 00170 } 00171 }; 00172 } 00173 00174 #endif