1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.jetspeed.aggregator.impl;
19
20 import java.security.AccessControlContext;
21 import java.security.PrivilegedAction;
22
23 import javax.security.auth.Subject;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.jetspeed.aggregator.RenderingJob;
28 import org.apache.jetspeed.aggregator.Worker;
29 import org.apache.jetspeed.aggregator.WorkerMonitor;
30 import org.apache.jetspeed.security.JSSubject;
31
32 /***
33 * Worker thread processes jobs and notify its WorkerMonitor when completed.
34 * When no work is available, the worker simply sets itself in a waiting mode
35 * pending reactivation by the WorkerMonitor
36 *
37 * @author <a href="mailto:raphael@apache.org">Raphael Luta</a>
38 * @author <a>Woonsan Ko</a>
39 * @version $Id: WorkerImpl.java 587064 2007-10-22 11:54:11Z woonsan $
40 */
41 public class WorkerImpl extends Thread implements Worker
42 {
43 /*** Commons logging */
44 protected final static Log log = LogFactory.getLog(WorkerImpl.class);
45
46 /*** Running status of this worker */
47 private boolean running = true;
48
49 /*** Counter of consecutive jobs that can be processed before the
50 worker being actually put back on the idle queue */
51 private int jobCount = 0;
52
53 /*** Job to process */
54 private Runnable job = null;
55
56 /*** Context to process job within */
57 private AccessControlContext context = null;
58
59 /*** Monitor for this Worker */
60 private WorkerMonitor monitor = null;
61
62 public WorkerImpl(WorkerMonitor monitor)
63 {
64 super();
65 this.setMonitor(monitor);
66 this.setDaemon(true);
67 }
68
69 public WorkerImpl(WorkerMonitor monitor, ThreadGroup tg, String name)
70 {
71 super(tg, name);
72 this.setMonitor(monitor);
73 this.setDaemon(true);
74 }
75
76 /***
77 * Return the number of jobs processed by this worker since the last time it
78 * has been on the idle queue
79 */
80 public int getJobCount()
81 {
82 return this.jobCount;
83 }
84
85 /***
86 * Reset the processed job counter
87 */
88 public void resetJobCount()
89 {
90 this.jobCount=0;
91 }
92
93 /***
94 * Sets the running status of this Worker. If set to false, the Worker will
95 * stop after processing its current job.
96 */
97 public void setRunning(boolean status)
98 {
99 this.running = status;
100 }
101
102 /***
103 * Sets the moitor of this worker
104 */
105 public void setMonitor(WorkerMonitor monitor)
106 {
107 this.monitor = monitor;
108 }
109
110 /***
111 * Sets the job to execute in security context
112 */
113 public void setJob(Runnable job, AccessControlContext context)
114 {
115 this.job = job;
116 this.context = context;
117 }
118
119 /***
120 * Sets the job to execute
121 */
122 public void setJob(Runnable job)
123 {
124 this.job = job;
125 this.context = null;
126 }
127
128 /***
129 * Retrieves the job to execute
130 */
131 public Runnable getJob()
132 {
133 return this.job;
134 }
135
136 /***
137 * Process the job assigned, then notify Monitor. If no job available,
138 * go into sleep mode
139 */
140 public void run()
141 {
142 while (running)
143 {
144
145 synchronized (this)
146 {
147 if (this.job == null)
148 {
149 try
150 {
151 this.wait();
152 }
153 catch (InterruptedException e)
154 {
155
156 }
157 }
158 }
159
160
161 if (this.job != null)
162 {
163 log.debug("Processing job for window :" + ((RenderingJob)job).getWindow().getId());
164 Subject subject = null;
165 if (this.context != null)
166 {
167 subject = JSSubject.getSubject(this.context);
168 }
169 if (subject != null)
170 {
171 JSSubject.doAsPrivileged(subject, new PrivilegedAction()
172 {
173 public Object run()
174 {
175 try
176 {
177 WorkerImpl.this.job.run();
178 }
179 catch (Throwable t)
180 {
181 log.error("Thread error", t);
182 }
183 return null;
184 }
185 }, this.context);
186 }
187 else
188 {
189 try
190 {
191 this.job.run();
192 }
193 catch (Throwable t)
194 {
195 log.error("Thread error", t);
196 }
197 }
198 }
199
200 this.jobCount++;
201
202
203 ((WorkerMonitorImpl) monitor).release(this);
204 }
205 }
206
207 }