Spring Batch - Étapes pour améliorer la performance
Je suis actuellement à l'élaboration de données chargeurs.La lecture d'un fichier et de l'écriture à la base de données. Je suis à l'aide de gestionnaire de partition de traiter plusieurs fichiers Séparés par des Virgules dans les 30 fils. Je veux l'échelle de grandeur et de débit.Tous les jours je recevoir 15000 fichiers(chacune avec 1 million d'enregistrements ) , comment puis-je l'échelle à l'aide de spring batch.je veux que le travail à terminer à l'intérieur d'un jour.Nous n'avons aucune open source de grille de calcul , qui peut le faire assez, ou est-il un simple amende étapes de paramétrage.
Le printemps lot de chargeur de données fonctionne en autonome. Il n'y a pas de conteneur web concernés. il s'exécute sur un seul ordinateur solaris avoir 24 processeurs. Les données sont écrites dans la base de données unique.par défaut isolement et la propagation est fourni.La configuration xml est donnée ci-dessous:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
<!-- IMPORT DB CONFIG -->
<import resource="classpath:bom/bom/bomloader/job/DataSourcePoolConfig.xml" />
<!-- USE ANNOTATIONS TO CONFIGURE SPRING BEANS -->
<context:component-scan base-package="bom.bom.bom" />
<!-- INJECT THE PROCESS PARAMS HASHMAP BEFORE CONTEXT IS INITIALISED -->
<bean id="holder" class="bom.bom.bom.loader.util.PlaceHolderBean" >
<property name="beanName" value="holder"/>
</bean>
<bean id="logger" class="bom.bom.bom.loader.util.PlaceHolderBean" >
<property name="beanName" value="logger"/>
</bean>
<bean id="dataMap" class="java.util.concurrent.ConcurrentHashMap" />
<!-- JOB REPOSITORY - WE USE DATABASE REPOSITORY -->
<!-- <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" >-->
<!-- <property name="transactionManager" ref="frdtransactionManager" />-->
<!-- <property name="dataSource" ref="frddataSource" />-->
<!-- <property name="databaseType" value="oracle" />-->
<!-- <property name="tablePrefix" value="batch_"/> -->
<!-- </bean>-->
<!-- JOB REPOSITORY - WE IN MEMORY REPOSITORY -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="frdtransactionManager" />
</bean>
<!-- <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">-->
<!-- <property name="dataSource" ref="frddataSource" />-->
<!-- <property name="tablePrefix" value="batch_"/> -->
<!-- </bean>-->
<!-- LAUNCH JOBS FROM A REPOSITORY -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SyncTaskExecutor" />
</property>
</bean>
<!-- CONFIGURE SCHEDULING IN QUARTZ -->
<!-- <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">-->
<!-- <property name="jobClass" value="bom.bom.bom.assurance.core.JobLauncherDetails" />-->
<!-- <property name="group" value="quartz-batch" />-->
<!-- <property name="jobDataAsMap">-->
<!-- <map>-->
<!-- <entry key="jobName" value="${jobname}"/>-->
<!-- <entry key="jobLocator" value-ref="jobRegistry"/>-->
<!-- <entry key="jobLauncher" value-ref="jobLauncher"/>-->
<!-- </map>-->
<!-- </property>-->
<!-- </bean>-->
<!-- RUN EVERY 2 HOURS -->
<!-- <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">-->
<!-- <property name="triggers">-->
<!-- <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">-->
<!-- <property name="jobDetail" ref="jobDetail" />-->
<!-- <property name="cronExpression" value="2/0 * * * * ?" />-->
<!-- </bean>-->
<!-- </property>-->
<!-- </bean>-->
<!-- -->
<!-- RUN STANDALONE -->
<bean id="jobRunner" class="bom.bom.bom.loader.core.DataLoaderJobRunner">
<constructor-arg value="${LOADER_NAME}" />
</bean>
<!-- Get all the files for the exchanges and feed as resource to the MultiResourcePartitioner -->
<bean id="fileresource" class="bom.bom.bom.loader.util.FiltersFoldersResourceFactory" p:dataMap-ref="dataMap">
<property name="filePath" value="${PARENT_PATH}" />
<property name="acceptedFolders" value="${EXCH_CODE}" />
<property name="logger" ref="logger" />
</bean>
<!-- The network Data Loading Configuration goes here -->
<job id="CDR_network _PARALLEL" xmlns="http://www.springframework.org/schema/batch" restartable="false" >
<step id="PREPARE_CLEAN" >
<flow parent="prepareCleanFlow" />
<next on="COMPLETED" to="LOAD_EXCHANGE_DATA" />
<fail on="FAILED" exit-code="Failed on cleaning error records."/>
</step>
<step id="LOAD_EXCHANGE_DATA" >
<tasklet ref="businessData" transaction-manager="ratransactionManager" />
<next on="COMPLETED" to="LOAD_CDR_FILES" />
<fail on="FAILED" exit-code="FAILED ON LOADING EXCHANGE INFORMATION FROM DB." />
</step>
<step id="LOAD_CDR_FILES" >
<tasklet ref="fileresource" transaction-manager="frdtransactionManager" />
<next on="COMPLETED" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
<fail on="FAILED" exit-code="FAILED ON LOADING CDR FILES." />
</step>
<step id="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" next="limitDecision" >
<partition step="filestep" partitioner="filepartitioner" >
<handler grid-size="100" task-executor="executorWithCallerRunsPolicy" />
</partition>
</step>
<decision id="limitDecision" decider="limitDecider">
<next on="COMPLETED" to="MOVE_RECS_STAGING_TO_MAIN_TABLE" />
<next on="CONTINUE" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
</decision>
<step id="MOVE_RECS_STAGING_TO_MAIN_TABLE" >
<tasklet ref="moveRecords" transaction-manager="ratransactionManager" >
<transaction-attributes isolation="SERIALIZABLE"/>
</tasklet>
<fail on="FAILED" exit-code="FAILED ON MOVING DATA TO THE MAIN TABLE." />
<next on="*" to="PREPARE_ARCHIVE"/>
</step>
<step id="PREPARE_ARCHIVE" >
<flow parent="prepareArchiveFlow" />
<fail on="FAILED" exit-code="FAILED ON Archiving files" />
<end on="*" />
</step>
</job>
<flow id="prepareCleanFlow" xmlns="http://www.springframework.org/schema/batch">
<step id="CLEAN_ERROR_RECORDS" next="archivefileExistsDecisionInFlow" >
<tasklet ref="houseKeeping" transaction-manager="ratransactionManager" />
</step>
<decision id="archivefileExistsDecisionInFlow" decider="archivefileExistsDecider">
<end on="NO_ARCHIVE_FILE" />
<next on="ARCHIVE_FILE_EXISTS" to="runprepareArchiveFlow" />
</decision>
<step id="runprepareArchiveFlow" >
<flow parent="prepareArchiveFlow" />
</step>
</flow>
<flow id="prepareArchiveFlow" xmlns="http://www.springframework.org/schema/batch" >
<step id="ARCHIVE_CDR_FILES" >
<tasklet ref="archiveFiles" transaction-manager="frdtransactionManager" />
</step>
</flow>
<bean id="archivefileExistsDecider" class="bom.bom.bom.loader.util.ArchiveFileExistsDecider" >
<property name="logger" ref="logger" />
<property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
</bean>
<bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner" scope="step" >
<property name="resources" value="#{dataMap[processFiles]}"/>
</bean>
<task:executor id="executorWithCallerRunsPolicy"
pool-size="90-95"
queue-capacity="6"
rejection-policy="CALLER_RUNS"/>
<!-- <bean id="dynamicJobParameters" class="bom.bom.bom.assurance.core.DynamicJobParameters" />-->
<bean id="houseKeeping" class="bom.bom.bom.loader.core.HousekeepingOperation">
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="rajdbcTemplate" />
<property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
</bean>
<bean id="businessData" class="bom.bom.bom.loader.core.BusinessValidatorData">
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="NrajdbcTemplate" />
<property name="param" value="${EXCH_CODE}" />
<property name="sql" value="${LOOKUP_QUERY}" />
</bean>
<step id="filestep" xmlns="http://www.springframework.org/schema/batch">
<tasklet transaction-manager="ratransactionManager" allow-start-if-complete="true" >
<chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor" commit-interval="500" retry-limit="2">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
<listeners>
<listener ref="customStepExecutionListener">
</listener>
</listeners>
</tasklet>
</step>
<bean id="moveRecords" class="bom.bom.bom.loader.core.MoveDataFromStaging">
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="rajdbcTemplate" />
</bean>
<bean id="archiveFiles" class="bom.bom.bom.loader.core.ArchiveCDRFile" >
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="frdjdbcTemplate" />
<property name="archiveFlag" value="${ARCHIVE_FILE}" />
<property name="archiveDir" value="${ARCHIVE_LOCATION}" />
</bean>
<bean id="limitDecider" class="bom.bom.bom.loader.util.LimitDecider" p:dataMap-ref="dataMap">
<property name="logger" ref="logger" />
</bean>
<!-- <bean id="multifileReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step" >-->
<!-- <property name="resources" value="#{stepExecutionContext[fileName]}" />-->
<!-- <property name="delegate" ref="fileItemReader" />-->
<!-- </bean>-->
<!-- READ EACH FILE PARALLELY -->
<bean id="fileItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
<property name="resource" value="#{stepExecutionContext[fileName]}" />
<property name="saveState" value="false" />
</bean>
<!-- LISTEN AT THE END OF EACH FILE TO DO POST PROCESSING -->
<bean id="customStepExecutionListener" class="bom.bom.bom.loader.core.StagingStepExecutionListener" scope="step">
<property name="logger" ref="logger" />
<property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
<property name="jdbcTemplate" ref="rajdbcTemplate" />
<property name="sql" value="${INSERT_IA_QUERY_COLUMNS}" />
</bean>
<!-- CONFIGURE THE ITEM PROCESSOR TO DO BUSINESS LOGIC ON EACH ITEM -->
<bean id="itemProcessor" class="bom.bom.bom.loader.core.StagingLogicProcessor" scope="step">
<property name="logger" ref="logger" />
<property name="params" ref="businessData" />
</bean>
<!-- CONFIGURE THE JDBC ITEM WRITER TO WRITE IN TO DB -->
<bean id="jdbcItenWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">
<property name="dataSource" ref="radataSource"/>
<property name="sql">
<value>
<![CDATA[
${SQL1A}
]]>
</value>
</property>
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider">
</bean>
</property>
</bean>
<!-- <bean id="itemWriter" class="bom.bom.bom.assurance.core.LoaderDBWriter" scope="step">-->
<!-- <property name="sQL" value="${loader.sql}" />-->
<!-- <property name="jdbcTemplate" ref="NrajdbcTemplate" />-->
<!-- </bean>-->
<!-- CONFIGURE THE FLAT FILE ITEM READER TO READ INDIVIDUAL BATCH -->
<bean id="itemReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" abstract="true">
<property name="strict" value="false"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
<property name="names" value="${COLUMNS}" />
<property name="columns" value="${RANGE}" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="bom.bom.bom.loader.util.DataLoaderMapper">
<property name="params" value="${BEANPROPERTIES}"/>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Essayé:
- j'ai pu voir que la ThreadPoolExecutor se bloque au bout de 3 heures.Le prstat dans solaris dit qu'il est en cours de traitement, mais pas de traitement dans le journal.
- Essayé avec moins de taille de bloc de 500 ,en raison de la mémoire pied d'impression,pas de progrès.
- Comme il insère dans la base de données unique( 30 pool de connexions).est-il anythin je peux le faire ici.
Instances de visual vm
stacktrace de fil tous les sont bloqués au niveau de la connexion
Full thread dump Java HotSpot(TM) Server VM (11.3-b02 mixed mode):
"Attach Listener" daemon prio=3 tid=0x00bbf800 nid=0x26 waiting on condition [0x00000000..0x00000000]
java.lang.Thread.State: RUNNABLE
"executorWithCallerRunsPolicy-1" prio=3 tid=0x008a7000 nid=0x25 runnable [0xd5a7d000..0xd5a7fb70]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at oracle.net.ns.Packet.receive(Packet.java:240)
at oracle.net.ns.DataPacket.receive(DataPacket.java:92)
at oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:172)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:117)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:92)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:77)
at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1034)
at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1(T4CMAREngine.java:1010)
at oracle.jdbc.driver.T4C8Oall.receive(T4C8Oall.java:588)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:194)
at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:953)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1222)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3387)
at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3468)
- locked <0xdbdafa30> (a oracle.jdbc.driver.T4CConnection)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1350)
at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:818)
at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:1)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:587)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:812)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:868)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:876)
at
source d'informationauteur Dead Programmer
Vous devez vous connecter pour publier un commentaire.
Je vous suggère d'abaisser le morceau de la taille à 50.
500 semble trop grande : vous attendez trop de tout en parlant avec la DB.
Dans le même temps, la baisse de la TaskExecutor piscine de taille ou d'augmenter votre DB taille du pool.
Vous pouvez choisir sur en regardant votre base de données de l'hôte : si c'est le CPU et IO n'est pas maxxed, augmenter votre DB taille du pool pour augmenter la DB de la charge. Si votre DB CPU est déjà à son maximum, baissez le TaskExecutor de la taille du pool. L'objectif est de disposer d'un fluide de processus.
Je pense que la bd sera votre principal réducteurs élément. Donc, commencez par le réglage de la DB taille du pool en fonction de la DB capacités d'accueil. Quand c'est fait, ajuster votre TaskExecutor piscine de taille en fonction de la DB taille du pool (TE de la taille du pool = DB taille du pool * 1.5), plus le lot d'hôte de capacités (CPU, mémoire et IOs).
Fractionnement de la réception de vos fichiers sur plusieurs disques durs peuvent aussi aider (si possible).
Je pense que le problème ici est de millions d'enregistrements dans le fichier. Puisque vous avez déjà réduit la taille de bloc , vous devez réduire le nombre d'enregistrements. Pour le test du saké, de réduire le nombre d'enregistrements dans chaque fichier pour 10k. Ma conjecture est que vous créer des objets, de faire quelques traitement et que vous faites cela pour 1m enregistrements dans une boucle. Chaque thread va tenir l'objet en mémoire, sauf si le traitement est terminé. Ma conjecture est à cause du volume de données, il y a trop d'objets dans votre mémoire qui ne sont pas des ordures collectées. Si la réduction de la taille de l'aide, alors vous pouvez essayer d'utiliser des objets légers dans votre code et essayez de définir chaque objet à null à la fin du traitement.
juste un correctif dans votre expression cron. C'est le bon pour 2 heures:
0 0 0/2 1/1 * ? *
BeanPropertyRowMapper
)? Qui peut entraver les performances.