FROM $com.sap.sles.base
# Install all Python dependencies in a single layer (one atomic, cacheable
# step instead of five) and skip pip's wheel cache to keep the image small.
# tornado is pinned because the streaming code was validated against 5.0.2.
RUN python3.6 -m pip install --user --no-cache-dir \
        boto3 \
        pandas \
        tornado==5.0.2 \
        tweepy \
        wget
import sys
import os
import shutil
import pathlib
import string
import time
import json
import csv
import wget
from datetime import datetime
from tweepy import Stream
from tweepy.streaming import StreamListener
from tweepy import API
from tweepy import OAuthHandler
# from twitter_client import get_twitter_auth
# export PYTHONIOENCODING=UTF-8 to avoid "UnicodeEncodeError: 'ascii' codec can't encode character" errors
def get_twitter_auth():
    """Set up Twitter authentication.

    Reads the four OAuth credentials from TWITTER_* environment
    variables rather than hard-coding them in source: with literal
    placeholder values the original ``except KeyError`` handler was
    unreachable, and its error message shows env vars were intended.

    Returns:
        tweepy.OAuthHandler: handler with the access token already set.

    Exits:
        With status 1 if any TWITTER_* variable is missing.
    """
    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_secret = os.environ['TWITTER_ACCESS_SECRET']
    except KeyError:
        sys.stderr.write("TWITTER_* environment variables not set\n")
        sys.exit(1)
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    return auth
def get_twitter_client():
    """Build an authenticated Twitter API client.

    Returns:
        tweepy.API: client configured to block and wait when the
        Twitter rate limit is hit instead of raising.
    """
    return API(get_twitter_auth(), wait_on_rate_limit=True)
api.send("debug", str("Before class"))
class CustomListener(StreamListener):
    """Custom StreamListener for streaming Twitter data.

    For every status carrying media entities, downloads the attached
    images plus the raw tweet JSON into a per-tweet folder under
    /vrep/vflow/scripts/twitter/twitter_pictures/<id_str>, then emits
    the tweet id on the "output" port.  Statuses without media are
    reported on "output1".
    """

    def on_data(self, data):
        """Process one raw status from the stream.

        Args:
            data (str): raw JSON payload delivered by tweepy.

        Returns:
            bool: always True so the stream keeps running, even after
            an error (errors are logged to stderr with a 5s back-off).
        """
        try:
            all_data = json.loads(data)
            api.send("debug", str("Before if statement"))
            # A tweet has downloadable images exactly when "media" is
            # present in its entities.  (The original heuristic
            # `len(all_data["entities"]) > 4` misfires for tweets with
            # many non-media entities such as hashtags and URLs.)
            if "media" in all_data.get("entities", {}):
                api.send("debug", "Inside if")
                # Metadata needed for file naming and downstream ports.
                id_str = all_data["id_str"]
                date = all_data["created_at"]
                output_folder = '/vrep/vflow/scripts/twitter/twitter_pictures/' + id_str
                api.send("debug", str("Saved output_folder"))
                # Recreate the per-tweet folder so reruns start clean.
                if pathlib.Path(output_folder).exists():
                    print("Recreate folder")
                    shutil.rmtree(output_folder)
                else:
                    print("Create folder")
                os.makedirs(output_folder)
                # Download each attached image, prefixed with the
                # tweet's creation timestamp for uniqueness.
                for media_ in all_data["entities"]["media"]:
                    download_ = media_["media_url"]
                    filename = download_.split('media/')[1]
                    file_datetime = datetime.strftime(
                        datetime.strptime(date, '%a %b %d %H:%M:%S +0000 %Y'),
                        '%Y-%m-%d%H:%M:%S')
                    wget.download(download_, out=output_folder + '/' + file_datetime + "_" + filename)
                # Save tweet metadata for further processing later.
                with open(output_folder + '/all_data.json', 'w') as outfile:
                    json.dump(all_data, outfile)
                api.send("debug", str("Downloaded image"))
                api.send("output", str(id_str))
            else:
                api.send("output1", str("No image..."))
        # Exception (not BaseException) so KeyboardInterrupt/SystemExit
        # still propagate and can stop the stream.
        except Exception as e:
            sys.stderr.write("Error on_data: {}\n".format(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        """Handle an HTTP error status from the Twitter stream.

        Args:
            status (int): HTTP status code reported by tweepy.

        Returns:
            bool: False on 420 (rate limited — disconnect), True
            otherwise (keep streaming).
        """
        api.send("debug", str(status))
        if status == 420:
            sys.stderr.write("Rate limit exceeded\n")
            return False
        sys.stderr.write("Error {}\n".format(status))
        return True
if __name__ == '__main__':
    # Hashtag(s) to follow; could also be taken from sys.argv[1:].
    search_terms = ['#digitalfactorydatatest']
    listener_name = ' '.join(search_terms)
    # Open the stream with our listener and filter on the search terms.
    twitter_stream = Stream(get_twitter_auth(), CustomListener(listener_name))
    twitter_stream.filter(track=search_terms)
import time
import shutil
import os
# import urllib.request
import boto3
import csv
def on_input(data):
    """Upload every image saved for one tweet to S3.

    Receives a tweet id from the upstream twitter operator, reads AWS
    credentials from a local CSV file, and uploads each file in the
    tweet's picture folder to the bucket under ``<id_str>/<filename>``.
    Emits the tweet id on the "output" port when done.

    Args:
        data (str): the tweet id (id_str) produced upstream.
    """
    id_str = data
    aws_credentials = '/vrep/vflow/scripts/twitter/credentials.csv'
    # Read the key pair; the last CSV row wins (assumes the file holds
    # a single credential row, possibly after a header — TODO confirm).
    # Renamed from `input` to avoid shadowing the builtin.
    with open(aws_credentials, 'r') as cred_file:
        for line in csv.reader(cred_file):
            access_key_id = line[0]
            secret_access_key = line[1]
    # Set up the Boto S3 client.
    s3client = boto3.client('s3',
                            aws_access_key_id=access_key_id,
                            aws_secret_access_key=secret_access_key,
                            region_name='eu-west-1')
    bucket = 'USE-YOUR-OWN-BUCKET-HERE'
    output_folder = '/vrep/vflow/scripts/twitter/twitter_pictures/' + id_str
    # Upload with absolute paths instead of mutating the process-wide
    # working directory (os.chdir), which is unsafe in a shared runtime.
    for filename in os.listdir(output_folder):
        s3client.upload_file(os.path.join(output_folder, filename),
                             bucket,
                             id_str + '/' + filename)
    api.send("output", str(id_str))


api.set_port_callback("input", on_input)
import os.path
import json
import time
import pandas as pd
from pandas.io.json import json_normalize
import boto3
import csv
def on_input(data):
    """Classify faces in one tweet's images with AWS Rekognition.

    Receives a tweet id, loads the tweet's saved metadata JSON, lists
    the tweet's images in the S3 bucket, runs face detection on each
    one, and joins the per-face emotion attributes with the tweet/user
    metadata into one semicolon-separated CSV emitted on "output".

    Args:
        data (str): the tweet id (id_str) produced upstream.
    """
    id_str = data
    raw_data = '/vrep/vflow/scripts/twitter/twitter_pictures/' + id_str + '/all_data.json'
    data = json.load(open(raw_data))

    # Tweet-level metadata.
    tweet_created_at = data['created_at']
    tweet_id_str = data['id_str']
    tweet_text = data['text']
    tweet_geo = data['geo']
    tweet_coordinates = data['coordinates']
    tweet_place = data['place']
    # User-level metadata.
    user_id = data['user']['id_str']
    user_name = data['user']['name']
    screen_name = data['user']['screen_name']
    location = data['user']['location']
    followers_count = data['user']['followers_count']
    friends_count = data['user']['friends_count']
    created_at = data['user']['created_at']
    time_zone = data['user']['time_zone']
    geo_enabled = data['user']['geo_enabled']
    lang = data['user']['lang']
    following = data['user']['following']

    # AWS credentials: last CSV row wins (assumes one credential row,
    # possibly after a header — TODO confirm file layout).  Renamed
    # from `input` to avoid shadowing the builtin.
    aws_credentials = '/vrep/vflow/scripts/twitter/credentials.csv'
    with open(aws_credentials, 'r') as cred_file:
        for line in csv.reader(cred_file):
            access_key_id = line[0]
            secret_access_key = line[1]

    # Boto AWS S3 client.
    s3client = boto3.client('s3',
                            aws_access_key_id=access_key_id,
                            aws_secret_access_key=secret_access_key,
                            region_name='eu-west-1')
    # Boto Rekognition client.
    facesclient = boto3.client('rekognition',
                               aws_access_key_id=access_key_id,
                               aws_secret_access_key=secret_access_key,
                               region_name='eu-west-1')

    # Objects uploaded for this tweet; uploaded image names start with
    # the year, hence the '/20' prefix filter.
    image_data = s3client.list_objects(
        Bucket='USE-YOUR-OWN-BUCKET-HERE',
        Prefix=id_str + '/20'
    )

    images = []    # S3 keys of this tweet's images
    list_all = []  # one row per detected emotion per face

    # 'media' exists in entities only when an image was tweeted; any
    # KeyError (or AWS error) is reported best-effort on "debug".
    try:
        api.send("debug", str("Image - AWS processing required..."))
        media_id = data['entities']['media'][0]['id_str']
        media_media_url = data['entities']['media'][0]['media_url']

        # Flatten tweet/user/media metadata into one ';'-separated row.
        tweet_data = str(tweet_created_at) + ";" + str(tweet_id_str) + ";" + str(tweet_text).replace(";", " ").replace("\n", " ") + ";" + str(tweet_geo) + ";" + str(tweet_coordinates) + ";" + str(tweet_place)
        user_data = str(user_id) + ";" + str(user_name) + ";" + str(screen_name) + ";" + str(location) + ";" + str(followers_count) + ";" + str(friends_count) + ";" + str(created_at) + ";" + str(time_zone) + ";" + str(geo_enabled) + ";" + str(lang) + ";" + str(following)
        media_data = str(media_id) + ";" + str(media_media_url)
        all_data = tweet_data + ";" + user_data + ";" + media_data

        cols = ['tweet_created_at', 'tweet_id_str', 'tweet_text', 'tweet_geo', 'tweet_coordinates', 'tweet_place', 'user_id', 'user_name', 'screen_name', 'location', 'followers_count', 'friends_count', 'created_at', 'time_zone', 'geo_enabled', 'lang', 'following', 'media_id', 'media_media_url']
        df_tweet = pd.DataFrame([x.split(';') for x in all_data.split('\n')], columns=cols)

        # list_objects returns its matches under the 'Contents' key.
        for key, value in image_data.items():
            if key == 'Contents':
                for obj in value:
                    images.append(obj['Key'])

        # Main loop: submit each image to Rekognition for face analysis.
        for photo in images:
            response = facesclient.detect_faces(
                Image={
                    'S3Object': {
                        'Bucket': 'USE-YOUR-OWN-BUCKET-HERE',
                        'Name': photo,
                    },
                },
                Attributes=['ALL']
            )
            for key, value in response.items():
                if key == 'FaceDetails':
                    for people_attr in value:
                        gender_value = people_attr['Gender']
                        box_pos = people_attr['BoundingBox']
                        age_range = people_attr['AgeRange']
                        confidence = gender_value['Confidence']
                        gender = gender_value['Value']
                        emotion_value = people_attr['Emotions']
                        age_ranges = str(age_range['Low']) + "-" + str(age_range['High'])
                        box_width = box_pos['Width']
                        box_height = box_pos['Height']
                        box_left = box_pos['Left']
                        box_top = box_pos['Top']
                        # Keep only detections where both the emotion
                        # and the gender confidence reach 20%.
                        for ec in emotion_value:
                            if ec['Confidence'] >= 20 and confidence >= 20:
                                list_all.append((ec['Confidence'], ec['Type'], age_ranges, photo, gender, confidence, box_width, box_height, box_left, box_top))

        cols = ['emotion_confidence', 'emotion', 'age_ranges', 'shortcode', 'gender', 'gender_confidence', 'box_width', 'box_height', 'box_left', 'box_top']
        df_data = pd.DataFrame(list_all)
        df_data.columns = cols
        # 'shortcode' is '<tweet_id>/<filename>'; split it so the tweet
        # id becomes a join key against the tweet metadata frame.
        split_df = df_data["shortcode"].str.split('/', expand=True)
        split_df.columns = [f"shortcode_{id_}" for id_ in range(len(split_df.columns))]
        df_data = pd.merge(df_data, split_df, how="left", left_index=True, right_index=True)
        df_data = df_data.rename(columns={'shortcode_0': 'tweet_id_str'})
        df_results = pd.merge(df_data, df_tweet, how='inner', on=['tweet_id_str'])

        # Headerless ';'-separated CSV for the downstream consumer.
        to_sac = df_results.to_csv(sep=";", index=False, header=False)
        api.send("output", to_sac)
        api.send("debug", to_sac)
    except Exception as e:
        api.send('debug', str(e))


api.set_port_callback("input", on_input)
-- SAP HANA column-store target table for the semicolon-separated rows
-- emitted by the Rekognition operator (one row per detected emotion per
-- face, joined with the tweet/user metadata).  Most fields are kept as
-- NVARCHAR because the CSV is loaded without type conversion; column
-- order presumably must match the operator's output — verify against
-- the DataFrame columns upstream before changing.
CREATE COLUMN TABLE "DBADMIN"."TWITTER"(
"EMOTIONAL_CONFIDENCE" DOUBLE,
"EMOTION" NVARCHAR(20),
"AGE_RANGE" NVARCHAR(10),
"SHORTCODE" NVARCHAR(200),
"GENDER" NVARCHAR(10),
"GENDER_CONFIDENCE" DOUBLE,
"BOX_WIDTH" NVARCHAR(100),
"BOX_HEIGHT" NVARCHAR(100),
"BOX_LEFT" NVARCHAR(100),
"BOX_TOP" NVARCHAR(100),
"TWEET_ID_STR" NVARCHAR(100),
"SHORTCODE_1" NVARCHAR(100),
"TWEET_CREATED_AT" NVARCHAR(100),
"TWEET_TEXT" NVARCHAR(500),
"TWEET_GEO" NVARCHAR(100),
"TWEET_COORDINATES" NVARCHAR(100),
"TWEET_PLACE" NVARCHAR(100),
"USER_ID" NVARCHAR(100),
"USER_NAME" NVARCHAR(100),
"SCREEN_NAME" NVARCHAR(100),
"LOCATION" NVARCHAR(100),
"FOLLOWERS_COUNT" NVARCHAR(100),
"FRIENDS_COUNT" NVARCHAR(100),
"CREATED_AT" NVARCHAR(100),
"TIME_ZONE" NVARCHAR(100),
"GEO_ENABLED" NVARCHAR(100),
"LANG" NVARCHAR(100),
"FOLLOWING" NVARCHAR(100),
"MEDIA_ID" NVARCHAR(100),
"MEDIA_MEDIA_URL" NVARCHAR(100)
)
UNLOAD PRIORITY 5 AUTO MERGE;
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
24 | |
14 | |
11 | |
10 | |
10 | |
9 | |
7 | |
6 | |
5 | |
5 |