1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.jetspeed.statistics.impl;
18
19 import java.sql.Connection;
20 import java.sql.PreparedStatement;
21 import java.sql.SQLException;
22 import java.util.Iterator;
23 import java.util.LinkedList;
24 import java.util.List;
25
26 import javax.sql.DataSource;
27
28 /***
29 * <p>
30 * BatchedStatistics
31 * </p>
32 *
33 * @author <a href="mailto:chris@bluesunrise.com">Chris Schaefer </a>
34 * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
35 * @version $Id: TestPortletEntityDAO.java,v 1.3 2005/05/24 14:43:19 ate Exp $
36 */
37 public abstract class BatchedStatistics implements Runnable
38 {
39
40 public BatchedStatistics(DataSource ds, int batchSize,
41 long msElapsedTimeThreshold, String name)
42 {
43 this.ds = ds;
44 this.msElapsedTimeThreshold = msElapsedTimeThreshold;
45 this.batchSize = batchSize;
46 this.name = name;
47 if (this.name == null)
48 {
49 this.name = this.getClass().getName();
50 }
51 msLastFlushTime = System.currentTimeMillis();
52 thread = new Thread(this, name);
53
54 }
55
56 public void startThread() {
57
58 thread.start();
59
60
61
62 while (this.done)
63 {
64 try
65 {
66 Thread.sleep(1);
67 } catch (InterruptedException e)
68 {
69 }
70 }
71 }
72
73 protected Connection getConnection() throws SQLException
74 {
75 return ds.getConnection();
76 }
77
78 /***
79 * should only be called from code synchronized to the linked list
80 */
81 private void checkAndDoFlush()
82 {
83 long msCurrentTime = System.currentTimeMillis();
84 if ((logRecords.size() >= batchSize)
85 || (msCurrentTime - msLastFlushTime > msElapsedTimeThreshold))
86 {
87 flush();
88 msLastFlushTime = msCurrentTime;
89 }
90 }
91
92 public void addStatistic(LogRecord logRecord)
93 {
94 synchronized (logRecords)
95 {
96 logRecords.add(logRecord);
97 checkAndDoFlush();
98 }
99 }
100
101 public boolean isDone()
102 {
103 return done;
104 }
105
106 public void tellThreadToStop()
107 {
108 keepRunning = false;
109
110 }
111
112 private boolean done = true;
113
114 private boolean keepRunning = true;
115
116 public void run()
117 {
118 done = false;
119 while (keepRunning)
120 {
121 try
122 {
123 synchronized (this.thread)
124 {
125 this.thread.wait(msElapsedTimeThreshold / 4);
126 }
127 } catch (InterruptedException ie)
128 {
129 keepRunning = false;
130 }
131 synchronized (logRecords)
132 {
133 checkAndDoFlush();
134 }
135 }
136
137
138 synchronized (logRecords)
139 {
140 flush();
141 }
142 done = true;
143 }
144
145
146
147
148
149
150
151 public void flush()
152 {
153 if (logRecords.isEmpty()) return;
154
155 Connection con = null;
156 PreparedStatement stm = null;
157
158 try
159 {
160 con = getConnection();
161 boolean autoCommit = con.getAutoCommit();
162 con.setAutoCommit(false);
163
164 stm = getPreparedStatement(con);
165 Iterator recordIterator = logRecords.iterator();
166 while (recordIterator.hasNext())
167 {
168 LogRecord record = (LogRecord) recordIterator.next();
169
170 loadOneRecordToStatement(stm, record);
171
172 stm.addBatch();
173 }
174 stm.executeBatch();
175 con.commit();
176
177 logRecords.clear();
178 con.setAutoCommit(autoCommit);
179 }
180 catch (SQLException e)
181 {
182
183 e.printStackTrace();
184 try
185 {
186 con.rollback();
187 }
188 catch (Exception e2) {}
189 }
190 finally
191 {
192 try
193 {
194 if (stm != null) stm.close();
195 } catch (SQLException se)
196 {
197 }
198 releaseConnection(con);
199 }
200 }
201
202 abstract protected PreparedStatement getPreparedStatement(Connection con)
203 throws SQLException;
204
205 abstract protected void loadOneRecordToStatement(PreparedStatement stm,
206 LogRecord rec) throws SQLException;
207
208 void releaseConnection(Connection con)
209 {
210 try
211 {
212 if (con != null) con.close();
213 } catch (SQLException e)
214 {
215 }
216 }
217
218 protected Thread thread;
219
220 protected long msLastFlushTime = 0;
221
222 protected int batchSize = 10;
223
224 protected long msElapsedTimeThreshold = 5000;
225
226 protected List logRecords = new LinkedList();
227
228 protected DataSource ds = null;
229
230 protected String name;
231
232 public abstract boolean canDoRecordType(LogRecord rec);
233
234 }