Edit:
After some feedback and some time to work on this I've improved the examples and introduced the Thread Pool Executor concept.
Motivation
Every now and then we find a need for doing some work in parallel. Recently I’ve worked on a process that was taking far too long to be of any use. It was taking large amounts of data in and creating an even larger amount of records given complex rules, validations and enrichment.
Refreshing some ABAP parallelism techniques, my options were basically to split the input data and then running FMs
in background, starting a new task or
writing a report and scheduling a job with parallelism being ranges of a selection criteria.
All valid options, I suppose, but I got used to work purely in OO and hammering FMs here and there to have my parallelism had a particular
bad smell. It would prevent me to assert some critical behavior via unit tests, for starters, and it would also add this new responsibility, running in parallel, to weird places with code that looked boring and repetitive (take data from classes, run fm, take data back…).
So I decided to find an OO approach to parallelism in ABAP, and took Java Thread as inspiration. I was particularly drawn to Java Thread given its simple
Runnable interface, which allows developers to quickly wrap what needs to be done in parallel in a single method. I could use it to simply call the existing methods I already had. Below is an example on how it would look like in Java.
public class MyRunnable implements Runnable {
public void run(){
myOtherClassMethodThatDoesStuff( )
}
}
Runnable runnable = new MyRunnable(); // or an anonymous class, or lambda...
Thread thread = new Thread(runnable);
thread.start();
zthread
There are probably hundreds of similar implementations to this out there. Hope this one can be useful for some of you. My concept is exactly the same as in Java, having the Runnable interface and the Thread as the starting point. As I’m used to (and like) the callback idea behind Javascript, I’ve thrown it into to mix. It made some of the code I needed easier to write and test. It looks something like this:
There is a thread factory interface and some default implementations too, which I've discarded from the diagram, to help unit test classes using Threads.
Usage
You can check git for the most up to date documentation. I´m putting some examples here to show how to use it in general.
The examples below are building up from previous examples. So zc_my_runnable is not always declared and its implementation is the one from its last time defined.
Raw way
Simply implement your runnable interface, give it to a Thread and start it. The drawback of this is that is up to the caller to manage how many Threads can run in parallel, and implement logic wait for Threads to finish so others can start and so forth.
If your program will have 2 or 3 threads that is ok and this way suits you. But if you are splitting large data into smaller parts, if it ends up 100 smaller parts this will consume all of your system resources.
Always prefer using a Pool Executor like in the next section below.
"example have merged definition and implementation
class zcl_my_runnable definition.
public section.
interfaces zif_runnable.
method constructor.
"store data to process async/paralel.
endmethod.
method zif_runnable~run.
"process data
endmethod.
endclass.
data(runnable1) = new zcl_my_runable( datasplit1 ).
data(runnable2) = new zcl_my_runable( datasplit2 ).
new zcl_thread( runnable1 )->start( ).
new zcl_thread( runnable2 )->start( ).
zcl_thread=>join_all( ). "awaits all threads to finish
Thread pool executor
The thread pool executor is an executor service that allows to limit the number of threads that can run in parallel based on its pool size, allowing to work to be forked indefinitely without consuming all system resources. Java has several executor services implementations, I’m delivering a basic one that can be extended as seem fit.
The runnable interface implementation is the same as the previous example.
"only 10 will be allowed in parallel in the pool
data(executor) = zcl_executors=>new_fixed_thread_pool( 10 ).
data(runnable1) = new zcl_my_runnable( datasplit1 ).
data(runnable2) = new zcl_my_runnable( datasplit2 ).
"submits individual runnables
executor->submit( runnable1 ).
executor->submit( runnable2 ).
"Asks for a 100 runnables to run but,
"due to the pool size only 10 will
"be allowed to be active at a time
executor->invoke_all( a100Runnables ).
"blocks executor from queing new runnables
executor->shutdown( ).
"awaits threads to finish and queue to be empty
executor->await_termination( ).
Retrieving results
You can get the result of your threads in multiple ways, but all require that your
result be class implementing the
zif_runnable_result interface.
Ideally, you want your results to be separate objects, as it should be significantly smaller than your runnable in terms of data size and this plays a role in resource consumption and the serialization process behind threads.
In the example below, however, we are implementing it in the runnable itself.
class zcl_my_runnable definition.
public section.
interfaces zif_runnable,zif_runnable_result.
data: my_result type i.
methods:
construtor
importing
numbers type any table.
get_sum returning value(rv_result) type i.
method zif_runnable~run.
my_result = reduce #( init result = 0
for number in numbers
next result = result + number ).
ro_result = me. "important!
endmethod.
method get_sum.
rv_result = my_result.
endmethod.
endclass.
From Threads
If you are creating your threads manually, the result will come from the Thread get_result( ) method. If this method is called before finished, zcx_still_running is raised. You can check the thread was successful with get_error( ).
data(runnable1) = new zcl_my_runnable( value #( ( 10 ) ( 10 ) ( 10 ) ). "30
data(runnable12) = new zcl_my_runnable( value #( ( 20 ) ( 20 ) ( 20 ) ). "60
data(thread) = new zcl_thread( runnable1 ).
data(thread2) = new zcl_thread( runnable2 ).
thread->start( ).
thread2->start( ).
zcl_thread=>join_all( ).
data(result) = cast zcl_my_runnable( thread->get_result( ) )->get_sum( ).
data(result2) = cast zcl_my_runnable( thread2->get_result( ) )->get_sum( ).
data(total) = result + result2.
write: total. "90
From Executors
Executors don’t return the Threads created, but rather a
Future representing that Thread result. When asking the
Future for its result, it will wait for the termination of its Thread if it is still running. Therefore the following code is synchronous but runs in parallel (different session). If an error happens during the runnable execution, the Future will raise zcx_thread_execution with the error as its previous exception.
executor->submit( runnable1 )->get( ).
For actual async and parallel code to execute, wait for the executor to terminate.
"for submits
data(future) = executor->submit( runnable1 ).
data(future2) = executor->submit( runnable2 ).
executor->shutdown( ).
executor->await_termination( ).
data(result) = cast zcl_my_runnable( future->get( ) )->get_sum( ). "30 from last example
data(result2) = cast zcl_my_runnable( future2->get( ) )->get_sum( ). "60 from last example
write: result + result2. "90
"same for invoke_all
data(futures) = executor->invoke_all( value#( ( runnable1 ) ( runnable2 ) ).
executor->shutdown( ).
executor->await_termination( ).
loop at futures into data(future).
total = total + cast zcl_my_runnable( future->get( ) )->get_sum( ).
endloop.
write: total. "90
Callbacks
Callbacks are my preferable way to manage async/parallel work. It is a simple concept to understand and it allows decoupling the code responsible for
defining work from the code responsible for
processing results.
Simply implement the
zif_thread_callback interface and give it to Threads or Executors and use its
on_result to collect, merge or aknowledge results as you please.
The callback object must have a valid reference throughout the lifecycle of the Threads.
class zcl_sum_callback definition.
public section.
interfaces zif_thread_callback.
methods:
get_total returning value(rv_result) type i.
private section.
data: my_total type i.
method zif_thread_callback~on_result.
"be safe
check io_result is instance of zcl_my_runnable.
data(result) = cast zcl_my_runnable( io_result ).
my_total = my_total + result->get_sum( ).
endmethod.
method zif_thread_callback~on_error.
"this method is triggered in case any error happen during runnable execution.
"you can raise your own exceptions there as well
"taskname is available in both on_callback and on_error.
"taskname is an optional value of thread constructor
raise exception type zcx_my_calculation_error
exporting
previous = io_error
taskname = iv_taskname.
endmethod.
method get_total.
rv_result = my_total.
endmethod.
endclass.
data(callback) = new zcl_sum_callback( ).
"giving it to threads
new zcl_thread(
io_runnable = runnable1
io_callback = callback )->start( ). "sum to 30
"giving it to executor
data(executor) = zcl_executors=>new_fixed_thread_pool(
iv_threads = 10
io_callback = callback ).
executor->submit( runnable2 ). "sum to 60
executor->await_termination( ).
data(result) = callback->get_total( ).
write: result. "90
Some thoughts
Feel free to check the code behind it. What is happening is the serialization of the runnable so it can be sent to a function module as a string. This FM is called using options
starting new task and
on end of task. In the FM, the runnable is deserialized and its
run method is called. Whatever result or error that happen are serialized by the FM and deserialized back by the Thread class. If a callback routine is defined, it is called with the result or error. This means a couple of things.
- Avoid complex runnables, all serialization constrains of the id transformation are applicable. If you have complex runnables, relying on other objects, prefer instantiating those objects inside the run method.
- Parallel work runs on dialog workers. As far as I can tell, the only restrictions are memory allocation and time execution (5 min being default). Make sure you split into chuncks that fit dialog workers restrictions.
- Each Thread runs on a different session, on top of serialized/deserialized objects, so…Don’t fall into the trap of expecting a singleton to be shared between threads.
- If you have a server with 2 processors, it doesn’t matter if you have 20 threads as they will just round robin your CPU time. Most production servers have a good enough number of threads available, but it is not always the case for development servers. Make sure you account for this when developing, testing, deploying.
- Threads can´t be stopped. Unfortunately I did not found a way to get the PID of a thread based on this task name.
- If a callback routine is defined, it must still be a valid reference at the end of the Thread.
SAP does have something in newer versions, check the comments about cl_abap_parallel. I think overall we still need some other tools to allow an raw OO implementation to the
philosopher’s dinner problem.
But, as I say this I also realize we are moving away from heavy process and relying on more stream like flows with small API endpoints. In any case, this is still a tool that can be used when needed.
Cheers and enjoy!
Git repository here.