Calculer les différences entre successives des enregistrements dans Hadoop avec des Requêtes Hive
J'ai un tableau de la Ruche qui contient des données d'appels de clients.
Pour des raisons de simplicité de considérer il a 2 colonnes, la première colonne contient le numéro de client et la deuxième colonne contient le timestamp de l'appel (timestamp unix).
Je peux interroger ce tableau pour trouver tous les appels pour chaque client:
SELECT * FROM mytable SORT BY customer_id, call_time;
Le résultat est:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
Est-il possible de créer une requête de la Ruche qui renvoie, pour chaque client, à partir du deuxième appel, l'intervalle de temps entre deux successives des appels?
Pour l'exemple ci-dessus que la requête doit retourner:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
J'ai essayé d'adapter les solutions de l' sql solution, mais je suis coincé avec la Ruche: les limites de il accepte les sous-requêtes uniquement à PARTIR de et joint ne doit contenir que des égalités.
Merci.
EDIT1:
J'ai essayé d'utiliser une Ruche UDF fonction:
public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;
public String evaluate(String customerId, LongWritable callTime) {
long callTimeValue = callTime.get();
String timeDifference = null;
if (customerId.equals(previousCustomerId)) {
timeDifference = new Long(callTimeValue - previousCallTime).toString();
}
previousCustomerId = customerId;
previousCallTime = callTimeValue;
return timeDifference;
}}
et l'utiliser avec le nom "delta".
Mais il me semble (à partir des journaux et le résultat) qu'il est utilisé à l'heure de la CARTE. 2 problèmes se posent:
Première: les données de La table doit être triée par numéro de Client et le timestamp AVANT d'utiliser cette fonction. La requête:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
ne fonctionne pas car le tri partie est effectuée à RÉDUIRE le temps, longtemps après ma fonction est utilisée.
Je peux trier les données de la table avant d'utiliser la fonction, mais je ne suis pas heureux avec cela parce que c'est une surcharge, je l'espère, à éviter.
Deuxième: Dans le cas d'un système distribué Hadoop configuration, les données sont réparties entre les emplois disponibles traqueurs. Donc, je crois qu'il y aura plusieurs instances de cette fonction, un pour chaque mappeur, de sorte qu'il est possible d'avoir le même client de données réparties sur 2 utilisateurs. Dans ce cas je vais perdre les appels des clients, ce qui n'est pas acceptable.
Je ne sais pas comment résoudre ce problème. Je sais que DISTRIBUER PAR garantit que toutes les données avec une valeur spécifique est envoyé à la même réducteur (donc veiller à ce que le TRI fonctionne comme prévu), quelqu'un sait si il y a quelque chose de similaire pour le mappeur?
Prochaine j'ai l'intention de suivre libjack la suggestion d'utiliser un script. Ce "calcul" est nécessaire, entre autres requêtes hive, donc j'ai envie de tout essayer Ruche propose, avant de passer à un autre outil, tel que suggéré par Balaswamy vaddeman.
EDIT2:
J'ai commencé à enquêter sur les scripts personnalisés solution. Mais, dans la première page du chapitre 14 de la Programmation de la Ruche livre (ce chapitre présente les scripts personnalisés), je trouve le paragraphe suivant:
Streaming est généralement moins efficace que le codage de l'comparables Udf ou
InputFormat objets. La sérialisation et la désérialisation de données à transmettre et
hors de la pipe est relativement inefficace. Il est également plus difficile à déboguer l'ensemble de la
programme d'une manière unifiée. Cependant, il est utile pour le prototypage rapide
et pour tirer parti de code existant qui n'est pas écrit en Java. Pour La Ruche
les utilisateurs qui ne veulent pas écrire du code Java, il peut être très efficace
approche.
Il était donc clair que les scripts personnalisés n'est pas la meilleure solution en termes d'efficacité.
Mais comment dois-je conserver mon UDF fonction, mais assurez-vous qu'il fonctionne comme prévu dans un distribué Hadoop configuration? J'ai trouvé la réponse à cette question dans l'UDF, les éléments Internes de la section de la Langue Manuel de l'UDF page wiki. Si j'écris ma requête:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
il est exécuté à RÉDUIRE le temps et les DISTRIBUER PAR et de TRI PAR des constructions de garantir que tous les enregistrements de la même client sont traitées par le même réducteur, dans l'ordre des appels.
Ce qui est au dessus de l'UDF, et cette requête construire résoudre mon problème.
(Désolé pour ne pas ajouter les liens, mais je ne suis pas autorisé à le faire parce que je n'ai pas assez de points de réputation)
Je ne sais pas comment faire cela dans la ruche, mais il y a en cascade api pour ce faire.il ya quelque chose appelé tampon en cascade.docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html
OriginalL'auteur Cipi | 2013-02-01
Vous devez vous connecter pour publier un commentaire.
C'est une vieille question, mais pour de futures références, j'écris ici une autre proposition:
Ruche Les fonctions de fenêtrage permet d'utiliser les boutons précédent /suivant les valeurs de votre requête.
Un simili code de requête peut être :
SÉLECTIONNEZ customer_id, GAL(call_time, 1, 0) OVER (PARTITION BY customer_id COMMANDE PAR call_time LIGNES 1 ci-dessus) - call_time from matable;
OriginalL'auteur jbaptiste
Vous pouvez utiliser explicitement
MAP-REDUCE
avec d'autres langage de programmation comme Java ou Python.D'où émanait de la carte
{cutomer_id,call_time}
et réducteur, vous obtiendrez{customer_id,list{time_stamp}}
et dans réducteur vous pouvez trier ces horodatages et peuvent traiter les données.OriginalL'auteur mat_vee
Peut-être quelqu'un rencontre une exigence similaire, la solution que j'ai trouvée est la suivante:
1) Créer une fonction personnalisée:
2) Créer un jar contenant cette fonction. Supposons que le jarname est myjar.jar.
3) Copier le jar de la machine avec de la Ruche. Supposons qu'il est placé dans /tmp
4) Définir la fonction personnalisée à l'intérieur de la Ruche:
5) l'Exécution de la requête:
Remarques:
un. J'ai supposé que le call_time colonne stocke les données comme bigint. Dans le cas où c'est une chaîne, dans les processus de la fonction nous récupérer en tant que chaîne de caractères (comme nous le faisons avec le code client), puis de l'analyser à Long
b. J'ai décidé d'utiliser un UDTF au lieu de l'UDF, car de cette façon, il génère toutes les données dont il a besoin. Dans le cas contraire (UDF) les données générées doit être filtrée pour ignorer les valeurs NULL. Ainsi, avec l'UDF fonction (DeltaComputerUDF) décrit dans la première édition de l'original post, la requête sera:
c. Les deux fonctions (UDF et UDTF) comme on le souhaite, peu importe l'ordre des lignes à l'intérieur de la table (donc il n'est pas nécessaire que les données de la table, triées par numéro de client et le temps de garde avant d'utiliser les fonctions delta)
OriginalL'auteur Cipi