Make the multi-threading interface a bit more flexible #44
Conversation
…ding interface, deprecate BFSMT scraper
Oh, and I should also add that obviously the main speed decrease is going to be the web driver pool. Even using BFSMTStrategy with a large number (24+) of threads, most of the threads are eventually going to end up stuck waiting for another thread to release a driver back into the pool. And for BFSStrategy, since tasks hold on to their web drivers while loading, other threads will also block waiting for them.
    }
}
PositionWriter writer = new PositionHibernateWriter();
We should write as we scrape, so if the scraper crashes we don't lose all progress.
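A hedged sketch of what that could look like, assuming the callback-based fetch from this PR and that the writer exposes a save method (the exact wiring is an assumption):

```java
// Hypothetical: persist each batch of results as the callback delivers it,
// instead of writing everything once at the end. PositionWriter and
// PositionHibernateWriter come from the diff; save(List<Position>) is assumed.
PositionWriter writer = new PositionHibernateWriter();
positionScraper.fetch(company, results -> writer.save(results));
```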
import java.util.List;

public interface PositionCallback {
maybe just use a runnable here, it's so similar anyways
also we are using the I- prefix convention for interfaces. Look into anonymous classes too; I think this is unnecessary
I was thinking about using a Runnable, but it would still require us to extend the class. The idea behind the callback class was just for it to run with the finished results upon completion, so we don't have to worry about resolving the function stack with return arguments. Will look more into this.
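For reference, a minimal sketch of the callback shape being discussed (the method name is illustrative; the real interface is in this PR):

```java
import java.util.List;

// Position is the project's entity class. Marking the interface as a
// @FunctionalInterface lets callers pass a lambda, which also covers the
// anonymous-class suggestion above without extending anything.
@FunctionalInterface
public interface PositionCallback {
    void onComplete(List<Position> positions);
}
```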
}

public void setup(Company company) {
    mLinkCache.put(company.getName(), mInitialStrategy.fetchInitialLinks(company));
what's this second call doing here? Does it have some side effect you are relying on? If it does, then it's bad practice, because it doesn't make sense while reading it
Yeah, that was mainly used so that we can treat it as a separate task that the scheduled executor service can process concurrently along with the actual scraping. See Main::scrapePositions() to see how it's used. I admit though, it's not the best design.
@@ -10,5 +10,5 @@

public interface IPositionScraperStrategy {
    Logger logger = LoggerFactory.getLogger(PositionScraper.class);
-   List<Position> fetch(Company company, List<String> initialLinks);
+   void fetch(Company company, List<String> initialLinks, PositionCallback callback);
consider overloading the method so one doesn't require the callback
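For example, a default-method overload along these lines (the no-op callback is an assumption):

```java
// Convenience overload: callers that don't care about the results can omit
// the callback; this just delegates with a no-op.
default void fetch(Company company, List<String> initialLinks) {
    fetch(company, initialLinks, positions -> { /* discard results */ });
}
```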
-if(googled.isEmpty()) {
-    logger.warn("Link strategy could not find any initial links.");
+if (googled.isEmpty()) {
+    logger.warn(String.format("Link strategy could not find any initial links for %s.", company.getName()));
maybe add some quotes around the company name
try {
    Thread.sleep(PAGE_LOAD_DELAY_MS);
} catch (InterruptedException e) {
    logger.error("Could not wait for page to load.", e);
Could not wait for JavaScript to load*
List<Callable<List<Position>>> tasks = Lists.newArrayList();
try (MyWebDriverPool pool = new MyWebDriverPool()) {
    InitialLinkStrategy linkStrategy = new GoogleInitialLinkStrategy();
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(24);
is 24 a safe value?
Eventually, the executor will be throttled by the number of web drivers available, so it doesn't really matter what that value is. As long as it isn't an insanely high number, I think it's ok.
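One way to make that bound explicit would be to derive the thread count from the driver pool, e.g. (MAX_DRIVERS is a hypothetical constant):

```java
// Hypothetical: cap the executor at the number of drivers the pool can hand
// out, since extra threads would only block on the pool's semaphore anyway.
int threads = Math.min(24, MyWebDriverPool.MAX_DRIVERS);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
```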
-try {
-    futures = executor.invokeAll(tasks);
+latch.await();
shut down the executor too
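Something like this, as a sketch (assuming the latch and executor from the diff above; the timeout is arbitrary):

```java
latch.await();               // wait for all scrape tasks to complete
executor.shutdown();         // stop accepting new tasks
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
    executor.shutdownNow();  // force-cancel anything still running
}
```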
executor.execute(() -> {
    positionScraper.setup(company);
    executor.execute(() -> {
        positionScraper.fetch(company, (intermediate) -> {
rename intermediate -> results
@@ -114,15 +103,17 @@ private static void scrapePositions(String name) {
}

private static void scrapePositions(List<Company> companies) {
make it clearer that this method is for single threading. Why are we still maintaining single threading as well?
Not sure. It should probably be transitioned over to the multi-threading variant.
How will this occur? How can a task hold on to a driver from the pool after it finishes executing?
We discussed this before and I thought you said the performance increase would be negligible. Was the performance increase good enough to warrant these changes?
If you don't mind, can you provide some benchmarks between master and this branch? Just a depth of 20-25 should be enough to demonstrate noticeable differences.
Right now, you're acquiring a driver briefly, fetching the page source as fast as possible, releasing the driver back into the pool, and then waiting. The actual behavior should be to acquire the driver, call fetch, wait 2 seconds, get the page source, and then release the driver. Because it has to hold on to the driver for those 2 seconds, it can't release it. Then, when you put the task back into the scheduled executor service, it gets put last in the task queue. The tasks ahead of it were previously added and are waiting on the semaphore to unlock so they can also acquire a driver. So essentially what happens is the tasks ahead wait for the semaphore, preventing the waiting task from finishing and releasing the driver, causing deadlock.
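To illustrate the failure mode with a hedged sketch (the pool's acquire/release API is an assumed name):

```java
// A task holds a driver across a scheduled delay. Its continuation is queued
// behind tasks that are blocked on the pool's semaphore; if every worker
// thread is occupied by one of those blocked tasks, the continuation never
// runs and the driver is never released: deadlock.
executor.execute(() -> {
    WebDriver driver = pool.acquire();         // takes a semaphore permit
    driver.get(url);                           // kick off the page load
    executor.schedule(() -> {                  // continuation joins the back
        String html = driver.getPageSource();  //   of the task queue
        pool.release(driver);                  // may never be reached
    }, 2, TimeUnit.SECONDS);
});
```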
It's not necessarily just a performance thing. I didn't like how each scraper was using its own executor service, dumping all its tasks on that single thread, and then just blocking its main thread. You're effectively doubling the number of threads you're using unnecessarily, because the scraper's main thread (in the original work stealing pool) is just stuck waiting for the executor service thread to finish everything. Not to mention the hacks for shutting down the service on completion. It's easier just to keep a single executor service open for all tasks and then let it do the prioritizing.
… the list callback
Benchmarks will be updated as they finish. Companies used:
Constants:
Commands used:
Note that Citadel LLC has a space; the command is not split up there. Results (best of two trials):
Conclusion:
@@ -9,6 +9,7 @@

# Log files
*.log*
+/logs/
Do you need a starting forward slash here?
As in logs/ instead of /logs/
@@ -11,6 +11,7 @@
import com.internhub.data.positions.scrapers.IPositionScraper;
import com.internhub.data.positions.scrapers.ScheduledPositionScraper;
import com.internhub.data.positions.scrapers.strategies.impl.GoogleInitialLinkStrategy;
+import com.internhub.data.positions.scrapers.strategies.impl.PositionBFSMTStrategy;
does this import get used?
Tackles #43.
Much of the earlier multi-threading code had cruft that wasn't really doing much. For example: the unnecessary spawning of an entire scheduled executor service per scraper per company, when that service had only one thread itself and the main thread per company was just blocking on the individual service.
The new proposal is this: keep a massive scheduled executor service available to all scrapers. Now, treat each scraper process as simply a chain of Runnables (maybe with delays) to be executed in sequence, ending in a callback that returns a completed list of positions. The naive BFSStrategy will simply be a giant Runnable that does the entire BFS process + scraping + waiting without deferring to the service. The faster BFSMTStrategy will be better broken up, as sketched below:
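Roughly, one link in that chain might look like this hedged sketch (startPageLoad and parseLoadedPage are assumed helpers; PAGE_LOAD_DELAY_MS appears in the diff):

```java
// Instead of Thread.sleep(), the continuation is re-submitted with a delay,
// so the worker thread is free to run other tasks in the meantime.
executor.execute(() -> {
    startPageLoad(url);                               // step 1: begin load
    executor.schedule(() -> {
        List<Position> found = parseLoadedPage(url);  // step 2: scrape
        callback.onComplete(found);                   // final: deliver results
    }, PAGE_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
});
```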
This seemingly simple change yields much faster results. Why? BFSStrategy runs all in one thread and to completion. This means that if you add 3N BFSStrategies to the executor service with N threads, the executor will complete the first N strategies added, then the second N strategies added, then the third N strategies added. When the N threads are stuck waiting, they are unable to do anything else. However, since BFSMTStrategy is broken up into separate tasks, this means that rather than waiting for a page to load, a thread can simply drop its task back into the pool and go try to take another available task. In this manner, the BFSMTStrategy runnables are continuously passed between threads, leading to a performance increase. (Note that there is still a performance increase using BFSStrategy + the scheduled executor service, which is what is enabled right now - more on this below).
However, BFSMTStrategy is 'broken' at the moment and has been deprecated. This is because it waits after it releases control of the driver rather than during. Waiting during acquisition of a driver from the driver pool must be handled entirely differently! In fact, it might be impossible to use the idea of a Runnable chain with how the driver pool is currently set up. This is because if a task holds on to a driver from the pool and is then put back into the service with a delay, it will get stuck behind other tasks waiting for the semaphore to be released. Essentially, it leads to deadlock. A solution might be to initialize the executor service with fewer threads than open drivers - this will be looked into later. Another problem is, of course, that you can't use a try-with-resources block when transferring control of a driver acquisition to another task, which could lead to deadlock if either task fails and doesn't release the driver.
It should be pretty easy to switch out strategies, since all you have to do is change the one line in Main::scrapePositions() where they are initialized.
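For instance (the strategy class names here are assumptions based on the imports above):

```java
// In Main::scrapePositions(), swap which strategy gets wired in:
IPositionScraperStrategy strategy = new PositionBFSStrategy(pool);
// IPositionScraperStrategy strategy = new PositionBFSMTStrategy(pool); // deprecated
```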