Coverage Report - kg.apc.jmeter.threads.AbstractSimpleThreadGroup
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractSimpleThreadGroup
35%
36/102
23%
9/38
2.312
 
 1  
 package kg.apc.jmeter.threads;
 2  
 
 3  
 import java.util.Map;
 4  
 import java.util.Map.Entry;
 5  
 import java.util.concurrent.ConcurrentHashMap;
 6  
 
 7  
 import org.apache.jmeter.engine.StandardJMeterEngine;
 8  
 import org.apache.jmeter.engine.TreeCloner;
 9  
 import org.apache.jmeter.threads.AbstractThreadGroup;
 10  
 import org.apache.jmeter.threads.JMeterContext;
 11  
 import org.apache.jmeter.threads.JMeterContextService;
 12  
 import org.apache.jmeter.threads.JMeterThread;
 13  
 import org.apache.jmeter.threads.ListenerNotifier;
 14  
 import org.apache.jmeter.util.JMeterUtils;
 15  
 import org.apache.jorphan.collections.ListedHashTree;
 16  
 import org.apache.jorphan.logging.LoggingManager;
 17  
 import org.apache.log.Logger;
 18  
 
 19  
 public abstract class AbstractSimpleThreadGroup extends AbstractThreadGroup {
 20  1
     private static final Logger log = LoggingManager.getLoggerForClass();
 21  
 
 22  1
     private static final long WAIT_TO_DIE = JMeterUtils.getPropDefault("jmeterengine.threadstop.wait", 5 * 1000); // 5 seconds
 23  
 
 24  
     // List of active threads
 25  67
     private final Map<JMeterThread, Thread> allThreads = new ConcurrentHashMap<JMeterThread, Thread>();
 26  
 
 27  
     /**
 28  
      * Is test (still) running?
 29  
      */
 30  67
     private volatile boolean running = false;
 31  
 
 32  
     //JMeter 2.7 Compatibility
 33  67
     private long tgStartTime = -1;
 34  
     private static final long TOLERANCE = 1000;
 35  
 
 36  
 
 37  
     /**
 38  
      * No-arg constructor.
 39  
      */
 40  67
     public AbstractSimpleThreadGroup() {
 41  67
     }
 42  
 
 43  
     protected abstract void scheduleThread(JMeterThread thread, long now);
 44  
 
 45  
     //JMeter 2.7 compatibility
 46  
     public void scheduleThread(JMeterThread thread) {
 47  33
        if(System.currentTimeMillis()-tgStartTime > TOLERANCE) {
 48  5
            tgStartTime = System.currentTimeMillis();
 49  
        }
 50  33
        scheduleThread(thread, tgStartTime);
 51  33
     }
 52  
 
 53  
     
 54  
     @Override
 55  
     public void start(int groupCount, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine) {
 56  1
         running = true;
 57  
 
 58  1
         int numThreads = getNumThreads();
 59  
         
 60  1
         log.info("Starting thread group number " + groupCount
 61  
                 + " threads " + numThreads);
 62  
        
 63  1
             long now = System.currentTimeMillis(); // needs to be same time for all threads in the group
 64  1
             final JMeterContext context = JMeterContextService.getContext();
 65  1
             for (int i = 0; running && i < numThreads; i++) {
 66  0
                 JMeterThread jmThread = makeThread(groupCount, notifier, threadGroupTree, engine, i, context);
 67  0
                 scheduleThread(jmThread, now); // set start and end time
 68  0
                 Thread newThread = new Thread(jmThread, jmThread.getThreadName());
 69  0
                 registerStartedThread(jmThread, newThread);
 70  0
                 newThread.start();
 71  
             }
 72  
 
 73  1
         log.info("Started thread group number "+groupCount);
 74  1
     }
 75  
 
 76  
     private void registerStartedThread(JMeterThread jMeterThread, Thread newThread) {
 77  0
         allThreads.put(jMeterThread, newThread);
 78  0
     }
 79  
 
 80  
     private JMeterThread makeThread(int groupCount,
 81  
             ListenerNotifier notifier, ListedHashTree threadGroupTree,
 82  
             StandardJMeterEngine engine, int i,
 83  
             JMeterContext context) { // N.B. Context needs to be fetched in the correct thread
 84  0
         boolean onErrorStopTest = getOnErrorStopTest();
 85  0
         boolean onErrorStopTestNow = getOnErrorStopTestNow();
 86  0
         boolean onErrorStopThread = getOnErrorStopThread();
 87  0
         boolean onErrorStartNextLoop = getOnErrorStartNextLoop();
 88  0
         String groupName = getName();
 89  0
         final JMeterThread jmeterThread = new JMeterThread(cloneTree(threadGroupTree), this, notifier);
 90  0
         jmeterThread.setThreadNum(i);
 91  0
         jmeterThread.setThreadGroup(this);
 92  0
         jmeterThread.setInitialContext(context);
 93  0
         final String threadName = groupName + " " + (groupCount) + "-" + (i + 1);
 94  0
         jmeterThread.setThreadName(threadName);
 95  0
         jmeterThread.setEngine(engine);
 96  0
         jmeterThread.setOnErrorStopTest(onErrorStopTest);
 97  0
         jmeterThread.setOnErrorStopTestNow(onErrorStopTestNow);
 98  0
         jmeterThread.setOnErrorStopThread(onErrorStopThread);
 99  0
         jmeterThread.setOnErrorStartNextLoop(onErrorStartNextLoop);
 100  0
         return jmeterThread;
 101  
     }
 102  
 
 103  
     @Override
 104  
     public boolean stopThread(String threadName, boolean now) {
 105  1
         for(Entry<JMeterThread, Thread> entry : allThreads.entrySet()){
 106  0
             JMeterThread thrd = entry.getKey();
 107  0
             if (thrd.getThreadName().equals(threadName)){
 108  0
                 thrd.stop();
 109  0
                 thrd.interrupt();
 110  0
                 if (now) {
 111  0
                     Thread t = entry.getValue();
 112  0
                     if (t != null) {
 113  0
                         t.interrupt();
 114  
                     }
 115  
                 }
 116  0
                 return true;
 117  
             }
 118  0
         }
 119  1
         return false;
 120  
     }
 121  
 
 122  
     @Override
 123  
     public void threadFinished(JMeterThread thread) {
 124  1
         log.debug("Ending thread " + thread.getThreadName());
 125  1
         allThreads.remove(thread);
 126  1
     }
 127  
 
 128  
     @Override
 129  
     public void tellThreadsToStop() {
 130  1
         running = false;
 131  1
         for (Entry<JMeterThread, Thread> entry : allThreads.entrySet()) {
 132  0
             JMeterThread item = entry.getKey();
 133  0
             item.stop(); // set stop flag
 134  0
             item.interrupt(); // interrupt sampler if possible
 135  0
             Thread t = entry.getValue();
 136  0
             if (t != null ) { // Bug 49734
 137  0
                 t.interrupt(); // also interrupt JVM thread
 138  
             }
 139  0
         }
 140  1
     }
 141  
 
 142  
     @Override
 143  
     public void stop() {
 144  1
         running = false;
 145  1
         for (JMeterThread item : allThreads.keySet()) {
 146  0
             item.stop();
 147  0
         }
 148  1
     }
 149  
 
 150  
     @Override
 151  
     public int numberOfActiveThreads() {
 152  1
         return allThreads.size();
 153  
     }
 154  
 
 155  
     @Override
 156  
     public boolean verifyThreadsStopped() {
 157  1
         boolean stoppedAll = true;
 158  1
         for (Thread t : allThreads.values()) {
 159  0
             stoppedAll = stoppedAll && verifyThreadStopped(t);
 160  0
         }
 161  1
         return stoppedAll;
 162  
     }
 163  
 
 164  
     private boolean verifyThreadStopped(Thread thread) {
 165  0
         boolean stopped = true;
 166  0
         if (thread != null) {
 167  0
             if (thread.isAlive()) {
 168  
                 try {
 169  0
                     thread.join(WAIT_TO_DIE);
 170  0
                 } catch (InterruptedException e) {
 171  0
                 }
 172  0
                 if (thread.isAlive()) {
 173  0
                     stopped = false;
 174  0
                     log.warn("Thread won't exit: " + thread.getName());
 175  
                 }
 176  
             }
 177  
         }
 178  0
         return stopped;
 179  
     }
 180  
 
 181  
     @Override
 182  
     public void waitThreadsStopped() {
 183  1
         for (Thread t : allThreads.values()) {
 184  0
             waitThreadStopped(t);
 185  0
         }
 186  1
     }
 187  
 
 188  
     private void waitThreadStopped(Thread thread) {
 189  0
         if (thread != null) {
 190  0
             while (thread.isAlive()) {
 191  
                 try {
 192  0
                     thread.join(WAIT_TO_DIE);
 193  0
                 } catch (InterruptedException e) {
 194  0
                 }
 195  
             }
 196  
         }
 197  0
     }
 198  
 
 199  
     private ListedHashTree cloneTree(ListedHashTree tree) {
 200  0
         TreeCloner cloner = new TreeCloner(true);
 201  0
         tree.traverse(cloner);
 202  0
         return cloner.getClonedTree();
 203  
     }
 204  
 }
 205