Creating a transform module for Kafka

I created a transform module test.py:

import mgp


@mgp.transformation
def transform_test(
    messages: mgp.Messages,
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    # build one query + parameters record per received Kafka message
    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        payload_as_str = message.payload().decode("utf-8")
        result_queries.append(
            mgp.Record(
                query="CREATE (Test {timestamp: $timestamp, payload: $payload, topic: $topic})",
                parameters={
                    "timestamp": message.timestamp(),
                    "payload": payload_as_str,
                    "topic": message.topic_name(),
                },
            )
        )

    return result_queries

I stored it in the query modules directory and have successfully loaded it with mg.load("test");
Now, I'm trying to create a stream in Memgraph with create stream test topics test transform test.transform_test();

That fails with the following message:

Error: Query failed: line 1:61 mismatched input '(' expecting {<EOF>, ';'}

But if I remove the parentheses, I can put literally any string after the TRANSFORM keyword (valid or invalid) and the query will pass. Be that as it may, after running CALL mg.transformations() YIELD *, nothing is returned.

What is a sample query for creating a Kafka stream that will work? The documentation has no working examples, nor does it show the format it expects.

Your environment

  • Docker
  • Memgraph version 1.6.1-community
  • Operating system and version: irrelevant

The documentation has no working examples, nor does it show the format it expects.

Not sure what you mean by this, but the example does work. The problem in your case is with the module name, and it's not specific to streams: test is some kind of internal module, and that is the reason why loading your test.py silently fails.
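
For reference, a minimal sketch of the shape the query should have (the stream, topic, and module names here are illustrative, and the module is assumed to be renamed so it doesn't collide with a built-in; note there are no parentheses after the transformation name):

CREATE STREAM test_stream
TOPICS test
TRANSFORM transform_module.transform_test;

START STREAM test_stream;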

But accepting anything as a transformation is questionable and something we can discuss.
@benjo would you mind explaining the current behaviour?

By examples in the documentation, I would expect a real working example, e.g.

CREATE STREAM spotify-playlists
TOPIC spotify-playlist
TRANSFORM file.function_name();

The problem is, I have to infer whether parentheses need to be added after the module name or not.

The documentation has one small example about how to work with Kafka streams, and another small one about how to implement a transformation in Python and C.

I did check it and I can reproduce the problem @medo mentioned. After the hint from @toni I could fix it by renaming test.py to test_1.py. I think the issue really is the built-in test module, as @toni said. Basically, when Memgraph sees an xyz.py file in the query modules directory, it tries to load the xyz module. In the case of test.py this is equivalent to import test, which resolves to the built-in test module as far as I can understand (I haven't debugged into Python to verify this, because I'm fairly confident about it).
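
A rough sketch of verifying the fix after the rename, using the procedures already mentioned in this thread (the exact output columns may vary by version):

CALL mg.load_all();
CALL mg.transformations() YIELD *;
// test_1.transform_test should now show up in the results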

Off the top of my head I don't know how we can improve this, or whether we can improve it at all, but we can definitely have a look and discuss it.

Shouldn't we check the existence of the module file in the modules directory? That seems straightforward. No "internal" modules should exist if they aren't in the folder. Otherwise, how can the user know what to call their modules?

During the implementation of the Kafka integration, two problems popped up regarding this:

  1. Under the current circumstances, this would require a lot of changes in our tests because of some technical limitations of the current source code.
  2. A user can remove a transformation at any point, and can also create a stream first and add the transformation module afterwards, so we weren't sure whether such a check would help or just create more confusion: if the check were there, the user could expect the transformation to always exist once a stream is created, yet transformations can be removed or added at any point. Changing this would require refactoring how query modules work.

As implementing such a small thing would require a lot of effort, we prioritized other, more important things. We can always revisit this and make the changes if it seems reasonable.

The “internal” module is the built-in Python module, coming from the Python ecosystem, not something we did. It would be the same with numpy.py if you have numpy installed.

Open up a Python interpreter and type import test and it will succeed, as it imports the built-in test module of Python. If you have a test.py in the working directory, then import test will prioritize the file over the built-in module in the interpreter, but this doesn't work for query modules because they are loaded in a different way. We can try to improve this, because it is not a pleasant experience.
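
To see this for yourself, a quick check in a plain interpreter (with no test.py in the current working directory) is enough:

import test
print(test.__file__)
# prints a path inside the standard library, e.g. .../lib/python3.9/test/__init__.py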

The “internal” module is the built-in Python module, coming from the Python ecosystem, not something we did. It would be the same with numpy.py if you have numpy installed.

Yeah, that's exactly my point. Explicit relative imports (e.g. from . import test) should be used. The other option is calling __import__ manually. I'm not sure how the current query modules work, but I'm curious how much "a lot of effort" really is. When I have more time to look at this, I'll make further inquiries!
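
If it helps, here is a minimal sketch of what an explicit load could look like, using importlib instead of a bare __import__ (the file path is just the default Docker query modules directory and is purely illustrative):

import importlib.util


def load_query_module(name, path):
    # Build the module from an explicit file path instead of searching sys.path,
    # so a file named test.py cannot be shadowed by the standard-library test package.
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


mod = load_query_module("test", "/usr/lib/memgraph/query_modules/test.py")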