RuriLib.Parallelization

Hello Ruri, I hope you are doing great. I have a few questions about the RuriLib.Parallelization example. I don’t know C# yet and I’m trying to learn.

Func<int, CancellationToken, Task<bool>> parityCheck = new(async (number, token) => 
            {
                // This is the body of your work function
                await Task.Delay(50, token);
                return number % 2 == 0;
            });

// This func takes an input type of ‘int’, a cancellation token, and an output type of Task of bool

As this comment indicates, this function takes an input of type int and a cancellation token. I learned about cancellation tokens: they are needed to cancel the task, right? But why does this function take an int input?
What is the meaning of this piece of code?

new(async (number, token) =>

And what is this operator => called? I’m kind of a noob in C# but I’m trying my best to learn it.

await Task.Delay(50, token);
return number % 2 == 0;

The 1st statement delays the task for 50 milliseconds, right? And the 2nd statement takes the number mod 2, and if the result is equal to zero it returns the number.
Correct me if I’m wrong.

Hey, I’ll try to break down the code a bit more for you.

Func<int, CancellationToken, Task<bool>>

This is the actual TYPE of the variable parityCheck. The variable in fact holds a function (a method without a name). A Func like this can take and return any types you want. In this example I chose int and bool, but it could just as well take a string and output another string. In that case you would write something like this

Func<string, CancellationToken, Task<string>>

Those types could even be classes or whatever you like.

After the = sign, there is the body of the actual function. The function returns a Task and uses await inside its body, so we have to mark it with the async keyword. Then we take the two input parameters, which we decide to call number and token. These can be arbitrary names that you choose.

Now you can use the first parameter and do stuff with it, in order to check some condition, and then return a result of the other type that you chose.

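This operator => is the arrow (also called the lambda operator); in this case it is used to declare a lambda function. The bare new(...) is just syntax sugar for new Func<int, CancellationToken, Task<bool>>(...): the compiler infers the type from the variable declaration on the left. This "target-typed new" was added in C# 9.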
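To make the syntax concrete, here is a small sketch with three equivalent ways of writing the same function value. The variable names explicitForm, shortForm and plainLambda are made up for illustration, and the top-level statements assume a C# 9+ console project.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// 1. Spelling out the delegate type on both sides
Func<int, CancellationToken, Task<bool>> explicitForm =
    new Func<int, CancellationToken, Task<bool>>(async (number, token) =>
    {
        await Task.Delay(50, token);
        return number % 2 == 0;
    });

// 2. Target-typed new: the type is inferred from the declaration (C# 9+)
Func<int, CancellationToken, Task<bool>> shortForm = new(async (number, token) =>
{
    await Task.Delay(50, token);
    return number % 2 == 0;
});

// 3. Assigning the lambda directly, without new at all
Func<int, CancellationToken, Task<bool>> plainLambda = async (number, token) =>
{
    await Task.Delay(50, token);
    return number % 2 == 0;
};

// All three behave the same when called
Console.WriteLine(await explicitForm(4, CancellationToken.None)); // True
Console.WriteLine(await shortForm(4, CancellationToken.None));    // True
Console.WriteLine(await plainLambda(4, CancellationToken.None));  // True
```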

Then the first statement

await Task.Delay(50, token);

waits 50 ms asynchronously, and cancels the wait if the token is cancelled (e.g. the user presses the stop button in your application).

The second statement

return number % 2 == 0;

returns a bool as specified in the return type of the Func as we discussed before, and it simply returns the result of the modulus operation between your number and the number 2. It does not return the number, it returns true if the result of your number mod 2 is zero (a.k.a. the number is even) and false if it’s not zero (a.k.a. the number is odd).
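If you want to see both statements in action, a minimal sketch like the following calls the function directly, without any parallelizer. It only uses the standard library; the input values 4, 7 and 10 are arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

Func<int, CancellationToken, Task<bool>> parityCheck = new(async (number, token) =>
{
    await Task.Delay(50, token);
    return number % 2 == 0;
});

// The result is a bool, not the number itself
Console.WriteLine(await parityCheck(4, CancellationToken.None)); // True  (4 % 2 is 0)
Console.WriteLine(await parityCheck(7, CancellationToken.None)); // False (7 % 2 is 1)

// A cancelled token interrupts the wait, so the return statement is never reached
using var cts = new CancellationTokenSource();
cts.Cancel();

try
{
    await parityCheck(10, cts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("The wait was cancelled before the check ran");
}
```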

Hope I was clear, feel free to ask any other questions you might have.


Thanks a lot. I’ll get back to you after digesting this piece of information.

I was able to understand the above function, but can you break down this piece of code for me like you did before? That was very helpful for me.

var parallelizer = ParallelizerFactory<int, bool>.Create(
                type: ParallelizerType.TaskBased, // Use task-based (it's better)
                workItems: Enumerable.Range(1, 100), // The work items are all integers from 1 to 100
                workFunction: parityCheck, // Use the work function we defined above
                degreeOfParallelism: 3, // Use 3 concurrent tasks at most
                totalAmount: 100, // The total amount of tasks you expect to have, used for calculating progress
                skip: 0); // How many items to skip from the start of the provided enumerable

Okay, so we are creating a variable called parallelizer.

We call the ParallelizerFactory and we pass the input and output types of our choice. In this case, the code will basically use a factory that is able to produce parallelizers that take int as input and produce bool as output. As I said before, you could do string and string or whatever other types you need to work with. These types have to match the ones you used in the work function that I described above.

After this, we specify the parallelizer type (just leave it like that, the other option is just for testing).

Then we define the work items, which means all the items that will be given to the parallelizer to process. This needs to be an IEnumerable of the input type that you specified. In this case the parallelizer takes int values, so you need an IEnumerable<int>, which I simply produce by taking a range of numbers from 1 to 100. If your parallelizer takes string as input type, you can pass an IEnumerable<string> (for example a List<string> or a string[], since they both implement the IEnumerable<string> interface).

After this we pass the work function that we created previously, which tells the parallelizer how it should process each input to produce the output of the type that you chose (in this case a bool).

Then the degree of parallelism specifies how many tasks should be active at once.

The total amount tells the parallelizer how many items are passed in the workItems field. This is needed because the IEnumerable that we pass does not contain the count of all items, as it could be enumerated little by little. For example, think of a file with 1 billion lines: we sure don’t want to load them all up in memory, so we read it little by little and we don’t know how many lines to expect. So we count them beforehand and give that number to the parallelizer. This count is used to calculate the progress, remaining time etc.

Finally, the skip is used to resume previous sessions. For example, if you know you completed 10k items of the IEnumerable, it can skip those 10k items since you already processed them.
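As a rough sketch of the file scenario, this is how the count and the skip could be wired together. The file name items.txt, the resume point alreadyProcessed and the work function isLongLine are hypothetical, and the using RuriLib.Parallelization line assumes that is the namespace exposed by the NuGet package. The thread does not say whether totalAmount should include the skipped items; here the full line count is passed.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RuriLib.Parallelization; // assumed namespace of the library

// Hypothetical input file and resume point from a previous session
var path = "items.txt";
var alreadyProcessed = 10_000;

// Hypothetical work function: one line in, one bool out
Func<string, CancellationToken, Task<bool>> isLongLine = new(async (line, token) =>
{
    await Task.Delay(50, token);
    return line.Length > 20;
});

// First pass: count the lines lazily, without keeping them in memory
var total = File.ReadLines(path).Count();

// Second pass: the parallelizer enumerates the file little by little
var parallelizer = ParallelizerFactory<string, bool>.Create(
    type: ParallelizerType.TaskBased,
    workItems: File.ReadLines(path),
    workFunction: isLongLine,
    degreeOfParallelism: 3,
    totalAmount: total,      // used for progress and remaining time
    skip: alreadyProcessed); // items already handled in a previous run
```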

After all of this, you have a working parallelizer that can be started, aborted etc. Before you do that, make sure to hook the event handlers that will be called when the parallelizer processes a result, otherwise you have no way of using the results that it produces!

parallelizer.NewResult += OnResult;

And the corresponding method

private static void OnResult(object sender, ResultDetails<int, bool> value)
            => Console.WriteLine($"Got result {value.Result} from the parity check of {value.Item}");

This method also needs to take the types that the parallelizer handles. In this case int and bool. As an argument you will have the item that was processed (in this case an int) and the output result bool that was generated by applying the work function to the item.

int → work function → bool
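Putting the pieces from this thread together, a complete console program could look roughly like the sketch below. The Create call, the NewResult event and the OnResult signature come from the snippets above. The Start and WaitCompletion calls and the two namespaces are written from memory of the library's README, so treat them as assumptions and check them against the version you installed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RuriLib.Parallelization;        // assumed namespace
using RuriLib.Parallelization.Models; // assumed namespace of ResultDetails

class Program
{
    static async Task Main()
    {
        Func<int, CancellationToken, Task<bool>> parityCheck = new(async (number, token) =>
        {
            await Task.Delay(50, token);
            return number % 2 == 0;
        });

        var parallelizer = ParallelizerFactory<int, bool>.Create(
            type: ParallelizerType.TaskBased,
            workItems: Enumerable.Range(1, 100),
            workFunction: parityCheck,
            degreeOfParallelism: 3,
            totalAmount: 100,
            skip: 0);

        // Hook the handler BEFORE starting, or the results are lost
        parallelizer.NewResult += OnResult;

        // Assumed API: start the work, then wait until everything is done
        await parallelizer.Start();

        // Give up waiting after 10 seconds instead of hanging forever
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromSeconds(10));
        await parallelizer.WaitCompletion(cts.Token);
    }

    private static void OnResult(object sender, ResultDetails<int, bool> value)
        => Console.WriteLine($"Got result {value.Result} from the parity check of {value.Item}");
}
```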

Hope I was clear, feel free to ask anything else.


@Ruri is it normal that tasks are not processed in order?
and one more question

workItems: Enumerable.Range(1, 10)

If we use the string data type instead of int in ParallelizerFactory and workFunction, then how do we tell the range to ParallelizerFactory?

Yes it’s normal that they are not processed in order, that’s exactly how it works. The tasks are parallelized and executed at the same time so there is no guarantee that they are completed in order.

Otherwise if they had to wait for each other to finish, it would be exactly the same as single thread execution which is slow and doesn’t take advantage of a parallelizer at all :sweat_smile:
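You can observe the same effect without the parallelizer at all. This sketch uses only standard library tasks with random delays (the range 1 to 5 and the delay bounds are arbitrary): the "Finished" lines usually appear in a different order on every run.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

var random = new Random();

// Start five tasks at once, each taking a random amount of time
var tasks = Enumerable.Range(1, 5).Select(async number =>
{
    await Task.Delay(random.Next(10, 200));
    Console.WriteLine($"Finished item {number}"); // order varies between runs
    return number;
}).ToArray();

// WhenAll hands the results back in input order,
// even though the tasks completed in a different order
var results = await Task.WhenAll(tasks);
Console.WriteLine(string.Join(", ", results)); // 1, 2, 3, 4, 5
```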

As for the other question, if you use strings then it makes absolutely zero sense to produce a range of integers. Why don’t you pass a collection of strings as workItems instead of the range? For example, File.ReadAllLines() gives you a string[] with all the lines of a file, while File.ReadLines() gives you an IEnumerable<string> that reads the file lazily. The range only makes sense in the case of integers.


Thanks a lot again, @Ruri you are helpful and supportive


@Ruri sorry if I’m asking a childish question, I’m new to C#.
Here is what I did for string, like you said:

string[] lines = File.ReadAllLines("Test.txt");
 Func<string[], CancellationToken, Task<string>> parityCheck = new(async (number, token) =>
var parallelizer = ParallelizerFactory<string[], string>.Create(

Then I passed the above lines as work items, but I’m getting this error. What am I doing wrong here?
[Screenshot of the error]

It should be

Func<string, CancellationToken, Task<string>>

So basically the first argument is string, not string[]. It identifies only a single item in the list, not the entire list, so you should only put the type of the individual item of the list ^^

Of course it’s the same in the parallelizer factory, just use <string, string>
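With that correction applied, the string version could look something like this sketch. The work function toUpper and its body are placeholders for whatever you actually want to do with each line, and the using RuriLib.Parallelization line assumes that is the library's namespace.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using RuriLib.Parallelization; // assumed namespace of the library

// The whole file as an array: this is the collection of work items
string[] lines = File.ReadAllLines("Test.txt");

// The work function sees ONE line at a time, so its input type is string
Func<string, CancellationToken, Task<string>> toUpper = new(async (line, token) =>
{
    await Task.Delay(50, token);
    return line.ToUpperInvariant(); // placeholder work
});

// Same types here: single item in, single result out
var parallelizer = ParallelizerFactory<string, string>.Create(
    type: ParallelizerType.TaskBased,
    workItems: lines,          // string[] implements IEnumerable<string>
    workFunction: toUpper,
    degreeOfParallelism: 3,
    totalAmount: lines.Length, // the array already knows its size
    skip: 0);
```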


Thanks, everything is crystal clear now but I do have a few more questions about this code

        private static void OnCompleted(object sender, EventArgs e) => Console.WriteLine("All work completed!");
        private static void OnTaskError(object sender, ErrorDetails<string> details)
            => Console.WriteLine($"Got error {details.Exception.Message} while processing the item {details.Item}");
        private static void OnException(object sender, Exception ex) => Console.WriteLine($"Exception: {ex.Message}");

The OnCompleted function triggers when all the jobs or tasks are completed.
The OnTaskError function triggers when there is an error in the task. I want to ask one thing about this function: if we are performing a task and it gets cancelled in the middle due to some error, will the parallelizer do the same task again, or is that task skipped?

The OnException function triggers when there is an exception in the task, right? I want to ask what type of exception will trigger this function, and whether the task is skipped in case of an exception.

The OnCompleted function triggers when the parallelizer has finished processing all work items.

The OnTaskError function triggers when there is an uncaught exception in the task (e.g. you didn’t wrap code that can fail in a try/catch block). In this case the item will not be tried again, so what I suggest is wrapping the workFunction code in a try/catch and, if it fails, going back to the top of the function and trying again (at least that’s what I do in OB2).
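One way to sketch that retry idea is a bounded loop around a try/catch inside the work function. This is not how OB2 does it internally; the names workWithRetry, ProcessOnce and maxAttempts are invented for the example, and ProcessOnce deliberately fails on its first attempt to simulate an error.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

const int maxAttempts = 3; // hypothetical retry limit

Func<string, CancellationToken, Task<string>> workWithRetry = new(async (item, token) =>
{
    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            return await ProcessOnce(item, attempt, token);
        }
        catch (OperationCanceledException)
        {
            throw; // the user asked to stop, so do not retry
        }
        catch (Exception ex)
        {
            // Caught here, so the exception never reaches OnTaskError
            Console.WriteLine($"Attempt {attempt} failed for {item}: {ex.Message}");
        }
    }

    return "FAIL"; // every attempt failed
});

Console.WriteLine(await workWithRetry("example", CancellationToken.None));

// Hypothetical unit of work that fails the first time it is tried
static async Task<string> ProcessOnce(string item, int attempt, CancellationToken token)
{
    await Task.Delay(50, token);

    if (attempt == 1)
        throw new InvalidOperationException("simulated failure");

    return "SUCCESS";
}
```

Capping the number of attempts keeps a permanently broken item from looping forever.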

The OnException function triggers when there is an exception in the whole parallelizer (which would stop everything). It’s just there for precaution but it should never get triggered.

So basically, OnTaskError is the exception inside the workFunction, OnException is at the parallelizer level (something went terribly wrong) but it shouldn’t happen under normal circumstances. For example it could happen if you pass File.ReadLines(...) as workitems and then you delete the file in the middle of the process so it cannot read it anymore. On the other hand if you use File.ReadAllLines(...) it will read all the lines into memory at the beginning so even if you delete the file you’re fine, but it will be very expensive for big files.
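The difference between the two file methods can be seen in a small standard-library sketch. It writes a temporary file with three made-up lines so that it runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Create a small temporary file to read from
var path = Path.GetTempFileName();
File.WriteAllLines(path, new[] { "one", "two", "three" });

// Eager: the whole file is loaded into memory right now
string[] eager = File.ReadAllLines(path);

// Lazy: lines are read from disk only as they are enumerated
IEnumerable<string> lazy = File.ReadLines(path);

Console.WriteLine(eager.Length); // 3
Console.WriteLine(lazy.First()); // one

// Once the file is gone, the array is still usable,
// but enumerating a lazy sequence again would need the file on disk
File.Delete(path);
Console.WriteLine(eager.Length); // 3
```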


Hello bro, happy new year, I hope this year brings you happiness. I need some guidance on how and where to handle success, failure, ban, retry and other statuses: in the parallelizer or in OnResult?

Happy new year!

If your parallelizer takes a string and returns another string you could make it return something like SUCCESS, FAIL etc. conditionally from the work function, and then in the OnResult method you check whether it’s one or the other and write it to a file or just log it to the console as you need.

As for something like retry, I would just loop inside the work function as there is no support to put back items in the parallelizer queue at the moment. For example in OB2 I use a goto statement in case of retry and I go back to the top of the work function and redo everything ^^

If you need to return something more complex (for example both a status and a result or multiple variables) you can create a new class with all the fields you need, and then your parallelizer will take a string and return an object of the type of your class (e.g. <string, CustomResult>)
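A minimal sketch of such a result class is shown below. CheckStatus, CustomResult, check and HandleResult are illustrative names, and the rule used to pick a status is a placeholder. In a real program the body of HandleResult would live in your OnResult handler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Work function that returns a status together with some data
Func<string, CancellationToken, Task<CustomResult>> check = new(async (item, token) =>
{
    await Task.Delay(50, token);

    // Placeholder rule: even-length items count as a success
    var status = item.Length % 2 == 0 ? CheckStatus.Success : CheckStatus.Fail;
    return new CustomResult(status, item.ToUpperInvariant());
});

HandleResult(await check("abcd", CancellationToken.None)); // Success: ABCD

// What the OnResult handler could do with each result
static void HandleResult(CustomResult result)
{
    switch (result.Status)
    {
        case CheckStatus.Success:
            Console.WriteLine($"Success: {result.Data}");
            break;
        case CheckStatus.Fail:
            Console.WriteLine($"Fail: {result.Data}");
            break;
        default:
            Console.WriteLine($"{result.Status}: {result.Data}");
            break;
    }
}

// Hypothetical status values and result type
enum CheckStatus { Success, Fail, Ban }

record CustomResult(CheckStatus Status, string Data);
```

The parallelizer would then be created as ParallelizerFactory<string, CustomResult>, matching the types of the work function.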


Got error The ReadAsync method cannot be called when another read operation is pending. while processing the item

What does this error mean? I’m getting this error very often. Should I be worried about it, or is it just a warning?


And one more thing: how does OB check if a proxy is alive or not?

I need more information about this, where is the error happening? Looks like you’re using the same object from multiple tasks or something.

To check if a proxy is alive just create a client, set the proxy and try to get a page with a given timeout. Which library are you using for http operations?

I’m using Extreme.Net

I’m not very familiar with that library anymore, I used it in OB1 but haven’t touched it for a very long time now. Anyways once you have a proxied client then just make a request to whatever site, using a try/catch and in the catch you will have the failure state, while in the try the success state.
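Since I can’t vouch for the Extreme.Net API anymore, here is the same idea sketched with the built-in System.Net.Http client instead. The helper name IsProxyAlive, the proxy address and the test URL are placeholders, and this is not the code OB uses.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Placeholder proxy address and test site
var alive = await IsProxyAlive("http://127.0.0.1:8080", "https://example.com", TimeSpan.FromSeconds(5));
Console.WriteLine(alive ? "Proxy is alive" : "Proxy is dead");

static async Task<bool> IsProxyAlive(string proxyAddress, string testUrl, TimeSpan timeout)
{
    // Route every request of this client through the proxy
    using var handler = new HttpClientHandler
    {
        Proxy = new WebProxy(proxyAddress),
        UseProxy = true
    };
    using var client = new HttpClient(handler) { Timeout = timeout };

    try
    {
        // Success state: the page came back through the proxy
        using var response = await client.GetAsync(testUrl);
        return response.IsSuccessStatusCode;
    }
    catch (HttpRequestException)
    {
        // Failure state: connection refused, proxy error, DNS failure, ...
        return false;
    }
    catch (TaskCanceledException)
    {
        // Failure state: the request took longer than the timeout
        return false;
    }
}
```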

Thanks, but is there any library for performing HTTP operations that works like RuriLib.Parallelization?