Cluster computations


esProc allows using cluster computing to perform complicated analytic and processing tasks. A cluster system consists of multiple parallel servers running on a network of independent computers. Every single computer in the network can send a parallel computing request to the cluster.

A cluster system enhances computational performance significantly by gathering the computational power of multiple servers. It can be expanded whenever necessary without paying the high cost associated with the mainframe.

Performing cluster computations with the callx function

We learned how to launch parallel servers in The Server. After the parallel servers are started and a cluster system is created, a cluster computation can be performed with the callx function. Each esProc server carries out its share of the parallel task by executing the specified dfx file and returning the result.

To perform a cluster computation, first prepare the cellset file to be calculated by the subtasks, like CalcStock.dfx shown below:

	A	B
1	>output@t("calc begin, SID="+string(arg1)+". ")	=file("StockRecord.txt").cursor@t()
2	=B1.select(SID==arg1)	=A2.fetch()
3	=B2.count()	=B2.max(Closing)
4	=B2.min(Closing)	=round(B2.avg(Closing),2)
5	>output@t("calc finish, SID="+string(arg1)+". ")	return A3,B3,A4,B4

The StockRecord.txt file contains closing prices of some stocks over a certain period. Each subtask computes the transaction information for the stock with a specified code: the total number of trading days, the highest and lowest closing prices, and the average closing price. The average closing price is computed simply over trading days, without taking transaction volume into consideration. To learn the details of subtask execution, the output@t function is used at the beginning of the computation and before the result is returned, to output timestamps and prompts. In the cellset file, arg1 is a parameter for passing a stock code in; it needs to be set before the parallel computation. Select Set arguments before running to set the parameter:

Parallel computing requires that the cellset file CalcStock.dfx exist on every computer that runs a server, placed in the mainPath configured for that server.
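The per-stock aggregation that CalcStock.dfx performs can be sketched in plain Python. This is only an illustration of the logic, not esProc code, and the sample rows are invented:

```python
# Hypothetical sketch of what one subtask computes for a single stock code:
# trading-day count, highest/lowest closing price, and rounded average.
def stock_stats(rows, sid):
    # rows: (SID, Closing) pairs; keep only the requested stock
    closing = [c for s, c in rows if s == sid]
    avg = round(sum(closing) / len(closing), 2)
    return len(closing), max(closing), min(closing), avg

sample = [(124051, 8.5), (124051, 9.0), (128857, 3.2)]
stats = stock_stats(sample, 124051)  # (2, 9.0, 8.5, 8.75)
```

As in the cellset, the average is a simple mean over trading days, not volume-weighted.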

This computation involves one node A, whose main process address is 192.168.10.229:8281 and on which three processes are running: Process I, Process II and Process III, with port numbers 8281, 8282 and 8283 respectively.

Then you can start the parallel computation with the callx function in the main program on any computer in the network:

	A
1	[192.168.10.229:8281]
2	[124051,128857,131893,136760,139951,145380]
3	=callx("CalcStock.dfx",A2;A1)
4	=A3.new(A2(#):SID,~(1):Count,~(2):Maximum,~(3):Minimum,~(4):Average)

In A3's callx expression, the two parameters before the semicolon are passed to the cellset file. The parallel processing divides the computational task into multiple subtasks according to the sequence parameter; the length of the sequence is the number of subtasks. Multiple parameters are separated by commas, and a single-value parameter, if any, is duplicated to every subtask. The parameter after the semicolon is the sequence of nodes, each represented by its main process's IP address and port number as a string in the format "IP:port", like "192.168.10.229:8281". After the computation on a parallel server is completed, the result set is returned by the return statement; multiple result sets are returned in the form of a sequence.
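The way the parameters before the semicolon map onto subtasks can be modeled in Python. This is a toy model of the behavior described above, not esProc internals, and the single-value parameter "extra" is a made-up placeholder:

```python
# Toy model: a sequence parameter contributes one element per subtask,
# while a single-value parameter is duplicated to every subtask.
def split_into_subtasks(params):
    # subtask count = length of the sequence parameter
    n = max(len(p) for p in params if isinstance(p, list))
    return [[p[i] if isinstance(p, list) else p for p in params]
            for i in range(n)]

tasks = split_into_subtasks([[124051, 128857, 131893], "extra"])
# three subtasks; the single value "extra" appears in each of them
```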

In this example, you need to perform statistical analysis on six stocks. Their codes are 124051, 128857, 131893, 136760, 139951 and 145380, corresponding to subtasks a, b, c, d, e and f that will be distributed among the nodes during the parallel processing. If the number of subtasks equals the length of the node list, the subtasks are evenly allocated among the nodes; otherwise node computing ability is assessed, nodes with higher scores have priority in receiving jobs, and the number of tasks a node receives is its preferred task number. If a node isn't started, or the number of processes running on it has reached its maximum, the distribution moves on to the next node in the list. A node hands the tasks it receives to its processes; a process receives a new task as soon as it finishes the current one. We can read the distribution and execution messages of the subtasks in each process's System Output window:
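The queueing behavior on a single node can be sketched with a toy model. The real distribution depends on when each process actually becomes free; the round-robin below is only an assumption that reproduces the scenario described here:

```python
from collections import deque

# Toy model of one node handing queued subtasks to its processes.
def distribute(tasks, process_count):
    queue = deque(tasks)
    assignment = {p: [] for p in range(1, process_count + 1)}
    current = 1
    while queue:
        # each process takes the next task when it becomes available;
        # modeled here as simple round-robin
        assignment[current].append(queue.popleft())
        current = current % process_count + 1
    return assignment

plan = distribute(["a", "b", "c", "d", "e", "f"], 3)
# Process 1 -> a, d; Process 2 -> b, e; Process 3 -> c, f
```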

As can be seen from the output information, Process I handles tasks a and d, Process II handles tasks b and e, and Process III handles tasks c and f. Judging by the execution times of these tasks, the task number appropriate for the node is 3, because it has three processes running on it. The node first receives three tasks, a, b and c, which go to the three processes. Tasks d, e and f wait in the queue and are allocated to whichever process becomes available.

After the computations are completed, the result can be viewed in A4:

This example shows that, by splitting a data-intensive computational task apart and having each process handle one part, cluster computing can make full use of the computers' computing power to increase efficiency, and, through the use of multiple nodes, effectively avoid the memory overflow that often occurs during big data processing.

As processes execute separately on each node, they are independent of each other. The data source and the data files needed by the computation on a node must already be configured and stored on that node, so the dfx file must be deployed to every node beforehand.

Let's look at another example. We will still compute the statistics of the six stocks, but this time each process only gets the data of a certain year. The corresponding cellset file CalcStock2.dfx is as follows:

	A	B
1	>output@t("calc begin, year: "+string(arg1)+". ")	=file("StockRecord"+string(arg1)+".txt")
2	=B1.cursor@t()	[124051,128857,131893,136760,139951,145380]
3	=A2.select(B2.pos(SID)>0).fetch()	=A3.groups(SID;count(~):Count,max(Closing):Maximum,min(Closing):Minimum,sum(Closing):Sum)
4	>output@t("calc finish, year: "+string(arg1)+". ")	return B3.derive(arg1:Year)

Stock data of different years is stored in separate data files; for example, the data of 2010 is stored in StockRecord2010.txt. The five data files used in the computation cover 2010 through 2014. Each process computes the transaction data of the six stocks in a specified year. As in the above example, the output@t function outputs timestamps and prompts when the computation begins and before the result is returned.

In the cellset file, arg1 is a parameter that passes a specified year whose data is to be computed and that is added in B4’s return result for checking:

Then perform cluster computation in the main program:

	A
1	[192.168.10.229:8281]
2	[2010,2011,2012,2013,2014]
3	=callx("CalcStock2.dfx":~~|~,A2;A1)
4	=A3(1).group(SID;~.max(Maximum):Maximum,~.min(Minimum):Minimum,round(~.sum(Sum)/~.sum(Count),2):Average)

A3 performs the cluster computation using the callx(dfx:x,…;h) function. The function carries a reduce expression x, here ~~|~, in which ~~ is the current result of executing reduce and ~ is the result returned by the current task. The expression unions the records returned by all processes: each time a task returns its result, reduce is executed to produce a new ~~. After callx is executed, the status of the distribution and execution of the tasks is displayed in each server's System Output window:

In the main process window, we can view the execution of the reduce actions. Here's A3's result:

By executing the reduce actions, callx gets a sequence of results, one per node. As the above result shows, the processes taking part in a parallel computation don't finish their tasks in a fixed order, so the result computed from the 2011 data may be listed before the one computed from the 2010 data.
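The effect of the reduce expression ~~|~ can be modeled as a fold that concatenates each task's records onto the running result. This is a sketch in Python with invented records, not esProc code:

```python
from functools import reduce

# Each element models the record(s) returned by one task; the order may
# vary because tasks finish in no fixed order.
task_results = [[(124051, 2011)], [(124051, 2010)], [(124051, 2012)]]

# ~~|~ : union the running result (~~) with the current task's result (~)
merged = reduce(lambda running, current: running + current, task_results, [])
```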

A4 groups and aggregates A3’s result to calculate the highest and lowest prices and the average price:
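The key point of A4's aggregation is that per-year partial results are recombined per stock: the overall maximum is the max of yearly maximums, and the overall average is sum(Sum)/sum(Count), not an average of yearly averages. A plain-Python sketch with invented numbers:

```python
# (SID, Count, Maximum, Minimum, Sum) partial results for one stock
# over two years (made-up figures)
partials = [(124051, 2, 9.0, 8.5, 17.5),
            (124051, 3, 9.6, 8.2, 26.4)]

count = sum(p[1] for p in partials)
maximum = max(p[2] for p in partials)
minimum = min(p[3] for p in partials)
# overall average from the recombined totals: 43.9 / 5
average = round(sum(p[4] for p in partials) / count, 2)
```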

Besides server clusters, a reduce expression can also be used in purely multithreaded processing with the callx function. For example:

	A
1	[2010,2011,2012,2013,2014]
2	=callx("CalcStock2.dfx":~~|~,A1)
3	=A2.group(SID;~.max(Maximum):Maximum,~.min(Minimum):Minimum,round(~.sum(Sum)/~.sum(Count),2):Average)

A2's callx expression shows that the multithreaded processing doesn't involve nodes, so the node list h isn't specified. As the reduce actions are not handled by any node's main process, A2 returns the reduce result of calculating the expression ~~, instead of a sequence with one entry per node. Here's A2's result:

The aggregate result in A3 is the same as the result of A4 in the previous cellset.

During a cluster computation involving multiple nodes, task distribution becomes a problem if some nodes can't handle the tasks allocated to them. The @a option of the callx function strictly distributes each task to the server storing the corresponding data file. Now we add a Node II, whose main process is 192.168.10.245:8281 and which runs two sub processes. Node II stores three data files: StockRecord2010.txt, StockRecord2011.txt and StockRecord2012.txt. We use the following program to perform a cluster computation over the above stock data:

	A
1	[192.168.10.229:8281,192.168.10.245:8281]
2	[2010,2011,2012,2013,2014]
3	=callx("CalcStock2.dfx":~~|~,A2;A1)
4	=A3.conj().group(SID;~.max(Maximum):Maximum,~.min(Minimum):Minimum,round(~.sum(Sum)/~.sum(Count),2):Average)

Node I has three sub processes and thus the better computing ability, so it receives the first three tasks and the other two go to Node II. Yet, as Node II doesn't have all the data files needed to complete its tasks, the execution reports an error:

In order to avoid the error, add sequences of task numbers after the node sequence to specify the tasks for each node when calling the callx function. For example:

	A
1	[192.168.10.229:8281,192.168.10.245:8281]
2	[2010,2011,2012,2013,2014]
3	=callx("CalcStock2.dfx":~~|~,A2;A1,[[4,5],[1,2,3]])
4	=A3.conj().group(SID;~.max(Maximum):Maximum,~.min(Minimum):Minimum,round(~.sum(Sum)/~.sum(Count),2):Average)

The callx(dfx:x,…;h,s) function in A3 adds the parameter s to distribute tasks 4 and 5 to Node I, and tasks 1, 2 and 3 to Node II. View how the tasks are distributed by clicking cell A3:

Despite the fact that Node I has more sub processes, the tasks are distributed according to the specification. Note that A4 first unions the results returned from the two nodes before performing the aggregation. Here's A4's result:

The records in the result are the same as those obtained from the single node computation.
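The explicit assignment expressed by parameter s amounts to pairing the node list with the task-number groups, in order. A toy Python model (not esProc internals):

```python
nodes = ["192.168.10.229:8281", "192.168.10.245:8281"]
s = [[4, 5], [1, 2, 3]]  # task numbers pinned to each node, in node order

# pair each node with its pinned task numbers
assignment = dict(zip(nodes, s))
# Node II gets tasks 1-3 (years 2010-2012), matching the files it stores
```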

As we can see from the above instance, there can be trouble when certain tasks can only be performed on a certain node; the whole job may even fail if one of the servers goes wrong. To address the issue, we can store a data file on multiple servers to improve system stability. See section 10.4 Data Redundancy for details.

Performing cluster computations with the fork statement

Multithreading discussed how the fork statement executes a piece of code with multiple threads. The callx() function used in the above section executes a dfx file on each of the servers in the cluster.

So callx's work can also be done with the fork statement. For example:

	A	B	C
1	[192.168.10.229:8281]		
2	[124051,128857,131893,136760,139951,145380]		
3	fork A2;A1	>output@t("calc begin, SID="+string(A3)+". ")	
4		=file("D:/files/txt/StockRecord.txt").import@t()	=B4.select(SID==A3)
5		=C4.count()	=C4.max(Closing)
6		=C4.min(Closing)	=round(C4.avg(Closing),2)
7		>output@t("calc finish, SID="+string(A3)+". ")	return B5,C5,B6,C6
8	=A3.new(A2(#):SID,~(1):Count,~(2):Maximum,~(3):Minimum,~(4):Average)		

This is functionally equivalent to writing a repeatedly executed subroutine directly in a code block, saving the effort of maintaining multiple dfx files. During a cluster computation, a task number sequence s can be appended to the fork statement to distribute tasks to specific nodes, like fork A2;A1:[[4,5],[1,2,3]]. A3 collects the results of all tasks and returns the following:

The order of the records returned by A3 follows the parameter order. A8 further processes the result of the cluster computation and returns the following table:
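The fork pattern (run the same code body once per parameter value, collecting results in parameter order) is analogous to a thread-pool map in Python. This is an analogy only; calc below is a made-up stand-in for the fork code block:

```python
from concurrent.futures import ThreadPoolExecutor

# stand-in for the fork code block; here it just tags the input
def calc(sid):
    return ("done", sid)

codes = [124051, 128857, 131893]
with ThreadPoolExecutor(max_workers=3) as pool:
    # map preserves the order of the parameter sequence, like fork's result
    results = list(pool.map(calc, codes))
```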

We can also add reduce actions in the fork statement:

	A	B	C
1	[192.168.10.229:8281]		
2	[124051,128857,131893,136760,139951,145380]		
3	fork A2;A1	>output@t("calc begin, SID="+string(A3)+". ")	
4		=file("D:/files/txt/StockRecord.txt").import@t()	=B4.select(SID==A3)
5		=C4.count()	=C4.max(Closing)
6		=C4.min(Closing)	=round(C4.avg(Closing),2)
7		>output@t("calc finish, SID="+string(A3)+". ")	return A3,B5,C5,B6,C6
8	reduce	if(ift(~~),~~.record(~),create(SID,Count,Maximum,Minimum,Average).record(~~).record(~))	

The reduce code block works like this: on the first reduce, a result table sequence is created and both ~~ (the first task's result) and ~ (the second task's result) are recorded into it; each subsequent reduce appends the current task's records to the table sequence. Here's the result of the cluster computation in A3 after execution:

As only one node is involved, the table sequence it returns is the final aggregate result. But when the reduce function is used on a node to merge task results, the order of the returned records is not fixed, because tasks don't finish in a fixed order; the record order matches the task completion order. That is why each task returns its SID directly.
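The logic of the reduce code block (create the table on the first reduce, append on every later one) can be sketched in Python, with a list standing in for the table sequence and strings standing in for task records:

```python
from functools import reduce

def reduce_step(running, current):
    # first reduce: running is still a bare task result, so create the
    # "table" holding both; afterwards just append the current result
    if isinstance(running, list):
        running.append(current)
        return running
    return [running, current]

results = ["task-a", "task-b", "task-c", "task-d"]
table = reduce(reduce_step, results)
```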

Handling computation-intensive tasks

In the previous examples, cluster computing is used to handle data-intensive computational tasks; by processing part of the data in each process, it makes the most of limited memory. In other scenarios, a task involves extremely complicated computation, which cluster computing can likewise split into multiple subtasks distributed among the nodes for execution. The results of the subtasks are then merged by the main node.

Take the following subroutine CalcPi.dfx as an example:

	A	B	C
1	1000000	0	>output@t("Task "+string(arg1)+" start...")
2	for A1	=rand()	=rand()
3		=B2*B2+C2*C2	
4		if B3<1	>B1=B1+1
5	>output@t("Task "+string(arg1)+" finish.")	return B1	

Parameter arg1 is used to record serial numbers of subtasks:

This subroutine is created to estimate the value of π, the ratio of a circle's circumference to its diameter, using probability theory. Look at the following picture:

A quarter circle is inscribed in a square with side length 1. The area of the square is 1 and the area of the sector is π/4. The probability that a random point in the square falls within the sector is the ratio of their areas, i.e. π/4. This subroutine randomly generates 1,000,000 points whose x and y coordinates lie in the interval [0,1), computes each point's distance from the origin, counts the points falling within the sector, and from that count estimates the value of π.
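The same Monte Carlo estimate is easy to reproduce in plain Python. This is a self-contained sketch of the method, not the esProc script; the fixed seed and the smaller point count are choices made here for reproducibility and speed:

```python
import random

def estimate_pi(points, seed=0):
    rng = random.Random(seed)               # fixed seed for reproducibility
    inside = 0
    for _ in range(points):
        x, y = rng.random(), rng.random()   # random point in the unit square
        if x * x + y * y < 1:               # falls inside the quarter circle
            inside += 1
    return 4 * inside / points              # inside/points approximates pi/4

pi_approx = estimate_pi(100_000)
```

With 100,000 points the estimate typically lands within a few thousandths of π; more points tighten the estimate.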

Here’s the main program:

	A	B
1	[192.168.10.229:8281]	20
2	=callx("CalcPi.dfx",to(B1);A1)	=A2.sum()*4/(1000000d*B1)

By invoking the subroutine 20 times, the cluster computation distributes the processing of 20,000,000 points among the processes (the same ones used in the previous section). Distribution and execution information of the tasks can be viewed in each process's System Output window:

If the number of tasks exceeds what the nodes can handle at once and every process on every node has already received a task, the remaining tasks wait to be distributed until a process becomes available. A waiting task is given to any idle process, so distribution solutions may vary. Each computational process is independent, and a task distributed later than another may finish earlier, which doesn't affect the final result of the parallel processing.

The approximate value of π is computed in B2. Because the computation is based on probability, results differ slightly from run to run.