
feat(akkaPekko): Retry mechanism for ParquetPartitioningFlow · Issue #351 · mjakubowski84/parquet4s
Open
utkuaydn opened this issue Jun 13, 2024 · 4 comments

Comments

utkuaydn commented Jun 13, 2024

Hi,

In the akkaPekko module, I noticed that ParquetPartitioningFlow does not handle failures when it calls write on Hadoop's ParquetWriter, which may throw an IOException. I thought it would also be neat to have a retry mechanism in place for failed records, so I created #350. I think this would be nice to have; let me know if you agree.
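As a rough illustration of the kind of retry being proposed, here is a minimal plain-Scala sketch with no Pekko or Hadoop dependencies. `RetryingWrite`, `retryIO`, `maxRetries` and `initialBackoff` are hypothetical names (not parquet4s API), and the `write` argument stands in for the call to `ParquetWriter.write`.

```scala
import java.io.IOException
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RetryingWrite {

  /** Hypothetical helper (not parquet4s API): evaluates `write` and, if it throws
    * an IOException, sleeps and tries again with exponential backoff, at most
    * `maxRetries` extra times. Other exceptions, and the last IOException, are rethrown.
    */
  def retryIO[A](maxRetries: Int, initialBackoff: FiniteDuration)(write: => A): A = {
    @tailrec
    def loop(attempt: Int, backoff: FiniteDuration): A =
      Try(write) match {
        case Success(value) => value
        case Failure(_: IOException) if attempt < maxRetries =>
          Thread.sleep(backoff.toMillis) // blocks the calling thread during backoff
          loop(attempt + 1, backoff * 2) // double the delay for the next attempt
        case Failure(other) => throw other
      }

    loop(attempt = 0, backoff = initialBackoff)
  }
}
```

Blocking with `Thread.sleep` keeps the sketch short; inside a GraphStage, a real implementation would more likely schedule the retry with a timer rather than block the stream's thread.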

utkuaydn changed the title from "feat: Retry mechanism for ParquetPartitioningFlow" to "feat(akkaPekko): Retry mechanism for ParquetPartitioningFlow" on Jun 13, 2024
mjakubowski84 (Owner) commented Jun 19, 2024

Hi!
Sorry for the late response.
Could you please explain how this mechanism would be different from using Akka's RetryFlow as a wrapper around ParquetPartitioningFlow?
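For reference, wrapping a flow in RetryFlow looks roughly like the following minimal sketch, assuming Apache Pekko (Akka's `akka.stream.scaladsl.RetryFlow` has the same shape). A hypothetical pass-through `Flow` stands in for the flow built by parquet4s, since only its element types matter here; the backoff values are arbitrary.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{Flow, RetryFlow}
import scala.concurrent.duration._

object RetryFlowWrapper {
  final case class Record(id: Int)

  // Hypothetical stand-in for the parquet4s writing flow: like
  // ParquetPartitioningFlow, it emits every element exactly as it received it.
  val passThroughWriter: Flow[Record, Record, NotUsed] =
    Flow[Record].map { record =>
      // the real flow would hand `record` to a Hadoop ParquetWriter at this point
      record
    }

  // RetryFlow decides from (input, output) alone. Both are the same element here,
  // so the function has no information about whether the write succeeded
  // and can only decline to retry.
  val decideRetry: (Record, Record) => Option[Record] = (_, _) => None

  val retrying: Flow[Record, Record, NotUsed] =
    RetryFlow.withBackoff(
      100.millis, // minBackoff
      5.seconds,  // maxBackoff
      0.2,        // randomFactor
      3,          // maxRetries
      passThroughWriter
    )(decideRetry)
}
```

Running `Source(1 to 3).map(Record(_)).via(RetryFlowWrapper.retrying)` emits the three records unchanged; `decideRetry` is called for each of them but never has a reason to return `Some`.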

utkuaydn (Author) commented Jun 20, 2024

Hi!
So, correct me if I'm wrong, but the problem with using RetryFlow as a wrapper is that it requires a decideRetry function of type (In, Out) => Option[In], and the ParquetPartitioningFlow built with ParquetStreams.viaParquet.of[T]....write(...) has In = T and Out = T. At a glance, you would expect that when the Hadoop write function throws, the written counter is not incremented and modifiedPartitions is not updated, and that is indeed the case. However, onPush emits the element exactly as it was received (with the writing steps in between). In other words, the element passes through the flow unmodified, so there is no way to decide after the flow whether it was actually written. With the change I'm proposing, it would be possible to catch these exceptions and retry the write according to the configuration.
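To make the argument concrete, here is a hypothetical contrast in plain Scala. `decidePassThrough` has the shape a decision function gets when Out = In, while `WriteResult`, `Written` and `WriteFailed` are invented for illustration only and are not part of parquet4s: they show what a decision function could do if the flow reported the outcome of each write.

```scala
object DecideRetrySketch {
  // Hypothetical result type, invented for illustration; not part of parquet4s.
  sealed trait WriteResult[+T]
  final case class Written[T](element: T) extends WriteResult[T]
  final case class WriteFailed[T](element: T, cause: Throwable) extends WriteResult[T]

  // When the flow emits, Out is the element that came in, unchanged,
  // so (in, out) carries no information about the write: never retry.
  def decidePassThrough[T](in: T, out: T): Option[T] = None

  // With an explicit result, a failed write can be fed back into the flow.
  def decideOnResult[T](in: T, out: WriteResult[T]): Option[T] =
    out match {
      case Written(_)        => None
      case WriteFailed(_, _) => Some(in)
    }
}
```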

Repository owner deleted a comment from q-marcinjakubowski Jun 28, 2024
mjakubowski84 (Owner) commented

I am sorry, I meant RestartFlow, which restarts the flow on error (or RestartSource/RestartSink).

On restart, the flow is rebuilt, and all connections to Hadoop are reopened.
I have had Parquet streams running for a couple of years and, honestly, I have not experienced IO issues. Therefore, I can't tell whether retrying a record write after an IO error, without explicitly reestablishing the connection, is a sensible approach.
Assuming that the underlying Hadoop driver handles reconnection automatically, this might be okay. However, IMHO, a good practice is to recreate the flow and reconnect to fix the IO problem (or fail miserably if reconnection is no longer possible).
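A minimal sketch of the RestartFlow approach, assuming Apache Pekko (recent Akka versions expose the same API under the `akka` package). `buildWriterFlow` is a hypothetical factory standing in for the parquet4s flow, and the backoff and restart limits are arbitrary.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.scaladsl.{Flow, RestartFlow}
import scala.concurrent.duration._

object RestartFlowWrapper {
  final case class Record(id: Int)

  // Hypothetical factory: real code would build the Parquet writing flow here,
  // so every call yields a fresh flow with freshly opened writers.
  def buildWriterFlow(): Flow[Record, Record, NotUsed] =
    Flow[Record].map(identity)

  val settings: RestartSettings =
    RestartSettings(1.second, 30.seconds, 0.2) // minBackoff, maxBackoff, randomFactor
      .withMaxRestarts(5, 10.minutes)          // give up after 5 restarts within 10 minutes

  // The factory is invoked again after each failure, so the flow and its
  // connections are rebuilt; elements in flight when it fails are lost.
  val restarting: Flow[Record, Record, NotUsed] =
    RestartFlow.onFailuresWithBackoff(settings)(() => buildWriterFlow())
}
```

`onFailuresWithBackoff` restarts only when the wrapped flow fails, not when it completes normally, which matches the "reconnect on IO error" idea.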

You might have more experience with such IO errors. What do you say?

utkuaydn (Author) commented Jul 3, 2024

Firstly, I think that if something can throw an exception, it's always better to assume it definitely will 😄. For example, we can never be sure that the bucket we are writing to won't decide to shut itself down, only to come back online a minute or two later.

So, I had to do some digging on RestartFlow, and found:

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.

Using RestartFlow can therefore cause elements to be discarded, which defeats the purpose of having a retry mechanism in the first place. I believe RestartFlow is useful when, for example, an exception that is impossible to handle is thrown in the Flow, so the Flow has to be restarted even though it means compromising completeness.

Thus, I think it's still best if failed writes and exceptions are handled within ParquetPartitioningFlow. What you said about reopening the writer does make sense, so I'll refactor the PR to accommodate that and think a bit more about how to handle this case. Let me know what you think :)
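A minimal plain-Scala sketch of the "reopen the writer, then retry" idea. `RecordWriter`, `openWriter` and `writeWithReopen` are hypothetical stand-ins, not parquet4s or Hadoop API; a real implementation would presumably also have to decide what happens to records already buffered in the broken writer, which this sketch ignores.

```scala
import java.io.IOException
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object ReopeningWriter {
  // Hypothetical minimal writer interface standing in for Hadoop's ParquetWriter.
  trait RecordWriter[T] {
    def write(record: T): Unit
    def close(): Unit
  }

  /** Hypothetical sketch: on IOException, close the broken writer, open a new one
    * and retry the same record, up to `maxRetries` times. Returns the writer that
    * succeeded, so the caller keeps using it for subsequent records.
    */
  def writeWithReopen[T](
      writer: RecordWriter[T],
      openWriter: () => RecordWriter[T],
      record: T,
      maxRetries: Int
  ): RecordWriter[T] = {
    @tailrec
    def loop(current: RecordWriter[T], attempt: Int): RecordWriter[T] =
      Try(current.write(record)) match {
        case Success(_) => current
        case Failure(_: IOException) if attempt < maxRetries =>
          Try(current.close()) // best effort: the old writer may already be broken
          loop(openWriter(), attempt + 1)
        case Failure(other) => throw other
      }

    loop(writer, 0)
  }
}
```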
