Tower::Service implementation

Hi,

I have an external process (a simulation) that accepts input via stdin and reports the results on stdout. I.e. when I use it from the command line I can run multiple simulations one after another by:

  1. Start the program
  2. Issue an input on one line and press enter
  3. Wait for the result, printed as a JSON structure on one line of stdout
  4. Issue the next input etc.

This allows me to run multiple simulations using the same process (since the startup time is very long).
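
To make the pattern concrete, this is roughly the interaction I do by hand today (a sketch using tokio::process; the executable path and the line protocol are placeholders):

```rust
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};

struct Simulator {
    // Kept so the process handle stays alive (and can be killed on shutdown).
    child: Child,
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}

impl Simulator {
    // Spawn the simulator once; the slow startup cost is paid a single time.
    fn spawn() -> std::io::Result<Self> {
        let mut child = Command::new("./simulator") // placeholder path
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;
        let stdin = child.stdin.take().expect("stdin piped");
        let stdout = BufReader::new(child.stdout.take().expect("stdout piped"));
        Ok(Self { child, stdin, stdout })
    }

    // One request = one line on stdin; one result = one JSON line on stdout.
    async fn run(&mut self, input: &str) -> std::io::Result<String> {
        self.stdin.write_all(input.as_bytes()).await?;
        self.stdin.write_all(b"\n").await?;
        self.stdin.flush().await?;
        let mut line = String::new();
        self.stdout.read_line(&mut line).await?;
        Ok(line)
    }
}
```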

Now I'm trying to wrap this in a tower::Service, including backpressure, so that I can use it from my code in the same way, i.e. keeping a child process alive and using it multiple times to avoid repeated startups.

To my actual questions:

  1. Is the code sane and fulfilling the Service contract? I think so after reading the documentation several times - but it sometimes seems to run into deadlocks when used, indicating that I've missed something.
  2. Is there a better way to implement this pattern? I feel like the code is overly complex for something this simple, but I'm struggling to simplify it (or to break it into multiple functions) or to write it in a clearer style.

Any input appreciated.

/V

Do you strictly need to use a middleware layer like Tower for this project? It tends to make code more convoluted, although there are of course cases where it's appropriate.

If it's just backpressure you're after, you could use a simple bounded channel like sync_channel in the standard lib or the equivalent in Tokio.
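
For illustration, a bounded Tokio channel gives you backpressure with very little machinery (a sketch; the worker body is a stand-in for talking to your simulator):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel: `send` waits when 4 requests are already queued,
    // which is exactly the backpressure behaviour you want.
    let (tx, mut rx) = mpsc::channel::<String>(4);

    // One worker owns the single simulator process and drains the queue.
    let worker = tokio::spawn(async move {
        while let Some(input) = rx.recv().await {
            // Stand-in for: write `input` to the child, read one line back.
            println!("simulating: {input}");
        }
    });

    for i in 0..8 {
        tx.send(format!("job {i}")).await.unwrap();
    }
    drop(tx); // close the channel so the worker loop ends
    worker.await.unwrap();
}
```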

My questions would be:

  1. Are Tokio and Tower needed to fit into a bigger framework, or are you free to use a simpler setup?
  2. Is your main program (not the simulator) a long-lived, interactive process or a batch program that runs some jobs and then exits?
  3. Does your external simulator process use a single core or multiple cores?
  4. Do you want to start up multiple instances of the external process (to allow a single-core simulator to utilize multiple cores)?

Hi,

No, I guess it's not strictly needed to use Tower - it was more the prospect of actually using it to learn it (it's a long-running web server built on axum where this is integrated, so it seemed natural).

  1. Tokio and Tower are used throughout the rest of the application (web server on Tokio) - but again, not strictly necessary for this part - I'd just like to learn it.
  2. Long-lived (web server)
  3. Multiple cores (I control the number using environment variables when starting it - depending on the problem it does not scale linearly)
  4. Yes - I'm currently using 4, via the Tower middlewares Buffer and Balance

/V

OK, fair enough. It looks like using Tower is appropriate then.

I haven't seen your entire codebase, but it seems like you could simplify things a lot by replacing MultRunner's lease + waiters code with a Semaphore, rather than essentially re-implementing a semaphore's logic yourself.
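
I don't know MultRunner's internals, but the shape I have in mind is roughly this (a sketch; the fields and method are invented for illustration):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

struct MultRunner {
    // One permit = at most one request talking to the child at a time;
    // everyone else queues inside the semaphore instead of hand-rolled waiters.
    permits: Arc<Semaphore>,
    // ... child process handles ...
}

impl MultRunner {
    async fn run(&self, input: String) -> String {
        // The permit is released when `_permit` is dropped at the end of
        // this function, waking the next waiter.
        let _permit = self
            .permits
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore closed");
        // ... write `input` to the child's stdin, read one JSON line back ...
        input // placeholder for the real result
    }
}
```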

In addition, if you are going to run on Tokio, you should use an asynchronous Command to spawn your external processes to prevent blocking your Tokio threads.

You should probably also spawn your external processes when the server starts and store references to them in an object pool.

Thank you very much.

I have now updated the code to use PollSemaphore (I couldn't get it to work using just the Semaphore you linked).
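
Roughly, the service now looks like this (a simplified sketch of the updated code - the real version has proper error handling and the actual child-process I/O where the comments are):

```rust
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use tower::Service;

struct MultRunner {
    semaphore: PollSemaphore,
    // Permit acquired in poll_ready, consumed by the next call().
    permit: Option<OwnedSemaphorePermit>,
    // ... child process handles ...
}

impl MultRunner {
    fn new() -> Self {
        Self {
            // Capacity 1: the child can only serve one request at a time.
            semaphore: PollSemaphore::new(Arc::new(Semaphore::new(1))),
            permit: None,
        }
    }
}

impl Service<String> for MultRunner {
    type Response = String;
    type Error = std::io::Error;
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<String, std::io::Error>> + Send>,
    >;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Idempotent: if poll_ready is called again before call(),
        // we keep the permit we already have and stay Ready.
        if self.permit.is_none() {
            match self.semaphore.poll_acquire(cx) {
                Poll::Ready(Some(permit)) => self.permit = Some(permit),
                Poll::Ready(None) => panic!("semaphore closed"),
                Poll::Pending => return Poll::Pending,
            }
        }
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, input: String) -> Self::Future {
        // Move the permit into the future: it is dropped (and the next
        // waiter released) only when the simulation has finished.
        let permit = self.permit.take().expect("call() before poll_ready()");
        Box::pin(async move {
            let _permit = permit;
            // ... write `input` to the child, read one JSON line back ...
            Ok(input)
        })
    }
}
```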

For the async Command, I was already using that.

Changing to the semaphore also fixed the deadlock I had been experiencing.

Any further pointers on how the resulting code could be improved to be more "rust-y" are appreciated:

My mistake. I saw std::process::Stdio and thought you were using the sync Command.

I can't test your program since not all the code is available, but I am curious about a few things:

  1. You stated you're using Tower's Balance and Buffer. Is that still the case?
  2. Am I correct that you create 4 MultRunner instances (one for each external simulator process), and then use Balance to load-balance between the MultRunners?
  3. How big is your Tower Buffer? Does it only queue one request at a time for each MultRunner?
  4. Are you using the Semaphore in MultRunner to make sure it only processes one request at a time?

Hi,

Yes, sorry - it's hard to create an example you can run since it depends on an external executable.

1: Yes - basically my web server uses a few different Tower services, of which this is one - so I wrap this service in a Buffer(32) over a Balance across 4 instances of this service (roughly the wiring sketched after this list)
2: Yes
3: Typically 32 - but I might also have to introduce a timeout
4: Well, the underlying process only ever processes one request at a time (it doesn't read the next line from stdin before having completed the last request and dumped the result to stdout) - so I'm using the semaphore to ensure that I'm not buffering an unbounded number of requests inside one instance of the process, which would then only process them sequentially. That would lead both to bad load distribution between the 4 instances and possibly to an unbounded buffer of requests whose original clients have already gone away or timed out.
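
Concretely, the wiring is roughly this (a sketch; it assumes the MultRunner service sketched above and tower with the buffer, balance, discover and load features enabled):

```rust
use tower::balance::p2c::Balance;
use tower::buffer::Buffer;
use tower::discover::ServiceList;
use tower::load::Constant;
use tower::{BoxError, Service};

// Must be called inside a Tokio runtime, since Buffer spawns a worker task.
fn build(
    replicas: usize,
    queue: usize,
) -> impl Service<String, Response = String, Error = BoxError> {
    // Constant just satisfies Balance's `Load` bound with a fixed cost;
    // tower::load::PendingRequests would let p2c prefer the least-busy replica.
    let runners: Vec<_> = (0..replicas)
        .map(|_| Constant::new(MultRunner::new(), 0u32))
        .collect();
    Buffer::new(Balance::new(ServiceList::new(runners)), queue)
}
```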

/V

Yeah, I think your overall design looks reasonable.

Like I mentioned above, I would still recommend moving creation of your child processes to when your server starts up, since you mentioned this takes some time, but you may still need to check that the child process is alive with each request.
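
The liveness check can be cheap - something like this (sketch; respawning is left out):

```rust
use tokio::process::Child;

// Non-blocking: try_wait returns Some(status) only once the process
// has exited, so None means the child is still running.
fn is_alive(child: &mut Child) -> std::io::Result<bool> {
    Ok(child.try_wait()?.is_none())
}
```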

Also, here is another potential problem, which I haven't tested: it seems like you release the semaphore permit (take() on Line 51) as soon as you start using the child process. This means the next request can immediately acquire the permit before the first simulation ends, so you may have two or more users of the same MultRunner at any time. Ideally, you shouldn't release the permit until after you're done with the current request. Perhaps you could assign each request a unique job ID and use that to decide whether call() should run, rather than take() the permit.

Also, I'm not sure why you need Lines 35-37.

Hi again,

Yes - fair point, I missed that but will launch them in new().

Re the take() and the semaphore - if that is indeed how it works, it's a bug. However, I was under the impression that the permit is only released once it's dropped (and the semaphore only has a capacity of 1), and since I'm sending it to call_helper it should only be dropped once that function returns?

Lines 35-37 are interesting - I initially did not have them, as I could not find in the Service documentation that they are needed. However, without them it deadlocks. They are also there even in the implementation of Tower's Buffer.

My best guess is that the scheduler might end up calling poll_ready twice before call() - and expects the second call to also return Ready, or else it will not issue any call().

V

Hm, I'm wondering if we're reinventing the wheel here. There is a ConcurrencyLimitLayer in Tower. Is that any use?

Hmm,

Well, yes (and no?) - I guess my implementation of a service still has to fulfill the basic tower::Service contract, or I could get problems in the future?

But I guess what you are suggesting is that I simply make my service always return Ready in poll_ready, Option::take() the underlying child process in call(), and panic if two concurrent calls to call() are made? (And wrap it in that ConcurrencyLimitLayer with a limit of 1.)

That would for sure make my code easier :slight_smile:

I will try it out - though it feels a bit strange to code in that panic in my service (though I agree - it should not happen)

/V

Oh, I see. You want the permit to be dropped at the end of a simulation when the helper function returns. I guess this could work assuming the compiler doesn't optimize away the permit since you don't use it for anything. Maybe you could write a test to println the time when each simulation starts and ends, then send several concurrent requests and check your code is working as expected? Without being able to run the code I'm not sure exactly what it's doing.

I haven't used ConcurrencyLimit before, but it is also based around a semaphore and it seems by looking at the source code that you're basically reimplementing it.

I think you could remove your semaphore and just use ConcurrencyLimit. If you only allow 1 concurrent user then there shouldn't be any problem as the ConcurrencyLimitLayer will prevent simultaneous access to the simulator process. But of course, write a test to make sure. :grinning:
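
The wiring could then be as small as this (untested sketch):

```rust
use tower::limit::ConcurrencyLimit;

// At most one request in flight in `inner` at any time; additional
// callers wait in poll_ready, which gives you the backpressure.
fn limit_to_one<S>(inner: S) -> ConcurrencyLimit<S> {
    ConcurrencyLimit::new(inner, 1)
}
```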

Hang on,

Is the compiler allowed to optimise this out? I use this pattern in many places, assuming that the compiler has to model the drop after the function returns - modifying that would change the meaning of the code?

I'd be very surprised to learn the compiler is allowed to change the meaning of a valid program?

/V

Well, you don't use the permit anywhere in your helper function, so one could argue that dropping it at the beginning, middle or end of the function makes no difference.

However, I've checked with Godbolt, and unused vars do seem to be dropped at the end of the scope, at least at this level of compiler optimization.

I wouldn't worry too much about it, but it's always worth writing tests to make sure the code is doing what you think it is.

According to the reference:

All function parameters are in the scope of the entire function body, so are dropped last when evaluating the function.

Dropping something like an i32 isn't meaningful, so it doesn't matter when it happens or if it happens at all, but when it does matter, the compiler should retain this order.
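
A tiny illustration of that drop order:

```rust
struct Permit;

impl Drop for Permit {
    fn drop(&mut self) {
        println!("permit dropped");
    }
}

// `_permit` is never used in the body, but it is still dropped only
// when the function returns, after the work is done.
fn do_work(_permit: Permit) {
    println!("doing work");
}

fn main() {
    do_work(Permit);
    println!("after do_work"); // prints: doing work, permit dropped, after do_work
}
```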

Thanks for that clarification
