Внутреннее соединение эквивалентно глобальному окну, и предыдущее сообщение было сохранено.Когда приходит сообщение, которое может быть связано, выводится декартово произведение нового сообщения!
package SQL;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @Author you guess
* @Date 2021/1/6 15:22
* @Version 1.0
* @Desc
*/
public class DataStreamSql1_Join {
private static final Logger LOG = LoggerFactory.getLogger(MinMinByMaxMaxBy.MinMinByMaxMaxByTest.class);
private static final String[] TYPE = {"a苹果", "b梨", "c西瓜", "d葡萄", "e火龙果"};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, envSettings);
//添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量}
DataStreamSource<Order> orderSourceA = env.addSource(new SourceFunction<Order>() {
private volatile boolean isRunning = true;
private final Random random = new Random();
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (isRunning) {
TimeUnit.SECONDS.sleep(1);
Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10)));
System.out.println(new Date() + ",orderSourceA提交元素:" + order);
ctx.collect(order);
}
}
@Override
public void cancel() {
isRunning = false;
}
}, "order-infoA");
DataStreamSource<Order> orderSourceB = env.addSource(new SourceFunction<Order>() {
private volatile boolean isRunning = true;
private final Random random = new Random();
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (isRunning) {
TimeUnit.SECONDS.sleep(1);
Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10)));
System.out.println(new Date() + ",orderSourceB提交元素:" + order);
ctx.collect(order);
}
}
@Override
public void cancel() {
isRunning = false;
}
}, "order-infoB");
stEnv.registerDataStream("tableA", orderSourceA);
stEnv.registerDataStream("tableB", orderSourceB);
Table result = stEnv.sqlQuery("SELECT A.name,A.qtty,B.qtty from tableA A inner join tableB B on A.name = B.name");
stEnv.toRetractStream(result, Row.class).print();//这里要用Row类型
env.execute("Flink Streaming Java API Skeleton");
}
/**
* Simple POJO.
*/
public static class Order {
public String name;
public Long qtty;
public Order() {
}
public Order(String name, Long qtty) {
this.name = name;
this.qtty = qtty;
}
@Override
public String toString() {
return "Order{" +
"name='" + name + '\'' +
", qtty=" + qtty +
'}';
}
}
}
Ср, 06 января, 15:28:04 CST 2021, элемент отправки orderSourceA: Order{name='d виноград', qtty=9}
Ср, 06 января, 15:28:04 CST 2021, элемент отправки orderSourceB: Order{name='bear', qtty=8}
Ср, 06 января, 15:28:05 CST 2021, элемент отправки orderSourceA: Order{name='bear', qtty=4}
Ср, 06 января, 15:28:05 CST 2021, элемент отправки orderSourceB: Order{name='bear', qtty=6}
5> (правда,bpear,4,8)
5> (правда,bpear,4,6)
Ср, 06 января, 15:28:06 CST 2021, элемент отправки orderSourceA: Order{name='a apple', qtty=3}
Ср, 06 января, 15:28:06, CST 2021, элемент отправки orderSourceB: Order{name='e драконий фрукт', qtty=1}
Ср, 06 января, 15:28:07, CST 2021, элемент отправки orderSourceA: Order{name='e Dragon Fruit', qtty=4}
Ср, 06 января, 15:28:07 CST 2021, элемент отправки orderSourceB: Order{name='a apple', qtty=6}
9> (правда, яблоко, 3, 6)
4> (true,eDragonfruit,4,1)
Ср, 06 января, 15:28:08 CST 2021, элемент отправки orderSourceB: Order{name='e драконий фрукт', qtty=9}
Ср, 06 января, 15:28:08, CST 2021, элемент отправки orderSourceA: Order{name='e фрукт дракона', qtty=6}
4> (правда, eDragonfruit, 4,9)
4> (правда, eDragonfruit, 6,1)
4> (правда, eDragonfruit, 6,9)
Среда, 06 января, 15:28:09 CST 2021, элемент отправки orderSourceA: Order{name='e фрукт дракона', qtty=1}
Ср, 06 января, 15:28:09 CST 2021, элемент отправки orderSourceB: Order{name='e фрукт дракона', qtty=2}
4> (true,eDragonfruit,4,2)
4> (правда, eDragonfruit, 6,2)
4> (true,eDragonfruit,1,1)
4> (true,eDragonfruit,1,2)
4> (правда, eDragonfruit, 1,9)
Ср, 06 января, 15:28:10 CST 2021, элемент отправки orderSourceA: Order{name='c арбуз', qtty=5}
Ср, 06 января, 15:28:10 CST 2021, элемент отправки orderSourceB: Order{name='e драконий фрукт', qtty=6}
4> (true,eDragonfruit,4,6)
4> (правда, eDragonfruit, 6, 6)
4> (true,eDragonfruit,1,6)
Ср, 06 января, 15:28:11 CST 2021, элемент отправки orderSourceA: Order{name='c арбуз', qtty=6}
Ср, 06 января, 15:28:11 CST 2021, элемент отправки orderSourceB: Order{name='c арбуз', qtty=0}
8> (правда,cарбуз,5,0)
8> (правда,cарбуз,6,0)
Ср, 06 января, 15:28:12 CST 2021, элемент отправки orderSourceA: Order{name='bear', qtty=2}
Среда, 06 января, 15:28:12 CST 2021, элемент отправки orderSourceB: Order{name='d виноград', qtty=7}//Нет элемента отправки с orderSourceA до: Order{name='d виноград', qtty= 9} Совпало, так что вывода нет, а теперь можно сопоставить! !
3> (правда,d виноград,9,7)
5> (правда,bpear,2,8)
5> (правда,bpear,2,6)
Ср, 06 января, 15:28:13 CST 2021, элемент отправки orderSourceB: Order{name='d виноград', qtty=1}
Ср, 06 января, 15:28:13 CST 2021, элемент отправки orderSourceA: Order{name='d виноград', qtty=5}
3> (правда,d виноград,9,1)
3> (правда,d виноград,5,1)
3> (правда,d виноград,5,7)
Ср, 06 января, 15:28:14 CST 2021, элемент отправки orderSourceB: Order{name='d виноград', qtty=5}
Ср, 06 января, 15:28:14, CST 2021, элемент отправки orderSourceA: Order{name='e фрукт дракона', qtty=9}
3> (правда,d виноград,5,5)
3> (правда,d виноград,9,5)
4> (правда, eDragonfruit,9,6)
4> (правда, eDragonfruit, 9,1)
4> (правда, eDragonfruit, 9,2)
4> (правда, eDragonfruit,9,9)
Ср, 06 января, 15:28:15 CST 2021, orderSourceB submit element: Order{name='e dragon fruit', qtty=5} //Каждый раз, когда приходит новый элемент, если он может совпадать, он выводит декартово произведение
Ср, 06 января, 15:28:15 CST 2021, orderSourceA submit element: Order{name='e dragon fruit', qtty=4} //Каждый раз, когда приходит новый элемент, если он может совпадать, он выводит декартово произведение
4> (true,eDragonfruit,4,5)
4> (правда, eDragonfruit, 6,5)
4> (правда, eDragonfruit, 1,5)
4> (правда, eDragonfruit, 9,5)
4> (true,eDragonfruit,4,5)
4> (true,eDragonfruit,4,6)
4> (true,eDragonfruit,4,1)
4> (true,eDragonfruit,4,2)
4> (правда, eDragonfruit, 4,9)
Синтаксис разных версий flink сильно различается:
<flink.version>1.9.2</flink.version>
jdk1.8