Azure PowerShell Serie (4): [Special] Run Twitter Hive Scripts

Im vorherigen Beitrag (Azure PowerShell Serie: Run Hive Scripts) unserer Azure PowerShell Serie sind wir ein PowerShell-Skript durchgegangen, mit dem sich Hive-Skripte bequem von PowerShell aus in einem HDInsight-Cluster ausführen lassen.

Wie sieht es denn in einem konkreten Fall aus? Nehmen wir einen Schritt aus der Blog-Serie Big Data Twitter Demo. Zur Erinnerung: Tweets haben wir in Echtzeit mittels StreamInsight extrahiert und deren Rohdaten im Windows Azure Blob Storage gespeichert. Mithilfe eines HDInsight-Clusters haben wir daraufhin Hive-Skripte ausgeführt und weitere Hive-Tabellen erstellt, um im Nachhinein einen schnellen Import der Hive-Tabellen in Excel und den Gebrauch der mächtigen Power BI Tools zu ermöglichen: Hive-Analyse starten.

Hier ist das dazugehörige PowerShell-Skript, um die lokal gespeicherten Hive-Skripte per PowerShell auf dem neu erstellten HDInsight-Cluster auszuführen:

#############################################
# HiveQL on Tweets in HDInsight-Cluster
#
# Uploads the local HiveQL scripts to Azure Blob Storage, creates an
# HDInsight cluster on top of that storage account, runs the scripts in the
# order given by $localFileNames and finally removes the cluster and the
# uploaded scripts again.
#############################################

# 0. Azure Account Details
Add-AzureAccount
$subName = "Internal Consumption"
Select-AzureSubscription -SubscriptionName $subName
# Azure account details automatically set
$subID = Get-AzureSubscription -Current | %{ $_.SubscriptionId }

##################################################################
# 1. Input information
$clusterName = "<HDInsightClusterName>"
$location = "<DatacenterLocation>" # e.g. North Europe, West Europe, etc.
$numNodes = 1 # start small
$storageAccount = "<StorageAccountName>"
$defaultContainer = "<StorageContainerName>"
# Variables automatically set for you
$storageKey = Get-AzureStorageKey $storageAccount | %{ $_.Primary }
$storageContext = New-AzureStorageContext -StorageAccountName $storageAccount `
    -StorageAccountKey $storageKey
# Blob endpoint host name, used to build the wasb:// script URIs in step 4
$fullStorage = "${storageAccount}.blob.core.windows.net"
# local HiveQL scripts (file names without the .hql extension, in run order)
$localFileNames = "1_create-tweet-input", "2_create-tweet-details", "3_insert-tweet-details", "4_create-others"
$localFolder = "C:\<scriptFilesPath>"
$blobFolder = "scripts"

##################################################################
# 2. Upload HiveQL scripts from local to Azure Blob Storage
foreach ($item in $localFileNames)
{
    $hqlLocalFile = "$localFolder\$item.hql"
    $hqlBlobName = "$blobFolder/$item.hql"
    Write-Host "Copying $hqlLocalFile to $hqlBlobName" -BackgroundColor Green
    # Copy the file from local workstation to WASB
    Set-AzureStorageBlobContent -File $hqlLocalFile -Container $defaultContainer `
        -Blob $hqlBlobName -Context $storageContext
}

##################################################################
# 3. Create HDInsight Cluster and configure it
$clusterCreds = Get-Credential -Message "New admin account to be created for your HDInsight cluster"
# Simple create
New-AzureHDInsightCluster -Name $clusterName -Subscription $subID `
    -Location $location -DefaultStorageAccountName $storageAccount `
    -DefaultStorageAccountKey $storageKey `
    -DefaultStorageContainerName $defaultContainer `
    -Credential $clusterCreds -ClusterSizeInNodes $numNodes

##################################################################
# 4. Execute HiveQL script
# The cluster does not change between scripts, so it is selected once
# before the loop instead of once per script.
Use-AzureHDInsightCluster $clusterName
foreach ($item in $localFileNames)
{
    # wasb:// URI of the script uploaded in step 2
    $hqlScriptFile = "wasb://${defaultContainer}@${fullStorage}/${blobFolder}/${item}.hql"
    Write-Host "Invoking HiveQL script $item" -BackgroundColor Green
    # execute HiveQL script
    Invoke-Hive -File $hqlScriptFile
}

##################################################################
# 5. OPTIONAL: Clean up
# Remove HDInsight cluster
Remove-AzureHDInsightCluster -Name $clusterName
# OPTIONAL: remove scripts
foreach ($item in $localFileNames)
{
    $hqlBlobName = "$blobFolder/$item.hql"
    Write-Host "Removing $hqlBlobName" -BackgroundColor Green
    # Remove the file from WASB
    Remove-AzureStorageBlob -Blob $hqlBlobName -Container $defaultContainer `
        -Context $storageContext
}
  

Wie wir im Abschnitt 1 sehen können, haben wir bereits Hive-Skripte erstellt und lokal abgespeichert. Die Hive-Skripte sind auch hier zu finden:

 --1_create-tweet-input.hql
-- Registers the raw tweet dumps as an external Hive table with a single
-- string column (tweet) and one partition per day folder under /twitter.
-- No row format is declared, so the table uses the Hive default text format.
-- The partition column name date is a reserved keyword in current Hive
-- releases and is therefore back-quoted. The column name itself is unchanged.
DROP TABLE IF EXISTS tweet_input;
CREATE EXTERNAL TABLE tweet_input (tweet string) PARTITIONED BY (`date` string);

-- One ADD PARTITION per day folder that should be visible to queries.
ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-12-09') LOCATION '/twitter/2013-12-09';
ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-12-10') LOCATION '/twitter/2013-12-10';
ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-12-11') LOCATION '/twitter/2013-12-11';
ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-12-12') LOCATION '/twitter/2013-12-12';
ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-12-13') LOCATION '/twitter/2013-12-13';
-- Disabled partitions for the November folders, kept as a template:
--ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-11-05') LOCATION '/twitter/2013-11-05';
--ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-11-06') LOCATION '/twitter/2013-11-06';
--ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-11-07') LOCATION '/twitter/2013-11-07';
--ALTER TABLE tweet_input ADD PARTITION (`date` = '2013-11-08') LOCATION '/twitter/2013-11-08';

 

 --2_create-tweet-details.hql
-- Enable dynamic partitioning in non-strict mode for this session.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

-- Recreates tweet_details from scratch. The table holds one flattened row
-- per tweet (ids, created_at and its parts, text and flags, entities, user
-- attributes), is partitioned by partition_key and stored as a sequence file.
DROP TABLE tweet_details;
CREATE TABLE tweet_details
(
    id                      BIGINT,
    id_str                  STRING,
    created_at              STRING,
    created_at_date         STRING,
    created_at_year         STRING,
    created_at_month        STRING,
    created_at_day          STRING,
    created_at_time         STRING,
    in_reply_to_user_id_str STRING,
    text                    STRING,
    contributors            STRING,
    is_a_retweet            BOOLEAN,
    truncated               STRING,
    coordinates             STRING,
    source                  STRING,
    retweet_count           INT,
    url                     STRING,
    hashtags                ARRAY<STRING>,
    user_mentions           ARRAY<STRING>,
    first_hashtag           STRING,
    first_user_mention      STRING,
    screen_name             STRING,
    name                    STRING,
    followers_count         INT,
    listed_count            INT,
    friends_count           INT,
    lang                    STRING,
    user_location           STRING,
    time_zone               STRING,
    profile_image_url       STRING
)
PARTITIONED BY (partition_key STRING)
STORED AS SEQUENCEFILE;

 

 --3_insert-tweet-details.hql
-- Populates tweet_details from the raw JSON strings in tweet_input.
--
-- Partitioning: the insert is a dynamic-partition insert. The last selected
-- expression (partition_key) decides the target partition of each row, which
-- is why both dynamic-partition switches are set first.
--
-- created_at: the substr() calls slice the raw created_at string by fixed
-- position (1-10 weekday, month and day, 5-7 month abbreviation, 9-10 day,
-- 12-19 time, 27-30 year). This assumes a layout like
-- "Mon Dec 09 12:34:56 +0000 2013" - TODO confirm against the stored feed.
-- A month abbreviation that matches none of the WHEN branches yields NULL.
--
-- url: display_url is a field of the elements of the entities.urls array in
-- the tweet JSON, so it is read from the first URL entity. A path directly
-- below entities never matches and would leave the column NULL.
--
-- hashtags / user_mentions: only the first five entries are extracted, each
-- lower-cased and trimmed. Missing entries become NULL array elements.
--
-- NOTE(review): is_a_retweet is true whenever retweet_count is non-zero. It
-- does not look at any other field of the tweet - confirm that this is the
-- intended meaning.
--
-- Filter: rows whose raw text is 500 characters or shorter are skipped
-- (presumably to drop records that are not complete tweets - verify).
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;


insert overwrite table tweet_details
partition (partition_key)
select
    cast(get_json_object(tweet, '$.id_str') as bigint) as id,
    get_json_object(tweet, '$.id_str') as id_str,
    get_json_object(tweet, '$.created_at') as created_at,
    concat(substr (get_json_object(tweet, '$.created_at'),1,10),' ',
        substr (get_json_object(tweet, '$.created_at'),27,4)) as created_at_date,
    substr (get_json_object(tweet, '$.created_at'),27,4) as created_at_year,
    case substr (get_json_object(tweet, '$.created_at'),5,3)
        when "Jan" then "01"
        when "Feb" then "02"
        when "Mar" then "03"
        when "Apr" then "04"
        when "May" then "05"
        when "Jun" then "06"
        when "Jul" then "07"
        when "Aug" then "08"
        when "Sep" then "09"
        when "Oct" then "10"
        when "Nov" then "11"
        when "Dec" then "12" end as created_at_month,
    substr (get_json_object(tweet, '$.created_at'),9,2) as created_at_day,
    substr (get_json_object(tweet, '$.created_at'),12,8) as created_at_time,
    get_json_object(tweet, '$.in_reply_to_user_id_str') as in_reply_to_user_id_str,
    get_json_object(tweet, '$.text') as text,
    get_json_object(tweet, '$.contributors') as contributors,
    (cast (get_json_object(tweet, '$.retweet_count') as int) != 0) as is_a_retweet,
    get_json_object(tweet, '$.truncated') as truncated,
    get_json_object(tweet, '$.coordinates') as coordinates,
    get_json_object(tweet, '$.source') as source,
    cast (get_json_object(tweet, '$.retweet_count') as int) as retweet_count,
    get_json_object(tweet, '$.entities.urls[0].display_url') as url,
    array(
        trim(lower(get_json_object(tweet, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[1].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[2].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[3].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[4].text')))) as hashtags,
    array(
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[0].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[1].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[2].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[3].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[4].screen_name')))) as user_mentions,
    trim(lower(get_json_object(tweet, '$.entities.hashtags[0].text'))) as first_hashtag,
    trim(lower(get_json_object(tweet, '$.entities.user_mentions[0].screen_name'))) as first_user_mention,
    get_json_object(tweet, '$.user.screen_name') as screen_name,
    get_json_object(tweet, '$.user.name') as name,
    cast (get_json_object(tweet, '$.user.followers_count') as int) as followers_count,
    cast (get_json_object(tweet, '$.user.listed_count') as int) as listed_count,
    cast (get_json_object(tweet, '$.user.friends_count') as int) as friends_count,
    get_json_object(tweet, '$.user.lang') as lang,
    get_json_object(tweet, '$.user.location') as user_location,
    get_json_object(tweet, '$.user.time_zone') as time_zone,
    get_json_object(tweet, '$.user.profile_image_url') as profile_image_url,
    concat(substr (get_json_object(tweet, '$.created_at'),1,10),' ',
        substr (get_json_object(tweet, '$.created_at'),27,4)) as partition_key

from tweet_input
where (length(tweet) > 500);
 --4_create-others.hql

--
-- Geo coordinates
--
-- Rebuilds tweet_coordinates with one row per tweet in tweet_details whose
-- coordinates JSON has a second array element. Element 0 of the coordinates
-- array is stored as longitude and element 1 as latitude. The values come out
-- of get_json_object as strings and are stored in FLOAT columns, so the
-- conversion is left to Hive on insert.

DROP TABLE tweet_coordinates;

CREATE TABLE tweet_coordinates
(
    id_str          STRING,
    coordinate_type STRING,
    longitude       FLOAT,
    latitude        FLOAT
)
STORED AS SEQUENCEFILE;

INSERT OVERWRITE TABLE tweet_coordinates
SELECT
    id_str,
    get_json_object(coordinates, '$.type') AS coordinate_type,
    get_json_object(coordinates, '$.coordinates[0]') AS longitude,
    get_json_object(coordinates, '$.coordinates[1]') AS latitude
FROM tweet_details
WHERE get_json_object(coordinates, '$.coordinates[1]') IS NOT NULL;


-- SELECT * FROM tweet_coordinates limit 1;




--
-- HASHTAGS
--
-- Unpivots the hashtags array of tweet_details into one row per tweet and
-- hashtag slot. The slot number (0 to 4) is the static partition tag_index,
-- and each partition stores the hashtag found at that same array position.
-- Slots holding NULL are skipped.


drop table if exists tweet_hashtags;

create table tweet_hashtags
(
    id_str string,
    hashtag string
)
partitioned by (tag_index int)
STORED AS SEQUENCEFILE;


insert overwrite table tweet_hashtags partition(tag_index=0)
select id_str, hashtags[0] as hashtag
from tweet_details where hashtags[0] is not null;

insert overwrite table tweet_hashtags partition(tag_index=1)
select id_str, hashtags[1] as hashtag
from tweet_details where hashtags[1] is not null;

insert overwrite table tweet_hashtags partition(tag_index=2)
select id_str, hashtags[2] as hashtag
from tweet_details where hashtags[2] is not null;

insert overwrite table tweet_hashtags partition(tag_index=3)
select id_str, hashtags[3] as hashtag
from tweet_details where hashtags[3] is not null;

insert overwrite table tweet_hashtags partition(tag_index=4)
select id_str, hashtags[4] as hashtag
from tweet_details where hashtags[4] is not null;

-- Test
-- select hashtag, count(*) as hashtag_count 
-- from 
-- tweet_hashtags 
-- group by hashtag 
-- order by hashtag_count;



--
-- USER MENTIONS
--
-- Unpivots the user_mentions array of tweet_details into one row per tweet
-- and mention slot. The slot number (0 to 4) is the static partition
-- mention_index, and each partition stores the mention found at that same
-- array position. Slots holding NULL are skipped.
--
-- NOTE(review): every insert is capped with limit 100 and has no order by, so
-- each partition keeps at most 100 arbitrary rows. The hashtag inserts in this
-- script have no such cap - confirm whether the limit is intentional.

drop table if exists tweet_user_mentions;

create table tweet_user_mentions
(
    id_str string,
    user_mention string
)
partitioned by (mention_index int)
STORED AS SEQUENCEFILE;


insert overwrite table tweet_user_mentions partition(mention_index=0)
select id_str, user_mentions[0] as user_mention
from tweet_details where user_mentions[0] is not null
limit 100;

insert overwrite table tweet_user_mentions partition(mention_index=1)
select id_str, user_mentions[1] as user_mention
from tweet_details where user_mentions[1] is not null
limit 100;

insert overwrite table tweet_user_mentions partition(mention_index=2)
select id_str, user_mentions[2] as user_mention
from tweet_details where user_mentions[2] is not null
limit 100;

insert overwrite table tweet_user_mentions partition(mention_index=3)
select id_str, user_mentions[3] as user_mention
from tweet_details where user_mentions[3] is not null
limit 100;

insert overwrite table tweet_user_mentions partition(mention_index=4)
select id_str, user_mentions[4] as user_mention
from tweet_details where user_mentions[4] is not null
limit 100;


-- Test
-- select user_mention, count(*) as user_mention_count 
-- from tweet_user_mentions 
-- group by user_mention 
-- order by user_mention_count;


--
-- Tweeter
--
-- Rebuilds tweeter with one row per screen name, where screen names are
-- grouped case-insensitively. The stored screen_name is the minimum of the
-- spellings in the group, and the two counters are the highest followers and
-- friends counts seen for that group in tweet_details.

DROP TABLE tweeter;

CREATE TABLE tweeter
(
    screen_name   STRING,
    max_followers INT,
    max_friends   INT
);

INSERT OVERWRITE TABLE tweeter
SELECT
    MIN(screen_name),
    MAX(followers_count) AS max_followers,
    MAX(friends_count) AS max_friends
FROM tweet_details
GROUP BY LOWER(screen_name);