Download code: kafka2.zip

1. Modify constants.py

Replace {surname} with your surname.

2. Try simple producer & consumer

Try to run producer.py and consumer.py in parallel.
Try to run multiple consumer instances and watch the behavior of the consumer group.
STOP the producer when you're finished.

Common attributes of a message: Extend the consumer so that it also outputs message.timestamp and message.offset. Set auto_offset_reset='earliest' and remove the consumer group from the consumer so that it consumes all previously produced messages. Run the consumer (you do not need to run the producer).
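A minimal sketch of the modified consumer, assuming the kafka-python client (its KafkaConsumer accepts auto_offset_reset and group_id, and each record exposes offset and timestamp, the latter in milliseconds since the epoch). The function names and the localhost:9092 bootstrap address are hypothetical; take the real address and topic from constants.py. With group_id=None no offsets are committed, so every run rereads the topic from the oldest retained message.

```python
def describe_record(record):
    """Format the common attributes of a consumed record on one line."""
    # kafka-python's ConsumerRecord exposes .offset, .timestamp (ms since epoch) and .value
    return f"offset={record.offset} timestamp={record.timestamp} value={record.value}"


def run_consumer(topic, bootstrap_servers="localhost:9092"):
    """Read every message in `topic` from the beginning, without a consumer group.

    The bootstrap address is an assumption; use the value from your constants.py.
    """
    # Imported here so describe_record() can be tried without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start from the oldest retained message
        group_id=None,                 # no group: no committed offsets, every run rereads all
        consumer_timeout_ms=10_000,    # stop iterating after 10 s without new messages
    )
    try:
        for record in consumer:
            print(describe_record(record))
    finally:
        consumer.close()
```

Call run_consumer() with the topic that producer.py writes to. Values are printed as raw bytes because no value_deserializer is set.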
We will use the topic SERVER_LOGS_TOPIC. It contains JSON messages that track the activity of an e-commerce store. Check producer_server_logs.py to see how it works, but do NOT run it. Example messages:
    {
        'event': 'view_item',
        'user_id': '560e00a2-4b9c-4ddd-80f6-b771f205259e',
        'timestamp': '1983-03-20T02:48:27',
        'item_id': 'e5b48f98-0ced-4c57-a668-0e833f56754b',
        'ip_address': '197.92.150.166',
        'device': 'mobile',
        'category': 'toys'
    }
    {
        'event': 'registration',
        'user_id': 'a7cc6709-47c5-417f-b907-397b27523807',
        'timestamp': '1996-07-17T14:38:06',
        'ip_address': '93.98.34.98'
    }
    {
        'event': 'purchase',
        'user_id': 'da1fd65f-1430-4f5d-a3e8-878e888d2d6c',
        'timestamp': '2012-07-15T16:20:04',
        'item_id': '0d1909b3-5b74-4c6a-9978-d77ed74ac4fc',
        'amount': 188.55,
        'currency': 'EUR',
        'payment_method': 'paypal',
        'ip_address': '58.16.211.249'
    }
    

3. Filter high-value purchases

Create topic high_value_purchases_{surname} (via Kafka CLI tools).

Try the consumer in filter_purchases.py, which filters high-value purchases and sends them to your HIGH_VALUE_PURCHASES_TOPIC. A high-value purchase is a "purchase" event where amount > 100. Check the content of your HIGH_VALUE_PURCHASES_TOPIC (via CLI tools or a Python consumer).
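The code in filter_purchases.py is not reproduced here; the following is an independent sketch of the same idea, assuming kafka-python and values serialized with json.dumps by the producer. is_high_value() and run_filter() are hypothetical names, and the bootstrap address is an assumption.

```python
import json

HIGH_VALUE_LIMIT = 100  # "amount > 100" from the exercise


def is_high_value(event):
    """True for a 'purchase' event whose amount is greater than 100."""
    return event.get("event") == "purchase" and event.get("amount", 0) > HIGH_VALUE_LIMIT


def run_filter(source_topic, target_topic, bootstrap_servers="localhost:9092"):
    """Forward high-value purchases from `source_topic` to `target_topic`."""
    # Imported lazily so is_high_value() can be tested without a broker
    from kafka import KafkaConsumer, KafkaProducer

    consumer = KafkaConsumer(
        source_topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),  # assumes JSON values
    )
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    try:
        for record in consumer:
            if is_high_value(record.value):
                producer.send(target_topic, value=record.value)
    finally:
        producer.flush()  # make sure buffered messages reach the broker
        producer.close()
        consumer.close()
```

For example, run_filter(SERVER_LOGS_TOPIC, HIGH_VALUE_PURCHASES_TOPIC) runs until interrupted with Ctrl+C; the finally block still flushes the producer.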

4. Report error 400 threshold violation

Consume the server logs topic and count the number of errors with error code 400. If the count exceeds ERROR_400_THRESHOLD, print "Threshold of {ERROR_400_THRESHOLD} errors exceeded." to the console. Modify the code from the previous example.
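The sample messages above do not show what an error event looks like, so this sketch assumes the shape {'event': 'error', 'error_code': 400, ...}; check producer_server_logs.py for the real field names. The counter is kept separate from the Kafka loop (kafka-python assumed) so the threshold logic can be tried on plain dicts. Error400Monitor and run_monitor() are hypothetical names.

```python
import json


class Error400Monitor:
    """Counts error-400 events and reports once when the count exceeds the threshold."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.count = 0
        self.reported = False

    def update(self, event):
        """Return the alert text the first time count > threshold, otherwise None."""
        # Assumed shape: {'event': 'error', 'error_code': 400, ...};
        # str() accepts the code whether it is sent as 400 or "400"
        if event.get("event") == "error" and str(event.get("error_code")) == "400":
            self.count += 1
        if self.count > self.threshold and not self.reported:
            self.reported = True
            return f"Threshold of {self.threshold} errors exceeded."
        return None


def run_monitor(topic, threshold, bootstrap_servers="localhost:9092"):
    """Consume `topic` and print the alert once (bootstrap address is an assumption)."""
    from kafka import KafkaConsumer  # imported here so the monitor works without kafka-python

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    )
    monitor = Error400Monitor(threshold)
    try:
        for record in consumer:
            alert = monitor.update(record.value)
            if alert:
                print(alert)
    finally:
        consumer.close()
```

With run_monitor(SERVER_LOGS_TOPIC, ERROR_400_THRESHOLD) the message is printed only once, when the count first becomes greater than the threshold; drop the reported flag if you want it repeated for every further error.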

5. Aggregate view items

Create topic view_item_count_topic_{surname} (via Kafka CLI tools).

Try the consumer in aggregate_view_items.py, which counts the number of view_item events per day and sends the aggregated value to your VIEW_ITEM_COUNT_TOPIC. Check the content of your VIEW_ITEM_COUNT_TOPIC (via CLI tools or a Python consumer).
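A sketch of per-day counting, assuming kafka-python, JSON values and the ISO 8601 timestamp field shown in the sample messages, so days come from event time rather than from when the consumer sees the message. After each view it sends the running total for that day, keyed by the day; aggregate_view_items.py may emit its results differently. Function names and the bootstrap address are hypothetical.

```python
import json
from collections import Counter
from datetime import datetime


def day_of(timestamp):
    """'1983-03-20T02:48:27' -> '1983-03-20' (ISO 8601 timestamps assumed)."""
    return datetime.fromisoformat(timestamp).date().isoformat()


def add_view(counts, event):
    """Count a 'view_item' event under its day; return that day, or None for other events."""
    if event.get("event") != "view_item":
        return None
    day = day_of(event["timestamp"])
    counts[day] += 1
    return day


def run_aggregation(source_topic, target_topic, bootstrap_servers="localhost:9092"):
    """Send the running view count for each day to `target_topic` (address is an assumption)."""
    from kafka import KafkaConsumer, KafkaProducer  # lazy import: helpers work without kafka-python

    consumer = KafkaConsumer(
        source_topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    )
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    counts = Counter()  # in-memory state: lost when the consumer stops
    try:
        for record in consumer:
            day = add_view(counts, record.value)
            if day is not None:
                # Keying by day sends all updates for one day to the same partition
                producer.send(target_topic, key=day.encode("utf-8"),
                              value={"day": day, "count": counts[day]})
    finally:
        producer.flush()
        producer.close()
        consumer.close()
```

Because no group_id is set (kafka-python's default is None), no offsets are committed, so restarting run_aggregation(SERVER_LOGS_TOPIC, VIEW_ITEM_COUNT_TOPIC) recomputes the counts from the start of the topic instead of losing them.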

6. Aggregate logins

Consume the server logs topic, count the number of logins per minute, and print the aggregated values to the console. Modify the code from the previous example.
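The same pattern with a one-minute bucket. The login event name is not shown in the sample messages; 'login' below is an assumption to check against producer_server_logs.py. kafka-python, JSON values, the function names and the bootstrap address are likewise assumptions.

```python
import json
from collections import Counter
from datetime import datetime


def minute_of(timestamp):
    """'2012-07-15T16:20:04' -> '2012-07-15T16:20' (truncate to the minute)."""
    return datetime.fromisoformat(timestamp).strftime("%Y-%m-%dT%H:%M")


def add_login(counts, event):
    """Count a login under its minute; return that minute, or None for other events."""
    # Assumption: logins are sent as {'event': 'login', 'timestamp': ..., ...}
    if event.get("event") != "login":
        return None
    minute = minute_of(event["timestamp"])
    counts[minute] += 1
    return minute


def run_login_counter(topic, bootstrap_servers="localhost:9092"):
    """Print the running login count for each minute (bootstrap address is an assumption)."""
    from kafka import KafkaConsumer  # lazy import: helpers work without kafka-python

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    )
    counts = Counter()
    try:
        for record in consumer:
            minute = add_login(counts, record.value)
            if minute is not None:
                print(f"{minute} logins={counts[minute]}")
    finally:
        consumer.close()
```

This prints a running total each time a login arrives; printing only when a minute is complete would need a rule for late events, which the exercise does not specify.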