001 /*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016 package org.opengion.fukurou.process;
017
018 import org.opengion.fukurou.util.Argument;
019 import org.opengion.fukurou.util.SystemParameter;
020 import org.opengion.fukurou.util.LogWriter;
021
022 import org.opengion.fukurou.util.HybsEntry ;
023 import org.opengion.fukurou.util.Closer;
024 import org.opengion.fukurou.db.ConnectionFactory;
025
026 import java.util.Set ;
027 import java.util.HashSet ;
028 import java.util.Map ;
029 import java.util.LinkedHashMap ;
030
031 import java.sql.Connection;
032 import java.sql.Statement;
033 import java.sql.ResultSet;
034 import java.sql.SQLException;
035
036 /**
037 * Process_BulkQueryは、データベ?スから読み取った?容を??処?るために?
038 * ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす?
039 * FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです?
040 *
041 * こ?クラスは、上流から?下流への処???度しか実行されません?
042 * FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します?
043 * ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します?
044 *
045 * FirstProcess では?action は、query のみです?
046 * query は、指定?SQL?実行し、結果のSetをParamProcessに設定します?
047 * ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます?
048 * query は、上記と同じです?
049 * minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します?
050 * intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します?
051 * bulkSet は、?のSetを取り?し?SQL??して処?ます?
052 * 流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?bulkSet で
053 * 利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?SQLServerのユニ?クキー?
054 * minusした結果を?ORACLEからDELETEすれば、不要な??タを削除するなどの処?実行可能になります?
055 * また?単純に、query ?を?チェインすれば、単発のUPDATE?実行することが可能です?
056 *
057 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に
058 * 設定された接?Connection)を使用します?
059 * DBID は、Process_DBParam の -configFile で?す?DBConfig.xml ファイルを使用します?
060 *
061 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ??
062 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に
063 * 繋げてください?
064 *
065 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?
066 *
067 * @og.formSample
068 * Process_BulkQuery -action=query -dbid=DBGE -sql="select KEY from TABLE_X"
069 *
070 * -action=処????) ??実行する??法を?しま?
071 * -action=query 単なるSQL?実行します?
072 * -action=bulkSet 実行したSQL??結果を?Set<String> オブジェクトに設定します?
073 * -action=minus Set<String> オブジェクトと、ここでの実行結果の差?とります?
074 * -action=intersect Set<String> オブジェクトと、ここでの実行結果の積?をとります?
075 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規?
076 * [ -sql=検索SQL? ] ??-sql="select * from GEA08"
077 * [ -sqlFile=検索SQLファイル ] ??-sqlFile=select.sql
078 * -sql= を指定しな??合?、ファイルで??してください?
079 * [ -sql_XXXX=固定? ] ??-sql_SYSTEM_ID=GE
080 * SQL?の{@XXXX}??を指定?固定?で置き換えます?
081 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'
082 * [ -bulkKey=XXXX ] ??-bulkKey=XXXX
083 * SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます?
084 * WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' )
085 * [ -bulkType=NUM|STR ] ??-bulType=STR
086 * Bulkの値を文字?に変換する場合に、数字型か??型を指定します?
087 * 数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換しま?初期値:STR)?
088 * [ -fetchSize=100 ] ?フェ?する行数(初期値:100)
089 * [ -display=[false/true] ] ?結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない])
090 * [ -debug=[false/true] ] ?デバッグ??を標準?力に表示する(true)かしな?false)?初期値:false[表示しない])
091 *
092 * @og.rev 5.3.4.0 (2011/04/01) 新規追?
093 * @version 4.0
094 * @author Kazuhiko Hasegawa
095 * @since JDK5.0,
096 */
097 public class Process_BulkQuery extends AbstractProcess implements FirstProcess , ChainProcess {
098 private static final int MAX_BULK_SET = 500 ; // ORACLE の制? 1000 なので?
099
100 private static final String ACT_QUERY = "query" ;
101 private static final String ACT_BULKSET = "bulkSet" ;
102 private static final String ACT_MINUS = "minus" ;
103 private static final String ACT_INTERSECT = "intersect" ;
104
105 private static final String[] ACTION_LST = new String[] { ACT_QUERY,ACT_BULKSET,ACT_MINUS,ACT_INTERSECT };
106
107 // private LineModel newData = null;
108
109 private String actionCmd = null; // SQL結果を加工(query:実行?minus:引き算?intersect:重??)
110 private String dbid = null; // メインDB接続ID
111
112 private String bulkKey = null;
113 private boolean bulkType = true; // true:STR , false:NUM
114
115 private int sqlCount = 0; // SQL??処?数
116 private int setCount = 0; // 取り出したSetの件数
117 private int outCount = 0; // マ?ジ後?Setの件数
118
119 private int fetchSize = 100;
120 private boolean display = false; // 表示しな?
121 private boolean debug = false; // ????
122 private boolean firstTime = true; // ??の?目
123
124 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map
125 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map
126
127 static {
128 mustProparty = new LinkedHashMap<String,String>();
129 mustProparty.put( "action", "実行する??法を?します?(query|minus|intersect)" );
130
131 usableProparty = new LinkedHashMap<String,String>();
132 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? );
133 usableProparty.put( "sql", "検索SQL?sql or sqlFile ??)? \"select * from GEA08\"" );
134 usableProparty.put( "sqlFile", "検索SQLファイル(sql or sqlFile ??)? select.sql" );
135 usableProparty.put( "sql_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" +
136 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
137 usableProparty.put( "dbid2", "DB接続ID2 ? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? );
138 usableProparty.put( "sql2", "検索SQL?(sql or sqlFile ??)? \"select * from GEA08\"" );
139 usableProparty.put( "sqlFile2", "検索SQLファイル2(sql or sqlFile ??)? select.sql" );
140 usableProparty.put( "sql2_", "SQL?中の{@XXXX}??を指定?固定?で置き換えます?" +
141 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
142 usableProparty.put( "bulkKey", "SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます?" +
143 CR + "WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' )" );
144 usableProparty.put( "bulkType", "Bulkの値を文字?に変換する場合に、文字型か?数字型を指定します?" +
145 CR + "数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換します?(初期値:STR)" );
146 usableProparty.put( "fetchSize","フェ?する行数 (初期値:100)" );
147 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? +
148 CR + "(初期値:false:表示しな?" );
149 usableProparty.put( "debug", "????を標準?力に表示する(true)かしな?false)? +
150 CR + "(初期値:false:表示しな?" );
151 }
152
153 /**
154 * ?ォルトコンストラクター?
155 * こ?クラスは、動??されます??ォルトコンストラクターで?
156 * super クラスに対して、?な初期化を行っておきます?
157 *
158 */
159 public Process_BulkQuery() {
160 super( "org.opengion.fukurou.process.Process_BulkQuery",mustProparty,usableProparty );
161 }
162
163 /**
164 * プロセスの初期化を行います?初めに??、呼び出されます?
165 * 初期処?ファイルオープン??オープン?に使用します?
166 *
167 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
168 *
169 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク?
170 */
171 public void init( final ParamProcess paramProcess ) {
172 Argument arg = getArgument();
173
174 actionCmd = arg.getProparty("action" , null , ACTION_LST );
175
176 fetchSize = arg.getProparty("fetchSize",fetchSize);
177 display = arg.getProparty("display",display);
178 debug = arg.getProparty("debug",debug);
179 // if( debug ) { println( arg.toString() ); } // 5.7.3.0 (2014/02/07) ????
180
181 dbid = arg.getProparty("dbid");
182 String sql = arg.getFileProparty("sql","sqlFile",true);
183 if( debug ) { println( "入力SQL:" + sql ); }
184
185 HybsEntry[] entry =arg.getEntrys( "sql_" ); //配?
186 SystemParameter sysParam = new SystemParameter( sql );
187 sql = sysParam.replace( entry );
188 if( debug ) { println( "変換SQL:" + sql ); }
189
190 if( ACT_BULKSET.equalsIgnoreCase( actionCmd ) ) {
191 bulkKey = arg.getProparty("bulkKey");
192 String bkType = arg.getProparty("bulkType");
193 if( bkType != null ) { bulkType = bkType.equalsIgnoreCase( "STR" ); }
194
195 Set<String> setData = paramProcess.getBulkData();
196 if( debug ) { println( setData.toString() ); }
197 setCount = setData.size();
198
199 if( setCount > 0 ) {
200 // 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
201 // sql = makeBulkQuery( sql,bulkKey,bulkType,setData );
202 // if( debug ) { println( "BulkSQL:" + sql ); }
203 // createSetData( paramProcess, dbid, sql );
204 String[] sqls = makeBulkQuery( sql,bulkKey,bulkType,setData );
205 for( int i=0; i<sqls.length; i++ ) {
206 if( debug ) { println( "BulkSQL:" + sqls[i] ); }
207 createSetData( paramProcess, dbid, sqls[i] );
208 }
209 }
210 }
211 else if( ACT_QUERY.equalsIgnoreCase( actionCmd ) ) {
212 Set<String> setData2 = createSetData( paramProcess, dbid, sql );
213 if( debug ) { println( setData2.toString() ); }
214 setCount = setData2.size();
215 outCount = setCount;
216 paramProcess.setBulkData( setData2 );
217 }
218 else {
219 Set<String> setData = paramProcess.getBulkData();
220 Set<String> setData2 = createSetData( paramProcess, dbid, sql );
221 setCount = setData2.size();
222
223 if( ACT_MINUS.equalsIgnoreCase( actionCmd ) ) {
224 setData.removeAll( setData2 );
225 }
226 else if( ACT_INTERSECT.equalsIgnoreCase( actionCmd ) ) {
227 setData.retainAll( setData2 );
228 }
229 outCount = setData.size();
230 if( debug ) { println( setData.toString() ); }
231 paramProcess.setBulkData( setData );
232 }
233 }
234
235 /**
236 * プロセスの終?行います??に??、呼び出されます?
237 * 終???ファイルクローズ??クローズ?に使用します?
238 *
239 * @param isOK ト?タルで、OK?たかど? [true:成功/false:失敗]
240 */
241 public void end( final boolean isOK ) {
242 // 何もありません?
243 }
244
245 /**
246 * こ???タの処?おいて、次の処?出来るかど?を問?わせます?
247 * こ?呼び出し1回毎に、次の??タを取得する準備を行います?
248 *
249 * @return 処?きる:true / 処?きな?false
250 */
251 public boolean next() {
252 return firstTime;
253 }
254
255 /**
256 * 引数の LineModel を??るメソ?です?
257 * 変換処?? LineModel を返します?
258 * 後続??行わな?????タのフィルタリングを行う場?は?
259 * null ??タを返します?つまり?null ??タは、後続??行わな?
260 * フラグの代わりにも使用して?す?
261 * なお?変換処?? LineModel と、オリジナルの LineModel が?
262 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す?
263 * ドキュメントに明記されて???合?、副作用が問題になる?合??
264 * ???とに自?コピ?(クローン)して下さ??
265 *
266 * @param data オリジナルのLineModel
267 *
268 * @return 処?換後?LineModel
269 */
270 @SuppressWarnings(value={"unchecked"})
271 public LineModel action( final LineModel data ) {
272 return data ;
273 }
274
275 /**
276 * ??に?行データである LineModel を作?しま?
277 * FirstProcess は、次?処?チェインして???の行データ?
278 * 作?して、後続? ChainProcess クラスに処?ータを渡します?
279 *
280 * @param rowNo 処?の行番号
281 *
282 * @return 処?換後?LineModel
283 */
284 public LineModel makeLineModel( final int rowNo ) {
285 firstTime = false; // ?しか処?な?め?false を設定する?
286
287 LineModel model = new LineModel();
288
289 model.setRowNo( rowNo );
290
291 return model;
292 }
293
294 /**
295 * ?で使用する Set オブジェクトを作?します?
296 * Exception 以外では、? Set<String> オブジェクトを返します?
297 *
298 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
299 *
300 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク?
301 * @param dbid 接続?ID
302 * @param sql 実行するSQL?検索系)
303 *
304 * @return 実行結果から取り出した、最初?カラ??みを集めた Setオブジェク?
305 * @throws RuntimeException ??タベ?ス処?できなかった?合?
306 */
307 private Set<String> createSetData( final ParamProcess paramProcess, final String dbid, final String sql ) {
308 Set<String> data = new HashSet<String>();
309
310 Connection connection = null;
311 Statement stmt = null;
312 ResultSet resultSet = null;
313
314 try {
315 connection = paramProcess.getConnection( dbid );
316 stmt = connection.createStatement();
317 if( fetchSize > 0 ) { stmt.setFetchSize( fetchSize ); }
318 if( stmt.execute( sql ) ) { // true:検索系 , false:更新系
319 resultSet = stmt.getResultSet();
320 while( resultSet.next() ) {
321 sqlCount++ ;
322 String str = resultSet.getString(1);
323 if( display ) { println( str ); }
324 data.add( str );
325 }
326 }
327 else {
328 // sqlCount = stmt.getUpdateCount(); // 5.3.9.0 (2011/09/01)
329 sqlCount += stmt.getUpdateCount();
330 }
331 }
332 catch (SQLException ex) {
333 String errMsg = "SQL を実行できませんでした? + CR
334 + "errMsg=[" + ex.getMessage() + "]" + CR
335 + "errorCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR
336 + "DBID=" + dbid + CR
337 + "SQL =" + sql ;
338
339 throw new RuntimeException( errMsg,ex );
340 }
341 finally {
342 Closer.resultClose( resultSet );
343 Closer.stmtClose( stmt );
344
345 ConnectionFactory.remove( connection,dbid );
346 }
347 return data;
348 }
349
350 /**
351 * ?で使用する Set オブジェクトを作?します?
352 * Exception 以外では、? Set<String[]> オブジェクトを返します?
353 *
354 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
355 *
356 * @param sql オリジナルのSQL?
357 * @param bulkKey ?処?置き換えるキー??
358 * @param bulkType ?型(true)か?数字型(false)を指?
359 * @param setData ?処???なるSetオブジェク?
360 *
361 * @return オリジナルのSQL?に ?処????と置換したSQL??配?
362 */
363 private String[] makeBulkQuery( final String sql, final String bulkKey, final boolean bulkType,final Set<String> setData ) {
364 String[] sqls = new String[ (setData.size()/MAX_BULK_SET) + 1 ];
365 int idx = 0;
366 int cnt = 0;
367
368 StringBuilder buf = new StringBuilder();
369 String bulkVal = null;
370 if( bulkType ) { // ??の場?
371 for( String key : setData ) {
372 cnt++;
373 buf.append( ",'" ).append( key ).append( "'" );
374 if( cnt >= MAX_BULK_SET ) {
375 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
376 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
377 cnt = 0;
378 buf = new StringBuilder();
379 }
380 }
381 if( cnt > 0 ) { // きっちりで終わらな???
382 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
383 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
384 }
385 }
386 else { // 数字?場?
387 for( String key : setData ) {
388 cnt++;
389 buf.append( "," ).append( key );
390 if( cnt >= MAX_BULK_SET ) {
391 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
392 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
393 cnt = 0;
394 buf = new StringBuilder();
395 }
396 }
397 if( cnt > 0 ) { // きっちりで終わらな???
398 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
399 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
400 }
401 }
402 // String bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
403
404 // return sql.replace( "{@" + bulkKey + "}" ,bulkVal );
405 return sqls;
406 }
407
408 /**
409 * プロセスの処?果のレポ?ト表現を返します?
410 * 処??ログラ?、?力件数、?力件数などの??です?
411 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ?
412 * 形式で出してください?
413 *
414 * @return 処?果のレポ??
415 */
416 public String report() {
417 String report = "[" + getClass().getName() + "]" + CR
418 + TAB + "Action : " + actionCmd + CR
419 + TAB + "DBID : " + dbid + CR
420 + TAB + "sqlCount : " + sqlCount + CR
421 + TAB + "setCount : " + setCount + CR
422 + TAB + "outCount : " + outCount ;
423
424 return report ;
425 }
426
427 /**
428 * こ?クラスの使用方法を返します?
429 *
430 * @return こ?クラスの使用方?
431 */
432 public String usage() {
433 StringBuilder buf = new StringBuilder();
434
435 buf.append( "Process_BulkQueryは、データベ?スから読み取った?容を??処?るために? ).append( CR );
436 buf.append( "ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす? ).append( CR );
437 buf.append( "FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです?" ).append( CR );
438 buf.append( CR );
439 buf.append( "こ?クラスは、上流から?下流への処???度しか実行されません? ).append( CR );
440 buf.append( "FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します?" ).append( CR );
441 buf.append( "ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します?" ).append( CR );
442 buf.append( CR );
443 buf.append( "FirstProcess では?action は、query のみです?" ).append( CR );
444 buf.append( " query は、指定?SQL?実行し、結果のSetをParamProcessに設定します?" ).append( CR );
445 buf.append( "ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます?" ).append( CR );
446 buf.append( " query は、上記と同じです?" ).append( CR );
447 buf.append( " minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します?" ).append( CR );
448 buf.append( " intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します?" ).append( CR );
449 buf.append( " bulkSet は、?のSetを取り?し?SQL??して処?ます?" ).append( CR );
450 buf.append( CR );
451 buf.append( "流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?" ).append( CR );
452 buf.append( "bulkSet で利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?" ).append( CR );
453 buf.append( "SQLServerのユニ?クキーをminusした結果を?ORACLEからDELETEすれば、不要な" ).append( CR );
454 buf.append( "??タを削除するなどの処?実行可能になります?また?単純に、query ?を?" ).append( CR );
455 buf.append( "チェインすれば、単発のUPDATE?実行することが可能です?" ).append( CR );
456 buf.append( CR );
457 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR );
458 buf.append( "設定された接?Connection)を使用します?" ).append( CR );
459 buf.append( CR );
460 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR );
461 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR );
462 buf.append( "繋げてください? ).append( CR );
463 buf.append( CR );
464 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR );
465 buf.append( CR ).append( CR );
466
467 buf.append( getArgument().usage() ).append( CR );
468
469 return buf.toString();
470 }
471
472 /**
473 * こ?クラスは、main メソ?から実行できません?
474 *
475 * @param args コマンド引数配?
476 */
477 public static void main( final String[] args ) {
478 LogWriter.log( new Process_BulkQuery().usage() );
479 }
480 }